1 无所不能的 Controller
某一个 broker 被选举出来承担特殊的角色, 就是控制器 Controller.
Leader 会向 zookeeper 上注册 Watcher, 其他 broker 几乎不用监听 zookeeper 的状态变化.
Controller 集群就是用来管理和协调 Kafka 集群的, 具体就是管理集群中所有分区的状态和分区对应副本的状态.
每一个 Kafka 集群任意时刻都只能有一个 controller, 当集群启动的时候, 所有的 broker 都会参与到 controller 的竞选, 最终只能有一个 broker 胜出.
Controller 维护的状态分为两类: 1: 管理每一台 Broker 上对应的分区副本. 2: 管理每一个 Topic 分区的状态.
KafkaController 核心代码, 其中包含副本状态机和分区状态机
- class KafkaController(val config : KafkaConfig, zkClient: ZkClient,
- val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
- this.logIdent = "[Controller" + config.brokerId + "]:"
- private var isRunning = true
- private val stateChangeLogger = KafkaController.stateChangeLogger
- val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
- val partitionStateMachine = new PartitionStateMachine(this)
- val replicaStateMachine = new ReplicaStateMachine(this)
- private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
- onControllerResignation, config.brokerId)
- // have a separate scheduler for the controller to be able to start and stop independently of the
- // kafka server
- private val autoRebalanceScheduler = new KafkaScheduler(1)
- var deleteTopicManager: TopicDeletionManager = null
- val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
- private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
- private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
- private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
- private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
- private val partitionReassignedListener = new PartitionsReassignedListener(this)
- private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
KafkaController 中共定义了五种 selector 选举器
1,ReassignedPartitionLeaderSelector
从可用的 ISR 中选取第一个作为 leader, 把当前的 ISR 作为新的 ISR, 将重分配的副本集合作为接收 LeaderAndIsr 请求的副本集合.
2,PreferredReplicaPartitionLeaderSelector
如果从 assignedReplicas 取出的第一个副本就是分区 leader 的话, 则抛出异常, 否则将第一个副本设置为分区 leader.
3,ControlledShutdownLeaderSelector
将 ISR 中处于关闭状态的副本从集合中去除掉, 返回一个新新的 ISR 集合, 然后选取第一个副本作为 leader, 然后令当前 AR 作为接收 LeaderAndIsr 请求的副本.
4,NoOpLeaderSelector
原则上不做任何事情, 返回当前的 leader 和 isr.
5,OfflinePartitionLeaderSelector
从活着的 ISR 中选择一个 broker 作为 leader, 如果 ISR 中没有活着的副本, 则从 assignedReplicas 中选择一个副本作为 leader,leader 选举成功后注册到 Zookeeper 中, 并更新所有的缓存.
kafka 修改分区和副本数
- ../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test1
- Topic:test1 PartitionCount:3 ReplicationFactor:2 Configs:
- Topic: test1 Partition: 0 Leader: 2 Replicas: 2,4 Isr: 2,4
- Topic: test1 Partition: 1 Leader: 3 Replicas: 3,5 Isr: 3,5
- Topic: test1 Partition: 2 Leader: 4 Replicas: 4,1 Isr: 4,1
topic 分区扩容
./kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 4 --topic test1
2 ReplicaStateMachine (ZK 持久化副本分配方案)
Replica 有 7 种状态:
1 NewReplica: 在 partition reassignment 期间 KafkaController 创建 New replica
2 OnlineReplica: 当一个 replica 变为一个 parition 的 assingned replicas 时
其状态变为 OnlineReplica, 即一个有效的 OnlineReplica
3 Online 状态的 parition 才能转变为 leader 或 isr 中的一员
4 OfflineReplica: 当一个 broker down 时, 上面的 replica 也随之 die, 其状态转变为 Onffline;
ReplicaDeletionStarted: 当一个 replica 的删除操作开始时, 其状态转变为 ReplicaDeletionStarted
5 ReplicaDeletionSuccessful: Replica 成功删除后, 其状态转变为 ReplicaDeletionSuccessful
6 ReplicaDeletionIneligible: Replica 成功失败后, 其状态转变为 ReplicaDeletionIneligible
7 NonExistentReplica: Replica 成功删除后, 从 ReplicaDeletionSuccessful 状态转变为 NonExistentReplica 状态
ReplicaStateMachine 所在文件: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
startup: 启动 ReplicaStateMachine
initializeReplicaState: 初始化每个 replica 的状态, 如果 replica 所在的 broker 是 live 状态, 则此 replica 的状态为 OnlineReplica.
处理可以转换到 Online 状态的 Replica, handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica), 并且发送 LeaderAndIsrRequest 到各 broker nodes: handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
当创建某个 topic 时, 该 topic 下所有分区的所有副本都是 NonExistent.
当 controller 加载 Zookeeper 中该 topic 每一个分区的所有副本信息到内存中, 同时将副本的状态变更为 New.
之后 controller 选择该分区副本列表中的第一个副本作为分区的 leader 副本并设置所有副本进入 ISR, 然后在 Zookeeper 中持久化该决定.
一旦确定了分区的 Leader 和 ISR 之后, controller 会将这些消息以请求的方式发送给所有的副本.
同时将这些副本状态同步到集群的所有 broker 上以便让他们知晓.
最后 controller 会把分区的所有副本状态设置为 Online.
3 partitionStateMachine (根据副本分配方案创建分区)
Partition 有如下四种状态
NonExistentPartition: 这个 partition 还没有被创建或者是创建后又被删除了;
NewPartition: 这个 parition 已创建, replicas 也已分配好, 但 leader/isr 还未就绪;
OnlinePartition: 这个 partition 的 leader 选好;
OfflinePartition: 这个 partition 的 leader 挂了, 这个 parition 状态为 OfflinePartition;
当创建 Topic 时, controller 负责创建分区对象, 它首先会短暂的将所有分区状态设置为 NonExistent.
之后读取 Zookeeper 副本分配方案, 然后令分区状态设置为 NewPartion.
处于 NewPartion 状态的分区尚未有 leader 和 ISR, 因此 Controller 会初始化 leader 和 ISR 信息并设置分区状态为 OnlinePartion, 此时分区正常工作.
来源: https://juejin.im/post/5c0345fb6fb9a049df23cfb2