RocketMQ 顺序消费
如果要保证顺序消费, 那么他的核心点就是: 生产者有序存储, 消费者有序消费.
一, 概念
1, 什么是无序消息
无序消息 无序消息也指普通的消息, Producer 只管发送消息, Consumer 只管接收消息, 至于消息和消息之间的顺序并没有保证.
举例 Producer 依次发送 orderId 为 1,2,3 的消息, Consumer 接到的消息顺序有可能是 1,2,3, 也有可能是 2,1,3 等情况, 这就是普通消息.
2, 什么是全局顺序
对于指定的一个 Topic, 所有消息按照严格的先入先出 (FIFO) 的顺序进行发布和消费.
举例 比如 Producer 发送 orderId 1,3,2 的消息, 那么 Consumer 也必须要按照 1,3,2 的顺序进行消费.
3, 局部顺序
在实际开发有些场景中, 我并不需要消息完全按照完全按的先进先出, 而是某些消息保证先进先出就可以了.
就好比一个订单涉及 订单生成, 订单支付, 订单完成. 我不用管其它的订单, 只保证同样订单 ID 能保证这个顺序就可以了.
二, 实现原理
我们知道 生产的 message 最终会存放在 Queue 中, 如果一个 Topic 关联了 16 个 Queue, 如果我们不指定消息往哪个队列里放, 那么默认是平均分配消息到 16 个 queue,
好比有 100 条消息, 那么这 100 条消息会平均分配在这 16 个 Queue 上, 那么每个 Queue 大概放 5~6 个左右. 这里有一点很重的是:
同一个 queue, 存储在里面的 message 是按照先进先出的原则
这个时候思路就来了, 好比有 orderId=1 的 3 条消息, 分别是 订单生产, 订单付款, 订单完成. 只要保证它们放到同一个 Queue 那就保证消费者先进先出了.
这就保证局部顺序了, 即同一订单按照先后顺序放到同一 Queue, 那么取消息的时候就可以保证先进先取出.
那么全局消息呢?
这个就简单啦, 你把所有消息都放在一个 Queue 里, 这样不就保证全局消息了.
就这么简单
当然不是, 这里还有很关键的一点, 好比在一个消费者集群的情况下, 消费者 1 先去 Queue 拿消息, 它拿到了 订单生成, 它拿完后, 消费者 2 去 queue 拿到的是 订单支付.
拿的顺序是没毛病了, 但关键是先拿到不代表先消费完它. 会存在虽然你消费者 1 先拿到订单生成, 但由于网络等原因, 消费者 2 比你真正的先消费消息. 这是不是很尴尬了.
订单付款还是可能会比订单生成更早消费的情况. 那怎么办.
分布式锁来了
Rocker 采用的是分段锁, 它不是锁整个 Broker 而是锁里面的单个 Queue, 因为只要锁单个 Queue 就可以保证局部顺序消费了.
所以最终的消费者这边的逻辑就是
消费者 1 去 Queue 拿 订单生成, 它就锁住了整个 Queue, 只有它消费完成并返回成功后, 这个锁才会释放.
然后下一个消费者去拿到 订单支付 同样锁住当前 Queue, 这样的一个过程来真正保证对同一个 Queue 能够真正意义上的顺序消费, 而不仅仅是顺序取出.
全局顺序与分区顺序对比
消息类型对比
发送方式对比
其它的注意事项
, 顺序消息暂不支持广播模式.
, 顺序消息不支持异步发送方式, 否则将无法严格保证顺序.
, 建议同一个 Group ID 只对应一种类型的 Topic, 即不同时用于顺序消息和无序消息的收发.
, 对于全局顺序消息, 建议创建实例个数>=2.
三, 代码示例
这里保证两点
, 生产端 同一 orderID 的订单放到同一个 queue.
, 消费端 同一个 queue 取出消息的时候锁住整个 queue, 直到消费后再解锁.
1,ProductOrder 实体
- @AllArgsConstructor
- @Data
- @ToString
- public class ProductOrder {
- /**
- * 订单编号
- */
- private String orderId;
- /**
- * 订单类型(订单创建, 订单付款, 订单完成)
- */
- private String type;
- }
- 2,Product(生产者)
生产者和之前发送普通消息最大的区别, 就是针对每一个 message 都手动通过 MessageQueueSelector 选择好 queue.
@RestController public class Product { private static List<ProductOrder> orderList = null; private static String producerGroup = "test_producer"; /** * 模拟数据 */ static { orderList = new ArrayList<>(); orderList.add(new ProductOrder("XXX001", "订单创建")); orderList.add(new ProductOrder("XXX001", "订单付款")); orderList.add(new ProductOrder("XXX001", "订单完成")); orderList.add(new ProductOrder("XXX002", "订单创建")); orderList.add(new ProductOrder("XXX002", "订单付款")); orderList.add(new ProductOrder("XXX002", "订单完成")); orderList.add(new ProductOrder("XXX003", "订单创建")); orderList.add(new ProductOrder("XXX003", "订单付款")); orderList.add(new ProductOrder("XXX003", "订单完成")); } @GetMapping("message") public void sendMessage() throws Exception { // 示例生产者 DefaultMQProducer producer = new DefaultMQProducer(producerGroup); // 不开启 vip 通道 开通口端口会减 2 producer.setVipChannelEnabled(false); // 绑定 name server producer.setNamesrvAddr("IP:9876"); producer.start(); for (ProductOrder order : orderList) { //1, 生成消息 Message message = new Message(JmsConfig.TOPIC, "", order.getOrderId(), order.toString().getBytes()); //2, 发送消息是 针对每条消息选择对应的队列 SendResult sendResult = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { //3,arg 的值其实就是下面传入 orderId String orderid = (String) arg; //4, 因为订单是 String 类型, 所以通过 hashCode 转成 int 类型 int hashCode = orderid.hashCode(); //5, 因为 hashCode 可能为负数 所以取绝对值 hashCode = Math.abs(hashCode); //6, 保证同一个订单号 一定分配在同一个 queue 上 long index = hashCode % mqs.size(); return mqs.get((int) index); } }, order.getOrderId(),50000); System.out.printf("Product: 发送状态 =%s, 存储 queue=%s ,orderid=%s, type=%s\n", sendResult.getSendStatus(), sendResult.getMessageQueue().getQueueId(), order.getOrderId(), order.getType()); } producer.shutdown(); } }
看看生产者有没有把相同订单指定到同一个 queue
通过测试结果可以看出: 相同订单已经存到同一 queue 中了.
3,Consumer(生产者)
上面说过, 消费者真正要达到消费顺序, 需要分布式锁, 所以这里需要将 MessageListenerOrderly 替换之前的 MessageListenerConcurrently, 因为它里面实现了分布式锁.
@Slf4j @Component public class Consumer { /** * 消费者实体对象 */ private DefaultMQPushConsumer consumer; /** * 消费者组 */ public static final String CONSUMER_GROUP = "consumer_group"; /** * 通过构造函数 实例化对象 */ public Consumer() throws MQClientException { consumer = new DefaultMQPushConsumer(CONSUMER_GROUP); consumer.setNamesrvAddr("IP:9876"); //TODO 这里真的是个坑, 我 product 设置 VipChannelEnabled(false), 但消费者并没有设置这个参数, 之前发送普通消息的时候也没有问题. 能正常消费. //TODO 但在顺序消息时, consumer 一直不消费消息了, 找了好久都没有找到原因, 直到我这里也设置为 VipChannelEnabled(false), 竟然才可以消费消息. consumer.setVipChannelEnabled(false); // 订阅主题和 标签 ( * 代表所有标签) 下信息 consumer.subscribe(JmsConfig.TOPIC, "*"); // 注册消费的监听 这里注意顺序消费为 MessageListenerOrderly 之前并发为 ConsumeConcurrentlyContext consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> { // 获取消息 MessageExt msg = msgs.get(0); // 消费者获取消息 这里只输出 不做后面逻辑处理 log.info("Consumer - 线程名称 ={}, 消息 ={}", Thread.currentThread().getName(), new String(msg.getBody())); return ConsumeOrderlyStatus.SUCCESS; }); consumer.start(); } }
看看消费结果是不是我们需要的结果
通过测试结果我们看出
, 消费消息的顺序并没有完全按照之前的先进先出, 即没有满足全局顺序.
, 同一订单来讲, 订单的 订单生成, 订单支付, 订单完成 消费顺序是保证的.
这是局部保证顺序消费就已经满足我们当前实际开发中的需求了.
有关消费端选择 MessageListenerOrderly 后, consumer.start()启动相关的源码可以参考博客: RocketMQ 顺序消息消费端源码 https://www.jianshu.com/p/931e9bc25c21
只要自己变优秀了, 其他的事情才会跟着好起来(上将 4)
posted on 2019-07-05 12:29 雨点的名字 阅读(...) 评论(...) 编辑 收藏
来源: https://www.cnblogs.com/qdhxhz/p/11134903.html