New interface in the MQ SDK:
New method on IMQSession:
- /// <summary>
- /// Creates a message consumer.
- /// </summary>
- /// <param name="topicName">Topic name.</param>
- /// <param name="customTopicQueueName">Name of the custom queue bound to the topic.</param>
- /// <param name="isPersistence">Whether to enable persistence.</param>
- /// <returns>The message consumer.</returns>
- IMessageConsumer CreateTopicConsumer(string topicName, string customTopicQueueName, bool isPersistence = false);
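Usage: the consumer must explicitly specify which publish/subscribe bound queue it consumes from. For example, with hot deployment in a configuration center, every configuration center instance must specify its own unique bound queue name.
The consumer can then read messages from that specific queue, exactly as it would from an ordinary MQ queue.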
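One way to give every instance its own bound queue name is to derive it from the topic name plus an instance identifier. The following is only a minimal sketch: `TopicQueueNames` and `ForInstance` are hypothetical helpers that are not part of the MQ SDK, and the `topic.instance` naming scheme is an assumption.

```csharp
using System;

// Hypothetical helper, not part of the MQ SDK: builds one queue name per consumer instance.
public static class TopicQueueNames
{
    // Combines the topic name with an instance identifier, e.g. "ConfigChanged.node-01".
    public static string ForInstance(string topicName, string instanceId)
    {
        if (string.IsNullOrWhiteSpace(topicName))
            throw new ArgumentException("Topic name is required.", nameof(topicName));
        if (string.IsNullOrWhiteSpace(instanceId))
            throw new ArgumentException("Instance id is required.", nameof(instanceId));

        return topicName + "." + instanceId;
    }
}
```

An instance could then pass `TopicQueueNames.ForInstance(topicName, Environment.MachineName)` as `customTopicQueueName`. Using the machine name assumes one instance per host; a stable identifier is probably the safer choice when the queue is persistent, because a name that changes on every restart would leave old queues behind.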
The implementation takes four steps:
1. Create a persistent Topic (that is, a persistent Exchange):
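- var service = MQServiceProvider.GetDefaultMQService();
- var topicName = "demoTopic"; // assumed example value; the original snippet never defines topicName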
- var messageText = "abc";
- // Create the topic
- using (var connection = service.CreateConnection())
- {
- var session = connection.CreateSession(MessageAckMode.IndividualAcknowledge);
- var messageCreator = service.GetMessageCreator();
- var message = messageCreator.CreateMessage(messageText);
- message.IsPersistent = true;
- var producer = session.CreateProducer();
- var topic = session.DeclareTopic(topicName, true);
- }
2. Define the consumers:
- List<string> queueList = new List<string>() {
- "guozhiqi1",
- "guozhiqi2",
- "guozhiqi3",
- "guozhiqi4",
- "guozhiqi5",
- "guozhiqi6",
- "guozhiqi7",
- "guozhiqi8",
- "guozhiqi9",
- };
- //var service = MQServiceProvider.GetDefaultMQService();
- //var messageText = "abc" + DateTime.Now.ToShortTimeString();
- // Define the consumers
- using (var connection1 = service.CreateConnection())
- {
- var session1 = connection1.CreateSession(MessageAckMode.IndividualAcknowledge);
- foreach (var item in queueList)
- {
- session1.DeclareQueue(item, true);
- var consumer = session1.CreateTopicConsumer(topicName, item, true);
- }
- }
3. Send messages to the Topic:
- // Send the messages
- for (int i = 0; i <= 100; i++)
- {
- using (var connection = service.CreateConnection())
- {
- var session = connection.CreateSession(MessageAckMode.IndividualAcknowledge);
- var messageCreator = service.GetMessageCreator();
- var message = messageCreator.CreateMessage(messageText);
- message.IsPersistent = true; // mark the message as persistent
- message.TimeToLive = TimeSpan.FromSeconds(30); // set the expiration time
- var producer = session.CreateProducer();
- var topic = session.DeclareTopic(topicName, true);
- producer.Send(message, topic);
- }
- }
4. Receive messages from the queues:
- Parallel.ForEach(queueList, (item) =>
- {
- while (true)
- {
- // Receive a message
- using (var connection1 = service.CreateConnection())
- {
- var session1 = connection1.CreateSession(MessageAckMode.IndividualAcknowledge);
- session1.DeclareQueue(item, true);
- var consumer = session1.CreateTopicConsumer(topicName, item, true);
- var topic = session1.DeclareTopic(topicName, true);
- var receivedmessage = consumer.Receive(topic);
- var textMessage = receivedmessage as ITextMessage;
- Assert.AreEqual(messageText, textMessage.Body);
- consumer.Acknowledge(receivedmessage);
- }
- }
- });
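The four steps above rely on publish/subscribe delivery: every queue bound to the topic is expected to receive its own copy of each message sent to that topic. The following in-memory sketch illustrates that behaviour without the MQ SDK. `InMemoryTopic` and its members are hypothetical, and the model ignores persistence, expiration, and acknowledgement.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of a publish/subscribe topic; it is not the MQ SDK.
public sealed class InMemoryTopic
{
    // One FIFO queue per bound queue name.
    private readonly Dictionary<string, Queue<string>> _queues =
        new Dictionary<string, Queue<string>>();

    // Binds a named queue to the topic; binding the same name twice is a no-op.
    public void Bind(string queueName)
    {
        if (!_queues.ContainsKey(queueName))
        {
            _queues.Add(queueName, new Queue<string>());
        }
    }

    // Copies the message into every queue that is currently bound.
    public void Publish(string message)
    {
        foreach (var queue in _queues.Values)
        {
            queue.Enqueue(message);
        }
    }

    // Takes the next message from one queue; returns false when the queue is unknown or empty.
    public bool TryReceive(string queueName, out string message)
    {
        message = null;
        Queue<string> queue;
        if (!_queues.TryGetValue(queueName, out queue) || queue.Count == 0)
        {
            return false;
        }

        message = queue.Dequeue();
        return true;
    }
}
```

In this model, binding "guozhiqi1" and "guozhiqi2" and then publishing "abc" once lets each queue return "abc" exactly one time. A queue bound only after the publish receives nothing, which is presumably why step 2 declares the queues before step 3 sends.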
Source: https://www.cnblogs.com/jiagoushi/p/8678871.html