Preface
How Kafka works was already covered in detail in the previous post, "Kafka Principles and Single-Node Deployment" (https://blog.51cto.com/14154700/2453419); this post only walks through deploying a Kafka cluster.
Outline:
I. Environment preparation
II. Deploying the ZooKeeper service
III. Deploying the Kafka cluster
I. Environment preparation
The packages needed for the Kafka cluster deployment can be downloaded from my network drive: https://pan.baidu.com/s/1y1z-ftyLpHwvuoeluS3TAA .
II. Deploying the ZooKeeper service
1. Configuration on host kafka01
- # Deploy ZooKeeper
- [root@kafka01 src]# tar zxf zookeeper-3.4.9.tar.gz
- [root@kafka01 src]# mv zookeeper-3.4.9 /usr/local/zookeeper
- # Edit the configuration file
- [root@kafka01 src]# cd /usr/local/zookeeper/conf/
- [root@kafka01 conf]# cp -p zoo_sample.cfg zoo.cfg
- [root@kafka01 conf]# sed -i 's/dataDir=\/tmp\/zookeeper/dataDir=\/usr\/local\/zookeeper\/data/g' zoo.cfg
- # Append the cluster node entries; ports 2888 and 3888 are used for internal cluster communication
- [root@kafka01 conf]# echo "server.1=192.168.20.2:2888:3888" >> zoo.cfg
- [root@kafka01 conf]# echo "server.2=192.168.20.3:2888:3888" >> zoo.cfg
- [root@kafka01 conf]# echo "server.3=192.168.20.4:2888:3888" >> zoo.cfg
- [root@kafka01 conf]# egrep -v '^$|^#' zoo.cfg    # the resulting configuration file:
- tickTime=2000    # heartbeat interval between nodes, in milliseconds
- initLimit=10    # maximum number of ticks a follower may take to connect to the leader and sync its data
- syncLimit=5    # maximum number of ticks a node may fail to respond before it is dropped
- dataDir=/usr/local/zookeeper/data    # data (snapshot) directory
- clientPort=2181
- # declare the hosts participating in the cluster
- server.1=192.168.20.2:2888:3888
- server.2=192.168.20.3:2888:3888
- server.3=192.168.20.4:2888:3888
- # Create the data directory and set this node's ID
- [root@kafka01 conf]# mkdir /usr/local/zookeeper/data
- [root@kafka01 conf]# echo 1 > /usr/local/zookeeper/data/myid
- # Copy the configured zookeeper directory to the other cluster nodes
- [root@kafka01 conf]# scp -r /usr/local/zookeeper/ root@192.168.20.3:/usr/local/
- [root@kafka01 conf]# scp -r /usr/local/zookeeper/ root@192.168.20.4:/usr/local/
- # Start the zookeeper service
- [root@kafka01 conf]# /usr/local/zookeeper/bin/zkServer.sh start
- [root@kafka01 bin]# netstat -antp | egrep '2181|2888|3888'    # confirm the cluster ports are listening
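The `server.N` entries must agree with each node's `myid` file: the host listed as `server.1` must hold `myid` 1, and so on. The ID-to-address mapping can be sanity-checked offline; the sketch below runs against an in-line copy of the three lines rather than the real file (with a live node, feed it `/usr/local/zookeeper/conf/zoo.cfg` instead):

```shell
# Parse each server.N line of zoo.cfg into its ID and IP address.
# The here-variable stands in for the real config file.
cfg='server.1=192.168.20.2:2888:3888
server.2=192.168.20.3:2888:3888
server.3=192.168.20.4:2888:3888'
mapping=$(echo "$cfg" | awk -F'[.=:]' '/^server\./ {print "id " $2 " -> " $3 "." $4 "." $5 "." $6}')
echo "$mapping"
```

Each printed `id N -> IP` pair should match the `myid` set on that host.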
2. Configuration on host kafka02
- # Change the ID to 2
- [root@kafka02 ~]# echo 2 > /usr/local/zookeeper/data/myid
- # Start zookeeper
- [root@kafka02 ~]# /usr/local/zookeeper/bin/zkServer.sh start
3. Configuration on host kafka03
- # Change the ID to 3
- [root@kafka03 ~]# echo 3 > /usr/local/zookeeper/data/myid
- # Start zookeeper
- [root@kafka03 ~]# /usr/local/zookeeper/bin/zkServer.sh start
4. Check the role of each node in the ZooKeeper cluster
- # On kafka01:
- [root@kafka01 conf]# /usr/local/zookeeper/bin/zkServer.sh status
- ZooKeeper JMX enabled by default
- Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
- Mode: follower    # role: follower
- # On kafka02:
- [root@kafka02 ~]# /usr/local/zookeeper/bin/zkServer.sh status
- ZooKeeper JMX enabled by default
- Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
- Mode: leader    # role: leader
- # On kafka03:
- [root@kafka03 ~]# /usr/local/zookeeper/bin/zkServer.sh status
- ZooKeeper JMX enabled by default
- Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
- Mode: follower    # role: follower
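The same role check can be done without `zkServer.sh`, by sending ZooKeeper's `srvr` four-letter command to a node (e.g. `echo srvr | nc 192.168.20.2 2181`) and parsing the reply. The parsing step is sketched below against a saved sample reply, since the `nc` call itself needs a live node; the sample text is hypothetical but follows the 3.4.x reply format:

```shell
# Extract the role from a ZooKeeper `srvr` reply.
zk_mode() { grep '^Mode:' | awk '{print $2}'; }
# Sample reply captured from a node (hypothetical; in a live cluster use:
#   echo srvr | nc 192.168.20.2 2181 | zk_mode )
sample='Zookeeper version: 3.4.9
Latency min/avg/max: 0/0/0
Received: 10
Mode: leader
Node count: 4'
mode=$(echo "$sample" | zk_mode)
echo "$mode"
```

Looping this over all three IPs gives a one-shot view of who is leader and who follows.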
III. Deploying the Kafka cluster
1. Configuration on host kafka01
- # Extract to the target directory
- [root@kafka01 src]# tar zxf kafka_2.11-2.2.1.tgz
- [root@kafka01 src]# mv kafka_2.11-2.2.1 /usr/local/kafka
- # Edit the configuration file
- [root@kafka01 src]# cd /usr/local/kafka/config/
- [root@kafka01 config]# sed -i 's/broker.id=0/broker.id=1/g' server.properties
- [root@kafka01 config]# sed -i 's/#listeners=PLAINTEXT:\/\/:9092/listeners=PLAINTEXT:\/\/192.168.20.2:9092/g' server.properties
- [root@kafka01 config]# sed -i 's/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=PLAINTEXT:\/\/192.168.20.2:9092/g' server.properties
- [root@kafka01 config]# sed -i 's/log.dirs=\/tmp\/kafka-logs/log.dirs=\/usr\/local\/zookeeper\/data/g' server.properties
- [root@kafka01 config]# sed -i 's/zookeeper.connect=localhost:2181/zookeeper.connect=192.168.20.2:2181,192.168.20.3:2181,192.168.20.4:2181/g' server.properties
- [root@kafka01 config]# sed -i 's/zookeeper.connection.timeout.ms=6000/zookeeper.connection.timeout.ms=600000/g' server.properties
- [root@kafka01 config]# egrep -v '^$|^#' server.properties    # the modified configuration file:
- broker.id=1    # the broker ID: 1 here, 2 and 3 on the other nodes
- listeners=PLAINTEXT://192.168.20.2:9092    # listener address; use each node's own IP
- advertised.listeners=PLAINTEXT://192.168.20.2:9092
- # the listener address advertised to clients; use each node's own IP
- num.network.threads=3
- num.io.threads=8
- socket.send.buffer.bytes=102400
- socket.receive.buffer.bytes=102400
- socket.request.max.bytes=104857600
- log.dirs=/usr/local/zookeeper/data
- num.partitions=1
- num.recovery.threads.per.data.dir=1
- offsets.topic.replication.factor=1
- transaction.state.log.replication.factor=1
- transaction.state.log.min.isr=1
- log.retention.hours=168
- log.segment.bytes=1073741824
- log.retention.check.interval.ms=300000
- # declare the zookeeper ensemble to connect to
- zookeeper.connect=192.168.20.2:2181,192.168.20.3:2181,192.168.20.4:2181
- zookeeper.connection.timeout.ms=600000    # timeout in milliseconds, raised to avoid zookeeper connection timeouts
- group.initial.rebalance.delay.ms=0
- # Copy the configured kafka directory to the other nodes
- [root@kafka01 config]# scp -r /usr/local/kafka root@192.168.20.3:/usr/local/
- [root@kafka01 config]# scp -r /usr/local/kafka root@192.168.20.4:/usr/local/
- # Start kafka
- [root@kafka01 config]# cd ../bin/
- [root@kafka01 bin]# ./kafka-server-start.sh ../config/server.properties &
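A `sed -i` substitution silently does nothing when its pattern does not match (for instance if a newer `server.properties` ships different defaults), so it is worth confirming the edits took effect. Below is a small offline sketch of the same substitutions, applied to an in-line sample of the stock defaults instead of the real file:

```shell
# Apply the same edits to a sample of the shipped defaults, then inspect.
# '|' is used as the sed delimiter to avoid escaping the slashes in the URLs.
sample='broker.id=0
#listeners=PLAINTEXT://:9092
zookeeper.connect=localhost:2181'
result=$(echo "$sample" \
  | sed 's|broker.id=0|broker.id=1|' \
  | sed 's|#listeners=PLAINTEXT://:9092|listeners=PLAINTEXT://192.168.20.2:9092|' \
  | sed 's|zookeeper.connect=localhost:2181|zookeeper.connect=192.168.20.2:2181,192.168.20.3:2181,192.168.20.4:2181|')
echo "$result"
```

On the real file, a final `egrep -v '^$|^#' server.properties` (as above) is the quickest way to confirm every substitution landed.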
2. Configuration on host kafka02:
- # Change the settings that conflict with kafka01
- [root@kafka02 ~]# cd /usr/local/kafka/
- [root@kafka02 kafka]# sed -i 's|PLAINTEXT://192.168.20.2|PLAINTEXT://192.168.20.3|g' config/server.properties
- # (restrict the substitution to the listener lines, so the zookeeper.connect list is left untouched)
- [root@kafka02 kafka]# sed -i 's/broker.id=1/broker.id=2/g' config/server.properties
- # Start the kafka service
- [root@kafka02 kafka]# cd bin/
- [root@kafka02 bin]# ./kafka-server-start.sh ../config/server.properties &
- [root@kafka02 bin]# netstat -anpt | grep 9092    # confirm the port is listening
3. Configuration on host kafka03:
- # Change the settings in the kafka configuration file that conflict with kafka01
- [root@kafka03 ~]# cd /usr/local/kafka/
- [root@kafka03 kafka]# sed -i 's|PLAINTEXT://192.168.20.2|PLAINTEXT://192.168.20.4|g' config/server.properties
- # (restrict the substitution to the listener lines, so the zookeeper.connect list is left untouched)
- [root@kafka03 kafka]# sed -i 's/broker.id=1/broker.id=3/g' config/server.properties
- # Start the kafka service
- [root@kafka03 kafka]# cd bin/
- [root@kafka03 bin]# ./kafka-server-start.sh ../config/server.properties &
- [root@kafka03 bin]# netstat -anpt | grep 9092    # confirm the port is listening
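The client commands below pass a single broker as `--bootstrap-server`; listing all three brokers instead makes the clients resilient to any one broker being down. Assembling that connection string from the node IPs is plain string work and runs anywhere:

```shell
# Build a comma-separated bootstrap list from the cluster's node IPs.
nodes="192.168.20.2 192.168.20.3 192.168.20.4"
bootstrap=$(for n in $nodes; do printf '%s:9092,' "$n"; done)
bootstrap=${bootstrap%,}    # drop the trailing comma
echo "$bootstrap"
```

The result can then be passed straight to the client tools, e.g. `./kafka-topics.sh --describe --bootstrap-server "$bootstrap" --topic my-replicated-topic`.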
4. Testing message publish and subscribe
- # Create a topic named my-replicated-topic
- [root@kafka01 bin]# ./kafka-topics.sh --create --bootstrap-server 192.168.20.2:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
- # Check the topic's status and leader
- [root@kafka01 bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.20.2:9092 --topic my-replicated-topic
- Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824
- Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
- # The output shows 1 partition, 3 replicas, and a segment size of 1073741824 bytes
- # for the topic "my-replicated-topic"; the node with ID 2 is the leader
- [root@kafka01 bin]# ./kafka-console-producer.sh --broker-list 192.168.20.2:9092 --topic my-replicated-topic
- # type a few lines of test data
- >aaaaa
- >bbbbb
- >ccccc
- >ddddd
- # Subscribe to the messages from another node
- [root@kafka02 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.20.3:9092 --from-beginning --topic my-replicated-topic
- ......    # some output omitted
- aaaaa
- bbbbb
- ccccc
- ddddd
5. Simulate a leader failure, then check the topic's status and its new leader
- # The current leader is the node with ID 2
- [root@kafka01 bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.20.2:9092 --topic my-replicated-topic
- Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824
- Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
- # Stop the kafka service on host kafka02
- [root@kafka02 bin]# ./kafka-server-stop.sh
- # Check the leader again: it has switched to the node with ID 1
- [root@kafka01 bin]# ./kafka-topics.sh --describe --bootstrap-server 192.168.20.2:9092 --topic my-replicated-topic
- Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:segment.bytes=1073741824
- Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 2,1,3 Isr: 1,3
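Watching for the leader switch can be scripted by parsing the `--describe` output. The field extraction is sketched below against a saved sample line; in a live cluster, feed it the output of the `kafka-topics.sh` call above instead:

```shell
# Pull the Leader and Isr fields out of one --describe detail line.
# The sample line mirrors the post-failover output shown above.
line='Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 2,1,3 Isr: 1,3'
leader=$(echo "$line" | grep -o 'Leader: [0-9-]*' | awk '{print $2}')
isr=$(echo "$line" | grep -o 'Isr: [0-9,]*' | awk '{print $2}')
echo "leader=$leader isr=$isr"
```

Note that stopped broker 2 remains in `Replicas` but drops out of `Isr`; once restarted it re-syncs and rejoins the in-sync replica set.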
Source: http://www.bubuko.com/infodetail-3304270.html