在上一章中, 我们构建了一个简单的日志系统, 我们可以把消息广播给很多的消费者. 在本章中我们将增加一个特性: 我们可以订阅这些信息中的一些信息. 例如, 我们希望只将 error 级别的错误存储到硬盘中, 同时可以将所有级别 (error,info,warning 等) 的日志都打印在控制台上.
1, 绑定(Bindings)
在上一章中, 我们已经创建了绑定关系, 回顾一下代码:
1 channel.queueBind(queueName, EXCHANGE_NAME, "");
一个绑定是一个交换器与队列之间的关系. 意思是指: 这个队列对这个交换器的消息感兴趣.
该方法同时还有另一个 routing Key 参数, 为了避免与 basic_public 参数产生中的路由键 (routing key) 混淆, 我们称之为绑定键(bingind key), 下面展示了如何通过一个绑定 key 创建一个绑定:
1 channel.queueBind(queueName, EXCHANGE_NAME, "black");
注意, 这个绑定键 (这里是 "black") 的含义依赖于交换器的类型. 比如在我们的日志系统中, 交换器类型为 fanout, 此时, 绑定键没有任何意义, 会被忽略掉.
2, 直连交换机(Direct Exchange)
在我们之前的日志系统中, 所有的消息被广播给所有的消费者, 但是本章的需要是希望有一个程序可以只接收 error 级别的日志并保存到磁盘中, 而不用浪费空间去存储那些 info,warning 级别的日志.
我们正在用的广播模式的交换器并不够灵活, 它只是不加思索地进行广播. 因此, 需要使用 direct exchange 来代替. 直连交换器的路由算法非常简单: 将消息推送到 binding key 与该消息的 routing key 相同的队列.
为了说明这点, 请看下图:
在该图中, 直连交换器 X 上绑定了两个队列. 第一个队列绑定了绑定键 orange, 第二个队列有两个绑定键: black 和 green. 在这种场景下, 一个消息在布时指定了路由键为 orange 将会只被路由到队列 Q1, 路由键为 black 和 green 的消息都将被路由到队列 Q2. 其他的消息都将被丢失.
3, 多重绑定
同一个绑定键可以绑定到不同的队列上去, 在上图中, 我们也可以增加一个交换器 X 与队列 Q2 的绑定键, 在这种情况下, 直连交换器将会和广播交换器有着相同的行为, 将消息推送到所有匹配的队列. 一个路由键为 black 的消息将会同时被推送到队列 Q1 和 Q2.
4, 发送日志
首先我们要一如既往地创建一个交换器:
1 channel.exchangeDeclare(EXCHANGE_NAME, "direct");
并准备发送消息:
1 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
我们需要确保在我们日志系统中参数 "severity" 是 "info","warning" 和 "error" 中的一个.
5, 订阅
创建接收消息与上一章基本相同, 唯一不同的是, 需要在创建绑定关系时, 指定 severity 的值:
- String queueName = channel.queueDeclare().getQueue();
- for(String severity : argv){
- channel.queueBind(queueName, EXCHANGE_NAME, severity);
- }
6, 完整的代码
EmitLogDirect.java
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class EmitLogDirect {
- private static final String EXCHANGE_NAME = "direct_logs";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
- String severity = getSeverity(argv);
- String message = getMessage(argv);
- channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
- System.out.println("[x] Sent'" + severity + "':'" + message + "'");
- }
- }
- //..
- }
ReceiveLogsDirect.java
- import com.rabbitmq.client.*;
- public class ReceiveLogsDirect {
- private static final String EXCHANGE_NAME = "direct_logs";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
- String queueName = channel.queueDeclare().getQueue();
- if (argv.length <1) {
- System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
- System.exit(1);
- }
- for (String severity : argv) {
- channel.queueBind(queueName, EXCHANGE_NAME, severity);
- }
- System.out.println("[*] Waiting for messages. To exit press CTRL+C");
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println("[x] Received'" +
- delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
- };
- channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
- }
- }
为了测试方便, 我们可以把 "info","error","warning" 都绑定到一个队列上去, 然后生产者分别往 "info","error","warning" 发送消息:
此时查看 RabbitMq 控制台:
到此, 发布 - 订阅涉及到的相关知识点都讲解完了, 下一章将讲解 Topic(主题模式).
来源: https://www.cnblogs.com/wuhenzhidu/p/10801103.html