一 重复消息
为什么会出现消息重复? 消息重复的原因有两个: 1. 生产时消息重复, 2. 消费时消息重复.
1.1 生产时消息重复
由于生产者发送消息给 MQ, 在 MQ 确认的时候出现了网络波动, 生产者没有收到确认, 实际上 MQ 已经接收到了消息. 这时候生产者就会重新发送一遍这条消息.
生产者中如果消息未被确认, 或确认失败, 我们可以使用定时任务 +(Redis/db) 来进行消息重试.
- @Component
- @Slf4J
- public class SendMessage {
- @Autowired
- private MessageService messageService;
- @Autowired
- private RabbitTemplate rabbitTemplate;
- // 最大投递次数
- private static final int MAX_TRY_COUNT = 3;
- /**
- * 每 30s 拉取投递失败的消息, 重新投递
- */
- @Scheduled(cron = "0/30 * * * * ?")
- public void resend() {
- log.info("开始执行定时任务 (重新投递消息)");
- List<MsgLog> msgLogs = messageService.selectTimeoutMsg();
- msgLogs.forEach(msgLog -> {
- String msgId = msgLog.getMsgId();
- if (msgLog.getTryCount()>= MAX_TRY_COUNT) {
- messageService.updateStatus(msgId, Constant.MsgLogStatus.DELIVER_FAIL);
- log.info("超过最大重试次数, 消息投递失败, msgId: {}", msgId);
- } else {
- messageService.updateTryCount(msgId, msgLog.getNextTryTime());// 投递次数 + 1
- CorrelationData correlationData = new CorrelationData(msgId);
- rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(), MessageHelper.objToMsg(msgLog.getMsg()), correlationData);// 重新投递
- log.info("第" + (msgLog.getTryCount() + 1) + "次重新投递消息");
- }
- });
- log.info("定时任务执行结束 (重新投递消息)");
- }
- }
1.2 消费时消息重复
消费者消费成功后, 再给 MQ 确认的时候出现了网络波动, MQ 没有接收到确认, 为了保证消息被消费, MQ 就会继续给消费者投递之前的消息. 这时候消费者就接收到了两条一样的消息.
修改消费者, 模拟异常
- @RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
- public void receive(String message, @Headers Map<String,Object> headers, Channel channel) throws Exception{
- System.out.println("重试"+System.currentTimeMillis());
- System.out.println(message);
- int i = 1 / 0;
- }
配置 YAML 重试策略
- spring:
- rabbitmq:
- listener:
- simple:
- retry:
- enabled: true # 开启消费者进行重试
- max-attempts: 5 # 最大重试次数
- initial-interval: 3000 # 重试时间间隔
由于重复消息是由于网络原因造成的, 因此不可避免重复消息. 但是我们需要保证消息的幂等性.
二 如何保证消息幂等性
让每个消息携带一个全局的唯一 ID, 即可保证消息的幂等性, 具体消费过程为:
消费者获取到消息后先根据 id 去查询 Redis/db 是否存在该消息
如果不存在, 则正常消费, 消费完毕后写入 Redis/db
如果存在, 则证明消息被消费过, 直接丢弃.
生产者
- @PostMapping("/send")
- public void sendMessage(){
- JSONObject jsonObject = new JSONObject();
- jsonObject.put("message","Java 旅途");
- String JSON = jsonObject.toJSONString();
- Message message = MessageBuilder.withBody(JSON.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("UTF-8").setMessageId(UUID.randomUUID()+"").build();
- amqpTemplate.convertAndSend("javatrip",message);
- }
消费者
- @Component
- @RabbitListener(queuesToDeclare = @Queue(value = "javatrip", durable = "true"))
- public class Consumer {
- @RabbitHandler
- public void receiveMessage(Message message) throws Exception {
- Jedis jedis = new Jedis("localhost", 6379);
- String messageId = message.getMessageProperties().getMessageId();
- String msg = new String(message.getBody(),"UTF-8");
- System.out.println("接收到的消息为:"+msg+"== 消息 id 为:"+messageId);
- String messageIdRedis = jedis.get("messageId");
- if(messageId == messageIdRedis){
- return;
- }
- JSONObject jsonObject = JSONObject.parseObject(msg);
- String email = jsonObject.getString("message");
- jedis.set("messageId",messageId);
- }
- }
如果需要存入 db 的话, 可以直接将这个 ID 设为消息的主键, 下次如果获取到重复消息进行消费时, 由于数据库主键的唯一性, 则会直接抛出异常.
> 如果觉得文章不错, 欢迎点赞, 留言
> 关注公众号《Java 旅途》, 每日推送精品文章
来源: https://segmentfault.com/a/1190000023516648