下载kafka
下载地址:点击打开链接,不要下错了,src后缀的源码,我们需要的是另外一个,里面包含了kafka需要的.jar包,如果你对源码感兴趣可以下源码下来研究一下.
在以下操作之前请确保java环境配置好。
安装kafka
解压下载文件:tar xvf kafka_2.11-1.0.0.tar,进入解压目录cd kafka_2.11-1.0.0.tar
启动zookeeper
Kafka使用zookeeper作为其分布式协调框架,很好的将消息生产、消息存储、消息消费的过程结合在一起。同时借助zookeeper,kafka能够生产者、消费者和broker在内的所以组件在无状态的情况下,建立起生产者和消费者的订阅关系,并实现生产者与消费者的负载均衡。因此我们在启动kafka前需要先启动zookeeper。在我们刚刚下载的kafka中已经打包了zookeeper,我们可以使用kafka打包好的zookeeper。当然我们也可以单独下载配置zookeeper,这里不再叙述。
首先我们配置环境变量(我的环境ubuntu-12.04):
打开/etc/profile,添加 export KAFKA_HOME=/usr/local/kafka(具体路径根据自己实际路径而定)
添加path,export PATH=/usr/local/cmake-3.6.1-Linux-x86_64/bin:/usr/local/go/bin:$JAVA_HOME/bin:$JRE_HOME/bin:$KAFKA_HOME/bin:$PATH
执行:zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties &,启动zookeeper(可能会有些文件目录没有权限,chmod赋予权限)
ps aux | grep zookeeper,检测是否启动.
启动kafka
执行kafka-server-start.sh /usr/local/kafka/config/server.properties&,启动kafka
ps
aux | grep kafka,检测是否启动.
创建一个主题名为test,一个分区一个备份
kafka-topics.sh --create --zookeeper localhost:2181 --replication 1 --partitions 1 --topic test
创建好之后,可以通过运行以下命令,查看已创建的topic信息:
向我们创建好的kafka主题发送消息
kafka-console-producer.sh --broker-list localhost:9092 --topic test ,然后再控制台输入字符串发送消息。
新建一个控制台,作为消费者接受消息
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning,我们可以看到控制台显示了我们刚刚键入的消息
kafka是一个分布式消息队列,我们可以创建kafka集群
创建新的配置文件,修改broker id,修改监听端口
cp server.properties server-1.properties
cp server.properties server-2.properties
现在编辑这些新建的文件,设置以下属性:
config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2
kafka-server-start.sh
/usr/local/kafka/config/server-1.properties &
kafka-server-start.sh /usr/local/kafka/config/server-2.properties &启动新的broker
接下来我们新建一个topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-new-topic
查看当前kafka集群状态
kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-new-topic
leader:该节点负责所有指定分区的读和写,每个节点的领导都是随机选择的。
replicas:备份的节点,无论该节点是否是leader或者目前是否还活着,只是显示。
isr:备份节点的集合,也就是活着的节点集合。
查看我们最初kafka集群中的节点
kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
我们可以看到replicas 备份节点为0(我们并没有配置备份节点)
给我们的新broker增加生产者消费者
kafka-console-producer.sh --broker-list localhost:9092 --topic my-new-topic
kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-new-topic --from-beginning
最后我们用my-new-topic测试集群的容错性,我们kill掉其中一个broker
ps aux | grep server-1.properties ,kill -9 12346
再次执行 kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-new-topic --from-beginning
我们依旧可以看到之前的消息。
同时我们再次查看新的broker的集群状态,
kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-new-topic
我们可以看到Isr的备份节点少了一个.
来源: http://blog.csdn.net/d_guco/article/details/78701720