所谓的流式处理其实就是对 Stream 的读取 - 处理 - 写入 (ETL) 操作, 应用从 Stream 中读取数据, 再对数据进行相应的处理分析, 最后将结果写入另一个 Stream 中. 其中仅一次语义保证了哪怕系统发生故障, 每一个 ETL 操作也仅会被执行一次, 不会产生数据的丢失或者重复. 这样的可靠性保证对于一些交易, 金融类的应用来说至关重要, 这就需要 Pravega 作为流存储与流计算引擎共同努力来完成.
通常来说, 对于单独的消息系统而言, 语义分为如下三种:
至多一次(At most once): 不管 Writer 在等待 ACK 时是否发生超时或者得到错误异常, Writer 都不会重新发送 Event, 因此会有数据丢失的风险. 在具体的实现过程中, 这一种语义无需做任何额外的控制, 实现起来最为简单, 因此也通常有着最优的性能. 在某些特定的场景中, 我们只希望追求极致的性能而不关心数据的丢失, 可能会选用此方案.
至少一次(At least once): 如果 Writer 在等待 ACK 时发生超时或者得到错误异常, Writer 将会重新发送消息, 这样能保证每个 Event 至少被处理一次, 保证了数据不会丢失, 从而提高了系统的可靠性, 但同时会带来数据重复的问题, 例如, 当 Writer 往 Stream 中成功写入一个 Event, 但是当系统尝试给 Writer 返回 ACK 的时候出现网络异常, Writer 因没有收到 ACK 而判断为写入 Event 失败, 因此 Writer 还是会重新发送此 Event, 导致数据重复.
仅一次(Exactly once): 在系统发生异常时, Writer 可以尝试多次重新发送 Event, 同时能保证最终每个 Event 只被写入一次. 一些对数据准确性要求非常高的系统需要保证 exactly-once 语义, 譬如支付系统, 当用户在移动端付款时, 很有可能会因为网络原因导致延时较长甚至超时, 用户可能会手动进行刷新操作, 如果没有 exactly-once 的语义支持, 很有可能会发生两次扣费, 我们绝对不希望此类错误发生.
仅一次语义是实现流处理系统正确性 (correctness) 的基石, 因此也是流存储 Pravega 自从设计之初就规划好的设计目标. 但是, exactly-once 的实现也面临着诸多挑战, 例如 Kafka 也直到 0.11 版本引入了 之后才完成了仅一次的支持. 这种更强的语义不仅使编写应用程序更容易, 而且使 Pravega 有了更为广泛的应用空间. 这一篇文章我们将介绍 Pravega 实现这一特性的设计细节, 以及和 Flink 社区合作开发的端到端 (end-to-end) 的 exactly-once 的实现.
Pravega 自身的 exactly-once 语义
从之前对三种语义的描述可以看出, 要满足 exactly-once 的语义, 需要对于可能发生的故障具有足够的恢复机制, 来保证最终结果的每一条 Event 仅被写入一次. Pravega 通过实现了读写端的以下两个性质, 增强了 Pravega 的读写语义, 完成了这一目标:
数据的可恢复性
Writer 的幂等性
其中数据的可恢复性保证了在 Pravega 读取数据过程中, 发生故障之后数据进行正确重放的可能性, 需要在读客户端优化. 而幂等性保证了单一一条数据在数据写入的过程不会出现重复, 需要在写客户端优化.
数据的可恢复性
发生故障时, 数据的恢复需要有以下三个性质的保障, 才能有数据恢复的前提.
持久性 Durability
持久性是指一旦写操作确认成功, 即使系统在发生故障的情况下数据都不会丢失, 保证了数据的可恢复性. Pravega 利用了 Bookkeeper 以及分层的存储实现了流式存储系统的持久性, 帮助用户解决了数据可能会丢失的问题. 用户只需要直接使用 Pravega, 而不需要考虑额外的数据备份工作.
Pravega 的持久化详细内容请参阅本系列之前的文章:
https://www.infoq.cn/article/VXA51t57pKphQ3d*7ZST
一致性 Consistency
一致性是指, 不管系统是否发生异常, 所有的 Reader 读到的相同键值下的数据都是一致的, 有序的. 对 Pravega 来说, 不管是 tail read 还是 catch up read 都满足一致性.
有序性 Ordering
有序性是指 Reader 读取 Event 的顺序要和 Event 被写入的顺序保持一致. 对 Pravega 来说, 数据与应用定义的路由键 (routing key) 一并写入 Stream,Pravega 根据路由键的哈希值将写入操作分配至不同的 Segment.Pravega 保证在路由键内部的有序性, 即针对在同一个路由键的数据写入是保证有序的. Pravega 的有序性同样保证了读客户端在数据读取发生异常时, 仍然能够进行有序的恢复重放.
有了以上三个特性, Pravega 得以进行有效的数据重放, 保证了 at-least-once 语义.
Pravega 是如何防止数据重复的
Pravega 实现了 at-least-once 语义, 但是为了要更进一步得满足 exactly-once 语义, 我们还需要避免数据重复. 所有的写入操作需要保证重复数据的消除, 也称为幂等性 (idempotency). 由于分布式环境下, 数据交互复杂导致故障的发生位置众多, 实现需要考虑诸多细节, 这也是 exactly-once 实现的难点. 这需要从读写两方面进行控制.
Pravega 中的 Reader 每成功消费 Segment 中的一条数据后都会将数据的位置信息以 SegmentID+offset 的形式写入 State Synchronizer(可参阅之前的文章)中进行持久化. 这样的 SegmentID+offset 的对应关系构成了 Pravega 的 StreamCut(可参考 http://pravega.io/docs/latest/streamcuts/ ), 代表了当前 Stream 读取位置的状态信息. 对于一个 ReaderGroup 中的所有读客户端而言, ReaderGroup 实现了 Checkpoint 机制, 以 Map<Stream, StreamCut> 的形式保存了 Reader 读取的所有 Stream 的一致性的状态信息. 确定了一致的恢复位置, 我们就保证了故障发生时, Segment 中的数据也仅被读取一次.
Pravega 的 Writer 内部自带一个 ID, 在 Writer 与 Pravega 服务发生重新连接时可以通过 ID 定位到最近一次成功写入 Stream 中的 Event,Writer 会以 block 的形式将 Event 批量追加写入到 Stream 中, 当 block 成功写入后, Writer 会发送一个 block end 指令, 指令中包含 number of events written 和 last event number . 当 Writer 与 Segment Store 之间发生断开重连时, Segment Store 通过 Writer ID 并通过握手将 last event number 传递给该 Writer, 这样 Writer 就能知道应该从哪里开始发送 Event. 这也是分布式系统中最常用的保证幂等性的方法之一.
然而 Pravega 中 Writer 的 ID 是无法保障永远不变的, 一旦 Writer 发生异常崩溃了, 新起的 Writer 将会生产一个新的 Writer ID, 所以在考虑 Writer 发生故障的情况下, 我们就需要将 Writer ID 与 Segment Store 解耦.
因此, Pravega 使用事务来保证数据写入时的 exactly-once.Event 将会被批量地写入事务中, 这些 Event 要么同时被提交, 要么同时被丢弃.
Pravega 的事务(Transaction)
接下来让我们一起来看一下 Pravega 中的事务. Pravega 提供了 Transaction API, 支持事务性地数据写入, 代码示例如下.
先回顾一下将普通 Event 写入 Pravega Stream 的操作:
- // 创建一个 client factory, 一个 writer 然后 写入 event
- try(ClientFactory clientFactory =
- ClientFactory.withScope(scope, controllerURI) {
- EventStreamWriter<String> writer = clientFactory
- .createEventWriter(streamName,
- newJavaSerializer<String>(),
- EventWriterConfig.builder().build());
- writer.writeNext("Key 1","Hello");
- writer.writeNext("Key 2","World!");
- }
接下来是将 Event 事务写入 Pravega Stream 的操作
- Transaction<String> txn = writer.beginTxn();
- txn.writeEvent("Key 1","Hello");
- txn.writeEvent("Key 2","World!");
- txn.commit();
API 很简单, 只需调用 beginTxn() 开启一个 Transaction, 使用 commit() 方法进行事务提交即可. 下面我们以一个例子进行说明其内部实现.
如图, Stream 有 3 个 active 的 Segments, 当在该 Stream 上创建 Transaction 时, 该 Transaction 会从 Pravega 的元数据库 Zookeeper 中读取相应 Stream 的所有的 *active* 的 Segment 集合(保证一致性), 同样会分配 3 个对应的 Transaction Segment(也称为 Shadow Segment).
当事件被写入到事务中时, 它被路由并分配给与 Stream 相同的编号的 Segment(即分配给 Stream 中的 Segment 3 的事件将被分配给 Transaction 中的 Segment 3), 并与 Stream 类似, 事件本身将附加到事务的 Segment 的尾端. 在事务处理的过程中, Transaction Segment 并不对读客户端可见, 保证了隔离性.
提交事务后, 所有 Transaction 的 Segment 将自动附加到 Stream 中对应的 Stream Segment, 由于 Pravega 的持久化特性, 数据也就被持久化了. 如果事务终止, 则其所有的 Transaction Segment 以及其中的数据都将从 Pravega 中删除, 保证了事务操作的原子性. 至此, 分布式事务的 ACID 四大特性均得到满足.
熟悉 Kafka 的读者可能会发现, Pravega 的 Transaction 和 Kafka 的 Transaction 的实现方式不同, Pravega 通过副本的方式创建 Transaction Segment, 直到 Transaction Segment 提交合并入真正的 Segment 中后, Reader 才能开始消费 Transaction 写入的数据, 然而 Kafka 直接是将 Transaction 的 Event 写入 Topic 的 Partition 中, 并且允许用户额外配置事务隔离级别满足不同需求. 因此对比 Kafka,Pravega 存在以下优势:
Kafka 中被终止 (aborted) 的 Transaction 会残留在 topic partition 中, 这就导致了磁盘空间和 IO 带宽资源的浪费, 而 Pravega 的 Transaction 使用了一个临时的 Segment, 当 Transaction abort 之后, 临时的 Segment 就将被回收.
Kafka 中 Transaction 的 Event 是直接写入 partition 中的, 当 Kafka 的隔离级别为 read-commit 时, Reader 在尝试读取一个 open Transaction 中的数据时会发生阻塞等待, 直到 Transaction 完成 (committed 或者 aborted), 而 Pravega 中, 未提交的 Transaction 以一个临时的 Segment 表示, 在提交成功前, Reader 端是无法感知到 Transaction Segment 的存在的, 因此 Reader 不会被阻塞.
Pravega Stream 的弹性伸缩机制 https://www.infoq.cn/article/MHrhw6x5qK_owazEhEw8 会对事务产生影响, 在 Writer 端负载偏高时, Segment 会相应做 split 操作, 原先的 Segment 会被 seal , 同时生产两个新的 Segment, 细心的读者会发现这里会存在问题, 倘若 Writer 正在往 Transaction 的 Transaction Segment 中写数据, 如果此时 Segment 发生了 split , 就会发生与 Transaction Segment 不一致的情况, 当我们合并 Transaction Segment 的时候就会发现找不到相对应的 Segment.Pravega 进一步实现了 Rolling Transaction 的机制来将 Stream 的伸缩和 Transaction 解耦, 让他们同时工作互不影响. 有了 Rolling Transaction 的支持, 用户既能享受 Pravega Stream 的弹性伸缩机制也同时能保证 exactly-once 的支持.
Pravega 的 Transaction 还包括以下 API:
API | 描述 |
---|---|
getTnxId() | 获取 Transaction UUID |
flush() | 等待 write 操作成功 |
ping() | 跟新 Transaction 的等待时间 |
checkStatus() | 查询 Transaction 的状态 Open , Committing , Committed , Aborting , Aborted |
commit() | 提交 |
abort() | 中断 Transaction 并且丢弃所有 Events |
Pravega 与 Flink 的端到端 exactly-once 语义
设想文章最初提到的 ETL 场景, 一个读客户端应用首先从 Pravega 中读取到数据 A , 对数据进行处理 F(A) , 此时如果在执行 F(A) 时读客户端应用发生故障重启, 因为 A 已经被成功消费过了, State Synchronizer 中的元信息已经更新, 恢复之后将导致 A 数据的丢失. 这一例子说明了 ETL 系统端到端的 exactly-once 仅靠流存储本身无法保证, 需要配合处理端进行端到端的同步才能实现.
对于一般的端到端的 exactly-once 实现, ETL 的三个组件要分别达到如下的要求.
输入端要支持固定位置的重放
流处理系统的容错处理保证任务只产生 exactly-once 的输出
输出端要有事务性的支持
为了解决这个问题, Pravega 团队与 Apache Flink 展开了深入的合作, 实现了 Pravega Flink Connector 连接器与 Flink 通信, 使得用户能够在 Flink 中调用 API, 对 Pravega 进行数据的读写, 更多详细内容可以期待 Pravega 系列之后的一篇文章. 在这篇文章中, 我们将重点介绍: 如何配合 Pravega 提供的 Checkpoint 和事务机制以及 Flink 提供的 Checkpoint, 实现 Pravega->Flink->Pravega 的 exactly-once.
熟悉 Flink 的读者应该知道, Flink 在 1.4.0 版本之前通过 Flink 的 Chandy-Lamport 算法实现的 Checkpoint 机制, 做到了 Flink 应用内部的 exactly-once 语义. 然而, 使用 Checkpoint 也意味着当时的 exactly-once 的应用是有条件的, 只有每条消息对于 Flink 应用状态的影响有且只有一次才能满足 exactly-once, 例如数据流过 Flink 直接写入到数据库中的无状态应用是无法保证 exactly-once 的. Flink 在 1.4.0 版本引入了 TwoPhaseCommitSinkFunction , 通过两阶段提交 (2PC) 协议对接事务性写客户端解决了写入的幂等性问题, 从而支持了 Kafka 0.11+ 以及 Pravega 作为输入输出端的端到端的 exactly-once 语义.
整体结构
从整体设计来看, Flink 和 Pravega 实现端到端 exactly-once 有以下四个步骤:
当 Pravega 作为 Source 时, Flink 的每个 Checkpoint Event 会触发 Pravega ReaderGroup 的 Checkpoint
当 Pravega 作为 Sink 时, 每两个 Flink Checkpoint 之间, 会创建一个 Pravega Transaction
当 Checkpoint 完成时, 提交 Transaction 到 Pravega Stream
Flink 异常恢复后, 尝试重新提交 pending 状态的 Transaction 或是恢复到最新 Checkpoint
Pravega 的 PravegaFlinkWriter 并没有直接继承 Flink 提供的 TwoPhaseCommitSinkFunction , 而是继承了其父类 RichSinkFunction , 从而用统一的入口支持了 exactly-once 和其它的语义. PravegaWriterMode 中提供了三种写入模式: BEST_EFFORT , ATLEAST_ONCE 以及 EXACTLY_ONCE . 其中的 EXACTLY_ONCE 模式即使用本文描述的事务性写入. 代码如下:
- protectedAbstractInternalWritercreateInternalWriter(){
- Preconditions.checkState(this.clientFactory !=null,"clientFactory not initialized");
- if(this.writerMode == PravegaWriterMode.EXACTLY_ONCE) {
- returnnewTransactionalWriter(this.clientFactory);
- }else{
- ExecutorService executorService = createExecutorService();
- returnnewNonTransactionalWriter(this.clientFactory, executorService);
- }
- }
EXACTLY_ONCE 模式的实现依然遵循 Flink Checkpoint 的两阶段提交协议, 在 Flink 本地算子快照时提交本地事务(进行 pre-commit 一阶段提交), 通过 Flink JobManager 协调完成投票, 当一切正常时, 通知各算子完成 Checkpoint, 并最终提交事务. 若存在失败则 Checkpoint 也将失败, 视具体情况进行相应的处理, 确保数据仅处理一次.
值得一提的是, 投票处理的过程是异步进行的, 不会影响正常的数据读写线程, 对整体处理性能的影响较小.
故障恢复
针对其中的 4, 由于异常同样可能在流处理进行的各阶段发生, 接下来将具体介绍以下三种情况出现异常时的处理方法.
Flink 写入 Event 时发生异常
上图中, Checkpoint-1 与 Checkpoint-2 均成功完成, 但在 Checkpoint-2 之后的 Event 往 Transaction-3 中写的时候发生了异常. 此时, Flink 将从 Checkpoint-2 恢复, 这时候由于没有 Checkpoint-3 因此 Transaction-3 也不会被提交到 Pravega Stream 中, 因此 Flink 程序就可以从 Checkpoint-2 恢复重新在新的 Transaction 内写 Event, 不会发生数据重复, 保证了从 Flink 到 Pravega 的 exactly-once
Flink 本地快照时发生异常
上图中, Checkpoint-1 成功, 当在 Checkpoint-2 的时候由于有些算子快照时失败. 此时, 程序从 Checkpoint-1 恢复, 没有成功提交的 Transaction-2 和 Transaction-3 被丢弃, 保证了 exactly-once.
Flink Checkpoint 成功, 但是 Transaction 提交失败
上图中, Checkpoint-2 成功, 但在 Transaction-2 往 Pravege Stream 中提交的时候由于网络原因提交失败, 这种情况下无需让 Flink 恢复到 Checkpoint-1, 而是只需重新提交 Transaction-2 即可, 如下图:
实例展示
接下来我们通过一个实际的例子, 展示并验证 Flink 写入 Pravega 的 exactly-once.
首先我们保证 Pravega 的运行. Pravega 可以选择部署 standalone 版本, 具体步骤可以参考之前的文章. Flink 可以按照官网推荐步骤进行部署, 然后提交任务 Jar 包运行. 也可以不进行部署, Flink 会自动创建本地的 ExecutionEnvironment 运行, 本例选择后者.
下载并构建 pravega-sample,
- $gitclonehttps://github.com/pravega/pravega-samples
- $cdpravega-samples
- $./gradlew installDist
- $cdflink-connector-examples/build/install/pravega-flink-examples
该目录的 bin/ 下有两个程序, exactlyOnceWriter 和 exactlyOnceChecker
exactlyOnceWriter 是一个 Flink 应用, 会生成一组整数的 Events(默认为 1~50)写入 Pravega, 并且会在 26 位置处制造一个人为的模拟异常, 同时会以 100 毫秒为周期进行 Checkpoint 操作, 如果出现异常应用就从最近 Checkpoint 恢复.
exactlyOnceChecker 应用则是一个简单的 Pravega Reader 应用, 检测写入 Pravega 中的数据否重复.
我们首先在一个命令行窗口中启动 exactlyOnceChecker
$bin/exactlyOnceChecker --scope examples --stream mystream --controller tcp://localhost:9090
然后在另一个窗口中运行 exactlyOnceWriter 开始往 Pravega 中写数据, 我们先不开启 EXACTLY_ONCE 模式, 将 --exactlyonce 参数设置为 false , 此时默认为 ATLEAST_ONCE 模式.
$bin/exactlyOnceWriter --controller tcp://localhost:9090 --scope examples --stream mystream --exactlyoncefalse
观察 exactlyOnceWriter 的输出, 会产生类似如下内容:
- ...
- Start checkpointing at position 6
- Complete checkpointing at position 6
- Artificial failure at position 26
- ...
- Restore from checkpoint at position 6
- Start checkpointing at position 50
- Complete checkpointing at position 50
- ...
我们发现第一次 Checkpoint 在 6 的位置成功, 同时数据继续往 Pravega Stream 中写入, 直到 26 的位置, 我们模拟了一个 Flink Transaction 的异常, 导致了应用从最近的 Checkpoint 点恢复, 程序又开始从 6 位置继续往 Pravega 中写数据直到结束. 由于没有开启 Pravega 的 EXACTLY_ONCE 模式, 7~26 的数据就会重复写入 Pravega.
再观察 exactlyOnceChecker 的输出, 的确监测到了数据重复
- ============== Checker starts ===============
- Duplicate event: 8
- Duplicate event: 7
- ...
- Duplicate event: 23
- Duplicate event: 24
- Found duplicates
- ============== Checker ends ===============
现在以 EXACTLY_ONCE 的模式重新运行 exactlyOnceWriter .
$bin/exactlyOnceWriter --controller tcp://localhost:9090 --scope examples --stream mystream --exactlyoncetrue
exactlyOnceChecker 的输出如下:
- ============== Checker starts ===============
- No duplicate found. EXACTLY_ONCE!
- ============== Checker ends ===============
在 EXACTLY_ONCE 模式下, Event 1~6 是一个 Transaction, 当 Flink 标记 Checkpoint 成功后 Event 1~6 才会被提交到 Pravega Stream 中, 而在 26 处 Flink Transaction 发生了异常, 这时 Transaction 中的 Event 7-26 并没有成功提交到 Pravega Stream 中, 而是被丢弃了, 因此在 exactlyOnceWriter 从 6 的 Checkpoint 处恢复后, 重新从 7 开始写入数据, 从而保证了端到端的 exactly-once.
Pravega 系列文章计划
Pravega 根据 Apache 2.0 许可证开源, 0.5 版本将于近日发布. 我们欢迎对流式存储感兴趣的大咖们加入 Pravega 社区, 与 Pravega 共同成长. 本篇文章为 Pravega 系列第七篇, 系列文章标题如下(标题根据需求可能会有更新):
实时流处理 (Streaming) 统一批处理 (Batch) 的最后一块拼图: Pravega https://www.infoq.cn/article/u8gDitPJ28mY-JL6izQT
开源 Pravega 架构解析: 如何通过分层解决流存储的三大挑战? https://www.infoq.cn/article/VXA51t57pKphQ3d*7ZST
Pravega 应用实战: 为什么云原生特性对流存储至关重要 https://www.infoq.cn/article/UuiP79pPP-IQbAr7dXo3
"ToB" 产品必备特性: Pravega 的动态弹性伸缩 https://www.infoq.cn/article/MHrhw6x5qK_owazEhEw8
取代 ZooKeeper! 高并发下的分布式一致性开源组件 StateSynchronizer https://www.infoq.cn/article/wOdBDWIGrIZ7r*4acVYf
分布式一致性解决方案 - 状态同步器 (StateSynchronizer) API 示例 https://www.infoq.cn/article/MmgfowPr6*wY9Y5844QR
Pravega 的仅一次语义及事务支持
与 Apache Flink 集成使用
- http://pravega.io/
- http://blog.pravega.io/
- https://github.com/pravega/pravega
- https://github.com/pravega/flink-connectors
- https://github.com/pravega/pravega-samples
来源: http://www.tuicool.com/articles/MBN7Nvb