[TOC]
Kafka 笔记整理(三): 消费形式验证与性能测试
Kafka 消费形式验证
前面的 Kafka 笔记整理 (一) 中有提到消费者的消费形式, 说明如下:
1 每个 consumer 属于一个 consumer group, 可以指定组 idgroup.id
2 消费形式:
组内: 组内的消费者消费同一份数据; 同时只能有一个 consumer 消费一个 Topic 中的 1 个 partition;
一个 consumer 可以消费多个 partitions 中的消息所以, 对于一个 topic, 同一个 group 中推荐不能有多于
partitions 个数的 consumer 同时消费, 否则将意味着某些 consumer 将无法得到消息
组间: 每个消费组消费相同的数据, 互不影响
3 在一个 consumer 多个线程的情况下, 一个线程相当于一个消费者
例如: partition 为 3, 一个 consumer 起了 3 个线程消费, 另一个后来的 consumer 就无法消费
下面就来验证 Kafka 的消费形式, 不过需要说明的是, 在消费者的程序代码中, 可以指定消费者的 group.id(我们下面将会在配置文件中指定)
而在使用 kafka 的 shell 命令时, 其实也是可以指定配置文件来指定消费者的 group.id 的, 如果不指定, 那么 kafka 将会随机生成一个 group.id(kafka-console-consumer.sh 中的 kafka.tools.ConsoleConsumer 类, 如果没有指定 group.id, 其策略是随机生成)
在后面的程序代码中, 会使用同一 group.id 开启 4 个消费的线程(因为我们创建的 topic 有 3 个 partition), 然后在终端中通过 kafka shell 来开启另外一个消费者, 进而达到验证 kafka 消费形式的目的
另外, 在测试中使用的 topic 如下:
- $ kafka-topics.sh --describe hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
- Topic:hadoop PartitionCount:3 ReplicationFactor:3 Configs:
- Topic: hadoop Partition: 0 Leader: 103 Replicas: 103,101,102 Isr: 103,101,102
- Topic: hadoop Partition: 1 Leader: 101 Replicas: 101,102,103 Isr: 101,102,103
- Topic: hadoop Partition: 2 Leader: 102 Replicas: 102,103,101 Isr: 102,103,101
即 partition 为 3, 副本因为也为 3.
程序代码
- KafkaProducerOps.java
- package com.uplooking.bigdata.kafka.producer;
- import com.uplooking.bigdata.kafka.constants.Constants;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import java.io.IOException;
- import java.io.InputStream;
- import java.util.Properties;
- import java.util.Random;
- /**
- * 通过这个 KafkaProducerOps 向 Kafka topic 中生产相关的数据
- * <p>
- * Producer
- */
- public class KafkaProducerOps {
- public static void main(String[] args) throws IOException {
- /**
- * 专门加载配置文件
- * 配置文件的格式:
- * key=value
- *
- * 在代码中要尽量减少硬编码
- * 不要将代码写死, 要可配置化
- */
- Properties properties = new Properties();
- InputStream in = KafkaProducerOps.class.getClassLoader().getResourceAsStream("producer.properties");
- properties.load(in);
- /**
- * 两个泛型参数
- * 第一个泛型参数: 指的就是 kafka 中一条记录 key 的类型
- * 第二个泛型参数: 指的就是 kafka 中一条记录 value 的类型
- */
- String[] girls = new String[]{"姚慧莹", "刘向前", "周 新", "杨柳"};
- Producer<String, String> producer = new KafkaProducer<String, String>(properties);
- Random random = new Random();
- int start = 1;
- for (int i = start; i <= start + 20; i++) {
- String topic = properties.getProperty(Constants.KAFKA_PRODUCER_TOPIC);
- String key = i + "";
- String value = "今天的 <--" + girls[random.nextInt(girls.length)] + "--> 很美很美哦~";
- ProducerRecord<String, String> producerRecord =
- new ProducerRecord<String, String>(topic, key, value);
- producer.send(producerRecord);
- }
- producer.close();
- }
- }
- KafkaConsumerOps.java
- package com.uplooking.bigdata.kafka.consumer;
- import org.apache.kafka.clients.consumer.Consumer;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import java.io.IOException;
- import java.util.Arrays;
- import java.util.Collection;
- import java.util.Properties;
- import java.util.concurrent.*;
- /**
- * 从 kafka topic 中消费数据
- */
- public class KafkaConsumerOps {
- public static void main(String[] args) throws IOException {
- // 线程池
- ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
- System.out.println("外部开始时间:" + System.currentTimeMillis());
- for (int i =0; i <4; i++){
- ScheduledFuture<?> schedule = service.schedule(
- new ConsumerThread(),
- 5L,
- TimeUnit.SECONDS);
- }
- }
- }
- class ConsumerThread implements Runnable {
- public void run() {
- System.out.println("线程 ID:" + Thread.currentThread().getId() + "线程开始时间:" + System.currentTimeMillis());
- /**
- * 两个泛型参数
- * 第一个泛型参数: 指的就是 kafka 中一条记录 key 的类型
- * 第二个泛型参数: 指的就是 kafka 中一条记录 value 的类型
- */
- Properties properties = new Properties();
- try {
- properties.load(KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties"));
- } catch (IOException e) {
- e.printStackTrace();
- }
- Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
- Collection<String> topics = Arrays.asList("hadoop");
- // 消费者订阅 topic
- consumer.subscribe(topics);
- ConsumerRecords<String, String> consumerRecords = null;
- while (true) {
- // 接下来就要从 topic 中拉取数据
- consumerRecords = consumer.poll(1000);
- // 遍历每一条记录
- for (ConsumerRecord consumerRecord : consumerRecords) {
- long offset = consumerRecord.offset();
- Object key = consumerRecord.key();
- Object value = consumerRecord.value();
- int partition = consumerRecord.partition();
- System.out.println("CurrentThreadID:" + Thread.currentThread().getId() + "\toffset:" + offset + "\tpartition:" + partition + "\tkey:" + key + "\tvalue:" + value);
- }
- }
- }
- }
- MyKafkaPartitioner.java
- package com.uplooking.bigdata.kafka.partitioner;
- import org.apache.kafka.clients.producer.Partitioner;
- import org.apache.kafka.common.Cluster;
- import java.util.Map;
- import java.util.Random;
- /**
- * 创建自定义的分区, 根据数据的 key 来进行划分
- * <p>
- * 可以根据 key 或者 value 的 hashCode
- * 还可以根据自己业务上的定义将数据分散在不同的分区中
- * 需求:
- * 根据用户输入的 key 的 hashCode 值和 partition 个数求模
- */
- public class MyKafkaPartitioner implements Partitioner {
- public void configure(Map<String, ?> configs) {
- }
- /**
- * 根据给定的数据设置相关的分区
- *
- * @param topic 主题名称
- * @param key key
- * @param keyBytes 序列化之后的 key
- * @param value value
- * @param valueBytes 序列化之后的 value
- * @param cluster 当前集群的元数据信息
- */
- public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
- Integer partitionNums = cluster.partitionCountForTopic(topic);
- int targetPartition = -1;
- if (key == null || keyBytes == null) {
- targetPartition = new Random().nextInt(10000) % partitionNums;
- } else {
- int hashCode = key.hashCode();
- targetPartition = hashCode % partitionNums;
- System.out.println("key:" + key + ", value:" + value + ", hashCode:" + hashCode + ", partition:" + targetPartition);
- }
- return targetPartition;
- }
- public void close() {
- }
- }
- Constants.java
- package com.uplooking.bigdata.kafka.constants;
- public interface Constants {
- /**
- * 生产的 key 对应的常量
- */
- String KAFKA_PRODUCER_TOPIC = "producer.topic";
- }
- producer.properties
- ############################# Producer Basics #############################
- # list of brokers used for bootstrapping knowledge about the rest of the cluster
- # format: host1:port1,host2:port2 ...
- bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092
- # specify the compression codec for all data generated: none, gzip, snappy, lz4
- compression.type=none
- # name of the partitioner class for partitioning events; default partition spreads data randomly
- partitioner.class=com.uplooking.bigdata.kafka.partitioner.MyKafkaPartitioner
- # the maximum amount of time the client will wait for the response of a request
- #request.timeout.ms=
- # how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
- #max.block.ms=
- # the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
- #linger.ms=
- # the maximum size of a request in bytes
- #max.request.size=
- # the default batch size in bytes when batching multiple records sent to a partition
- #batch.size=
- # the total bytes of memory the producer can use to buffer records waiting to be sent to the server
- #buffer.memory=
- ##### 设置自定义的 topic
- producer.topic=hadoop
- key.serializer=org.apache.kafka.common.serialization.StringSerializer
- value.serializer=org.apache.kafka.common.serialization.StringSerializer
- consumer.properties
- # Zookeeper connection string
- # comma separated host:port pairs, each corresponding to a zk
- # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
- zookeeper.connect= uplooking01:2181,uplooking02:2181,uplooking03:2181
- bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092
- # timeout in ms for connecting to zookeeper
- zookeeper.connection.timeout.ms=6000
- #consumer group id
- group.id=test-consumer-group
- #consumer timeout
- #consumer.timeout.ms=5000
- key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
- value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
- pom.xml
主要是 kafka-clients 的依赖:
- <dependencies>
- <!--kafka-->
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>0.10.0.1</version>
- </dependency>
- </dependencies>
测试
先在终端启动一个消费者, 注意由于没有指定配置文件, 所以其 group.id 是随机生成的:
$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
接下来分别执行消费者的代码和生产者的代码, 然后观察各个终端的输出
生产者程序的终端输出如下:
key: 1, value: 今天的 <-- 刘向前 --> 很美很美哦~, hashCode: 49, partition: 1
key: 2, value: 今天的 <-- 刘向前 --> 很美很美哦~, hashCode: 50, partition: 2
key: 3, value: 今天的 <-- 刘向前 --> 很美很美哦~, hashCode: 51, partition: 0
key: 4, value: 今天的 <-- 杨柳 --> 很美很美哦~, hashCode: 52, partition: 1
key: 5, value: 今天的 <-- 姚慧莹 --> 很美很美哦~, hashCode: 53, partition: 2
key: 6, value: 今天的 <-- 姚慧莹 --> 很美很美哦~, hashCode: 54, partition: 0
key: 7, value: 今天的 <-- 杨柳 --> 很美很美哦~, hashCode: 55, partition: 1
key: 8, value: 今天的 <-- 刘向前 --> 很美很美哦~, hashCode: 56, partition: 2
key: 9, value: 今天的 <-- 姚慧莹 --> 很美很美哦~, hashCode: 57, partition: 0
key: 10, value: 今天的 <-- 杨柳 --> 很美很美哦~, hashCode: 1567, partition: 1
key: 11, value: 今天的 <-- 姚慧莹 --> 很美很美哦~, hashCode: 1568, partition: 2
key: 12, value: 今天的 <-- 周 新 --> 很美很美哦~, hashCode: 1569, partition: 0
key: 13, value: 今天的 <-- 姚慧莹 --> 很美很美哦~, hashCode: 1570, partition: 1
key: 14, value: 今天的 <-- 姚慧莹 --> 很美很美哦~, hashCode: 1571, partition: 2
key: 15, value: 今天的 <-- 刘向前 --> 很美很美哦~, hashCode: 1572, partition: 0
key: 16, value: 今天的 <-- 刘向前 --> 很美很美哦~, hashCode: 1573, partition: 1
key: 17, value: 今天的 <-- 杨柳 --> 很美很美哦~, hashCode: 1574, partition: 2
key: 18, value: 今天的 <-- 刘向前 --> 很美很美哦~, hashCode: 1575, partition: 0
key: 19, value: 今天的 <-- 杨柳 --> 很美很美哦~, hashCode: 1576, partition: 1
key: 20, value: 今天的 <-- 姚慧莹 --> 很美很美哦~, hashCode: 1598, partition: 2
key: 21, value: 今天的 <-- 杨柳 --> 很美很美哦~, hashCode: 1599, partition: 0
消费者程序的终端输出如下:
外部开始时间: 1521991118178
线程 ID:20 线程开始时间: 1521991123182
线程 ID:21 线程开始时间: 1521991123182
线程 ID:23 线程开始时间: 1521991123182
线程 ID:22 线程开始时间: 1521991123182
CurrentThreadID: 22 offset: 78 partition: 1 key: 1 value: 今天的 <-- 刘向前 --> 很美很美哦~
CurrentThreadID: 22 offset: 79 partition: 1 key: 4 value: 今天的 <-- 杨柳 --> 很美很美哦~
CurrentThreadID: 22 offset: 80 partition: 1 key: 7 value: 今天的 <-- 杨柳 --> 很美很美哦~
CurrentThreadID: 22 offset: 81 partition: 1 key: 10 value: 今天的 <-- 杨柳 --> 很美很美哦~
CurrentThreadID: 22 offset: 82 partition: 1 key: 13 value: 今天的 <-- 姚慧莹 --> 很美很美哦~
CurrentThreadID: 23 offset: 81 partition: 0 key: 3 value: 今天的 <-- 刘向前 --> 很美很美哦~
CurrentThreadID: 23 offset: 82 partition: 0 key: 6 value: 今天的 <-- 姚慧莹 --> 很美很美哦~
CurrentThreadID: 23 offset: 83 partition: 0 key: 9 value: 今天的 <-- 姚慧莹 --> 很美很美哦~
CurrentThreadID: 23 offset: 84 partition: 0 key: 12 value: 今天的 <-- 周 新 --> 很美很美哦~
CurrentThreadID: 23 offset: 85 partition: 0 key: 15 value: 今天的 <-- 刘向前 --> 很美很美哦~
CurrentThreadID: 23 offset: 86 partition: 0 key: 18 value: 今天的 <-- 刘向前 --> 很美很美哦~
CurrentThreadID: 22 offset: 83 partition: 1 key: 16 value: 今天的 <-- 刘向前 --> 很美很美哦~
CurrentThreadID: 23 offset: 87 partition: 0 key: 21 value: 今天的 <-- 杨柳 --> 很美很美哦~
CurrentThreadID: 21 offset: 78 partition: 2 key: 2 value: 今天的 <-- 刘向前 --> 很美很美哦~
CurrentThreadID: 22 offset: 84 partition: 1 key: 19 value: 今天的 <-- 杨柳 --> 很美很美哦~
CurrentThreadID: 21 offset: 79 partition: 2 key: 5 value: 今天的 <-- 姚慧莹 --> 很美很美哦~
CurrentThreadID: 21 offset: 80 partition: 2 key: 8 value: 今天的 <-- 刘向前 --> 很美很美哦~
CurrentThreadID: 21 offset: 81 partition: 2 key: 11 value: 今天的 <-- 姚慧莹 --> 很美很美哦~
CurrentThreadID: 21 offset: 82 partition: 2 key: 14 value: 今天的 <-- 姚慧莹 --> 很美很美哦~
CurrentThreadID: 21 offset: 83 partition: 2 key: 17 value: 今天的 <-- 杨柳 --> 很美很美哦~
CurrentThreadID: 21 offset: 84 partition: 2 key: 20 value: 今天的 <-- 姚慧莹 --> 很美很美哦~
消费者 shell 的终端输出如下:
$ kafka-console-consumer.sh --topic hadoop --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
今天的 <-- 刘向前 --> 很美很美哦~
今天的 <-- 姚慧莹 --> 很美很美哦~
今天的 <-- 刘向前 --> 很美很美哦~
今天的 <-- 姚慧莹 --> 很美很美哦~
今天的 <-- 姚慧莹 --> 很美很美哦~
今天的 <-- 杨柳 --> 很美很美哦~
今天的 <-- 姚慧莹 --> 很美很美哦~
今天的 <-- 刘向前 --> 很美很美哦~
今天的 <-- 姚慧莹 --> 很美很美哦~
今天的 <-- 姚慧莹 --> 很美很美哦~
今天的 <-- 周 新 --> 很美很美哦~
今天的 <-- 刘向前 --> 很美很美哦~
今天的 <-- 刘向前 --> 很美很美哦~
今天的 <-- 杨柳 --> 很美很美哦~
今天的 <-- 刘向前 --> 很美很美哦~
今天的 <-- 杨柳 --> 很美很美哦~
今天的 <-- 杨柳 --> 很美很美哦~
今天的 <-- 杨柳 --> 很美很美哦~
今天的 <-- 姚慧莹 --> 很美很美哦~
今天的 <-- 刘向前 --> 很美很美哦~
今天的 <-- 杨柳 --> 很美很美哦~
分析
因为使用 kafka shell 的消费者的 group.id 是随机生成的, 所以其肯定可以消费到 topic 下 partition 的消息, 这是属于组间的消费
而由于在消费者的程序代码中, 4 个线程都是使用同一个 group.id 的(都是使用 consumer.properties 这个配置文件), 按照理论知识的理解, 因为 topic hadoop 只有 3 个 partition, 所以只能有 3 个线程即 3 个 consumer 进行消息的消费, 而观察输出, 通过线程 ID, 发现确实只有三个线程消费了 topic 中的消息, 这也验证了 kafka 组内消息的消费形式
Kafka 性能测试
参考文档: https://cwiki.apache.org/confluence/display/KAFKA/Performance+testing
生产能力测试
在 kafka 的安装目录的 bin 里有性能的评估工具
bin/kafka-producer-perf-test.sh
, 主要输出 4 项指标, 总共发送消息量(以 MB 为单位), 每秒发送消息量(MB/second), 发送消息总数, 每秒发送消息数(records/second)
测试如下:
- [uplooking@uplooking01 ~]$ kafka-producer-perf-test.sh --topic flume-kafka --num-records 1000000 --producer-props bootstrap.servers=uplooking01:9092,uplooking02:9092,uplooking03:9092 --throughput 10000 --record-size 100
- 49972 records sent, 9994.4 records/sec (0.95 MB/sec), 3.1 ms avg latency, 258.0 max latency.
- 50200 records sent, 10040.0 records/sec (0.96 MB/sec), 2.4 ms avg latency, 141.0 max latency.
- 50020 records sent, 10004.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 19.0 max latency.
- 50010 records sent, 10000.0 records/sec (0.95 MB/sec), 2.3 ms avg latency, 127.0 max latency.
- 50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.3 ms avg latency, 24.0 max latency.
- 50020 records sent, 10004.0 records/sec (0.95 MB/sec), 2.4 ms avg latency, 186.0 max latency.
- 50010 records sent, 10002.0 records/sec (0.95 MB/sec), 15.1 ms avg latency, 466.0 max latency.
- 50020 records sent, 10002.0 records/sec (0.95 MB/sec), 11.1 ms avg latency, 405.0 max latency.
- 50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 19.0 max latency.
- 50030 records sent, 10004.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 20.0 max latency.
- 50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 30.0 max latency.
- 50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.3 ms avg latency, 19.0 max latency.
- 49990 records sent, 9998.0 records/sec (0.95 MB/sec), 1.4 ms avg latency, 49.0 max latency.
- 50033 records sent, 10006.6 records/sec (0.95 MB/sec), 37.9 ms avg latency, 617.0 max latency.
- 50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.5 ms avg latency, 74.0 max latency.
- 50007 records sent, 10001.4 records/sec (0.95 MB/sec), 1.3 ms avg latency, 19.0 max latency.
- 50000 records sent, 10000.0 records/sec (0.95 MB/sec), 1.8 ms avg latency, 132.0 max latency.
- 50010 records sent, 10002.0 records/sec (0.95 MB/sec), 1.2 ms avg latency, 15.0 max latency.
- 50020 records sent, 10000.0 records/sec (0.95 MB/sec), 1.9 ms avg latency, 121.0 max latency.
- 1000000 records sent, 9999.200064 records/sec (0.95 MB/sec), 4.96 ms avg latency, 617.00 ms max latency, 1 ms 50th, 3 ms 95th, 105 ms 99th, 541 ms 99.9th.
参数说明如下:
--num-records 1000000 总共生产的消息数量
--throughput 10000 每秒需要生产的消息数量
--record-size 100 每条消息的大小, 单位为字节
消费能力测试
- [uplooking@uplooking01 ~]$ kafka-consumer-perf-test.sh --topic flume-kafka --messages 1000000 --broker-list uplooking01:9092,uplooking02:9092,uplooking03:9092 --threads 3 --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
- start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
- 2018-03-26 05:17:21:185, 2018-03-26 05:17:22:458, 97.3055, 76.4380, 1020325, 801512.1760
上面的测试为需要消费一百万条消息, 输出的参数说明如下:
开始时间 结束时间 消费消息总大小 每秒消费大小 消费消息总条数 每秒消费条数
- start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
- 2018-03-26 05:17:21:185, 2018-03-26 05:17:22:458, 97.3055, 76.4380, 1020325, 801512.1760
Kafka 笔记整理(三): 消费形式验证与性能测试
来源: http://www.bubuko.com/infodetail-2538803.html