一, 应用背景
今天做一个需求, 要将 RabbitMQ 中的任务取出并执行, 为防止任务执行期间出错, 设置 NO_ACK=FALSE 标志, 这样, 一旦任务没有应答的话, 相应的任务就会被 RabbitMQ 自动 Re-Queue, 避免丢失任务. 然而, 由于任务执行时间较长, 通常需要五, 六分钟, 甚至更长; 我们都知道一旦一个任务被取出执行, 该任务就从 Ready 状态更改成 Unacked 状态. 如图所示:
当这个任务执行完之后, 程序将向 RabbitMQ 发送 ACK 消息确认, RabbitMQ 在收到 ACK 消息后, 会将该任务移出队列; 然而, 问题出在任务尚未执行完毕 [执行时间太久] ,RabbitMQ 再等了一段时间 [大约两三分钟] 后, 一直没有收到 ACK 确认消息, 就将该任务自动 Re-Queue 了 [我是一个生产者, 一个消费者模式] , 也就是说, 我们这里发生了死循环 [任务永远也执行不完, 因为会一直 Re-Queue] .
二, 延长 RabbitMQ ACK 应答时间
到这里, 我们急需解决的问题就是, 怎么能设置 RabbitMQ 延长等待 ACK 的时间, 百度一下, 两下, 各种读网络文档, 研究操作 RabbitMQ 工作的文档, 查了一圈资料也没查出怎么延长 RabbitMQ ACK 时间 [废柴啊] . 至此, 一直查不出来, 就想问一下网友的你, 你知道怎么延长 RabbitMQ 接受 ACK 应答时间么?
三, 改变解决问题方式
在查不出如何延长 ACK 应答时间后, 我将注意力转向如何检测当前任务操作超时的, 后来在官网看到这么一段话:
链接官网位置: http://www.rabbitmq.com/heartbeats.html#heartbeats-timeout
后面, 就简单测试下将 heartbeat 参数设置为 0, 以禁用心跳检测, 这样基本解决了我的问题; 虽然官方不建议这么做, 但也是一种解决思路, 如果大家有什么更好的解决办法, 烦请在下面留言 [先谢谢啦] .
至此, 这个问题基本阐述清楚了, 如果有遇到的小伙伴, 也请参考下上面的操作.
测试代码:
- # import json
- # from concurrent.futures import ThreadPoolExecutor
- from queue import Queue
- # from threading import Thread
- from pika import BasicProperties, BlockingConnection, URLParameters
- from pika.exceptions import ConnectionClosed
- # from automation.aiclient.aiclient import AsyncAIRequestManager
- class RabbitMQManager:
- def __init__(self, host = 'localhost', qname = 'queue'):
- self.params = URLParameters(host)
- self.qname = qname
- self.prod_conn = None
- self.prod_chan = None
- self.cons_conn = None
- self.cons_chan = None
- self.ai_signton = None
- def init_prod_conn(self):
- # create send connection
- self.prod_conn = BlockingConnection(self.params)
- self.prod_chan = self.prod_conn.channel()
- self.prod_chan.queue_declare(queue = self.qname, durable = True)
- def init_cons_conn(self):
- # create receive connection
- self.cons_conn = BlockingConnection(self.params)
- self.cons_chan = self.cons_conn.channel()
- self.cons_chan.basic_qos(prefetch_count = 1)
- self.cons_chan.basic_consume(self.callback, queue = self.qname)
- def produceMessages(self, msg = None):
- try:
- if isinstance(msg, str):
- self.prod_chan.basic_publish(exchange = '',
- routing_key = self.qname,
- body = msg,
- properties = BasicProperties(
- delivery_mode = 2, # make message persistent
- ))
- elif isinstance(msg, Queue):
- while 0 != msg.qsize():
- item = msg.get()
- self.prod_chan.basic_publish(exchange = '',
- routing_key = self.qname,
- body = item,
- properties = BasicProperties(
- delivery_mode = 2, # make message persistent
- ))
- else:
- pass
- except Exception as e:
- if isinstance(e, ConnectionClosed):
- print('Reconnection established!')
- self.init_prod_conn()
- # last connection close, re-produce msg
- self.produceMessages(msg)
- else:
- print('Produce msg exception Occur, please check following error message:')
- print(e)
- def consumeMessages(self):
- try:
- self.cons_chan.start_consuming()
- except Exception as e:
- print('Consume msg exception Occur, please check following error message:')
- print(e)
- if isinstance(e, ConnectionClosed):
- print('Reconnection established!')
- self.init_cons_conn()
- self.consumeMessages()
- def callback(self, ch, method, properties, body):
- # handle message body
- print('callback....')
- print(body)
- try:
- print('Consuming....')
- self.cons_conn.process_data_events()
- # 模拟处理任务时间
- import time
- time.sleep(300)
- # if None == self.ai_signton:
- # self.ai_signton = AsyncAIRequestManager()
- # self.ai_signton.run(eval(json.loads(json.dumps(body.decode('utf-8')), encoding = 'utf-8')))
- ch.basic_ack(delivery_tag = method.delivery_tag)
- # t = Thread(target = self.ai_signton.syncToDatabase())
- # t.start()
- except Exception as e:
- if isinstance(e, ConnectionClosed):
- raise ConnectionClosed('Connection has been closed, send to reconnection.')
- else:
- print('Current error msg:')
- print(e)
- def close_prod_conn(self):
- if None != self.prod_conn:
- self.prod_conn.close()
- def close_cons_conn(self):
- if None != self.cons_conn:
- self.cons_conn.close()
- def close(self):
- self.close_prod_conn()
- self.close_cons_conn()
来源: https://www.cnblogs.com/julygift/p/9445107.html