前提
创建项目
监控
错误解决方法
总结
参考文章
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复. 甚至不知道如何读源码也可以请教噢.
新的源码解析文章实时收到通知. 每周更新一篇左右.
认真的源码交流微信群.
前提
通过前面两篇文章可以简单的了解 RocketMQ 和 安装 RocketMQ , 今天就将 SpringBoot 和 RocketMQ 整合起来使用.
创建项目
在 IDEA 创建一个 SpringBoot 项目, 项目结构如下:
pom 文件
引入 RocketMQ 的一些相关依赖, 最后的 pom 文件如下:
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <groupId>com.zhisheng</groupId>
- <artifactId>rocketmq</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <packaging>jar</packaging>
- <name>rocketmq</name>
- <description>Demo project for Spring Boot RocketMQ</description>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>1.5.9.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <java.version>1.8</java.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-common</artifactId>
- <version>4.2.0</version>
- </dependency>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>4.2.0</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </project>
配置文件
application.properties 中如下:
- # 消费者的组名
- apache.rocketmq.consumer.PushConsumer=PushConsumer
- # 生产者的组名
- apache.rocketmq.producer.producerGroup=Producer
- # NameServer 地址
- apache.rocketmq.namesrvAddr=localhost:9876
生产者
- package com.zhisheng.rocketmq.client;
- import org.apache.rocketmq.client.producer.DefaultMQProducer;
- import org.apache.rocketmq.common.message.Message;
- import org.apache.rocketmq.remoting.common.RemotingHelper;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import org.springframework.util.StopWatch;
- import javax.annotation.PostConstruct;
- /**
- * Created by zhisheng_tian on 2018/2/6
- */
- @Component
- public class RocketMQClient {
- /**
- * 生产者的组名
- */
- @Value("${apache.rocketmq.producer.producerGroup}")
- private String producerGroup;
- /**
- * NameServer 地址
- */
- @Value("${apache.rocketmq.namesrvAddr}")
- private String namesrvAddr;
- @PostConstruct
- public void defaultMQProducer() {
- // 生产者的组名
- DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
- // 指定 NameServer 地址, 多个地址以 ; 隔开
- producer.setNamesrvAddr(namesrvAddr);
- try {
- /**
- * Producer 对象在使用之前必须要调用 start 初始化, 初始化一次即可
- * 注意: 切记不可以在每次发送消息时, 都调用 start 方法
- */
- producer.start();
- // 创建一个消息实例, 包含 topic,tag 和 消息体
- // 如下: topic 为 "TopicTest",tag 为 "push"
- Message message = new Message("TopicTest", "push", "发送消息 ----zhisheng-----".getBytes(RemotingHelper.DEFAULT_CHARSET));
- StopWatch stop = new StopWatch();
- stop.start();
- for (int i = 0; i <10000; i++) {
- SendResult result = producer.send(message);
- System.out.println("发送响应: MsgId:" + result.getMsgId() + ", 发送状态:" + result.getSendStatus());
- }
- stop.stop();
- System.out.println("---------------- 发送一万条消息耗时:" + stop.getTotalTimeMillis());
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- producer.shutdown();
- }
- }
- }
消费者
- package com.zhisheng.rocketmq.server;
- import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
- import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
- import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
- import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.remoting.common.RemotingHelper;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import javax.annotation.PostConstruct;
- /**
- * Created by zhisheng_tian on 2018/2/6
- */
- @Component
- public class RocketMQServer {
- /**
- * 消费者的组名
- */
- @Value("${apache.rocketmq.consumer.PushConsumer}")
- private String consumerGroup;
- /**
- * NameServer 地址
- */
- @Value("${apache.rocketmq.namesrvAddr}")
- private String namesrvAddr;
- @PostConstruct
- public void defaultMQPushConsumer() {
- // 消费者的组名
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
- // 指定 NameServer 地址, 多个地址以 ; 隔开
- consumer.setNamesrvAddr(namesrvAddr);
- try {
- // 订阅 PushTopic 下 Tag 为 push 的消息
- consumer.subscribe("TopicTest", "push");
- // 设置 Consumer 第一次启动是从队列头部开始消费还是队列尾部开始消费
- // 如果非第一次启动, 那么按照上次消费的位置继续消费
- consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {
- try {
- for (MessageExt messageExt : list) {
- System.out.println("messageExt:" + messageExt);// 输出消息内容
- String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
- System.out.println("消费响应: msgId :" + messageExt.getMsgId() + ", msgBody :" + messageBody);// 输出消息内容
- }
- } catch (Exception e) {
- e.printStackTrace();
- return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 稍后再试
- }
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消费成功
- });
- consumer.start();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
启动类
- package com.zhisheng.rocketmq;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- @SpringBootApplication
- public class RocketmqApplication {
- public static void main(String[] args) {
- SpringApplication.run(RocketmqApplication.class, args);
- }
- }
- RocketMQ
代码已经都写好了, 接下来我们需要将与 RocketMQ 有关的启动起来.
启动 Name Server
在前面文章中已经写过怎么启动,
进入到目录 :
cd distribution/target/apache-rocketmq
启动:
- nohup sh bin/mqnamesrv &
- tail -f ~/logs/rocketmqlogs/namesrv.log // 通过日志查看是否启动成功
启动 Broker
- nohup sh bin/mqbroker -n localhost:9876 &
- tail -f ~/logs/rocketmqlogs/broker.log // 通过日志查看是否启动成功
然后运行启动类, 运行效果如下:
监控
RocketMQ 有一个对其扩展的开源项目 ocketmq-console , 如今也提交给了 Apache , 地址在: GitHub.com/apache/rock... , 官方也给出了其支持的功能的中文文档: GitHub.com/apache/rock... , 那么该如何安装?
Docker 安装
1, 获取 Docker 镜像
docker pull styletang/rocketmq-console-ng
2, 运行, 注意将你自己的 NameServer 地址替换下面的 127.0.0.1
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=127.0.0.1:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng
非 Docker 安装
我们 Git clone 一份代码到本地:
- Git clone https://github.com/apache/rocketmq-externals.git
- cd rocketmq-externals/rocketmq-console/
需要 jdk 1.7 以上. 执行以下命令:
mvn spring-boot:run
或者
- mvn clean package -Dmaven.test.skip=true
- java -jar target/rocketmq-console-ng-1.0.0.jar
注意:
1, 如果你下载依赖缓慢, 你可以重新设置 maven 的 mirror 为阿里云的镜像
- <mirrors>
- <mirror>
- <id>alimaven</id>
- <name>aliyun maven</name>
- <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
- <mirrorOf>central</mirrorOf>
- </mirror>
- </mirrors>
2, 如果你使用的 RocketMQ 版本小于 3.5.8, 如果您使用 rocketmq <3.5.8, 请在启动 rocketmq-console-ng 时添加 -Dcom.rocketmq.sendMessageWithVIPChannel = false(或者您可以在 ops 页面中更改它)
3, 更改 resource / application.properties 中的 rocketmq.config.namesrvAddr(或者可以在 ops 页面中更改它)
错误解决方法
1,Docker 启动项目报错
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <null> failed
将 Docker 启动命令改成如下以后:
docker run -e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876 -Drocketmq.config.isVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng
报错信息改变了, 新的报错信息如下:
- ERROR op=global_exception_handler_print_error
- org.apache.rocketmq.console.exception.ServiceException: This date have't data!
看到网上有人也遇到这个问题, 他们都通过自己的方式解决了, 但是方法我都试了, 不适合我. 不得不说, 阿里, 你能再用心点吗? 既然把 RocketMQ 捐给 Apache 了, 这些文档啥的都必须更新啊, 不要还滞后着呢, 不然少不了被吐槽!
搞了很久这种方法没成功, 暂时放弃! mmp
2, 非 Docker 安装, 只好把源码编译打包了.
注意需要修改如下图中的配置:
- rocketmq.config.namesrvAddr=localhost:9876 // 注意替换你自己的 ip
- # 如果你 rocketmq 版本小于 3.5.8 才需设置 `rocketmq.config.isVIPChannel` 为 false, 默认是 true, 这个可以在源码中可以看到的
- rocketmq.config.isVIPChannel=
执行以下命令:
mvn clean package -Dmaven.test.skip=true
编译成功:
可以看到已经打好了 jar 包:
运行:
java -jar rocketmq-console-ng-1.0.0.jar
成功, 不报错了, 开心, 访问 http://localhost:8080/
整个监控大概就是这些了.
然后我运行之前的 SpringBoot 整合项目, 查看监控信息如下:
总结
整篇文章讲述了 SpringBoot 与 RocketMQ 整合和 RocketMQ 监控平台的搭建.
参考文章
1,www.ymq.io/2018/02/02/...
2,GitHub 官方 README
来源: https://juejin.im/entry/5c7c4e98f265da2ddb2980eb