本文是整理自几个月前的内部 flink state 分享, flink 状态所包含的东西很多, 在下面列举了一些, 还有一些在本文没有体现, 后续会单独的挑出来再进行讲解
state 的层次结构
- keyedState => windowState
- OperatorState => kafkaOffset
- stateBackend
- snapshot/restore
- internalTimerService
RocksDB 操作的初探
- state ttL
- state local recovery
- QueryableState
- increamental checkpoint
- state redistribution
- broadcasting state
- CheckpointStreamFactory
内部和外部状态
flink 状态分为了内部和外部使用接口, 但是两个层级都是一一对应, 内部接口都实现了外部接口, 主要是有两个目的
内部接口提供了更多的方法, 包括获取 state 中的 serialize 之后的 byte, 以及 Namespace 的操作方法. 内部状态主要用于内部 runtime 实现时所需要用到的一些状态比如 Windows 中的 windowState,CEP 中的 sharedBuffer,kafkaConsumer 中 offset 管理的 ListState, 而外部 State 接口主要是用户自定义使用的一些状态
考虑到各个版本的兼容性, 外部接口要保障跨版本之间的兼容问题, 而内部接口就很少受到这个限制, 因此也就比较灵活
状态的使用
了解了 flink 状态的层次结构, 那么编程中和 flink 内部是如何使用这些状态呢?
flink 中使用状态主要是两部分, 一部分是函数中使用状态, 另一部分是在 operator 中使用状态
方式:
- CheckpointedFunction
- ListCheckpointed
- RuntimeContext (DefaultKeyedStateStore)
- StateContext
- StateContext
- StateInitializationContext
- Iterable<StatePartitionStreamProvider> getRawOperatorStateInputs();
- Iterable<KeyGroupStatePartitionStreamProvider> getRawKeyedStateInputs();
- ManagedInitializationContext
- OperatorStateStore getOperatorStateStore();
- KeyedStateStore getKeyedStateStore();
举例:
AbstractStreamOperator 封装了这个方法 initializeState(StateInitializationContext context)用以在 operator 中进行 raw 和 managed 的状态管理
CheckpointedFunction 的用法其实也是借助于 StateContext 进行相关实现
- CheckpointedFunction#initializeState 方法在 transformation function 的各个并发实例初始化的时候被调用这个方法提供了 FunctionInitializationContext 的对象, 可以通过这个 context 来获取 OperatorStateStore 或者 KeyedStateStore, 也就是说通过这个接口可以注册这两种类型的 State, 这也是和 ListCheckpointed 接口不一样的地方, 只是说 KeyedStateStore 只能在 keyedstream 上才能注册, 否则就会报错而已, 以下是一个使用这两种类型状态的样例. 可以参见 FlinkKafkaConsumerBase 通过这个接口来实现 offset 的管理.
- public class MyFunction<T> implements MapFunction<T, T>, CheckpointedFunction {
- private ReducingState<Long> countPerKey;
- private ListState<Long> countPerPartition;
- private long localCount;
- public void initializeState(FunctionInitializationContext context) throws Exception {
- // get the state data structure for the per-key state
- countPerKey = context.getKeyedStateStore().getReducingState(
- new ReducingStateDescriptor<>("perKeyCount", new AddFunction<>(), Long.class));
- // get the state data structure for the per-partition state
- countPerPartition = context.getOperatorStateStore().getOperatorState(
- new ListStateDescriptor<>("perPartitionCount", Long.class));
- // initialize the "local count variable" based on the operator state
- for (Long l : countPerPartition.get()) {
- localCount += l;
- }
- }
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
- // the keyed state is always up to date anyways
- // just bring the per-partition state in shape
- countPerPartition.clear();
- countPerPartition.add(localCount);
- }
- public T map(T value) throws Exception {
- // update the states
- countPerKey.add(1L);
- localCount++;
- return value;
- }
- }
- }
这个 Context 的继承接口 StateSnapshotContext 的方法则提供了 raw state 的存储方法, 但是其实没有对用户函数提供相应的接口, 只是在引擎中有相关的使用, 相比较而言这个接口提供的方法, context 比较多, 也有一些简单的方法去注册使用 operatorstate 和 keyedState. 如通过 RuntimeContext 注册 keyedState:
因此使用简易化程度为:
- RuntimeContext> FunctionInitializationContext> StateSnapshotContext
- keyedStream.map(new RichFlatMapFunction<MyType, List<MyType>>() {
- private ListState<MyType> state;
- public void open(Configuration cfg) {
- state = getRuntimeContext().getListState(
- new ListStateDescriptor<>("myState", MyType.class));
- }
- public void flatMap(MyType value, Collector<MyType> out) {
- if (value.isDivider()) {
- for (MyType t : state.get()) {
- out.collect(t);
- }
- } else {
- state.add(value);
- }
- }
- });
通过实现 ListCheckpointed 来注册 OperatorState, 但是这个有限制: 一个 function 只能注册一个 state, 因为并不能像其他接口一样指定 state 的名字.
- example:
- public class CountingFunction<T> implements MapFunction<T, Tuple2<T, Long>>, ListCheckpointed<Long> {
- // this count is the number of elements in the parallel subtask
- private long count;
- {@literal @}Override
- public List<Long> snapshotState(long checkpointId, long timestamp) {
- // return a single element - our count
- return Collections.singletonList(count);
- }
- {@literal @}Override
- public void restoreState(List<Long> state) throws Exception {
- // in case of scale in, this adds up counters from different original subtasks
- // in case of scale out, list this may be empty
- for (Long l : state) {
- count += l;
- }
- }
- {@literal @}Override
- public Tuple2<T, Long> map(T value) {
- count++;
- return new Tuple2<>(value, count);
- }
- }
- }
下面比较一下里面的两种 stateStore
- KeyedStateStore
- OperatorStateStore
查看 OperatorStateStore 接口可以看到 OperatorState 只提供了 ListState 一种形式的状态接口, OperatorState 和 KeyedState 主要有以下几个区别:
keyedState 只能应用于 KeyedStream, 而 operatorState 都可以
keyedState 可以理解成一个算子为每个 subtask 的每个 key 维护了一个状态 namespace, 而 OperatorState 是每个 subtask 共享一个状态
operatorState 只提供了 ListState, 而 keyedState 提供了 ValueState,ListState,ReducingState,MapState
operatorStateStore 的默认实现只有
DefaultOperatorStateBackend
可以看到他的状态都是存储在堆内存之中, 而 keyedState 根据 backend 配置的不同, 线上都是存储在 rocksdb 之中
snapshot
这个让我们着眼于两个 Operator 的 snapshot,AbstractStreamOperator 和 AbstractUdfStreamOperator, 这两个基类几乎涵盖了所有相关 operator 和 function 在做 snapshot 的时候会做的处理.
- if (null != operatorStateBackend) {
- snapshotInProgress.setOperatorStateManagedFuture(
- operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
- }
- if (null != keyedStateBackend) {
- snapshotInProgress.setKeyedStateManagedFuture(
- keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
- }
按 keyGroup 去 snapshot 各个 timerService 的状态, 包括 processingTimer 和 eventTimer(RawKeyedOperatorState)
将 operatorStateBackend 和 keyedStateBackend 中的状态做 snapshot
如果 Operator 还包含了 userFunction, 即是一个 UdfStreamOperator, 那么可以注意到 udfStreamOperator 覆写了父类的
snapshotState(StateSnapshotContext context)
方法, 其主要目的就是为了将 Function 中的状态及时的 register 到相应的 backend 中, 在第二步的时候统一由
CheckpointStreamFactory
去做快照
- StreamingFunctionUtils#snapshotFunctionState
- if (userFunction instanceof CheckpointedFunction) {
- ((CheckpointedFunction) userFunction).snapshotState(context);
- return true;
- }
- if (userFunction instanceof ListCheckpointed) {
- @SuppressWarnings("unchecked")
- List<Serializable> partitionableState = ((ListCheckpointed<Serializable>) userFunction).
- snapshotState(context.getCheckpointId(), context.getCheckpointTimestamp());
- ListState<Serializable> listState = backend.
- getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
- listState.clear();
- if (null != partitionableState) {
- try {
- for (Serializable statePartition : partitionableState) {
- listState.add(statePartition);
- }
- } catch (Exception e) {
- listState.clear();
- throw new Exception("Could not write partitionable state to operator" +
- "state backend.", e);
- }
- }
可以看到这里就只有以上分析的两种类型的 checkpoined 接口, CheckpointedFunction, 只需要执行相应的 snapshot 方法, 相应的函数就已经将要做 snapshot 的数据打入了相应的 state 中, 而 ListCheckpointed 接口由于返回的是个 List, 所以需要手动的通过 getSerializableListState 注册一个 ListState(这也是 ListCheckpointed 只能注册一个 state 的原因), 然后将 List 数据挨个存入 ListState 中.
operatorStateBackend#snapshot
针对所有注册的 state 作 deepCopy, 为了防止在 checkpoint 的时候数据结构又被修改, deepcopy 其实是通过序列化和反序列化的过程(参见 http://aitozi.com/java-serialization.html )
异步将 state 以及 metainfo 的数据写入到 hdfs 中, 使用的是 flink 的 asyncIO(这个也可以后续深入了解下), 并返回相应的 statehandle 用作 restore 的过程
在 StreamTask 触发 checkpoint 的时候会将一个 Task 中所有的 operator 触发一次 snapshot, 触发部分就是上面 1,2 两个步骤, 其中第二步是会返回一个 RunnableFuture, 在触发之后会提交一个
AsyncCheckpointRunnable
异步任务, 会阻塞一直等到 checkpoint 的 Future, 其实就是去调用这个方法
AbstractAsyncIOCallable
, 直到完成之后 OperatorState 会返回一个
OperatorStateHandle
, 这个地方和后文的 keyedState 返回的 handle 不一样.
- @Override
- public V call() throws Exception {
- synchronized (this) {
- if (isStopped()) {
- throw new IOException("Task was already stopped. No I/O handle opened.");
- }
- ioHandle = openIOHandle();
- }
- try {
- return performOperation();
- } finally {
- closeIOHandle();
- }
在 managed keyedState,managed operatorState,raw keyedState, 和 raw operatorState 都完成返回相应的 Handle 之后, 会生成一个 SubTaskState 来 ack jobmanager, 这个主要是用在 restore 的过程中
- SubtaskState subtaskState = createSubtaskStateFromSnapshotStateHandles(
- chainedNonPartitionedOperatorsState,
- chainedOperatorStateBackend,
- chainedOperatorStateStream,
- keyedStateHandleBackend,
- keyedStateHandleStream);
- owner.getEnvironment().acknowledgeCheckpoint(
- checkpointMetaData.getCheckpointId(),
- checkpointMetrics,
- subtaskState);
在 jm 端, ack 的时候又将各个 handle 封装在 pendingCheckpoint => operatorStates => operatorState => operatorSubtaskState 中, 最后无论是 savepoint 或者是 externalCheckpoint 都会将相应的 handle 序列化存储到 hdfs, 这也就是所谓的 checkpoint 元数据. 这个可以起个任务观察下 zk 和 hdfs 上的文件, 补充一下相关的验证.
至此完成 operator state 的 snapshot/checkpoint 阶段
KeyedStateBackend#snapshot
和 operatorStateBackend 一样, snapshot 也分为了同步和异步两个部分.
rocksDB 的 keyedStateBackend 的 snapshot 提供了增量和全量两种方式
利用 rocksdb 自身的 snapshot 进行
this.snapshot = stateBackend.db.getSnapshot();
这个过程是同步的, rocksdb 这块是怎么 snapshot 还不是很了解, 待后续学习
之后也是一样异步将数据写入 hdfs, 返回相应的 keyGroupsStateHandle
snapshotOperation.closeCheckpointStream();
不同的地方在于增量返回的是 IncrementalKeyedStateHandle, 而全量返回的是 KeyGroupsStateHandle,
restore / redistribution
OperatorState 的 rescale
void setInitialState(TaskStateHandles taskStateHandles) throws Exception;
一个 task 在真正的执行任务之前所需要做的事情是把状态 inject 到 task 中, 如果一个任务是失败之后从上次的 checkpoint 点恢复的话, 他的状态就是非空的. streamTask 也就靠是否有这样的一个恢复状态来确认算子是不是在 restore 来 branch 他的启动逻辑
- if (null != taskStateHandles) {
- if (invokable instanceof StatefulTask) {
- StatefulTask op = (StatefulTask) invokable;
- op.setInitialState(taskStateHandles);
- } else {
- throw new IllegalStateException("Found operator state for a non-stateful task invokable");
- }
- // be memory and GC friendly - since the code stays in invoke() for a potentially long time,
- // we clear the reference to the state handle
- //noinspection UnusedAssignment
- taskStateHandles = null;
- }
那么追根究底一下这个 Handle 是怎么带入的呢?
FixedDelayRestartStrategy => triggerFullRecovery => Execution#restart => Execution#scheduleForExecution => Execution#deployToSlot => ExecutionVertex => TaskDeploymentDescriptor => taskmanger => task
当然还有另一个途径就是通过向 jobmanager submitJob 的时候带入 restore 的 checkpoint path, 这两种方式最终都会通过 checkpointCoordinator#restoreLatestCheckpointedState 来恢复 hdfs 中的状态来获取到 snapshot 时候存入的 StateHandle.
恢复的过程如何进行 redistribution 呢? 也就是大家关心的并发度变了我的状态的行为是怎么样的.
- // re-assign the task states
- final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
- StateAssignmentOperation stateAssignmentOperation =
- new StateAssignmentOperation(tasks, operatorStates, allowNonRestoredState);
- stateAssignmentOperation.assignStates();
如果并发度没变那么不做重新的 assign, 除非 state 的模式是 broadcast, 会将一个 task 的 state 广播给所有的 task
对于 operator state 会针对每一个 name 的 state 计算出每个 subtask 中的 element 个数之和 (这就要求每个 element 之间相互独立) 进行 roundrobin 分配
keyedState 的重新分配相对简单, 就是根据新的并发度和最大并发度计算新的 keygroupRange, 然后根据 subtaskIndex 获取 keyGroupRange, 然后获取到相应的 keyStateHandle 完成状态的切分.
这里补充关于 raw state 和 managed state 在 rescale 上的差别, 由于 operator state 在 reassign 的时候是根据 metaInfo 来计算出所有的 List 来重新分配, operatorbackend 中注册的状态是会保存相应的 metainfo, 最终也会在 snapshot 的时候存入 OperatorHandle, 那 raw state 的 metainfo 是在哪里呢?
其实会在写入 hdfs 返回相应的 handle 的时候构建一个默认的, OperatorStateCheckpointOutputStream#closeAndGetHandle, 其中状态各个 partition 的构建来自 startNewPartition 方法, 引擎中我所看到的 rawstate 仅有 timerservice 的 raw keyedState
- OperatorStateHandle closeAndGetHandle() throws IOException {
- StreamStateHandle streamStateHandle = delegate.closeAndGetHandle();
- if (null == streamStateHandle) {
- return null;
- }
- if (partitionOffsets.isEmpty() && delegate.getPos()> initialPosition) {
- startNewPartition();
- }
- Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>(1);
- OperatorStateHandle.StateMetaInfo metaInfo =
- new OperatorStateHandle.StateMetaInfo(
- partitionOffsets.toArray(),
- OperatorStateHandle.Mode.SPLIT_DISTRIBUTE);
- offsetsMap.put(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, metaInfo);
- return new OperatorStateHandle(offsetsMap, streamStateHandle);
- }
KeyedState 的 keyGroup
keyedState 重新分配里引入了一个 keyGroup 的概念, 那么这里为什么要引入 keygroup 这个概念呢?
hash(key) = key(identity)
key_group(key) = hash(key) % number_of_key_groups (等于最大并发), 默认 flink 任务会设置一个 max parallel
subtask(key) = key_greoup(key) * parallel / number_of_key_groups
避免在恢复的时候带来随机 IO
避免每个 subtask 需要将所有的状态数据读取出来 pick 和自己 subtask 相关的浪费了很多 io 资源
减少元数据的量, 不再需要保存每次的 key, 每一个 keygroup 组只需保留一个 range
- int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
- int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
- return new KeyGroupRange(start, end);
每一个 backend(subtask)上只有一个 keygroup range
每一个 subtask 在 restore 的时候就接收到了已经分配好的和重启后当前这个并发相绑定的 keyStateHandle
- subManagedKeyedState = getManagedKeyedStateHandles(operatorState, keyGroupPartitions.get(subTaskIndex));
- subRawKeyedState = getRawKeyedStateHandles(operatorState, keyGroupPartitions.get(subTaskIndex));
这里面关键的一步在于, 根据新的 subtask 上的 keyGroupRange, 从原来的 operator 的 keyGroupsStateHandle 中求取本 subtask 所关心的一部分 Handle, 可以看到每个 KeyGroupsStateHandle 都维护了 KeyGroupRangeOffsets 这样一个变量, 来标记这个 handle 所覆盖的 keygrouprange, 以及 keygrouprange 在 stream 中 offset 的位置, 可以看下再 snapshot 的时候会记录 offset 到这个对象中来
keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
- public KeyGroupRangeOffsets getIntersection(KeyGroupRange keyGroupRange) {
- Preconditions.checkNotNull(keyGroupRange);
- KeyGroupRange intersection = this.keyGroupRange.getIntersection(keyGroupRange);
- long[] subOffsets = new long[intersection.getNumberOfKeyGroups()];
- if(subOffsets.length> 0) {
- System.arraycopy(
- offsets,
- computeKeyGroupIndex(intersection.getStartKeyGroup()),
- subOffsets,
- 0,
- subOffsets.length);
- }
- return new KeyGroupRangeOffsets(intersection, subOffsets);
- }
KeyGroupsStateHandle 是一个 subtask 的所有 state 的一个 handle KeyGroupsStateHandle 维护一个 KeyGroupRangeOffsets, KeyGroupRangeOffsets 维护一个 KeyGroupRange 和 offsets KeyGroupRange 维护多个 KeyGroup KeyGroup 维护多个 key
KeyGroupsStateHandle 和 operatorStateHandle 还有一个不同点, operatorStateHandle 维护了 metainfo 中的 offset 信息用在 restore 时的 reassign, 原因在于 KeyGroupsStateHandle 的 reassign 不依赖这些信息, 当然在 restore 的时候也需要 keygroupOffset 中的 offset 信息来重新构建 keyGroupsStateHandle 来进行各个 task 的状态分配.
参考:
来源: https://juejin.im/post/5c375c9851882525da264246