CLIENT_ACKNOWLEDGE : 客户端手动确认, 这就意味着 AcitveMQ 将不会 "自作主张" 的为你 ACK 任何消息, 开发者需要自己择机确认. 在此模式下, 开发者需要需要关注几个方法: 1) message.acknowledge(),2) ActiveMQMessageConsumer.acknowledege(),3) ActiveMQSession.acknowledge(); 其 1)和 3)是等效的, 将当前 session 中所有 consumer 中尚未 ACK 的消息都一起确认, 2)只会对当前 consumer 中那些尚未确认的消息进行确认. 开发者可以在合适的时机必须调用一次上述方法. 为了避免混乱, 对于这种 ACK 模式下, 建议一个 session 下只有一个 consumer.
我们通常会在基于 Group(消息分组)情况下会使用 CLIENT_ACKNOWLEDGE, 我们将在一个 group 的消息序列接受完毕之后确认消息(组); 不过当你认为消息很重要, 只有当消息被正确处理之后才能确认时, 也可以使用此模式 .
如果开发者忘记调用 acknowledge 方法, 将会导致当 consumer 重启后, 会接受到重复消息, 因为对于 broker 而言, 那些尚未真正 ACK 的消息被视为 "未消费".
开发者可以在当前消息处理成功之后, 立即调用 message.acknowledge()方法来 "逐个" 确认消息, 这样可以尽可能的减少因网络故障而导致消息重发的个数; 当然也可以处理多条消息之后, 间歇性的调用 acknowledge 方法来一次确认多条消息, 减少 ack 的次数来提升 consumer 的效率, 不过这仍然是一个利弊权衡的问题.
除了 message.acknowledge()方法之外, ActiveMQMessageConumser.acknowledge()和 ActiveMQSession.acknowledge()也可以确认消息, 只不过前者只会确认当前 consumer 中的消息. 其中 sesson.acknowledge()和 message.acknowledge()是等效的.
无论是 "同步"/"异步",ActiveMQ 都不会发送 STANDARD_ACK_TYPE, 直到 message.acknowledge()调用. 如果在 client 端未确认的消息个数达到 prefetchSize * 0.5 时, 会补充发送一个 ACK_TYPE 为 DELIVERED_ACK_TYPE 的确认指令, 这会触发 broker 端可以继续 push 消息到 client 端.(参看 PrefetchSubscription.acknwoledge 方法)
在 broker 端, 针对每个 Consumer, 都会保存一个因为 "DELIVERED_ACK_TYPE" 而 "拖延" 的消息个数, 这个参数为 prefetchExtension, 事实上这个值不会大于 prefetchSize * 0.5, 因为 Consumer 端会严格控制 DELIVERED_ACK_TYPE 指令发送的时机 (参见 ActiveMQMessageConsumer.ackLater 方法),broker 端通过 "prefetchExtension" 与 prefetchSize 互相配合, 来决定即将 push 给 client 端的消息个数, count = prefetchExtension + prefetchSize - dispatched.size(), 其中 dispatched 表示已经发送给 client 端但是还没有 "STANDARD_ACK_TYPE" 的消息总量; 由此可见, 在 CLIENT_ACK 模式下, 足够快速的调用 acknowledge() 方法是决定 consumer 端消费消息的速率; 如果 client 端因为某种原因导致 acknowledge 方法未被执行, 将导致大量消息不能被确认, broker 端将不会 push 消息, 事实上 client 端将处于 "假死" 状态, 而无法继续消费消息. 我们要求 client 端在消费 1.5*prefetchSize 个消息之前, 必须 acknowledge()一次; 通常我们总是每消费一个消息调用一次, 这是一种良好的设计.
此外需要额外的补充一下: 所有 ACK 指令都是依次发送给 broker 端, 在 CLIET_ACK 模式下, 消息在交付给 listener 之前, 都会首先创建一个 DELIVERED_ACK_TYPE 的 ACK 指令, 直到 client 端未确认的消息达到 "prefetchSize * 0.5" 时才会发送此 ACK 指令, 如果在此之前, 开发者调用了 acknowledge()方法, 会导致消息直接被确认(STANDARD_ACK_TYPE).broker 端通常会认为 "DELIVERED_ACK_TYPE" 确认指令是一种 "slow consumer" 信号, 如果 consumer 不能及时的对消息进行 acknowledge 而导致 broker 端阻塞, 那么此 consumer 将会被标记为 "slow", 此后 queue 中的消息将会转发给其他 Consumer.
DUPS_OK_ACKNOWLEDGE : "消息可重复" 确认, 意思是此模式下, 可能会出现重复消息, 并不是一条消息需要发送多次 ACK 才行. 它是一种潜在的 "AUTO_ACK" 确认机制, 为批量确认而生, 而且具有 "延迟" 确认的特点. 对于开发者而言, 这种模式下的代码结构和 AUTO_ACKNOWLEDGE 一样, 不需要像 CLIENT_ACKNOWLEDGE 那样调用 acknowledge()方法来确认消息.
1) 在 ActiveMQ 中, 如果在 Destination 是 Queue 通道, 我们真的可以认为 DUPS_OK_ACK 就是 "AUTO_ACK + optimizeACK + (prefetch> 0)" 这种情况, 在确认时机上几乎完全一致; 此外在此模式下, 如果 prefetchSize =1 或者没有开启 optimizeACK, 也会导致消息逐条确认, 从而失去批量确认的特性.
2) 如果 Destination 为 Topic,DUPS_OK_ACKNOWLEDGE 才会产生 JMS 规范中诠释的意义, 即无论 optimizeACK 是否开启, 都会在消费的消息个数>=prefetch * 0.5 时, 批量确认 (STANDARD_ACK_TYPE), 在此过程中, 不会发送 DELIVERED_ACK_TYPE 的确认指令, 这是 1) 和 AUTO_ACK 的最大的区别.
这也意味着, 当 consumer 故障重启后, 那些尚未 ACK 的消息会重新发送过来.
SESSION_TRANSACTED : 当 session 使用事务时, 就是使用此模式. 在事务开启之后, 和 session.commit()之前, 所有消费的消息, 要么全部正常确认, 要么全部 redelivery. 这种严谨性, 通常在基于 GROUP(消息分组)或者其他场景下特别适合. 在 SESSION_TRANSACTED 模式下, optimizeACK 并不能发挥任何效果, 因为在此模式下, optimizeACK 会被强制设定为 false, 不过 prefetch 仍然可以决定 DELIVERED_ACK_TYPE 的发送时机.
因为 Session 非线程安全, 那么当前 session 下所有的 consumer 都会共享同一个 transactionContext; 同时建议, 一个事务类型的 Session 中只有一个 Consumer, 以避免 rollback()或者 commit()方法被多个 consumer 调用而造成的消息混乱.
当 consumer 接受到消息之后, 首先检测 TransactionContext 是否已经开启, 如果没有, 就会开启并生成新的 transactionId, 并把信息发送给 broker; 此后将检测事务中已经消费的消息个数是否>= prefetch * 0.5, 如果大于则补充发送一个 "DELIVERED_ACK_TYPE" 的确认指令; 这时就开始调用 onMessage()方法, 如果是同步(receive), 那么即返回 message. 上述过程, 和其他确认模式没有任何特殊的地方.
当开发者决定事务可以提交时, 必须调用 session.commit()方法, commit 方法将会导致当前 session 的事务中所有消息立即被确认; 事务的确认过程中, 首先把本地的 deliveredMessage 队列中尚未确认的消息全部确认(STANDARD_ACK_TYPE); 此后向 broker 发送 transaction 提交指令并等待 broker 反馈, 如果 broker 端事务操作成功, 那么将会把本地 deliveredMessage 队列清空, 新的事务开始; 如果 broker 端事务操作失败(此时 broker 已经 rollback), 那么对于 session 而言, 将执行 inner-rollback, 这个 rollback 所做的事情, 就是将当前事务中的消息清空并要求 broker 重发(REDELIVERED_ACK_TYPE), 同时 commit 方法将抛出异常.
当 session.commit 方法异常时, 对于开发者而言通常是调用 session.rollback()回滚事务 (事实上开发者不调用也没有问题), 当然你可以在事务开始之后的任何时机调用 rollback(),rollback 意味着当前事务的结束, 事务中所有的消息都将被重发. 需要注意, 无论是 inner-rollback 还是调用 session.rollback() 而导致消息重发, 都会导致 message.redeliveryCounter 计数器增加, 最终都会受限于 brokerUrl 中配置的 "jms.redeliveryPolicy.maximumRedeliveries", 如果 rollback 的次数过多, 而达到重发次数的上限时, 消息将会被 DLQ(dead letter).
INDIVIDUAL_ACKNOWLEDGE : 单条消息确认, 这种确认模式, 我们很少使用, 它的确认时机和 CLIENT_ACKNOWLEDGE 几乎一样, 当消息消费成功之后, 需要调用 message.acknowledege 来确认此消息 (单条), 而 CLIENT_ACKNOWLEDGE 模式先 message.acknowledge() 方法将导致整个 session 中所有消息被确认(批量确认).
结语: 到目前为止, 我们已经已经简单的了解了 ActiveMQ 中消息传送机制, 还有 JMS 中 ACK 策略, 重点分析了 optimizeACK 的策略, 希望开发者能够在使用 activeMQ 中避免一些不必要的错误. 本文如有疏漏和错误之处, 请各位不吝赐教, 特此感谢.
参考:
- http://shift-alt-ctrl.iteye.com/blog/2020182
- http://shift-alt-ctrl.iteye.com/blog/2020182
来源: https://www.cnblogs.com/xxj0316/p/9474036.html