在上一篇文章《Linux 安装 Kafka》中, 已经介绍了如何在 Linux 安装 Kafka, 以及 Kafka 的启动 / 关闭和创建发话题并产生消息和消费消息. 这篇文章就介绍介绍 Kafka 的那些常用的命令.
关于 Kafka 的启停 / 创建话题 / 消息的产生和消费等命令在上一篇文章《Linux 安装 Kafka》中已经指出, 这里就不说了. 就说说其他常用命令.
1 查看消费者状态和消费详情
有时候我们需要关心消费者应用的状态, 一般消费者应用会自己通过日志获知当前消费到了哪个 topic 的哪个 partition 的哪个 offset, 但当消费者出问题之后, 或者出于监控的原因, 我们需要知道消费者的状态和详情, 那么需要借助 kafka 提供的相关命令.
命令格式:
- bin/kafka-consumer-groups.sh --Bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --list
- bin/kafka-consumer-groups.sh --Bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --describe
- Tips:
BROKER_HOST 是 kafka server 的 ip 地址, PORT 是 server 的监听端口. 多个 host port 之间用逗号隔开
第一条命令是获取 group 列表, 一般而言, 应用是知道消费者 group 的, 通常在应用的配置里, 如果已知, 该步骤可以省略
第二条命令是查看具体的消费者 group 的详情信息, 需要给出 group 的名称
示例:
- bin/kafka-consumer-groups.sh --Bootstrap-server localhost:9092 --list
- bin/kafka-consumer-groups.sh --Bootstrap-server localhost:9092 --group console-consumer-68682 --describe
效果图:
TOPIC: 该 group 里消费的 topic 名称
PARTITION: 分区编号
CURRENT-OFFSET: 该分区当前消费到的 offset
LOG-END-OFFSET: 该分区当前 latest offset
LAG: 消费滞后区间, 为 LOG-END-OFFSET-CURRENT-OFFSET, 具体大小需要看应用消费速度和生产者速度, 一般过大则可能出现消费跟不上, 需要引起应用注意
CONSUMER-ID:server 端给该分区分配的 consumer 编号
HOST: 消费者所在主机
CLIENT-ID: 消费者 id, 一般由应用指定
2 查询 topic 的 offset 的范围
用下面命令可以查询到 topic:demo broker:localhost:9092 的 offset 的最小值:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic demo --time -2
查询 offset 的最大值:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic demo --time -1
效果图:
3 重置消费者 offset
有些场景可能希望修改消费者消费到的 offset 位置, 以达到重新消费, 或者跳过一部分消息的目的, 这时候重置 offset 的工具就非常实用.
命令格式:
- bin/kafka-consumer-groups.sh --Bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --reset-offsets --execute --to-offset NEW_OFFSET --topic TOPIC_NAME
- bin/kafka-consumer-groups.sh --Bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --reset-offsets --execute --to-earliest/--to-latest --topic TOPIC_NAME
- Tips:
BROKER_HOST 是 kafka server 的 ip 地址, PORT 是 server 的监听端口. 多个 host port 之间用逗号隔开
第一条命令是将指定 GROUP_NAME 和 topic 的 offset 修改到 NEW_OFFSET 的位置, 重启消费者后, 消费中将从指定的 offset 处消费. 注意这里只能 NEW_OFFSET 只能设置一个值, 也就是说, 所有的分区都将使用这个值, 如果分区消息负载不均衡, 需要考虑是否适用.
第二条命令是将指定 GROUP_NAME 和 topic 的 offset 修改到 earliest 或者 latest 位置, 使得消费者从头或者从尾部消费.
示例:
- bin/kafka-consumer-groups.sh --Bootstrap-server localhost:9092 --group console-consumer-68682 --reset-offsets --execute --to-offset 3 --topic demo
- bin/kafka-consumer-groups.sh --Bootstrap-server localhost:9092 --group console-consumer-68682 --reset-offsets --execute --to-latest --topic demo
效果图:
可以通过直接更换消费者 group id 的方式, 配合消费者默认的消费策略, 可以达到类似的效果, 反而更加简单, 高效和安全.
4 查看 topic 的状态和分区负载详情
当 broker 出现宕机, 恢复之后, 我们可以看下 topic 的 leader 是否负载均衡. 因为 kafka 的所有读写消息的请求, 都是发送到 partition leader 上的, 因此在生产环境, 负载均衡显得尤其重要.
命令格式:
- bin/kafka-topics.sh --zookeeper ZOOKEEPER_HOST1:PORT1,ZOOKEEPER_HOST2:PORT2 --describe --topic TOPIC_NAME
- Tips:
ZOOKEEPER_HOST 是 kafka 所使用的 zookeeper 的 ip 地址, PORT 是 zookeeper 监听的端口. 多个 host port 之间用逗号隔开
类似的, zookeeper 集群不需要全部列上, 给出一个可用的 zk 地址和端口即可
示例:
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic demo
效果图:
如果发现以下现象说明 kafka 异常:
某个 topic 的每个分区, 同步副本数量和设定的副本数量不一致
某个 topic 的每个分区, leader 的 id 数值是 - 1 或者 none
5 消费消息
从头开始消费:
bin/kafka-console-consumer.sh --Bootstrap-server localhost:9092 --topic demo --from-beginning
效果图:
从尾部开始:
bin/kafka-console-consumer.sh --Bootstrap-server localhost:9092 --topic demo --offset latest --partition 0
效果图:
指定分区:
bin/kafka-console-consumer.sh --Bootstrap-server localhost:9092 --topic demo --offset latest --partition 0
取指定个数:
bin/kafka-console-consumer.sh --Bootstrap-server localhost:9092 --topic demo --offset latest --partition 0 --max-messages 2
如示例部分, 取 2 个消息, 取完自动结束回话.
自带压测工具
测试使用 Kafka 自带的测试脚本, 通过命令对 Kafka 发起写入 MQ 消息和 Kafka 消费 MQ 消息的请求. 模拟不同数量级的 MQ 消息写入和 MQ 消息消费场景, 根据 Kafka 的处理结果, 评估 Kafka 是否满足处理亿级以上的消息的能力.
命令格式:
bin/kafka-producer-perf-test.sh --topic demo --num-records 100 --record-size 1 --throughput 100 --producer-props Bootstrap.servers=localhost:9092
示例:
测试项 | 压测消息数(单位: W) | 测试命令 |
写入 MQ 消息 | 10 | bin/kafka-producer-perf-test.sh --topic demo --num-records 100000 --record-size 1000 --throughput 2000 --producer-props bootstrap.servers=localhost:9092 |
100 | bin/kafka-producer-perf-test.sh --topic demo --num-records 1000000 --record-size 2000 --throughput 5000 --producer-props bootstrap.servers=localhost:9092 | |
1000 | bin/kafka-producer-perf-test.sh --topic demo --num-records 10000000 --record-size 2000 --throughput 5000 --producer-props bootstrap.servers=localhost:9092 | |
消费 MQ 消息 | 10 | bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --topic demo --fetch-size 1048576 --messages 100000 --threads 1 |
100 | bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --topic demo --fetch-size 1048576 --messages 1000000 --threads 1 | |
1000 | bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --topic demo --fetch-size 1048576 --messages 10000000 --threads 1 |
本文中部分内容翻译或借鉴于以下学习资料, 特别鸣谢:
Using Apache Kafka Command-line Tools
kafka 的 offset 的重置
Kafka Broker | Command-line Options and Procedure https://data-flair.training/blogs/kafka-broker/
kafka 运维常用命令 https://www.jianshu.com/p/4f582c968af0
kafka 命令 https://www.jianshu.com/p/cf30c1492956
作 者: 请叫我头头哥
来源: https://www.cnblogs.com/toutou/p/kafka_command.html