接着上一文的内容 继续升入研究
topic 如何创建于删除的
topic 的创建
具体流程文字为:
1, controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher, 当 topic 被创建, 则 controller 会通过 watch 得到该 topic 的 partition/replica 分配.
2, controller 从 /brokers/ids 读取当前所有可用的 broker 列表, 对于 set_p 中的每一个 partition:
2.1, 从分配给该 partition 的所有 replica(称为 AR) 中任选一个可用的 broker 作为新的 leader, 并将 AR 设置为新的 ISR
2.2, 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
3, controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest.
注意: 此部分 和 partition 的 leader 选举过程很类似 都是需要 zk 参与 相关信息都是记录到 zk 中
controller 在这些过程中启到非常重要的作用.
topic 的删除
文字过程:
, controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher, 当 topic 被删除, 则 controller 会通过 watch 得到该 topic 的 partition/replica 分配.
, 若 delete.topic.enable=false, 结束; 否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest.
前面我们讲到的很多的处理故障过程 包括 topic 创建删除 partition leader 的转换 broker 发生故障的过程中如何保证高可用 都涉及到了一个组件 controller, 关于 kafka 中出现的相关概念名词, 我会专门的写一个博客, 这里先简单的提一下.
Controller:Kafka 集群中的其中一个服务器, 用来进行 Leader Election 以及各种 Failover.
大家有没有想过一个问题, 就是如果 controller 出现了故障, 怎么办, 如何 failover 的呢? 我们往下看.
首先我们最一个实验, 我们在 zk 中找到 controller 在哪个 broker 上, 并查看 controller_epoch 的次数
- [zk: localhost:2181(CONNECTED) 14] ls /kafkagroup/controller
- controller_epoch controller
- [zk: localhost:2181(CONNECTED) 14] ls /kafkagroup/controller
- []
- [zk: localhost:2181(CONNECTED) 15] get /kafkagroup/controller
- {
- "version":1,"brokerid":1002,"timestamp":"1566648802297"
- }
- [zk: localhost:2181(CONNECTED) 22] get /kafkagroup/controller_epoch
- 23
我们可以看到当前的 controller 在 1002 上 在此之前发了 23 次 controller 的切换
我们手动到 1002 节点上杀死 kafka 进程
- [[email protected]$ jps
- 11665 Jps
- 10952 Kafka
- 11068 ZooKeeperMain
- 10495 QuorumPeerMain
- [[email protected]$ kill -9 10952
- [[email protected]$ jps
- 11068 ZooKeeperMain
- 11678 Jps
- 10495 QuorumPeerMain
再看 zk 上的信息, 相关信息已经同步到 zk 中了
- [zk: localhost:2181(CONNECTED) 16] get /kafkagroup/controller
- {
- "version":1,"brokerid":1003,"timestamp":"1566665835022"
- }
- [zk: localhost:2181(CONNECTED) 22] get /kafkagroup/controller_epoch
- 24
- [zk: localhost:2181(CONNECTED) 25] ls /kafkagroup/brokers
- [ids, topics, seqid]
- [zk: localhost:2181(CONNECTED) 26] ls /kafkagroup/brokers/ids
- [1003, 1001]
在后台日志中就会看到很多
- [[email protected] logs]$ VIM state-change.log
- [2019-08-25 01:01:07,886] TRACE [Controller id=1003 epoch=24] Received response {
- error_code=0
- } for request UPDATE_METADATA wit
- h correlation id 7 sent to broker 10.211.55.13:9092 (id: 1003 Rack: null) (state.change.logger)
- [[email protected] logs]$ pwd
- /data/kafka/kafka-server-logs/logs
state 改变的信息
- [[email protected] logs]$ tailf controller.log
- [2019-08-25 01:05:42,295] TRACE [Controller id=1003] Leader imbalance ratio for broker 1002 is 1.0 (kafka.controller.KafkaCont
- roller)
- [2019-08-25 01:05:42,295] INFO [Controller id=1003] Starting preferred replica leader election for partitions (kafka.controll
- er.KafkaController)
接下来, 我们具体的分析一下, 他到底内部发生了什么, 如何切换的
当 controller 宕机时会触发 controller failover. 每个 broker 都会在 zookeeper 的 "/controller" 节点注册 watcher, 当 controller 宕机时 zookeeper 中的临时节点消失, 所有存活的 broker 收到 fire 的通知, 每个 broker 都尝试创建新的 controller path, 只有一个竞选成功并当选为 controller.
当新的 controller 当选时, 会触发 KafkaController.onControllerFailover 方法, 在该方法中完成如下操作:
1, 读取并增加 Controller Epoch.
2, 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher.
3, 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher.
4, 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher.
5, 若 delete.topic.enable=true(默认值是 false), 则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher.
6, 通过 replicaStateMachine 在 Broker Ids Patch(/brokers/ids) 上注册 Watch.
7, 初始化 ControllerContext 对象, 设置当前所有 topic,"活" 着的 broker 列表, 所有 partition 的 leader 及 ISR 等.
8, 启动 replicaStateMachine 和 partitionStateMachine.
9, 将 brokerState 状态设置为 RunningAsController.
10, 将每个 partition 的 Leadership 信息发送给所有 "活" 着的 broker.
11, 若 auto.leader.rebalance.enable=true(默认值是 true), 则启动 partition-rebalance 线程.
12, 若 delete.topic.enable=true 且 Delete Topic Patch(/admin/delete_topics) 中有值, 则删除相应的 Topic.
可以看到, 都是在 zk 上进行交互, controller 的从新选举会依次通知 zk 中相关的位置 并注册 watcher , 在此过程中 就会发送 partition 的 leader 的选举, 还会发生 partition-rebalanced 删除无用的 topic 等一系列操作 (因为我们这里是直接考虑的最糟糕的情况就是 broker 宕机了一个, 然而宕机的这台上就是 controller)
consumer 是如何消费消息的
重要概念: 每个 Consumer 都划归到一个逻辑 Consumer Group 中, 一个 Partition 只能被同一个 Consumer Group 中的一个 Consumer 消费, 但可以被不同的 Consumer Group 消费.
若 Topic 的 Partition 数量为 p,Consumer Group 中订阅此 Topic 的 Consumer 数量为 c, 则:
p <c: 会有 c - p 个 consumer 闲置, 造成浪费
p> c: 一个 consumer 对应多个 partition
p = c: 一个 consumer 对应一个 partition
应该合理分配 Consumer 和 Partition 的数量, 避免造成资源倾斜,
本人建议最好 Partiton 数目是 Consumer 数目的整数倍.
在 consumer 消费的过程中如何把 partition 分配给 consumer?
也可以理解为 consumer 发生 rebalance 的过程是如何的?
生产过程中 Broker 要分配 Partition, 消费过程这里, 也要分配 Partition 给消费者.
类似 Broker 中选了一个 Controller 出来, 消费也要从 Broker 中选一个 Coordinator, 用于分配 Partition.// Coordinator 和 Controller 都是一个概念, 协调者 组织者
当 Partition 或 Consumer 数量发生变化时, 比如增加 Consumer, 减少 Consumer(主动或被动), 增加 Partition, 都会进行 consumer 的 Rebalance.// 发生 rebalance 发生在 consumer 端
见图:
文字信息为:
1,Consumer 给 Coordinator 发送 JoinGroupRequest 请求. 这时其他 Consumer 发 Heartbeat 请求过来时, Coordinator 会告诉他们, 要 Rebalance 了. 其他 Consumer 也发送 JoinGroupRequest 请求.
2,Coordinator 在 Consumer 中选出一个 Leader, 其他作为 Follower, 通知给各个 Consumer, 对于 Leader, 还会把 Follower 的 Metadata 带给它.
3,Consumer Leader 根据 Consumer Metadata 重新分配 Partition.
4,Consumer 向 Coordinator 发送 SyncGroupRequest, 其中 Leader 的 SyncGroupRequest 会包含分配的情况.
5,Coordinator 回包, 把分配的情况告诉 Consumer, 包括 Leader.
接下来思考一个问题, consumer 是如何取消息的 Consumer Fetch Message
Consumer 采用 "拉模式" 消费消息, 这样 Consumer 可以自行决定消费的行为.
Consumer 调用 Poll(duration) 从服务器拉取消息. 拉取消息的具体行为由下面的配置项决定:
- #consumer.properties
- # 消费者最多 poll 多少个 record
- max.poll.records=500
- # 消费者 poll 时 partition 返回的最大数据量
- max.partition.fetch.bytes=1048576
- #Consumer 最大 poll 间隔
- # 超过此值服务器会认为此 consumer failed
- # 并将此 consumer 踢出对应的 consumer group
- max.poll.interval.ms=300000
小结:
1, 在 Partition 中, 每个消息都有一个 Offset. 新消息会被写到 Partition 末尾 (最新的一个 Segment 文件末尾), 每个 Partition 上的消息是顺序消费的, 不同的 Partition 之间消息的消费顺序是不确定的.
2, 若一个 Consumer 消费多个 Partition, 则各个 Partition 之前消费顺序是不确定的, 但在每个 Partition 上是顺序消费.
3, 若来自不同 Consumer Group 的多个 Consumer 消费同一个 Partition, 则各个 Consumer 之间的消费互不影响, 每个 Consumer 都会有自己的 Offset.
举个官方小栗子:
参考链接 Wie:
Kafka 学习之路 (三)Kafka 的高可用 https://www.cnblogs.com/qingyunzong/p/9004703.html
来源: http://www.bubuko.com/infodetail-3169094.html