UpdateMetadata 请求的元数据和 LeaderAndIsr 请求的分区状态信息是有关联的. UpdateMetadata 请求的元数据包括主题中所有分区的状态信息. LeaderAndIsr 请求的内容也是分区的状态信息, 所以当分区的状态信息发生变化时, 控制器除了要发送 LeaderAndIsr 请求给分区所有副本对应的代理节点, 也要发送 UpdateMetadata 请求给所有存活的代理节点.
控制器发送 UpdateMetadata 请求给代理节点, 有两种入口: 发送 LeaderAndIsr 请求和外部事件(控制器的故障转移, 代理节点上下线, 重新分配分区, 删除主题). 因为外部事件会导致分区的状态信息发生变化, 所以控制器处理外部事件, 也需要发送 UpdateMetadata 请求.
控制器向代理节点发送的请求有三种: 分区的主副本和 ISR 信息(LeaderAndIsr), 更新元数据(UpdateMetadata), 停止副本(StopReplicas). 代理节点处理这三种请求, 都会转交给副本管理器执行具体的业务逻辑. 这三个请求的处理都会更新分区的副本状态, 所以这三个操作都会使用同一对象的锁(如果代理节点正在处理 LeaderAndIsr 请求时, 收到了 UpdateMetadata 请求, 这必须等到 LeaderAndIsr 请求执行完成后, 再能执行 UpdateMetadata 请求).
1, 控制器发送 LeaderAndIsr 请求给分区的所有代理节点, 不同代理节点的分区会分别创建分区的主副本或备份副本. 主副本会将分区从拉取管理器移除, 备份副本会将分区加入拉取管理器.
2, 控制器发送 UpdateMetadata 请求给集群的所有代理节点 .
3, 每个代理节点处理 UpdateMetadata 请求, 都会更新元数据缓存.
4, 客户端发送 Metadata 请求, 从元数据缓存中获取主题的元数据.
5, 客户端从主题元数据中找出分区的主副本, 并和分区的主副本进行通信.
步骤 1 必须在步骤 2 之前完成, 如果控制器先发送 UpdateMetadata 请求, 然后才发送 LeaderAndIsr 请求, 那么代理节点的元数据更新完成, 但是不同代理节点上的分区可能还没有成为主副本或备份副本, 这样客户端从元数据中找出了分区的主副本, 和主副本进行通信的话就会出问题.
服务端的元数据缓存
服务端处理控制器发送的更新元数据请求 (UpdateMetadata), 会调用元数据缓存对象(MetadataCache) 的 updateCache()方法更新缓存. 服务端处理客户端发送的 "获取元数据请求"(Metadata), 会调用元数据缓存对象的 getTopicMetadata()方法, 从缓存中构造主题的元数据信息.
元数据缓存保存的映射关系:"主题名称 -->分区编号 -->分区的状态信息", 元数据缓存的每一个主题都包含了所有分区的状态信息. 在获取主题的元数据 (TopicMetadata), 先将每个分区的状态信息(PartitionStateInfo) 转化成分区的元数据(PartitionMetadata), 最后把所有分区元数据对象放入主题元数据对象中.
客户端更新元数据
当客户端需要更新元数据时, 会调用元数据对象的 requestUpdate()方法, 设置元数据对象的 needUpdate 变量为 true.requestUpdate()方法只是请求需要更新元数据, 并没有发送 Metadata 请求. 当客户端执行网络客户端 (NetworkClient) 的轮询时, 如果需要请求更新元数据, 才会发送 Metadata 请求.
步骤:
1, 客户端需要更新元数据, 调用元数据对象的 requestUpdate()方法请求需要更新元数据.
2, 更新元数据的超时时间为 0, 说明客户端需要立即更新元数据, 客户端调用选择器的 send()方法, 准备发送 Metadata 请求.
3, 客户端调用选择器的 poll()方法, 真正发送 Metadata 请求给服务端.
4, 客户端收到 Metadata 请求的响应结果, 更新元数据对象.
发送 Metadata 请求前, 分区没有主副本
生产者的发送线程在发送生产请求前或者消费者的拉取器在创建拉取请求前(还没有开始真正发送生产请求和拉取请求给服务端), 如果分区没有主副本, 都需要更新元数据. 这里只是更新了 needUpdate 变量, 表示需要更新元数据.
处理 Metadata 请求响应, 分区有异常信息
客户端收到并处理响应结果时, 如果元数据有异常:
1, 发送线程处理生产结果, 异常信息是 "无效的元数据";2, 拉取器处理拉取结果, 错误码为 "不是分区的主副本","未知的主题或分区". 以上两种情况都需要调用 requestUpdate()方法, 请求更新客户端的元数据对象.
客户端处理响应结果中的每个分区, 它们的错误码都是在服务端处理请求时设置的 . 服务端处理生产者发送的 "生产请求" 以及消费者发送的 "拉取请求", 如果出现异常信息, 会返回对应的异常类.
服务端集群每个代理节点的元数据缓存都是一致的, 客户端只需要向任意一个代理节点发送元数据请求, 就可以获取缓存的元数据, 并更新客户端的元数据对象. 客户端的元数据对象保存了集群的配置信息, 包括每个分区的主副本. 在生产者发送生产请求时, 生产者使用当前缓存中的元数据对象信息发送生产请求, 但是控制器如果将分区的主副本转移到了其他的代理节点上, 那么当前主副本的代理节点处理生产者发送的生产请求会抛出异常. 客户端收到带有异常的响应结果, 会重新发送元数据请求, 并更新元数据对象.
客户端与服务端交互时, 需要更新元数据的流程:
1, 客户端发送请求之前, 如果分区的主副本为空, 则强制更新元数据 .
2, 服务端处理客户端的请求, 如果处理过程有错误, 为对应的分区返回异常信息 .
3, 客户端处理响应结果, 如果分区的结果存在异常信息, 则强制更新元数据 .
生产者发送数据之前, 必须等待元数据更新完成, 才会将消息追加到记录收集器. 消费者分配分区之前, 必须确保刷新完元数据, 才会开始拉取分区的消息集 .
生产者第一次发送消息, 等待元数据可用
生产者在发送数据之前, 必须确保主题有可用的分区. 生产者调用 waitOnMetadata()方法会一直阻塞, 直到主题有了分区后, 它才会为消息分配分区, 并追加消息到记录收集器中 . 如果元数据对象的主题没有分区, 客户端会先调用元数据对象的 requestUpdate()方法请求更新元数据, 然后调用发送线程的 wakeup()方法唤醒发送线程, 最后调用元数据对象的 awaitUpdate()方法等待元数据更新完成.
- KafkaProducer.waitOnMetadata
- private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
- metadata.add(topic); // 在 metadata 中添加 topic 保存到 Map 如果需要更新又是不是全部更新的情况下, 就会更新在 Map 中保存的 topic 的 metadata.
- Cluster cluster = metadata.fetch(); // 获取元数据中保存的集群信息
- // 从元数据保存的集群信息中获取此 topic 的分区数量, 如果存在则返回该 topic 的 partition 数, 否则返回 null
- Integer partitionsCount = cluster.partitionCountForTopic(topic);
- // 当前此 topic 的分区数量存在, 并且大于准备发送的 partition 小于 topic 的分区数量
/ 如果出现大于的情况, 则可能是有分区所在的节点宕机了. 所以要做更新操作, 更新元数据中保存的集群信息
- // partition==null 就是让生产者根据规则决定发送到哪个分区上
- if (partitionsCount != null && (partition == null || partition <partitionsCount))
- return new ClusterAndWaitTime(cluster, 0);
- long begin = time.milliseconds();
- long remainingWaitMs = maxWaitMs;
- long elapsed;
- // 唤醒 Sender 线程发送获取 metadata 请求, 直到获取了这个 topic 的 metadata 或者请求超时
- do {
- int version = metadata.requestUpdate(); // 把更新标志置为 true, 并返回当前元数据的版本号 sender.wakeup(); // 唤醒 Sender, 发送获取 metadata 请求
- try {
- metadata.awaitUpdate(version, remainingWaitMs); // 等待 metadata 的更新完成
- } catch (TimeoutException ex) {
- }
- cluster = metadata.fetch();
- elapsed = time.milliseconds() - begin;
- if (elapsed>= maxWaitMs)
- throw new TimeoutException("Failed to update metadata after" + maxWaitMs + "ms.");
- if (cluster.unauthorizedTopics().contains(topic)) // 认证失败 throw new TopicAuthorizationException(topic);
- remainingWaitMs = maxWaitMs - elapsed;
- partitionsCount = cluster.partitionCountForTopic(topic);
- } while (partitionsCount == null); // 不停循环, 直到 partitionsCount 不为 null(即直到元数据保存的集群信息中获取此 topic 的分区数量)
- if (partition != null && partition>= partitionsCount) {
- throw new KafkaException( String.format("Invalid partition given with record: %d is not in the range [0...%d).", partition, partitionsCount));
- }
- return new ClusterAndWaitTime(cluster, elapsed);
- }
- Metadata.awaitUpdate
- public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
- if (maxWaitMs <0) {
- throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
- }
- long begin = System.currentTimeMillis();
- long remainingWaitMs = maxWaitMs;
- while (this.version <= lastVersion) {
- // 不断循环, 直到 metadata 更新成功, version 自增
- if (remainingWaitMs != 0)
- wait(remainingWaitMs); // 阻塞线程, 等待 metadata 的更新
- long elapsed = System.currentTimeMillis() - begin;
- if (elapsed>= maxWaitMs)
- throw new TimeoutException("Failed to update metadata after" + maxWaitMs + "ms."); remainingWaitMs = maxWaitMs - elapsed;
- }
- }
maxBlockTimeMs: 更新元数据最多允许花费的时间; waitedOnMetadataMs: 等待更新元数据完成花费的时间; remainingWaitMs: 更新完成元数据后剩余的时间 remainingWaitMs=0 表示剩余需要等待的时间为 0, 即不需要等待.
生产者只有在 "第一次发送消息到主题" 时, 因为客户端的元数据对象还没有记录主题中每个分区对应的主副本, 所以客户端需要等待更新完元数据后, 才可以发送消息给分区的主副本. 在第一次发送消息之后, 生产者的元数据对象中主题的分区一定不等于空, 所以就不会执行 waitOnMetadata()方法中的循环代码, 返回值为 0,remainingWaitMs 等于 maxBlockTimeMs.maxBlockTimeMs 不仅用于控制第一次更新元数据需要的时间, 在第一次发送消息之后, 也会用于控制内存缓存区满了之后的阻塞等待时间.
生产者发送消息之前, 如果主题没有分区, 在等待元数据更新完成之前会唤醒发送线程. 在等待更新元数据完成之后, 追加消息集到记录收集器, 如果批记录满了, 或者创建了新的一批记录, 也会唤醒发送线程. 唤醒发送线程的动作, 最后都会调用选择器的 wakeup()方法.
发送线程在后台不断循环运行, 即使客户端没有需要发送的请求, 也会间隔一段时间调用一次轮询. 选择器在轮询时, 如果没有请求需要发送, 最多阻塞 pollTimeout 时间后, 再次调用轮询方法. 发送线程轮询时会调用选择器的 select(pollTimeout)方法, 当生产者客户端唤醒发送线程时, 它会中断选择器调用的 select()方法. 即当唤醒 (即中断) 操作发生时, 选择器的 select()如果处于阻塞状态, 他会立即返回.
第一次唤醒
生产者第一次发送消息时, 由于还没有发送 "元数据请求", 所以主题的分区为空. 生产者发送消息必须为消息指定分区编号, 因为主题的分区为空, 所以生产者必须立即唤醒发送线程, 让选择器立即从当前的轮询中退出, 进行下一次的轮询 . 又因为生产者请求更新元数据, 所以在下一次的轮询时, 选择器的轮询不会进入阻塞状态, 而是立即发送 "元数据请求" .
第二次唤醒
生产者将消息追加到记录收集器, 只有在一批记录满了, 或者创建了新的一批记录时, 才会唤醒发送线程. 在这之前, 选择器的轮询仍然周期性地运行, 并且每次轮询都会阻塞一段时间. 当需要发送 "生产请求" 时, 生产者会立即让选择器从当前的轮询中退出, 进行下一次的轮询. 在下一次轮询时, 发送线程会从记录收集器中选出需要发送的批记录, 并立即发送 "生产请求" .
消费者确保元数据刷新完成
消费者在轮询时, 为了确保分配到分区, 也需要等待元数据更新完成, 然后才会发送拉取请求. 消费者的协调者对象 (ConsumerCoordinator) 在执行分区分配时, 必须知道主题的所有分区, 才能为同一个消费组中所有的消费者分配分区. 如果没有元数据信息, 协调者就无法执行分区分配的任务.
生产者等待元数据更新完成的条件是: "主题的分区不为空 ." 消费者等待元数据更新的条件是:"元数据的版本号发生变化". 一旦主题的分区不为空, 或者元数据的版本号发生变化, 说明客户端已经更新完元数据. 客户端后续的操作, 就可以调用元数据的 fetch()方法获取集群配置.
消费者的协调者在分配分区时, 等待元数据更新完成的前提条件是 "元数据需要更新", 或者 "元数据的更新时间为 0". 如果元数据不需要更新, 或者更新时间大于 0, 消费者分配分区的操作就不会被阻塞, 而是立即执行. 同样地, 生产者发送消息时, 如果主题的分区不为空, 也不需要阻塞地更新元数据, 它也会立即为消息指定分区, 并将消息追加到记录集中. 总结这两种场景, 只有在必要的时候才需要更新客户端的元数据. 如果不需要更新元数据, 客户端的元数据对象就保持不变.
参考资料:
Kafka 技术内幕: 图文详解 Kafka 源码设计与实现
来源: http://www.jianshu.com/p/485dbdd11b8c