只做下工作记录, 比较重要的几个属性:
concurrency: 一个生产者可以同时由多少个消费者消费, 这个一般根据你的机器性能来进行配置
prefetch: 允许为每个 consumer 指定最大的 unacked messages 数目. 要是对实时性要求很高的话, prefetch 应该设置成 1,concurrency 的值调高点
队列中 Ready 状态和 Unacknowledged 状态的消息数, 分别指的是等待投递给消费者的消息数和已经投递给消费者但是未收到 ack 信号的消息数.
注意配置超时重连机制, 防止死机
- spring.rabbitmq.listener.simple.retry.max-attempts=5
- spring.rabbitmq.listener.simple.retry.enabled=true
- spring.rabbitmq.listener.simple.retry.initial-interval=5000
- spring.rabbitmq.listener.simple.default-requeue-rejected=false
- spring.rabbitmq.listener.simple.prefetch=1
Springboot 中消费简单实现
- @RabbitListener(concurrency = "12",bindings = {@QueueBinding(value = @Queue(value = YOUR_QUEUE), exchange = @Exchange(value = YOUR_EXCHANGE,type = "fanout"))})
- public void process(Message message, com.rabbitmq.client.Channel channel) {
- Long deliveryTag = null;
- String data = null;
- try {
- deliveryTag = message.getMessageProperties().getDeliveryTag();
- data = new String(message.getBody(), "UTF-8");
- // 业务处理
- } catch (Exception e) {
- logger.warn("===== 消息处理异常");
- logger.warn("MQ 异常 {} , {} , {}" , e.getMessage(),e.getCause(),data);
- }finally {
- try {
- channel.basicAck(deliveryTag, false); // 确认消息成功消费
- } catch (IOException e) {
- logger.warn("====== 应答出错, 请检查");
- }
- logger.warn("====== 消息结束");
- }
- }
来源: http://www.bubuko.com/infodetail-3064384.html