前言
很多开发者第一次接触Kafka,往往是在项目出现性能问题之后。
系统用户少的时候,一切都很正常。订单直接写数据库,消息直接调用接口,日志直接落盘,响应速度也看不出什么问题。但随着业务规模增长,请求量越来越大,系统开始出现各种异常:数据库压力飙升、接口调用超时、服务之间互相拖慢,甚至一个模块出问题就能连锁影响整个系统。
这时候很多人才会意识到,高并发系统真正需要解决的,并不只是计算能力的问题。
更重要的是如何让各个系统之间解耦,让流量洪峰有地方缓冲,让数据能够稳定流转。否则每当业务高峰到来时,所有请求都会同时压向数据库和业务服务,最终把整个系统拖垮。电商秒杀、支付系统、日志采集、实时监控这些场景,本质上面对的都是同一个问题。
而Kafka正是在这样的背景下成为越来越多企业的核心基础设施。
从最初的消息队列,到如今的数据流平台,Kafka承担的早已不仅仅是消息传递工作。订单系统通过它异步处理业务请求,日志平台通过它收集海量数据,微服务架构通过它实现服务解耦,实时计算平台则依赖它完成数据分发。对于很多互联网系统来说,Kafka已经逐渐变成了整个数据流转链路中的核心枢纽。
但Kafka真正难的地方并不在安装。
创建Topic、启动Broker、发送消息这些操作,很多人半小时就能学会。真正决定生产环境稳定性的,是分区机制、副本同步、ISR、位移管理、消息可靠性以及集群容灾这些底层原理。很多线上故障并不是因为Kafka性能不足,而是因为对这些机制理解不够深入。
因此,与其把Kafka当作一个简单的消息队列工具,不如把它看成一套高可靠的数据基础设施。只有理解其核心运行机制,才能真正发挥出Kafka在高并发、实时处理以及分布式架构中的价值。
1.安装前准备
1.1 操作系统要求
Kafka可以在多种 [Linux 发行版](https://so.csdn.net/so/search?q=Linux 发行版&spm=1001.2101.3001.7020)上运行,本文以CentOS 7为例,其他发行版步骤类似,只需调整包管理命令。
1.2 java环境要求
Kafka基于Java开发,需安装JDK 8 或以上版本:
java-version1.3 安装JDK
下载 JDK
- Oracle 官网或 OpenJDK 官网下载 Linux 版本
- 示例(OpenJDK 8):
wgethttps://download.java.net/openjdk/jdk8u41/ri/openjdk-8u41-b04-linux-x64-14_jan_2020.tar.gz解压安装包
mkdir-p/usr/local/javatar-zxvfopenjdk-8u41-b04-linux-x64-14_jan_2020.tar.gz-C/usr/local/java配置环境变量
在 /etc/profile 末尾追加:
exportJAVA_HOME=/usr/local/java/jdk1.8.0_41exportPATH=$PATH:$JAVA_HOME/bin使配置生效:
source/etc/profile验证安装
java-version2.安装 Kafka
2.1 下载 Kafka
- 官网下载
- 示例版本:3.6.2
linux系统可以直接命令一键安装:
wgethttps://downloads.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgztar-xzfkafka_2.13-3.9.1.tgzmvkafka_2.13-3.9.1 kafka2.2 创建数据日志目录
在kafka解压目录同一路径下:创建一个kafka_data,用于装kafka和zookeeper的log和数据等:
mkdir-p/opt/kafka_datamkdir-p/opt/kafka_data/zookeepermkdir-p/opt/kafka_data/logmkdir-p/opt/kafka_data/log/kafkamkdir-p/opt/kafka_data/log/zookeeper2.3 配置Kafka配置文件
编辑这个文件:
broker.id=0port=9092host.name=iplog.dirs=/opt/kafka_data/log/kafkazookeeper.connect=localhost:21812.4 配置zookeeper配置文件
dataDir=/opt/kafka_data/zookeeperdataLogDir=/opt/kafka_data/log/zookeeperclientPort=2181maxClientCnxns=100tickTimes=2000initLimit=10syncLimit=53.启动与停止Kafka
3.1开启ZooKeeper
开启ZooKeeper:
./zookeeper-server-start.sh../config/zookeeper.properties&3.2启动Kafka:
./kafka-server-start.sh../config/server.properties&验证是否启动成功:
jps输出应包含:
QuorumPeerMain Kafka3.3停止zookeeper
./zookeeper-server-stop.sh../config/zookeeper.properties&3.4停止kafkfa
./kafka-server-stop.sh../config/server.properties&4.创建生产者topic和消费者topic简单示例
在一个终端执行创建生产者: (推消息到shan)
cd/opt/bin/#进入kafka目录./kafka-console-producer.sh --broker-list192.168.42.140:9092--topicwd_test#wd_test你要建立的topic名在一个终端执行创建消费者: (从shan上消费消息)
cd/opt/bin/#进入kafka目录./kafka-console-producer.sh --broker-list192.168.42.140:9092--topicwd_test#消费shan中topic消息查看效果: 一个终端不断输入推送的消息,另一个终端则消费这个消息
查看当前主题:
./kafka-topics.sh--zookeeperlocalhost:2181--list你正在家里远程办公,突然接到任务:需要验证一个新业务模块的消息生产与消费逻辑。
但Kafka集群部署在公司内网测试环境,没有公网IP,防火墙也不开放9099/9092端口——你既无法连接Broker创建Topic,也无法从本地启动生产者或消费者进行调试。
传统的做法是:
- 提交代码到CI/CD触发部署(慢)
- 求运维临时开防火墙(麻烦)
- 或干脆去公司(不现实)
有没有更敏捷的方式?
有!借助内网穿透工具,我们可以将内网Kafka的9092端口安全暴露到公网。
只需一条隧道命令,你的本地开发机就能像在内网一样:
- 通过 kafka-topics.sh 创建测试 Topic
- 用 kafka-console-producer.sh 发送消息
- 用 kafka-console-consumer.sh 实时消费验证
整个过程无需改动 Kafka 配置、无需网络权限审批,5 分钟打通内外网,让开发调试回归高效。
跟我一起来操作吧~
5.安装cpolar内网穿透工具
cpolar 可以将你本地电脑中的服务(如 SSH、Web、数据库)映射到公网。即使你在家里或外出时,也可以通过公网地址连接回本地运行的开发环境。
❤️以下是安装cpolar步骤:
使用一键脚本安装命令:
sudocurlhttps://get.cpolar.sh|sh安装完成后,执行下方命令查看cpolar服务状态:(如图所示即为正常启动)
sudosystemctl status cpolarCpolar安装和成功启动服务后,在浏览器上输入虚拟机主机IP加9200端口即:【ip:9200】访问Cpolar管理界面,使用Cpolar官网注册的账号登录,登录后即可看到cpolar web 配置界面,接下来在web 界面配置即可:
打开浏览器访问本地9200端口,使用cpolar账户密码登录即可,登录后即可对隧道进行管理。
6.配置公网地址
通过配置,你可以在本地 WSL 或 Linux 系统上运行 SSH 服务,并通过 Cpolar 将其映射到公网,从而实现从任意设备远程连接开发环境的目的。
- 隧道名称:可自定义,本例使用了:zookeeper,注意不要与已有的隧道名称重复
- 协议:tcp
- 本地地址:2181
- 端口类型:随机临时TCP端口
- 地区:China Top
创建成功后,打开左侧在线隧道列表,可以看到刚刚通过创建隧道生成了公网地址,接下来就可以在其他电脑或者移动端设备(异地)上,使用任意一个地址在终端中访问即可。
tcp 表示使用的协议类型
2.tcp.cpolar.top是 Cpolar 提供的域名
13917是随机分配的公网端口号
通过Cpolar提供的公网地址和端口,Kafka就能从本地启动生产者或消费者进行调试啦!
生产:
./kafka-console-producer.sh --broker-list2.tcp.cpolar.top:13917--topicshan消费:
./kafka-console-consumer.sh --bootstrap-server2.tcp.cpolar.top:13917--topicshan7.保留固定TCP公网地址
使用cpolar为其配置TCP地址,该地址为固定地址,不会随机变化。
选择区域和描述:有一个下拉菜单,当前选择的是“China Top”。
右侧输入框,用于填写描述信息。
保留按钮:在右侧有一个橙色的“保留”按钮,点击该按钮可以保留所选的TCP地址。
列表中显示了一条已保留的TCP地址记录。
地区:显示为“China Top”。
地址:显示为“26.tcp.cpolar.top:13166”。
登录cpolar web UI管理界面,点击左侧仪表盘的隧道管理——隧道列表,找到所要配置的隧道Kafka,点击右侧的编辑。
修改隧道信息,将保留成功的TCP端口配置到隧道中。
- 端口类型:选择固定TCP端口
- 预留的TCP地址:填写保留成功的TCP地址
点击更新。
创建完成后,打开在线隧道列表,此时可以看到随机的公网地址已经发生变化,地址名称也变成了保留和固定的TCP地址。
最后就可以使用命令测试啦!
总结
Kafka能够成为行业主流消息中间件,并不是因为它功能最多,而是在吞吐能力、可靠性和扩展能力之间找到了非常优秀的平衡点。从消息解耦、流量削峰到实时数据处理,它已经成为现代分布式系统中不可或缺的一环。
本文从Kafka安装部署开始,深入介绍了Broker、Topic、Partition、副本同步以及消息生产消费机制等核心内容,并结合实际场景演示了Kafka消息收发流程。对于刚接触Kafka的开发者来说,这些内容能够帮助建立完整的知识框架;对于已经在线上使用Kafka的团队来说,也有助于进一步理解其底层运行逻辑。
结合cpolar提供的公网访问能力之后,即使Kafka部署在公司内网、实验室环境或者家庭服务器中,也能够在外部网络下完成远程调试、Topic管理以及生产消费测试。这对于远程开发、异地协作以及测试环境验证来说,能够显著提升效率。
对于很多系统而言,数据库负责存储数据,而Kafka负责让数据流动起来。当业务规模不断扩大之后,一个稳定可靠的消息系统往往比单纯提升服务器配置更有价值。而Kafka之所以能够长期占据消息中间件领域的重要位置,正是因为它解决的从来不只是消息传递,而是整个数据流转体系的稳定性问题。