前提
通过前面两篇文章可以简单的了解 RocketMQ 和 安装 RocketMQ , 今天就将 SpringBoot 和 RocketMQ 整合起来使用
1SpringBoot Kafka 整合使用
2SpringBoot RabbitMQ 整合使用
3SpringBoot ActiveMQ 整合使用
4Kafka 安装及快速入门
5SpringBoot RabbitMQ 整合进阶版
6RocketMQ 初探
7RocketMQ 安装及快速入门
关注我
转载请务必注明原创地址为: http://www.54tianzhisheng.cn/2018/02/07/SpringBoot-RocketMQ/
创建项目
在 IDEA 创建一个 SpringBoot 项目, 项目结构如下:
POM 文件
引入 RocketMQ 的一些相关依赖, 最后的 pom 文件如下:
- 4.0.0
- com.zhisheng
- rocketmq
- 0.0.1-SNAPSHOT
- jar
- rocketmq
- Demo project for Spring Boot RocketMQ
- org.springframework.boot
- spring-boot-starter-parent
- 1.5.9.RELEASE
- UTF-8
- UTF-8
- 1.8
- org.springframework.boot
- spring-boot-starter-web
- org.springframework.boot
- spring-boot-starter-test
- test
- org.apache.rocketmq
- rocketmq-common
- 4.2.0
- org.apache.rocketmq
- rocketmq-client
- 4.2.0
- org.springframework.boot
- spring-boot-maven-plugin
配置文件
application.properties 中如下:
- # 消费者的组名
- apache.rocketmq.consumer.PushConsumer=PushConsumer
- # 生产者的组名
- apache.rocketmq.producer.producerGroup=Producer
- # NameServer 地址
- apache.rocketmq.namesrvAddr=localhost:9876
生产者
- package com.zhisheng.rocketmq.client;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.common.RemotingHelper;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import org.springframework.util.StopWatch;
- import javax.annotation.PostConstruct;
- /**
- * Created by zhisheng_tian on 2018/2/6
- */
- @Component
- public class RocketMQClient {
- /**
- * 生产者的组名
- */
- @Value("${apache.rocketmq.producer.producerGroup}")
- private String producerGroup;
- /**
- * NameServer 地址
- */
- @Value("${apache.rocketmq.namesrvAddr}")
- private String namesrvAddr;
- @PostConstruct
- public void defaultMQProducer() {
- // 生产者的组名
- DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
- // 指定 NameServer 地址, 多个地址以 ; 隔开
- producer.setNamesrvAddr(namesrvAddr);
- try {
- /**
- * Producer 对象在使用之前必须要调用 start 初始化, 初始化一次即可
- * 注意: 切记不可以在每次发送消息时, 都调用 start 方法
- */
- producer.start();
- // 创建一个消息实例, 包含 topictag 和 消息体
- // 如下: topic 为 "TopicTest",tag 为 "push"
- Message message = new Message("TopicTest", "push", "发送消息 ----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
- StopWatch stop = new StopWatch();
- stop.start();
- for (int i = 0; i < 10000; i++) {
- SendResult result = producer.send(message);
- System.out.println("发送响应: MsgId:" + result.getMsgId() + ", 发送状态:" + result.getSendStatus());
- }
- stop.stop();
- System.out.println("---------------- 发送一万条消息耗时:" + stop.getTotalTimeMillis());
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- producer.shutdown();
- }
- }
- }
消费者
- package com.zhisheng.rocketmq.server;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.remoting.common.RemotingHelper;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- /**
- * Created by zhisheng_tian on 2018/2/6
- */
- @Component public class RocketMQServer {
- /**
- * 消费者的组名
- */
- @Value("${apache.rocketmq.consumer.PushConsumer}") private String consumerGroup;
- /**
- * NameServer 地址
- */
- @Value("${apache.rocketmq.namesrvAddr}") private String namesrvAddr;@PostConstruct public void defaultMQPushConsumer() {
- // 消费者的组名
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
- // 指定 NameServer 地址, 多个地址以 ; 隔开
- consumer.setNamesrvAddr(namesrvAddr);
- try {
- // 订阅 PushTopic 下 Tag 为 push 的消息
- consumer.subscribe("TopicTest", "push");
- // 设置 Consumer 第一次启动是从队列头部开始消费还是队列尾部开始消费
- // 如果非第一次启动, 那么按照上次消费的位置继续消费
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.registerMessageListener((MessageListenerConcurrently)(list, context) - >{
- try {
- for (MessageExt messageExt: list) {
- System.out.println("messageExt:" + messageExt); // 输出消息内容
- String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
- System.out.println("消费响应: msgId :" + messageExt.getMsgId() + ", msgBody :" + messageBody); // 输出消息内容
- }
- } catch(Exception e) {
- e.printStackTrace();
- return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 稍后再试
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消费成功
- });
- consumer.start();
- } catch(Exception e) {
- e.printStackTrace();
- }
- }
- }
启动类
- package com.zhisheng.rocketmq;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- @SpringBootApplication
- public class RocketmqApplication {
- public static void main(String[] args) {
- SpringApplication.run(RocketmqApplication.class, args);
- }
- }
- RocketMQ
代码已经都写好了, 接下来我们需要将与 RocketMQ 有关的启动起来
启动 NAME SERVER
在前面文章中已经写过怎么启动, http://www.54tianzhisheng.cn/2018/02/06/RocketMQ-install/#启动-NameServer
进入到目录 :
cd distribution / target / apache - rocketmq
启动:
nohup sh bin / mqnamesrv & tail - f~ / logs / rocketmqlogs / namesrv.log // 通过日志查看是否启动成功
启动 BROKER
nohup sh bin / mqbroker - n localhost: 9876 & tail - f~ / logs / rocketmqlogs / broker.log // 通过日志查看是否启动成功
然后运行启动类, 运行效果如下:
监控
RocketMQ 有一个对其扩展的开源项目 ocketmq-console , 如今也提交给了 Apache , 地址在: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console , 官方也给出了其支持的功能的中文文档: https://github.com/apache/rocketmq-externals/blob/master/rocketmq-console/doc/1_0_0/UserGuide_CN.md , 那么该如何安装?
DOCKER 安装
1 获取 Docker 镜像
docker pull styletang / rocketmq - console - ng
2 运行, 注意将你自己的 NameServer 地址替换下面的 127.0.0.1
docker run - e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" - p 8080 : 8080 - t styletang / rocketmq - console - ng
非 DOCKER 安装
我们 git clone 一份代码到本地:
- git clone https: //github.com/apache/rocketmq-externals.git
- cd rocketmq - externals / rocketmq - console /
需要 jdk 1.7 以上 执行以下命令:
mvn spring-boot:run
或者
- mvn clean package -Dmaven.test.skip=true
- java -jar target/rocketmq-console-ng-1.0.0.jar
注意:
1 如果你下载依赖缓慢, 你可以重新设置 maven 的 mirror 为阿里云的镜像
- alimaven
- aliyun maven
- http://maven.aliyun.com/nexus/content/groups/public/
- central
2 如果你使用的 RocketMQ 版本小于 3.5.8, 如果您使用 rocketmq < 3.5.8, 请在启动 rocketmq-console-ng 时添加
- Dcom.rocketmq.sendMessageWithVIPChannel = false
(或者您可以在 ops 页面中更改它)
3 更改 resource / application.properties 中的 rocketmq.config.namesrvAddr(或者可以在 ops 页面中更改它)
错误解决方法
1Docker 启动项目报错
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to < null > failed
将 Docker 启动命令改成如下以后:
docker run - e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Drocketmq.config.isVIPChannel=false" - p 8080 : 8080 - t styletang / rocketmq - console - ng
报错信息改变了, 新的报错信息如下:
- ERROR op=global_exception_handler_print_error
- org.apache.rocketmq.console.exception.ServiceException: This date have't data!
看到网上有人也遇到这个问题, 他们都通过自己的方式解决了, 但是方法我都试了, 不适合我不得不说, 阿里, 你能再用心点吗? 既然把 RocketMQ 捐给 Apache 了, 这些文档啥的都必须更新啊, 不要还滞后着呢, 不然少不了被吐槽!
搞了很久这种方法没成功, 暂时放弃! mmp
2 非 Docker 安装, 只好把源码编译打包了
1) 注意需要修改如下图中的配置:
- rocketmq.config.namesrvAddr = localhost: 9876 // 注意替换你自己的 ip
- # 如果你 rocketmq 版本小于 3.5.8 才需设置 `rocketmq.config.isVIPChannel` 为 false, 默认是 true,
这个可以在源码中可以看到的 rocketmq.config.isVIPChannel =
2) 执行以下命令:
mvn clean package - Dmaven.test.skip = true
编译成功:
可以看到已经打好了 jar 包:
运行:
java - jar rocketmq - console - ng - 1.0.0.jar
成功, 不报错了, 开心, 访问 http://localhost:8080/
整个监控大概就是这些了
然后我运行之前的 SpringBoot 整合项目, 查看监控信息如下:
总结
整篇文章讲述了 SpringBoot 与 RocketMQ 整合和 RocketMQ 监控平台的搭建
来源: http://www.54tianzhisheng.cn/2018/02/07/SpringBoot-RocketMQ/