1. 概述
在本文中, 我们将向您介绍 Spring Cloud Stream https://spring.io/projects/spring-cloud-stream , 这是一个用于构建消息驱动的微服务应用程序的框架, 这些应用程序由一个常见的消息传递代理 (如 RabbitMQ https://www.rabbitmq.com/ ,Apache Kafka https://kafka.apache.org/ 等) 连接.
Spring Cloud Stream 构建在现有 Spring 框架 (如 Spring Messaging https://spring.io/guides/gs/messaging-jms/ 和 Spring Integration https://spring.io/projects/spring-integration ) 之上. 尽管这些框架经过了实战测试, 工作得非常好, 但是实现与使用的 message broker 紧密耦合. 此外, 有时对某些用例进行扩展是困难的.
Spring Cloud Stream 背后的想法是一个非常典型的 Spring Boot 概念 -- 抽象地讲, 让 Spring 根据配置和依赖关系管理在运行时找出实现自动注入. 这意味着您可以通过更改依赖项和配置文件来更改 message broker. 可以在这里找到目前已经支持的各种消息代理.
本文将使用 RabbitMQ 作为 message broker. 在此之前, 让我们了解一下 broker(代理)的一些基本概念, 以及为什么要在面向微服务的体系架构中需要它.
2. 微服务中的消息
在微服务体系架构中, 我们有许多相互通信以完成请求的小型应用程序 - 它们的主要优点之一是改进了的可伸缩性. 一个请求从多个下游微服务传递到完成是很常见的. 例如, 假设我们有一个 Service-A 内部调用 Service-B 和 Service-C 来完成一个请求:
[外链图片转存失败(img-jzvHHRXw-1562549429195)()]
是的, 还会有其他组件, 比如 Spring Cloud Eureka,Spring Cloud Zuul 等等, 但我们还是专注关心这类架构的特有问题.
假设由于某种原因 Service-B 需要更多的时间来响应. 也许它正在执行 I/O 操作或长时间的 DB 事务, 或者进一步调用其它导致 Service-B 变得更慢的服务, 这些都使其无法更具效率.
现在, 我们可以启动更多的 Service-B 实例来解决这个问题, 这样很好, 但是 Service-A 实际上是响应很快的, 它需要等待 Service-B 的响应来进一步处理. 这将导致 Service-A 无法接收更多的请求, 这意味着我们还必须启动 Service-A 的多个实例.
另一种方法解决类似情况的是使用事件驱动的微服务体系架构. 这基本上意味着 Service-A 不直接通过 HTTP 调用 Service-B 或 Service-C, 而是将请求或事件发布给 message broker(消息代理).Service-B 和 Service-C 将成为 message broker(消息代理)上此事件的订阅者.
与依赖 HTTP 调用的传统微服务体系架构相比, 这有许多优点:
提高可伸缩性和可靠性 -- 现在我们知道哪些服务是整个应用程序中的真正瓶颈.
鼓励松散耦合 --Service-A 不需要了解 Service-B 和 Service-C. 它只需要连接到 message broker 并发布事件. 事件如何进一步编排取决于代理设置. 通过这种方式, Service-A 可以独立地运行, 这是微服务的核心概念之一.
与遗留系统交互 -- 通常我们不能将所有东西都移动到一个新的技术堆栈中. 我们仍然必须使用遗留系统, 虽然速度很慢, 但是很可靠.
3. RabbitMQ
高级消息队列协议(AMQP) https://www.amqp.org/ 是 RabbitMQ 用于消息传递的协议. 虽然 RabbitMQ 支持其他一些协议, 但是 AMQP 由于兼容性和它提供的大量特性而更受欢迎.
3.1 RabbitMQ 架构设计
因此发布者将消息发布到 RabbitMQ 中称为 Exchange(交换器).Exchange(交换器)接收消息并将其路由到一个或多个 Queues(队列). 路由算法依赖于 Exchange(交换器)类型和 routing(路由)key/header(与消息一起传递). 将 Exchange(交换器)连接到 Queues(队列)的这些规则称为 bindings(绑定).
绑定可以有 4 种类型:
Direct: 它根据 routing key(路由键)将 Exchange(交换器)类型直接路由到特定的 Queues(队列).
Fanout: 它将消息路由到绑定 Exchange(交换器)中的所有 Queues(队列).
Topic: 它根据完全匹配或部分据 routing key(路由键)匹配将消息路由到 (0,1 或更多) 的 Queues(队列).
Headers: 它类似于 Topic(主题)交换类型, 但是它是基 routing header(路由头)而不是 routing key(路由键)来路由的.
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
- </dependency>
- interface HelloBinding {
- @Output("greetingChannel")
- MessageChannel greeting();
- }
- @RestController
- public class ProducerController {
- private MessageChannel greet;
- public ProducerController(HelloBinding binding) {
- greet = binding.greeting();
- }
- @GetMapping("/greet/{name}")
- public void publish(@PathVariable String name) {
- String greeting = "Hello," + name + "!";
- Message<String> msg = MessageBuilder.withPayload(greeting)
- .build();
- this.greet.send(msg);
- }
- }
- @EnableBinding(HelloBinding.class)
- @SpringBootApplication
- public class Application {
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
- }
- spring.rabbitmq.addresses=<amqp url>
- spring.cloud.stream.bindings.greetingChannel.destination = greetings
- server.port=8080
- public interface HelloBinding {
- String GREETING = "greetingChannel";
- @Input(GREETING)
- SubscribableChannel greeting();
- }
- @EnableBinding(HelloBinding.class)
- public class HelloListener {
- @StreamListener(target = HelloBinding.GREETING)
- public void processHelloChannelGreeting(String msg) {
- System.out.println(msg);
- }
- }
- @SpringBootApplication
- public class Application {
- public static void main(String[] args) {
- SpringApplication.run(Application.class, args);
- }
- }
- spring.rabbitmq.addresses=<amqp url>
- spring.cloud.stream.bindings.greetingChannel.destination=greetings
- server.port=9090
来源: https://www.cnblogs.com/liululee/p/11149302.html