z 概览
本文主要介绍如何使用 RabbitMQ 消息代理来实现分布式系统之间的通信, 从而促进微服务的松耦合.
RabbitMQ, 也被称为开源消息代理, 它支持多种消息协议, 并且可以部署在分布式系统上. 它轻量级, 便于部署应用程序. 它主要充当一个队列, 其中输入的消息可以首先被操作. RabbitMQ 可以在许多操作系统和云环境中运行, 并为大多数流行语言提供了广泛的开发工具. 它是生产者 - 消费者模式, 生产者发出信息, 消费者消费信息. RabbitMQ 的主要特点如下:
异步消息
分布式部署
管理和监控
企业和云计算
安装
对于 RabbitMQ, 首先需要在系统中安装 ErLang, 因为 RabbitMQ 是用 ErLang 语言编写的. 安装 Erlang 之后, 你可以通过下面的介绍从它的官网下载最新版本的 RabbitMQ .
在微服务中使用 RabbitMQ
在您的微服务体系结构中, RabbitMQ 是实现消息队列的最简单的免费的可用选项之一. 这些队列模式有助于解耦各个微服务之间的通信来增加应用程序的弹性. 我们可以将这些队列用于各种目的, 比如核心微服务之间的交互, 微服务的解耦, 实现故障转移机制, 以及通过消息代理发送电子邮件通知.
无论在哪里, 只要有两个或两个以上的核心模块需要相互通信, 我们就不应该进行直接的 HTTP 调用, 因为它们会使核心层产生紧耦合, 并且当每个核心模块有更多实例时将很难管理. 而且每当服务宕机时, HTTP 调用模式就会失败, 因为在服务重启之后, 我们将无法跟踪旧的 HTTP 请求调用. 这就产生了对 RabbitMQ 的需求.
图片描述 (最多 50 字)
在微服务中设置 RabbitMQ
在微服务架构中, 为了演示, 我们将使用一个可以通过任何核心微服务发送电子邮件通知的示例模式. 在这种模式下, 我们将有一个可以存在任何核心微服务的生产者, 它将生成电子邮件内容并将其发送到队列. 然后, 这个电子邮件内容由总是在等待队列中新消息的消费者来处理.
请注意, 由于正在使用 Spring Boot 构建微服务, 因此我们将为 Spring 提供配置.
1) 生产者: 这一层负责生成电子邮件内容, 并将此内容发送给 RabbitMQ 中的消息代理.
a) 在 properties 文件中, 我们需要配置队列名和交换类型, 以及安装 RabbitMQ 服务器的主机和端口.
- queue.name=messagequeue
- fanout.exchange=messagequeue-exchange
- spring.rabbitmq.host: localhost
- spring.rabbitmq.port: 5672
- spring.rabbitmq.username: guest
- spring.rabbitmq.password: guest
b) 我们需要创建一个配置类, 它将使用队列名和交换类型将队列绑定到微服务模块.
- @Configuration
- public class RabbitConfiguration {
- @Value("${fanout.exchange}")
- private String fanoutExchange;
- @Value("${queue.name}")
- private String queueName;
- @Bean
- Queue queue() {
- return new Queue(queueName, true);
- }
- @Bean
- FanoutExchange exchange() {
- return new FanoutExchange(fanoutExchange);
- }
- @Bean
- Binding binding(Queue queue, FanoutExchange exchange) {
- return BindingBuilder.bind(queue).to(exchange);
- }
- }
c) 最后, 我们需要一个工具类, 它将使用 Spring 框架提供的 RabbitTemplate 将实际的电子邮件内容发送到队列中.
- @Component
- public class QueueProducer {
- protected Logger logger = LoggerFactory.getLogger(getClass());
- @Value("${fanout.exchange}")
- private String fanoutExchange;
- private final RabbitTemplate rabbitTemplate;
- @Autowired
- public QueueProducer(RabbitTemplate rabbitTemplate) {
- super();
- this.rabbitTemplate = rabbitTemplate;
- }
- public void produce(NotificationRequestDTO notificationDTO) throws Exception {
- logger.info("Storing notification...");
- rabbitTemplate.setExchange(fanoutExchange);
- rabbitTemplate.convertAndSend(new ObjectMapper().writeValueAsString(notificationDTO));
- logger.info("Notification stored in queue sucessfully");
- }
- }
d) 然后, 您可以在模块的任何地方调用这个 produce 方法.
- {
- queueProducer.produce(notificationDTO);
- }
2) 消费者: 这一层负责使用 FIFO 方法从 RabbitMQ 消息代理中消费消息, 然后执行与电子邮件相关的操作.
a) 在这个 properties 文件中, 我们需要配置队列名和交换类型, 以及安装 RabbitMQ 服务器的主机和端口.
- queue.name=messagequeue
- fanout.exchange=messagequeue-exchange
- spring.rabbitmq.host: localhost
- spring.rabbitmq.port: 5672
- spring.rabbitmq.username: guest
- spring.rabbitmq.password: guest
b) 我们需要创建一个配置类, 它将使用队列名和交换类型将队列绑定到微服务模块. 此外, 在消费者的 RabbitMQ 配置中, 我们需要创建一个充当消费者的 MessageListenerAdapter bean, 它始终侦听从队列中传入的消息. 这个 MessageListenerAdapter 将有一个带有消费者工具类和 defaultListenerMethod 的有参构造函数, 在这里我们可以指定与电子邮件相关的操作.
- @Configuration
- public class RabbitConfiguration {
- private static final String LISTENER_METHOD = "receiveMessage";
- @Value("${queue.name}")
- private String queueName;
- @Value("${fanout.exchange}")
- private String fanoutExchange;
- @Bean
- Queue queue() {
- return new Queue(queueName, true);
- }
- @Bean
- FanoutExchange exchange() {
- return new FanoutExchange(fanoutExchange);
- }
- @Bean
- Binding binding(Queue queue, FanoutExchange exchange) {
- return BindingBuilder.bind(queue).to(exchange);
- }
- @Bean
- SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
- MessageListenerAdapter listenerAdapter) {
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- container.setQueueNames(queueName);
- container.setMessageListener(listenerAdapter);
- return container;
- }
- @Bean
- MessageListenerAdapter listenerAdapter(QueueConsumer consumer) {
- return new MessageListenerAdapter(consumer, LISTENER_METHOD);
- }
- }
c) 然后, 需要创建具有特定消息侦听器方法的 QueueConsumer 类, 在该类中我们可以进行实际发送电子邮件的操作.
- @Component
- public class QueueConsumer {
- @Autowired
- MailServiceImpl mailServiceImpl;
- protected Logger logger = LoggerFactory.getLogger(getClass());
- public void receiveMessage(String message) {
- logger.info("Received (String)" + message);
- processMessage(message);
- }
- public void receiveMessage(byte[] message) {
- String strMessage = new String(message);
- logger.info("Received (No String)" + strMessage);
- processMessage(strMessage);
- }
- private void processMessage(String message) {
- try {
- MailDTO mailDTO = new ObjectMapper().readValue(message, MailDTO.class);
- ValidationUtil.validateMailDTO(mailDTO);
- mailServiceImpl.sendMail(mailDTO, null);
- } catch (JsonParseException e) {
- logger.warn("Bad JSON in message:" + message);
- } catch (JsonMappingException e) {
- logger.warn("cannot map JSON to NotificationRequest:" + message);
- } catch (Exception e) {
- logger.error(e.getMessage());
- }
- }
- }
总结
通过使用 RabbitMQ, 您可以避免服务之间直接的 HTTP 调用, 并消除核心微服务的紧密耦合. 这将帮助您在更高级别上实现微服务的可伸缩性, 并在微服务之间添加故障转移机制.
来源: http://blog.51cto.com/13952953/2294220