最近遇到一个 kafka 方面的问题, 大致就是由于 consumer 处理业务超时, 导致无法正常提交 Offset, 进而导致无法消费新消息的问题. 下面我想从以下几个方面对此次故障排查进行复盘分析: 业务背景, 问题描述, 排查思路, 经验教训.
一, 业务背景
先简单描述一下业务背景吧. 我们有个业务需要严格按顺序消费 Topic 消息, 所以针对该 topic 设置了唯一的 partition, 以及唯一的副本. 当同一个消费组的多个 consumer 启动时, 只会有一个 consumer 订阅到该 Topic, 进行消费, 保证同一个消费组内的消费顺序.
注: 消费组的 groupId 名称为 "smart-building-consumer-group", 订阅的 Topic 名称为 "gate_contact_modify".
二, 问题描述
有一天我们突然收到一个问题反馈: producer 侧的业务产生消息后, consumer 侧并没有得到预期的结果. 经过排查, 排除了业务逻辑出现问题的可能性, 我们判断最有可能是因为 kafka 消息没有被消费到. 为了印证这个猜测, 我们查看了 consumer 消费日志, 发现日志中存在这样几处问题:
(1) 日志偶尔会打印出一条 Kafka 的警告日志, 内容为:
org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1 org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.maybeAutoCommitOffsetsSync:648 - Auto-commit of offsets {gate_contact_modify-0=OffsetAndMetadata{offset=2801, metadata=''}} failed for group smart-building-consumer-group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
(2) 接着进行了一次 rebalance;
(3)consumer 侧输出了 Topic 消费者的业务日志, 表明正常获取到了 Topic 消息.
接着我们查看 kafka 消费组中该 Topic 对应的 Offset 的变化情况, 发现 Offset 一直没有变化.
三, 排查思路
日志中的异常信息很明确的告知我们, topic 消息消费完成后, 由于 group 发生了一次 rebalance, 导致 Commit 没有被提交, 这表明两次 poll 消息的间隔时间超过了 max.poll.interval.ms 定义的最大间隔, 这也意味着一次 poll 后处理消息的过程超时了, 正是由于 poll 间隔时间超时, 导致了一次 rebalance. 同时建议我们要么增加间隔时间, 要么减少每次拉取的最大消息数.
另外, 由于 Commit 没有被提交, 导致 OffSet 值没有变化, 那么每次拉取到的消息都是同一批重复消息. 具体的异常流程如下图:
根据上述信息, 我们进一步检查了 consumer 的 max.poll.records 配置, max.poll.interval.ms 配置, 并统计了每条 Topic 消息的处理耗时, 发现 max.poll.records 使用了默认配置值 500,max.poll.interval.ms 使用了默认配置值为 300s, 而每条 Topic 消息的处理耗时为 10S. 这进一步证实了我们的推论:
由于每次拉取的消息数太多, 而每条消息处理时间又较长, 导致每次消息处理时间超过了拉取时间间隔, 从而使得 group 进行了一次 rebalance, 导致 commit 失败, 并最终导致下次拉取重复的消息, 继续处理超时, 进入一个死循环状态.
知道问题根源后, 我们结合业务特点, 更改了 max.poll.records=1, 每次仅拉取一条消息进行处理, 最终解决了这个问题.
四, 经验教训
这次故障排查, 使我们对 Kafka 消息 poll 机制, rebalance 和 commit 之间的相互影响等有了更深的理解.
(1)kafka 每次 poll 可以指定批量消息数, 以提高消费效率, 但批量的大小要结合 poll 间隔超时时间和每条消息的处理时间进行权衡;
(2) 一旦两次 poll 的间隔时间超过阈值, group 会认为当前 consumer 可能存在故障点, 会触发一次 rebalance, 重新分配 Topic 的 partition;
(3) 如果在 commit 之前进行了一次 rebalance, 那么本次 commit 将会失败, 下次 poll 会拉取到旧的数据 (重复消费), 因此要保证好消息处理的幂等性;
来源: http://blog.51cto.com/andrewli/2440857