1. 背景
了解过 RabbitMQ 的 Fanout 模式, 应该知道它原本的 Fanout 模式就是用来做广播的. 但是它的广播有一点区别, 来回顾下它的含义: Fanout 类型没有路由键的概念, 只要队列绑定到了改 exchange 上面, 就会接收到所有的消息.
使用过程一般就是先 new 出一个 Fanout 类型的交换机, 然后往这个交换机上绑定多个队列 queue, 不同的消费者各自监听不同的队列, 这就实现了广播效果, 因为同一个消息, 会分发到所有队列中.
举个例子:
应用 A 监听了队列 A, 应用 B 监听了队列 B,Fanout 类型交换机同时绑定了队列 A 和 B. 假设生产者端发送了一条消息到 Fanout 类型交换机, 交换机就会把消息分发到所有队列, 这时应用 A 和应用 B 会收到同一条消息, 这就是广播.
说了上面一大堆, 只是为了强调, 对于 RabbitMQ 的原本 Fanout 模式, 它的设计就是多个消费者必须监听不同的队列, 多个消费者之间才会形成广播关系.
那么问题来了, 假如在 Fanout 工作模式下, 多个消费者同时监听的是同一个队列, 会怎样? 实践过的同学应该都知道, 这种情况下, 这些消费者会形成竞争关系, 现象是同一个消息只会被其中一个消费者接收, 达不到广播的效果..
2. 需求
假如现在有一个需求, 要做到对同一个应用的多个节点进行广播, 怎么实现?
注意, 这里所说的同一个应用多个节点, 通俗点理解就是一个 war 包, 布在多个服务器节点上.
在实际部署集群时, 为了高可用, 同一个应用可能会部署多个节点, 那假如工程里已经通过配置定义某个队列, 那多个节点它们定义的队列就会是相同的, 那按照上面的背景, 那这些节点间肯定就会存在竞争关系, 即便是 Fanout 模式的交换机, 一条消息也只能被其中一个节点接收, 其他节点收不到, 达不到广播的效果. 那该如何做?
相信看到这里, 有人会问, 为何会有 对同一个应用的多个节点进行广播的需求场景? 为什么要有这个需求. 生产中的业务系统很多, 自然而然场景就很多.
举两个经典的例子:
1. 想要同时刷新所有节点的缓存
业务系统离不开缓存, 有时会用内存缓存, 假如我要刷新所有节点的内存缓存, 多个节点前可能有负载均衡例如 nginx 之类的, 我只需要访问其中一个节点, 然后让这个节点做广播通知所有其他节点刷缓存.(广播刷缓存)
2.websocket 会话寻找
websocket 是比较受欢迎的实时消息推送方案. 用过 websocket 应该知道, websocket 只能与多个节点中的其中一个节点做长连接会话保持, 也就是说用户的会话只会存在于一个节点上, 假设服务端要主动向用户推一条消息, 必须要知道用户的会话在哪个节点上, 怎么得知? 可以通过广播, 通过消息广播, 把消息发到多个节点上, 然后节点收到消息只需要判断用户会话是否就在本节点上, 假如在则主动推消息, 不在, 则丢弃这条消息.
类似上面这两种需求, 就需要用到广播, 并且是对同一个应用的多个节点进行广播. 当然不用广播肯定也有其他通知方案, 本文我们只讨论用 MQ 怎么做到.
3. 思路
假如继续用 RabbitMQ 的 Fanout 模式, 怎么做到对同一个应用的多个节点进行广播?
要起到广播效果, 关键就是让多个应用节点间不要存在竞争关系或者存在竞争关系时它们的消息怎么共享? 可以从这两个方向解决这个问题.
方法可能很多种, 在这里, 我只描述两种比较容易实现的方案.
方案 1
image
过程大致如下
应用启动, 多个节点监听同一个队列 (此时多个节点是竞争关系, 一条消息只会发到其中一个节点上)
消息生产者发送消息, 同一条消息只被其中一个节点收到
收到消息的节点通过 Redis 的发布订阅模式来通知其他兄弟节点
这种方案是最容易想到的, 思路就是依赖其他组件来做消息共享, 例如 Redis 这种可以替换成其他方案, 只要能做到消息共享就行, 那么最终的效果就肯定是广播效果了.
方案 2
image
过程大致如下
应用启动, 利用监听器生成唯一 ID
生成的唯一 ID, 通过文件写入的方式写到配置文件中
spring 启动, 把这个唯一 ID 加载为全局属性 (为何要用唯一 ID, 就是为了用这个 ID 作为该节点的监听队列名, 当然前缀可以用相同的, 后缀用唯一 ID 区分即可, 举个例子就是: 节点 1 监听队列 kunghsu-123 节点 2 监听队列 kunghsu-456. 必须保证它们的唯一 ID 是唯一的, 不然还是会存在竞争关系)
多个节点监听了多个队列 (让每个队列名都不同, 目的就是让他们不存在竞争关系, 没有竞争关系就不用做消息共享, 只管由 MQ 分发即可, 这时同一条消息就会发到多个节点上)
到 MQ 控制台, 将所有节点生成的队列手动绑定到指定的 Fanout 交换机上 (这一步是手动的, 当然也可以通过 API 做到, 下面会说到)
生产者发送消息指定的 Fanout 交换机, 交换机将同一条消息被分发到多个节点上
广播效果达成!
这种方案, 也比较容易. 这样做, 就是为了让多个节点间是广播关系. 总的来说不麻烦, 其中第五步手动操作其实有点挫, 这种手动操作步骤其实是应该转成自动化, 让应用程序来完成, 方便以后自动化建设.
这种方案的 spring 配置也比较简单, 参考 Fanout 模式的配置即可. 本文重点在这个思路的实现过程.
只列举部分代码如下:
消息生产者
- <!-- 只申明交换机, 不定义队列 -->
- <rabbit:fanout-exchange name="exchangeFour" durable="true" auto-delete="false">
- </rabbit:fanout-exchange>
- <!-- 定义 rabbit template 用于数据的接收和发送 -->
- <rabbit:template id="amqpTemplate4" connection-factory="connectionFactory2"
- exchange="exchangeFour" />
消息消费者
- <rabbit:queue name="${queue-name-fanout}" durable="true" auto-delete="false"
- exclusive="false" declared-by="connectAdmin2" />
- <bean id="fanoutTwoConsumer" class="com.lunch.foo.rabbitmq.FanoutTwoConsumer">
- </bean>
- <rabbit:listener-container connection-factory="connectionFactory2">
- <rabbit:listener queues="${queue-name-fanout}" ref="fanoutOneConsumer"
- />
- </rabbit:listener-container>
另外, RabbitMQ 的客户端 API 支持让我们 将队列绑定到指定的交换机上. 具体可参考我的工具类代码.
代码如下:
- package com.lunch.foo.rabbitmq;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * Created by xuyaokun On 2019/3/10 2:26
- * @desc:
- */
- public class RabbitMQUtil {
- private static final String HOST = "192.168.3.128";
- private static final int PORT = AMQP.PROTOCOL.PORT;
- private static final String USERNAME = "kunghsu";
- private static final String PASSWORD = "123456";
- private static final String VIRTUALHOST = "/";
- public static void main(String[] args) {
- String QUEUE_NAME = "queueOneX";
- String EXCHANGE_NAME = "exchangeFour";
- try {
- queueBind(EXCHANGE_NAME, QUEUE_NAME);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- /**
- * 获取会话链接
- *
- * @return
- * @throws IOException
- * @throws TimeoutException
- */
- private static Connection getConnection() throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost(HOST);
- factory.setPort(PORT);
- factory.setUsername(USERNAME);
- factory.setPassword(PASSWORD);
- factory.setVirtualHost(VIRTUALHOST);
- return factory.newConnection();
- }
- /**
- * 绑定队列到指定交换机
- *
- * @param exchangeName
- * @param queueName
- * @throws IOException
- * @throws TimeoutException
- */
- public static void queueBind(String exchangeName, String queueName) throws IOException, TimeoutException {
- Channel channel = null;
- try{
- channel = getConnection().createChannel();
- } catch(Exception e){
- System.out.println("获取 RabbitMQ 会话连接失败! 取消做队列绑定.");
- return ;
- }
- // 默认持久化
- channel.queueDeclare(queueName, true, false, false, null);
- // 声明交换机: 指定交换机的名称和类型 (广播: fanout)
- channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true);
- // 在消费者端队列绑定
- channel.queueBind(queueName, exchangeName, "");
- channel.close();
- }
- }
总结
RabbitMQ 的 Fanout 模式相关的文章, 网上一抓一大把, 但是几乎没有人讲到 如何实现 对同一个应用的多个节点进行广播. 希望通过这篇文章, 能帮助到有需要的朋友.
本文转载于: RabbitMQ 如何实现对同一个应用的多个节点进行广播
作者: xyk_1021
来源: http://www.jianshu.com/p/b65ceadb9857