一, 场景
我们经常会碰见, 一个需求就是, 发送一条指令(消息), 延迟一段时间执行, 比如说常见的淘宝当下了一个订单后, 订单支付时间为半个小时, 如果半个小时没有支付, 则关闭该订单. 当然实现的方式有几种, 今天来看看 rabbitMQ 实现的方式.
二, 思路: rabbitMQ 如何实现
rabbitMQ 为每个队列设置消息的超时时间. 只要给队列设置 x-message-ttl 参数, 就设定了该队列所有消息的存活时间, 时间单位是毫秒. 如果声明队列时指定了死信交换器, 则过期消息会成为死信消息
需要设置的参数为:
三, 原理: 上图
将延迟队列 (queue) 在声明的时候设置参数 "x-dead-letter-exchange","x-message-ttl" 分别对应 死信路由器(dlx_exchange) 和 消息过期时间(比如说 30 分钟).
一个消息从生产者发送到延迟队列 , 在延迟队列里等待, 等待 30 分钟后, 会去绑定的死信路由(dlx_exchange). 通过死信路由的规则, 走到死信队列.
这时候监听死信队列的消费者就可以接收到消息, 消费消息. 比如说查看该订单是否支付, 如果没有支付, 则关闭该订单.
四, 代码实战
生产者代码:
- import com.dbg.example.connectionFactory.RQconnFactory;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.TimeoutException;
- public class DelayProducer {
- public static Logger logger = LoggerFactory.getLogger(DelayProducer.class);
- // 交换机
- public static final String exchangeName = "delay_Task";
- // 声明路由键
- public static final String routekey = "delay_order";
- public static void pend (List<String> orderNos) throws IOException, TimeoutException {
- // 通过单例得到工厂
- ConnectionFactory connectionFactory = RQconnFactory.getrQconnFactory();
- Connection connection = connectionFactory.newConnection();
- final Channel channel = connection.createChannel();
- // 声明一个交换机
- channel.exchangeDeclare(exchangeName , BuiltinExchangeType.DIRECT);
- // 消息绑定
- for (String orderNo :orderNos ) {
- channel.basicPublish(exchangeName , routekey , null ,orderNo.getBytes());
- logger.info("发送订单 :"+ orderNo);
- }
- // 关闭频道和连接
- channel.close();
- connection.close();
- }
- public static void main(String[] args) {
- List orders = new ArrayList();
- orders.add("order 0001");
- orders.add("order 0002");
- orders.add("order 0003");
- try {
- pend(orders);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
消费者代码:
- import com.dbg.example.connectionFactory.RQconnFactory;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Map;
- import java.util.concurrent.TimeoutException;
- public class DelayConsumer {
- public static void receive() throws IOException, TimeoutException {
- // 通过单例得到工厂
- ConnectionFactory connectionFactory = RQconnFactory.getrQconnFactory();
- Connection connection = connectionFactory.newConnection();
- final Channel channel = connection.createChannel();
- // 声明一个死信路由
- String dlxExchangeName = "dlx_delay";
- channel.exchangeDeclare(dlxExchangeName , BuiltinExchangeType.TOPIC);
- // 声明一个死信订单队列
- String dlxQueueName = "dlx_delay_order";
- channel.queueDeclare( dlxQueueName ,true , true ,false ,null);
- // 绑定
- channel.queueBind(dlxQueueName , dlxExchangeName , "#");
- // 声明一个交换机
- channel.exchangeDeclare(DelayProducer.exchangeName , BuiltinExchangeType.DIRECT);
- // 声明一个延迟订单队列 , 并绑定死信路由器
- String queueName = "delay_order";
- Map<String ,Object> arguments = new HashMap<>();
- arguments.put("x-dead-letter-exchange",dlxExchangeName);
- arguments.put("x-message-ttl", 5 * 1000); // 5 秒过期时间
- channel.queueDeclare(queueName,true , false ,false , arguments );
- // 绑定
- channel.queueBind(queueName , DelayProducer.exchangeName ,DelayProducer.routekey);
- // 声明一个消费者(关闭为支付订单的服务)
- final Consumer consumer = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println("Received and close [" +envelope.getRoutingKey() +"]"+message);
- }
- };
- // 消费者消费 -- 队列()
- channel.basicConsume(dlxQueueName,true , consumer);
- }
- public static void main(String[] args) {
- try {
- receive();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
3. 得到单例连接工厂
- public class RQconnFactory {
- private volatile static ConnectionFactory rQconnFactory;
- private RQconnFactory(){
- }
- public static ConnectionFactory getrQconnFactory() {
- if (rQconnFactory == null) {
- synchronized (RQconnFactory.class){
- if (rQconnFactory == null ){
- rQconnFactory = new ConnectionFactory();
- rQconnFactory.setHost("192.168.31.220");
- return rQconnFactory;
- }
- }
- }
- return rQconnFactory;
- }
- }
来源: https://www.cnblogs.com/DBGzxx/p/10090840.html