一、启动Zookeeper和kafka
确保Zookeeper和Kafka已启动(此时尚未配置加密,使用PLAINTEXT)
systemctl start zookeeper systemctl start kafka二、创建用户(未配置完成加密情况下)
2.1 创建管理员用户(admin)
/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --alter --add-config 'SCRAM-SHA-256=[password=admin]' --entity-type users --entity-name admin2.2 创建应用程序用户
/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --alter --add-config 'SCRAM-SHA-256=[password=dmhs123]' --entity-type users --entity-name dmhs2.3 验证用户是否创建成功
/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --describe --entity-type users三、维护SCRAM证书(未配置完成加密情况下)
3.1 查看SCRAM证书
/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --describe --entity-type users --entity-name admin3.2 删除SCRAM证书
/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name admin四、服务端配置
在用户证书创建完毕之后开始Kafka服务端的配置。
4.1 创建JAAS文件
进入/usr/local/kafka/config文件夹,执行以下命令:
cat > /usr/local/kafka/config/kafka_server_jaas.conf << EOF KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin"; }; EOF chmod 600 /usr/local/kafka/config/kafka_server_jaas.conf4.2 启动加载jaas配置文件
4.2.1 方式一:修改systemd服务文件来注入JVM参数(推荐)
cat > /etc/systemd/system/kafka.service << EOF [Unit] Description=Apache Kafka After=zookeeper.service network.target [Service] Type=simple Environment="JAVA_HOME=/usr/local/jdk1.8.0_491" Environment="KAFKA_HEAP_OPTS=-Xmx4G -Xms4G" Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf" ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target EOF systemctl daemon-reload4.2.2 方式二:修改/usr/local/kafka/bin/kafka-server-start.sh
在内容exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"所在的行注释,并在该行下面子新增一行以下内容:
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf kafka.Kafka "$@"4.3 配置server.properties
编辑/usr/local/kafka/config/server.properties,内容如下:
broker.id=0 #listeners=PLAINTEXT://0.0.0.0:9092 #advertised.listeners=PLAINTEXT://192.168.10.5:9092 log.dirs=/data/kafka-logs zookeeper.connect=localhost:2181 num.network.threads=5 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 listeners=SASL_PLAINTEXT://0.0.0.0:9092 advertised.listeners=SASL_PLAINTEXT://192.168.10.5:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 sasl.enabled.mechanisms=SCRAM-SHA-256 allow.everyone.if.no.acl.found=false super.users=User:admin authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer4.4 重启Zookeeper和Kafka
systemctl stop kafka zookeeper systemctl start zookeeper kafka五、客户端配置
5.1 创建SASL客户端配置文件
后续所有操作均使用以下新建的客户端配置文件,不再使用Kafka自带的producer.properties和consumer.properties。
cat > /usr/local/kafka/config/kafka_client_admin.properties << EOF security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin"; request.timeout.ms=60000 EOF cat > /usr/local/kafka/config/kafka_client_dmhs.properties << EOF security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="dmhs" password="dmhs123"; EOF chmod 600 /usr/local/kafka/config/kafka_client_admin.properties chmod 600 /usr/local/kafka/config/kafka_client_dmhs.properties5.2 创建主题(加密环境下)
/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.10.5:9092 --topic hstest --partitions 1 --replication-factor 1 --command-config /usr/local/kafka/config/kafka_client_admin.properties /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.10.5:9092 --list --command-config /usr/local/kafka/config/kafka_client_admin.properties5.3 用户授权
由于server.properties中allow.everyone.if.no.acl.found=false,因此需要对用户授权。
授权规则:生产者需要主题的WRITE权限;消费者需要主题的READ权限 + 消费组的READ权限。
5.3.1 赋予用户dmhs对主题hstest的写权限
/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server 192.168.10.5:9092 --command-config /usr/local/kafka/config/kafka_client_admin.properties --add --allow-principal User:dmhs --operation Write --topic hstest5.3.2 赋予用户dmhs对主题hstest的读权限
/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server 192.168.10.5:9092 --command-config /usr/local/kafka/config/kafka_client_admin.properties --add --allow-principal User:dmhs --operation Read --topic hstest5.3.3 赋予用户dmhs对消费组dmhs-consumer-group的读权限
/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server 192.168.10.5:9092 --command-config /usr/local/kafka/config/kafka_client_admin.properties --add --allow-principal User:dmhs --operation Read --group dmhs-consumer-group测试环境可一次性授权所有消费组:
/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server 192.168.10.5:9092 --command-config /usr/local/kafka/config/kafka_client_admin.properties --add --allow-principal User:dmhs --operation Read --group '*'5.3.4 检查权限
确认ACL已包含Write、Read on topic、Read on group:
/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server 192.168.10.5:9092 --command-config /usr/local/kafka/config/kafka_client_admin.properties --list5.4 启动生产者
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.10.5:9092 --topic hstest --producer.config /usr/local/kafka/config/kafka_client_dmhs.properties5.5 启动消费者
已通过--group '*'授权所有消费组,因此以下命令可使用随机组名(不指定--group):
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.5:9092 --topic hstest --from-beginning --consumer.config /usr/local/kafka/config/kafka_client_dmhs.properties如需固定组,请指定--group(必须已授权该组):
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.5:9092 --topic hstest --from-beginning --consumer.config /usr/local/kafka/config/kafka_client_dmhs.properties --group dmhs-consumer-group每次测试使用不同组名(避免偏移量残留):
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.5:9092 --topic hstest --from-beginning --consumer.config /usr/local/kafka/config/kafka_client_dmhs.properties --group dmhs-test-$(date +%s)5.6 消费行为说明
- 对单分区主题,Kafka保证同一个分区只能被同一个消费组内的一个消费者消费。
- 如果消费组中的所有消费者共享同一份偏移量,窗口1消费了消息并提交偏移量后,窗口2随后启动(或再平衡后接管分区),会从窗口1提交的偏移量之后开始消费,而不是从最早消息开始。
- 如果消费者启动时报
OFFSET_OUT_OF_RANGE,可以临时指定--group为全新组名(如dmhs-test-$(date +%s)),或者检查kafka_client_dmhs.properties中的auto.offset.reset是否设为earliest(当前已设置)。 - 建议:同一业务逻辑的多实例使用相同消费组,以实现负载均衡和高可用;不同业务或独立测试用途应使用不同的消费组,避免互相干扰偏移量,也避免单分区场景下一个消费者空转。
5.7 测试用超级用户admin直接消费hstest
无需指定组,Kafka会自动为其生成一个全新的随机消费组:
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.5:9092 --topic hstest --from-beginning --consumer.config /usr/local/kafka/config/kafka_client_admin.properties六、已配置加密情况下新建用户
6.1 创建用户(需携带admin凭证)
/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --alter --add-config 'SCRAM-SHA-256=[password=dmhs123]' --entity-type users --entity-name dmhs --command-config /usr/local/kafka/config/kafka_client_admin.properties6.2 验证用户是否创建成功(ZooKeeper)
/usr/local/kafka/bin/zookeeper-shell.sh localhost:2181 <<< "ls /config/users"6.3 查看某个用户详情(使用ZooKeeper避免超时)
/usr/local/kafka/bin/zookeeper-shell.sh localhost:2181 <<< "get /config/users/dmhs"6.4 删除用户
/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name dmhs --command-config /usr/local/kafka/config/kafka_client_admin.properties若上述命令超时,可直接从ZooKeeper删除(谨慎操作):
# /usr/local/kafka/bin/zookeeper-shell.sh localhost:2181 <<< "deleteall /config/users/dmhs"七、测试环境清空主题中的消息
7.1 物理清空主题数据(保留ACL)
# 停止 Kafka systemctl stop kafka # 清空hstest主题的数据文件(删除分区目录下所有文件) rm -rf /data/kafka-logs/hstest-0/* # 启动Kafka systemctl start kafka7.2 验证
启动消费者从头消费,应该看不到任何历史消息:
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.5:9092 --topic hstest --from-beginning --consumer.config /usr/local/kafka/config/kafka_client_admin.properties注意:如果dmhs-consumer-group之前消费过并提交了偏移量,现在Kafka重启后分区日志从offset 0开始,但消费组记录的偏移量可能是旧值(比如9)。下次消费者以dmhs-consumer-group启动时,会尝试从offset 9消费,由于日志被清空,会抛出OFFSET_OUT_OF_RANGE警告。此时使用kafka_client_dmhs.properties时若已设置auto.offset.reset=earliest,会自动重置到最早位置;否则,则需依赖--from-beginning参数。