[转载请注明] :
一, kafka 介绍
Kafka 是最初由 Linkedin 公司开发, 是一个分布式, 分区的, 多副本的, 多订阅者, 基于 zookeeper 协调的分布式日志系统 (也可以当做 MQ 系统), 常见可以用于 web/nginx 日志, 访问日志, 消息服务等等, Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目.
主要应用场景是: 日志收集系统和消息系统.
Kafka 主要设计目标如下:
以时间复杂度为 O(1) 的方式提供消息持久化能力, 即使对 TB 级以上数据也能保证常数时间的访问性能.
高吞吐率. 即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条消息的传输.
支持 Kafka Server 间的消息分区, 及分布式消费, 同时保证每个 partition 内的消息顺序传输.
同时支持离线数据处理和实时数据处理.
支持在线水平扩展
二, kafka 架构图
三, kafka 安装与测试
1, 配置 JDK 环境
Kafka 使用 Zookeeper 来保存相关配置信息, Kafka 及 Zookeeper 依赖 Java 运行环境, 从 oracle 网站下载 JDK 安装包, 解压安装
- tar zxvf jdk-8u171-Linux-x64.tar.gz
- mv jdk1.8.0_171 /usr/local/java/
设置 Java 环境变量:
- #java
- export JAVA_HOME=/usr/local/java/jdk1.8.0_171
- export PATH=$PATH:$JAVA_HOME/bin
- export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib:$JAVA_HOME/jre/lib
2, 安装 kafka
下载地址: http://kafka.apache.org/downloads
- cd /opt
- wget http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz
- tar zxvf kafka_2.11-2.3.0.tgz
- mv kafka_2.11-2.3.0 /usr/local/apps/
- cd /usr/local/apps/
- ln -s kafka_2.11-2.3.0 kafka
3, 启动测试
(1) 启动 Zookeeper 服务
- cd /usr/local/apps/kafka
- #执行脚本
- bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
- #查看进程
- jps
(2) 启动单机 Kafka 服务
- # 执行脚本
- bin/kafka-server-start.sh config/server.properties
- #查看进程
- jps
(3) 创建 topic 进行测试
- # 执行脚本
- bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
(4) 查看 topic 列表
- # 执行脚本
- bin/kafka-topics.sh --list --zookeeper localhost:2181
输出: test
(5) 生产者消息测试
- # 执行脚本 (使用 kafka-console-producer.sh 发送消息)
- bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
(6) 消费者消息测试
- # 执行脚本 (使用 kafka-console-consumer.sh 接收消息并在终端打印)
- bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
4, 单机多 broker 集群配置
单机部署多个 broker, 不同的 broker, 设置不同的 id, 监听端口, 日志目录
- cp config/server.properties config/server-1.properties
- VIM server-1.properties
- #修改:
- broker.id=1
- port=9093
- log.dir=/tmp/kafka-logs-1
- #启动 Kafka 服务
- bin/kafka-server-start.sh config/server-1.properties &
5,java 代码实现生产者消费者
(1)maven 项目添加 kafka 依赖
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <version>2.3.0</version>
- </dependency>
(2)java 代码实现
- package com.server.kafka;
- import org.apache.kafka.clients.consumer.ConsumerConfig;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.apache.kafka.clients.consumer.ConsumerRecords;
- import org.apache.kafka.clients.consumer.KafkaConsumer;
- import org.apache.kafka.clients.producer.KafkaProducer;
- import org.apache.kafka.clients.producer.ProducerConfig;
- import org.apache.kafka.clients.producer.ProducerRecord;
- import org.apache.kafka.common.serialization.StringDeserializer;
- import org.apache.kafka.common.serialization.StringSerializer;
- import java.util.Collections;
- import java.util.Properties;
- import java.util.Random;
- public class KafakaExecutor {
- public static String topic = "test";
- public static void main(String[] args) {
- new Thread(()-> new Producer().execute()).start();
- new Thread(()-> new Consumer().execute()).start();
- }
- public static class Consumer {
- private void execute() {
- Properties p = new Properties();
- p.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.21.181:9092");
- p.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- p.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- p.put(ConsumerConfig.GROUP_ID_CONFIG, topic);
- KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(p);
- // 订阅消息
- kafkaConsumer.subscribe(Collections.singletonList(topic));
- while (true) {
- ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
- for (ConsumerRecord<String, String> record : records) {
- System.out.println(String.format("topic:%s,offset:%d, 消息:%s", //
- record.topic(), record.offset(), record.value()));
- }
- }
- }
- }
- public static class Producer {
- private void execute() {
- Properties p = new Properties();
- //kafka 地址, 多个地址用逗号分割
- p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.21.181:9092");
- p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(p);
- try {
- while (true) {
- String msg = "Hello," + new Random().nextInt(100);
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
- kafkaProducer.send(record);
- System.out.println("消息发送成功:" + msg);
- Thread.sleep(500);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- kafkaProducer.close();
- }
- }
- }
- }
(3) 测试结果 (上面使用脚本命令执行消费者的终端也会同步输出消息数据)
参考: https://www.cnblogs.com/frankdeng/p/9310684.html
加七哥微信: kinyseven, 来扯犊子啊
动动手, 关注一下, 扎心了
大道七哥
来源: https://www.cnblogs.com/jstarseven/p/11364852.html