```java
private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
    if (null != op) {
        // This call is the core of the per-operator snapshot
        OperatorSnapshotFutures snapshotInProgress = op.snapshotState(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions,
                storageLocation);
        operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
    }
}
```
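To make the bookkeeping in checkpointStreamOperator concrete, here is a minimal sketch under simplifying assumptions: each operator in the chain returns a handle for its in-flight snapshot, and the task stores that handle in a map keyed by operator ID. The names SimpleOperator, checkpointChain and op are hypothetical, and String IDs plus CompletableFuture<String> stand in for Flink's OperatorID and OperatorSnapshotFutures.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class OperatorChainSnapshotSketch {
    // Hypothetical stand-in for a stream operator; this is NOT Flink's StreamOperator interface.
    interface SimpleOperator {
        String id();
        CompletableFuture<String> snapshotState(long checkpointId);
    }

    // Mirrors checkpointStreamOperator: skip null operators and store each
    // in-flight snapshot under the operator's ID.
    static Map<String, CompletableFuture<String>> checkpointChain(List<SimpleOperator> chain, long checkpointId) {
        Map<String, CompletableFuture<String>> inProgress = new HashMap<>();
        for (SimpleOperator op : chain) {
            if (null != op) {
                inProgress.put(op.id(), op.snapshotState(checkpointId));
            }
        }
        return inProgress;
    }

    // Builds a toy operator whose "snapshot" completes immediately with a descriptive string.
    static SimpleOperator op(String id) {
        return new SimpleOperator() {
            @Override public String id() { return id; }
            @Override public CompletableFuture<String> snapshotState(long checkpointId) {
                return CompletableFuture.completedFuture(id + "@" + checkpointId);
            }
        };
    }

    public static void main(String[] args) {
        List<SimpleOperator> chain = Arrays.asList(op("source"), null, op("map"));
        Map<String, CompletableFuture<String>> inProgress = checkpointChain(chain, 7L);
        System.out.println(inProgress.size());            // 2
        System.out.println(inProgress.get("map").join()); // map@7
    }
}
```

Storing futures rather than finished results is what lets the task return quickly from the synchronous part and collect the results later.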
op.snapshotState() is the core call. It invokes org.apache.flink.streaming.api.operators.AbstractStreamOperator#snapshotState(long, long, org.apache.flink.runtime.checkpoint.CheckpointOptions, org.apache.flink.runtime.state.CheckpointStreamFactory).
Note that op is a concrete subclass: some operator classes extend AbstractStreamOperator directly, while others extend AbstractUdfStreamOperator. So when the method below calls snapshotState(snapshotContext), the call resolves, depending on the subclass, to org.apache.flink.streaming.api.operators.AbstractStreamOperator#snapshotState(org.apache.flink.runtime.state.StateSnapshotContext)
or to org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator#snapshotState.
In the source tree examined here, AbstractStreamOperator has 94 subclasses.
AbstractUdfStreamOperator has 42 subclasses.
AbstractUdfStreamOperator itself extends AbstractStreamOperator.
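The dispatch described above is a template-method pattern: a final entry point calls an overridable hook, and the UDF subclass extends that hook to reach the user function. The sketch below is a simplified model, not Flink code; BaseOperator, UdfOperator, UserFunction and Context are hypothetical stand-ins for AbstractStreamOperator, AbstractUdfStreamOperator, the user's checkpointed function and StateSnapshotContext.

```java
import java.util.ArrayList;
import java.util.List;

public class SnapshotDispatchSketch {
    // Hypothetical stand-in for StateSnapshotContext: just records what ran.
    static class Context {
        final long checkpointId;
        final List<String> log = new ArrayList<>();
        Context(long checkpointId) { this.checkpointId = checkpointId; }
    }

    // Stand-in for AbstractStreamOperator: the entry point is final, the hook can be overridden.
    static class BaseOperator {
        public final List<String> snapshotState(long checkpointId) {
            Context ctx = new Context(checkpointId);
            snapshotState(ctx);              // dispatched to the subclass override, if any
            ctx.log.add("backend snapshot"); // stands in for the operator/keyed state backend calls
            return ctx.log;
        }

        protected void snapshotState(Context ctx) {
            ctx.log.add("base hook");
        }
    }

    // Hypothetical user callback, loosely modeled on a checkpointed user function.
    interface UserFunction {
        void snapshotState(Context ctx);
    }

    // Stand-in for AbstractUdfStreamOperator: run the base hook, then forward to the user function.
    static class UdfOperator extends BaseOperator {
        private final UserFunction userFunction;

        UdfOperator(UserFunction userFunction) { this.userFunction = userFunction; }

        @Override
        protected void snapshotState(Context ctx) {
            super.snapshotState(ctx);
            userFunction.snapshotState(ctx);
        }
    }

    public static void main(String[] args) {
        System.out.println(new BaseOperator().snapshotState(1L));
        // [base hook, backend snapshot]
        System.out.println(new UdfOperator(ctx -> ctx.log.add("user snapshot @" + ctx.checkpointId)).snapshotState(2L));
        // [base hook, user snapshot @2, backend snapshot]
    }
}
```

Either way the backend step runs after the hook, which is why the user's snapshot logic has already updated the state objects by the time the backends copy them.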
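```java
@Override
public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions,
        CheckpointStreamFactory factory) throws Exception {
    // Declared before the try block in the Flink source; omitted from the original excerpt
    KeyGroupRange keyGroupRange = null != keyedStateBackend ?
            keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
    OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();

    try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
            checkpointId,
            timestamp,
            factory,
            keyGroupRange,
            getContainingTask().getCancelables())) {
        // Operators extending AbstractUdfStreamOperator call the user function's snapshot method here;
        // operators extending only AbstractStreamOperator run the base implementation, which does little with user state.
        snapshotState(snapshotContext);
        // The user's snapshot method has now run, so the current contents of the state objects are fixed.
        // What follows is how the backends reach those state objects and write their data out.
        snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
        snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
        // Managed operator state: this produces the operator-state data file
        if (null != operatorStateBackend) {
            // Debug output added by the blog author (not part of Flink)
            System.out.println(Thread.currentThread().getName() + ":: writing state data to file here");
            snapshotInProgress.setOperatorStateManagedFuture(
                    operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
        // Managed keyed state: this produces the keyed-state data file
        if (null != keyedStateBackend) {
            snapshotInProgress.setKeyedStateManagedFuture(
                    keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
        }
    }
    return snapshotInProgress;
}
```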
The call operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions) resolves to org.apache.flink.runtime.state.DefaultOperatorStateBackend#snapshot.
This is where we find out how the backend reaches the state objects:
```java
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
        long checkpointId,
        long timestamp,
        @Nonnull CheckpointStreamFactory streamFactory,
        @Nonnull CheckpointOptions checkpointOptions) throws Exception {
    long syncStartTime = System.currentTimeMillis();
    // This is the key spot: if you want to know how the backend reaches the state objects
    // used by the user function, the answer is here.
    RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunner =
            snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    snapshotStrategy.logSyncCompleted(streamFactory, syncStartTime);
    return snapshotRunner;
}
```
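The method above returns a RunnableFuture instead of a finished result and logs only the synchronous duration. As I understand it, the task later runs such futures on an asynchronous checkpoint thread. The sketch below models that split with a plain FutureTask; snapshot here is a hypothetical helper, and a String stands in for the state and for the SnapshotResult.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class SyncAsyncSnapshotSketch {
    // The synchronous part runs inside this method; the asynchronous part is packaged
    // into a RunnableFuture and does nothing until some thread calls run() on it.
    static RunnableFuture<String> snapshot(long checkpointId, String state) {
        long syncStartTime = System.currentTimeMillis();
        final String captured = state; // synchronous phase: capture what must be written
        RunnableFuture<String> snapshotRunner =
                new FutureTask<>(() -> "chk-" + checkpointId + ":" + captured); // asynchronous phase: "write" it
        // Rough analogue of logSyncCompleted: report how long the synchronous part took
        System.out.println("sync part took " + (System.currentTimeMillis() - syncStartTime) + " ms");
        return snapshotRunner;
    }

    public static void main(String[] args) throws Exception {
        RunnableFuture<String> future = snapshot(3L, "a,b");
        System.out.println(future.isDone()); // false: only the synchronous part has run so far
        ExecutorService asyncPool = Executors.newSingleThreadExecutor();
        asyncPool.execute(future);           // the asynchronous part runs on another thread
        System.out.println(future.get());    // chk-3:a,b
        asyncPool.shutdown();
    }
}
```

Keeping the synchronous part short matters because, during that part, the operator cannot process records.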
The target of snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions) depends on the state backend the user configures; by default the call goes to org.apache.flink.runtime.state.DefaultOperatorStateBackend.DefaultOperatorStateBackendSnapshotStrategy#snapshot.
DefaultOperatorStateBackendSnapshotStrategy is an inner class of DefaultOperatorStateBackend.
```java
public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(......) throws IOException {
    // The data appears to live in the registeredOperatorStates object. The later steps don't really need
    // studying: they just write the state data to a file. The interesting question is how
    // registeredOperatorStates gets populated.
    //************ Key point: the registeredOperatorStates object
    final Map<String, PartitionableListState<?>> registeredOperatorStatesDeepCopies =
            new HashMap<>(registeredOperatorStates.size());
    final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStatesDeepCopies =
            new HashMap<>(registeredBroadcastStates.size());
    ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        // eagerly create deep copies of the list and the broadcast states (if any)
        // in the synchronous phase, so that we can use them in the async writing.
        // entry.getValue() is the state object; a deep copy of it goes into the new map
        if (!registeredOperatorStates.isEmpty()) {
            for (Map.Entry<String, PartitionableListState<?>> entry : registeredOperatorStates.entrySet()) {
                PartitionableListState<?> listState = entry.getValue();
                if (null != listState) {
                    listState = listState.deepCopy();
                }
                registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);
            }
        }
        // Broadcast state
        if (!registeredBroadcastStates.isEmpty()) {
            for (Map.Entry<String, BackendWritableBroadcastState<?, ?>> entry : registeredBroadcastStates.entrySet()) {
                BackendWritableBroadcastState<?, ?> broadcastState = entry.getValue();
                if (null != broadcastState) {
                    broadcastState = broadcastState.deepCopy();
                }
                registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);
            }
        }
    }
    // ... (the rest of the try statement and the asynchronous write phase are omitted in this excerpt)
```
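The deep copies are what make the asynchronous write safe: the operator keeps processing records after the synchronous phase, but the snapshot works on its own copy. The sketch below models this with plain collections; deepCopyStates is a hypothetical helper, and Map<String, List<Long>> stands in for the registeredOperatorStates map of PartitionableListState objects. Copying the list is enough here only because Long is immutable; as I understand it, Flink's PartitionableListState#deepCopy also duplicates elements through the state's serializer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeepCopySnapshotSketch {
    // Copies every registered list state so that later mutations by the running operator
    // do not leak into the snapshot that is written asynchronously. Null entries are kept as null.
    static Map<String, List<Long>> deepCopyStates(Map<String, List<Long>> registered) {
        Map<String, List<Long>> copies = new HashMap<>(registered.size());
        for (Map.Entry<String, List<Long>> entry : registered.entrySet()) {
            List<Long> listState = entry.getValue();
            copies.put(entry.getKey(), listState == null ? null : new ArrayList<>(listState));
        }
        return copies;
    }

    public static void main(String[] args) {
        Map<String, List<Long>> registered = new HashMap<>();
        registered.put("offsets", new ArrayList<>(Arrays.asList(10L, 20L)));
        Map<String, List<Long>> snapshot = deepCopyStates(registered);
        registered.get("offsets").add(30L); // the operator keeps processing after the sync phase
        System.out.println(snapshot.get("offsets"));   // [10, 20]
        System.out.println(registered.get("offsets")); // [10, 20, 30]
    }
}
```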
Source: https://www.cnblogs.com/intsmaze/p/10773095.html