根据上文的描述, 发送事务消息的入口为:
- TransactionMQProducer#sendMessageInTransaction:
- public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException {
- if (null == this.transactionListener) { // @1
- throw new MQClientException("TransactionListener is null", null);
- }
- return this.defaultMQProducerImpl.sendMessageInTransaction(msg, transactionListener, arg); // @2
- }
代码 @1: 如果 transactionListener 为空, 则直接抛出异常.
代码 @2: 调用 defaultMQProducerImpl 的 sendMessageInTransaction 方法.
接下来重点分享 sendMessageInTransaction 方法
- DefaultMQProducerImpl#sendMessageInTransaction
- public TransactionSendResult sendMessageInTransaction(final Message msg,
- final TransactionListener tranExecuter, final Object arg) throws MQClientException {
Step1: 首先先阐述一下参数含义.
final Message msg: 消息
TransactionListener tranExecuter: 事务监听器
Object arg: 其他附加参数
- DefaultMQProducerImpl#sendMessageInTransaction
- SendResult sendResult = null;
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
- MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
- try {
- sendResult = this.send(msg);
- } catch (Exception e) {
- throw new MQClientException("send message Exception", e);
- }
Step2: 在消息属性中增加两个属性: TRAN_MSG, 其值为 true, 表示为事务消息; PGROUP: 消息所属发送者组, 然后以同步方式发送消息. 在消息发送之前, 会先检查消息的属性 TRAN_MSG, 如果存在并且值为 true, 则通过设置消息系统标记的方式, 设置消息为 MessageSysFlag.TRANSACTION_PREPARED_TYPE.
- DefaultMQProducerImpl#sendKernelImpl
- final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
- sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
- }
- SendMessageProcessor#sendMessage
- String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- if (traFlag != null && Boolean.parseBoolean(traFlag)) {
- if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
- response.setCode(ResponseCode.NO_PERMISSION);
- response.setRemark(
- "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
- + "] sending transaction message is forbidden");
- return response;
- }
- putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
- } else {
- putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
- }
Step3:Broker 端收到客户端发送消息请求后, 判断消息类型. 如果是事务消息, 则调用 TransactionalMessageService#prepareMessage 方法, 否则走原先的逻辑, 调用 MessageStore#putMessage 方法将消息存入 Broker 服务端.
本节重点阐述事务消息的实现原理, 故接下来将重点关注 prepareMessage 方法, 如想了解 RocketMQ 消息存储相关, 可以关注作者源码分析 RocketMQ 系列.
- org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl#prepareMessage
- public PutMessageResult prepareMessage(MessageExtBrokerInner messageInner) {
- return transactionalMessageBridge.putHalfMessage(messageInner);
- }
step4: 事务消息, 将调用 TransactionalMessageServiceImpl#prepareMessage 方法, 继而调用 TransactionalMessageBridge#prepareMessage 方法.
- TransactionalMessageBridge#parseHalfMessageInner
- public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
- return store.putMessage(parseHalfMessageInner(messageInner));
- }
- private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
- MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
- MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
- String.valueOf(msgInner.getQueueId()));
- msgInner.setSysFlag(
- MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
- msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
- msgInner.setQueueId(0);
- msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
- return msgInner;
- }
Step5: 备份消息的原主题名称与原队列 ID, 然后取消消息的事务消息标签, 重新设置消息的主题为: RMQ_SYS_TRANS_HALF_TOPIC, 队列 ID 固定为 0. 然后调用 MessageStore#putMessage 方法将消息持久化, 这里 TransactionalMessageBridge 桥接类, 就是封装事务消息的相关流程, 最终调用 MessageStore 完成消息的持久化. 消息入库后, 会继续回到 DefaultMQProducerImpl#sendMessageInTransaction, 上文的 Step2 后面, 也就是通过同步将消息发送到消息服务端.
注: 这是事务消息 Prepare 状态的处理逻辑, 消息是存储在消息服务器了, 但存储的并不是原主题, 而是 RMQ_SYS_TRANS_HALF_TOPIC, 故此时消费端是无法消费 shen
生产者发送的消息的. 看到这里, 如果对 RocketMQ 比较熟悉的话, 肯定会有一个 "定时任务" 去取这个主题下的消息, 然后则 "合适" 的时机将消息的主题恢复.
- DefaultMQProducerImpl#sendMessageInTransaction
- switch (sendResult.getSendStatus()) {
- case SEND_OK: {
- try {
- if (sendResult.getTransactionId() != null) {
- msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
- }
- String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
- if (null != transactionId && !"".equals(transactionId)) {
- msg.setTransactionId(transactionId);
- }
- localTransactionState = tranExecuter.executeLocalTransaction(msg, arg);
- if (null == localTransactionState) {
- localTransactionState = LocalTransactionState.UNKNOW;
- }
- if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
- log.info("executeLocalTransactionBranch return {}", localTransactionState);
- log.info(msg.toString());
- }
- } catch (Throwable e) {
- log.info("executeLocalTransactionBranch exception", e);
- log.info(msg.toString());
- localException = e;
- }
- }
- break;
- case FLUSH_DISK_TIMEOUT:
- case FLUSH_SLAVE_TIMEOUT:
- case SLAVE_NOT_AVAILABLE:
- localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
- break;
- default:
- break;
- }
Step6: 如果消息发送成功, 会回调 TransactionListener#executeLocalTransaction 方法, 执行本地事务, 并且返回本地事务状态为 LocalTransactionState, 枚举值如下:
- COMMIT_MESSAGE,
- ROLLBACK_MESSAGE,
- UNKNOW
注意: TransactionListener#executeLocalTransaction 是在发送者成功发送 PREPARED 消息后, 会执行本地事务方法, 然后返回本地事务状态; 如果 PREPARED 消息发送失败, 则不会调用 TransactionListener#executeLocalTransaction, 并且本地事务消息, 设置为 LocalTransactionState.ROLLBACK_MESSAGE, 表示消息需要被回滚.
- DefaultMQProducerImpl#sendMessageInTransaction
- try {
- this.endTransaction(sendResult, localTransactionState, localException);
- } catch (Exception e) {
- log.warn("local transaction execute" + localTransactionState + ", but end broker transaction failed", e);
- }
step7: 调用 endTransaction 方法结束事务(提交或回滚).
- DefaultMQProducerImpl#endTransaction
- EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
- requestHeader.setTransactionId(transactionId);
- requestHeader.setCommitLogOffset(id.getOffset());
- switch (localTransactionState) {
- case COMMIT_MESSAGE:
- requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
- break;
- case ROLLBACK_MESSAGE:
- requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
- break;
- case UNKNOW:
- requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
- break;
- default:
- break;
- }
- requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
- requestHeader.setMsgId(sendResult.getMsgId());
step8: 组装结束事务请求, 主要参数为: 事务 ID, 事务操作(commitOrRollback), 消费组, 消息队列偏移量, 消息 ID,fromTransactionCheck, 从这里发出的请求, 默认为 false.Broker 端的请求处理器为: EndTransactionProcessor.
step9:EndTransactionProcessor 根据事务提交类型: TRANSACTION_COMMIT_TYPE(提交事务),TRANSACTION_ROLLBACK_TYPE(回滚事务),TRANSACTION_NOT_TYPE(忽略该请求).
到目前为止, 已详细梳理了 RocketMQ 事务消息的发送流程, 更加准确的说是 Prepare 状态的消息发送流程. 具体流程如图所示:
本文到这里, 初步展示了事务消息的发送流程, 总的说来, RocketMQ 的事务消息发送使用二阶段提交思路, 首先, 在消息发送时, 先发送消息类型为 Prepread 类型的消息, 然后在将该消息成功存入到消息服务器后, 会回调 TransactionListener#executeLocalTransaction, 执行本地事务状态回调函数, 然后根据该方法的返回值, 结束事务:
1,COMMIT_MESSAGE : 提交事务.
2,ROLLBACK_MESSAGE: 回滚事务.
3,UNKNOW: 未知事务状态, 此时消息服务器 (Broker) 收到 EndTransaction 命令时, 将不对这种消息做处理, 消息还处于 Prepared 类型, 存储在主题为: RMQ_SYS_TRANS_HALF_TOPIC 的队列中, 然后消息发送流程将结束, 那这些消息如何提交或回滚呢?
为了实现避免客户端需要再次发送提交, 回滚命令, RocketMQ 会采取定时任务将 RMQ_SYS_TRANS_HALF_TOPIC 中的消息取出, 然后回到客户端, 判断该消息是否需要提交或回滚, 来完成事务消息的声明周期, 该部分内容将在下节重点探讨.
来源: https://yq.aliyun.com/articles/684232