1 工业级高并发数据接入消息一致性保障
消息中间件在高并发数据接入大数据平台时的的主要作用: 异步通讯, 解耦, 并发缓冲
消息发送一致性: 是指产生消息的业务动作与消息发送的一致. (也就是说, 如果业务操作成功, 那么由这个业务操作所产生的消息一定要成功投递出去, 否则就丢消息)
2 工业级 MQ 数据接入消息一致性理论模型
2.1 从主动方业务来分析
预发送消息失败, 消息未进存储, 业务操作未执行(可能的原因: 主动方应用, 网络, 消息中间件, 消息存储), 数据此时保持一致.
预发送消息后, 主动方业务没有收到返回消息存储结果, 分为两种可能
(1)消息未进存储, 业务操作未执行 数据此时保持一致.
(2)消息已进存储(待确认), 业务操作未执行 数据此时保持不一致.
收到消息存储成功的返回结果, 但未执行业务操作就失败
消息已进存储(待确认), 业务操作未执行 数据此时保持不一致.
2.2 从消息中间件的角度来分析:
消息中间件没有收到主动方应用的业务操作处理结果
(1)消息已进存储(待确认), 业务操作未执行(或 业务操作出错回滚了) 数据此时保持不一致
(2)消息已进存储(待确认), 业务操作成功 数据此时保持不一致
消息中间件收到业务操作结果(成功 / 失败), 但处理消息存储中的消息状态失败
(1)消息已进存储(待确认), 业务操作未执行(或业务操作出错回滚了) 数据此时保持不一致
(2)消息已进存储(待确认), 业务操作成功 数据此时保持不一致
2.3 可靠消息的整体流程
主动方应用先把消息发给消息中间件, 消息状态标记为 "待确认";
消息中间件收到消息后, 把消息持久化到消息存储中, 但并不向被动方应用投递消息;
消息中间件返回消息持久化结果(成功 / 失败), 主动方应用根据返 回结果进行判断如何进行业务操作处理:
a) 失败: 放弃业务操作处理, 结束(必要时向上层返回失败结果);
b) 成功: 执行业务操作处理;
业务操作完成后, 把业务操作结果 (成功 / 失败) 发送给消息中间件;
消息中间件收到业务操作结果后, 根据业务结果进行处理;
a) 失败: 删除消息存储中的消息, 结束;
b) 成功: 更新消息存储中的消息状态为 "待发送(可发送)";
被动方应用监听并接收 "待发送" 状态的消息, 执行业务处理;
业务处理完成后, 向消息中间件发送 ACK, 确认消息已经收到(消息 主动方应用主机 / 主机集群 消息中间件主机 / 主机集群 被动方应用主机 / 主机集群 中间件将从队列中删除该消息).
2.4 消息重复发送异常分析
1 被动方应用接收到消息, 业务处理完成后应用出问题, 消息中间件不知道消息处理结果, 会重新投递消息.
2 被动方应用接收到消息, 业务处理完成后网络出问题, 消息中间件收不到消息处理结果, 会重新投递消息.
3 被动方应用接收到消息, 业务处理时间过长, 消息中间件因消息超时未确认, 会再次投递消息.
4 被动方应用接收到消息, 业务处理完成, 消息中间件问题导致收不到消息处理结果, 消息会重新投递.
5 被动方应用接收到消息, 业务处理完成, 消息中间件收到了消息处理结果, 但由于消息存储故障导致消息没能成功确认, 消息会再次投递.
3 工业级 MQ 数据接入消息一致性实际处理方案
- public interface RpTransactionMessageService {
- /**
- * 预存储消息.
- */
- public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
- /**
- * 确认并发送消息.
- */
- public void confirmAndSendMessage(String messageId) throws MessageBizException;
- /**
- * 存储并发送消息.
- */
- public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
- /**
- * 直接发送消息.
- */
- public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
- /**
- * 重发消息.
- */
- public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;
- /**
- * 根据 messageId 重发某条消息.
- */
- public void reSendMessageByMessageId(String messageId) throws MessageBizException;
- /**
- * 将消息标记为死亡消息.
- */
- public void setMessageToAreadlyDead(String messageId) throws MessageBizException;
- /**
- * 根据消息 ID 获取消息
- */
- public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;
- /**
- * 根据消息 ID 删除消息
- */
- public void deleteMessageByMessageId(String messageId) throws MessageBizException;
- /**
- * 重发某个消息队列中的全部已死亡的消息.
- */
- public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;
- public class RpTransactionMessage extends BaseEntity {
- private static final long serialVersionUID = 1757377457814546156L;
- private String messageId;
- private String messageBody;
- private String messageDataType;
- private String consumerQueue;
- private Integer messageSendTimes;
- private String areadlyDead;
- private String field1;
- private String field2;
- private String field3;
- }
- public int saveMessageWaitingConfirm(RpTransactionMessage message) {
- if (message == null) {
- throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
- }
- if (StringUtil.isEmpty(message.getConsumerQueue())) {
- throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空");
- }
- message.setEditTime(new Date());
- message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());
- message.setAreadlyDead(PublicEnum.NO.name());
- message.setMessageSendTimes(0);
- return rpTransactionMessageDao.insert(message);
- }
- public void confirmAndSendMessage(String messageId) {
- final RpTransactionMessage message = getMessageByMessageId(messageId);
- if (message == null) {
- throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息 id 查找的消息为空");
- }
- message.setStatus(MessageStatusEnum.SENDING.name());
- message.setEditTime(new Date());
- rpTransactionMessageDao.update(message);
- notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
- notifyJmsTemplate.send(new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- return session.createTextMessage(message.getMessageBody());
- }
- });
- }
- public int saveAndSendMessage(final RpTransactionMessage message) {
- if (message == null) {
- throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
- }
- if (StringUtil.isEmpty(message.getConsumerQueue())) {
- throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空");
- }
- message.setStatus(MessageStatusEnum.SENDING.name());
- message.setAreadlyDead(PublicEnum.NO.name());
- message.setMessageSendTimes(0);
- message.setEditTime(new Date());
- int result = rpTransactionMessageDao.insert(message);
- notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
- notifyJmsTemplate.send(new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- return session.createTextMessage(message.getMessageBody());
- }
- });
- return result;
- }
- public void directSendMessage(final RpTransactionMessage message) {
- if (message == null) {
- throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
- }
- if (StringUtil.isEmpty(message.getConsumerQueue())) {
- throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空");
- }
- notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
- notifyJmsTemplate.send(new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- return session.createTextMessage(message.getMessageBody());
- }
- });
- }
- public void reSendMessage(final RpTransactionMessage message) {
- if (message == null) {
- throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
- }
- if (StringUtil.isEmpty(message.getConsumerQueue())) {
- throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空");
- }
- message.addSendTimes();
- message.setEditTime(new Date());
- rpTransactionMessageDao.update(message);
- notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
- notifyJmsTemplate.send(new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- return session.createTextMessage(message.getMessageBody());
- }
- });
- }
- public void reSendMessageByMessageId(String messageId) {
- final RpTransactionMessage message = getMessageByMessageId(messageId);
- if (message == null) {
- throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息 id 查找的消息为空");
- }
- int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
- if (message.getMessageSendTimes()>= maxTimes) {
- message.setAreadlyDead(PublicEnum.YES.name());
- }
- message.setEditTime(new Date());
- message.setMessageSendTimes(message.getMessageSendTimes() + 1);
- rpTransactionMessageDao.update(message);
- notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
- notifyJmsTemplate.send(new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- return session.createTextMessage(message.getMessageBody());
- }
- });
- }
- public void setMessageToAreadlyDead(String messageId) {
- RpTransactionMessage message = getMessageByMessageId(messageId);
- if (message == null) {
- throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息 id 查找的消息为空");
- }
- message.setAreadlyDead(PublicEnum.YES.name());
- message.setEditTime(new Date());
- rpTransactionMessageDao.update(message);
- }
来源: https://juejin.im/post/5c3333a06fb9a04a073088fc