在用户代码中, 我们设置生成水印和事件时间的方法 assignTimestampsAndWatermarks() 中这里有个方法的重载
我们传入的对象分为两种
- AssignerWithPunctuatedWatermarks(可以理解为每条数据都会产生水印, 如果不想产生水印, 返回一个 null 的水印)
- AssignerWithPeriodicWatermarks(周期性的生成水印)
来看一下源码中是如何实现这两种水印的
二话不说打开 org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.java
这个类的 processElement 方法
看到源码这里这段逻辑就 非常的清晰了
先通过用户的代码获取到事件时间, 注入到 element 里面就直接往下个 opeartor 发送了
然后通过用户代码获取水印, 这里会判断水印是否为 null
不为 null 的就直接往下游 emit 了
现在看一下 AssignerWithPeriodicWatermarks 如何周期的发送生成的水印
直接打开 TimestampsAndPeriodicWatermarksOperator.java 这个类
这里先不看 processElement() 方法, 先看 open 方法
可以看到它将 当前时间其实就是 System.currentTimeMillis()+ watermarkInterval 水印间隔 注册作为了一个 timer 定时器
这样就知道了, 当他过了这个水印间隔时间以后肯定会触发操作
来看一下这个间隔时间以后触发了什么操作
可以看到, 他先是获取了当前的水印时间, 然后直接 emit 出去了????
Periodic 模式明明是在接收数据的 processElement() 发送水印的
然后又再次注册了一个 当前时间 + 间隔的 timer, 这样就无限的触发下去了
既然他在这里发送了水印, 来看下他的 processElement 方法
果然他周期性的发送水印以后, 接收数据的 processElement() 方法里面就没有发送水印了
只有获取事件时间的逻辑了
来源: https://www.cnblogs.com/ljygz/p/11435243.html