jdk-kafka-zookeeper
都解压后,配置java环境变量
cd kafka/bin
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &
jps
#创建topic
[root@master bin]# ./kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test
#创建生产者
[root@master kafka]# bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
#创建消费者
[root@master bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
#同一个消费者组中只会在其中一个消费一次,不会重复消费
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testgroup
#查看消费者组信息,总共有多少消息,接受了多少,多少没接收
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group testgroup
#------------------------------------------------------------------------------------------------------------------------------
#搭建集群
vi /etc/hosts
ip work1
ip work2
ip work3
#三个节点都安装zookeeper
cd /app/zookeeper
cd conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
#创建数据目录
mkdir -p /app/zookeeper/data
cd /app/zookeeper/data
echo 1> myid#20写1,21写2,22写3
#启动zookeeper(三个节点都搞)
cd /app/zookeeper
bin/zkServer.sh --config conf start
#三个节点也都要安装kafka
cd /kafka/config
vi server.properties
#不同节点要不一样区分id
#listen
#改日志路径
#连接到zookeeper集群
#改完可以通过scp复制过去就再做些小修改就行
scp -r /app/kafka/ root@work2:/app/kafka
#启动kafka
cd /app/kafka
#daemon是后台启动
bin/kafka-server-start.sh -daemon config/server.properties
jps
#确保三个都启动成功了java,zookeeper,kafka
#创建一个topic
bin/kafka-topics.sh --bootstrap-server work1:9092 --create --topic distopic --replication-factor 2 --partitions 4
#开启四个partitions 两个备份
#创建生产者
bin/kafka-console-producer.sh --bootstrap-server work1:9092 --create --topic distopic
#创建消费者并设置组
bin/kafka-console-consumer.sh --bootstrap-server work1:9092 --group testgroup --topic distopic
#通过组查看消息队列数据
bin/kafka-consumer-group.sh --bootstrap-server wor1:9092 --describe --group testgroup
#这些leader才是真实发送数据的
不管你是在--bootstrap-server指定其他节点,最终都是找到leader再转发同步的
#初始化选举
按顺序启动时,第三个被选举为leader
因为要选举第一个只有一个投自己没用自动变为从节点
第二个双数节点没用
第三个节点上来时就选举为leader了
#znode
#进入znode
cd /app/zookeeper/bin
./zkCli.sh
#指定节点 -server
./zkCli.sh -server 192.168.255.21
#znode管理命令
ls -s 路径 #查看详细信息等同于 stat 路径
#类似都是操作目录,文件
create
stat
ls
#create加参数e指创建临时节点
create -e
#set设置值key value形式
#每set一次会更新 datalength 是实际的value长度,并且 dataversion 会加一
#watcher时间 -w参数,只能单次,应该是用来检测程序的
stat -w /opt
执行后下次修改/opt 会提示修改了什么
父节点检测,修改子节点不触发监控
#delete
只能当节点是空值时删除
当节点有·子节点时再像删除要deleteall
#ACL权限控制
#查看权限
getAcl /path
#权限都是针对起直属的第一个子节点,子节点之后就约束不了了
假设当父节点有c权限创建了子节点,那子节点默认是最大权限cdrwa
#修改权限
setAcl /opt world:anyone:crwa
#加参数-r就可以遍历子节点设置权限,就可以让子节点全都没用删除权限
setAcl -R /optworld:anyone:crwa
#auth
#addauth先创建一个用户密码——digest就是对密码加密
addauth digest ljh:123456
#在有这个账号的基础上就可以给一些文件设置账号密码登录原账号是(world,anyone)
#给/z1设置用户密码
setAcl auth:ljh:123456:crwad
之后在想修改要
#超级用户可以在忘记密码时使用
先创建一个用户
addauth digest cxk:cxk666
然后修改配置文件
/app/zookeeper/bin/zkserver.sh
添加超级用户,再重启文件
./zkserver.sh restart