一, 问题答案
是不可以的
而且后注册的会替换前注册的, MqConsumer2 会替换 MqConsumer, 并且只结束 tag-2 的消息
- /**
- * @date 2019/05/28
- */
- @Component
- @Slf4j
- public class MqConsumer implements MessageConsumer {
- @Override
- @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED)
- public void onMessage(String msg) {
- log.info("接收到的库存 MQ 消息:{}", msg);
- log.info("接收到的库存 MQ 消息:{}", msg);
- log.info("接收到的库存 MQ 消息:{}", msg);
- }
- @Override
- public String getTopic() {
- return "topic-1";
- }
- @Override
- public String getTag() {
- return "tag-1";
- }
- }
- @Component
- @Slf4j
- public class MqConsumer2 implements MessageConsumer {
- @Override
- @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRED)
- public void onMessage(String msg) {
- log.info("接收到的库存 MQ 消息:{}", msg);
- log.info("接收到的库存 MQ 消息:{}", msg);
- log.info("接收到的库存 MQ 消息:{}", msg);
- }
- @Override
- public String getTopic() {
- return "topic-1";
- }
- @Override
- public String getTag() {
- return "tag-2";
- }
- }
二, 为什么呢?
我们从源码的角度来分析下
1. 订阅消息的方法 public void subscribe(String topic, String subExpression, MessageListener listener) , 其中 subExpression 即为 tag
- package com.aliyun.openservices.ons.API.impl.rocketmq;
- ....
- @Generated("ons-client")
- public class ConsumerImpl extends ONSConsumerAbstract implements Consumer {
- private final ConcurrentHashMap<String, MessageListener> subscribeTable = new ConcurrentHashMap<String, MessageListener>();
- public ConsumerImpl(final Properties properties) {
- super(properties);
- boolean postSubscriptionWhenPull = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.PostSubscriptionWhenPull, "false"));
- this.defaultMQPushConsumer.setPostSubscriptionWhenPull(postSubscriptionWhenPull);
- String messageModel = properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
- this.defaultMQPushConsumer.setMessageModel(MessageModel.valueOf(messageModel));
- }
- @Override
- public void start() {
- this.defaultMQPushConsumer.registerMessageListener(new MessageListenerImpl());
- super.start();
- }
- @Override
- public void subscribe(String topic, String subExpression, MessageListener listener) {
- if (null == topic) {
- throw new ONSClientException("topic is null");
- }
- if (null == listener) {
- throw new ONSClientException("listener is null");
- }
- this.subscribeTable.put(topic, listener);
- super.subscribe(topic, subExpression);
- }
- .....
- }
从上面的类中我们可以从 this.subscribeTable.put(topic, listener); 看到 subscribeTable 这样的一个 Map, 该 Map 与 tag 无关
2. 我们再看 super.subscribe(topic, subExpression) 方法, 属于 ONSConsumerAbstract 类中
- protected void subscribe(String topic, String subExpression) {
- try {
- this.defaultMQPushConsumer.subscribe(topic, subExpression);
- } catch (MQClientException e) {
- throw new ONSClientException("defaultMQPushConsumer subscribe exception", e);
- }
- }
DefaultMQPushConsumer 中:
- @Override
- public void subscribe(String topic, String subExpression) throws MQClientException {
- this.defaultMQPushConsumerImpl.subscribe(withNamespace(topic), subExpression);
- }
DefaultMQPushConsumerImpl 中:
- public void subscribe(String topic, String subExpression) throws MQClientException {
- try {
- // 此处用来构建订阅数据, 并且指定了 tag
- SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
- topic, subExpression);
- // 此处将 topic 和该 topic 的订阅数据存放到 subscriptionInner 这个 Map 中
- // protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner = new ConcurrentHashMap<String, SubscriptionData>();
- this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
- if (this.mQClientFactory != null) {
- this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
- }
- } catch (Exception e) {
- throw new MQClientException("subscription exception", e);
- }
- }
三, 总结
从上面简单的源码可以看到, 有用到两个 Map,
subscribeTable 和 subscriptionInner , 并且 Map 的 key 都为 topic. 所以我们可以笃定, RocketMQ 在同一个项目中, 只支持注册一个 topic 消费者, 那么也就只能指定一个 tag
来源: http://www.bubuko.com/infodetail-3085771.html