官网:kafka.apache.org 框架简介
Apache Kafka 是分布式发布 - 订阅消息系统.它最初由 LinkedIn 公司开发,之后成为 Apache 项目的一部分.Kafka 是一种快速,可扩展的,设计内在就是分布式的,分区的和可复制的提交日志服务.
相关概念 ** 生产者
提供数据源生产的地方,对于同一个 topic,生产者只能有一个,这样可以确保同一个 topic 数据来自同一个业务数据,支持多并发
** 消费者
消费数据的客户端,对于同一个 topic,可以有多个消费者,比如 spark,storm 等等
** Broker
消息中间件处理结点,一个 Kafka 节点就是一个 broker,多个 broker 可以组成一个 Kafka 集群.
** Topic
同一类消息的统称,Kafka 集群能够同时负载多个 topic 分发.
** Partition
topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列,同一个 topic 里面的数据是存放在不同的分区中.
** Replication
每个分区或者 topic 都是有副本的,副本的数量也是可以在创建 topic 的时候就指定好,保证数据的安全性,以及提供高并发读取效率.
** Segment
partition 物理上由多个 segment 组成
** Offset
每个 partition 都由一系列有序的,不可变的消息组成,这些消息被连续的追加到 partition 中.partition 中的每个消息都有一个连续的序列号叫做 offset,用于 partition 唯一标识一条消息
框架特色
** 同时为发布和订阅提供高吞吐量.Kafka 每秒可以生产约 25 万消息(约 50 MB),每秒处理 55 万消息(约 110 MB).
** 可进行持久化操作.将消息持久化到磁盘,因此可用于批量消费,例如 ETL,以及实时应用程序.通过将数据持久化到硬盘以及 replication 防止数据丢失.
** 分布式系统,易于向外扩展.所有的 producer,broker 和 consumer 都会有多个,均为分布式的.无需停机即可扩展机器.
** 消息被处理的状态是在 consumer 端维护,而不是由 server 端维护.当失败时能自动平衡.
架构图
* 框架部署
** 相关下载
kafka 以及 scala:链接:http://pan.baidu.com/s/1pLBFJf1 密码:seto
** 解压 Kafka 以及 scala
** 安装 JDK 并配置环境变量
$ tar -zxf kafka_2.10-0.8.2.1.tgz -C /opt/modules/cdh/
$ tar -zxf scala-2.10.4.tgz -C /opt/modules/cdh/
不再赘述
** 安装并启动 zookeeper
在 zookeeper 的根目录下:
$ bin/zkServer.sh start
** 配置 scala 环境变量
(注意以上两条语句的执行用户)
# vi /etc/profile
$ source /etc/profile
添加如下:
使用命令检查 scala 配置是否正确:
##SCALA_HOME
SCALA_HOME=/opt/modules/cdh/scala-2.10.4
export PATH=$PATH:$SCALA_HOME/bin
$ scala -version,如图:
** 修改 Kafka 配置文件 server.properties 修改为如下:producer.properties 变动内容如下:consumer.properties 变动内容如下:** 启动 Kafka
$ bin/kafka-server-start.sh config/server.properties
** 创建 Topic
$ bin/kafka-topics.sh --create --zookeeper z01:2181 --replication-factor 1 --partitions 1 --topic testTopic
** 启动生产者
$ bin/kafka-console-producer.sh --broker-list z01:9092 --topic testTopic
** 启动消费者
$ bin/kafka-console-consumer.sh --zookeeper z01:2181 --topic testTopic --from-beginning
在生产者窗口输入数据,在消费者窗口查看数据,测试如图:
消费者:
生产者:
* 整合测试
使用 flume+kafka 整合测试
** 配置 flume
原来我们配置 flume,是在 tomcat 所在机器节点开启了一个 flume 收集日志,并直接上传到 HDFS,如果集群中存在多个机器节点,则势必导致对 HDFS 集群占用率过高,所以在面临多个 flume 集群时,一般会采用 1~2 个单独的 flume 节点来收集另外 flume 节点的日志,相当于弄了一个中转站,由中转站收集其他 flume,再统一放置到 HDFS 系统中,此刻我们采用方案 2,原理如图:
背景:在一台机器上开两个 flume,分别收集 tomcat 日志和 hive 日志,这两者的日志信息分别输入到中间层 flume(这个中间层 flume 也模拟在同一个机器节点上),然后中间层 flume 在将数据写入到 HDFS.
首先检查一下 hive 的 conf 目录下的 hive-log4j.properties 配置中,是否已经指定了 hive 的日志目录,如果没有,请指定,如图:
涉及 flume 文件:以下文件存在于 flume 的 conf 目录下,如果不存在,请自行创建即可.
flume-apache-log-kafka.confflume-hive-log-kafka.confflume-connector-kafka.conf
依次启动:
测试后如图,即可发现,日志在 HDFS 和 kafka 中都已经显示出来:
a4:
$ bin/flume-ng agent --conf conf/ --name a4 --conf-file conf/flume-connector-kafka.conf
a3:
$ bin/flume-ng agent --conf conf/ --name a3 --conf-file conf/flume-hive-log-kafka.conf
a2:
$ bin/flume-ng agent --conf conf/ --name a2 --conf-file conf/flume-apache-log-kafka.conf
个人微博:http://weibo.com/seal13
QQ 大数据技术交流群(广告勿入):476966007
来源: http://www.bubuko.com/infodetail-2459049.html