Apache Kafka Java Client API
Kafka 集成了 Producer/Consumer 连接 Broker 的客户端工具,但是在消息处理方面,这两者主要用于服务端(Broker)的简单操作,如:
1. 创建 Topic
2. 罗列出已存在的 Topic
3. 对已有 Topic 的 Produce/Consume 测试
跟其他的消息系统一样,Kafka 提供了多种不用语言实现的客户端 API,如: Java,Python,Ruby,Go 等。这些 API 极大的方便用户使用 Kafka 集群,本文将展示这些 API 的使用
Maven pom.xml 如下:
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.randy</groupId>
- <artifactId>kafka_api_demo</artifactId>
- <version>1.0-SNAPSHOT</version>
- <name>Maven</name>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>0.11.0.0</version>
- </dependency>
- </dependencies>
- </project>
4.1 Producer 的源码
- package com.randy;
- import java.util.Properties;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.Producer;
- import org.apache.kafka.clients.producer.ProducerRecord;
- /**
- * Author : RandySun
- * Date : 2017-08-13 16:23
- * Comment :
- */
- public class ProducerDemo {
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "192.168.1.110:9092");
- properties.put("acks", "all");
- properties.put("retries", 0);
- properties.put("batch.size", 16384);
- properties.put("linger.ms", 1);
- properties.put("buffer.memory", 33554432);
- properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- Producer < String,
- String > producer = null;
- try {
- producer = new KafkaProducer < String,
- String > (properties);
- for (int i = 0; i < 100; i++) {
- String msg = "Message " + i;
- producer.send(new ProducerRecord < String, String > ("HelloWorld", msg));
- System.out.println("Sent:" + msg);
- }
- } catch(Exception e) {
- e.printStackTrace();
- } finally {
- producer.close();
- }
- }
- }
可以使用 KafkaProducer 类的实例来创建一个 Producer,KafkaProducer 类的参数是一系列属性值,下面分析一下所使用到的重要的属性:
- properties.put("bootstrap.servers", "192.168.1.110:9092");
bootstrap.servers 是 Kafka 集群的 IP 地址,如果 Broker 数量超过 1 个,则使用逗号分隔,如 "192.168.1.110:9092,192.168.1.110:9092"。其中,192.168.1.110 是我的其中一台虚拟机的
IP 地址,9092 是所监听的端口
- properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
序列化类型。 Kafka 消息是以键值对的形式发送到 Kafka 集群的,其中 Key 是可选的,Value 可以是任意类型。但是在 Message 被发送到 Kafka 集群之前,Producer 需要把不同类型的消
息序列化为二进制类型。本例是发送文本消息到 Kafka 集群,所以使用的是 StringSerializer。
- for (int i = 0; i < 100; i++) {
- String msg = "Message " + i;
- producer.send(new ProducerRecord < String, String > ("HelloWorld", msg));
- System.out.println("Sent:" + msg);
- }
上述代码会发送 100 个消息到 HelloWorld 这个 Topic
4.2 Consumer 的源码
- package com.randy;
- import java.util.Arrays;
- import java.util.Properties;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- /**
- * Author : RandySun
- * Date : 2017-08-13 17:06
- * Comment :
- */
- public class ConsumerDemo {
- public static void main(String[] args) {
- Properties properties = new Properties();
- properties.put("bootstrap.servers", "192.168.1.110:9092");
- properties.put("group.id", "group-1");
- properties.put("enable.auto.commit", "true");
- properties.put("auto.commit.interval.ms", "1000");
- properties.put("auto.offset.reset", "earliest");
- properties.put("session.timeout.ms", "30000");
- properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer < String,
- String > kafkaConsumer = new KafkaConsumer < >(properties);
- kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
- while (true) {
- ConsumerRecords < String,
- String > records = kafkaConsumer.poll(100);
- for (ConsumerRecord < String, String > record: records) {
- System.out.printf("offset = %d, value = %s", record.offset(), record.value());
- System.out.println();
- }
- }
- }
- }
可以使用 KafkaConsumer 类的实例来创建一个 Consumer,KafkaConsumer 类的参数是一系列属性值,下面分析一下所使用到的重要的属性:
和 Producer 一样,是指向 Kafka 集群的 IP 地址,以逗号分隔。
Consumer 分组 ID
发序列化。Consumer 把来自 Kafka 集群的二进制消息反序列化为指定的类型。因本例中的 Producer 使用的是 String 类型,所以调用 StringDeserializer 来反序列化
Consumer 订阅了 Topic 为 HelloWorld 的消息,Consumer 调用 poll 方法来轮循 Kafka 集群的消息,其中的参数 100 是超时时间(Consumer 等待直到 Kafka 集群中没有消息为止):
- kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));
- while (true) {
- ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
- for (ConsumerRecord<String, String> record : records) {
- System.out.printf("offset = %d, value = %s", record.offset(), record.value());
- System.out.println();
- }
- }
本文展示了如何创建一个 Producer 并生成 String 类型的消息,Consumer 消费这些消息。这些都是基于 Apache Kafka 0.11.0 Java API。
来源: http://www.cnblogs.com/qizhelongdeyang/p/7354183.html