Background
Our Flink job cannot obtain a correct watermark whenever its parallelism is set higher than 1, so I spent the weekend looking into this, and have roughly pinned down the cause:
Although our topic has only 1 partition, the Kafka consumer still spun up several subtasks that try to read partitions with index 2 and 3. Those subtasks' watermarks never update, so the job's overall watermark stays stuck at Long.MIN_VALUE. What remains is to understand how subtasks are assigned partitions; stepping through it in a debugger back at work should confirm the cause.
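This stall is exactly what Flink's watermark propagation predicts: downstream of a parallel source, an operator tracks the minimum watermark across all its input subtasks, so a single subtask that never receives data pins the combined watermark at Long.MIN_VALUE. A minimal sketch of that combination rule (plain Java, not the Flink API):

```java
// Minimal sketch: a downstream operator's watermark is the minimum over
// all upstream subtasks. Any subtask that never sees data stays at
// Long.MIN_VALUE and pins the combined watermark there.
public class WatermarkMin {
    static long combine(long[] subtaskWatermarks) {
        long min = Long.MAX_VALUE;
        for (long w : subtaskWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        // Subtask 0 reads the single partition and advances its watermark;
        // subtasks 1 and 2 were assigned nothing and never emit one.
        long[] watermarks = {1_000L, Long.MIN_VALUE, Long.MIN_VALUE};
        System.out.println(combine(watermarks)); // stays at Long.MIN_VALUE
    }
}
```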
Digging through the source
Using logs to find roughly where partitions are assigned
As the figure shows, some key information can be found in the class org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.
Tracing the source code
Scrolling up through the file to see if there is anything useful
Key source code, with annotations
```java
public void open(Configuration configuration) throws Exception {
    // determine the offset commit mode
    this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
            getIsAutoCommitEnabled(),
            enableCommitOnCheckpoints,
            ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

    // create the partition discoverer
    this.partitionDiscoverer = createPartitionDiscoverer(
            topicsDescriptor,
            getRuntimeContext().getIndexOfThisSubtask(),
            getRuntimeContext().getNumberOfParallelSubtasks());
    this.partitionDiscoverer.open();

    subscribedPartitionsToStartOffsets = new HashMap<>();
    // The key call: this returns all the partitions assigned to this subtask.
    final List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();

    if (restoredState != null) {
        ...
    } else {
        // use the partition discoverer to fetch the initial seed partitions,
        // and set their initial offsets depending on the startup mode.
        // for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
        // for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
        // when the partition is actually read.
        switch (startupMode) {
            ...
            default:
                for (KafkaTopicPartition seedPartition : allPartitions) {
                    subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
                }
        }

        if (!subscribedPartitionsToStartOffsets.isEmpty()) {
            switch (startupMode) {
                ...
                case GROUP_OFFSETS:
                    LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
            }
        } else {
            LOG.info("Consumer subtask {} initially has no partitions to read from.",
                    getRuntimeContext().getIndexOfThisSubtask());
        }
    }
}
```
```java
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {
    if (!closed && !wakeup) {
        try {
            List<KafkaTopicPartition> newDiscoveredPartitions;

            // (1) get all possible partitions, based on whether we are subscribed
            // to fixed topics or a topic pattern
            if (topicsDescriptor.isFixedTopics()) {
                // For fixed (non-pattern) topics, fetch all partitions of those topics directly.
                newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());
            } else {
                // For a topic pattern, list all topics first, then match them one by one.
                List<String> matchedTopics = getAllTopics();

                // retain topics that match the pattern
                Iterator<String> iter = matchedTopics.iterator();
                while (iter.hasNext()) {
                    if (!topicsDescriptor.isMatchingTopic(iter.next())) {
                        iter.remove();
                    }
                }

                if (matchedTopics.size() != 0) {
                    // get partitions only for matched topics
                    newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);
                } else {
                    newDiscoveredPartitions = null;
                }
            }

            // (2) eliminate partitions that are old or should not be subscribed by this subtask
            if (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {
                throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor:" + topicsDescriptor);
            } else {
                Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();
                KafkaTopicPartition nextPartition;
                while (iter.hasNext()) {
                    nextPartition = iter.next();
                    // Keep only the partitions that belong to this subtask --
                    // this is the function we are looking for.
                    if (!setAndCheckDiscoveredPartition(nextPartition)) {
                        iter.remove();
                    }
                }
            }

            return newDiscoveredPartitions;
        }...
    }...
}
```
```java
public boolean setAndCheckDiscoveredPartition(KafkaTopicPartition partition) {
    if (isUndiscoveredPartition(partition)) {
        discoveredPartitions.add(partition);
        // Here is the assignment decision.
        return KafkaTopicPartitionAssigner.assign(partition, numParallelSubtasks) == indexOfThisSubtask;
    }
    return false;
}
```
```java
public static int assign(KafkaTopicPartition partition, int numParallelSubtasks) {
    // First compute a hash of the topic (partition.getTopic().hashCode() * 31 --
    // it is not obvious why the hashCode is multiplied by 31 again rather than
    // used directly), force it non-negative (& 0x7FFFFFFF), and take it modulo
    // the parallelism to get this topic's start index.
    int startIndex = ((partition.getTopic().hashCode() * 31) & 0x7FFFFFFF) % numParallelSubtasks;

    // here, the assumption is that the id of Kafka partitions are always ascending
    // starting from 0, and therefore can be used directly as the offset clockwise from the start index
    // Compute which subtask this partition belongs to. For example, with 20 subtasks
    // and a start index of 5, partition 5 goes to (5 + 5) % 20 = 10, i.e. subtask 10.
    return (startIndex + partition.getPartition()) % numParallelSubtasks;
}
```
Thoughts
Which subtask each partition of a given topic goes to is actually deterministic:
the topic name is fixed -> the topic's hashCode is fixed; combined with a fixed number of subtasks -> startIndex is fixed -> which subtask each partition goes to is fixed.
Why compute a startIndex at all?
Presumably to spread different topics evenly across subtasks: if there were many topics and every topic started from subtask 0, the low-numbered subtasks (0, 1, 2, ...) would end up reading a disproportionate number of partitions.
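A quick experiment (with made-up topic names) illustrates the point: if every topic started at subtask 0, partition 0 of every topic would land on subtask 0, while the per-topic startIndex lets the hash stagger different topics across different subtasks.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: count how many partitions each subtask would receive across
// several single-partition topics when the per-topic startIndex is used.
// Topic names are illustrative.
public class StartIndexSketch {
    static int assignWithStart(String topic, int partition, int n) {
        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % n;
        return (startIndex + partition) % n;
    }

    public static void main(String[] args) {
        String[] topics = {"orders", "clicks", "payments", "logs"};
        int n = 4;
        Map<Integer, Integer> load = new HashMap<>();
        for (String t : topics) {
            // each topic has a single partition, 0
            load.merge(assignWithStart(t, 0, n), 1, Integer::sum);
        }
        // Without startIndex, all four partitions would pile onto subtask 0;
        // with it, the topic hash typically spreads them across subtasks.
        System.out.println(load);
    }
}
```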
Source: https://www.cnblogs.com/boydfd/p/11748127.html