什么是 kafka
Kafka 官网自己的介绍是: 一个可支持分布式的流平台.
kafka 官网介绍 https://kafka.apache.org/intro
kafka 三个关键能力:
1. 发布订阅记录流, 类似于消息队列与企业信息系统
2. 以容错的持久方式存储记录流
3. 对流进行处理
kafka 通常应用再两大类应用中:
1. 构建实时流数据管道, 在系统或应用程序之间可靠地获取数据
2. 构建转换或响应数据流的实时流应用程序
kafka 的一些基本概念:
1.Kafka 作为一个集群运行在一个或多个服务器上, 这些服务器可以跨越多个数据中心.
2.Kafka 集群将记录流存储在称为 topic 的类别中.
3. 每个记录由一个键, 一个值和一个时间戳组成.
kafka 核心 API:
1.Producer API: 允许应用程序将记录流发布到一个或多个 topic.
2.Consumer API: 允许应用程序订阅一个或多个 topic 并处理生成给它们的记录流.
3.Streams API: 允许应用程序充当流处理器, 使用来自一个或多个 topic 的输入流,
并生成一个或多个输出 topic 的输出流, 从而有效地将输入流转换为输出流.
4.Connector API: 允许构建和运行可重用的生产者或消费者, 将 topic 连接到现有的应用程序或数据系统.
例如, 到关系数据库的连接器可能捕获对表的每个更改.
作为消息系统
传统消息传递有两类模型: 消息队列, 发布订阅. 在消息队列中, 一个消费者池可以从一个服务器读取数据, 而每个记录都将被发送到其中一个服务器; 在发布 - 订阅中, 记录被广播给所有消费者. 这两种模型各有优缺点:
消息队列优缺点:
它允许您在多个使用者实例上划分数据处理, 这使您可以扩展处理.
队列不是多订阅者的 - 一旦一个进程读取了它丢失的数据.
发布订阅优缺点:
Publish-subscribe 允许您将数据广播到多个进程,
但是由于每个消息都传递到每个订阅者, 因此无法扩展处理.
作为消息传递系统, 那么跟 mq 有什么区别呢?(RabbitMq\Redis\RocketMq\ActiveMq)
RabbitMQ:
遵循 AMQP 协议, 由内在高并发的 erlang 语言开发, 用在实时的对可靠性要求比较高的消息传递上.
万级数据量, 社区活跃度极高, 可视化操作界面丰富.
提供了全面的核心功能, 是消息队列的优秀产品.
因为是 erlang 语言开发, 难以维护并且开发者很难二次开发.
Redis:
Redis 的主要场景是内存数据库, 作为消息队列来说可靠性太差, 而且速度太依赖网络 IO.
在服务器本机上的速度较快, 且容易出现数据堆积的问题, 在比较轻量的场合下能够适用.
RocketMq:
rocketMq 几十万级别数据量, 基于 Java 开发. 是阿里巴巴开源的一个消息产品.
应对了淘宝双十一考验, 并且文档十分的完善, 拥有一些其他消息队列不具备的高级特性,
如定时推送, 其他消息队列是延迟推送, 如 rabbitMq 通过设置 expire 字段设置延迟推送时间.
又比如 rocketmq 实现分布式事务, 比较可靠的. RocketMq 也是用过的唯一支持分布式事务的一款产品.
Kafka:
kafka 原本设计的初衷是日志统计分析, 现在基于大数据的背景下也可以做运营数据的分析统计.
kafka 真正的大规模分布式消息队列, 提供的核心功能比较少. 基于 zookeeper 实现的分布式消息订阅.
几十万级数据量级, 比 RokectMq 更强.
客户端和服务器之间的通信是通过一个简单的, 高性能的, 语言无关的 TCP 协议来完成的.
ActiveMq:
Apache ActiveMQ? 是最流行的开源, 多协议, 基于 java 的消息服务器. 它支持行业标准协议,
因此用户可以在各种语言和平台上选择客户端. 可以使用来自 C,c++,Python,. net 等的连接性.
使用通用的 AMQP 协议集成您的多平台应用程序. 使用 STOMP 在 websockets 上交换 Web 应用程序之间的消息.
使用 MQTT 管理物联网设备. 支持您现有的 JMS 基础结构及其他. ActiveMQ 提供了支持任何 messagi 的强大功能
和灵活性.
备注: 因为该文章主要介绍 kafka, 所以上述只是简单罗列了一些特点, 如果有兴趣的同学可以详细的分析一下, 这些产品我后续都会专门写文章来归纳总结分析, 在这里先简单带过.
为什么要用消息队列?
该部分是扩展内容, 很多人包括我刚毕业那年使用消息队列, 但别人问道我为啥用消息队列, 我都没有一个很清晰的认识, 所以在这里也说一下. 希望给有需要的同学一些帮助.
那么为什么要使用消息队列呢? 首先我们来回顾一下消息传递. 前端而言, 传统方式是通过全局变量来传递, 后面有了数据总线的概念, 再后来有相应的解决方案产品比如说 vuex,redux,store 等. 对于后端来说, 最先系统之间的通信, 消息传递都非常依赖于通信对象彼此, 高度耦合, 后面有了一些产品来解决这些问题, 比如说 webservice. 但这样的方式极其不友好, 而且维护繁琐, 职责难以分清, 工作量增加, 所以 mq 诞生后, 基本解决了这些问题.
消息队列的引入是为了:
1. 解耦:
比如: A 系统操作 p, 需要将消息传递给 B,C 两个系统, 如果没有消息队列, 那么 A 系统中需要给 B 发一条消息,
又得给 C 发一条消息, 然后有一天 D,E,F 系统说: A 系统你也要给我发 p 的消息, 这个时候 A 又得修改代码,
发布上线, DEF 才能正常接收消息. 然后过了 n 天, C 又说, 不要给我发消息了, 把给我发消息的部分去掉吧.
A 系统的开发人员又得哐哧哐哧的去掉, 发布上线. 这样日复一日, 随着系统增多, 接入和退出的操作增多,
那么 A 系统需要频繁发布上线, 降低了稳定性, 可用时间, 同时每次上线都需要测试跟踪测试, 这里面的成本
与风险不言而喻. 而消息队列一旦引入, A 不需要关心谁消费, 谁退出消费, A 只负责将消息放入队列即可,
而其他系统只需要监听这个队列, 就算其他系统退出, 对 A 而言也是没有任何影响的, 能够一直持续不断的
提供服务, 这难道不香吗?
2. 异步
比如说: 传统方式发送消息给 B,C,D, 需要 120ms, 那么如果采用了消息队列, 就可以大大降低耗时. 但
这些对于那些非必要的同步业务逻辑适用.
3. 削峰
传统模式下, 请求直接进入到数据库, 当峰值到达一定时, 必然会挂掉. 如果适用了中间件消息队列, 那么就可以很好的保证系统正常提供服务, 这也是秒杀系统中会常常谈到的限流, 这样可以防止系统崩溃, 提供系统可用性.
配置 MAVEN
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.12</artifactId>
- <version>1.0.0</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>1.0.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-streams</artifactId>
- <version>1.0.0</version>
- </dependency>
生产者
- /**
- * @author chandlerHuang
- * @description @TODO
- * @date 2020/1/15
- */
- public class KafkaProducerService implements Runnable {
- private final KafkaProducer<String,String> producer;
- private final String topic;
- public KafkaProducerService(String topic) {
- Properties props = new Properties();
- props.put("bootstrap.servers", "绑定的外网 IP:9092");
- props.put("acks", "all");
- props.put("retries", 0);
- props.put("batch.size", 16384);
- props.put("key.serializer", StringSerializer.class.getName());
- props.put("value.serializer", StringSerializer.class.getName());
- this.producer = new KafkaProducer<String, String>(props);
- this.topic = topic;
- }
- @Override
- public void run() {
- int messageNo = 1;
- try {
- for(;;) {
- String messageStr="["+messageNo+"]:hello,boys!";
- producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
- // 生产了 100 条就打印
- if(messageNo%100==0){
- System.out.println("sendMessages:" + messageStr);
- }
- // 生产 1000 条就退出
- if(messageNo%1000==0){
- System.out.println("successCount:"+messageNo);
- break;
- }
- messageNo++;
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- producer.close();
- }
- }
- public static void main(String args[]) {
- KafkaProducerService test = new KafkaProducerService(TopicConstant.CHART_TOPIC);
- Thread thread = new Thread(test);
- thread.start();
- }
- }
消费者
- /**
- * @author chandlerHuang
- * @description @TODO
- * @date 2020/1/15
- */
- public class KafkaConsumerService implements Runnable{
- private final KafkaConsumer<String, String> consumer;
- private ConsumerRecords<String, String> msgList;
- private final String topic;
- private static final String GROUPID = "groupA";
- public KafkaConsumerService(String topicName) {
- Properties props = new Properties();
- props.put("bootstrap.servers", "绑定的外网 IP:9092");
- props.put("group.id", GROUPID);
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "1000");
- props.put("session.timeout.ms", "30000");
- props.put("auto.offset.reset", "earliest");
- props.put("key.deserializer", StringDeserializer.class.getName());
- props.put("value.deserializer", StringDeserializer.class.getName());
- this.consumer = new KafkaConsumer<String, String>(props);
- this.topic = topicName;
- this.consumer.subscribe(Arrays.asList(topic));
- }
- @Override
- public void run() {
- int messageNo = 1;
- System.out.println("--------- 开始消费 ---------");
- try {
- for (;;) {
- msgList = consumer.poll(1000);
- if(null!=msgList&&msgList.count()>0){
- for (ConsumerRecord<String, String> record : msgList) {
- // 消费 100 条就打印 , 但打印的数据不一定是这个规律的
- if(messageNo%100==0){
- System.out.println(messageNo+"=======receive: key =" + record.key() + ", value =" + record.value()+"offset==="+record.offset());
- }
- // 当消费了 1000 条就退出
- if(messageNo%1000==0){
- break;
- }
- messageNo++;
- }
- }else{
- Thread.sleep(1000);
- }
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- consumer.close();
- }
- }
- public static void main(String args[]) {
- KafkaConsumerService test1 = new KafkaConsumerService(TopicConstant.CHART_TOPIC);
- Thread thread1 = new Thread(test1);
- thread1.start();
- }
- }
备注: 上述 demo 编写过程中, 发现报了一个 Exception:Kafka java client 连接异常 (org.apache.kafka.common.errors.TimeoutException: Failed to update metadata )...
kafka 中需要配置 server. 文件:
advertised.listeners=PLAINTEXT:// 外网地址: 9092
zookeeper.connect = 内网地址: 2181
如果你是云服务器的话需要, 在安全组设置对应端口开放, 否则无法访问响应接口!
来源: http://www.bubuko.com/infodetail-3382014.html