在面向流处理的分布式计算中, 经常会有这种需求, 希望需要处理的某个数据集能够不随着流式数据的流逝而消失.
以 spark streaming 为例, 就是希望有个数据集能够在当前批次中更新, 再下个批次后又可以继续访问. 一个最简单的实现是在 driver 的内存中, 我们可以自行保存一个大的内存结构. 这种 hack 的方式就是我们无法利用 spark 提供的分布式计算的能力.
对此, spark streaming 提供了 stateful streaming, 可以创建一个有状态的 DStream, 我们可以操作一个跨越不同批次的 RDD.
1 updateStateByKey
该方法提供了这样的一种机制: 维护了一个可以跨越不同批次的 RDD, 姑且成为 StateRDD, 在每个批次遍历 StateRDD 的所有数据, 对每条数据执行 update 方法. 当 update 方法返回 None 时, 淘汰 StateRDD 中的该条数据.
具体接口如下:
- /**
- * Return a new "state" DStream where the state for each key is updated by applying
- * the given function on the previous state of the key and the new values of each key.
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
- * @param updateFunc State update function. If `this` function returns None, then
- * corresponding state key-value pair will be eliminated.
- * @param numPartitions Number of partitions of each RDD in the new DStream.
- * @tparam S State type
- */
- def updateStateByKey[S: ClassTag](
- updateFunc: (Seq[V], Option[S]) => Option[S],
- numPartitions: Int
- ): DStream[(K, S)] = ssc.withScope {
- updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
- }
即用户需要实现一个 updateFunc 的函数, 该函数的参数:
Seq[V] 该批次中相同 key 的数据, 以 Seq 数组形式传递
Option[S] 历史状态中的数据
返回值: 返回需要保持的历史状态数据, 为 None 时表示删除该数据
def updateStateFunc(lines: Seq[Array[String]], state: Option[Array[String]]): Option[Array[String]] = {...}
这种做法简单清晰明了, 但是其中有一些可以优化的地方:
a) 如果 DRDD 增长到比较大的时候, 而每个进入的批次数据量相比并不大, 此时每次都需要遍历 DRDD, 无论该批次中是否有数据需要更新 DRDD. 这种情况有的时候可能会引发性能问题.
b) 需要用户自定义数据的淘汰机制. 有的时候显得不是那么方便.
c) 返回的类型需要和缓存中的类型相同. 类型不能发生改变.
2 mapWithState
该接口是对 updateSateByKey 的改良, 解决了 updateStateFunc 中可以优化的地方:
- * :: Experimental ::
- * Return a [[MapWithStateDStream]] by applying a function to every key-value element of
- * `this` stream, while maintaining some state data for each unique key. The mapping function
* and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this
* transformation can be specified using [[StateSpec]] class. The state data is accessible in
* as a parameter of type [[State]] in the mapping function.
- *
- * Example of using `mapWithState`:
- * {{{
- * // A mapping function that maintains an integer state and return a String
- * def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
- * // Use state.exists(), state.get(), state.update() and state.remove()
- * // to manage state, and return the necessary string
- * }
- *
- * val spec = StateSpec.function(mappingFunction).numPartitions(10)
- *
- * val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec)
- * }}}
- *
* @param spec Specification of this transformation
* @tparam StateType Class type of the state data
* @tparam MappedType Class type of the mapped data
- */
- @Experimental
- def mapWithState[StateType: ClassTag, MappedType: ClassTag](
- spec: StateSpec[K, V, StateType, MappedType]
- ): MapWithStateDStream[K, V, StateType, MappedType] = {
- new MapWithStateDStreamImpl[K, V, StateType, MappedType](
- self,
- spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
- )
其中 spec 封装了用户自定义的函数, 用以更新缓存数据:
mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType
实现样例如下:
val mappingFunc = (k: String, line: Option[Array[String]], state: State[Array[String]]) => {...}
参数分别代表:
数据的 key: k
RDD 中的每行数据: line
state: 缓存数据
当对 state 调用 remove 方法时, 该数据会被删除.
注意, 如果数据超时, 不要调用 remove 方法, 因为 spark 会在 mappingFunc 后自动调用 remove.
a) 与 updateStateByKey 每次都要遍历缓存数据不同, mapWithState 每次遍历每个批次中的数据, 更新缓存中的数据. 对于缓存数据较大的情况来说, 性能会有较大提升.
b) 提供了内置的超时机制, 当数据一定时间内没有更新时, 淘汰相应数据.
注意, 当有数据到来或者有超时发生时, mappingFunc 都会被调用.
3 checkpointing
通常情况下, 在一个 DStream 钟, 对 RDD 的各种转换而依赖的数据都是来自于当前批次中. 但是当在进行有状态的 transformations 时, 包括 updateStateByKey/reduceByKeyAndWindow ,mapWithSate, 还会依赖于以前批次的数据, RDD 的容错机制, 在异常情况需要重新计算 RDD 时, 需要以前批次的 RDD 信息. 如果这个依赖的链路过长, 会需要大量的内存, 即使有些 RDD 的数据在内存中, 不需要计算. 此时 spark 通过 checkpoint 来打破依赖链路. checkpoint 会生成一个新的 RDD 到 hdfs 中, 该 RDD 是计算后的结果集, 而没有对之前的 RDD 依赖.
此时一定要启用 checkpointing, 以进行周期性的 RDD Checkpointing
在 StateDstream 在实现 RDD 的 compute 方法时, 就是将之前的 PreStateRDD 与当前批次中依赖的 ParentRDD 进行合并.
而 checkpoint 的实现是将上述合并的 RDD 写入 HDFS 中.
现在 checkpoint 的实现中, 数据写入 hdfs 的过程是由一个固定的线程池异步完成的. 一种存在的风险是上次 checkpoint 的数据尚未完成, 此次又来了新的要写的 checkpoint 数据, 会加大集群的负载, 可能会引发一系列的问题.
4 checkpoint 周期设置:
对 mapWithStateByKey/updateStateByKey 返回的 DStream 可以调用 checkpoint 方法设置 checkpoint 的周期. 注意传递的时间只能是批次时间的整数倍.
另外, 对于 mapWithState 而言, checkpoint 执行时, 才会进行数据的删除. State.remove 方法只是设置状态, 标记为删除, 数据并不会真的删除. SnapShot 方法还是可以获取得到.
来源: http://stor.51cto.com/art/201804/570839.htm