```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
```
```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DemoConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.0.139:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(0);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
```
Program explanation
```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.0.139:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");    // change 1
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        final int minBatchSize = 200;                // change 2
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + ", " + record.value());
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                consumer.commitSync();               // change 3
                buffer.clear();
            }
        }
    }
}
```
Program explanation
```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ResetOffsetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.0.139:9092");
        props.put("group.id", "test2");                  // change 1
        props.put("auto.offset.reset", "earliest");      // change 2
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo", "bar"));
        while (true) {                                   // change 3
            // fetch everything from the last committed offset up to the latest data
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            // iterate over the partitions the records came from
            for (TopicPartition partition : records.partitions()) {
                // the records fetched from this partition
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    // print the offset and value of each record from this partition
                    System.out.println(record.offset() + ", " + record.value());
                }
                // offset of the most recent record this consumer group got from the partition
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // commit the latest offset
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }
}
```
Program explanation
```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PartitionAssignDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.0.139:9092");
        props.put("group.id", "test3");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "foo";
        TopicPartition partition0 = new TopicPartition(topic, 0);
        //TopicPartition partition1 = new TopicPartition(topic, 1);
        consumer.assign(Arrays.asList(partition0));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(2000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, partition = %d, value = %s%n",
                        record.offset(), record.partition(), record.value());
            }
        }
    }
}
```
The topic "foo" here has only one partition, with id 0, so if a nonexistent partition is assigned to the consumer, poll() blocks.
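One way to guard against this is to check the partition metadata the broker reports (via `consumer.partitionsFor(topic)`) before calling `assign()`. A minimal sketch of the validation step; since this is only an illustration, the broker call is replaced by a hard-coded partition count:

```java
public class PartitionCheck {
    // In a real consumer the count would come from
    // consumer.partitionsFor(topic).size(); hard-coded here for illustration.
    static boolean partitionExists(int requestedPartition, int partitionCount) {
        return requestedPartition >= 0 && requestedPartition < partitionCount;
    }

    public static void main(String[] args) {
        int partitionCount = 1; // "foo" has a single partition, id 0
        System.out.println(partitionExists(0, partitionCount)); // prints true
        System.out.println(partitionExists(1, partitionCount)); // prints false
    }
}
```

Only when the check passes would the consumer go on to `assign()` the requested `TopicPartition`.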
Program explanation
Create a new topic "muti_part" with 2 partitions, then send a few messages to "muti_part", making sure both partitions receive messages:
```shell
# create the topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic muti_part
# send the messages
kafka-console-producer.sh --broker-list localhost:9092 --topic muti_part
>abc
>def
>ghi
>ijk
>jkl
>lmn
>123
>
```
Read the messages:
```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PartitionAssignDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.0.139:9092");
        props.put("group.id", "test");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String topic = "muti_part";
        TopicPartition partition0 = new TopicPartition(topic, 0);
        //TopicPartition partition1 = new TopicPartition(topic, 1);
        consumer.assign(Arrays.asList(partition0));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, partition = %d, value = %s%n",
                        record.offset(), record.partition(), record.value());
            }
        }
    }
}
```
The output is as follows:
```
15:46:17,064 [ main ] [ INFO ]:109 - Kafka version : 1.0.0
15:46:17,064 [ main ] [ INFO ]:110 - Kafka commitId : aaa7af6d4a11b29d
15:46:17,172 [ main ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=test] Discovered coordinator 10.20.0.139:9092 (id: 2147483647 rack: null)
offset = 0, partition = 0, value = def
offset = 1, partition = 0, value = ijk
offset = 2, partition = 0, value = lmn
```
Conclusion:
Kafka spreads the messages evenly across the partitions, and this consumer only receives messages from its assigned partition.
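The console producer sends these records without a key, and in this client version the producer's default partitioner spreads keyless records across the available partitions in round-robin fashion (keyed records are instead hashed to a partition). An illustrative sketch of the round-robin step, not Kafka's internal code:

```java
public class RoundRobinSketch {
    private int counter = 0;

    // Pick the next partition in rotation, as the default partitioner
    // does for records without a key (simplified: ignores broker availability).
    int nextPartition(int numPartitions) {
        return counter++ % numPartitions;
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch();
        for (int i = 0; i < 4; i++) {
            System.out.print(rr.nextPartition(2)); // prints 0101
        }
    }
}
```

This matches the output above: alternating records (def, ijk, lmn) landed on partition 0 while the others went to partition 1.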
Case 1: the topic has only one partition, using topic "foo" as an example
```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MultiConsumerSinglePartition {

    static class MyConsumer implements Runnable {
        KafkaConsumer<String, String> consumer;
        int buffer;
        String name;

        MyConsumer(KafkaConsumer<String, String> consumer, String name) {
            this.consumer = consumer;
            this.buffer = 0;
            this.name = name;
        }

        @Override
        public void run() {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("name = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                            name, record.partition(), record.offset(), record.key(), record.value());
                    buffer++;
                }
                if (buffer >= 5) {
                    consumer.commitSync();
                    buffer = 0;
                    System.out.println(name + " commit");
                }
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.20.0.139:9092");
        props.put("group.id", "mcsp");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo"));
        new Thread(new MyConsumer(consumer, "consumer1")).start();
    }
}
```
Program explanation
The output is as follows:
```
16:30:51,033 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Discovered coordinator 10.20.0.139:9092 (id: 2147483647 rack: null)
16:30:51,036 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Revoking previously assigned partitions []
16:30:51,036 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp3] (Re-)joining group
16:30:51,049 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Successfully joined group with generation 1
16:30:51,050 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Setting newly assigned partitions [foo-0]
name = consumer1, partition = 0, offset = 0, key = null, value = hello
name = consumer1, partition = 0, offset = 1, key = null, value = baby
name = consumer1, partition = 0, offset = 2, key = null, value = so
...
name = consumer1, partition = 0, offset = 16, key = null, value = 8
consumer1 commit
```
As can be seen, a group assignment took place and partition 0 was assigned to consumer1. After consumer1 read all the messages, it updated the offset. Now open another terminal, change the name to consumer2, and start a second process. The output becomes:
consumer1's output:

```
16:31:18,052 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Revoking previously assigned partitions [foo-0]
16:31:18,053 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp3] (Re-)joining group
16:31:18,068 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Successfully joined group with generation 2
16:31:18,069 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Setting newly assigned partitions [foo-0]
```

consumer2's output:

```
16:31:15,567 [ main ] [ INFO ]:109 - Kafka version : 1.0.0
16:31:15,567 [ main ] [ INFO ]:110 - Kafka commitId : aaa7af6d4a11b29d
16:31:15,669 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Discovered coordinator 10.20.0.139:9092 (id: 2147483647 rack: null)
16:31:15,672 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Revoking previously assigned partitions []
16:31:15,672 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp3] (Re-)joining group
16:31:18,066 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Successfully joined group with generation 2
16:31:18,069 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Setting newly assigned partitions []
```
As can be seen, consumer2's joining caused the group's partition assignment to be rebalanced. Now send 2 messages to foo: only consumer1 produces output, consumer2 does not, i.e. partition 0 is still assigned to consumer1. Since only 2 messages arrived, consumer1 did not commit the offset. Now kill the consumer1 process, and consumer2 outputs:
```
16:32:27,074 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Revoking previously assigned partitions []
16:32:27,074 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp3] (Re-)joining group
16:32:27,081 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Successfully joined group with generation 3
16:32:27,082 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp3] Setting newly assigned partitions [foo-0]
name = consumer2, partition = 0, offset = 17, key = null, value = a
name = consumer2, partition = 0, offset = 18, key = null, value = b
```
Sure enough, the group's partitions were reassigned, and consumer2 printed the 2 messages that were just sent. This exposes a problem: because consumer1 terminated abnormally without committing the latest offset, those 2 messages were consumed twice. A fix is described in Extension 6: register a ConsumerRebalanceListener when subscribing to the topic.
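This redelivery is Kafka's normal at-least-once behavior under manual commits. When reprocessing is not acceptable, the consuming side can also deduplicate by remembering the highest offset it has already processed per partition. A minimal, broker-free sketch (the class and key format are illustrative, not part of the Kafka API):

```java
import java.util.HashMap;
import java.util.Map;

public class DedupTracker {
    // highest offset already processed, keyed by a "topic-partition" string
    private final Map<String, Long> processed = new HashMap<>();

    // Returns true and records the offset if the record is new;
    // returns false if this offset was already processed (a redelivery).
    boolean markIfNew(String topicPartition, long offset) {
        Long last = processed.get(topicPartition);
        if (last != null && offset <= last) {
            return false;
        }
        processed.put(topicPartition, offset);
        return true;
    }

    public static void main(String[] args) {
        DedupTracker tracker = new DedupTracker();
        System.out.println(tracker.markIfNew("foo-0", 17)); // prints true
        System.out.println(tracker.markIfNew("foo-0", 17)); // prints false: redelivered
        System.out.println(tracker.markIfNew("foo-0", 18)); // prints true
    }
}
```

For the deduplication to survive a crash, the map itself would have to be persisted, which leads to the offset-storage approach discussed later in this article.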
Case 2: the topic has multiple partitions, using topic "muti_part" as an example
The code and steps are the same as above; just change the topic name to muti_part.
When the first consumer has finished consuming the messages and a new consumer joins, the group rebalances. Sending more messages to the topic now produces:
consumer1:

```
17:12:32,879 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp] Revoking previously assigned partitions [muti_part-0, muti_part-1]
17:12:32,879 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp] (Re-)joining group
17:12:32,890 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp] Successfully joined group with generation 4
17:12:32,892 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp] Setting newly assigned partitions [muti_part-0]
name = consumer1, partition = 0, offset = 3, key = null, value = b
name = consumer1, partition = 0, offset = 4, key = null, value = d
name = consumer1, partition = 0, offset = 5, key = null, value = f
name = consumer1, partition = 0, offset = 6, key = null, value = h
```

consumer2:

```
name = consumer2, partition = 1, offset = 4, key = null, value = a
name = consumer2, partition = 1, offset = 5, key = null, value = c
name = consumer2, partition = 1, offset = 6, key = null, value = e
name = consumer2, partition = 1, offset = 7, key = null, value = g
```
As can be seen, each consumer is responsible for one partition.
```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class MultiConsumerSinglePartition {

    static class MyConsumer implements Runnable {
        KafkaConsumer<String, String> consumer;
        int buffer;
        String name;

        MyConsumer(KafkaConsumer<String, String> consumer, String name) {
            this.consumer = consumer;
            this.buffer = 0;
            this.name = name;
        }

        @Override
        public void run() {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("name = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                            name, record.partition(), record.offset(), record.key(), record.value());
                    buffer++;
                }
                if (buffer >= 5) {
                    consumer.commitSync();
                    buffer = 0;
                    System.out.println(name + " commit");
                }
            }
        }
    }

    static class MyListener implements ConsumerRebalanceListener {
        KafkaConsumer<String, String> consumer;
        String name;

        MyListener(KafkaConsumer<String, String> consumer, String name) {
            this.consumer = consumer;
            this.name = name;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            Map<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
            for (TopicPartition partition : partitions) {
                System.out.println("revoke " + name + " from partition " + partition.partition());
                System.out.println("commit partition " + partition.partition() + " offset " + consumer.position(partition));
                map.put(partition, new OffsetAndMetadata(consumer.position(partition)));
            }
            consumer.commitSync(map);
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            for (TopicPartition partition : partitions) {
                System.out.println("assign partition " + partition.partition() + " to " + name);
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.100.49.129:9092");
        props.put("group.id", "mcsp2");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        String name = "consumer2";
        consumer.subscribe(Arrays.asList("muti_part"), new MyListener(consumer, name));
        new Thread(new MyConsumer(consumer, name)).start();
    }
}
```
Program explanation
The custom MyListener class implements the ConsumerRebalanceListener interface. Its onPartitionsRevoked method defines what should happen when partitions are taken away from a consumer; here it commits the latest offset of each revoked partition, so that messages are not consumed again after the rebalance.
Start one consumer first, then start a second one. The first consumer's output:
```
22:26:49,555 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Discovered coordinator 47.100.49.129:9092 (id: 2147483647 rack: null)
22:26:49,565 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Revoking previously assigned partitions []
22:26:49,565 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp2] (Re-)joining group
assign partition 0 to consumer1
assign partition 1 to consumer1
22:26:49,645 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Successfully joined group with generation 3
22:26:49,645 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Setting newly assigned partitions [muti_part-0, muti_part-1]
name = consumer1, partition = 0, offset = 0, key = null, value = 2
name = consumer1, partition = 1, offset = 0, key = null, value = 1
name = consumer1, partition = 1, offset = 1, key = null, value = 3
revoke consumer1 from partition 0
commit partition 0 offset 1
22:27:49,735 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Revoking previously assigned partitions [muti_part-0, muti_part-1]
revoke consumer1 from partition 1
commit partition 1 offset 2
22:27:49,765 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp2] (Re-)joining group
22:27:49,795 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Successfully joined group with generation 4
assign partition 0 to consumer1
22:27:49,795 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Setting newly assigned partitions [muti_part-0]
```
As can be seen, before the second consumer started, both partitions were assigned to consumer1. consumer1 read 3 messages without committing.
consumer2's output:
```
22:27:48,488 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Discovered coordinator 47.100.49.129:9092 (id: 2147483647 rack: null)
22:27:48,488 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Revoking previously assigned partitions []
22:27:48,488 [ Thread-1 ] [ INFO ]:336 - [Consumer clientId=consumer-1, groupId=mcsp2] (Re-)joining group
assign partition 1 to consumer2
22:27:49,795 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Successfully joined group with generation 4
22:27:49,795 [ Thread-1 ] [ INFO ]:341 - [Consumer clientId=consumer-1, groupId=mcsp2] Setting newly assigned partitions [muti_part-1]
```
As can be seen, when consumer2 started, Kafka revoked all partitions and reassigned them. The revocation triggered consumer1's listener, which committed the latest offsets, so consumer2 does not re-read any data.
Note: this approach only helps when a new consumer joins the group. When a consumer disconnects abnormally, the offset is still not committed. To guarantee that no messages are consumed twice after a consumer crash, assign partitions explicitly instead of subscribing, and store the offsets yourself; the principle is to keep Kafka from rebalancing at all.
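Storing offsets outside Kafka can be as simple as a small file-backed map: after each processed batch the consumer writes the next offset to read per partition, and on startup it calls seek() to resume from there. A minimal, broker-free sketch of the store itself; the class name and file format are illustrative, not a Kafka API:

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Properties;

public class FileOffsetStore {
    private final String path;

    FileOffsetStore(String path) {
        this.path = path;
    }

    // Persist the next offset to consume for a topic-partition.
    void save(String topicPartition, long offset) {
        Properties props = loadAll();
        props.setProperty(topicPartition, Long.toString(offset));
        try (FileOutputStream out = new FileOutputStream(path)) {
            props.store(out, "committed offsets");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Return the stored offset, or 0 if none was saved yet.
    long load(String topicPartition) {
        return Long.parseLong(loadAll().getProperty(topicPartition, "0"));
    }

    private Properties loadAll() {
        Properties props = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            props.load(in);
        } catch (IOException ignored) {
            // first run: no offsets file yet
        }
        return props;
    }
}
```

On startup the consumer would call `consumer.assign(...)` and then `consumer.seek(partition, store.load("muti_part-0"))` to resume exactly where processing left off, regardless of how the previous process ended.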
API overview
```java
public void seek(TopicPartition partition, long offset);
public void seekToBeginning(Collection<TopicPartition> partitions);
public void seekToEnd(Collection<TopicPartition> partitions);
```
Demo program
```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ManualSeekDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "47.100.49.129:9092");
        props.put("group.id", "mcsp5");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("foo"));
        int buffer = 0;
        long lastOffset = 0;
        int part = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
                buffer++;
                lastOffset = record.offset();
                part = record.partition();
            }
            if (buffer >= 3) {
                buffer = 0;
                System.out.println("seek to " + (lastOffset - 1));
                consumer.seek(new TopicPartition("foo", part), lastOffset - 1);
            }
        }
    }
}
```
Program explanation
Source: http://www.jianshu.com/p/1a5d67beb914