Configuring Kafka SASL/SCRAM + ACL on an internal network (with Flume collection, monitoring, and a Java test demo)

Environment preparation
Configure SASL over PLAINTEXT transport (SASL_PLAINTEXT), with SCRAM-SHA-512 as the SASL mechanism.
Kafka security hardening: enable SASL + ACL.
I. SASL configuration
1. Edit the zoo.cfg configuration file to enable SASL authentication in ZooKeeper
requireClientAuthScheme=sasl
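For ZooKeeper to actually verify the SASL login that the broker presents (the Client section in step 2 below), the ZooKeeper side typically also needs a JAAS file with a matching Server section, loaded through zkEnv.sh. The following is only a minimal sketch under those assumptions: the file name zookeeper_sasl.conf matches the one mentioned in the troubleshooting section, and the install path is a placeholder.

# Hypothetical ZooKeeper install path; adjust to your environment.
ZK_HOME=/path/to/zookeeper

# Server-side JAAS: the user_<name>="<password>" pair must match the Client
# section of the broker JAAS in step 2 (user "kafka", password "kafka1234").
cat > "$ZK_HOME/conf/zookeeper_sasl.conf" <<'EOF'
Server {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    user_kafka="kafka1234";
};
EOF

# Then add this line to zkEnv.sh so the ZooKeeper JVM loads the JAAS file:
#   export SERVER_JVMFLAGS="-Djava.security.auth.login.config=$ZK_HOME/conf/zookeeper_sasl.conf $SERVER_JVMFLAGS"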
2. Create the kafka-broker-jaas.conf file to add Kafka's authentication information
The username and password in the KafkaServer section are used for communication between brokers.
The Client section holds the username and password used when connecting to ZooKeeper as a client; besides configuring them in this file, credentials can also be created with commands (covered later).
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="admin1234";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="kafka"
    password="kafka1234";
};
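Because SCRAM credentials are stored in ZooKeeper rather than in the JAAS file itself, the admin user used for inter-broker traffic generally has to be registered there before the brokers are started. A minimal sketch, reusing the ZooKeeper address and chroot (localhost:2181/kafka240) that appear in the ACL commands later in this post; the same pattern works for any other user:

# Register the SCRAM-SHA-512 credential declared in kafka-broker-jaas.conf.
sh kafka-configs.sh --zookeeper localhost:2181/kafka240 --alter \
  --add-config 'SCRAM-SHA-512=[password=admin1234]' \
  --entity-type users --entity-name admin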
3. Enable SASL authentication in Kafka's sasl_server.properties configuration file
# Set admin (and kafka) as super users for this example
super.users=User:admin;User:kafka
# Enable the SCRAM mechanism, using the SCRAM-SHA-512 algorithm
sasl.enabled.mechanisms=SCRAM-SHA-512
# Enable SCRAM for inter-broker communication, using the SCRAM-SHA-512 algorithm
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
# Inter-broker communication uses PLAINTEXT transport; SSL is not demonstrated in this example
security.inter.broker.protocol=SASL_PLAINTEXT
# Configure the listeners to use SASL_PLAINTEXT
listeners=SASL_
# Configure advertised.listeners
advertised.listeners=SASL_
security.protocol=SASL_PLAINTEXT
4. Modify the Kafka startup script so that it loads the specified properties file and reads the authentication configuration file; place the following line at the very beginning of the zkEnv.sh script.
export KAFKA_OPTS="-Djava.security.auth.login.config=/home/impdatahd/kafka_2.12-2.4.0/config/kafka-broker-jaas.conf"
5. Run sh kafka-start.sh to start the Kafka server and verify that it started successfully.
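One way to verify that the broker is up and the SASL listener works is to list topics through it with an admin client config. This is a sketch, assuming a hypothetical admin.properties written in the same format as the producer.properties shown in section 4.1 of the ACL part, but with the admin credentials:

# admin.properties (hypothetical) would contain:
#   security.protocol=SASL_PLAINTEXT
#   sasl.mechanism=SCRAM-SHA-512
#   sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin1234";
sh kafka-topics.sh --bootstrap-server localhost:9092 --list --command-config admin.properties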
II. ACL configuration
1. Edit the zoo.cfg configuration file to enable ACL authentication in ZooKeeper
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
2. Enable the ACL authorizer in Kafka's sasl_server.properties configuration file
# Enable ACLs
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
3. Commands for dynamically creating users, assigning user groups, and granting topic read/write permissions via ACLs (a verification sketch follows these commands)
Create an account:
sh kafka-configs.sh --zookeeper localhost:2181/kafka240 --alter --add-config 'SCRAM-SHA-512=[password=sasl_user_1_pwd]' --entity-type users --entity-name sasl_user_1
Grant the account write permission:
sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181/kafka240 --add --allow-principal User:sasl_user_1 --operation Write --topic kafka_sasl_2
Grant the account read permission:
sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181/kafka240 --add --allow-principal User:test3read --operation Read --topic kafka_sasl_6
Grant the account access to a consumer group:
sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181/kafka240 --add --allow-principal User:sasl_user_1 --group kafka-acls-group
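To double-check what was granted, the rules can be listed back with the same authorizer settings; a quick sketch:

sh kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer \
  --authorizer-properties zookeeper.connect=localhost:2181/kafka240 \
  --list --topic kafka_sasl_2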
4. Verify that step 3 works by testing with Kafka's built-in console producer and consumer
4.1 Create the producer configuration file producer.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka1234";
4.2 Create the consumer configuration file consumer.properties
bootstrap.servers=localhost:9092
group.id=kafka-acls-group
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="test3read" password="test3read1234";
4.3 Produce and consume
Produce:
kafka-console-producer.sh --broker-list loyx01:9092 --topic sasl_test_one --producer.config /home/impdatahd/kafka_2.12-2.4.0/config/p_sasl.properties
Consume:
kafka-console-consumer.sh --bootstrap-server loyx01:9092 --topic behavior_log_andr_test --consumer.config /home/impdatahd/kafka_2.12-2.4.0/config/c_sasl.properties
III. Common issues
Kafka reports the following error at startup: ERROR SASL authentication failed using login context 'Client' with exception: {}
Solution: copying the conf file by dragging it over from Windows can introduce invisible line-ending characters, so the file is not recognized and startup fails. The same error is also reported when the zookeeper_sasl.conf file is not loaded at startup.
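A quick way to spot and strip those invisible carriage returns on the broker host, using the JAAS path from step 4 above:

# Windows line endings show up as trailing \r (displayed as ^M / $) when control characters are made visible.
cat -A /home/impdatahd/kafka_2.12-2.4.0/config/kafka-broker-jaas.conf
# Remove them in place (equivalent to dos2unix).
sed -i 's/\r$//' /home/impdatahd/kafka_2.12-2.4.0/config/kafka-broker-jaas.conf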
Data collection
To collect data with Flume, the following changes are needed. 1) Create a JAAS file; it must be consistent with the Kafka configuration!
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="youradminusername"
    password="youradminpwd";
};
Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="yourusername"
    password="yourpwd";
};
2) Modify flume-env.sh:
JAVA_OPTS="$JAVA_OPTS -Djava.security.auth.login.config=/home/impdatahd/flume-1.9.0/conf/kafka-broker-jaas.conf"
3) Write the Flume agent configuration
# Name each component; a1 is the agent name
# a1.sources lists the sources configured in a1; separate multiple entries with spaces
# a1.sinks lists the sinks configured in a1; separate multiple entries with spaces
# a1.channels lists the channels configured in a1; separate multiple entries with spaces
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 1000
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.kafka.bootstrap.servers = 192.168.1.88:9092,192.168.1.89:9092,192.168.1.90:9092
a1.sources.r1.kafka.topics = kafka_sasl_1
a1.sources.r1.kafka.consumer.group.id = kafka-acls-group
a1.sources.r1.kafka.consumer.security.protocol = SASL_PLAINTEXT
a1.sources.r1.kafka.consumer.sasl.mechanism = SCRAM-SHA-512
a1.sources.r1.kafka.consumer.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required username="yourusername" password="yourpwd";
a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 6912212
# Configure the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path=/origin_data/test/sasl_test/%Y-%m-%d
# Prefix of the uploaded files
a1.sinks.k1.hdfs.filePrefix = sasl-
a1.sinks.k1.hdfs.batchSize= 7500
a1.sinks.k1.hdfs.minBlockReplicas=1
# Configure file rolling
# 30MIN
a1.sinks.k1.hdfs.rollInterval = 1800
#128M after codec
a1.sinks.k1.hdfs.rollSize = 1580484745
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType=DataStream
# Bind and connect the components
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Just start Flume (a start command sketch follows).
Figure: the collected files as they appear in HDFS after the agent runs.
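A typical start command, assuming the agent configuration above is saved as conf/kafka_sasl_hdfs.conf (a hypothetical file name) inside the Flume installation directory from step 2:

cd /home/impdatahd/flume-1.9.0
nohup bin/flume-ng agent \
  --conf conf \
  --conf-file conf/kafka_sasl_hdfs.conf \
  --name a1 \
  -Dflume.root.logger=INFO,console > flume.log 2>&1 &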
Monitoring
1) exporter: metrics are collected with the open-source kafka_exporter project from GitHub, deployed on every Kafka node. Version v1.13.1 is used. Compile the source to get the executable, grant it execute permission, and run the command below to start monitoring.
Grafana dashboard ID: 10736. The start and stop commands follow.
# --kafka.server=kafka_broker_address
# --kafka.version=kafka_version
# --log.level=log level
nohup ./kafka_exporter \
  --kafka.server=192.168.1.88:9092 \
  --kafka.server=192.168.1.89:9092 \
  --kafka.server=192.168.1.90:9092 \
  --kafka.version=2.4.0 \
  --sasl.enabled \
  --sasl.mechanism=scram-sha512 \
  --sasl.username=admin \
  --sasl.password=admin1234 \
  --tls.insecure-skip-tls-verify \
  --web.listen-address=:29092 \
  --log.level=info > kafka_exporter.log &

ps -ef | grep kafka_exporter | grep -v grep | awk '{print $2}' | xargs kill
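Once the exporter is running, a quick check that metrics are being exposed on the configured port (29092):

curl -s http://localhost:29092/metrics | grep kafka_brokers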
2) Prometheus: the following configuration needs to be added to Prometheus.
global:
  scrape_interval: 60s
  evaluation_interval: 60s
scrape_configs:
  - job_name: 'prometheus'
    static_configs:
      - targets: ['localhost:9090']
        labels:
          instance: prometheus
  - job_name: 'linux'
    metrics_path: "/metrics"
    static_configs:
      - targets: ['192.168.1.88:9100','192.168.1.89:9100','192.168.1.90:9100','192.168.1.91:9100']
  - job_name: 'kafka_exporter'
    metrics_path: "/metrics"
    scrape_interval: 5s
    static_configs:
      - targets: ['192.168.1.88:29092','192.168.1.89:29092','192.168.1.90:29092']
  - job_name: 'flume_exporter'
    metrics_path: "/metrics"
    scrape_interval: 5s
    static_configs:
      - targets: ['192.168.1.89:9360','192.168.1.90:9360']
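After editing the file, the configuration can be validated with promtool. A hot reload only works if Prometheus was started with --web.enable-lifecycle; otherwise restart the process:

./promtool check config prometheus.yml
curl -X POST http://localhost:9090/-/reload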
Testing
Key code of the Java demo
application.yml
server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers: 192.168.1.88:9092
      group-id: kafka_test
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: 192.168.1.88:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: 1
      retries: 3
      buffer-memory: 33554432
      batch-size: 16384
    properties:
      security.protocol: SASL_PLAINTEXT
      sasl.mechanism: SCRAM-SHA-512
      ssl.endpoint.identification.algorithm: ""
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='kafka' password='kafka1234';
Controller
package com.imprexion.test;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author zhengkaiwen
 * @date 2021/1/11 15:17
 * @Description:
 */
@RestController
@RequestMapping(value = "/kafka")
public class KafkaController {

    private final Producer producer;

    @Autowired
    KafkaController(Producer producer) {
        this.producer = producer;
    }

    @PostMapping(value = "/publish")
    public void sendMessageToKafkaTopic(@RequestParam("message") String message) {
        this.producer.sendMessage(message);
    }
}
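With the application running on port 9000 (as configured in application.yml), the endpoint can be exercised with a plain HTTP request; each call makes the Producer below push 10,000 JSON messages to kafka_sasl_1:

curl -X POST "http://localhost:9000/kafka/publish?message=hello"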

Producer
package com.imprexion.test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @author zhengkaiwen
 * @date 2021/1/11 15:06
 * @Description:
 */
@Service
public class Producer {

    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    private static final String TOPIC = "kafka_sasl_1";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        logger.info(String.format("#### -> Producing message -> %s", message));
        for (int i = 0; i < 10000; i++) {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String format = simpleDateFormat.format(new Date());
            long time = System.currentTimeMillis();
            // Build a sample event as a JSON string and send it to the topic.
            message = "{ \"a\": \"" + format + "\","
                    + " \"app_version\": \"4.2.1\","
                    + " \"device_id\": \"1f43f912c2\","
                    + " \"e\": \"faceID_launch\","
                    + " \"faceImage\": \"\","
                    + " \"p\": {"
                    + " \"package\": \"com.imprexion.member\","
                    + " \"page\": \"com.Orbbec.MagicSalad2\","
                    + " \"valume\": \"" + i + "\""
                    + " },"
                    + " \"package_name\": \"com.imprexion.service.facerecognition\","
                    + " \"pre_login_id\": \"1f43f912c2_1626330120074\","
                    + " \"source_channel\": \"com.imprexion.aibar\","
                    + " \"t\": " + time + ","
                    + " \"uid\": -1,"
                    + " \"v\": 1,"
                    + " \"st\": " + time + "}";
            this.kafkaTemplate.send(TOPIC, message);
        }
    }
}

Consumer

package com.imprexion.test;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @author zhengkaiwen
 * @date 2021/1/11 15:41
 * @Description:
 */
@Service
public class Consumer {

    private final Logger logger = LoggerFactory.getLogger(Consumer.class);

    @KafkaListener(topics = "kafka_sasl_1", groupId = "kafka-acls-group")
    public void consume(String message) throws IOException {
        logger.info(String.format("#### -> Consumed message -> %s", message));
    }
}

Application
package com.imprexion.sendmsg;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackages = {"com.imprexion.test"})
public class SendmsgApplication {

    public static void main(String[] args) {
        SpringApplication.run(SendmsgApplication.class, args);
    }

}
Access the URL and watch the monitoring charts.
Figure: dashboards before data production.
Figure: dashboards after data production.