本人今天上午参考了不少博文, 发现不少博文不是特别好, 不是因为依赖冲突问题就是因为版本问题.
于是我结合相关的博文和案例, 自己改写了下并参考了下, 于是就有了这篇文章. 希望能够给大家帮助, 少走一些弯路.
一, KafKa 的介绍
1. 主要功能
根据官网的介绍, ApacheKafka® 是一个分布式流媒体平台, 它主要有 3 种功能:
a. 发布和订阅消息流, 这个功能类似于消息队列, 这也是 kafka 归类为消息队列框架的原因.
b. 以容错的方式记录消息流, kafka 以文件的方式来存储消息流.
c. 可以再消息发布的时候进行处理.
2. 使用场景
a. 在系统或应用程序之间构建可靠的用于传输实时数据的管道, 消息队列功能.
b. 构建实时的流数据处理程序来变换或处理数据流, 数据处理功能.
3. 详细介绍
Kafka 目前主要作为一个分布式的发布订阅式的消息系统使用, 下面简单介绍一下 kafka 的基本机制
消息传输过程:
Producer 即生产者, 向 Kafka 集群发送消息, 在发送消息之前, 会对消息进行分类, 即 Topic, 上图展示了两个 producer 发送了分类为 topic1 的消息, 另外一个发送了 topic2 的消息.
Topic 即主题, 通过对消息指定主题可以将消息分类, 消费者可以只关注自己需要的 Topic 中的消息
Consumer 即消费者, 消费者通过与 kafka 集群建立长连接的方式, 不断地从集群中拉取消息, 然后可以对这些消息进行处理.
二, 安装
安装包下载地址: http://kafka.apache.org/downloads
找到 0.11.0.1 版本, 如图:
1. 下载
wget https://archive.apache.org/dist/kafka/0.11.0.1/kafka_2.11-0.11.0.1.tgz
2. 解压
tar -xzvf kafka_2.11-0.11.0.1.tgz
配置说明:
consumer.properites 消费者配置, 这个配置文件用于配置开启的消费者, 此处我们使用默认的即可.
producer.properties 生产者配置, 这个配置文件用于配置开启的生产者, 此处我们使用默认的即可.
server.properties kafka 服务器的配置, 此配置文件用来配置 kafka 服务器, 目前仅介绍几个最基础的配置.
a.broker.id 申明当前 kafka 服务器在集群中的唯一 ID, 需配置为 integer, 并且集群中的每一个 kafka 服务器的 id 都应是唯一的, 我们这里采用默认配置即可.
b.listeners 申明此 kafka 服务器需要监听的端口号, 如果是在本机上跑虚拟机运行可以不用配置本项, 默认会使用 localhost 的地址, 如果是在远程服务器上运行则必须配置,
例如: listeners=PLAINTEXT:// 192.168.126.143:9092. 并确保服务器的 9092 端口能够访问.
c.zookeeper.connect 申明 kafka 所连接的 zookeeper 的地址 , 需配置为 zookeeper 的地址, 由于本次使用的是 kafka 高版本中自带 zookeeper,
使用默认配置即可, zookeeper.connect=localhost:2181.
3. 运行
首先运行 zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
运行成功, 显示如图:
然后运行 kafka
bin/kafka-server-start.sh config/server.properties
运行成功, 显示如图:
三, 整合 KafKa
1. 新建 Maven 项目导入 Maven 依赖
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>cn.test</groupId>
- <artifactId>kafka_demo</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>1.5.9.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <java.version>1.8</java.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- <version>1.1.1.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>com.google.code.gson</groupId>
- <artifactId>gson</artifactId>
- <version>2.8.2</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- <!-- 指定编译版本 -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- </plugins>
- <finalName>${project.artifactId}</finalName>
- </build>
- </project>
2. 编写消息实体
- package com.springboot.kafka.bean;
- import java.util.Date;
- import lombok.Data;
- @Data
- public class Message {
- private Long id; //id
- private String msg; // 消息
- private Date sendTime; // 时间戳
- }
有了 lombok, 每次编写实体不必要使用快捷键生成 seter 或 geter 方法了, 代码看起来更加简洁了.
3. 编写消息发送者 (可以理解为生产者, 最好联系详细介绍中的图)
- package com.springboot.kafka.producer;
- import java.util.Date;
- import java.util.UUID;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Component;
- import com.google.gson.Gson;
- import com.google.gson.GsonBuilder;
- import com.springboot.kafka.bean.Message;
- import lombok.extern.slf4j.Slf4j;
- @Component
- @Slf4j
- public class KafkaSender {
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
- private Gson gson = new GsonBuilder().create();
- // 发送消息方法
- public void send() {
- Message message = new Message();
- message.setId(System.currentTimeMillis());
- message.setMsg(UUID.randomUUID().toString());
- message.setSendTime(new Date());
- log.info("+++++++++++++++++++++ message = {}", gson.toJson(message));
- kafkaTemplate.send("zhisheng", gson.toJson(message));
- }
- }
4. 编写消息接收者 (可以理解为消费者)
- package com.springboot.kafka.producer;
- import java.util.Date;
- import java.util.UUID;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.kafka.core.KafkaTemplate;
- import org.springframework.stereotype.Component;
- import com.google.gson.Gson;
- import com.google.gson.GsonBuilder;
- import com.springboot.kafka.bean.Message;
- import lombok.extern.slf4j.Slf4j;
- @Component
- @Slf4j
- public class KafkaSender {
- @Autowired
- private KafkaTemplate<String, String> kafkaTemplate;
- private Gson gson = new GsonBuilder().create();
- // 发送消息方法
- public void send() {
- Message message = new Message();
- message.setId(System.currentTimeMillis());
- message.setMsg(UUID.randomUUID().toString());
- message.setSendTime(new Date());
- log.info("+++++++++++++++++++++ message = {}", gson.toJson(message));
- kafkaTemplate.send("zhisheng", gson.toJson(message));
- }
- }
5. 编写启动类
- package com.springboot.kafka;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.ConfigurableApplicationContext;
- import com.springboot.kafka.producer.KafkaSender;
- @SpringBootApplication
- public class KafkaApplication {
- public static void main(String[] args) {
- ConfigurableApplicationContext context = SpringApplication.run(KafkaApplication.class, args);
- KafkaSender sender = context.getBean(KafkaSender.class);
- for (int i = 0; i < 3; i++) {
- // 调用消息发送类中的消息发送方法
- sender.send();
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
6. 编写 application.properties 配置文件
- #============== kafka ===================
- # \u6307\u5B9Akafka \u4EE3\u7406\u5730\u5740\uFF0C\u53EF\u4EE5\u591A\u4E2A
- spring.kafka.Bootstrap-servers=192.168.126.143:9092
- #=============== provider =======================
- spring.kafka.producer.retries=0
- # \u6BCF\u6B21\u6279\u91CF\u53D1\u9001\u6D88\u606F\u7684\u6570\u91CF
- spring.kafka.producer.batch-size=16384
- spring.kafka.producer.buffer-memory=33554432
- # \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F
- spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
- spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
- #=============== consumer =======================
- # \u6307\u5B9A\u9ED8\u8BA4\u6D88\u8D39\u8005group id
- spring.kafka.consumer.group-id=test-consumer-group
- spring.kafka.consumer.auto-offset-reset=earliest
- spring.kafka.consumer.enable-auto-commit=true
- spring.kafka.consumer.auto-commit-interval=100
- # \u6307\u5B9A\u6D88\u606Fkey\u548C\u6D88\u606F\u4F53\u7684\u7F16\u89E3\u7801\u65B9\u5F0F
- spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
7. 运行结果
示例代码地址: https://github.com/youcong1996/study_simple_demo/tree/kafka_demo
如果按照上述流程没有达到预计的效果可以 Git clone 到本地.
来源: https://www.cnblogs.com/youcong/p/10216573.html