原创文章,首发自作者个人博客,转载请务必将下面这段话置于文章开头处。
本文转发自技术世界,原文链接http://www.jasongj.com/kafka/transaction/
本文所有Kafka原理性的描述除特殊说明外均基于Kafka 1.0.0版本。
Kafka事务机制的实现主要是为了支持
即正好一次语义
- Exactly Once
- Exactly Once
《Kafka背景及架构介绍》一文中有说明Kafka在0.11.0.0之前的版本中只支持
和
- At Least Once
语义,尚不支持
- At Most Once
语义。
- Exactly Once
但是在很多要求严格的场景下,如使用Kafka处理交易数据,
语义是必须的。我们可以通过让下游系统具有幂等性来配合Kafka的
- Exactly Once
语义来间接实现
- At Least Once
。但是:
- Exactly Once
因此,Kafka本身对
语义的支持就非常必要。
- Exactly Once
操作的原子性是指,多个操作要么全部成功要么全部失败,不存在部分成功部分失败的可能。
实现原子性操作的意义在于:
上文提到,实现
的一种方法是让下游系统具有幂等处理特性,而在Kafka Stream中,Kafka Producer本身就是“下游”系统,因此如果能让Producer具有幂等处理特性,那就可以让Kafka Stream在一定程度上支持
- Exactly Once
语义。
- Exactly once
为了实现Producer的幂等语义,Kafka引入了
(即
- Producer ID
)和
- PID
。每个新的Producer在初始化的时候会被分配一个唯一的PID,该PID对用户完全透明而不会暴露给用户。
- Sequence Number
对于每个PID,该Producer发送数据的每个
都对应一个从0开始单调递增的
- <Topic, Partition>
。
- Sequence Number
类似地,Broker端也会为每个
维护一个序号,并且每次Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其序号比Broker维护的序号(即最后一次Commit的消息的序号)大一,则Broker会接受它,否则将其丢弃:
- <PID, Topic, Partition>
- InvalidSequenceNumber
- DuplicateSequenceNumber
上述设计解决了0.11.0.0之前版本中的两个问题:
上述幂等设计只能保证单个Producer对于同一个
的
- <Topic, Partition>
语义。
- Exactly Once
另外,它并不能保证写操作的原子性——即多个写操作,要么全部被Commit要么全部不被Commit。
更不能保证多个读写操作的的原子性。尤其对于Kafka Stream应用而言,典型的操作即是从某个Topic消费数据,经过一系列转换后写回另一个Topic,保证从源Topic的读取与向目标Topic的写入的原子性有助于从故障中恢复。
事务保证可使得应用程序将生产数据和消费数据当作一个原子单元来处理,要么全部成功,要么全部失败,即使该生产或消费跨多个
。
- <Topic, Partition>
另外,有状态的应用也可以保证重启后从断点处继续处理,也即事务恢复。
为了实现这种效果,应用程序必须提供一个稳定的(重启后不变)唯一的ID,也即
。
- Transaction ID
与
- Transactin ID
可能一一对应。区别在于
- PID
由用户提供,而
- Transaction ID
是内部的实现对用户透明。
- PID
另外,为了保证新的Producer启动后,旧的具有相同
的Producer即失效,每次Producer通过
- Transaction ID
拿到PID的同时,还会获取一个单调递增的epoch。由于旧的Producer的epoch比新Producer的epoch小,Kafka可以很容易识别出该Producer是老的Producer并拒绝其请求。
- Transaction ID
有了
后,Kafka可保证:
- Transaction ID
的新的Producer实例被创建且工作时,旧的且拥有相同
- Transaction ID
的Producer将不再工作。
- Transaction ID
需要注意的是,上述的事务保证是从Producer的角度去考虑的。从Consumer的角度来看,该保证会相对弱一些。尤其是不能保证所有被某事务Commit过的所有消息都被一起消费,因为:
这一节所说的事务主要指原子性,也即Producer将多条消息作为一个事务批量发送,要么全部成功要么全部失败。
为了实现这一点,Kafka 0.11.0.0引入了一个服务器端的模块,名为
,用于管理Producer发送的消息的事务性。
- Transaction Coordinator
该
维护
- Transaction Coordinator
,该log存于一个内部的Topic内。由于Topic数据具有持久性,因此事务的状态也具有持久性。
- Transaction Log
Producer并不直接读写
,它与
- Transaction Log
通信,然后由
- Transaction Coordinator
将该事务的状态插入相应的
- Transaction Coordinator
。
- Transaction Log
的设计与
- Transaction Log
用于保存Consumer的Offset类似。
- Offset Log
许多基于Kafka的应用,尤其是Kafka Stream应用中同时包含Consumer和Producer,前者负责从Kafka中获取消息,后者负责将处理完的数据写回Kafka的其它Topic中。
为了实现该场景下的事务的原子性,Kafka需要保证对Consumer Offset的Commit与Producer对发送消息的Commit包含在同一个事务中。否则,如果在二者Commit中间发生异常,根据二者Commit的顺序可能会造成数据丢失和数据重复:
语义,可能造成数据重复。
- At Least Once
语义,可能造成数据丢失。
- At Most Once
为了区分写入Partition的消息被Commit还是Abort,Kafka引入了一种特殊类型的消息,即
。该类消息的Value内不包含任何应用相关的数据,并且不会暴露给应用程序。它只用于Broker与Client间的内部通信。
- Control Message
对于Producer端事务,Kafka以Control Message的形式引入一系列的
。Consumer即可通过该标记判定对应的消息被Commit了还是Abort了,然后结合该Consumer配置的隔离级别决定是否应该将该消息返回给应用程序。
- Transaction Marker
- Producer < String,
- String > producer = new KafkaProducer < String,
- String > (props);
- // 初始化事务,包括结束该Transaction ID对应的未完成的事务(如果有)
- // 保证新的事务在一个正确的状态下启动
- producer.initTransactions();
- // 开始事务
- producer.beginTransaction();
- // 消费数据
- ConsumerRecords < String,
- String > records = consumer.poll(100);
- try {
- // 发送数据
- producer.send(new ProducerRecord < String, String > ("Topic", "Key", "Value"));
- // 发送消费数据的Offset,将上述数据消费与数据发送纳入同一个Transaction内
- producer.sendOffsetsToTransaction(offsets, "group1");
- // 数据发送及Offset发送均成功的情况下,提交事务
- producer.commitTransaction();
- } catch(ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
- // 数据发送或者Offset发送出现异常时,终止事务
- producer.abortTransaction();
- } finally {
- // 关闭Producer和Consumer
- producer.close();
- consumer.close();
- }
- Transaction Coordinator
由于
是分配PID和管理事务的核心,因此Producer要做的第一件事情就是通过向任意一个Broker发送
- Transaction Coordinator
请求找到
- FindCoordinator
的位置。
- Transaction Coordinator
注意:只有应用程序为Producer配置了
时才可使用事务特性,也才需要这一步。另外,由于事务性要求Producer开启幂等特性,因此通过将
- Transaction ID
设置为非空从而开启事务特性的同时也需要通过将
- transactional.id
设置为true来开启幂等特性。
- enable.idempotence
找到
后,具有幂等特性的Producer必须发起
- Transaction Coordinator
请求以获取PID。
- InitPidRequest
注意:只要开启了幂等特性即必须执行该操作,而无须考虑该Producer是否开启了事务特性。
*** 如果事务特性被开启 ***
会发送给
- InitPidRequest
。如果
- Transaction Coordinator
是第一次收到包含有该
- Transaction Coordinator
的InitPidRequest请求,它将会把该
- Transaction ID
存入
- <TransactionID, PID>
,如上图中步骤2.1所示。这样可保证该对应关系被持久化,从而保证即使
- Transaction Log
宕机该对应关系也不会丢失。
- Transaction Coordinator
除了返回PID外,
还会执行如下任务:
- InitPidRequest
注意:
的处理过程是同步阻塞的。一旦该调用正确返回,Producer即可开始新的事务。
- InitPidRequest
另外,如果事务特性未开启,
可发送至任意Broker,并且会得到一个全新的唯一的PID。该Producer将只能使用幂等特性以及单一Session内的事务特性,而不能使用跨Session的事务特性。
- InitPidRequest
Kafka从0.11.0.0版本开始,提供
方法用于开启一个事务。调用该方法后,Producer本地会记录已经开启了事务,但
- beginTransaction()
只有在Producer发送第一条消息后才认为事务已经开启。
- Transaction Coordinator
这一阶段,包含了整个事务的数据处理过程,并且包含了多种请求。
AddPartitionsToTxnRequest
一个Producer可能会给多个
发送数据,给一个新的
- <Topic, Partition>
发送数据前,它需要先向
- <Topic, Partition>
发送
- Transaction Coordinator
。
- AddPartitionsToTxnRequest
会将该
- Transaction Coordinator
存于
- <Transaction, Topic, Partition>
内,并将其状态置为
- Transaction Log
,如上图中步骤4.1所示。有了该信息后,我们才可以在后续步骤中为每个
- BEGIN
设置COMMIT或者ABORT标记(如上图中步骤5.2所示)。
- Topic, Partition>
另外,如果该
为该事务中第一个
- <Topic, Partition>
,
- <Topic, Partition>
还会启动对该事务的计时(每个事务都有自己的超时时间)。
- Transaction Coordinator
ProduceRequest
Producer通过一个或多个
发送一系列消息。除了应用数据外,该请求还包含了PID,epoch,和
- ProduceRequest
。该过程如上图中步骤4.2所示。
- Sequence Number
AddOffsetsToTxnRequest
为了提供事务性,Producer新增了
方法,该方法将多组消息的发送和消费放入同一批处理内。
- sendOffsetsToTransaction
该方法先判断在当前事务中该方法是否已经被调用并传入了相同的Group ID。若是,直接跳到下一步;若不是,则向
发送
- Transaction Coordinator
请求,
- AddOffsetsToTxnRequests
将对应的所有
- Transaction Coordinator
存于
- <Topic, Partition>
中,并将其状态记为
- Transaction Log
,如上图中步骤4.3所示。该方法会阻塞直到收到响应。
- BEGIN
TxnOffsetCommitRequest
作为
方法的一部分,在处理完
- sendOffsetsToTransaction
后,Producer也会发送
- AddOffsetsToTxnRequest
请求给
- TxnOffsetCommit
从而将本事务包含的与读操作相关的各
- Consumer Coordinator
的Offset持久化到内部的
- <Topic, Partition>
中,如上图步骤4.4所示。
- __consumer_offsets
在此过程中,
会通过PID和对应的epoch来验证是否应该允许该Producer的该请求。
- Consumer Coordinator
这里需要注意:
的Offset信息在当前事务Commit前对外是不可见的。也即在当前事务被Commit前,可认为该Offset尚未Commit,也即对应的消息尚未被完成处理。
- __consumer_offsets
并不会立即更新缓存中相应
- Consumer Coordinator
的Offset,因为此时这些更新操作尚未被COMMIT或ABORT。
- <Topic, Partition>
一旦上述数据写入操作完成,应用程序必须调用
的
- KafkaProducer
方法或者
- commitTransaction
方法以结束当前事务。
- abortTransaction
EndTxnRequest
方法使得Producer写入的数据对下游Consumer可见。
- commitTransaction
方法通过
- abortTransaction
将Producer写入的数据标记为
- Transaction Marker
状态。下游的Consumer如果将
- Aborted
设置为
- isolation.level
,则它读到被Abort的消息后直接将其丢弃而不会返回给客户程序,也即被Abort的消息对应用程序不可见。
- READ_COMMITTED
无论是Commit还是Abort,Producer都会发送
请求给
- EndTxnRequest
,并通过标志位标识是应该Commit还是Abort。
- Transaction Coordinator
收到该请求后,
会进行如下操作
- Transaction Coordinator
或
- PREPARE_COMMIT
消息写入
- PREPARE_ABORT
,如上图中步骤5.1所示
- Transaction Log
请求以
- WriteTxnMarker
的形式将
- Transaction Marker
或
- COMMIT
信息写入用户数据日志以及
- ABORT
中,如上图中步骤5.2所示
- Offset Log
或
- COMPLETE_COMMIT
信息写入
- COMPLETE_ABORT
中,如上图中步骤5.3所示
- Transaction Log
补充说明:对于
方法,它会在发送
- commitTransaction
之前先调用flush方法以确保所有发送出去的数据都得到相应的ACK。对于
- EndTxnRequest
方法,在发送
- abortTransaction
之前直接将当前Buffer中的事务性消息(如果有)全部丢弃,但必须等待所有被发送但尚未收到ACK的消息发送完成。
- EndTxnRequest
上述第二步是实现将一组读操作与写操作作为一个事务处理的关键。因为Producer写入的数据Topic以及记录Comsumer Offset的Topic会被写入相同的
,所以这一组读操作与写操作要么全部COMMIT要么全部ABORT。
- Transactin Marker
WriteTxnMarkerRequest
上面提到的
由
- WriteTxnMarkerRequest
发送给当前事务涉及到的每个
- Transaction Coordinator
的Leader。收到该请求后,对应的Leader会将对应的
- <Topic, Partition>
或者
- COMMIT(PID)
控制信息写入日志,如上图中步骤5.2所示。
- ABORT(PID)
该控制消息向Broker以及Consumer表明对应PID的消息被Commit了还是被Abort了。
这里要注意,如果事务也涉及到
,即该事务中有消费数据的操作且将该消费的Offset存于
- __consumer_offsets
中,
- __consumer_offsets
也需要向该内部Topic的各Partition的Leader发送
- Transaction Coordinator
从而写入
- WriteTxnMarkerRequest
或
- COMMIT(PID)
控制信息。
- COMMIT(PID)
写入最终的
或
消息
写完所有的
后,
- Transaction Marker
会将最终的
- Transaction Coordinator
或
- COMPLETE_COMMIT
消息写入
- COMPLETE_ABORT
中以标明该事务结束,如上图中步骤5.3所示。
- Transaction Log
此时,
中所有关于该事务的消息全部可以移除。当然,由于Kafka内数据是Append Only的,不可直接更新和删除,这里说的移除只是将其标记为null从而在Log Compact时不再保留。
- Transaction Log
另外,
或
- COMPLETE_COMMIT
的写入并不需要得到所有Rreplica的ACK,因为如果该消息丢失,可以根据事务协议重发。
- COMPLETE_ABORT
补充说明,如果参与该事务的某些
在被写入
- <Topic, Partition>
前不可用,它对
- Transaction Marker
的Consumer不可见,但不影响其它可用
- READ_COMMITTED
的COMMIT或ABORT。在该
- <Topic, Partition>
恢复可用后,
- <Topic, Partition>
会重新根据
- Transaction Coordinator
或
- PREPARE_COMMIT
向该
- PREPARE_ABORT
发送
- <Topic, Partition>
。
- Transaction Marker
与
- PID
的引入实现了写操作的幂等性
- Sequence Number
语义实现了单一Session内的
- At Least Once
语义
- Exactly Once
与
- Transaction Marker
提供了识别消息是否应该被读取的能力,从而实现了事务的隔离性
- PID
)来实现事务中涉及的所有读写操作同时对外可见或同时对外不可见
- Transaction Marker
InvalidProducerEpoch
这是一种Fatal Error,它说明当前Producer是一个过期的实例,有
相同但epoch更新的Producer实例被创建并使用。此时Producer会停止并抛出Exception。
- Transaction ID
InvalidPidMapping
没有与该
- Transaction Coordinator
对应的PID。此时Producer会通过包含有
- Transaction ID
的
- Transaction ID
请求创建一个新的PID。
- InitPidRequest
NotCorrdinatorForGTransactionalId
该
不负责该当前事务。Producer会通过
- Transaction Coordinator
请求重新寻找对应的
- FindCoordinatorRequest
。
- Transaction Coordinator
InvalidTxnRequest
违反了事务协议。正确的Client实现不应该出现这种Exception。如果该异常发生了,用户需要检查自己的客户端实现是否有问题。
CoordinatorNotAvailable
仍在初始化中。Producer只需要重试即可。
- Transaction Coordinator
DuplicateSequenceNumber
发送的消息的序号低于Broker预期。该异常说明该消息已经被成功处理过,Producer可以直接忽略该异常并处理下一条消息
InvalidSequenceNumber
这是一个Fatal Error,它说明发送的消息中的序号大于Broker预期。此时有两种可能
被强制设置为1,而
- max.inflight.requests.per.connection
被强制设置为all。故前面消息重试期间,后续消息不会被发送,也即不会发生乱序。并且只有ISR中所有Replica都ACK,Producer才会认为消息已经被发送,也即不存在Broker端数据丢失问题。
- acks
InvalidTransactionTimeout
调用出现的Fatal Error。它表明Producer传入的timeout时间不在可接受范围内,应该停止Producer并报告给用户。
- InitPidRequest
失败
- Transaction Coordinator
前失败
- PREPARE_COMMIT/PREPARE_ABORT
Producer通过
找到新的
- FindCoordinatorRequest
,并通过
- Transaction Coordinator
请求发起
- EndTxnRequest
或
- COMMIT
流程,新的
- ABORT
继续处理
- Transaction Coordinator
请求——写
- EndTxnRequest
或
- PREPARE_COMMIT
,写
- PREPARE_ABORT
,写
- Transaction Marker
或
- COMPLETE_COMMIT
。
- COMPLETE_ABORT
后失败
- PREPARE_COMMIT/PREPARE_ABORT
此时旧的
可能已经成功写入部分
- Transaction Coordinator
。新的
- Transaction Marker
会重复这些操作,所以部分Partition中可能会存在重复的
- Transaction Coordinator
或
- COMMIT
,但只要该Producer在此期间没有发起新的事务,这些重复的
- ABORT
就不是问题。
- Transaction Marker
后失败
- COMPLETE_COMMIT/ABORT
旧的
可能已经写完了
- Transaction Coordinator
或
- COMPLETE_COMMIT
但在返回
- COMPLETE_ABORT
之前失败。该场景下,新的
- EndTxnRequest
会直接给Producer返回成功。
- Transaction Coordinator
- transaction.timeout.ms
当Producer失败时,
必须能够主动的让某些进行中的事务过期。否则没有Producer的参与,
- Transaction Coordinator
无法判断这些事务应该如何处理,这会造成:
- Transaction Coordinator
需要维护大量的事务状态,大量占用内存
- Transaction Coordinator
内也会存在大量数据,造成新的
- Transaction Log
启动缓慢
- Transaction Coordinator
的Consumer需要缓存大量的消息,造成不必要的内存浪费甚至是OOM
- READ_COMMITTED
不同的Producer交叉写同一个Partition,当一个Producer的事务状态不更新时,
- Transaction ID
的Consumer为了保证顺序消费而被阻塞
- READ_COMMITTED
为了避免上述问题,
会周期性遍历内存中的事务状态Map,并执行如下操作
- Transaction Coordinator
并且其最后更新时间与当前时间差大于
- BEGIN
(默认值为1小时),则主动将其终止:1)未避免原Producer临时恢复与当前终止流程冲突,增加该Producer对应的PID的epoch,并确保将该更新的信息写入
- transaction.remove.expired.transaction.cleanup.interval.ms
;2)以更新后的epoch回滚事务,从而使得该事务相关的所有Broker都更新其缓存的该PID的epoch从而拒绝旧Producer的写操作
- Transaction Log
,完成后续的COMMIT流程————向各
- PREPARE_COMMIT
写入
- <Topic, Partition>
,在
- Transaction Marker
内写入
- Transaction Log
- COMPLETE_COMMIT
,完成后续ABORT流程
- PREPARE_ABORT
- Transaction ID
某
的Producer可能很长时间不再发送数据,
- Transaction ID
没必要再保存该
- Transaction Coordinator
与
- Transaction ID
等的映射,否则可能会造成大量的资源浪费。因此需要有一个机制探测不再活跃的
- PID
并将其信息删除。
- Transaction ID
会周期性遍历内存中的
- Transaction Coordinator
与
- Transaction ID
映射,如果某
- PID
没有对应的正在进行中的事务并且它对应的最后一个事务的结束时间与当前时间差大于
- Transaction ID
(默认值是7天),则将其从内存中删除并在
- transactional.id.expiration.ms
中将其对应的日志的值设置为null从而使得Log Compact可将其记录删除。
- Transaction Log
Kafka的事务机制与《MVCC PostgreSQL实现事务和多版本并发控制的精华》一文中介绍的PostgreSQL通过MVCC实现事务的机制非常类似,对于事务的回滚,并不需要删除已写入的数据,都是将写入数据的事务标记为Rollback/Abort从而在读数据时过滤该数据。
Kafka的事务机制与《分布式事务(一)两阶段提交及JTA》一文中所介绍的两阶段提交机制看似相似,都分PREPARE阶段和最终COMMIT阶段,但又有很大不同。
还是
- PREPARE_COMMIT
,并且只须在
- PREPARE_ABORT
中标记即可,无须其它组件参与。而两阶段提交的PREPARE需要发送给所有的分布式事务参与方,并且事务参与方需要尽可能准备好,并根据准备情况返回
- Transaction Log
或
- Prepared
状态给事务管理器。
- Non-Prepared
或
- PREPARE_COMMIT
,则确定该事务最终的结果应该是被
- PREPARE_ABORT
或
- COMMIT
。而分布式事务中,PREPARE后由各事务参与方返回状态,只有所有参与方均返回
- ABORT
状态才会真正执行COMMIT,否则执行ROLLBACK
- Prepared
实例,而分布式事务中只有一个事务管理器
- Transaction Coordinator
Zookeeper的原子广播协议与两阶段提交以及Kafka事务机制有相似之处,但又有各自的特点
实例,扩展性较好。而Zookeeper写操作只能在Leader节点进行,所以其写性能远低于读性能。
- Transaction Coordinator
来源: http://www.cnblogs.com/jasongj/p/7912348.html