最近学习 RabbitMQ 的使用方式, 记录下来, 方便以后使用, 也方便和大家共享, 相互交流.
RabbitMQ 的六种工作模式:
- ,Work queues
- ,Publish/subscribe
- ,Routing
- ,Topics
5,Header 模式
6,RPC
一, Work queues
多个消费端消费同一个队列中的消息, 队列采用轮询的方式将消息是平均发送给消费者;
特点:
1, 一条消息只会被一个消费端接收;
2, 队列采用轮询的方式将消息是平均发送给消费者的;
3, 消费者在处理完某条消息后, 才会收到下一条消息
生产端:
1, 声明队列
2, 创建连接
3, 创建通道
4, 通道声明队列
5, 制定消息
6, 发送消息, 使用默认交换机
消费端:
1, 声明队列
2, 创建连接
3, 创建通道
4, 通道声明队列
5, 重写消息消费方法
6, 执行消息方法
新建两个 maven 工程, 生产消息的生产端, 消费消息的消费端;
pom.xml 文件中依赖坐标如下:
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-logging</artifactId>
- <version>2.1.0.RELEASE</version>
- </dependency>
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.7.0</version>
- </dependency>
- </dependencies>
生产端的代码如下:
- package com.xyfer;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /*
- 1, 声明队列
- 2, 创建连接
- 3, 创建通道
- 4, 通道声明队列
- 5, 制定消息
- 6, 发送消息, 使用默认交换机
- */
- public class Producer02 {
- // 声明队列
- private static final String QUEUE ="queue";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");//mq 服务 ip 地址
- connectionFactory.setPort(5672);//mq client 连接端口
- connectionFactory.setUsername("guest");//mq 登录用户名
- connectionFactory.setPassword("guest");//mq 登录密码
- connectionFactory.setVirtualHost("/");//rabbitmq 默认虚拟机名称为 "/", 虚拟机相当于一个独立的 mq 服务器
- // 创建与 RabbitMQ 服务的 TCP 连接
- connection = connectionFactory.newConnection();
- // 创建与 Exchange 的通道, 每个连接可以创建多个通道, 每个通道代表一个会话任务
- channel = connection.createChannel();
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE,true,false,false,null);// 通道绑定邮件队列
- for(int i = 0;i<10;i++){
- String message = new String("mq 发送消息...");
- /**
- * 消息发布方法
- * param1:Exchange 的名称, 如果没有指定, 则使用 Default Exchange
- * param2:routingKey, 消息的路由 Key, 是用于 Exchange(交换机)将消息转发到指定的消息队列
- * param3: 消息包含的属性
- * param4: 消息体
- * 这里没有指定交换机, 消息将发送给默认交换机, 每个队列也会绑定那个默认的交换机, 但是不能显示绑定或解除绑定
- * 默认的交换机, routingKey 等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- channel.basicPublish("",QUEUE,null,message.getBytes("utf-8"));
- System.out.println("mq 消息发送成功!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- try {
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
消费端的代码如下:
- package com.xyfer;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /*
- 1, 声明队列
- 2, 创建连接
- 3, 创建通道
- 4, 通道声明队列
- 5, 重写消息消费方法
- 6, 执行消息方法
- */
- public class Consumer02 {
- private static final String QUEUE ="queue";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE,true,false,false,null);// 通道绑定邮件队列
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签, 在 channel.basicConsume()去指定
- * @param envelope 消息包的内容, 可从中获取消息 id, 消息 routingkey, 交换机, 消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 交换机
- String exchange = envelope.getExchange();
- // 路由 key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body,"utf-8");
- System.out.println("mq 收到的消息是:"+msg );
- }
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE,true,consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
生产端启动后, 控制台打印信息如下:
RabbitMQ 中的已有消息:
queue 中的消息正是生产端发送的消息:
二, Publish/subscribe 模式
这种模式又称为发布订阅模式, 相对于 Work queues 模式, 该模式多了一个交换机, 生产端先把消息发送到交换机, 再由交换机把消息发送到绑定的队列中, 每个绑定的队列都能收到由生产端发送的消息.
发布订阅模式:
1, 每个消费者监听自己的队列;
2, 生产者将消息发给 broker, 由交换机将消息转发到绑定此交换机的每个队列, 每个绑定交换机的队列都将接收
到消息
应用场景: 用户通知, 当用户充值成功或转账完成系统通知用户, 通知方式有短信, 邮件多种方法;
生产端:
1, 声明队列, 声明交换机
2, 创建连接
3, 创建通道
4, 通道声明交换机
5, 通道声明队列
6, 通过通道使队列绑定到交换机
7, 制定消息
8, 发送消息
消费端:
1, 声明队列, 声明交换机
2, 创建连接
3, 创建通道
4, 通道声明交换机
5, 通道声明队列
6, 通过通道使队列绑定到交换机
7, 重写消息消费方法
8, 执行消息方法
Publish/subscribe 模式绑定两个消费端, 因此需要有两个消费端, 一个邮件消费端, 一个短信消费端;
生产端的代码如下:
- package com.xyfer;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Producer01 {
- // 声明两个队列和一个交换机
- //Publish/subscribe 发布订阅模式
- private static final String QUEUE_EMAIL ="queueEmail";
- private static final String QUEUE_SMS ="queueSms";
- private static final String EXCHANGE = "messageChange";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");//mq 服务 ip 地址
- connectionFactory.setPort(5672);//mq client 连接端口
- connectionFactory.setUsername("guest");//mq 登录用户名
- connectionFactory.setPassword("guest");//mq 登录密码
- connectionFactory.setVirtualHost("/");//rabbitmq 默认虚拟机名称为 "/", 虚拟机相当于一个独立的 mq 服务器
- // 创建与 RabbitMQ 服务的 TCP 连接
- connection = connectionFactory.newConnection();
- // 创建与 Exchange 的通道, 每个连接可以创建多个通道, 每个通道代表一个会话任务
- channel = connection.createChannel();
- // 通道绑定交换机
- /**
- * 参数明细
- * 1, 交换机名称
- * 2, 交换机类型, fanout,topic,direct,headers
- */
- //Publish/subscribe 发布订阅模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);// 通道绑定邮件队列
- channel.queueDeclare(QUEUE_SMS,true,false,false,null);// 通道绑定短信队列
- // 交换机和队列绑定
- /**
- * 参数明细
- * 1, 队列名称
- * 2, 交换机名称
- * 3, 路由 key
- */
- //Publish/subscribe 发布订阅模式
- channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");
- channel.queueBind(QUEUE_SMS,EXCHANGE,"");
- for(int i = 0;i<10;i++){
- String message = new String("mq 发送消息...");
- /**
- * 消息发布方法
- * param1:Exchange 的名称, 如果没有指定, 则使用 Default Exchange
- * param2:routingKey, 消息的路由 Key, 是用于 Exchange(交换机)将消息转发到指定的消息队列
- * param3: 消息包含的属性
- * param4: 消息体
- * 这里没有指定交换机, 消息将发送给默认交换机, 每个队列也会绑定那个默认的交换机, 但是不能显示绑定或解除绑定
- * 默认的交换机, routingKey 等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- //Publish/subscribe 发布订阅模式
- channel.basicPublish(EXCHANGE,"",null,message.getBytes());
- System.out.println("mq 消息发送成功!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- try {
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
邮件消费端的代码如下:
- package com.xyfer;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer01 {
- //Publish/subscribe 发布订阅模式
- private static final String QUEUE_EMAIL ="queueEmail";
- private static final String EXCHANGE = "messageChange";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- // 通道绑定交换机
- /**
- * 参数明细
- * 1, 交换机名称
- * 2, 交换机类型, fanout,topic,direct,headers
- */
- //Publish/subscribe 发布订阅模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);// 通道绑定邮件队列
- // 交换机和队列绑定
- /**
- * 参数明细
- * 1, 队列名称
- * 2, 交换机名称
- * 3, 路由 key
- */
- //Publish/subscribe 发布订阅模式
- channel.queueBind(QUEUE_EMAIL,EXCHANGE,"");
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签, 在 channel.basicConsume()去指定
- * @param envelope 消息包的内容, 可从中获取消息 id, 消息 routingkey, 交换机, 消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 交换机
- String exchange = envelope.getExchange();
- // 路由 key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body,"utf-8");
- System.out.println("mq 收到的消息是:"+msg );
- }
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_EMAIL,true,consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
短信消费端的代码如下:
- package xyfer;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer01 {
- //Publish/subscribe 发布订阅模式
- private static final String QUEUE_SMS ="queueSms";
- private static final String EXCHANGE = "messageChange";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- // 通道绑定交换机
- /**
- * 参数明细
- * 1, 交换机名称
- * 2, 交换机类型, fanout,topic,direct,headers
- */
- //Publish/subscribe 发布订阅模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE_SMS,true,false,false,null);// 通道绑定短信队列
- // 交换机和队列绑定
- /**
- * 参数明细
- * 1, 队列名称
- * 2, 交换机名称
- * 3, 路由 key
- */
- //Publish/subscribe 发布订阅模式
- channel.queueBind(QUEUE_SMS,EXCHANGE,"");
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签, 在 channel.basicConsume()去指定
- * @param envelope 消息包的内容, 可从中获取消息 id, 消息 routingkey, 交换机, 消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 交换机
- String exchange = envelope.getExchange();
- // 路由 key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body,"utf-8");
- System.out.println("mq 收到的消息是:"+msg );
- }
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_SMS,true,consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
三, Routing 路由模式
Routing 模式又称路由模式, 该种模式除了要绑定交换机外, 发消息的时候还要制定 routing key, 即路由 key, 队列通过通道绑定交换机的时候, 需要指定自己的 routing key, 这样, 生产端发送消息的时候也会指定 routing key, 通过 routing key 就可以把相应的消息发送到绑定相应 routing key 的队列中去.
路由模式:
1, 每个消费者监听自己的队列, 并且设置 routingkey;
2, 生产者将消息发给交换机, 由交换机根据 routingkey 来转发消息到指定的队列;
应用场景: 用户通知, 当用户充值成功或转账完成系统通知用户, 通知方式有短信, 邮件多种方法;
生产端:
1, 声明队列, 声明交换机
2, 创建连接
3, 创建通道
4, 通道声明交换机
5, 通道声明队列
6, 通过通道使队列绑定到交换机并指定该队列的 routingkey
7, 制定消息
8, 发送消息并指定 routingkey
消费端:
1, 声明队列, 声明交换机
2, 创建连接
3, 创建通道
4, 通道声明交换机
5, 通道声明队列
6, 通过通道使队列绑定到交换机并指定 routingkey
7, 重写消息消费方法
8, 执行消息方法
按照假设的应用场景, 同样, Routing 路由模式也是一个生产端, 两个消费端, 所不同的是, 声明交换机的类型不同, 队列绑定交换机的时候需要指定 Routing key, 发送消息的时候也需要指定 Routing key, 这样根据 Routing key 就能把相应的消息发送到相应的队列中去.
生产端的代码如下:
- package com.xyfer;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Producer03 {
- // 声明两个队列和一个交换机
- //Routing 路由模式
- private static final String QUEUE_EMAIL ="queueEmail";
- private static final String QUEUE_SMS ="queueSms";
- private static final String EXCHANGE = "messageChange";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");//mq 服务 ip 地址
- connectionFactory.setPort(5672);//mq client 连接端口
- connectionFactory.setUsername("guest");//mq 登录用户名
- connectionFactory.setPassword("guest");//mq 登录密码
- connectionFactory.setVirtualHost("/");//rabbitmq 默认虚拟机名称为 "/", 虚拟机相当于一个独立的 mq 服务器
- // 创建与 RabbitMQ 服务的 TCP 连接
- connection = connectionFactory.newConnection();
- // 创建与 Exchange 的通道, 每个连接可以创建多个通道, 每个通道代表一个会话任务
- channel = connection.createChannel();
- // 通道绑定交换机
- /**
- * 参数明细
- * 1, 交换机名称
- * 2, 交换机类型, fanout,topic,direct,headers
- */
- //Routing 路由模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);// 通道绑定邮件队列
- channel.queueDeclare(QUEUE_SMS,true,false,false,null);// 通道绑定短信队列
- // 交换机和队列绑定
- /**
- * 参数明细
- * 1, 队列名称
- * 2, 交换机名称
- * 3, 路由 key
- */
- //Routing 路由模式
- channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);
- channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS);
- // 给 email 队列发消息
- for(int i = 0;i<10;i++){
- String message = new String("mq 发送 email 消息...");
- /**
- * 消息发布方法
- * param1:Exchange 的名称, 如果没有指定, 则使用 Default Exchange
- * param2:routingKey, 消息的路由 Key, 是用于 Exchange(交换机)将消息转发到指定的消息队列
- * param3: 消息包含的属性
- * param4: 消息体
- * 这里没有指定交换机, 消息将发送给默认交换机, 每个队列也会绑定那个默认的交换机, 但是不能显示绑定或解除绑定
- * 默认的交换机, routingKey 等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- //Routing 路由模式
- channel.basicPublish(EXCHANGE,QUEUE_EMAIL,null,message.getBytes());
- System.out.println("mq 消息发送成功!");
- }
- // 给 sms 队列发消息
- for(int i = 0;i<10;i++){
- String message = new String("mq 发送 sms 消息...");
- /**
- * 消息发布方法
- * param1:Exchange 的名称, 如果没有指定, 则使用 Default Exchange
- * param2:routingKey, 消息的路由 Key, 是用于 Exchange(交换机)将消息转发到指定的消息队列
- * param3: 消息包含的属性
- * param4: 消息体
- * 这里没有指定交换机, 消息将发送给默认交换机, 每个队列也会绑定那个默认的交换机, 但是不能显示绑定或解除绑定
- * 默认的交换机, routingKey 等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- //Routing 路由模式
- channel.basicPublish(EXCHANGE,QUEUE_SMS,null,message.getBytes());
- System.out.println("mq 消息发送成功!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- try {
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
邮件消费端的代码如下:
- package com.xyfer;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer03 {
- //Routing 路由模式
- private static final String QUEUE_EMAIL ="queueEmail";
- private static final String EXCHANGE = "messageChange";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- // 通道绑定交换机
- /**
- * 参数明细
- * 1, 交换机名称
- * 2, 交换机类型, fanout,topic,direct,headers
- */
- //Routing 路由模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);// 通道绑定邮件队列
- // 交换机和队列绑定
- /**
- * 参数明细
- * 1, 队列名称
- * 2, 交换机名称
- * 3, 路由 key
- */
- //Routing 路由模式
- channel.queueBind(QUEUE_EMAIL,EXCHANGE,QUEUE_EMAIL);
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签, 在 channel.basicConsume()去指定
- * @param envelope 消息包的内容, 可从中获取消息 id, 消息 routingkey, 交换机, 消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 交换机
- String exchange = envelope.getExchange();
- // 路由 key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body,"utf-8");
- System.out.println("mq 收到的消息是:"+msg );
- }
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_EMAIL,true,consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
短信消费端的代码如下:
- package xyfer;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer03 {
- //Routing 路由模式
- private static final String QUEUE_SMS ="queueSms";
- private static final String EXCHANGE = "messageChange";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- // 通道绑定交换机
- /**
- * 参数明细
- * 1, 交换机名称
- * 2, 交换机类型, fanout,topic,direct,headers
- */
- //Routing 路由模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE_SMS,true,false,false,null);// 通道绑定短信队列
- // 交换机和队列绑定
- /**
- * 参数明细
- * 1, 队列名称
- * 2, 交换机名称
- * 3, 路由 key
- */
- //Routing 路由模式
- channel.queueBind(QUEUE_SMS,EXCHANGE,QUEUE_SMS);
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签, 在 channel.basicConsume()去指定
- * @param envelope 消息包的内容, 可从中获取消息 id, 消息 routingkey, 交换机, 消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 交换机
- String exchange = envelope.getExchange();
- // 路由 key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body,"utf-8");
- System.out.println("mq 收到的消息是:"+msg );
- }
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_SMS,true,consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
四, Topics 模式
Topics 模式和 Routing 路由模式最大的区别就是, Topics 模式发送消息和消费消息的时候是通过通配符去进行匹配的.
路由模式:
1, 每个消费者监听自己的队列, 并且设置带统配符的 routingkey
2, 生产者将消息发给 broker, 由交换机根据 routingkey 来转发消息到指定的队列
应用场景: 用户通知, 当用户充值成功或转账完成系统通知用户, 通知方式有短信, 邮件多种方法;
生产端:
1, 声明队列, 声明交换机
2, 创建连接
3, 创建通道
4, 通道声明交换机
5, 通道声明队列
6, 通过通道使队列绑定到交换机并指定该队列的 routingkey(通配符)
7, 制定消息
8, 发送消息并指定 routingkey(通配符)
消费端:
1, 声明队列, 声明交换机
2, 创建连接
3, 创建通道
4, 通道声明交换机
5, 通道声明队列
6, 通过通道使队列绑定到交换机并指定 routingkey(通配符)
7, 重写消息消费方法
8, 执行消息方法
按照假设的应用场景, Topics 模式也是一个生产端, 两个消费端, 生产端队列绑定交换机的时候, 需要指定的 routingkey 是通配符, 发送消息的时候绑定的 routingkey 也是通配符, 消费端队列绑定交换机的时候 routingkey 也是通配符, 这样就能根据通配符匹配到消息了.
生产端的代码如下:
- package com.xyfer;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Producer04 {
- // 声明两个队列和一个交换机
- //Topics 模式
- private static final String QUEUE_EMAIL ="queueEmail";
- private static final String QUEUE_SMS ="queueSms";
- private static final String EXCHANGE = "messageChange";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");//mq 服务 ip 地址
- connectionFactory.setPort(5672);//mq client 连接端口
- connectionFactory.setUsername("guest");//mq 登录用户名
- connectionFactory.setPassword("guest");//mq 登录密码
- connectionFactory.setVirtualHost("/");//rabbitmq 默认虚拟机名称为 "/", 虚拟机相当于一个独立的 mq 服务器
- // 创建与 RabbitMQ 服务的 TCP 连接
- connection = connectionFactory.newConnection();
- // 创建与 Exchange 的通道, 每个连接可以创建多个通道, 每个通道代表一个会话任务
- channel = connection.createChannel();
- // 通道绑定交换机
- /**
- * 参数明细
- * 1, 交换机名称
- * 2, 交换机类型, fanout,topic,direct,headers
- */
- //Topics 模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);// 通道绑定邮件队列
- channel.queueDeclare(QUEUE_SMS,true,false,false,null);// 通道绑定短信队列
- // 交换机和队列绑定
- /**
- * 参数明细
- * 1, 队列名称
- * 2, 交换机名称
- * 3, 路由 key
- */
- channel.queueBind(QUEUE_EMAIL,EXCHANGE,"inform.#.email.#");
- channel.queueBind(QUEUE_SMS,EXCHANGE,"inform.#.sms.#");
- // 给 email 队列发消息
- for(int i = 0;i<10;i++){
- String message = new String("mq 发送 email 消息...");
- /**
- * 消息发布方法
- * param1:Exchange 的名称, 如果没有指定, 则使用 Default Exchange
- * param2:routingKey, 消息的路由 Key, 是用于 Exchange(交换机)将消息转发到指定的消息队列
- * param3: 消息包含的属性
- * param4: 消息体
- * 这里没有指定交换机, 消息将发送给默认交换机, 每个队列也会绑定那个默认的交换机, 但是不能显示绑定或解除绑定
- * 默认的交换机, routingKey 等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- channel.basicPublish(EXCHANGE,"inform.email",null,message.getBytes());
- System.out.println("mq email 消息发送成功!");
- }
- // 给 sms 队列发消息
- for(int i = 0;i<10;i++){
- String message = new String("mq 发送 sms 消息...");
- /**
- * 消息发布方法
- * param1:Exchange 的名称, 如果没有指定, 则使用 Default Exchange
- * param2:routingKey, 消息的路由 Key, 是用于 Exchange(交换机)将消息转发到指定的消息队列
- * param3: 消息包含的属性
- * param4: 消息体
- * 这里没有指定交换机, 消息将发送给默认交换机, 每个队列也会绑定那个默认的交换机, 但是不能显示绑定或解除绑定
- * 默认的交换机, routingKey 等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- channel.basicPublish(EXCHANGE,"inform.sms",null,message.getBytes());
- System.out.println("mq sms 消息发送成功!");
- }
- // 给 email 和 sms 队列发消息
- for(int i = 0;i<10;i++){
- String message = new String("mq 发送 email sms 消息...");
- /**
- * 消息发布方法
- * param1:Exchange 的名称, 如果没有指定, 则使用 Default Exchange
- * param2:routingKey, 消息的路由 Key, 是用于 Exchange(交换机)将消息转发到指定的消息队列
- * param3: 消息包含的属性
- * param4: 消息体
- * 这里没有指定交换机, 消息将发送给默认交换机, 每个队列也会绑定那个默认的交换机, 但是不能显示绑定或解除绑定
- * 默认的交换机, routingKey 等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- channel.basicPublish(EXCHANGE,"inform.email.sms",null,message.getBytes());
- System.out.println("mq email sms 消息发送成功!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- try {
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
邮件消费端的代码如下:
- package com.xyfer;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer04 {
- private static final String QUEUE_EMAIL ="queueEmail";
- private static final String EXCHANGE = "messageChange";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- // 通道绑定交换机
- /**
- * 参数明细
- * 1, 交换机名称
- * 2, 交换机类型, fanout,topic,direct,headers
- */
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);// 通道绑定邮件队列
- // 交换机和队列绑定
- /**
- * 参数明细
- * 1, 队列名称
- * 2, 交换机名称
- * 3, 路由 key
- */
- channel.queueBind(QUEUE_EMAIL,EXCHANGE,"inform.#.email.#");
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签, 在 channel.basicConsume()去指定
- * @param envelope 消息包的内容, 可从中获取消息 id, 消息 routingkey, 交换机, 消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 交换机
- String exchange = envelope.getExchange();
- // 路由 key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body,"utf-8");
- System.out.println("mq 收到的消息是:"+msg );
- }
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_EMAIL,true,consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
短信消费端的代码如下:
- package xyfer;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- public class Consumer04 {
- private static final String QUEUE_SMS ="queueSms";
- private static final String EXCHANGE = "messageChange";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- // 通道绑定交换机
- /**
- * 参数明细
- * 1, 交换机名称
- * 2, 交换机类型, fanout,topic,direct,headers
- */
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE_SMS,true,false,false,null);// 通道绑定邮件队列
- // 交换机和队列绑定
- /**
- * 参数明细
- * 1, 队列名称
- * 2, 交换机名称
- * 3, 路由 key
- */
- channel.queueBind(QUEUE_SMS,EXCHANGE,"inform.#.sms.#");
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签, 在 channel.basicConsume()去指定
- * @param envelope 消息包的内容, 可从中获取消息 id, 消息 routingkey, 交换机, 消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 交换机
- String exchange = envelope.getExchange();
- // 路由 key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body,"utf-8");
- System.out.println("mq 收到的消息是:"+msg );
- }
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_SMS,true,consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
由于生产端同时发送了 email 的消息(10 条),sms 的消息(10 条),email 和 sms 同时收到的消息(10 条), 所以每个消费端都应收到各自的 10 条消息, 加上同时都能收到的 10 条消息, 每个消费端应该收到 20 条消息;
生产端控制台打印:
邮件消费端控制台打印:
短信消费端的控制台打印:
生产端执行后, RabbitMQ 上的消息队列情况:
两个消费端执行完后, RabbitMQ 上的消息队列情况:
五, Header 模式
header 模式与 routing 不同的地方在于, header 模式取消 routingkey, 使用 header 中的 key/value(键值对)匹配队列.
案例:
根据用户的通知设置去通知用户, 设置接收 Email 的用户只接收 Email, 设置接收 sms 的用户只接收 sms, 设置两种通知类型都接收的则两种通知都有效.
根据假设使用场景, 需要一个生产端, 两个消费端, 不同的是, 生产端声明交换机时, 交换机的类型不同, 是 headers 类型, 生产端队列绑定交换机时, 不使用 routingkey, 而是使用 header 中的 key/value(键值对)匹配队列, 发送消息时也是使用 header 中的 key/value(键值对)匹配队列.
消费端同样是声明交换机时, 交换机的类型不同, 是 headers 类型, 消费端队列绑定交换机时, 不使用 routingkey, 而是使用 header 中的 key/value(键值对)匹配队列, 消费消息时也是使用 header 中的 key/value(键值对)匹配队列.
生产端的代码如下:
- package com.xyfer;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.Hashtable;
- import java.util.Map;
- import java.util.concurrent.TimeoutException;
- public class Producer05 {
- // 声明两个队列和一个交换机
- //Header 模式
- private static final String QUEUE_EMAIL ="queueEmail";
- private static final String QUEUE_SMS ="queueSms";
- private static final String EXCHANGE = "messageChange";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");//mq 服务 ip 地址
- connectionFactory.setPort(5672);//mq client 连接端口
- connectionFactory.setUsername("guest");//mq 登录用户名
- connectionFactory.setPassword("guest");//mq 登录密码
- connectionFactory.setVirtualHost("/");//rabbitmq 默认虚拟机名称为 "/", 虚拟机相当于一个独立的 mq 服务器
- // 创建与 RabbitMQ 服务的 TCP 连接
- connection = connectionFactory.newConnection();
- // 创建与 Exchange 的通道, 每个连接可以创建多个通道, 每个通道代表一个会话任务
- channel = connection.createChannel();
- // 通道绑定交换机
- /**
- * 参数明细
- * 1, 交换机名称
- * 2, 交换机类型, fanout,topic,direct,headers
- */
- //Header 模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);// 通道绑定邮件队列
- channel.queueDeclare(QUEUE_SMS,true,false,false,null);// 通道绑定短信队列
- // 交换机和队列绑定
- /**
- * 参数明细
- * 1, 队列名称
- * 2, 交换机名称
- * 3, 路由 key
- * 4,
- * String queue, String exchange, String routingKey, Map<String, Object> arguments
- */
- Map<String,Object> headers_email = new Hashtable<String,Object>();
- headers_email.put("inform_type","email");
- Map<String,Object> headers_sms = new Hashtable<String, Object>();
- headers_sms.put("inform_type","sms");
- channel.queueBind(QUEUE_EMAIL,EXCHANGE,"",headers_email);
- channel.queueBind(QUEUE_SMS,EXCHANGE,"",headers_sms);
- // 给 email 队列发消息
- for(int i = 0;i<10;i++){
- String message = new String("mq 发送 email 消息...");
- Map<String,Object> headers = new Hashtable<String,Object>();
- headers.put("inform_type","email");// 匹配 email 通知消费者绑定的 header
- /**
- * 消息发布方法
- * param1:Exchange 的名称, 如果没有指定, 则使用 Default Exchange
- * param2:routingKey, 消息的路由 Key, 是用于 Exchange(交换机)将消息转发到指定的消息队列
- * param3: 消息包含的属性
- * param4: 消息体
- * 这里没有指定交换机, 消息将发送给默认交换机, 每个队列也会绑定那个默认的交换机, 但是不能显示绑定或解除绑定
- * 默认的交换机, routingKey 等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
- properties.headers(headers);
- //Email 通知
- channel.basicPublish(EXCHANGE,"",properties.build(),message.getBytes());
- System.out.println("mq email 消息发送成功!");
- }
- // 给 sms 队列发消息
- for(int i = 0;i<10;i++){
- String message = new String("mq 发送 sms 消息...");
- Map<String,Object> headers = new Hashtable<String,Object>();
- headers.put("inform_type","sms");// 匹配 sms 通知消费者绑定的 header
- /**
- * 消息发布方法
- * param1:Exchange 的名称, 如果没有指定, 则使用 Default Exchange
- * param2:routingKey, 消息的路由 Key, 是用于 Exchange(交换机)将消息转发到指定的消息队列
- * param3: 消息包含的属性
- * param4: 消息体
- * 这里没有指定交换机, 消息将发送给默认交换机, 每个队列也会绑定那个默认的交换机, 但是不能显示绑定或解除绑定
- * 默认的交换机, routingKey 等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
- properties.headers(headers);
- //sms 通知
- channel.basicPublish(EXCHANGE,"",properties.build(),message.getBytes());
- System.out.println("mq sms 消息发送成功!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- try {
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
邮件消费端的代码如下:
- package com.xyfer;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.Hashtable;
- import java.util.Map;
- import java.util.concurrent.TimeoutException;
- public class Consumer05 {
- private static final String QUEUE_EMAIL ="queueEmail";
- private static final String EXCHANGE = "messageChange";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- // 通道绑定交换机
- /**
- * 参数明细
- * 1, 交换机名称
- * 2, 交换机类型, fanout,topic,direct,headers
- */
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE_EMAIL,true,false,false,null);// 通道绑定邮件队列
- // 交换机和队列绑定
- /**
- * 参数明细
- * 1, 队列名称
- * 2, 交换机名称
- * 3, 路由 key
- * 4,
- * String queue, String exchange, String routingKey, Map<String, Object> arguments
- */
- Map<String,Object> headers_email = new Hashtable<String,Object>();
- headers_email.put("inform_email","email");
- channel.queueBind(QUEUE_EMAIL,EXCHANGE,"",headers_email);
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签, 在 channel.basicConsume()去指定
- * @param envelope 消息包的内容, 可从中获取消息 id, 消息 routingkey, 交换机, 消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 交换机
- String exchange = envelope.getExchange();
- // 路由 key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body,"utf-8");
- System.out.println("mq 收到的消息是:"+msg );
- }
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_EMAIL,true,consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
短信消费端的代码如下:
- package xyfer;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.Hashtable;
- import java.util.Map;
- import java.util.concurrent.TimeoutException;
- public class Consumer05 {
- private static final String QUEUE_SMS ="queueSms";
- private static final String EXCHANGE = "messageChange";
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("127.0.0.1");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- // 通道绑定交换机
- /**
- * 参数明细
- * 1, 交换机名称
- * 2, 交换机类型, fanout,topic,direct,headers
- */
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
- // 通道绑定队列
- /**
- * 声明队列, 如果 Rabbit 中没有此队列将自动创建
- * param1: 队列名称
- * param2: 是否持久化
- * param3: 队列是否独占此连接
- * param4: 队列不再使用时是否自动删除此队列
- * param5: 队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
- *
- */
- channel.queueDeclare(QUEUE_SMS,true,false,false,null);// 通道绑定邮件队列
- // 交换机和队列绑定
- /**
- * 参数明细
- * 1, 队列名称
- * 2, 交换机名称
- * 3, 路由 key
- * 4,
- * String queue, String exchange, String routingKey, Map<String, Object> arguments
- */
- Map<String,Object> headers_email = new Hashtable<String,Object>();
- headers_email.put("inform_email","sms");
- channel.queueBind(QUEUE_SMS,EXCHANGE,"",headers_email);
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签, 在 channel.basicConsume()去指定
- * @param envelope 消息包的内容, 可从中获取消息 id, 消息 routingkey, 交换机, 消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 交换机
- String exchange = envelope.getExchange();
- // 路由 key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body,"utf-8");
- System.out.println("mq 收到的消息是:"+msg );
- }
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_SMS,true,consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
生产端启动后 RabbitMQ 上面的消息队列情况:
六, RPC 模式
RPC 即客户端远程调用服务端的方法 , 使用 MQ 可以实现 RPC 的异步调用, 基于 Direct 交换机实现, 流程如下:
1, 客户端即是生产者也是消费者, 向 RPC 请求队列发送 RPC 调用消息, 同时监听 RPC 响应队列.
2, 服务端监听 RPC 请求队列的消息, 收到消息后执行服务端的方法, 得到方法返回的结果.
3, 服务端将 RPC 方法 的结果发送到 RPC 响应队列.
4, 客户端 (RPC 调用方) 监听 RPC 响应队列, 接收到 RPC 调用结果.
至此, RabbitMQ 的六种工作模式已经介绍完毕, 手动代码实现, 实际体验六种工作模式的不同.
来源: https://www.cnblogs.com/xyfer1018/p/11581511.html