news 2026/7/3 17:47:14

Kafka2.8.2单机加密配置SASL/SCRAM

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Kafka2.8.2单机加密配置SASL/SCRAM

一、启动Zookeeper和kafka

确保Zookeeper和Kafka已启动(此时尚未配置加密,使用PLAINTEXT)

systemctl start zookeeper systemctl start kafka

二、创建用户(未配置完成加密情况下)

2.1 创建管理员用户(admin)

/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --alter --add-config 'SCRAM-SHA-256=[password=admin]' --entity-type users --entity-name admin

2.2 创建应用程序用户

/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --alter --add-config 'SCRAM-SHA-256=[password=dmhs123]' --entity-type users --entity-name dmhs

2.3 验证用户是否创建成功

/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --describe --entity-type users

三、维护SCRAM证书(未配置完成加密情况下)

3.1 查看SCRAM证书

/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --describe --entity-type users --entity-name admin

3.2 删除SCRAM证书

/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name admin

四、服务端配置

在用户证书创建完毕之后开始Kafka服务端的配置。

4.1 创建JAAS文件

进入/usr/local/kafka/config文件夹,执行以下命令:

cat > /usr/local/kafka/config/kafka_server_jaas.conf << EOF KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin"; }; EOF chmod 600 /usr/local/kafka/config/kafka_server_jaas.conf

4.2 启动加载jaas配置文件

4.2.1 方式一:修改systemd服务文件来注入JVM参数(推荐)
cat > /etc/systemd/system/kafka.service << EOF [Unit] Description=Apache Kafka After=zookeeper.service network.target [Service] Type=simple Environment="JAVA_HOME=/usr/local/jdk1.8.0_491" Environment="KAFKA_HEAP_OPTS=-Xmx4G -Xms4G" Environment="KAFKA_OPTS=-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf" ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target EOF systemctl daemon-reload
4.2.2 方式二:修改/usr/local/kafka/bin/kafka-server-start.sh

在内容exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"所在的行注释,并在该行下面子新增一行以下内容:

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf kafka.Kafka "$@"

4.3 配置server.properties

编辑/usr/local/kafka/config/server.properties,内容如下:

broker.id=0 #listeners=PLAINTEXT://0.0.0.0:9092 #advertised.listeners=PLAINTEXT://192.168.10.5:9092 log.dirs=/data/kafka-logs zookeeper.connect=localhost:2181 num.network.threads=5 num.io.threads=8 socket.send.buffer.bytes=1048576 socket.receive.buffer.bytes=1048576 socket.request.max.bytes=104857600 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 listeners=SASL_PLAINTEXT://0.0.0.0:9092 advertised.listeners=SASL_PLAINTEXT://192.168.10.5:9092 security.inter.broker.protocol=SASL_PLAINTEXT sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 sasl.enabled.mechanisms=SCRAM-SHA-256 allow.everyone.if.no.acl.found=false super.users=User:admin authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

4.4 重启Zookeeper和Kafka

systemctl stop kafka zookeeper systemctl start zookeeper kafka

五、客户端配置

5.1 创建SASL客户端配置文件

后续所有操作均使用以下新建的客户端配置文件,不再使用Kafka自带的producer.properties和consumer.properties。

cat > /usr/local/kafka/config/kafka_client_admin.properties << EOF security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin"; request.timeout.ms=60000 EOF cat > /usr/local/kafka/config/kafka_client_dmhs.properties << EOF security.protocol=SASL_PLAINTEXT sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="dmhs" password="dmhs123"; EOF chmod 600 /usr/local/kafka/config/kafka_client_admin.properties chmod 600 /usr/local/kafka/config/kafka_client_dmhs.properties

5.2 创建主题(加密环境下)

/usr/local/kafka/bin/kafka-topics.sh --create --bootstrap-server 192.168.10.5:9092 --topic hstest --partitions 1 --replication-factor 1 --command-config /usr/local/kafka/config/kafka_client_admin.properties /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.10.5:9092 --list --command-config /usr/local/kafka/config/kafka_client_admin.properties

5.3 用户授权

由于server.propertiesallow.everyone.if.no.acl.found=false,因此需要对用户授权。
授权规则:生产者需要主题的WRITE权限;消费者需要主题的READ权限 + 消费组的READ权限。

5.3.1 赋予用户dmhs对主题hstest的写权限
/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server 192.168.10.5:9092 --command-config /usr/local/kafka/config/kafka_client_admin.properties --add --allow-principal User:dmhs --operation Write --topic hstest
5.3.2 赋予用户dmhs对主题hstest的读权限
/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server 192.168.10.5:9092 --command-config /usr/local/kafka/config/kafka_client_admin.properties --add --allow-principal User:dmhs --operation Read --topic hstest
5.3.3 赋予用户dmhs对消费组dmhs-consumer-group的读权限
/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server 192.168.10.5:9092 --command-config /usr/local/kafka/config/kafka_client_admin.properties --add --allow-principal User:dmhs --operation Read --group dmhs-consumer-group

测试环境可一次性授权所有消费组:

/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server 192.168.10.5:9092 --command-config /usr/local/kafka/config/kafka_client_admin.properties --add --allow-principal User:dmhs --operation Read --group '*'
5.3.4 检查权限

确认ACL已包含Write、Read on topic、Read on group:

/usr/local/kafka/bin/kafka-acls.sh --bootstrap-server 192.168.10.5:9092 --command-config /usr/local/kafka/config/kafka_client_admin.properties --list

5.4 启动生产者

/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.10.5:9092 --topic hstest --producer.config /usr/local/kafka/config/kafka_client_dmhs.properties

5.5 启动消费者

