前言
在 上一篇 中讲述如何搭建 kafka 集群, 本篇则讲述如何简单的使用 kafka . 不过在使用 kafka 的时候, 还是应该简单的了解下 kafka.
Kafka 的介绍
Kafka 是一种高吞吐量的分布式发布订阅消息系统, 它可以处理消费者规模的网站中的所有动作流数据.
Kafka 有如下特性:
以时间复杂度为 O(1) 的方式提供消息持久化能力, 即使对 TB 级以上数据也能保证常数时间复杂度的访问性能.
高吞吐率. 即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条以上消息的传输.
支持 Kafka Server 间的消息分区, 及分布式消费, 同时保证每个 Partition 内的消息顺序传输.
同时支持离线数据处理和实时数据处理.
Scale out: 支持在线水平扩展.
kafka 的术语
Broker:Kafka 集群包含一个或多个服务器, 这种服务器被称为 broker.
Topic: 每条发布到 Kafka 集群的消息都有一个类别, 这个类别被称为 Topic.(物理上不同 Topic 的消息分开存储, 逻辑上一个 Topic 的消息虽然保存于一个或多个 broker 上但用户只需指定消息的 Topic 即可生产或消费数据而不必关心数据存于何处)
Partition:Partition 是物理上的概念, 每个 Topic 包含一个或多个 Partition.
Producer: 负责发布消息到 Kafka broker.
Consumer: 消息消费者, 向 Kafka broker 读取消息的客户端.
Consumer Group: 每个 Consumer 属于一个特定的 Consumer Group(可为每个 Consumer 指定 group name, 若不指定 group name 则属于默认的 group).
kafka 核心 Api
kafka 有四个核心 API
应用程序使用 producer API 发布消息到 1 个或多个 topic 中.
应用程序使用 consumer API 来订阅一个或多个 topic, 并处理产生的消息.
应用程序使用 streams API 充当一个流处理器, 从 1 个或多个 topic 消费输入流, 并产生一个输出流到 1 个或多个 topic, 有效地将输入流转换到输出流.
connector API 允许构建或运行可重复使用的生产者或消费者, 将 topic 链接到现有的应用程序或数据系统.
示例图如下:
kafka 应用场景
构建可在系统或应用程序之间可靠获取数据的实时流数据管道.
构建实时流应用程序, 可以转换或响应数据流.
以上介绍参考 kafka 官方文档.
开发准备
如果我们要开发一个 kafka 的程序, 应该做些什么呢?
首先, 在搭建好 kafka 环境之后, 我们要考虑的是我们是生产者还是消费者, 也就是消息的发送者还是接受者.
不过在本篇中, 生产者和消费者都会进行开发和讲解.
在大致的了解 kafka 之后, 我们来开发第一个程序.
这里用的开发语言是 Java, 构建工具 Maven.
Maven 的依赖如下:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
Kafka Producer
在开发生产的时候, 先简单的介绍下 kafka 各种配置说明:
bootstrap.servers: kafka 的地址.
acks: 消息的确认机制, 默认值是 0.
acks=0: 如果设置为 0, 生产者不会等待 kafka 的响应.
acks=1: 这个配置意味着 kafka 会把这条消息写到本地日志文件中, 但是不会等待集群中其他机器的成功响应.
acks=all: 这个配置意味着 leader 会等待所有的 follower 同步完成. 这个确保消息不会丢失, 除非 kafka 集群中所有机器挂掉. 这是最强的可用性保证.
retries: 配置为大于 0 的值的话, 客户端会在消息发送失败时重新发送.
batch.size: 当多条消息需要发送到同一个分区时, 生产者会尝试合并网络请求. 这会提高 client 和生产者的效率.
key.serializer: 键序列化, 默认 org.apache.kafka.common.serialization.StringDeserializer.
value.deserializer: 值序列化, 默认 org.apache.kafka.common.serialization.StringDeserializer.
...
还有更多配置, 可以去查看官方文档, 这里就不在说明了.
那么我们 kafka 的 producer 配置如下:
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
kafka 的配置添加之后, 我们便开始生产数据, 生产数据代码只需如下就行:
producer.send(new ProducerRecord<String, String>(topic,key,value));
topic: 消息队列的名称, 可以先行在 kafka 服务中进行创建. 如果 kafka 中并未创建该 topic, 那么便会自动创建!
key: 键值, 也就是 value 对应的值, 和 Map 类似.
value: 要发送的数据, 数据格式为 String 类型的.
在写好生产者程序之后, 那我们先来生产吧!
我这里发送的消息为:
String messageStr="你好, 这是第"+messageNo+"条数据";
并且只发送 1000 条就退出, 结果如下:
可以看到信息成功的打印了.
如果不想用程序进行验证程序是否发送成功, 以及消息发送的准确性, 可以在 kafka 服务器上使用命令查看.
Kafka Consumer
kafka 消费这块应该来说是重点, 毕竟大部分的时候, 我们主要使用的是将数据进行消费.
kafka 消费的配置如下:
bootstrap.servers: kafka 的地址.
group.id: 组名 不同组名可以重复消费. 例如你先使用了组名 A 消费了 kafka 的 1000 条数据, 但是你还想再次进行消费这 1000 条数据, 并且不想重新去产生, 那么这里你只需要更改组名就可以重复消费了.
enable.auto.commit: 是否自动提交, 默认为 true.
auto.commit.interval.ms: 从 poll(拉) 的回话处理时长.
session.timeout.ms: 超时时间.
max.poll.records: 一次最大拉取的条数.
auto.offset.reset: 消费规则, 默认 earliest .
earliest: 当各分区下有已提交的 offset 时, 从提交的 offset 开始消费; 无提交的 offset 时, 从头开始消费 .
latest: 当各分区下有已提交的 offset 时, 从提交的 offset 开始消费; 无提交的 offset 时, 消费新产生的该分区下的数据 .
none: topic 各分区都存在已提交的 offset 时, 从 offset 后开始消费; 只要有一个分区不存在已提交的 offset, 则抛出异常.
key.serializer: 键序列化, 默认 org.apache.kafka.common.serialization.StringDeserializer.
value.deserializer: 值序列化, 默认 org.apache.kafka.common.serialization.StringDeserializer.
那么我们 kafka 的 consumer 配置如下:
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
props.put("group.id", GROUPID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.records", 1000);
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
由于我这是设置的自动提交, 所以消费代码如下:
我们需要先订阅一个 topic, 也就是指定消费哪一个 topic.
consumer.subscribe(Arrays.asList(topic));
订阅之后, 我们再从 kafka 中拉取数据:
ConsumerRecords<String, String> msgList=consumer.poll(1000);
一般来说进行消费会使用监听, 这里我们就用 for(;;) 来进行监听, 并且设置消费 1000 条就退出!
结果如下:
可以看到我们这里已经成功消费了生产的数据了.
代码
那么生产者和消费者的代码如下:
生产者:
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
/**
*
* Title: KafkaProducerTest
* Description:
* kafka 生产者 demo
* Version:1.0.0
* @author pancm
* @date 2018 年 1 月 26 日
*/
public class KafkaProducerTest implements Runnable {
private final KafkaProducer < String,
String > producer;
private final String topic;
public KafkaProducerTest(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
this.producer = new KafkaProducer < String,
String > (props);
this.topic = topicName;
}@Override public void run() {
int messageNo = 1;
try {
for (;;) {
String messageStr = "你好, 这是第" + messageNo + "条数据";
producer.send(new ProducerRecord < String, String > (topic, "Message", messageStr));
// 生产了 100 条就打印
if (messageNo % 100 == 0) {
System.out.println("发送的信息:" + messageStr);
}
// 生产 1000 条就退出
if (messageNo % 1000 == 0) {
System.out.println("成功发送了" + messageNo + "条");
break;
}
messageNo++;
}
} catch(Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
public static void main(String args[]) {
KafkaProducerTest test = new KafkaProducerTest("KAFKA_TEST");
Thread thread = new Thread(test);
thread.start();
}
}
消费者:
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
*
* Title: KafkaConsumerTest
* Description:
* kafka 消费者 demo
* Version:1.0.0
* @author pancm
* @date 2018 年 1 月 26 日
*/
public class KafkaConsumerTest implements Runnable {
private final KafkaConsumer < String,
String > consumer;
private ConsumerRecords < String,
String > msgList;
private final String topic;
private static final String GROUPID = "groupA";
public KafkaConsumerTest(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
props.put("group.id", GROUPID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer < String,
String > (props);
this.topic = topicName;
this.consumer.subscribe(Arrays.asList(topic));
}@Override public void run() {
int messageNo = 1;
System.out.println("--------- 开始消费 ---------");
try {
for (;;) {
msgList = consumer.poll(1000);
if (null != msgList && msgList.count() > 0) {
for (ConsumerRecord < String, String > record: msgList) {
// 消费 100 条就打印 , 但打印的数据不一定是这个规律的
if (messageNo % 100 == 0) {
System.out.println(messageNo + "=======receive: key =" + record.key() + ", value =" + record.value() + "offset===" + record.offset());
}
// 当消费了 1000 条就退出
if (messageNo % 1000 == 0) {
break;
}
messageNo++;
}
} else {
Thread.sleep(1000);
}
}
} catch(InterruptedException e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
public static void main(String args[]) {
KafkaConsumerTest test1 = new KafkaConsumerTest("KAFKA_TEST");
Thread thread1 = new Thread(test1);
thread1.start();
}
}
注: master,slave1,slave2 是因为我在自己的环境做了关系映射, 这个可以换成服务器的 IP.
当然项目我放在 Github 上了, 有兴趣的可以看看. https://github.com/xuwujing/kafka
总结
简单的开发一个 kafka 的程序需要以下步骤:
成功搭建 kafka 服务器, 并成功启动!
得到 kafka 服务信息, 然后在代码中进行相应的配置.
配置完成之后, 监听 kafka 中的消息队列是否有消息产生.
将产生的数据进行业务逻辑处理!
kafka 介绍参考官方文档:
http://kafka.apache.org/intro
到此, 本文就结束了, 谢谢阅读!
来源: https://www.cnblogs.com/xuwujing/p/8371127.html