I. The Crime Scene
1. Alarms flood in from the producer side
On a dark and stormy night, our Kafka producers started alarming wildly: a flood of alerts about blocked in-process queues, failed writes, and degraded write throughput.
A bug in the program?
Network jitter?
Cluster instability?
Service overload?
As 《Java 现网问题排查技巧及实践》 puts it: the application log is the best entry point into a problem. When a system misbehaves, the application log usually contains a corresponding exception; analyzing it narrows the search down to a specific code path.
Let's start with the producer-side application log:
image.PNG
After parameter tuning, increased concurrency, service restarts, and a series of other producer-side measures all failed to help, we narrowed the investigation to the backend Kafka cluster.
2. Cluster exception logs and analysis
The broker logs showed the following exception over and over:
image.PNG
Googling suggested a likely cause: a newer-version client had connected to the cluster and sent a request type the Kafka broker does not support.
II. Tracking Down and Resolving the Problem
1. Enabling trace logging
At the normal log level the logs are sparse. We extracted the consumer group from the last related log line before the exception and analyzed it: it was a client of a perfectly normal version, and its timestamp was about 7 s before the exception, too far apart to be directly related.
image.PNG
A quick skim of the broker's SocketServer.scala source showed that logging every individual request requires the trace level. Modify the configuration file as shown:
image.PNG
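For reference, the switch lives in the broker's config/log4j.properties. In the stock 0.9 file the request loggers default to WARN (with the Processor lines commented out); raising them to TRACE records every request in kafka-request.log. The exact logger and appender names below come from the default config and may differ in a customized deployment:

```properties
# config/log4j.properties -- log every request at trace level
log4j.logger.kafka.request.logger=TRACE, requestAppender
log4j.additivity.kafka.request.logger=false
log4j.logger.kafka.network.Processor=TRACE, requestAppender
log4j.additivity.kafka.network.Processor=false
```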
2. Log analysis
We then searched and analyzed server.log:
image.PNG
In the post-mortem we found kafka-request.log a more convenient place to troubleshoot this class of problem.
3. Finding the offending job
Using the source IP and port of the connection, we located the logs of the corresponding Storm job. Sure enough, it was connecting with a newer-version client, and its start time matched the onset of the write failures exactly.
image.PNG
After killing the job, the cluster gradually recovered and writes returned to normal.
III. Digging Deeper
The immediate fire was out, but the real analysis was only beginning.
Since the problem originated in a misbehaving connection, let's first review Kafka's network communication model.
1. Kafka's network communication model
As Kafka users know, the network model is 1 (one Acceptor thread) + N (N Processor threads) + M (M business-logic handler threads):
| Threads | Thread name | Description |
|---|---|---|
| 1 | kafka-socket-acceptor_%x | Acceptor thread: listens for connection requests from clients |
| N | kafka-network-thread_%d | Processor threads: handle socket reads and writes |
| M | kafka-request-handler-%d | Worker threads: run the business logic and build the Response |
The full framework of Kafka's network layer looks like this:
image.PNG
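To make the division of labor concrete, here is a minimal, hedged sketch of the 1 + N + M model: in-memory queues stand in for sockets and Kafka's NIO selector, and all names are illustrative rather than Kafka's actual classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadModelSketch {

    // Simulates 1 Acceptor + n Processors + m handler threads and returns
    // how many requests the handler pool processed.
    static int run(int n, int m, int requests) {
        // Each Processor owns a queue of work assigned by the Acceptor.
        List<BlockingQueue<Integer>> processorQueues = new ArrayList<>();
        for (int i = 0; i < n; i++)
            processorQueues.add(new LinkedBlockingQueue<>());
        // Processors feed one shared "RequestChannel" drained by the handler pool.
        BlockingQueue<Integer> requestChannel = new LinkedBlockingQueue<>();
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(requests);

        // 1 Acceptor thread: hands incoming "connections" out round-robin.
        Thread acceptor = new Thread(() -> {
            for (int c = 0; c < requests; c++)
                processorQueues.get(c % n).add(c);
        });

        // n Processor threads: move socket reads onto the request channel.
        for (int i = 0; i < n; i++) {
            final BlockingQueue<Integer> q = processorQueues.get(i);
            Thread p = new Thread(() -> {
                try {
                    while (true) requestChannel.put(q.take());
                } catch (InterruptedException e) { /* shutdown */ }
            });
            p.setDaemon(true);
            p.start();
        }

        // m handler threads: run the business logic and build responses.
        for (int i = 0; i < m; i++) {
            Thread h = new Thread(() -> {
                try {
                    while (true) {
                        requestChannel.take();
                        handled.incrementAndGet();
                        done.countDown();
                    }
                } catch (InterruptedException e) { /* shutdown */ }
            });
            h.setDaemon(true);
            h.start();
        }

        acceptor.start();
        try {
            done.await(); // wait until every request has been handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled=" + run(2, 3, 12)); // handled=12
    }
}
```

The point of the design is decoupling: the Acceptor never blocks on I/O, Processors never block on business logic, and the bounded request channel is the only coupling point between network threads and worker threads.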
2. Why the array index goes out of bounds
The source of org.apache.kafka.common.protocol.ApiKeys shows that a 0.9.0.1 cluster supports requests for the following ApiKeys:
```java
PRODUCE(0, "Produce"),
FETCH(1, "Fetch"),
LIST_OFFSETS(2, "Offsets"),
METADATA(3, "Metadata"),
LEADER_AND_ISR(4, "LeaderAndIsr"),
STOP_REPLICA(5, "StopReplica"),
UPDATE_METADATA_KEY(6, "UpdateMetadata"),
CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"),
OFFSET_COMMIT(8, "OffsetCommit"),
OFFSET_FETCH(9, "OffsetFetch"),
GROUP_COORDINATOR(10, "GroupCoordinator"),
JOIN_GROUP(11, "JoinGroup"),
HEARTBEAT(12, "Heartbeat"),
LEAVE_GROUP(13, "LeaveGroup"),
SYNC_GROUP(14, "SyncGroup"),
DESCRIBE_GROUPS(15, "DescribeGroups"),
LIST_GROUPS(16, "ListGroups");
```
The ApiKeys are packed into a lookup array so that getRequest() can route a client request, by its ApiKey, to the matching handler:
```java
static {
    int maxKey = -1;
    for (ApiKeys key : ApiKeys.values()) {
        maxKey = Math.max(maxKey, key.id);
    }
    codeToType = new ApiKeys[maxKey + 1];
    for (ApiKeys key : ApiKeys.values()) {
        codeToType[key.id] = key;
    }
    MAX_API_KEY = maxKey;
}
```
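The failure mode is easy to reproduce with a toy version of this lookup table (the array below stands in for codeToType; only ids 0..16 exist, matching 0.9.0.1):

```java
public class ApiKeyLookup {
    // Toy stand-in for the 0.9.0.1 ApiKeys table: ids 0..16 only.
    static final String[] NAMES = {
        "Produce", "Fetch", "Offsets", "Metadata", "LeaderAndIsr",
        "StopReplica", "UpdateMetadata", "ControlledShutdown", "OffsetCommit",
        "OffsetFetch", "GroupCoordinator", "JoinGroup", "Heartbeat",
        "LeaveGroup", "SyncGroup", "DescribeGroups", "ListGroups"
    };
    static final int MAX_API_KEY = NAMES.length - 1; // 16

    // Mirrors the dispatch: index straight into the array with no range
    // check -- exactly the 0.9.0.1 behaviour.
    static String forId(int id) {
        return NAMES[id];
    }

    public static void main(String[] args) {
        System.out.println(forId(0)); // a request the broker understands: Produce
        try {
            forId(18); // ApiVersions, sent by newer clients
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("ArrayIndexOutOfBoundsException for id 18");
        }
    }
}
```

An id of 18 indexes past the end of the array, which is exactly the broker-side ArrayIndexOutOfBoundsException in our logs.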
Looking at the 2.2.0 version of org.apache.kafka.common.protocol.ApiKeys, Kafka has added a large number of ApiKeys as versions advanced:
```java
SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequest.schemaVersions(), SaslHandshakeResponse.schemaVersions()),
API_VERSIONS(18, "ApiVersions", ApiVersionsRequest.schemaVersions(), ApiVersionsResponse.schemaVersions()) {
    @Override
    public Struct parseResponse(short version, ByteBuffer buffer) {
        // Fallback to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest
        // using a version higher than that supported by the broker, a version 0 response is sent
        // to the client indicating UNSUPPORTED_VERSION.
        return parseResponse(version, buffer, (short) 0);
    }
},
CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequest.schemaVersions(), CreateTopicsResponse.schemaVersions()),
DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequest.schemaVersions(), DeleteTopicsResponse.schemaVersions()),
DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), DeleteRecordsResponse.schemaVersions()),
INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequest.schemaVersions(),
        InitProducerIdResponse.schemaVersions()),
OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", false, OffsetsForLeaderEpochRequest.schemaVersions(),
        OffsetsForLeaderEpochResponse.schemaVersions()),
ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2,
        AddPartitionsToTxnRequest.schemaVersions(), AddPartitionsToTxnResponse.schemaVersions()),
ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequest.schemaVersions(),
        AddOffsetsToTxnResponse.schemaVersions()),
END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2, EndTxnRequest.schemaVersions(),
        EndTxnResponse.schemaVersions()),
WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, WriteTxnMarkersRequest.schemaVersions(),
        WriteTxnMarkersResponse.schemaVersions()),
TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, RecordBatch.MAGIC_VALUE_V2, TxnOffsetCommitRequest.schemaVersions(),
        TxnOffsetCommitResponse.schemaVersions()),
DESCRIBE_ACLS(29, "DescribeAcls", DescribeAclsRequest.schemaVersions(), DescribeAclsResponse.schemaVersions()),
CREATE_ACLS(30, "CreateAcls", CreateAclsRequest.schemaVersions(), CreateAclsResponse.schemaVersions()),
DELETE_ACLS(31, "DeleteAcls", DeleteAclsRequest.schemaVersions(), DeleteAclsResponse.schemaVersions()),
DESCRIBE_CONFIGS(32, "DescribeConfigs", DescribeConfigsRequest.schemaVersions(),
        DescribeConfigsResponse.schemaVersions()),
ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequest.schemaVersions(),
        AlterConfigsResponse.schemaVersions()),
ALTER_REPLICA_LOG_DIRS(34, "AlterReplicaLogDirs", AlterReplicaLogDirsRequest.schemaVersions(),
        AlterReplicaLogDirsResponse.schemaVersions()),
DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequest.schemaVersions(),
        DescribeLogDirsResponse.schemaVersions()),
SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequest.schemaVersions(),
        SaslAuthenticateResponse.schemaVersions()),
CREATE_PARTITIONS(37, "CreatePartitions", CreatePartitionsRequest.schemaVersions(),
        CreatePartitionsResponse.schemaVersions()),
CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequest.schemaVersions(), CreateDelegationTokenResponse.schemaVersions()),
RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()),
EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()),
DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()),
DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions()),
ELECT_PREFERRED_LEADERS(43, "ElectPreferredLeaders", ElectPreferredLeadersRequestData.SCHEMAS,
        ElectPreferredLeadersResponseData.SCHEMAS);
```
The logs show that the broker's out-of-bounds index was 18, which corresponds to the client's ApiVersions request. As the name suggests, this is a frequent request from newer Kafka clients, but 0.9.0.1 only supports keys 0 through 16, hence the exception.
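A range check is all it takes to turn this into a per-request error. The guard below is an illustrative sketch, not the exact upstream fix, though later Kafka versions do validate the api key before dispatching:

```java
public class SafeApiKeyLookup {
    static final int MAX_API_KEY = 16; // highest key a 0.9.0.1 broker knows

    // Hypothetical guarded dispatch: validate the id before indexing, so an
    // unknown key becomes a descriptive per-request error instead of an
    // escaping ArrayIndexOutOfBoundsException.
    static String forId(int id) {
        if (id < 0 || id > MAX_API_KEY)
            throw new IllegalArgumentException(
                "Unsupported API key " + id + ": this broker supports 0.." + MAX_API_KEY);
        return "ApiKey-" + id;
    }

    public static void main(String[] args) {
        try {
            forId(18); // ApiVersions from a newer client
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```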
3. Why the cluster hangs
Thanks to Java-style exception handling, Scala programs are reasonably robust, so we need to dig into why one small array-index error had an impact far beyond a single request, to the point of destabilizing the whole cluster.
The logs point us at kafka.network.Processor in SocketServer.scala (Kafka builds a high-performance SocketServer on Java NIO; the implementation is reputedly elegant and worth savoring when time allows):
```scala
override def run() {
  startupComplete()
  while (isRunning) {
    try {
      // setup any new connections that have been queued up
      configureNewConnections()
      // register any new responses for writing
      processNewResponses()
      // selector here is Kafka's own re-implemented NIO Selector
      try {
        selector.poll(300)
      } catch {
        case e @ (_: IllegalStateException | _: IOException) =>
          error("Closing processor %s due to illegal state or IO exception".format(id))
          swallow(closeAll())
          shutdownComplete()
          throw e
      }
      selector.completedReceives.asScala.foreach { receive =>
        try {
          val channel = selector.channel(receive.source)
          val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
            channel.socketAddress)
          val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
          requestChannel.sendRequest(req)
        } catch {
          // Key point: ArrayIndexOutOfBoundsException is NOT caught here! It
          // escapes to the outer catch-all, and the rest of this iteration
          // goes unprocessed.
          case e @ (_: InvalidRequestException | _: SchemaException) =>
            // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
            error("Closing socket for " + receive.source + " because of error", e)
            close(selector, receive.source)
        }
        selector.mute(receive.source)
      }
      selector.completedSends.asScala.foreach { send =>
        val resp = inflightResponses.remove(send.destination).getOrElse {
          throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
        }
        resp.request.updateRequestMetrics()
        selector.unmute(send.destination)
      }
      selector.disconnected.asScala.foreach { connectionId =>
        val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
          throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
        }.remoteHost
        // the channel has been closed by the selector but the quotas still need to be updated
        connectionQuotas.dec(InetAddress.getByName(remoteHost))
      }
    } catch {
      // We catch all the throwables here to prevent the processor thread from exiting. We do this because
      // letting a processor exit might cause bigger impact on the broker. Usually the exceptions thrown would
      // be either associated with a specific socket channel or a bad request. We just ignore the bad socket channel
      // or request. This behavior might need to be reviewed if we see an exception that need the entire broker to stop.
      case e: ControlThrowable => throw e
      case e: Throwable =>
        error("Processor got uncaught exception.", e)
    }
  }
  debug("Closing selector - processor " + id)
  swallowError(closeAll())
  shutdownComplete()
}
```
SocketServer's run loop catches exceptions at several inner points, so a failure in any one stage should have a bounded impact. But the second inner catch (around completedReceives) does not include ArrayIndexOutOfBoundsException. The exception therefore escapes to the outer catch-all, the rest of that iteration is abandoned, and some pending Requests are never processed, among them critical ones (broker-to-broker traffic, producer-to-broker traffic). That is how the whole cluster degraded into write failures.
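The blast radius is easy to see in a stripped-down event loop (toy code, not the actual SocketServer): when the per-receive catch misses the exception type, the outer catch-all ends the whole iteration and every remaining receive from that poll is silently dropped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EventLoopSketch {

    // Each poll delivers a batch of receives; api key 18 is the "poison" request.
    // catchPerItem=false models 0.9.0.1, where AIOOBE is not caught per receive.
    static List<Integer> processBatch(List<Integer> receives, boolean catchPerItem) {
        List<Integer> handled = new ArrayList<>();
        try {
            for (int apiKey : receives) {
                try {
                    if (apiKey > 16) throw new ArrayIndexOutOfBoundsException(apiKey);
                    handled.add(apiKey);
                } catch (ArrayIndexOutOfBoundsException e) {
                    if (!catchPerItem) throw e; // 0.9.0.1: not handled here
                    // fixed behaviour: drop just this one request and move on
                }
            }
        } catch (Throwable t) {
            // outer catch-all: keeps the thread alive, but the rest of the
            // batch (and this iteration's bookkeeping) is skipped entirely
        }
        return handled;
    }

    public static void main(String[] args) {
        List<Integer> batch = Arrays.asList(0, 1, 18, 2, 3);
        System.out.println(processBatch(batch, false)); // [0, 1] -- requests 2 and 3 lost
        System.out.println(processBatch(batch, true));  // [0, 1, 2, 3]
    }
}
```

With the per-item catch missing, one bad request from one client costs every other client whose request happened to arrive in the same poll, which matches the cluster-wide symptoms we saw.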
IV. Post-mortem
At its root, this was a bug in Kafka 0.9.0.1. As is well known, Kafka is in principle backward compatible, but only in one direction: a newer broker accepts older clients. When a newer client connects to a 0.9.0.1 cluster, the cluster can hang; in severe cases broker processes freeze outright, taking other 0.9.0.1 clients down with them.
1. Remediation plan at a glance

| Horizon | Remediation |
|---|---|
| Short term | Communicate and enforce what we can: forbid newer-version clients from connecting to the cluster; monitor and alert on the brokers' array-out-of-bounds log lines |
| Medium term | Evaluate whether the broker code can be patched |
| Long term | Upgrade or migrate to a newer-version cluster |
2. The rdkafka workaround
One team consumes our 0.9 cluster with the C++ client rdkafka. After some discussion they arrived at the following settings for connecting safely, shared here for reference:
image.PNG
When building the rdkafka configuration object conf, simply set api.version.request=false.
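In properties form, the two librdkafka settings involved are shown below: api.version.request=false stops the client from probing the broker with ApiVersionsRequest (key 18), and broker.version.fallback tells it which legacy protocol feature set to assume instead.

```properties
# librdkafka: do not send ApiVersionsRequest (api key 18) on connect
api.version.request=false
# assume the protocol features of this broker version instead
broker.version.fallback=0.9.0.1
```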
image.PNG
V. References
- https://www.jianshu.com/p/d2cbaae38014
- https://www.imooc.com/article/36519
- "kafka.network.Processor throws ArrayIndexOutOfBoundsException", https://issues.apache.org/jira/browse/KAFKA-3593

Source: https://www.qcloud.com/developer/article/1654082