1 前言
Controller 是从 Kafka 集群中选取一个的 broker, 负责管理 topic 分区和副本的状态的变化, 以及执行重分配分区之类的管理任务.
第一个启动的 broker 会成为一个 controller, 它会在 Zookeeper 上创建一个临时节点 (ephemeral):/controller. 其他后启动的 broker 也尝试去创建这样一个临时节点, 但会报错, 此时这些 broker 会在该 zookeeper 的 / controller 节点上创建一个监控(Watch), 这样当该节点状态发生变化(比如: 被删除) 时, 这些 broker 就会得到通知. 此时, 这些 broker 就可以在得到通知时, 继续创建该节点. 保证该集群一直都有一个 controller 节点.
当 controller 所在的 broker 节点宕机或断开和 Zookeeper 的连接, 它在 Zookeeper 上创建的临时节点就会被自动删除. 其他在该节点上都安装了监控的 broker 节点都会得到通知, 此时, 这些 broker 都会尝试去创建这样一个临时的 / controller 节点, 但它们当中只有一个 broker(最先创建的那个)能够创建成功, 其他的 broker 会报错: node already exists, 接收到该错误的 broker 节点会再次在该临时节点上安装一个 watch 来监控该节点状态的变化. 每次当一个 broker 被选举时, 将会赋予一个更大的数字(通过 zookeeper 的条件递增实现), 这样其他节点就知道 controller 目前的数字.
当一个 broker 宕机而不在当前 Kafka 集群中时, controller 将会得到通知(通过监控 zookeeper 的路径实现), 若有些 topic 的主分区恰好在该 broker 上, 此时 controller 将重新选择这些主分区. controller 将会检查所有没有 leader 的分区, 并决定新的 leader 是谁(简单的方法是: 选择该分区的下一个副本分区), 并给所有的 broker 发送请求.
每个分区的新 leader 指导, 它将接收来自客户端的生产者和消费者的请求. 同时 follower 也指导, 应该从这个新的 leader 开始复制消息.
当一个新的 broker 节点加入集群时, controller 将会检查, 在该 broker 上是否存在分区副本. 若存在, controller 通知新的和存在的 broker 这个变化, 该 broker 开始从 leader 处复制消息.
下面将从以下几个介绍 controller 的相关原理:
2 controller 启动
2.1 选举 controller
在 KafkaServer.startup()中, KafkaController 对象被构建, 在启动 KafkaApis,replicaManager 后, KafkaController.startup()被调用.
startup()函数非常简单, 这里直接粘代码:
除去日志以及标识状态的 isRunning 赋值, 值得看的代码就两句. 其中 registerSessionExpirationListener()用于在 zookeeper 会话失效后重连时取消注册在 zookeeper 上的各种 Listener, 而 controllerElector.startup 则启动了选举, 这些都将发生在 ZookeeperLeaderElector 类中.
Kafka 集群中每个 Broker 都会调用 startup()函数, 但是一个集群只有一个 Broker 能够成为 Controller. 那么, 谁将成为这个 controller 呢?
KafkaController 选举是直接通过 zookeeper 实现的, 就是在 zookeeper 创建临时目录 / controller / 并在目录下存放当前 brokerId. 如果在 zookeeper 下创建路径没有抛出 ZkNodeExistsException 异常, 则当前 broker 成功晋级为 Controller. 除了调用 elect 外, controllerElector.startup 还会在 / controller / 路径上注册 Listener, 监听 dataChange 事件和 dataDelete 事件, 当 / controller / 下数据发生变化时, 表示 Controller 发生了变化; 而因为 / controller / 下的数据为临时数据, 当 Controller 发生 failover 时, 数据会被删除, 触发 dataDelete 事件, 这时就需要重新选举新一任 Controller.
2.1 注册 listener
成为 KafkaController 之后很重要的一件事, 就是在 zookeeper 各个关键路径上添加 Listener, 所以这里很有必要先总结一下跟 controller 相关的路径 ([ ] 表示其中的值是随实际情况变化的):
l /controller/{"brokerid":"1"}: 决定谁才是这一届 Controller, 路径下存放当前 Controller 的 brokerId, 这些信息以临时数据的形式存放, 在会话失效时会被删除. 类 LeaderChangeListener 监听该路径下的 dataChange 和 dataDelete 事件.
l /brokers/topics: 子目录为所有 topic 列表. 类 TopicChangeListener 监听子目录列表变化, 如果有新增 topic, 则调用 onNewTopicCreation 创建新的 topic.
l /brokers/topics/[topic]/: 存放的是 topic 下各个分区的 AR, 目录下存放的格式为{"partitions":{"partitionId1":[broker1,broker2], ...}}. 类 AddPartitionsListener 监听路径下的数据变化, 在有新增 partition 时调用 Controller.onNewPartitionCreate, 即创建新的 partition.
l /brokers/topics/[topic]/partitions/[partitionId]/state/: 存放的是各个分区的 leaderAndIsr 信息, 即各个分区当前的 leaderId, 以及 ISR. 类 ReassignedPartitionsIsrChangeListener 监听该路径下的数据变化, 在重新分配 replica 到 partition 时, 需要等待新的 replica 追赶上 leader 后才能执行后续操作.
l /brokers/ids/[brokerId]/brokerInfoString: 存放 broker 信息, brokerInfoString 包括 broker 的 IP, 端口等信息; BrokerChangeListener 监听 / brokers/ids / 下子目录变化, 从而通知 Controller broker 的上下线消息. brokerInfoString 是 Controller 判断的 broker 是否活着的条件之一, controllerContext 中的 liveBrokers 需要相应路径下能够获取到 brokerInfo.
l /admin/reassign_partitions: 指导重新分配 AR 的路径, 通过命令修改 AR 时会写入到这个路径下. 类 PartitionsReassignedListener 监听该路径下的内容变化, 调用 initiateReassignReplicasForTopicPartition, 执行重新分配 AR 操作.
l /admin/preferred_replica_election: 分区需要重新选举第一个 replica 作为 leader, 即所谓的 preferred replica. 类 PreferredReplicaElectionListener 监听该路径, 并对路径下的 partitions 执行重新选举 preferred replica 作为 leader.
名词解释
l AR 当前已分配的副本列表
l RAR 重分配过的副本列表
l ORA 重分配之前的副本列表
l 分区 Leader 给定分区负责客户端读写的结点
l ISR "in-sync" replicas, 能够与 Leader 保持同步的副本集合(Leader 也在 ISR 中)
成为 KafkaController 以后, 会执行什么操作呢?
升级 Controller Epoch, 并将新的 epoch 写入到 zookeeper 中; 新的 epoch 标识着下一个时代的 leader, 向其他 broker 发送命令时会校验 epoch;
监听 zookeeper 路径 / admin/reassign_partitions;
监听 zookeeper 路径 / admin/preferred_replica_election;
注册 partition 状态机中的监听器, 监听路径 / brokers/topics 的子目录变化, 随时准备创建 topic;
注册 replica 状态机中的监听器, 监听路径 / brokers/ids / 的子目录, 以便在新的 broker 加入时能够感知到;
初始化 ControllerContext, 主要是从 zookeeper 中读取数据初始化 context 中的变量, 诸如 liveBrokers,allTopics,AR,LeadershipInfo 等;
初始化 ReplicaStateMachine, 将所有在活跃 broker 上的 replica 的状态变为 OnlineReplica;
初始化 PartitionStateMachine, 将所有 leader 在活跃 broker 上的 partition 的状态设置为 Onlinepartition; 其他的 partition 状态为 OfflinePartition.Partition 是否为 Online 的标识就是 leader 是否活着; 之后还会触发 OfflinePartition 和 NewPartition 向 OnlinePartition 转变, 因为 OfflinePartition 和 NewPartition 可能是选举 leader 不成功, 所以没有成为 OnlinePartition, 在环境变化后需要重新触发;
在所有的 topic 的 zookeeper 路径 / brokers/topics/[topic]/ 上添加 AddPartitionsListener, 监听 partition 变化;
KafkaController 启动后, 触发一次最优 leader 选举操作, 如果需要的情况下;
KafkaController 启动后, 如果开启了自动 leader 均衡, 启动自动 leader 均衡线程, 它会根据配置的信息定期运行.
完成对各个 zookeeper 路径的监听后, zookeeper 内容的变化驱动 Controller 进行各种操作, 处理如新建 topic, 删除 topic,broker 失效, broker 恢复等事件.
2.3 Controller Failover
前面 startup()中 registerSessionExpirationListener()会注册会话监听器, 在 zookeeper 会话过期后又重连成功时调用 onControllerResignation(), 并重新执行选举操作. 此外, 当 Controller 会话失效时, 会删除 / controller / 路径下创建的临时数据. 与此同时, 其他 broker 上的 ZookeeperLeaderElector 类中的 LeaderChangeListener 感知到数据删除后会重新执行选举.
onControllerResignation()是 Controller 转变为普通 broker 时执行的操作, 就是将前面注册的各个 Listener 取消注册, 不再关注 zookeeper 变化
2.4 initializeControllerContext 初始化 Controller 上下文信息
在 KafkaController 中
l 有两个状态机: 分区状态机和副本状态机;
l 一个管理器: Channel 管理器, 负责管理所有的 Broker 通信;
l 相关缓存: Partition 信息, Topic 信息, broker id 信息等;
l 四种 leader 选举机制: 分别是用 leader offline,broker 掉线, partition reassign, 最优 leader 选举时触发;
在 initializeControllerContext() 初始化 KafkaController 上下文信息的方法中, 主要做了以下事情:
l 从 zk 获取所有 alive broker 列表, 记录到 liveBrokers;
l 从 zk 获取所有的 topic 列表, 记录到 allTopic 中;
l 从 zk 获取所有 Partition 的 replica 信息, 更新到 partitionReplicaAssignment 中;
l 从 zk 获取所有 Partition 的 LeaderAndIsr 信息, 更新到 partitionLeadershipInfo 中;
l 调用 startChannelManager() 启动 Controller 的 Channel Manager;
l 通过 initializePreferredReplicaElection() 初始化需要最优 leader 选举的 Partition 列表, 记录到 partitionsUndergoingPreferredReplicaElection 中;
l 通过 initializePartitionReassignment() 方法初始化需要进行副本迁移的 Partition 列表, 记录到 partitionsBeingReassigned 中;
l 通过 initializeTopicDeletion() 方法初始化需要删除的 topic 列表及 TopicDeletionManager 对象;
最优 leader 选举: 就是默认选择 Replica 分配中第一个 replica 作为 leader, 为什么叫做最优 leader 选举呢? 因为 Kafka 在给每个 Partition 分配副本时, 它会保证分区的主副本会均匀分布在所有的 broker 上, 这样的话只要保证第一个 replica 被选举为 leader, 读写流量就会均匀分布在所有的 Broker 上, 当然这是有一个前提的, 那就是每个 Partition 的读写流量相差不多, 但是在实际的生产环境, 这是不太可能的, 所以一般情况下, 大集群是不建议开自动 leader 均衡的, 可以通过额外的算法计算, 手动去触发最优 leader 选举.
2.5 Controller Channel Manager
initializeControllerContext() 方法会通过 startChannelManager() 方法初始化 ControllerChannelManager 对象, 如下所示:
ControllerChannelManager 在初始化时, 会为集群中的每个节点初始化一个 ControllerBrokerStateInfo 对象, 该对象包含四个部分:
l NetworkClient: 网络连接对象;
l Node: 节点信息;
l BlockingQueue: 请求队列;
l RequestSendThread: 请求的发送线程.
其具体实现如下所示:
清楚了上面的逻辑, 再来看 KafkaController 部分是如何向 Broker 发送请求的
KafkaController 实际上是调用的 ControllerChannelManager 的 sendRequest() 方法向 Broker 发送请求信息, 其实现如下所示:
它实际上只是把对应的请求添加到该 Broker 对应的 MessageQueue 中, 并没有真正的去发送请求, 请求的的发送是在 每台 Broker 对应的 RequestSendThread 中处理的.
2.6 Controller 原生的四种 leader 选举机制
四种 leader 选举实现类及对应触发条件如下所示
实现 | 触发条件 |
OfflinePartitionLeaderSelector | leader 掉线时触发 |
ReassignedPartitionLeaderSelector | 分区的副本重新分配数据同步完成后触发的 |
PreferredReplicaPartitionLeaderSelector | 最优 leader 选举,手动触发或自动 leader 均衡调度时触发 |
ControlledShutdownLeaderSelector | broker 发送 ShutDown 请求主动关闭服务时触发 |
OfflinePartitionLeaderSelector
选举的逻辑是:
l 如果 isr 中至少有一个副本是存活的, 那么从该 Partition 存活的 isr 中选举第一个副本作为新的 leader, 存活的 isr 作为新的 isr;
l 否则, 如果脏选举 (unclear elect) 是禁止的, 那么就抛出 NoReplicaOnlineException 异常;
l 否则, 即允许脏选举的情况下, 从存活的, 所分配的副本 (不在 isr 中的副本) 中选出一个副本作为新的 leader 和新的 isr 集合;
l 否则, 即是 Partition 分配的副本没有存活的, 抛出 NoReplicaOnlineException 异常;
一旦 leader 被成功注册到 zk 中, 它将会更新到 KafkaController 缓存中的 allLeaders 中.
ReassignedPartitionLeaderSelector
ReassignedPartitionLeaderSelector 是在 Partition 副本迁移后, 副本同步完成 (RAR 都处在 isr 中, RAR 指的是该 Partition 新分配的副本) 后触发的, 其 leader 选举逻辑如下:
l leader 选择存活的 RAR 中的第一个副本, 此时 RAR 都在 isr 中了;
l new isr 是所有存活的 RAR 副本列表;
PreferredReplicaPartitionLeaderSelector
PreferredReplicaPartitionLeaderSelector 是最优 leader 选举, 选择 AR(assign replica)中的第一个副本作为 leader, 前提是该 replica 在是存活的, 并且在 isr 中, 否则会抛出 StateChangeFailedException 的异常.
ControlledShutdownLeaderSelector
ControlledShutdownLeaderSelector 是在处理 broker 下线时调用的 leader 选举方法, 它会选举 isr 中第一个没有正在关闭的 replica 作为 leader, 否则抛出 StateChangeFailedException 异常.
参考资料:
https://github.com/wangzzu/awesome/issues/7
来源: http://www.bubuko.com/infodetail-3087472.html