目录
订阅发布模式
1, 交换器(Exchange)
1.1, 创建交换器
1.2 , 推送消息到交换器
2, 临时队列
3, 绑定(bingdings)
5, 代码例子
5.1, 生产者代码示例
5.2, 消费者代码示例
订阅发布模式
1, 交换器(Exchange)
在 Work Queue 背后, 其实是 rabbitMQ 把每条任务消息只发给一个消费者. 本篇中我们将要研究如何把一条消息推送给多个消费者, 这种模式被称为 publish/subscribe(发布 / 订阅)
RabbitMQ 的消息发送模型核心思想是生产者不直接把消息发送到消息队列中. 事实上, 生产者不知道自己的消息将会被缓存到哪个队列中.
其实生产者者可以把消息发送到 exchange(消息交换机)上. exchange 是一个很简单的事情, 它一边接收生产者的消息, 另一边再把消息推送到消息队列中. Exchange 必须知道在它接收到一条消息应该怎么去处理. 应该把这条消息推送到指定的消息队列中? 还是把消息推送到所有的队列中? 或是把消息丢掉? 这些规则都可以用 exchange 类型来定义
1.1, 创建交换器
有一些可用的 exchange 类型: direct, topic, headers 和 fanout. 这里我们主要看最后一个: fanout, 这里我们创建一个名字为 logs, 类型为 fanout 的 exchange:
channel.exchangeDeclare("logs", "fanout");
fanout 类型的 exchange 是很简单的. 就是它把它能接收到的所有消息广播到它知道的所有队列中.
没有名字的 exchange
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
如上面的代码我们没有指定 exchagne 的名字, 采用的是 "", 空字符串的符号指的是默认的或没有命名的 exchange: 消息会根据 routingKey 被路由到指定的消息队列中
- // 申明交换器, 第一个参数: 交换器的名字; 第二个参数: 交换器的类型
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
1.2 , 推送消息到交换器
现在我们来把消息推送到已命名的 exchange 上, 原来的做法是推送到默认的交换器上面的;
原来的做法
- // 第一个参数: 交换器的名称
- // 第二个参数: 队列名称
- // 第三个参数: 消息的属性
- // 第四个参数: 消息体
- channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
推送到交换器
- // 第一个参数: 交换器名称;
- // 第二个参数: 队列名称;
- // 第三个参数: 消息属性;
- // 第四个参数: 消息体
- channel.basicPublish(EXCHANGE_NAME,"",null,"i am message".getBytes());
2, 临时队列
之前的例子中, 应该会发现我们都是使用了一个指定名字的消息队列. 对应的生产者和消费者之间都要使用相同的消息队列名称.
但是在我们的 log 系统中却不是这样, 我们希望能够接收到所有的 log 消息, 不只是其中的一部分. 我们只要处理当前的 log 消息, 不用管过去的历史 log. 为了实现, 我们需要做以下两步:
无论什么时候我们和 RabbitMQ 建立连接时, 我们都要刷新, 清空 Queue. 为了达到这一的目的, 我们可以用一个随机的名字 (随机性可由自己来定义) 来创建 Queue, 也可以让服务器来自动建立一个随见的 Queue.
当消费者断开连接时, Queue 能自动被删除.
使用 java 客户端时, 我们使用无参数的 queueDeclare 方法, 就可以创建一个已经生成名字的, 排他性, 会自动删除的 Queue
String queueName = channel.queueDeclare().getQueue();
这里面我们就可以拿到一个随机名字的 queue, 如: amq.gen-JzTY20BRgKO-HjmUJj0wLg
3, 绑定(bingdings)
现在已经创建好了一个 fanout 类型的 exchange 和一个队列. 那么接下来我们就需要让 exchange 向我们的 queue 里发送消息, Exchange 和 queue 之间的关系就是绑定(bindings)
channel.queueBind(queueName,exchangeName,"");
5, 代码例子
现在的代码和之前的区别不是很大;
主要的区别就是:
我们把消息推送到一个命名的 exchange 上, 而不是之前未命名的默认 exchange
在我们发送消息时需要提供一个 routingKey, 但对于 fanout 类型的 exchange 可以忽略
5.1, 生产者代码示例
- /**
- * @author zhaodi
- * @description
- * @date 2018/9/28 16:50
- */
- public class Producer {
- private static final String EXCHANGE_NAME = "my-exchange-1";
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection = MqConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- // 申明交换器,
- // 第一个参数: 交换器的名字;
- // 第二个参数: 交换器的类型
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- // 第一个参数: 交换器名称;
- // 第二个参数: 队列名称;
- // 第三个参数: 消息属性;
- // 第四个参数: 消息体
- channel.basicPublish(EXCHANGE_NAME,"",null,"i am message".getBytes());
- channel.close();
- connection.close();
- }
正如你所见, 在建立连接后我们声明了 exchange. 这一步是必须的, 因为禁止向一个不存在的 exchange 推送消息.
如果没有对 exchange 负责的 queue, 那么消息将会被丢失, 这是没有问题的; 如果没有消费者监听的话, 我们会安全的丢掉这些消息.
5.2, 消费者代码示例
- /**
- * @author zhaodi
- * @desc 发布订阅模式
- */
- public class Consumer {
- private static final String EXCHANGE_NAME="my-exchange-1";
- public static void main(String[] args) throws IOException {
- Connection connection = MqConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- // 申明消息路由的名称和类型
- channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
- // 申明一个随机的消息队列名称
- String queueName = channel.queueDeclare().getQueue();
- // 绑定消息路由和消息队列
- channel.queueBind(queueName,EXCHANGE_NAME,"");
- // 创建消费者
- com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("c1--->:"+new String(body));
- // 手动应答
- // 第一个参数: 消息标志
- channel.basicAck(envelope.getDeliveryTag(),false);
- }
- };
- // 监听, 关闭自动应答
- boolean autoAck = false;
- channel.basicConsume(queueName,autoAck,consumer);
- }
- }
来源: http://www.bubuko.com/infodetail-3164913.html