概念
运维
配置
监控
生产者与消费者
流处理
分区 partition
一定条件下, 分区数越多, 吞吐量越高. 分区也是保证消息被顺序消费的基础, kafka 只能保证一个分区内消息的有序性
副本
每个分区有一至多个副本 (Replica), 分区的副本分布在集群的不同代理上, 提高可用性. 分区的每个副本在存储上对应与日志对象 log 对应
AR
每个分区的多个部分之间称为 AR(assigned replicas), 包含至多一份 leader 副本和多个 follower 副本
ISR
kafka 在 zookeeper 中动态维护了一个 ISR(In-sync Replica), 即保存同步的副本列表. 列表中保存的是与 leader 副本保持消息同步的所有副本对应的代理节点 id
优先副本
AR 列表中的第一个副本. 理想情况下, 优先副本是该分区的 leader 副本. 所有的读写请求都有分区 leader 副本处理, kafka 要保证优先副本在集群中均匀分布, 保证了所有分区 leader 均匀分布,
代理
每一个 kafka 实例称为代理 (broker), 每个代理都有唯一标示 id 即 broker id, 一台服务器上可以配置一个或多个代理
kafka streams
java 语言实现的用于流处理的 jar 文件
controller_epoch
控制器轮值次数, 每选出一个新的控制器,+1, 对应 zookeeper 的 controller_epoch 字段
选举策略是在 zk 的 controller / 路径下创建临时节点
zkVersion
类似于数据库乐观锁, 用于更新 zk 下相应元数据信息
leader_epoch
分区 leader 更新次数
配置文件
- [root@sjck-node03 config]# cd /usr/local/kafka/config
- [root@sjck-node03 config]# cp server.properties server.properties.bak
- [root@sjck-node03 config]# cp server.properties server-1.properties
- [root@sjck-node03 config]# cp server.properties server-2.properties
修改不同的配置文件, 每个对应的 broker.id 不一样, 012, 类似 zk 里的 myid,port 是监听的端口
- broker.id=0
- listeners=PLAINTEXT://sjck-node03:9092
- log.dirs=/home/soft/kafka/logs/broker-0
- zookeeper.connect=sjck-node03:2181,sjck-node03:2182,sjck-node03:2183
- broker.id=1
- listeners=PLAINTEXT://sjck-node03:9093
- log.dirs=/home/soft/kafka/logs/broker-1
- zookeeper.connect=sjck-node03:2181,sjck-node03:2182,sjck-node03:2183
- broker.id=2
- listeners=PLAINTEXT://sjck-node03:9094
- log.dirs=/home/soft/kafka/logs/broker-2
- zookeeper.connect=sjck-node03:2181,sjck-node03:2182,sjck-node03:2183
配置 kafka 环境变量
- VIM /etc/profile
- #KAFKA_HOME
- export KAFKA_HOME=/usr/local/kafka
- export PATH=$PATH:$KAFKA_HOME/bin
- source /etc/profile
先启动 zookeeper, 再启动 kafka, 启动脚本, daemon 参数是后台守护进程
- #!/bin/bash
- brokers=(server server-1 server-2)
- for broker in ${
- brokers[@]
- }
- do
- echo $broker
- ${
- KAFKA_HOME
- }/bin/kafka-server-start.sh -daemon ${
- KAFKA_HOME
- }/config/${
- broker
- }.properties
- done
关闭脚本
- #!/bin/bash
- SIGNAL=${
- SIGNAL:-TERM
- }
- for element in `jps -l|grep kafka|awk '{print $1}'`
- do
- echo $element
- kill -s $SIGNAL $element
- done
查看启动状态
- [root@sjck-node03 ~]# jps -l
- 3299 org.apache.zookeeper.server.quorum.QuorumPeerMain
- 6148 kafka.Kafka
- 3258 org.apache.zookeeper.server.quorum.QuorumPeerMain
- 5787 kafka.Kafka
- 6524 sun.tools.jps.Jps
- 6494 kafka.Kafka
- 3231 org.apache.zookeeper.server.quorum.QuorumPeerMain
连接 zkclient, 查看节点注册情况
- [root@sjck-node03 bin]# ./zkCli.sh -server localhost:2181
- [zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
- [0, 1, 2]
- [zk: localhost:2181(CONNECTED) 1] get /controller
- {
- "version":1,"brokerid":0,"timestamp":"1552626305509"
- }
- cZxid = 0x140000002c
- ctime = Fri Mar 15 13:05:05 CST 2019
- mZxid = 0x140000002c
- mtime = Fri Mar 15 13:05:05 CST 2019
- pZxid = 0x140000002c
- cversion = 0
- dataVersion = 0
- aclVersion = 0
- ephemeralOwner = 0x2000001158f0000
- dataLength = 54
- numChildren = 0
创建主题, 包含 2 个副本, 3 个分区
- [root@sjck-node03 kafka]# kafka-topics.sh --create --zookeeper sjck-node03:2181,sjck-node03:2182,sjck-node03:2183 --replication-factor 2 --partitions 3 --topic kafka-action
- Created topic "kafka-action".
登录 zk 查看创建主题对应的分区
- [zk: localhost:2181(CONNECTED) 1] ls /brokers/topics/kafka-action/partitions
- [0, 1, 2]
- [zk: localhost:2181(CONNECTED) 2] get /brokers/topics/kafka-action
- {
- "version":1,"partitions":{
- "2":[1,2],"1":[0,1],"0":[2,0]
- }
- }
删除 topic, 当配置文件里的 delete.topic.enable=true, 真正执行删除操作, zk 中的节点和 log 中的分区文件被真正删除
- kafka-topics.sh --delete --zookeeper sjck-node03:2181,sjck-node03:2182,sjck-node03:2183 --topic kafka-action
- Topic kafka-action is marked for deletion.
- Note: This will have no impact if delete.topic.enable is not set to true.
查看分区优先副本数
- kafka-topics.sh --zookeeper sjck-node03:2181,sjck-node03:2182,sjck-node03:2183 --describe --topic kafka-action
- Topic:kafka-action PartitionCount:3 ReplicationFactor:2 Configs:
- Topic: kafka-action Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
- Topic: kafka-action Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
- Topic: kafka-action Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2
生产者消费者测试, 单播和多播,
注意 broker-list 的域名, 要和 server.properties 里 listeners 的域名写的一样, 127.0.0.1 和 localhost 也会出错, 没有解析对应上
1 个生产者
kafka-console-producer.sh --broker-list sjck-node03:9092,sjck-node03:9093,sjck-node03:9094 --topic kafka-action
2 个消费者
kafka-console-consumer.sh --Bootstrap-server sjck-node03:9092,sjck-node03:9093,sjck-node03:9094 --topic kafka-action --consumer-property group.id=single-consumer-group
一条消息只能被同一个消费组的一个消费者消费
多播
再创建一个属于不同组的消费者
kafka-console-consumer.sh --Bootstrap-server sjck-node03:9092,sjck-node03:9093,sjck-node03:9094 --topic kafka-action --consumer-property group.id=multi-consumer-group
一条消息能被不同消费组的消费者消费
流处理器
sink 处理器, 从上游处理器接受到的数据发送到指定 topic 中
source 处理器, 把 topic 消费数据当做输入流, 发送到下游处理器
处理器拓扑 (processor topgyolo)
流处理程序进行数据处理的计算逻辑, 是流处理器和和相连接的流组成的有向无环图, 其中流处理器是节点, 流是边
来源: https://www.cnblogs.com/wanli002/p/10585001.html