已通过--group '*'授权所有消费组,因此以下命令可使用随机组名(不指定--group):

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.5:9092 --topic hstest --from-beginning --consumer.config /usr/local/kafka/config/kafka_client_dmhs.properties

如需固定组,请指定--group(必须已授权该组):

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.5:9092 --topic hstest --from-beginning --consumer.config /usr/local/kafka/config/kafka_client_dmhs.properties --group dmhs-consumer-group

每次测试使用不同组名(避免偏移量残留):

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.5:9092 --topic hstest --from-beginning --consumer.config /usr/local/kafka/config/kafka_client_dmhs.properties --group dmhs-test-$(date +%s)

5.6 消费行为说明

  • 对单分区主题,Kafka保证同一个分区只能被同一个消费组内的一个消费者消费。
  • 如果消费组中的所有消费者共享同一份偏移量,窗口1消费了消息并提交偏移量后,窗口2随后启动(或再平衡后接管分区),会从窗口1提交的偏移量之后开始消费,而不是从最早消息开始。
  • 如果消费者启动时报OFFSET_OUT_OF_RANGE,可以临时指定--group为全新组名(如dmhs-test-$(date +%s)),或者检查kafka_client_dmhs.properties中的auto.offset.reset是否设为earliest(当前已设置)。
  • 建议:同一业务逻辑的多实例使用相同消费组,以实现负载均衡和高可用;不同业务或独立测试用途应使用不同的消费组,避免互相干扰偏移量,也避免单分区场景下一个消费者空转。

5.7 测试用超级用户admin直接消费hstest

无需指定组,Kafka会自动为其生成一个全新的随机消费组:

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.5:9092 --topic hstest --from-beginning --consumer.config /usr/local/kafka/config/kafka_client_admin.properties

六、已配置加密情况下新建用户

6.1 创建用户(需携带admin凭证)

/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --alter --add-config 'SCRAM-SHA-256=[password=dmhs123]' --entity-type users --entity-name dmhs --command-config /usr/local/kafka/config/kafka_client_admin.properties

6.2 验证用户是否创建成功(ZooKeeper)

/usr/local/kafka/bin/zookeeper-shell.sh localhost:2181 <<< "ls /config/users"

6.3 查看某个用户详情(使用ZooKeeper避免超时)

/usr/local/kafka/bin/zookeeper-shell.sh localhost:2181 <<< "get /config/users/dmhs"

6.4 删除用户

/usr/local/kafka/bin/kafka-configs.sh --bootstrap-server 192.168.10.5:9092 --alter --delete-config 'SCRAM-SHA-256' --entity-type users --entity-name dmhs --command-config /usr/local/kafka/config/kafka_client_admin.properties

若上述命令超时,可直接从ZooKeeper删除(谨慎操作):

# /usr/local/kafka/bin/zookeeper-shell.sh localhost:2181 <<< "deleteall /config/users/dmhs"

七、测试环境清空主题中的消息

7.1 物理清空主题数据(保留ACL)

# 停止 Kafka systemctl stop kafka # 清空hstest主题的数据文件(删除分区目录下所有文件) rm -rf /data/kafka-logs/hstest-0/* # 启动Kafka systemctl start kafka

7.2 验证

启动消费者从头消费,应该看不到任何历史消息:

/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.5:9092 --topic hstest --from-beginning --consumer.config /usr/local/kafka/config/kafka_client_admin.properties

注意:如果dmhs-consumer-group之前消费过并提交了偏移量,现在Kafka重启后分区日志从offset 0开始,但消费组记录的偏移量可能是旧值(比如9)。下次消费者以dmhs-consumer-group启动时,会尝试从offset 9消费,由于日志被清空,会抛出OFFSET_OUT_OF_RANGE警告。此时使用kafka_client_dmhs.properties时若已设置auto.offset.reset=earliest,会自动重置到最早位置;否则,则需依赖--from-beginning参数。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/7/3 17:46:36

在Keil中使用Clang-Format对齐代码

简介 Clang-format既是一个库&#xff0c;也是一个独立的工具&#xff0c;它的目标是根据可配置的样式指南自动重新格式化源文件&#xff0c;主要是处理缩进、换行、对齐、空格等排版问题。可用来格式化C/C、Java、JavaScript、Objective-C、Protobuf、C# 等代码。其内置有几种…

作者头像 李华
网站建设 2026/7/3 17:45:59

GEO的KPI不止是“提及率”——搭建多维度的KPI体系

很多品牌在做GEO时&#xff0c;把所有的注意力都放在了“提及率”这一个指标上。但GEO的KPI远不止这么简单。一个科学、完整的GEO KPI体系&#xff0c;应该像一份体检报告一样&#xff0c;涵盖多个维度&#xff0c;既有“结果指标”&#xff0c;也有“过程指标”和“质量指标”…

作者头像 李华
网站建设 2026/7/3 17:43:45

WandEnhancer开源增强工具:解锁游戏修改新体验的完整指南

WandEnhancer开源增强工具&#xff1a;解锁游戏修改新体验的完整指南 【免费下载链接】Wand-Enhancer Advanced UX and interoperability extension for Wand (WeMod) app 项目地址: https://gitcode.com/gh_mirrors/we/Wand-Enhancer WandEnhancer是一款专为Wand&#…

作者头像 李华
网站建设 2026/7/3 17:40:14

EM3080-W条形码解码器与STM32F303RC的硬件协同设计

1. EM3080-W条形码解码器芯片的核心特性解析EM3080-W作为Newland Auto-ID Tech推出的专业级条形码解码芯片&#xff0c;其设计哲学围绕三个核心维度展开&#xff1a;适应性解码能力、能效优化和接口友好性。这款芯片在硬件层面集成了多码制并行识别引擎&#xff0c;支持从传统E…

作者头像 李华