[TOC]
1. 背景和现象
1.1 kafka 版本和部署状态
kafka 版本
server 和 client 都是 0.11.0
部署状态
kafka 多个节点(具体多少不清楚, 但是肯定不是单节点),zookeeper3 个节点. topic 的分区副本数为 2. 具备高可用.
1.2 事件现象
在一次生产事件中, 其中一个 kafka 节点和 zk 节点因物理机宕机下线, zk 和 kafka broker 恢复后, 生产者应用并没有恢复, 最终无法发送消息.
此时生产者端的应用业务流程无法继续执行, 流程走到 producer 模块就被 Block 住, 然后每隔 10s 报错一次.
重启 producer 之后, 应用恢复.
关键日志
- 2018-11-07 10:52:24,015 [kfkBolt-tbl_qqhis_sq_trans_flow_raw-thread-0] [com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter:65] [ERROR] - produce: fail at seco
- nd time.
- java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 10000 ms.
- at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1057)
- at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:764)
- at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701)
- at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:609)
- at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter$1.onCompletion(KafkaProducerWriter.java:57)
- at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:760)
- at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701)
- at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter.doOnce(KafkaProducerWriter.java:48)
- at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter.doOnce(KafkaProducerWriter.java:27)
- at com.unionpay.cloudatlas.upstorm.component.SimpleBolt.doOnce(SimpleBolt.java:187)
- at com.unionpay.cloudatlas.upstorm.component.SimpleBolt$InnerThread.run(SimpleBolt.java:105)
- Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 10000 ms.
1.3 生产者代码和配置
生产者代码
- @Override
- public DataRecord doOnce(Record record) {
- // TODO Auto-generated method stub
- try {
- OperCounter.getInstance().increment(Constant.KEY_KAFKA_RECEIVE);
- final ProducerRecord<String, Record> proRecord = new ProducerRecord<String, Record>(topicPub, record);
- producer.send(proRecord, new Callback() {
- @Override
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- // TODO Auto-generated method stub
- // 发送失败
- if (exception != null) {
- OperCounter.getInstance().increment(Constant.KEY_KAFKA_REIN);
- logger.warn("producer send fail and resend.", exception);
- try {
- producer.send(proRecord).get();
- OperCounter.getInstance().increment(Constant.KEY_KAFKA_DEAL);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- logger.error("produce: fail at second time.\t", e);
- OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
- } catch (ExecutionException e) {
- // TODO Auto-generated catch block
- logger.error("produce: fail at second time.\t", e);
- OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
- } catch (Exception e) {
- logger.error("produce: fail at second time.\t", e);
- OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT);
- }
- } else {
- OperCounter.getInstance().increment(Constant.KEY_KAFKA_DEAL);
- }
- }
- });
- return null;
- } catch (TimeoutException e) {
- logger.error("produce: fail.\t", e);
- OperCounter.getInstance().increment(Constant.KEY_KAFKA_REJECT);
- } catch (Exception e) {
- logger.error("produce: fail.\t", e);
- OperCounter.getInstance().increment(Constant.KEY_KAFKA_REJECT);
- }
- return null;
- }
生产者配置
- Bootstrap.servers=${
- KAFKA_SERVER_IN
- }
- key.serializer=org.apache.kafka.common.serialization.StringSerializer
- value.serializer=com.unionpay.cloudatlas.galaxy.common.protocol.kafka.RecordKryoSerializer
- #max time to wait,default to 60s
- max.block.ms=10000
- batch.size=65536
- buffer.memory=134217728
- retries=3
通过上述代码和配置可以看出
最大 block 事件为 1000ms, 也就是 10s
buffer 配置的较大, 为 134M
生产者先是异步发送, 如果发送失败, 则执行一次同步发送
2. 问题初步定位和分析
2.1 kafka 生产者简介
排除服务端的疑点
在最终定位之前, 我们怀疑过很多点, 比如是不是 kafka 高可用存在 bug, 是不是 zk 出问题了, 是不是 kafka 选主失败等, 最终通过生产的其他应用现象推论以及理论分析得出以下基本结论
kafka 本身高可用机制还是比较可靠的, 宕机 1 台节点, server 的状态可以快速回复正常
zookeeper 的高可用也没有问题, 3 个节点的情况下, 是允许 1 个节点下线的, zookeeper 服务正常
宕机期间以及恢复后, kafka 完成了 leader 节点的选举
总的来说, 就是不要怀疑服务端有问题.
当然,"不要怀疑服务端有问题" 只是我们定位到了原因之后的后置结论, 并不表示故障排查的时候忽略服务端的潜在问题, 毕竟不管是硬件资源还是软件质量都可能存在缺陷, 尤其是开源产品的发展本身就是一个不断迭代完善的过程.
关于 kafka 生产者
在说原因之前, 还需要说明一下 kafka 的 producer 流程. kafka 生产者发送消息的粗略流程如下:
首先应用调用 send 发送
消息的 KV 序列化
根据分区器决定消息发送到那个分区
将消息添加到本地缓冲区, 如果缓冲区满, 则当前线程 block, 直到缓冲区有足够的空间或者达到最大阻塞时间(max.block.ms)
有一个独立的 IO 线程负责从缓冲区中将消息发送到服务端
IO 线程收到响应之后, 通知 producer 线程完成了发送, 如果需要, 调用 producer 指定的回调函数
注意, 从上面的流程我们可以看出, 在 kafka 的高版本客户端 (貌似是 0.9 之后) 中, 发送消息天然的是一个异步的过程, 也就是说, 消息发送都是异步方式进行的. 而我们如果需要使用同步的方式发送消息, 那么我们只能通过 KafkaProducer.send 返回的 Future 对象完成, 调用 Future.get, 关键代码如下
- //KafkaProducer.send 的方法签名
- // 不提供回调
- public Future<RecordMetadata> send(ProducerRecord<K, V> record);
- // 提供回调
- public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
当我们调用了 Future.get 的时候, 我们做了什么
上面出问题的代码, 使用了同步的方式等待结果, 那么同步的 get, 到底是什么样的操作呢?
先来看下 KafkaProducer.send 返回的具体 Future 实现
- KafkaProducer.doSend
- RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
- serializedValue, headers, interceptCallback, remainingWaitMs);
- if (result.batchIsFull || result.newBatchCreated) {
- log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
- this.sender.wakeup();
- }
- return result.future;
上面的代码中返回的 future 即 FutureRecordMetadata 的实例, 其实现了 Future 的 get 方法
- public final class FutureRecordMetadata implements Future<RecordMetadata> {
- @Override
- public RecordMetadata get() throws InterruptedException, ExecutionException {
- // 阻塞等待
- this.result.await();
- if (nextRecordMetadata != null)
- return nextRecordMetadata.get();
- return valueOrError();
- }
- @Override
- public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- // Handle overflow.
- long now = System.currentTimeMillis();
- long deadline = Long.MAX_VALUE - timeout <now ? Long.MAX_VALUE : now + timeout;
- // 阻塞等待
- boolean occurred = this.result.await(timeout, unit);
- if (nextRecordMetadata != null)
- return nextRecordMetadata.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
- if (!occurred)
- throw new TimeoutException("Timeout after waiting for" + TimeUnit.MILLISECONDS.convert(timeout, unit) + "ms.");
- return valueOrError();
- }
- }
可以看到, get 里通过 result.await 阻塞等待, 再看看这里的 result 对应的类 ProduceRequestResult
- public final class ProduceRequestResult {
- private final CountDownLatch latch = new CountDownLatch(1);
- /**
- * Mark this request as complete and unblock any threads waiting on its completion.
- */
- public void done() {
- if (baseOffset == null)
- throw new IllegalStateException("The method `set` must be invoked before this method.");
- this.latch.countDown();
- }
- /**
- * Await the completion of this request
- */
- public void await() throws InterruptedException {
- latch.await();
- }
- }
可以看到, 其内部的 await 中调用了 CountDownLatch.await 进行等待, 同时提供了 done 方法, 解除等待的状态.
看到这里就比较清晰了, 如果应用通过 get 方式同步等待结果, 其内部实现时使用了 CountDownLatch 的 await 方法, 当结果返回的时候, IO 线程会调用 done 方法结束等待状态, 并且返回结果. 我们前面的分析只介绍了如何等待的, 至于如何唤醒, 将在下文介绍.
2.2 问题定位
通过上面的代码分析, 我们几乎可以猜测到问题的出现可能和这里的设计有关 -- 调用了 get 阻塞等待, 但是由于某种原因, 导致没有人唤醒等待着的线程.
为了进一步验证我们的想法, 在开发环境复现生产事件的情况, 当出现上述现象时, 通过 jstack 抓一下线程快照, 进一步证实了我们的猜想:
- "kafka-producer-network-thread | PRODUCER_VERSION_UP_KAKFA_20181129_195652" daemon prio=10 tid=0x00007fd280253000 nid=0x20a5 waiting on condition [0x00007fd2879d8000]
- java.lang.Thread.State: WAITING (parking)
- at sun.misc.Unsafe.park(Native Method)
- - parking to wait for <0x0000000785982e70> (a java.util.concurrent.CountDownLatch$Sync)
- at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
- at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
- at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
- at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
- at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
- at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76)
- at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61)
- at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
- at com.unionpay.arch.bigdata.test.BigDataUPKafkaProducer$MyCallback.onCompletion(BigDataUPKafkaProducer.java:60)
- at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
- at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185)
- at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:599)
- at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:575)
- at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:539)
- at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:474)
- at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:75)
- at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:660)
- at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
- at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:454)
- at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:446)
- at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
- at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
- at java.lang.Thread.run(Thread.java:745)
这里是开发环境复现的代码执行情况, 因此 Producer 代码和生产上的不完全一致, 但是逻辑相同.
通过线程快照可以发现, IO 线程一致处于 await 状态, 因此后续流程无法执行!
关于 CountDownLatch 的设计和实现, 感兴趣的可以查看相关文档. 其实这里说生产者应用被 Block, 严格意义上来说是不对的, 线程状态其实是 WAITING, 因此这里的 Block 指的是 "代码执行不下去, 在当前状态一直堵着" 的状态~
因此, 问题定位如下
在异步发送的的回调里使用了同步的方式再次发送, 由于 kafka producer 的同步发送是阻塞等待, 且使用的是不带超时时间的无限期等待 (future.get() 中未指定超时时间), 因此当不被唤醒时会一直 wai 下去
kafka 生产者的 IO 线程 (实际执行数据发送的线程) 是单线程模型, 且回调函数是在 IO 线程中执行的, 因此回调函数的阻塞会直接导致 IO 线程阻塞, 于是生产者缓冲区的数据无法被发送
kafka 生产者还在不断的被应用调用, 因此缓冲区一直累积并增大, 当缓冲区满的时候, 生产者线程会被阻塞, 最大阻塞时间为 max.block.time, 如果改时间到达之后还是无法将数据塞入缓冲区, 则会抛出一个异常, 因此日志中看到达到 10s 之后, 打印出异常栈
由于使用了 get 没有指定超时时间, 且该 await 一直无法被唤醒, 因此这种情况会一直持续, 在没有人工干预的情况下, 永远不会发送成功
生产建议
kafka 生产者推荐使用异步方式发送, 并且提供回调以响应发送成功或者失败
如果需要使用 future.get 的方式模拟同步发送, 则需要在 get 里加上合适的超时时间, 避免因为不可预知的外部因素导致线程无法被唤醒, 即使用 Future.get(long timeout)的 API 而不是不带超时参数的 Future.get()
不要在异步回调中执行阻塞操作或者耗时比较久的操作, 如果有必要可以考虑交给另一个线程 (池) 去做
3. Future.get 为何没有被唤醒
在前面的介绍中, 我们定位了问题的原因, 但也留下了一些疑问:
为何 future.get 没有被唤醒?
producer 是何时执行了回调操作的?
这种情况属于应用使用不当还是 kafka 的 bug?
3.1 HOW: 分析思路
想要彻底弄清楚这个问题, 恐怕要去好好撸一撸 kafka producer 的源码了. 由于 kafka producer 的代码非常多, 其中有缓冲区操作模块, IO 执行模块, 元数据更新模块, 事务支持模块等很多设计, 这里就只从这次的事件问题切入分析, 后面如果对 kafka producer 源码全面分析了之后再专门用几篇文章描述.
那么思路很简单, 主要从以下几个方面入手
上一节中我们说到, 造成 wait 的原因就是调用了 CountDownLatch 的 await 方法, 那么何处调用了 CountDownLatch 的 countdown 方法?
在所有调用了 CountDownLatch.countdown 的地方, 是否包含了对 kafka 节点下线的处理? 也就是说, 难道 kafka 节点下线之后, 流程就不会走到 countdown 了吗?
为了弄清楚以上两个问题, 我们先去看看源码. 通过对 ProduceRequestResult 的成员变量 CountDownLatch latch 分析可以知道, 修改其状态的方法只有 2 个 await 方法和一个 done 方法
- /**
- * Mark this request as complete and unblock any threads waiting on its completion.
- */
- public void done() {
- if (baseOffset == null)
- throw new IllegalStateException("The method `set` must be invoked before this method.");
- this.latch.countDown();
- }
- /**
- * Await the completion of this request
- */
- public void await() throws InterruptedException {
- latch.await();
- }
- /**
- * Await the completion of this request (up to the given time interval)
- * @param timeout The maximum time to wait
- * @param unit The unit for the max time
- * @return true if the request completed, false if we timed out
- */
- public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
- return latch.await(timeout, unit);
- }
另外还有一个 completed 方法只是读取状态, 不是修改, 这里就忽略了
其中两个 await 方法一个是有时间参数的一个是没有的, 对应 Future.get()和 Future.get(long timeout), 是导致阻塞的入口, 因此也不用考虑, 那么重点就是这个 done 方法了.
3.2 WHEN: 谁调用了 done / 什么场景下会正常唤醒
通过 eclipse 提供的工具, 可以一层一层追踪出, 有哪些地方调用了这个 done, 基本结论如下,
在 Producer 中 , 主要有设计到两个逻辑(两个类), 其中
Sender, 主要对于异常情况做做一些处理, 以唤醒 await 的线程, 包括
当链接被强制关闭时
当事务管理器中认为需要丢弃时
当有过期的数据时
NetworkClient, 主要是处理发送结果, 包括
当发送后返回失败时
当返回消息太大需要切分的时候
当发送成功的时候
相应的逻辑和流程可以看具体的源码
Sender 中相关逻辑流程图如下
Untitled Diagram.PNG
NetworkClient 中相关逻辑
NetworClient 中调用 done 的地方. PNG
安利一个良心在线制图网站 https://www.draw.io/
3.3 WHY: 为何会一直 wait 却没有被唤醒
通过上面的分析, 我们梳理了解除线程阻塞 (WAIT) 的几个场景和时机, 然而不幸的是, 上面的场景均没有机会被执行:
在 kafka 节点宕机时, 同步发送操作的 message 依然会被加入到生产者缓冲区, 因为加入到缓冲区的过程和链路情况是解耦的, 因此可以成功被塞到 buffer
由于是同步的过程, 因此塞到 buffer 之后, 发送者便开始了 get()的无限期等待, 直到有 "人" 唤醒
通过上面的分析我们发现: 唤醒该同步等待的操作, 都需要在 Sender 也就是 IO 线程中执行: 要么是由于各种原因觉得这个消息需要 abort, 要么是收到了正确或者错误的应答(fail or complete or split).
此时奇妙的现象就发生了: 同步等待的操作在 IO 线程, 唤醒的操作也是在 IO 线程, 这是同一个线程! 也就是说, 此刻已经发生了某种意义的 "死锁"
IO 线程已经被无限 WAITing 了, 因此 buffer 中的数据再也无法被发送
于是 buffer 越堆越多, 直到达到 buffer sizez 之后, 开始被 block
producer 对 block 进行了控制, 每次最大 block 的时间为 max.block.time, 然后向上抛出一个异常, 于是出现了日志中的现象
综上, 这次生产实践的原委基本清楚了. 关于 producer 源码中的细节, 后面再细细研读~
来源: http://www.jianshu.com/p/45258f744425