kafka-go 简介
segmentio/kafka-go 是一款开源的 golang kafka 读写 sdk, 开源地址为: https://github.com/segmentio/kafka-go . 截止写文章时, 这个开源代码库收获了 3.3K 的 star, 在很多公司内外部项目广泛使用. 与 https://github.com/confluentinc/confluent-kafka-go 和 https://github.com/Shopify/sarama 一起, 作为最常用的三个 golang kafka sdk.
本文介绍在使用 kafka-go 的时候遇到的一个读写 kafka 数据丢失问题和问题定位解决的过程.
背景
在实现一个数据分析平台的项目中, 引入了 kafka 作为数据落地和中转的通道, 抽象出来讲, 就是使用 kafka-go 的 writer 将数据写入到 kafka 的指定 topic, 然后使用 kafka-go 的 reader 将数据从指定的 topic 读取出来返回给用户.
抽象使用场景
故障
在项目运行一段时间后, 用户反馈从 kafka 读出的数据条数少于投递到 kafka 的数据, 即存在数据丢失的问题.
定位过程
1. 压测确定问题存在
在用户反馈后, 通过日志分析发现, writer.WriteMessages 提示写入成功, 但是 http_proxy 没有读到, 确认消费组等配置正确后, 查看消费组的消费情况, 一切正常, 没有发现问题. 所以首先压测尝试复现问题. 压测结果让我很震惊, 简单的发 1024 条, 收到 1013 条, 丢失了 11 条, 所以问题确定存在并且可以复现, 数据丢失比例还很高.
2. 确认丢失发生的环节
在压测程序中将读写的数据打印出来, 同时将 reader 读取到的 kafka.Message 结构中的 partition 和 offset 信息打印出来, 通过 awk 处理压测程序的日志, 发现 offset 不连续, 使用 kafka 自带工具查看被跳过的 offset 信息:
./kafka-console-consumer.sh -Bootstrap-server 10.10.0.7:9092 --topic topicName --partition 0 --offset 231131 --max-messages 1
发现可以读取到消息, 至此可以确定, 数据丢失发生在读取环节, 而不是写入环节.
3. 跟踪分析代码找到问题原因
http_proxy 中, 为防止 http 阻塞, 使用 context.WithTimeout 作为参数传给 kafka-go reader 读取消息, 在超时后立刻返回. ctx 的超时时间设置为 100ms.
- ctx, cancel := context.WithTimeout(context.Background(), time.Duration(m.ReadTimeoutMs)*time.Millisecond)
- msg, err := consumer.KReader.ReadMessage(ctx)
- cancel()
- return msg, err
下面是 reader.ReadMessage 的实现.
- func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
- m, err := r.FetchMessage(ctx)
- if err != nil {
- return Message{}, err
- }
- if r.useConsumerGroup() {
- if err := r.CommitMessages(ctx, m); err != nil {
- return Message{}, err
- }
- }
- return m, nil
- }
应该可以发现, 如果 FetchMessage 成功, 但是 CommitMessages 假失败的话, 数据将被丢失, kafka 服务器得到的信息是消息已经被正常消费掉了.
CommitMessages 需要与服务器通信, 提交 offset 被消费的信息, 与服务器通信的过程可能会失败, sdk 内部会使用平方退让策略进行等待和重试, 重试的时间最长为 0+1*1*100 + 2*2*100 =500ms .
跟踪 CommitMessages 代码, 它的实现中会把请求写入到一个 reader 成员的队列中, 然后通过另一个临时 channel 变量等待应答, 在等待应答的时候, ctx 可能超时提前返回, 这时 commit 大概率还是会在三次重试之内成功的, 并成功的应答写入到管道, 但是调用方 commitMessages 已经因超时退出不再等待了.
到这里, 问题已经清晰了, 就是由于我们设置的 ctx 为 100ms, 导致发生 FetchMessage 成功但是 CommitMessage 在 100ms 后才成功.
修复方法
读到这里, 修复的方法已经很清晰了. 你可能觉得只需要把 ctx 的时间改成 500ms 就可以了. 如果是 500ms, 你发现仍然会丢数据, 直观的, 你可能认为 500ms 仍然丢数据是因为你的 500ms 先于 sdk 内部的 500ms 计时, 所以会有数据丢失.
然后你改成 550ms, 你压测发现, 还会有万分之一左右的数据丢失. 你再看看代码, 发现 FetchMessage 也使用到了 ctx, 而且在它的内部实现中, 也是通过 select chan 和 ctx.Done() 的方式来实现超时控制的, 它也会花时间. 留给 CommitMessages 的时间是不准确的.
说到这里, 彻底的修复方法应该已经清楚了, 就是重写这个实现, 搞两个 ctx 传进来, 分别控制时间, 其中 CommitMessages 的 ctx 等待时间要略大于 500ms, 比如 510ms.
其他
解决后 GitHub 提交 bug 单, 发现已经有人提了, 看来坑过不少人.
来源: https://www.qcloud.com/developer/article/1809467