1. 前言
2. Spring 中的消息框架
- 2.1 Spring Messaging
- 2.2 Spring Cloud Stream
3. spring-boot-starter 的实现
3.1 spring-boot-starter 的实现步骤
3.2 消息发送端实现
3.3. 消息消费端实现
4. 使用示例
4.1 RocketMQ 服务端的准备
4.2. 编译 rocketmq-spring-boot-starter
4.3. 编写客户端代码
5. 参考文档
RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
您对于源码的疑问每条留言都将得到认真回复. 甚至不知道如何读源码也可以请教噢.
新的源码解析文章实时收到通知. 每周更新一篇左右.
认真的源码交流微信群.
1. 前言
上世纪 90 年代末, 随着 Java EE(Enterprise Edition)的出现, 特别是 Enterprise Java Beans 的使用需要复杂的描述符配置和死板复杂的代码实现, 增加了广大开发者的学习曲线和开发成本, 由此基于简单的 xml 配置和普通 Java 对象 (Plain Old Java Objects) 的 Spring 技术应运而生, 依赖注入 (Dependency Injection), 控制反转(Inversion of Control) 和面向切面编程 (AOP) 的技术更加敏捷地解决了传统 Java 企业及版本的不足.
随着 Spring 的持续演进, 基于注解 (Annotation) 的配置逐渐取代了 xml 文件配置, 2014 年 4 月 1 日, Spring Boot 1.0.0 正式发布, 它基于 "约定大于配置"(Convention over configuration)这一理念来快速地开发, 测试, 运行和部署 Spring 应用, 并能通过简单地与各种启动器 (如 spring-boot-web-starter) 结合, 让应用直接以命令行的方式运行, 不需再部署到独立容器中. 这种简便直接快速构建和开发应用的过程, 可以使用约定的配置并且简化部署, 受到越来越多的开发者的欢迎.
Apache RocketMQ 是业界知名的分布式消息和流处理中间件, 简单地理解, 它由 Broker 服务器和客户端两部分组成:
其中客户端一个是消息发布者客户端(Producer), 它负责向 Broker 服务器发送消息;
另外一个是消息的消费者客户端(Consumer), 多个消费者可以组成一个消费组, 来订阅和拉取消费 Broker 服务器上存储的消息.
为了利用 Spring Boot 的快速开发和让用户能够更灵活地使用 RocketMQ 消息客户端, Apache RocketMQ 社区推出了 spring-boot-starter 实现. 随着分布式事务消息功能在 RocketMQ 4.3.0 版本的发布, 近期升级了相关的 spring-boot 代码, 通过注解方式支持分布式事务的回查和事务消息的发送.
本文将对当前的设计实现做一个简单的介绍, 读者可以通过本文了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节, 然后通过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置, 发送和消费 RocketMQ 消息.
2. Spring 中的消息框架
顺便在这里讨论一下在 Spring 中关于消息的两个主要的框架, 即 Spring Messaging 和 Spring Cloud Stream. 它们都能够与 Spring Boot 整合并提供了一些参考的实现. 和所有的实现框架一样, 消息框架的目的是实现轻量级的消息驱动的微服务, 可以有效地简化开发人员对消息中间件的使用复杂度, 让系统开发人员可以有更多的精力关注于核心业务逻辑的处理.
2.1 Spring Messaging
Spring Messaging 是 Spring Framework 4 中添加的模块, 是 Spring 与消息系统集成的一个扩展性的支持. 它实现了从基于 JmsTemplate 的简单的使用 JMS 接口到异步接收消息的一整套完整的基础架构, Spring AMQP 提供了该协议所要求的类似的功能集. 在与 Spring Boot 的集成后, 它拥有了自动配置能力, 能够在测试和运行时与相应的消息传递系统进行集成.
单纯对于客户端而言, Spring Messaging 提供了一套抽象的 API 或者说是约定的标准, 对消息发送端和消息接收端的模式进行规定, 不同的消息中间件提供商可以在这个模式下提供自己的 Spring 实现: 在消息发送端需要实现的是一个 XXXTemplate 形式的 Java Bean, 结合 Spring Boot 的自动化配置选项提供多个不同的发送消息方法; 在消息的消费端是一个 XXXMessageListener 接口(实现方式通常会使用一个注解来声明一个消息驱动的 POJO), 提供回调方法来监听和消费消息, 这个接口同样可以使用 Spring Boot 的自动化选项和一些定制化的属性.
- <groupId>
- org.apache.rocketmq
- </groupId>
- <artifactId>
- spring-boot-starter-rocketmq
- </artifactId>
- <version>
- 1.0.0-SNAPSHOT
- </version>
- <dependencies>
- <!-- spring-boot-start internal depdencies -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <!-- rocketmq dependencies -->
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>rocketmq-client</artifactId>
- <version>${rocketmq-version}</version>
- </dependency>
- </dependencies>
- <dependencyManagement>
- <dependencies>
- <!-- spring-boot-start parent depdency definition -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>${spring.boot.version}</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
- <properties>
- <spring-boot-starter-rocketmq-version>1.0.0-SNAPSHOT</spring-boot-starter-rocketmq-version>
- </properties>
- <dependency>
- <groupId>org.apache.rocketmq</groupId>
- <artifactId>spring-boot-starter-rocketmq</artifactId>
- <version>${spring-boot-starter-rocketmq-version}</version>
- </dependency>
- # 定义 name-server 地址
- spring.rocketmq.name-server=localhost:9876
- # 定义发布者组名
- spring.rocketmq.producer.group=my-group1
- # 定义要发送的 topic
- spring.rocketmq.topic=string-topic
- import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
- ...
- @SpringBootApplication
- public class ProducerApplication implements CommandLineRunner {
- // 声明并引用 RocketMQTemplate
- @Resource
- private RocketMQTemplate rocketMQTemplate;
- // 使用 application.properties 里定义的 topic 属性
- @Value("${spring.rocketmq.springTopic}")
- private String springTopic;
- public static void main(String[] args){
- SpringApplication.run(ProducerApplication.class, args);
- }
- public void run(String... args) throws Exception {
- // 以同步的方式发送字符串消息给指定的 topic
- SendResult sendResult = rocketMQTemplate.syncSend(springTopic, "Hello, World!");
- // 打印发送结果信息
- System.out.printf("string-topic syncSend1 sendResult=%s %n", sendResult);
- }
- }
- # 定义 name-server 地址
- spring.rocketmq.name-server=localhost:9876
- # 定义发布者组名
- spring.rocketmq.consumer.group=my-customer-group1
- # 定义要发送的 topic
- spring.rocketmq.topic=string-topic
- @SpringBootApplication
- public class ConsumerApplication {
- public static void main(String[] args) {
- SpringApplication.run(ConsumerApplication.class, args);
- }
- }
- // 声明消费消息的类, 并在注解中指定, 相关的消费信息
- @Service
- @RocketMQMessageListener(topic = "${spring.rocketmq.topic}", consumerGroup = "${spring.rocketmq.consumer.group}")
- class StringConsumer implements RocketMQListener<String> {
- @Override
- public void onMessage(String message) {
- System.out.printf("------- StringConsumer received: %s %f", message);
- }
- }
来源: https://juejin.im/entry/5c7da018f265da2ddc3c9a15