案例代码: https://github.com/q279583842q/springcloud-e-book
在实际开发过程中, 服务与服务之间通信经常会使用到消息中间件, 而以往使用了哪个中间件比如 RabbitMQ, 那么该中间件和系统的耦合性就会非常高, 如果我们要替换为 Kafka 那么变动会比较大, 这时我们可以使用 SpringCloudStream 来整合我们的消息中间件, 来降低系统和中间件的耦合性.
一, 什么是 SpringCloudStream
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架.
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中 binder 交互, 通过我们配置来 binding , 而 Spring Cloud Stream 的 binder 负责与消息中间件交互. 所以, 我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式.
通过使用 Spring Integration 来连接消息代理中间件以实现消息事件驱动. Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现, 引用了发布 - 订阅, 消费组, 分区的三个核心概念. 目前仅支持 RabbitMQ,Kafka.
二, Stream 解决了什么问题?
Stream 解决了开发人员无感知的使用消息中间件的问题, 因为 Stream 对消息中间件的进一步封装, 可以做到代码层面对中间件的无感知, 甚至于动态的切换中间件 (rabbitmq 切换为 kafka), 使得微服务开发的高度解耦, 服务可以关注更多自己的业务流程
官网结构图
组成 | 说明 |
---|---|
Middleware | 中间件,目前只支持 RabbitMQ 和 Kafka |
Binder | Binder 是应用与消息中间件之间的封装,目前实行了 Kafka 和 RabbitMQ 的 Binder,通过 Binder 可以很方便的连接中间件,可以动态的改变消息类型 (对应于 Kafka 的 topic,RabbitMQ 的 exchange),这些都可以通过配置文件来实现 |
@Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的 < font color="red" ztid="68" ow="48" oh="14"> 消息接收 |
@EnableBinding | 指信道 channel 和 exchange 绑定在一起 |
三, 消息驱动入门案例
我们通过一个入门案例来演示下通过 stream 来整合 RabbitMQ 来实现消息的异步通信的效果, 所以首先要开启 RabbitMQ 服务, RabbitMQ 不清楚的请参考此文: https://dpb-bobokaoya-sm.blog.csdn.net/article/details/90409404
1. 创建消息发送者服务
1.1 创建项目
创建一个 SpringCloud 项目
1.2 pom 文件
pom 文件中重点是要添加 spring-cloud-starter-stream-rabbit 这个依赖
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>1.5.13.RELEASE</version>
- </parent>
- <groupId>com.bobo</groupId>
- <artifactId>stream-sender</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-dependencies</artifactId>
- <version>Dalston.SR5</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-eureka</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </project>
1.3 配置文件
配置文件中除了必要的服务名称, 端口和 Eureka 的信息外我们还要添加 RabbitMQ 的注册信息
- spring.application.name=stream-sender
- server.port=9060
- # 设置服务注册中心地址, 指向另一个注册中心
- eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
- #rebbitmq 链接信息
- spring.rabbitmq.host=192.168.88.150
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=dpb
- spring.rabbitmq.password=123
- spring.rabbitmq.virtualHost=/
1.4 创建消费发送者接口
创建一个发送消息的接口. 具体如下: 方法名称自定义, 返回类型必须是 SubscribableChannel, 在 Output 注解中指定交换器名称.
- /**
- * 发送消息的接口
- * @author dengp
- *
- */
- public interface ISendeService {
- /**
- * 指定输出的交换器名称
- * @return
- */
- @Output("dpb-exchange")
- SubscribableChannel send();
- }
1.5 启动类
在启动类中通过 @EnableBinding 注解绑定我们创建的接口类.
- @SpringBootApplication
- @EnableEurekaClient
- // 绑定我们刚刚创建的发送消息的接口类型
- @EnableBinding(value={ISendeService.class})
- public class StreamSenderStart {
- public static void main(String[] args) {
- SpringApplication.run(StreamSenderStart.class, args);
- }
- }
2. 创建消息消费者服务
2.1 创建项目
2.2 pom 文件
添加的依赖和发送消息的服务是一致的
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>1.5.13.RELEASE</version>
- </parent>
- <groupId>com.bobo</groupId>
- <artifactId>stream-receiver</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-dependencies</artifactId>
- <version>Dalston.SR5</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-Web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-eureka</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
- </project>
2.3 配置文件
注意修改服务名称和端口
- spring.application.name=stream-receiver
- server.port=9061
- # 设置服务注册中心地址, 指向另一个注册中心
- eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/
- #rebbitmq 链接信息
- spring.rabbitmq.host=192.168.88.150
- spring.rabbitmq.port=5672
- spring.rabbitmq.username=dpb
- spring.rabbitmq.password=123
- spring.rabbitmq.virtualHost=/
2.4 创建接收消息的接口
此接口和发送消息的接口相似, 注意使用的是 @Input 注解.
- /**
- * 接收消息的接口
- * @author dengp
- *
- */
- public interface IReceiverService {
- /**
- * 指定接收的交换器名称
- * @return
- */
- @Input("dpb-exchange")
- SubscribableChannel receiver();
- }
2.5 创建处理消息的处理类
注意此类并不是实现上面创建的接口. 而是通过 @EnableBinding 来绑定我们创建的接口, 同时通过 @StreamListener 注解来监听 dpb-exchange 对应的消息服务
- /**
- * 具体接收消息的处理类
- * @author dengp
- *
- */
- @Service
- @EnableBinding(IReceiverService.class)
- public class ReceiverService {
- @StreamListener("dpb-exchange")
- public void onReceiver(byte[] msg){
- System.out.println("消费者:"+new String(msg));
- }
- }
2.6 启动类
同样要添加 @EnableBinding 注解
- @SpringBootApplication
- @EnableEurekaClient
- @EnableBinding(value={IReceiverService.class})
- public class StreamReceiverStart {
- public static void main(String[] args) {
- SpringApplication.run(StreamReceiverStart.class, args);
- }
- }
3. 编写测试代码
通过单元测试来测试服务.
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.messaging.Message;
- import org.springframework.messaging.support.MessageBuilder;
- import org.springframework.test.context.junit4.SpringRunner;
- import com.bobo.stream.StreamSenderStart;
- import com.bobo.stream.sender.ISendeService;
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes=StreamSenderStart.class)
- public class StreamTest {
- @Autowired
- private ISendeService sendService;
- @Test
- public void testStream(){
- String msg = "hello stream ...";
- // 将需要发送的消息封装为 Message 对象
- Message message = MessageBuilder
- .withPayload(msg.getBytes())
- .build();
- sendService.send().send(message );
- }
- }
启动消息消费者后, 执行测试代码. 结果如下:
消息接收者获取到了发送者发送的消息, 同时我们在 RabbitMQ 的 Web 界面也可以看到相关的信息
总结
我们同 stream 实现了消息中间件的使用, 我们发现只有在两处地址和 RabbitMQ 有耦合, 第一处是 pom 文件中的依赖, 第二处是 application.properties 中的 RabbitMQ 的配置信息, 而在具体的业务处理中并没有出现任何 RabbitMQ 相关的代码, 这时如果我们要替换为 Kafka 的话我们只需要将这两处换掉即可, 即实现了中间件和服务的高度解耦.
posted on 2019-06-28 17:31 ゞ . 邓澎波 阅读 (...) 评论 (...) 编辑 收藏
来源: https://www.cnblogs.com/dengpengbo/p/11103943.html