模型
生产者
- package cn.wh;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- import cn.util.RabbitMqConnectionUtil;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- public class Send {
- private static final String EXCHANGE_NAME="test_exchange_direct";
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection = RabbitMqConnectionUtil.getConnection();
- Channel channel = connection.createChannel();
- //exchange
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
- String msg="hello direct!";
- String routingKey="error";
- channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
- System.out.println("send"+msg);
- channel.close();
- connection.close();
- }
- }
消费者
- package cn.wh;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- import cn.util.RabbitMqConnectionUtil;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.Consumer;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
- import com.rabbitmq.client.AMQP.BasicProperties;
- public class Recv1 {
- private static final String EXCHANGE_NAME = "test_exchange_direct";
- private static final String QUEUE_NAME = "test_queue_direct_1";
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection = RabbitMqConnectionUtil.getConnection();
- final Channel channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
- channel.basicQos(1);
- // 定义一个消费者
- Consumer consumer=new DefaultConsumer(channel){
- // 消息到达 触发这个方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String msg=new String(body,"utf-8");
- System.out.println("[1] Recv msg:"+msg);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }finally{
- System.out.println("[1] done");
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
- };
- boolean autoAck=false;// 自动应答 false
- channel.basicConsume(QUEUE_NAME,autoAck , consumer);
- }
- }
消费者 2
- package cn.wh;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- import cn.util.RabbitMqConnectionUtil;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.Consumer;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
- import com.rabbitmq.client.AMQP.BasicProperties;
- public class Recv2 {
- private static final String EXCHANGE_NAME = "test_exchange_direct";
- private static final String QUEUE_NAME = "test_queue_direct_2";
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection = RabbitMqConnectionUtil.getConnection();
- final Channel channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
- channel.basicQos(1);
- // 定义一个消费者
- Consumer consumer=new DefaultConsumer(channel){
- // 消息到达 触发这个方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String msg=new String(body,"utf-8");
- System.out.println("[2] Recv msg:"+msg);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }finally{
- System.out.println("[2] done");
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
- };
- boolean autoAck=false;// 自动应答 false
- channel.basicConsume(QUEUE_NAME,autoAck , consumer);
- }
- }
Topic 模型
- public class Send {
- private final static String EXCHANGE_NAME = "test_exchange_topic";
- public static void main(String[] argv) throws Exception {
- // 获取到连接以及 mq 通道
- Connection connection = ConnectionUtils.getConnection();
- Channel channel = connection.createChannel();
- // 声明 exchange
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- // 消息内容
- String message = "id=1001";
- channel.basicPublish(EXCHANGE_NAME, "item.delete", null, message.getBytes());
- System.out.println("[x] Sent'" + message + "'");
- channel.close();
- connection.close();
- }
- }
消费者
- public class Recv {
- private final static String QUEUE_NAME = "test_queue_topic_1";
- private final static String EXCHANGE_NAME = "test_exchange_topic";
- public static void main(String[] argv) throws Exception {
- // 获取到连接以及 mq 通道
- Connection connection = ConnectionUtils.getConnection();
- final Channel channel = connection.createChannel();
- // 声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- // 绑定队列到交换机
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
- // 同一时刻服务器只会发一条消息给消费者
- channel.basicQos(1);
- // 定义队列的消费者
- Consumer consumer = new DefaultConsumer(channel) {
- // 消息到达 触发这个方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String msg = new String(body, "utf-8");
- System.out.println("[2] Recv msg:" + msg);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- System.out.println("[2] done");
- // 手动回执
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
- };
- boolean autoAck = false;
- channel.basicConsume(QUEUE_NAME, autoAck, consumer);
- }
- }
消费者 2
- public class Recv {
- private final static String QUEUE_NAME = "test_queue_topic_1";
- private final static String EXCHANGE_NAME = "test_exchange_topic";
- public static void main(String[] argv) throws Exception {
- // 获取到连接以及 mq 通道
- Connection connection = ConnectionUtils.getConnection();
- final Channel channel = connection.createChannel();
- // 声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- // 绑定队列到交换机
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.update");
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "item.delete");
- // 同一时刻服务器只会发一条消息给消费者
- channel.basicQos(1);
- // 定义队列的消费者
- Consumer consumer = new DefaultConsumer(channel) {
- // 消息到达 触发这个方法
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope,
- BasicProperties properties, byte[] body) throws IOException {
- String msg = new String(body, "utf-8");
- System.out.println("[2] Recv msg:" + msg);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } finally {
- System.out.println("[2] done");
- // 手动回执
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
- };
- boolean autoAck = false;
- channel.basicConsume(QUEUE_NAME, autoAck, consumer);
- }
- }
- Exchanges(转发器 | 交换机)
转发器一方面它接受生产者的消息, 另一方面向队列推送消息
Nameless exchange(匿名转发)
之前我们对转换器一无所知, 却可以将消息发送到队列, 那是可能是我们用了默认的转发器, 转发器名为空字符串 "". 之前我们发布消息的代码
Fanout Exchange
不处理路由键. 你只需要将队列绑定到交换机上. 发送消息到交换机都会被转发到与该交换机绑定的所有队列
Direct Exchange
处理路由键.
需要将一个队列绑定到交换机上, 要求该消息与一个特定的路由键完全匹配. 这是一个完整的匹配. 如果一个队列绑定到该交换机上要求路由键 "dog", 则只有被标记为 "dog" 的消息才被转发, 不会转发 dog.puppy, 也不会转发 dog.guard, 只会转发 dog.
Topic Exchange
将路由键和某模式进行匹配.
此时队列需要绑定要一个模式上. 符号 "#" 匹配一个或多个词, 符号 "*" 匹配一个词. 因此 "audit.#" 能够匹配到 "audit.irs.corporate", 但是 "audit.*" 只会匹配到 "audit.irs".
来源: https://www.cnblogs.com/wh1520577322/p/10071505.html