在流处理系统中, 通常使用基于 ProcessTime ,EventTime,Ingestion Time 的消息处理模式. 相关含义可参考 Flink 对于流消息的时间介绍
时间模式 | 时间来源 | 备注 |
---|---|---|
ProcessingTime | 处理消息时的系统时间 | 无法处理乱序 |
Ingestion Time | 接收消息时的系统时间 | 无法处理乱序 |
EventTime | 消息本身带有的时间 | 通常该时间戳表明消息产生时间 |
现实中的流消息延迟是必然的:
理想状态下, 消息从产生要接收没有时间的延迟, 基于消息的 ProcessingTime 和 EventTime 的统计处理没有区别, 我们得到的结果是一样的. 然而现实情况却不完全如此. 如我们再处理电信信令消息时, 在消息到达流系统之前要经过各种流程的解析分发, 显然从信令产生到被流处理系统处理肯定有一定的消息延, 如 00:00:00 时刻的消息到了流系统时, 已经是 00:05:00
基于 EventTime 的消息处理是对 Processing/Ingestion Time 机制的完善:
通常对于消息时间本身没有特殊要求的场景, 如消息的简单 ETL 处理, 如消息分发过滤等场景, 我们无需考虑消息的乱序, 基于接收 / 处理时间来完成消息的处理可以满足生产需要.
但是还有一些场景如统计某一段时间内 (如 12:00:00 ---13:00:00) 某一个路口的车流量时, 在该时间段内的消息进入流系统时, 可能已经是(如 14:00:00 ---15:00:00), 如果是基于 PorcessingTime 的处理统计出来的车流量是 14:00:00 ---15:00:00 期间的车流量, 然而我们真正需要的或者说现实的情况是, 这个车流量是 12:00:00 ---13:00:00, 这个就会引发决策者 / 或者管理部门无法拿到真实的数据统计, 进而引发错误的决策.
ProcessingTime VS EventTime
从上图可知, 对于带有窗口的流消息统计中, 基于 EventTime 的消息处理反应了消息本身的时间属性, 具有特定的含义. 当前主流的大数据流处理框架均支持了 EventTime,ProcessTime 的消息处理, 以下对 Spark 和 Flink 的基于 ProcessTime 和 EventTime 的消息处理机制进行分析对比.
基于 IngestionTime 的消息处理大致与 ProcessTime 的处理类似, 亦无法处理乱序 / 延迟消息, 此处不再区分 ProcessTime 和 IngestionTime
Processing Time 的处理逻辑
Spark 对于 processingTime 的处理
Spark 中的 Sparkstreaming 和 StructuredStreaming 均支持基于 ProcessingTime 的消息处理模式, 二者均是基于 micro-batch 的处理, 核心逻辑是定期触发批次处理, 由 Driver 计算出本批次需要处理的消息, 然后发送至 executor 启动 task 线程处理. 如下图:
Spark ProcessingTime
Spark 中的基于 ProcessingTime 的处理, 其中的 ProcessingTime 真正使用到的为 Dirver 所在节点的时间, 与 Executor 计算节点的时间无关. 如在 Windows 中统计时, 其 Windows 的时间就是 Driver 节点的 Windows 系统时间.
严格来说, Spark 中的 ProcessingTime 既不是 ProcessingTime(并非基于真正的收到消息的时间, 而是读取消息前就已经设定好了时间), 又不是 IngestionTime(没有消息时间戳设定这一步)
StructuredStreaming 同时包含了 Continuous 模式的消息处理模式, 由于该模式在最新的 Spark2.4 中依旧是 Experiment 状态, 当前只能支持简单的 Map 类的操作, 无法支持聚合, 此处暂时忽略该模式
Flink 中基于 ProcessingTime 的处理
Flink 中基于 ProcessingTime 的消息处理, 使用的是读取和处理消息时的进程所在节点的系统时间. 如果在多个节点执行, 由于节点的系统误差, 可能存在时间窗口不一致的情况.
以带有 Windows 的消息统计为例, 其主要逻辑如下:
: 在接收到消息时, 根据当前系统时间及窗口大小的设置, 获取窗口并以窗口 END 值设置定时任务.
: 定时任务在时间到了之后触发窗口计算, 输出结果
Event Time 的处理逻辑
EventTime 与 ProcessingTime 相比, 更能反映消息自身的时间属性, 对于关注消息发生时间的统计来说, 具有重要意义.
流处理中的 Windows
由于流数据的无限 (没有尽头) 的特征, 类似传统批处理的全量统计无法进行, 基于流消息的统计通常只能分段进行, 在流处理中,"一段" 即成为一个 "window".
流消息的延迟
消息的绝对延迟
延迟不是指流消息的产生时间到处理时间的不可避免的绝对延迟
消息的相对延迟(乱序)
在生成时有序的消息, 由于各种现实情况如网络, 设备故障 / 重启等原因引发的在进入流系统时变成无序消息的现象.
现实生活中, 由于种种原因, 消息的乱序 (延迟) 几乎是一种必然, 在流消息的统计中, 由于采用的是分段 (Windows) 统计, 但消息又是延迟的, 如何界定一个 Windows 的统计已经完成, 可以输出了呢?
流技术中采用了 watermark(水印 / 水平线)的概念, 其核心思想是设定一个乱序的容忍阈值 (lateness), 基于当前所接收到的消息的最大的时间戳(maxTimeStamp) 和设定的 lateness 算出一个 watermark, 后续所有时间戳小于该 watermark 的消息, 均被认为是迟到的消息, 不会被计算在内.
关于 Event Time,watermark 概念介绍可以参考 StructuredStreaming:Windows-on-EventTime
EventTime 和 Watermark 的生成
StructuredStreaming 中 EventTime 和 watermark 的逻辑
Spark 中只有 StructuredStreaming 支持 EventTime 模式的消息处理.
EventTime&watermark
其核心思想 / 步骤如下:
是在 driver 端通过物理计划 EventTimeWatermarkExec 注册一个累加器 EventTimeStatsAccum
在 executor 中每个 task 处理消息时, 根据消息的时间状态操作累加器记录更新时间戳, 从 EventTimeWatermarkExec 的 doExecute 方法中可以看到
- iter.map { row =>
- eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
- row
- }
在批次运行结束后调用 updateWatermark 统计该累加器的值, 并根据设定的 lateness, 计算并记录 watermark
- watermarkOperators.zipWithIndex.foreach {
- case (e, index) if e.eventTimeStats.value.count> 0 =>
- logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
- val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
- val prevWatermarkMs = operatorToWatermarkMap.get(index)
- if (prevWatermarkMs.isEmpty || newWatermarkMs> prevWatermarkMs.get) {
- operatorToWatermarkMap.put(index, newWatermarkMs)
- } ...
在下个批次时候, 通过 constructNextBatch 更新 watermark
- offsetSeqMetadata = offsetSeqMetadata.copy(
- batchWatermarkMs = watermarkTracker.currentWatermark,
- batchTimestampMs = triggerClock.getTimeMillis())
下个 Executor 的 task 在计算时, 执行第二步的逻辑; 如果有用到需要缓存状态时, 会判读消息时候延迟, 通过 predict 算子直接过滤掉. 如 StateStoreSaveExec,FlatMapGroupsWithStateExec,SymmetricHashJoinStateManager 算子. 以 StateStoreSaveExec 为例, 查看其 doExecute 方法
- case Some(Append) =>
- allUpdatesTimeMs += timeTakenMs {
- val filteredIter = iter.filter(row => !watermarkPredicateForData.get.eval(row))}
由第五步可以看出 task 中接收到的消息只要小于该 watermark, 直接丢弃该消息.
Flink 中 EventTime 和 watermark 的逻辑
与 StructuredStreaming 中每个批次一个 watermark 的处理逻辑不同, Flink 流处理中没有批次的概念, 其 eventTime 的提取方式与 Structured 类似, 由 taskManager 中的 source 相关线程完成 eventTime 的读取. 关于 watermark 的生成则默认提供两种方式, 同时支持自定义 eventTime 和 watermark 的生成方式.
AssignerWithPeriodicWatermarks : 启动独立线程定期根据 maxEventTime 生成 watermark
AssignerWithPunctuatedWatermarks : 每条消息都会进入接口的 checkAndGetNextWatermark 函数, 可以 ` 实现严格意义的乱序消息处理 ` 为每一条记录都生成一个 watermark, 但对性能有较大影响, 在特定的场景 (如: 消息中包含某些标志来触发生成 watermark) 下使用.
Flink 的 watermark 生成可参考
通常应用中使用 AssignerWithPeriodicWatermarks 进行周期性的 watermark 生成方式, 其大致实现如下:
- class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
- val maxOutOfOrderness = 3500L // 3.5 seconds
- var currentMaxTimestamp: Long = _
- override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
- val timestamp = element.getCreationTime()
- currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
- timestamp
- }
- override def getCurrentWatermark(): Watermark = {
- // return the watermark as current highest timestamp minus the out-of-orderness bound
- new Watermark(currentMaxTimestamp - maxOutOfOrderness)
- }
- }
在执行中主要由如下两步
Source 线程读取消息, 调用 extractTimestamp 提取消息时间, 计算得出 currentMaxTimestamp
Time trigger 线程周期性调用 getCurrentWatermark 方法获取 watermark 值, 并发送至下游 operator
在 Flink 中, 对于迟到的消息, 默认的策略是丢弃消息但记录丢弃消息总数, 但支持 siedeoutput 机制将消息单独输出.
- if (isSkippedElement && isElementLate(element)) {
- if (lateDataOutputTag != null){
- sideOutput(element);
- } else {
- this.numLateRecordsDropped.inc();
- }
- }
总结:
相同之处:
在概念上, Flink 和 Spark(StructuredStreaming)对 eventTime 和 watermark 的定义是一致的, 都是用于处理乱序数据
在实现中, Flink 的周期性 watermark 提取的实现原理和 Structured 比较一致, 周期性获取 watermark
从实现效果来看, StructuredStreaming 和 Flink 基于周期性的 watermark 获取方式并不能完全彻底的处理消息的乱序. 只能处理批次间 (Flink 的不同 watermark 的调用之间) 的消息乱序, 对于同一批次或者同一同期间隔时间内的数据的乱序并不能完美
StructuredStreaming 的 watermark 使用的是 yyyy-MM-dd HH:mm:ss 的方式表示, 而 flink 则是 Long 型数字表示,
Flink 通过 AssignerWithPunctuatedWatermarks 实现了严格意义的乱序消息处理, 且支持可定制的实现方式, 与 Structured 相比更加灵活, 更加完善
对于延迟的消息, StructuredStreaming 的处理逻辑是直接丢弃, Flink 支持将延迟消息单独输出, 便于数据稽查
Windows 的触发
流消息的统计基于 Windows, 那么何时触发 Windows?
StructuredStreaming 中 Windows 的触发
在 StructuredStreaming 中, Windows 用户统计如执行 groupby 后进行 count,max 等操作, 其 Windows 的触发基于 trigger 机制, 其核心逻辑是设定窗口大小, 基于窗口将消息划分至不同的窗口内, 使用 update 或 complete 模式完成窗口消息统计和输出. 关于 Trigger 可以参考
StructuredStreaming 中的统计输出分为三种模式:
模式 | 解释 |
---|---|
Complete | 全量输出,每次统计的全量结果全部输出 |
Update | 每次统计输出统计更新结果 |
Append | 追加模式,后续的结果输出是在之前的结果上追加, 已输出的不会修改 |
生产中使用 Append 模式居多
如下图展示了消息的触发和计算:
StructuredStreaming 的计算和 watermark 示意图
有如下几个特征 :
watermark 由 trigger 触发
迟到消息只能丢弃处理
append 模式当 watermark 大于 Windows 的 end 时, 才会触发 Windows 的计算
StructuredStreaming 的输出模式可以参考
Flink 中的 Windows 的触发
Flink 中的 Windows 较 Spark 中有更加丰富的语义支持. Flink Windows 请参考
Flink 中的 Windows 函数触发有两种途径 :
watermark 触发 : 接收到 watermark 后, 当 watermark> Windows.end && Windows 内有消息
消息触发 : 消息到来, 且对应 Windows 没有延迟, 即该 Windows.end + allowlateness> watermark 且 watermark> Windows.end
消息延迟的判定 :
消息对应的 Windows 已经延迟, Windows.end + allowedLateness <wateramrk, 且
消息本身延迟, 消息的 timestamp + allowedLateness <wateramrk
如下图 :
Flink Windows 触发
来源: http://www.jianshu.com/p/19f8925b6477