一, 前言
1,Kafka 简介
Kafka 是一个开源的分布式消息引擎 / 消息中间件, 同时 Kafka 也是一个流处理平台. Kakfa 支持以发布 / 订阅的方式在应用间传递消息, 同时并基于消息功能添加了 Kafka Connect,Kafka Streams 以支持连接其他系统的数据(Elasticsearch https://ken.io/note/elk-deploy-guide ,Hadoop https://ken.io/note/hadoop-cluster-deploy-guide 等)
Kafka 最核心的最成熟的还是他的消息引擎, 所以 Kafka 大部分应用场景还是用来作为消息队列削峰平谷. 另外, Kafka 也是目前性能最好的消息中间件.
2,Kafka 架构
在 Kafka 集群 (Cluster) 中, 一个 Kafka 节点就是一个 Broker, 消息由 Topic 来承载, 可以存储在 1 个或多个 Partition 中. 发布消息的应用为 Producer, 消费消息的应用为 Consumer, 多个 Consumer 可以促成 Consumer Group 共同消费一个 Topic 中的消息.
概念 / 对象 | 简单说明 |
---|---|
Broker | Kafka 节点 |
Topic | 主题,用来承载消息 |
Partition | 分区,用于主题分片存储 |
Producer | 生产者,向主题发布消息的应用 |
Consumer | 消费者,从主题订阅消息的应用 |
Consumer Group | 消费者组,由多个消费者组成 |
3, 准备工作
1,Kafka 服务器
准备 3 台 CentOS 服务器, 并配置好静态 IP, 主机名
服务器名 | IP | 说明 |
---|---|---|
kafka01 | 192.168.88.51 | Kafka 节点 1 |
kafka02 | 192.168.88.52 | Kafka 节点 2 |
kafka03 | 192.168.88.53 | Kafka 节点 3 |
软件版本说明
项 | 说明 |
---|---|
Linux Server | CentOS 7 |
Kafka | 2.3.0 |
2,ZooKeeper 集群
Kakfa 集群需要依赖 ZooKeeper 存储 Broker,Topic 等信息, 这里我们部署三台 ZK
服务器名 | IP | 说明 |
---|---|---|
zk01 | 192.168.88.21 | ZooKeeper 节点 |
zk02 | 192.168.88.22 | ZooKeeper 节点 |
zk03 | 192.168.88.23 | ZooKeeper 节点 |
部署过程参考: https://ken.io/note/zookeeper-cluster-deploy-guide
二, 部署过程
1, 应用 & 数据目录
- # 创建应用目录
- mkdir /usr/kafka
- # 创建 Kafka 数据目录
- mkdir /kafka
- mkdir /kafka/logs
- chmod 777 -R /kafka
2, 下载 & 解压
Kafka 官方下载地址: https://kafka.apache.org/downloads
这次我下载的是 2.3.0 版本
- # 创建并进入下载目录
- mkdir /home/downloads
- cd /home/downloads
- # 下载安装包
- wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
- # 解压到应用目录
- tar -zvxf kafka_2.12-2.3.0.tgz -C /usr/kafka
kafka_2.12-2.3.0.tgz 其中 2.12 是 Scala 编译器的版本, 2.3.0 才是 Kafka 的版本
3,Kafka 节点配置
- # 进入应用目录
- cd /usr/kafka/kafka_2.12-2.3.0/
- # 修改配置文件
- vi config/server.properties
通用配置
配置日志目录, 指定 ZooKeeper 服务器
- # A comma separated list of directories under which to store log files
- log.dirs=/kafka/logs
- # root directory for all kafka znodes.
- zookeeper.connect=192.168.88.21:2181,192.168.88.22:2181,192.168.88.23:2181
分节点配置
- Kafka01
- broker.id=0
- #listeners=PLAINTEXT://:9092
- listeners=PLAINTEXT://192.168.88.51:9092
- Kafka02
- broker.id=1
- #listeners=PLAINTEXT://:9092
- listeners=PLAINTEXT://192.168.88.52:9092
- Kafka03
- broker.id=2
- #listeners=PLAINTEXT://:9092
- listeners=PLAINTEXT://192.168.88.53:9092
4, 防火墙配置
- # 开放端口
- firewall-cmd --add-port=9092/tcp --permanent
- # 重新加载防火墙配置
- firewall-cmd --reload
5, 启动 Kafka
- # 进入 kafka 根目录
- cd /usr/kafka/kafka_2.12-2.3.0/
- # 启动
- /bin/kafka-server-start.sh config/server.properties &
- # 启动成功输出示例(最后几行)
- [2019-06-26 21:48:57,183] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
- [2019-06-26 21:48:57,183] INFO Kafka startTimeMs: 1561531737175 (org.apache.kafka.common.utils.AppInfoParser)
- [2019-06-26 21:48:57,185] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
三, Kafka 测试
1, 创建 Topic
在 kafka01(Broker)上创建测试 Tpoic:test-ken-io, 这里我们指定了 3 个副本, 1 个分区
bin/kafka-topics.sh --create --Bootstrap-server 192.168.88.51:9092 --replication-factor 3 --partitions 1 --topic test-ken-io
Topic 在 kafka01 上创建后也会同步到集群中另外两个 Broker:kafka02,kafka03
2, 查看 Topic
我们可以通过命令列出指定 Broker 的
bin/kafka-topics.sh --list --Bootstrap-server 192.168.88.52:9092
3, 发送消息
这里我们向 Broker(id=0)的 Topic=test-ken-io 发送消息
- bin/kafka-console-producer.sh --broker-list 192.168.88.51:9092 --topic test-ken-io
- # 消息内容
- > test by ken.io
4, 消费消息
在 Kafka02 上消费 Broker03 的消息
bin/kafka-console-consumer.sh --Bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning
在 Kafka03 上消费 Broker02 的消息
bin/kafka-console-consumer.sh --Bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning
然后均能收到消息
test by ken.io
这是因为这两个消费消息的命令是建立了两个不同的 Consumer
如果我们启动 Consumer 指定 Consumer Group Id 就可以作为一个消费组协同工, 1 个消息同时只会被一个 Consumer 消费到
- bin/kafka-console-consumer.sh --Bootstrap-server 192.168.88.53:9092 --topic test-ken-io --from-beginning --group testgroup_ken
- bin/kafka-console-consumer.sh --Bootstrap-server 192.168.88.52:9092 --topic test-ken-io --from-beginning --group testgroup_ken
四, 备注
1,Kafka 常用配置项说明
Kafka 常用 Broker 配置说明:
配置项 | 默认值 / 示例值 | 说明 |
---|---|---|
broker.id | 0 | Broker 唯一标识 |
listeners | PLAINTEXT://192.168.88.53:9092 | 监听信息,PLAINTEXT 表示明文传输 |
log.dirs | kafka/logs | kafka 数据存放地址,可以填写多个。用 "," 间隔 |
message.max.bytes | message.max.bytes | 单个消息长度限制,单位是字节 |
num.partitions | 1 | 默认分区数 |
log.flush.interval.messages | Long.MaxValue | 在数据被写入到硬盘和消费者可用前最大累积的消息的数量 |
log.flush.interval.ms | Long.MaxValue | 在数据被写入到硬盘前的最大时间 |
log.flush.scheduler.interval.ms | Long.MaxValue | 检查数据是否要写入到硬盘的时间间隔。 |
log.retention.hours | 24 | 控制一个 log 保留时间,单位:小时 |
zookeeper.connect | 192.168.88.21:2181 | ZooKeeper 服务器地址,多台用 "," 间隔 |
2, 附录
- https://kafka.apache.org/
- https://zh.wikipedia.org/zh-cn/Kafka
本文首发于我的独立博客: https://ken.io/note/kafka-cluster-deploy-guide
来源: https://www.cnblogs.com/ken-io/p/kafka-cluster-deploy-guide.html