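The previous post covered how the Producer starts up, so the related details are not repeated here [Source Code Analysis of Producer Startup in RocketMQ]
To send a message, a Producer first needs to create a Message instance: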
- public class Message implements Serializable {
- private static final long serialVersionUID = 8445773977080406428L;
- private String topic;
- private int flag;
- private Map<String, String> properties;
- private byte[] body;
- private String transactionId;
- public Message() {}
- public Message(String topic, byte[] body) {
- this(topic, "","", 0, body, true);
- }
- public Message(String topic, String tags, byte[] body) {
- this(topic, tags, "", 0, body, true);
- }
- public Message(String topic, String tags, String keys, byte[] body) {
- this(topic, tags, keys, 0, body, true);
- }
- public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK) {
- this.topic = topic;
- this.flag = flag;
- this.body = body;
- if (tags != null && tags.length() > 0)
- this.setTags(tags);
- if (keys != null && keys.length() > 0)
- this.setKeys(keys);
- this.setWaitStoreMsgOK(waitStoreMsgOK);
- }
- public void setTags(String tags) {
- this.putProperty(MessageConst.PROPERTY_TAGS, tags);
- }
- public void setKeys(String keys) {
- this.putProperty(MessageConst.PROPERTY_KEYS, keys);
- }
- public void setWaitStoreMsgOK(boolean waitStoreMsgOK) {
- this.putProperty(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, Boolean.toString(waitStoreMsgOK));
- }
- void putProperty(final String name, final String value) {
- if (null == this.properties) {
- this.properties = new HashMap<String, String>();
- }
- this.properties.put(name, value);
- }
- public void putUserProperty(final String name, final String value) {
- if (MessageConst.STRING_HASH_SET.contains(name)) {
- throw new RuntimeException(String.format(
- "The Property<%s> is used by system, input another please", name));
- }
- if (value == null || value.trim().isEmpty()
- || name == null || name.trim().isEmpty()) {
- throw new IllegalArgumentException(
- "The name or value of property can not be null or blank string!"
- );
- }
- this.putProperty(name, value);
- }
- }
The properties map holds the configurable attributes; its keys are defined by MessageConst:
- public class MessageConst {
- public static final String PROPERTY_KEYS = "KEYS";
- public static final String PROPERTY_TAGS = "TAGS";
- public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
- public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
- public static final String PROPERTY_RETRY_TOPIC = "RETRY_TOPIC";
- public static final String PROPERTY_REAL_TOPIC = "REAL_TOPIC";
- public static final String PROPERTY_REAL_QUEUE_ID = "REAL_QID";
- public static final String PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG";
- public static final String PROPERTY_PRODUCER_GROUP = "PGROUP";
- public static final String PROPERTY_MIN_OFFSET = "MIN_OFFSET";
- public static final String PROPERTY_MAX_OFFSET = "MAX_OFFSET";
- public static final String PROPERTY_BUYER_ID = "BUYER_ID";
- public static final String PROPERTY_ORIGIN_MESSAGE_ID = "ORIGIN_MESSAGE_ID";
- public static final String PROPERTY_TRANSFER_FLAG = "TRANSFER_FLAG";
- public static final String PROPERTY_CORRECTION_FLAG = "CORRECTION_FLAG";
- public static final String PROPERTY_MQ2_FLAG = "MQ2_FLAG";
- public static final String PROPERTY_RECONSUME_TIME = "RECONSUME_TIME";
- public static final String PROPERTY_MSG_REGION = "MSG_REGION";
- public static final String PROPERTY_TRACE_SWITCH = "TRACE_ON";
- public static final String PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX = "UNIQ_KEY";
- public static final String PROPERTY_MAX_RECONSUME_TIMES = "MAX_RECONSUME_TIMES";
- public static final String PROPERTY_CONSUME_START_TIMESTAMP = "CONSUME_START_TIME";
- public static final String PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET";
- public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
- public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
- }
Once the Message is created, it is sent through the send method of DefaultMQProducer.
The Producer supports three sending modes, defined by the CommunicationMode enum:
- public enum CommunicationMode {
- SYNC,
- ASYNC,
- ONEWAY,
- }
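These stand for synchronous, asynchronous, and one-way sending, respectively.
Whether a send is synchronous or asynchronous is decided by which overload of send is called.
Any send overload that takes a SendCallback parameter is asynchronous; otherwise it is synchronous. SendCallback provides the callbacks that respond to the outcome of an asynchronous send: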
- public interface SendCallback {
- void onSuccess(final SendResult sendResult);
- void onException(final Throwable e);
- }
One-way sending uses the sendOneway method instead.
Whichever of these modes is used, the call ends up in the sendDefaultImpl method of defaultMQProducerImpl, which DefaultMQProducer wraps.
The sendDefaultImpl method of DefaultMQProducerImpl:
- private SendResult sendDefaultImpl(
- Message msg,
- final CommunicationMode communicationMode,
- final SendCallback sendCallback,
- final long timeout
- ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- this.makeSureStateOK();
- Validators.checkMessage(msg, this.defaultMQProducer);
- final long invokeID = random.nextLong();
- long beginTimestampFirst = System.currentTimeMillis();
- long beginTimestampPrev = beginTimestampFirst;
- long endTimestamp = beginTimestampFirst;
- TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
- if (topicPublishInfo != null && topicPublishInfo.ok()) {
- boolean callTimeout = false;
- MessageQueue mq = null;
- Exception exception = null;
- SendResult sendResult = null;
- int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
- int times = 0;
- String[] brokersSent = new String[timesTotal];
- for (; times < timesTotal; times++) {
- String lastBrokerName = null == mq ? null : mq.getBrokerName();
- MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
- if (mqSelected != null) {
- mq = mqSelected;
- brokersSent[times] = mq.getBrokerName();
- try {
- beginTimestampPrev = System.currentTimeMillis();
- long costTime = beginTimestampPrev - beginTimestampFirst;
- if (timeout < costTime) {
- callTimeout = true;
- break;
- }
- sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
- switch (communicationMode) {
- case ASYNC:
- return null;
- case ONEWAY:
- return null;
- case SYNC:
- if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
- if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
- continue;
- }
- }
- return sendResult;
- default:
- break;
- }
- } catch (RemotingException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
- log.warn(msg.toString());
- exception = e;
- continue;
- } catch (MQClientException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
- log.warn(msg.toString());
- exception = e;
- continue;
- } catch (MQBrokerException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
- log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
- log.warn(msg.toString());
- exception = e;
- switch (e.getResponseCode()) {
- case ResponseCode.TOPIC_NOT_EXIST:
- case ResponseCode.SERVICE_NOT_AVAILABLE:
- case ResponseCode.SYSTEM_ERROR:
- case ResponseCode.NO_PERMISSION:
- case ResponseCode.NO_BUYER_ID:
- case ResponseCode.NOT_IN_CURRENT_UNIT:
- continue;
- default:
- if (sendResult != null) {
- return sendResult;
- }
- throw e;
- }
- } catch (InterruptedException e) {
- endTimestamp = System.currentTimeMillis();
- this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
- log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
- log.warn(msg.toString());
- log.warn("sendKernelImpl exception", e);
- log.warn(msg.toString());
- throw e;
- }
- } else {
- break;
- }
- }
- if (sendResult != null) {
- return sendResult;
- }
- String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
- times,
- System.currentTimeMillis() - beginTimestampFirst,
- msg.getTopic(),
- Arrays.toString(brokersSent));
- info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);
- MQClientException mqClientException = new MQClientException(info, exception);
- if (callTimeout) {
- throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
- }
- if (exception instanceof MQBrokerException) {
- mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());
- } else if (exception instanceof RemotingConnectException) {
- mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);
- } else if (exception instanceof RemotingTimeoutException) {
- mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);
- } else if (exception instanceof MQClientException) {
- mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);
- }
- throw mqClientException;
- }
- List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
- if (null == nsList || nsList.isEmpty()) {
- throw new MQClientException(
- "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
- }
- throw new MQClientException("No route info of this topic," + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
- null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
- }
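The CommunicationMode argument is set according to which API was called, as described above.
The SendCallback argument is non-null only when an asynchronous send API is used.
First, makeSureStateOK is called to check that the Producer has been started: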
- private void makeSureStateOK() throws MQClientException {
- if (this.serviceState != ServiceState.RUNNING) {
- throw new MQClientException("The producer service state not OK,"
- + this.serviceState
- + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
- null);
- }
- }
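serviceState was covered in the previous post.
After checking the Producer state, the Message itself has to be validated. sendDefaultImpl calls Validators.checkMessage for this, and the topic part of that validation is done by the checkTopic method of Validators: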
- public static void checkTopic(String topic) throws MQClientException {
- if (UtilAll.isBlank(topic)) {
- throw new MQClientException("The specified topic is blank", null);
- }
- if (!regularExpressionMatcher(topic, PATTERN)) {
- throw new MQClientException(String.format(
- "The specified topic[%s] contains illegal characters, allowing only %s", topic,
- VALID_PATTERN_STR), null);
- }
- if (topic.length() > CHARACTER_MAX_LENGTH) {
- throw new MQClientException("The specified topic is longer than topic max length 255.", null);
- }
- //whether the same with system reserved keyword
- if (topic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
- throw new MQClientException(
- String.format("The topic[%s] is conflict with AUTO_CREATE_TOPIC_KEY_TOPIC.", topic), null);
- }
- }
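Once validation passes, the start timestamp is recorded, which marks the real start of the send.
Next, tryToFindTopicPublishInfo is called to get the routing information for the Topic.
The tryToFindTopicPublishInfo method: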
- private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
- TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
- if (null == topicPublishInfo || !topicPublishInfo.ok()) {
- this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
- topicPublishInfo = this.topicPublishInfoTable.get(topic);
- }
- if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
- return topicPublishInfo;
- } else {
- this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
- topicPublishInfo = this.topicPublishInfoTable.get(topic);
- return topicPublishInfo;
- }
- }
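topicPublishInfoTable was introduced in the post on Producer startup: it is a map holding the routing information for each Topic. The method first checks whether a TopicPublishInfo already exists for the topic.
If it does not exist, or its message queues are unavailable (ok returns false):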
- public boolean ok() {
- return null != this.messageQueueList && !this.messageQueueList.isEmpty();
- }
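ok checks whether this route has any usable message queues.
In that case a new TopicPublishInfo is put into the map, and updateTopicRouteInfoFromNameServer is called to refresh the routing information. As mentioned in the previous post, updateTopicRouteInfoFromNameServer is also run by a scheduled task; here it is called so that the route is refreshed immediately.
If the entry exists and has routing information or usable message queues, topicPublishInfo is returned directly.
Otherwise updateTopicRouteInfoFromNameServer is called once more, this time through the overload that also takes true and this.defaultMQProducer.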
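Back in sendDefaultImpl: once the routing information has been obtained, callTimeout is initialized to false; it is the flag used to handle a send timeout.
Next, based on the CommunicationMode, the total number of attempts is computed. Retries in this loop apply only to synchronous sending: by default 1 + 2, three attempts in total. The other two modes get a single attempt here.
An array sized by the number of attempts is created to record the BrokerName of each attempt, and the for loop then runs up to that many times.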
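The attempt-count rule can be sketched in isolation. This is a minimal model, not RocketMQ code: the class RetryBudgetSketch and its Mode enum are illustrative names, and the value 2 passed in main simply mirrors the default retry count mentioned above.

```java
/** Illustrative model of how sendDefaultImpl sizes its retry loop (names are hypothetical). */
class RetryBudgetSketch {
    enum Mode { SYNC, ASYNC, ONEWAY }

    /** Only SYNC gets extra attempts in this loop; ASYNC and ONEWAY get exactly one. */
    static int totalAttempts(Mode mode, int retryTimesWhenSendFailed) {
        return mode == Mode.SYNC ? 1 + retryTimesWhenSendFailed : 1;
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            // One slot per attempt, like the brokersSent array in the original method.
            String[] brokersSent = new String[totalAttempts(mode, 2)];
            System.out.println(mode + " -> " + brokersSent.length + " attempt(s)");
        }
    }
}
```

Note that the asynchronous retry count (getRetryTimesWhenSendAsyncFailed in the code shown later) is handled elsewhere, which is why ASYNC still gets a single pass through this loop.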
First, selectOneMessageQueue is called with topicPublishInfo and lastBrokerName to pick a message queue. With the default configuration this ends up in the selectOneMessageQueue method of TopicPublishInfo:
- public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
- if (lastBrokerName == null) {
- return selectOneMessageQueue();
- } else {
- int index = this.sendWhichQueue.getAndIncrement();
- for (int i = 0; i < this.messageQueueList.size(); i++) {
- int pos = Math.abs(index++) % this.messageQueueList.size();
- if (pos < 0)
- pos = 0;
- MessageQueue mq = this.messageQueueList.get(pos);
- if (!mq.getBrokerName().equals(lastBrokerName)) {
- return mq;
- }
- }
- return selectOneMessageQueue();
- }
- }
- public MessageQueue selectOneMessageQueue() {
- int index = this.sendWhichQueue.getAndIncrement();
- int pos = Math.abs(index) % this.messageQueueList.size();
- if (pos < 0)
- pos = 0;
- return this.messageQueueList.get(pos);
- }
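When lastBrokerName is null, the no-argument selectOneMessageQueue is used. sendWhichQueue was covered in the previous post: the index each thread gets from getAndIncrement starts from a random value and then increases by one on every call.
That index is taken modulo the size of messageQueueList to get a position in the list, and the MessageQueue at that position is selected.
Since different MessageQueues can belong to different brokers, this is really a form of load balancing: successive sends are spread round-robin across the queues, and therefore across brokers, instead of always hitting the same one.
When lastBrokerName is not null, the logic is similar, except that the candidate MessageQueue is compared with lastBrokerName and is returned only if its broker is different. This keeps a retry from going back to the broker that was just tried. If every queue belongs to that broker, the method falls back to the no-argument selection.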
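The selection logic is easy to try out in a stripped-down form. The sketch below is an assumption-laden model rather than the real TopicPublishInfo: Queue is a hypothetical stand-in for MessageQueue, a plain AtomicInteger replaces the thread-local index, and Math.floorMod is swapped in for the original Math.abs plus clamp so that a negative counter still maps to a valid position.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified round-robin queue picker that avoids the broker used by the previous attempt. */
class QueueSelectSketch {
    /** Hypothetical stand-in for MessageQueue: only a broker name and a queue id. */
    static final class Queue {
        final String broker;
        final int id;
        Queue(String broker, int id) { this.broker = broker; this.id = id; }
        @Override public String toString() { return broker + "#" + id; }
    }

    private final List<Queue> queues;
    private final AtomicInteger counter;

    QueueSelectSketch(List<Queue> queues, int start) {
        this.queues = queues;
        this.counter = new AtomicInteger(start);
    }

    Queue select(String lastBroker) {
        int index = counter.getAndIncrement();
        if (lastBroker != null) {
            // Walk the list once, starting at index, looking for a different broker.
            for (int i = 0; i < queues.size(); i++) {
                Queue q = queues.get(Math.floorMod(index + i, queues.size()));
                if (!q.broker.equals(lastBroker)) {
                    return q;
                }
            }
        }
        // First attempt, or every queue is on lastBroker: plain round-robin pick.
        return queues.get(Math.floorMod(index, queues.size()));
    }

    public static void main(String[] args) {
        QueueSelectSketch picker = new QueueSelectSketch(Arrays.asList(
            new Queue("broker-a", 0), new Queue("broker-a", 1),
            new Queue("broker-b", 0), new Queue("broker-b", 1)), 0);
        Queue first = picker.select(null);
        Queue retry = picker.select(first.broker);
        System.out.println(first + " then " + retry);
    }
}
```

One difference from the original worth knowing: when no other broker is found, the real method calls the no-argument variant, which increments the counter a second time; the sketch reuses the index it already has.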
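Back in sendDefaultImpl: after a MessageQueue is selected, its BrokerName is recorded and the elapsed time is checked against the timeout. If the timeout has not been exceeded, sendKernelImpl is called to do the actual send:
The sendKernelImpl method: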
- private SendResult sendKernelImpl(final Message msg,
- final MessageQueue mq,
- final CommunicationMode communicationMode,
- final SendCallback sendCallback,
- final TopicPublishInfo topicPublishInfo,
- final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
- long beginStartTime = System.currentTimeMillis();
- String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- if (null == brokerAddr) {
- tryToFindTopicPublishInfo(mq.getTopic());
- brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
- }
- SendMessageContext context = null;
- if (brokerAddr != null) {
- brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
- byte[] prevBody = msg.getBody();
- try {
- //for MessageBatch,ID has been set in the generating process
- if (!(msg instanceof MessageBatch)) {
- MessageClientIDSetter.setUniqID(msg);
- }
- int sysFlag = 0;
- boolean msgBodyCompressed = false;
- if (this.tryToCompressMessage(msg)) {
- sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
- msgBodyCompressed = true;
- }
- final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
- sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
- }
- if (hasCheckForbiddenHook()) {
- CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
- checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
- checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
- checkForbiddenContext.setCommunicationMode(communicationMode);
- checkForbiddenContext.setBrokerAddr(brokerAddr);
- checkForbiddenContext.setMessage(msg);
- checkForbiddenContext.setMq(mq);
- checkForbiddenContext.setUnitMode(this.isUnitMode());
- this.executeCheckForbiddenHook(checkForbiddenContext);
- }
- if (this.hasSendMessageHook()) {
- context = new SendMessageContext();
- context.setProducer(this);
- context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- context.setCommunicationMode(communicationMode);
- context.setBornHost(this.defaultMQProducer.getClientIP());
- context.setBrokerAddr(brokerAddr);
- context.setMessage(msg);
- context.setMq(mq);
- String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
- if (isTrans != null && isTrans.equals("true")) {
- context.setMsgType(MessageType.Trans_Msg_Half);
- }
- if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
- context.setMsgType(MessageType.Delay_Msg);
- }
- this.executeSendMessageHookBefore(context);
- }
- SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
- requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
- requestHeader.setTopic(msg.getTopic());
- requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
- requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
- requestHeader.setQueueId(mq.getQueueId());
- requestHeader.setSysFlag(sysFlag);
- requestHeader.setBornTimestamp(System.currentTimeMillis());
- requestHeader.setFlag(msg.getFlag());
- requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
- requestHeader.setReconsumeTimes(0);
- requestHeader.setUnitMode(this.isUnitMode());
- requestHeader.setBatch(msg instanceof MessageBatch);
- if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
- String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
- if (reconsumeTimes != null) {
- requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
- MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
- }
- String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
- if (maxReconsumeTimes != null) {
- requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
- MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
- }
- }
- SendResult sendResult = null;
- switch (communicationMode) {
- case ASYNC:
- Message tmpMessage = msg;
- if (msgBodyCompressed) {
- //If msg body was compressed, msgbody should be reset using prevBody.
- //Clone new message using commpressed message body and recover origin massage.
- //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
- tmpMessage = MessageAccessor.cloneMessage(msg);
- msg.setBody(prevBody);
- }
- long costTimeAsync = System.currentTimeMillis() - beginStartTime;
- if (timeout < costTimeAsync) {
- throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
- }
- sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
- brokerAddr,
- mq.getBrokerName(),
- tmpMessage,
- requestHeader,
- timeout - costTimeAsync,
- communicationMode,
- sendCallback,
- topicPublishInfo,
- this.mQClientFactory,
- this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
- context,
- this);
- break;
- case ONEWAY:
- case SYNC:
- long costTimeSync = System.currentTimeMillis() - beginStartTime;
- if (timeout < costTimeSync) {
- throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
- }
- sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
- brokerAddr,
- mq.getBrokerName(),
- msg,
- requestHeader,
- timeout - costTimeSync,
- communicationMode,
- context,
- this);
- break;
- default:
- assert false;
- break;
- }
- if (this.hasSendMessageHook()) {
- context.setSendResult(sendResult);
- this.executeSendMessageHookAfter(context);
- }
- return sendResult;
- } catch (RemotingException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- } catch (MQBrokerException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- } catch (InterruptedException e) {
- if (this.hasSendMessageHook()) {
- context.setException(e);
- this.executeSendMessageHookAfter(context);
- }
- throw e;
- } finally {
- msg.setBody(prevBody);
- }
- }
- throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
- }
First, the start time beginStartTime is recorded so that a timeout can be detected later.
Then the Broker address is looked up by BrokerName.
The findBrokerAddressInPublish method:
- public String findBrokerAddressInPublish(final String brokerName) {
- HashMap<Long/* brokerId */, String/* address */> map = this.brokerAddrTable.get(brokerName);
- if (map != null && !map.isEmpty()) {
- return map.get(MixAll.MASTER_ID);
- }
- return null;
- }
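The lookup goes through brokerAddrTable by brokerName and returns the address registered under MixAll.MASTER_ID, i.e. the master.
If nothing is found, tryToFindTopicPublishInfo is called to refresh the route, and findBrokerAddressInPublish is tried again.
After that, if the VIP channel (a higher-priority channel) is enabled, the VIP channel address is derived from brokerAddr:
The brokerVIPChannel method of MixAll: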
- public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {
- if (isChange) {
- String[] ipAndPort = brokerAddr.split(":");
- String brokerAddrNew = ipAndPort[0] + ":" + (Integer.parseInt(ipAndPort[1]) - 2);
- return brokerAddrNew;
- } else {
- return brokerAddr;
- }
- }
Computing the VIP channel address is simple: the port number is just reduced by 2.
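As a quick illustration of the port arithmetic, here is a small standalone version. It is only a sketch: VipChannelSketch and vipAddress are made-up names, the address in main is an example value, and lastIndexOf is used instead of the original split(":") purely as a stylistic choice.

```java
/** Standalone illustration of the "port minus 2" rule (class and method names are hypothetical). */
class VipChannelSketch {
    static String vipAddress(boolean useVipChannel, String brokerAddr) {
        if (!useVipChannel) {
            return brokerAddr;
        }
        int sep = brokerAddr.lastIndexOf(':');
        String host = brokerAddr.substring(0, sep);
        int port = Integer.parseInt(brokerAddr.substring(sep + 1));
        return host + ":" + (port - 2);
    }

    public static void main(String[] args) {
        System.out.println(vipAddress(true, "192.168.0.1:10911"));  // 192.168.0.1:10909
        System.out.println(vipAddress(false, "192.168.0.1:10911")); // 192.168.0.1:10911
    }
}
```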
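With the address settled, what follows is a long stretch of setup.
An integer sysFlag is defined here. It is a set of bit flags describing the message, built from the following values: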
- public class MessageSysFlag {
- public final static int COMPRESSED_FLAG = 0x1;
- public final static int MULTI_TAGS_FLAG = 0x1 << 1;
- public final static int TRANSACTION_NOT_TYPE = 0;
- public final static int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
- public final static int TRANSACTION_COMMIT_TYPE = 0x2 << 2;
- public final static int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;
- }
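Because these constants occupy separate bits, they can be OR-ed together and read back independently. The sketch below copies three of the constants above into a hypothetical SysFlagSketch class; the helper methods isCompressed and transactionType are illustrative and are not claimed to be RocketMQ's own API.

```java
/** Bit-flag demo using constant values copied from the listing above (helpers are hypothetical). */
class SysFlagSketch {
    static final int COMPRESSED_FLAG = 0x1;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2; // doubles as the 2-bit transaction mask

    static boolean isCompressed(int sysFlag) {
        return (sysFlag & COMPRESSED_FLAG) != 0;
    }

    /** Extracts only the two transaction bits, ignoring everything else. */
    static int transactionType(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    public static void main(String[] args) {
        int sysFlag = 0;
        sysFlag |= COMPRESSED_FLAG;            // body was compressed
        sysFlag |= TRANSACTION_PREPARED_TYPE;  // message is a prepared (half) transaction message
        System.out.println(Integer.toBinaryString(sysFlag)); // 101
        System.out.println(isCompressed(sysFlag));           // true
        System.out.println(transactionType(sysFlag) == TRANSACTION_PREPARED_TYPE); // true
    }
}
```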
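A msgBodyCompressed flag is also defined to record whether the message body was compressed. tryToCompressMessage decides whether to compress and performs the compression:
The tryToCompressMessage method: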
- private boolean tryToCompressMessage(final Message msg) {
- if (msg instanceof MessageBatch) {
- //batch dose not support compressing right now
- return false;
- }
- byte[] body = msg.getBody();
- if (body != null) {
- if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
- try {
- byte[] data = UtilAll.compress(body, zipCompressLevel);
- if (data != null) {
- msg.setBody(data);
- return true;
- }
- } catch (IOException e) {
- log.error("tryToCompressMessage exception", e);
- log.warn(msg.toString());
- }
- }
- }
- return false;
- }
When the body size is greater than or equal to compressMsgBodyOverHowmuch (4 KB by default, i.e. 1024 * 4 bytes), the body is compressed with the compress method of UtilAll:
- public static byte[] compress(final byte[] src, final int level) throws IOException {
- byte[] result = src;
- ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(src.length);
- java.util.zip.Deflater defeater = new java.util.zip.Deflater(level);
- DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(byteArrayOutputStream, defeater);
- try {
- deflaterOutputStream.write(src);
- deflaterOutputStream.finish();
- deflaterOutputStream.close();
- result = byteArrayOutputStream.toByteArray();
- } catch (IOException e) {
- defeater.end();
- throw e;
- } finally {
- try {
- byteArrayOutputStream.close();
- } catch (IOException ignored) {
- }
- defeater.end();
- }
- return result;
- }
Compression here uses the JDK's java.util.zip.Deflater, i.e. DEFLATE in zlib format.
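To see what this kind of compression does to a body, here is a small round trip using only the JDK. It is a sketch under a few assumptions: DeflateRoundTripSketch is a made-up class, level 5 and the 8 KB body are example values, and the decompress side is my own counterpart written with InflaterInputStream rather than a copy of RocketMQ's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

/** Compress/decompress round trip with java.util.zip (illustrative, not RocketMQ's UtilAll). */
class DeflateRoundTripSketch {
    static byte[] compress(byte[] src, int level) {
        Deflater deflater = new Deflater(level);
        ByteArrayOutputStream out = new ByteArrayOutputStream(src.length);
        // Closing the stream finishes the deflate stream and flushes it into out.
        try (DeflaterOutputStream dos = new DeflaterOutputStream(out, deflater)) {
            dos.write(src);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            deflater.end(); // release native resources, as the original does
        }
        return out.toByteArray();
    }

    static byte[] decompress(byte[] src) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(src))) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] body = new byte[8 * 1024];   // larger than the 4 KB threshold
        Arrays.fill(body, (byte) 'x');      // highly repetitive, so it compresses well
        byte[] packed = compress(body, 5);
        System.out.println(body.length + " -> " + packed.length + " bytes");
        System.out.println(Arrays.equals(body, decompress(packed)));
    }
}
```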
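Next, sysFlag is updated depending on whether the message is a transactional message; transactional messages will be covered in a later post.
Then it checks whether any CheckForbiddenHook is registered. If so, every CheckForbiddenHook is iterated and its checkForbidden method is run, which lets a hook block the send.
SendMessageHook is handled the same way: every registered SendMessageHook has its sendMessageBefore method run, and once the message has been sent its sendMessageAfter method is run.
Then the request header requestHeader is populated with a long list of fields. After that comes the switch block, which performs the checks appropriate to each sending mode.
Finally, whichever mode is used, the sendMessage method of MQClientAPIImpl is called: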
- public SendResult sendMessage(
- final String addr,
- final String brokerName,
- final Message msg,
- final SendMessageRequestHeader requestHeader,
- final long timeoutMillis,
- final CommunicationMode communicationMode,
- final SendCallback sendCallback,
- final TopicPublishInfo topicPublishInfo,
- final MQClientInstance instance,
- final int retryTimesWhenSendFailed,
- final SendMessageContext context,
- final DefaultMQProducerImpl producer
- ) throws RemotingException, MQBrokerException, InterruptedException {
- long beginStartTime = System.currentTimeMillis();
- RemotingCommand request = null;
- if (sendSmartMsg || msg instanceof MessageBatch) {
- SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
- request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
- } else {
- request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
- }
- request.setBody(msg.getBody());
- switch (communicationMode) {
- case ONEWAY:
- this.remotingClient.invokeOneway(addr, request, timeoutMillis);
- return null;
- case ASYNC:
- final AtomicInteger times = new AtomicInteger();
- long costTimeAsync = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTimeAsync) {
- throw new RemotingTooMuchRequestException("sendMessage call timeout");
- }
- this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, context, producer);
- return null;
- case SYNC:
- long costTimeSync = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTimeSync) {
- throw new RemotingTooMuchRequestException("sendMessage call timeout");
- }
- return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
- default:
- assert false;
- break;
- }
- return null;
- }
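First, a RemotingCommand request of the appropriate type is created depending on the kind of message.
Once the request is built, execution again branches on the sending mode.
ONEWAY mode:
This directly calls invokeOneway on remotingClient, i.e. the Netty client: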
- public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
- RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
- final Channel channel = this.getAndCreateChannel(addr);
- if (channel != null && channel.isActive()) {
- try {
- doBeforeRpcHooks(addr, request);
- this.invokeOnewayImpl(channel, request, timeoutMillis);
- } catch (RemotingSendRequestException e) {
- log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
- this.closeChannel(addr, channel);
- throw e;
- }
- } else {
- this.closeChannel(addr, channel);
- throw new RemotingConnectException(addr);
- }
- }
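First, getAndCreateChannel obtains a Channel for the broker address from channelTables, creating the connection if none is cached (as covered in the previous post, the Netty client keeps channelTables, a map of Channels with established connections).
Then, much like before, the doBeforeRequest method of every configured RPCHook is run.
After that, invokeOnewayImpl is executed: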
- public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
- throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
- request.markOnewayRPC();
- boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
- if (acquired) {
- final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway);
- try {
- channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f) throws Exception {
- once.release();
- if (!f.isSuccess()) {
- log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed.");
- }
- }
- });
- } catch (Exception e) {
- once.release();
- log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed.");
- throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
- }
- } else {
- if (timeoutMillis <= 0) {
- throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast");
- } else {
- String info = String.format(
- "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
- timeoutMillis,
- this.semaphoreOneway.getQueueLength(),
- this.semaphoreOneway.availablePermits()
- );
- log.warn(info);
- throw new RemotingTimeoutException(info);
- }
- }
- }
It first sets the one-way flag bit on the request:
- public void markOnewayRPC() {
- int bits = 1 << RPC_ONEWAY;
- this.flag |= bits;
- }
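Next, a permit is acquired from the semaphoreOneway semaphore and wrapped in a SemaphoreReleaseOnlyOnce, which guarantees that the permit is released only once.
Finally, Netty's writeAndFlush is called to send the request, with an asynchronous listener attached that releases the permit when the write completes and logs a warning if the write failed.
Since this is a one-way send, nothing more is done after the write.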
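The release-only-once idea is worth a closer look, since both the listener and the catch block call release. The following is a minimal sketch of how such a wrapper can be built with an AtomicBoolean; ReleaseOnceSketch is a hypothetical class written for this post and is not presented as the actual SemaphoreReleaseOnlyOnce source.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/** Wrapper that hands a permit back to a Semaphore at most once (illustrative sketch). */
class ReleaseOnceSketch {
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    ReleaseOnceSketch(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    void release() {
        // compareAndSet lets exactly one caller win, even if several threads race here.
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }

    public static void main(String[] args) {
        Semaphore limit = new Semaphore(1);
        if (limit.tryAcquire()) {
            ReleaseOnceSketch once = new ReleaseOnceSketch(limit);
            once.release();
            once.release(); // second call is a no-op
        }
        System.out.println(limit.availablePermits()); // 1
    }
}
```

Without the guard, a double release would push the semaphore above its intended limit, and the cap on in-flight one-way requests would slowly erode.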
ASYNC mode:
This calls the sendMessageAsync method:
- private void sendMessageAsync(
- final String addr,
- final String brokerName,
- final Message msg,
- final long timeoutMillis,
- final RemotingCommand request,
- final SendCallback sendCallback,
- final TopicPublishInfo topicPublishInfo,
- final MQClientInstance instance,
- final int retryTimesWhenSendFailed,
- final AtomicInteger times,
- final SendMessageContext context,
- final DefaultMQProducerImpl producer
- ) throws InterruptedException, RemotingException {
- this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
- @Override
- public void operationComplete(ResponseFuture responseFuture) {
- RemotingCommand response = responseFuture.getResponseCommand();
- if (null == sendCallback && response != null) {
- try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
- if (context != null && sendResult != null) {
- context.setSendResult(sendResult);
- context.getProducer().executeSendMessageHookAfter(context);
- }
- } catch (Throwable e) {
- }
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
- return;
- }
- if (response != null) {
- try {
- SendResult sendResult = MQClientAPIImpl.this.processSendResponse(brokerName, msg, response);
- assert sendResult != null;
- if (context != null) {
- context.setSendResult(sendResult);
- context.getProducer().executeSendMessageHookAfter(context);
- }
- try {
- sendCallback.onSuccess(sendResult);
- } catch (Throwable e) {
- }
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), false);
- } catch (Exception e) {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
- onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, e, context, false, producer);
- }
- } else {
- producer.updateFaultItem(brokerName, System.currentTimeMillis() - responseFuture.getBeginTimestamp(), true);
- if (!responseFuture.isSendRequestOK()) {
- MQClientException ex = new MQClientException("send request failed", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
- } else if (responseFuture.isTimeout()) {
- MQClientException ex = new MQClientException("wait response timeout" + responseFuture.getTimeoutMillis() + "ms",
- responseFuture.getCause());
- onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
- } else {
- MQClientException ex = new MQClientException("unknow reseaon", responseFuture.getCause());
- onExceptionImpl(brokerName, msg, 0L, request, sendCallback, topicPublishInfo, instance,
- retryTimesWhenSendFailed, times, ex, context, true, producer);
- }
- }
- }
- });
- }
An InvokeCallback is set up here to handle the callback after the send.
First, look at the invokeAsync method:
- public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
- throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
- RemotingSendRequestException {
- long beginStartTime = System.currentTimeMillis();
- final Channel channel = this.getAndCreateChannel(addr);
- if (channel != null && channel.isActive()) {
- try {
- doBeforeRpcHooks(addr, request);
- long costTime = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTime) {
- throw new RemotingTooMuchRequestException("invokeAsync call timeout");
- }
- this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, invokeCallback);
- } catch (RemotingSendRequestException e) {
- log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
- this.closeChannel(addr, channel);
- throw e;
- }
- } else {
- this.closeChannel(addr, channel);
- throw new RemotingConnectException(addr);
- }
- }
As with ONEWAY earlier, the actual work is done in invokeAsyncImpl
The invokeAsyncImpl method:
- public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
- final InvokeCallback invokeCallback)
- throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
- long beginStartTime = System.currentTimeMillis();
- final int opaque = request.getOpaque();
- boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
- if (acquired) {
- final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
- long costTime = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis < costTime) {
- once.release();
- throw new RemotingTimeoutException("invokeAsyncImpl call timeout");
- }
- final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
- this.responseTable.put(opaque, responseFuture);
- try {
- channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f) throws Exception {
- if (f.isSuccess()) {
- responseFuture.setSendRequestOK(true);
- return;
- }
- requestFail(opaque);
- log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
- }
- });
- } catch (Exception e) {
- responseFuture.release();
- log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
- throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
- }
- } else {
- if (timeoutMillis <= 0) {
- throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
- } else {
- String info =
- String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
- timeoutMillis,
- this.semaphoreAsync.getQueueLength(),
- this.semaphoreAsync.availablePermits()
- );
- log.warn(info);
- throw new RemotingTimeoutException(info);
- }
- }
- }
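Here the request's getOpaque method returns an opaque value. This value is assigned when the request is created, from an auto-incrementing AtomicInteger, so it serves as the unique ID of each request
A ResponseFuture is then created to wrap the invokeCallback and the channel, and it is put into responseTable
responseTable is a map: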
- protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
- new ConcurrentHashMap<Integer, ResponseFuture>(256);
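It maps each request ID to its ResponseFuture, so that after an asynchronous send the response can be handled when it arrives
In other words, once the request has been sent and a response is received, the request ID is used to look up the matching ResponseFuture, and the InvokeCallback set earlier is executed. Inside the InvokeCallback, processSendResponse handles the response returned by the Broker, and finally, depending on the outcome, the user-supplied SendCallback's onSuccess or onException method is called. This completes the asynchronous send
After the ResponseFuture is registered, the remaining steps are the same as for ONEWAY: Netty's writeAndFlush performs the send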
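The lookup by request ID described above can be sketched in isolation. This is a minimal illustration, not RocketMQ's code: the class OpaqueCorrelationSketch, its methods, and the use of Consumer<String> in place of ResponseFuture and InvokeCallback are all hypothetical names chosen for the example.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch: correlates responses with pending requests by an integer ID.
class OpaqueCorrelationSketch {
    // Stand-in for the auto-incrementing counter that gives each request its opaque.
    private static final AtomicInteger REQUEST_ID = new AtomicInteger(0);

    // Stand-in for responseTable: opaque -> callback waiting for that response.
    private final ConcurrentMap<Integer, Consumer<String>> pending = new ConcurrentHashMap<>();

    /** Registers the callback before "sending" and returns the opaque assigned to the request. */
    int sendAsync(String payload, Consumer<String> callback) {
        int opaque = REQUEST_ID.getAndIncrement();
        pending.put(opaque, callback);
        // A real client would write the request to the channel at this point.
        return opaque;
    }

    /** Called when a response arrives; returns false if nothing is waiting for this opaque. */
    boolean onResponse(int opaque, String response) {
        Consumer<String> callback = pending.remove(opaque);
        if (callback == null) {
            return false;
        }
        callback.accept(response);
        return true;
    }

    /** Number of requests still waiting for a response. */
    int pendingCount() {
        return pending.size();
    }
}
```

Because the entry is removed when the response is dispatched, a duplicate or late response with the same ID finds nothing and is ignored, which is why the callback runs at most once per request in this sketch.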
SYNC mode:
The sendMessageSync method is called:
- private SendResult sendMessageSync(
- final String addr,
- final String brokerName,
- final Message msg,
- final long timeoutMillis,
- final RemotingCommand request
- ) throws RemotingException, MQBrokerException, InterruptedException {
- RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
- assert response != null;
- return this.processSendResponse(brokerName, msg, response);
- }
It first calls the Netty client's invokeSync method:
The invokeSync method:
- public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
- throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
- long beginStartTime = System.currentTimeMillis();
- final Channel channel = this.getAndCreateChannel(addr);
- if (channel != null && channel.isActive()) {
- try {
- doBeforeRpcHooks(addr, request);
- long costTime = System.currentTimeMillis() - beginStartTime;
- if (timeoutMillis <costTime) {
- throw new RemotingTimeoutException("invokeSync call timeout");
- }
- RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
- doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
- return response;
- } catch (RemotingSendRequestException e) {
- log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
- this.closeChannel(addr, channel);
- throw e;
- } catch (RemotingTimeoutException e) {
- if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
- this.closeChannel(addr, channel);
- log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
- }
- log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
- throw e;
- }
- } else {
- this.closeChannel(addr, channel);
- throw new RemotingConnectException(addr);
- }
- }
The steps are again similar to the earlier ones
Go straight to the invokeSyncImpl method:
- public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
- final long timeoutMillis)
- throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
- final int opaque = request.getOpaque();
- try {
- final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);
- this.responseTable.put(opaque, responseFuture);
- final SocketAddress addr = channel.remoteAddress();
- channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture f) throws Exception {
- if (f.isSuccess()) {
- responseFuture.setSendRequestOK(true);
- return;
- } else {
- responseFuture.setSendRequestOK(false);
- }
- responseTable.remove(opaque);
- responseFuture.setCause(f.cause());
- responseFuture.putResponse(null);
- log.warn("send a request command to channel <" + addr + "> failed.");
- }
- });
- RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
- if (null == responseCommand) {
- if (responseFuture.isSendRequestOK()) {
- throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
- responseFuture.getCause());
- } else {
- throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());
- }
- }
- return responseCommand;
- } finally {
- this.responseTable.remove(opaque);
- }
- }
This is largely the same as ASYNC, except that after writeAndFlush it calls the responseFuture's waitResponse method, which blocks until the response comes back or the timeout expires
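A blocking wait of this kind is commonly built on a CountDownLatch. The sketch below assumes such a latch-based design; BlockingFutureSketch is a hypothetical class, it uses String in place of RemotingCommand, and it should not be read as the actual ResponseFuture implementation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a future that one thread waits on and another thread completes.
class BlockingFutureSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String response;

    /** Blocks until putResponse is called or the timeout elapses; returns null on timeout. */
    String waitResponse(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return response;
    }

    /** Stores the response (null can signal a failed send) and wakes up the waiting thread. */
    void putResponse(String response) {
        this.response = response;
        latch.countDown();
    }
}
```

This mirrors the two exits seen in invokeSyncImpl: the write listener calls putResponse(null) on a failed send so the caller stops waiting early, and a null result after the wait is then classified as either a timeout or a send failure.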
If the send fails, the for loop in DefaultMQProducerImpl's sendDefaultImpl moves on to the next iteration, until the send succeeds or the retry count is exhausted
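The shape of such a retry loop can be shown with a minimal sketch. It is an illustration under assumptions, not the body of sendDefaultImpl: RetryLoopSketch and sendWithRetry are hypothetical names, and the sketch leaves out broker selection and the overall timeout check.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of "try once, then retry up to N more times".
class RetryLoopSketch {
    /**
     * Runs the attempt up to 1 + retryTimes times and returns the first successful result.
     * If every attempt fails, the last failure is rethrown.
     */
    static <T> T sendWithRetry(Callable<T> attempt, int retryTimes) throws Exception {
        if (retryTimes < 0) {
            throw new IllegalArgumentException("retryTimes must be >= 0");
        }
        int timesTotal = 1 + retryTimes;
        Exception last = null;
        for (int times = 0; times < timesTotal; times++) {
            try {
                return attempt.call();
            } catch (Exception e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw last;
    }
}
```

With retryTimes set to 2, for example, an attempt that fails twice and then succeeds returns normally on the third call, while one that always fails is called three times before the last exception reaches the caller.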
If the Broker's response arrives within the timeout, invokeSync runs the doAfterResponse method of any configured RPCHook, and then processSendResponse in sendMessageSync handles the received response
This concludes the Producer's message sending
Source: https://www.cnblogs.com/a526583280/p/11290538.html