消息无序产生的原因
消息队列, 既然是队列就能保证消息在进入队列, 以及出队列的时候保证消息的有序性, 显然这是在消息的生产端(Producer), 但是往往在生产环境中有多个消息的消费端(Consumer), 尽管消费端在拉取消息时是有序的, 但各个消息由于网络等方面原因无法保证在各个消费端中处理时有序.
场景分析
先后两次修改了商品信息, 消息 A 和消息 B 先后同步写入 MySQL, 接着异步写入消息队列中发送消息, 此时消息队列生产端 (Producer) 按时序先后发出了 A 和 B 两条消息(消息 A 先发出, 消息 B 后发出). 按业务逻辑, 商品信息的最终状态需要以消息 A 和消息 B 综合为准.
看似一个比较常见的同步写数据库, 异步发送消息的场景, 但实际上需要保证消息的有序消费.
假设 1: 消息 A 只包含修改的商品名称, 消息 B 只包含修改的商品重量, 此时消息队列的消费端实际上不需要关注消息时序, 消息队列消费端 (Consumer) 只管消费即可.
假设 2: 消息 A 包含修改的商品名称, 重量, 消息 B 包含修改的商品名称, 此时消费端首先接收到消息 B, 后接收到消息 A, 那么消息 B 的修改就会被覆盖. 此时消息队列的消费端实际上又需要关注消息时序.
可见, 你无法保证消息中包含什么信息, 此时必须保证消息的有序消费.
业务角度如何保证消息有序消费
生产端在发送消息时, 始终保证消息是全量信息.
消费端在接收消息时, 通过缓存时间戳的方式, 消费消息时判断消息产生的时间是否最新, 如果不是则丢弃, 如果是则执行下一步.
下面通过伪代码的方式描述:
生产端伪代码
- insertWare(ware); #插入数据到数据库, 通常在插入数据库时我们只会 update 修改的字段, 而不会全量插入
- ware = selectWareById(ware.getId); #获取商品的全量信息(此时是最新的), 用于将它放入到消息队列中
- syncMq(ware); #异步发送 mq 消息 A
消费端伪代码
- ware = fetchWare(); #获取消息
- if (isLasted(ware)) #通过商品的修改时间戳判断是否是最新的修改
- TODO #执行下一步业务逻辑
- else
- return #丢弃该消息
重点在于消费端如何判断该消息是否是最新的修改也就是 isLasted 方法.
isLasted 方法
- Long modified = getCacheById(ware.getId); #获取缓存中该条商品的最新修改时间
- If (ware.getModified> modified) {
- #如果消息中商品修改时间大于缓存中的时间, 说明是最新操作
- setCacheById(ware); #将该条消息的商品修改时间戳写入到缓存中
- return true;
- } else #如果消息中的商品修改时间小于缓存中的时间, 说明该条消息属于 "历史操作", 不对其更新
- return false;
以上就是通过伪代码的方式, 描述如何通过业务手段保证消息有序消费, 重点在于全量发送信息和缓存时间戳. 在其中还有一些技术实现细节.
例如: 消费端消费消息 B, 执行到获取时间戳缓存之后, 并在重新设置新的缓存之前, 此时另一个消费端恰好也正在消费 B 它也正执行到获取时间戳缓存, 由于消息 A 此时并没有更新缓存, 消息 A 拿到的缓存仍然是旧的缓存, 这时就会存在两个消费端都认为自己所消费的消息时最新的, 造成该丢弃的消息没丢.
显然, 这是分布式线程安全问题, 分布式锁通常使用 Redis 或者 ZooKeeper, 加锁后的执行时序如下图所示.
这是从业务角度保证消息在消费端有序消费. 通过在消息发送端全量发送消息以及在消息消费端缓存时间戳就可以保证消息的有序消费.
在上述场景中是先同步写入 MySQL, 再获取商品全量数据, 接着再异步发送消息. 这一系列的步骤可以通过接 MySQL 的 binlog 实现, 在同步写入 MySQL 后, MySQL 发送 binlog 变更, 通过阿里巴巴 Canal 中间件接收 MySQL 的 binlog 变更再发送消息到消息队列.
这是一个能给程序员加 buff 的公众号 (CoderBuff)
来源: https://www.cnblogs.com/yulinfeng/p/11254925.html