考虑一个分布式场景中一个常见的场景: 服务 A 执行某个数据库操作成功后, 会发送一条消息到消息队列, 现在希望只有数据库操作执行成功才发送这条消息. 下面是一些常见的作法:
1. 先执行数据库操作, 再发送消息
- public void purchaseOrder() {
- orderDao.save(order);
- messageQueue.send(message);
- }
有可能 order 新增成功, 发送消息失败. 最终形成不一致状态.
2. 先发送消息, 再执行数据库操作
- public void purchaseOrder() {
- messageQueue.send(message);
- orderDao.save(order);
- }
有可能消息发送成功, 而 order 新增失败, 从而形成不一致状态.
3. 在数据库事务中, 先发送消息, 再执行数据库操作
- @Transactional public void purchaseOrder() {
- messageQueue.send(message);
- orderDao.save(order);
- }
这里同样无法保证一致性. 如果数据库操作成功, 然而消息已经发送了, 无法进行回滚.
4. 在数据库事务中, 先执行数据库操作, 再发送消息
- @Transactional public void purchaseOrder() {
- orderDao.save(order);
- messageQueue.send(message);
- }
这种方案成功与否, 取决于消息队列是否拥有应答机制和事务机制.
应答机制表示 producer 发送消息后, 消息队列能够返回 response 从而证明消息是否插入成功.
如果消息队列拥有应答机制, 将上面的代码改写为:
- @Transactional public void purchaseOrder() {
- orderDao.save(order); try{
- kafkaProducer.send(message).get();
- } catch(Exception e) throw new RuntimeException("Fail to send message");
- }
这段代码表示如果发送发收到消息队列错误的 response, 就抛出一个 RuntimeException. 那么消息发送失败, 能够造成数据库操作的回滚. 这个方案看似可行, 然而存在这样一种情况, 如果消息发送成功, 而消息队列由于网络原因没有即时返回 response, 此时消息发送方由于没有及时收到应答从而认为消息发送失败了, 因此消息发送方的数据库事务回滚了, 然而消息的确已经插入成功, 从而造成了最终不一致性.
上面的不一致性可以通过消息的事务机制解决.
事务机制表示消息队列中的消息是否拥有状态, 从而决定消费者是否消费该条消息.
Alibaba 旗下的开源消息队列 RocketMQ 以高可用性闻名, 它是最早支持事务消息的消息队列. Kafka 从版本 0.11 开始也支持了事务机制.
RoketMQ 的事务机制是将消息标记为 Prepared 状态或者 Confirmed 状态. 处于 Prepared 状态的消息对 consumer 不可见.
而 Kafka 通过 Transaction Marker 将消息标记为 Uncommited 或 Commited 状态. Consumer 通过配置 isolation-level 为 read_committed 或 read_uncommitted 来决定对哪种类型的消息可见.
5. 消息队列不支持事务消息
如果消息队列不支持事务消息, 那么我们的解决方案是, 新增一张 message 表, 并开启一个定时任务扫描这张 message 表, 将所有状态为 prepared 的 message 发送给消息队列, 发送成功后, 将 message 状态置为 confirmed.
代码如下:
- @Transactional public void purchaseOrder() {
- orderDao.save(order);
- messageService.save(message);
- }
此时插入 order 和插入 message 的逻辑处于同一个数据库事务, 通过后台的定时程序不断扫描 message 表, 因此一定能够保证消息被成功投递到消息消费方.
这个方案存在的一个问题是, 有可能后台任务发送消息成功后宕机了, 从而没有来得及将已发送的 message 状态置为 confirmed. 因此下一次扫描 message 表时, 会重复发送该条消息. 这就是 at least once delivery.
由于 at least once delivery 的特性, consumer 有可能收到重复的数据. 此时可以在 consumer 端建立一张 message_consume 表, 来判断消息是否已经消费过, 如果已经消费过, 那么就直接丢弃该消息.
来源: http://www.bubuko.com/infodetail-3091774.html