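Before reading this article, please read the earlier articles in the Flink Principles and Implementation series:
Flink Principles and Implementation: Architecture and Topology Overview
Flink Principles and Implementation: How the StreamGraph Is Generated
Flink Principles and Implementation: How the JobGraph Is Generated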
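The StreamGraph and the JobGraph are both generated on the client. This article describes how the ExecutionGraph and the physical execution graph are generated. It also explains how a job is scheduled and executed after it has been submitted.
After the client has generated the JobGraph, it submits it to the JobMaster via submitJob.
The ExecutionGraph is built in the JobMaster's constructor:
```java
this.executionGraph = ExecutionGraphBuilder.buildGraph(...);
```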
Here is that method. It is fairly long, so some less important fragments are omitted:
```java
// For streaming jobs, the schedule mode is always EAGER
executionGraph.setScheduleMode(jobGraph.getScheduleMode());
executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling());

// Set the JSON plan
// ...

// Check the executable class (i.e. the operator class) and set the max parallelism
// ...

// Get all JobVertices in topological order, starting from the sources
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();

// Build the execution graph from the sorted JobVertex list
executionGraph.attachJobGraph(sortedTopology);

// Checkpoint-related checks
// ...
```
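getVerticesSortedTopologicallyFromSources returns the vertices in an order where every JobVertex comes after all of its inputs. This ordering lets attachJobGraph find a predecessor's IntermediateResult when it connects each new vertex. The sketch below produces such an ordering with Kahn's algorithm. It works over a hypothetical map from vertex name to input names, which is an assumption for illustration; JobGraph's own implementation may traverse the graph differently.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TopoSortSketch {
    /**
     * Kahn's algorithm. `inputs` maps every vertex (including sources) to the vertices
     * it reads from; it is a hypothetical stand-in for JobVertex.getInputs().
     */
    static List<String> sortFromSources(Map<String, List<String>> inputs) {
        Map<String, Integer> unresolved = new HashMap<>();      // inputs not yet emitted
        Map<String, List<String>> consumers = new HashMap<>();  // reverse edges
        for (Map.Entry<String, List<String>> e : inputs.entrySet()) {
            unresolved.put(e.getKey(), e.getValue().size());
            for (String producer : e.getValue()) {
                consumers.computeIfAbsent(producer, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        // Sources (vertices without inputs) are ready first.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : unresolved.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }
        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            sorted.add(vertex);
            // Emitting a vertex resolves one input of each of its consumers.
            for (String consumer : consumers.getOrDefault(vertex, List.of())) {
                if (unresolved.merge(consumer, -1, Integer::sum) == 0) {
                    ready.add(consumer);
                }
            }
        }
        if (sorted.size() != inputs.size()) {
            throw new IllegalStateException("Cycle detected, or an input vertex is missing from the map");
        }
        return sorted;
    }
}
```

For a chain source → flatMap → sink, this returns [source, flatMap, sink]. In a diamond, the two middle vertices may come out in either order, but both precede the vertex that joins them.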
The execution graph itself is built by the attachJobGraph call near the end of that method, i.e. ExecutionGraph.attachJobGraph:
```java
public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobException, IOException {
    // Iterate over the job vertices
    for (JobVertex jobVertex : topologicallySorted) {
        // Create the corresponding ExecutionJobVertex for each JobVertex
        ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, createTimestamp);
        // Connect the new ExecutionJobVertex to the IntermediateResults of its predecessors
        ejv.connectToPredecessors(this.intermediateResults);

        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        // sanity check
        // ...

        this.verticesInCreationOrder.add(ejv);
    }
}
```
The core of creating an ExecutionJobVertex is its constructor:
```java
// (parallelism-related setup comes before this and is omitted)

// The serialized TaskInformation. This is important: at deploy time,
// the TaskInformation is shipped to each concrete Task.
this.serializedTaskInformation = new SerializedValue<>(new TaskInformation(
    jobVertex.getID(),
    jobVertex.getName(),
    parallelism,
    maxParallelism,
    // The class name of what the Task will run (its invokable class)
    jobVertex.getInvokableClassName(),
    jobVertex.getConfiguration()));

// The ExecutionVertex array, sized by the JobVertex's parallelism
this.taskVertices = new ExecutionVertex[numTaskVertices];
this.inputs = new ArrayList<>(jobVertex.getInputs().size());

// slot sharing and co-location code
// ...

// Create the intermediate results. Their number is determined by the operator's
// out-degree: if it outputs to only one downstream operator, there is exactly 1.
// Note that one IntermediateResult contains multiple IntermediateResultPartitions.
this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
    final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
    this.producedDataSets[i] = new IntermediateResult(
        result.getId(),
        this,
        numTaskVertices,
        result.getResultType());
}

// Create one ExecutionVertex per unit of the job vertex's parallelism.
// A JobVertex/ExecutionJobVertex represents an operator, while each
// ExecutionVertex represents one parallel Task of it.
for (int i = 0; i < numTaskVertices; i++) {
    ExecutionVertex vertex = new ExecutionVertex(
        this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);
    this.taskVertices[i] = vertex;
}

// sanity check
// ...

// set up the input splits, if the vertex has any
// (batch-related code)
// ...

finishedSubtasks = new boolean[parallelism];
```
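The constructor above expands one JobVertex into numTaskVertices ExecutionVertices. Each IntermediateResult it produces ends up with one IntermediateResultPartition per subtask. The sketch below models only that expansion, using hypothetical and heavily simplified classes: string IDs, no slot sharing, and no input splits. The assumption that subtask i produces partition i reflects our reading of how ExecutionVertex registers its partitions; the code above does not show it.

```java
import java.util.ArrayList;
import java.util.List;

final class ExpansionSketch {
    // Hypothetical, heavily simplified stand-ins for Flink's runtime classes.
    static final class IntermediateResult {
        final String id;
        final List<IntermediateResultPartition> partitions = new ArrayList<>();
        IntermediateResult(String id) { this.id = id; }
    }

    /** Partition `index` of `result`. */
    static final class IntermediateResultPartition {
        final IntermediateResult result;
        final int index;
        IntermediateResultPartition(IntermediateResult result, int index) {
            this.result = result;
            this.index = index;
        }
    }

    static final class ExecutionVertex {
        final int subtaskIndex;
        final List<IntermediateResultPartition> produced = new ArrayList<>();
        ExecutionVertex(int subtaskIndex, IntermediateResult[] producedDataSets) {
            this.subtaskIndex = subtaskIndex;
            // Assumption: subtask i owns partition i of every result its job vertex produces.
            for (IntermediateResult result : producedDataSets) {
                IntermediateResultPartition p = new IntermediateResultPartition(result, subtaskIndex);
                result.partitions.add(p);
                produced.add(p);
            }
        }
    }

    static final class ExecutionJobVertex {
        final IntermediateResult[] producedDataSets;
        final ExecutionVertex[] taskVertices;

        ExecutionJobVertex(int parallelism, List<String> producedDataSetIds) {
            // One IntermediateResult per produced data set (the operator's out-degree).
            producedDataSets = new IntermediateResult[producedDataSetIds.size()];
            for (int i = 0; i < producedDataSets.length; i++) {
                producedDataSets[i] = new IntermediateResult(producedDataSetIds.get(i));
            }
            // One ExecutionVertex per parallel subtask, created in index order.
            taskVertices = new ExecutionVertex[parallelism];
            for (int i = 0; i < parallelism; i++) {
                taskVertices[i] = new ExecutionVertex(i, producedDataSets);
            }
        }
    }
}
```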
The ExecutionJobVertex and its ExecutionVertices now exist, but the ExecutionEdges have not been created yet. For those, look at this line in attachJobGraph:
```java
ejv.connectToPredecessors(this.intermediateResults);
```
The method looks like this:
```java
// Get the list of input JobEdges
List<JobEdge> inputs = jobVertex.getInputs();

// Iterate over each JobEdge
for (int num = 0; num < inputs.size(); num++) {
    JobEdge edge = inputs.get(num);

    // Look up the IntermediateResult that feeds this JobEdge
    // (intermediateDataSets is the map passed in from attachJobGraph)
    IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
    if (ires == null) {
        throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID " + edge.getSourceId());
    }

    // Add the IntermediateResult to this ExecutionJobVertex's inputs
    this.inputs.add(ires);

    // Register a consumer with the IntermediateResult; consumerIndex is the index of
    // this consumer among the result's consumers (i.e. its out-degree so far)
    int consumerIndex = ires.registerConsumer();

    for (int i = 0; i < parallelism; i++) {
        ExecutionVertex ev = taskVertices[i];
        // Connect each ExecutionVertex to the IntermediateResult
        ev.connectSource(num, ires, edge, consumerIndex);
    }
}
```
Next, the ExecutionVertex.connectSource method:
```java
public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
    // The pattern is POINTWISE only for forward (and rescale) partitioning; otherwise it is ALL_TO_ALL
    final DistributionPattern pattern = edge.getDistributionPattern();
    final IntermediateResultPartition[] sourcePartitions = source.getPartitions();

    ExecutionEdge[] edges;
    switch (pattern) {
        case POINTWISE:
            edges = connectPointwise(sourcePartitions, inputNumber);
            break;
        case ALL_TO_ALL:
            edges = connectAllToAll(sourcePartitions, inputNumber);
            break;
        default:
            throw new RuntimeException("Unrecognized distribution pattern.");
    }

    this.inputEdges[inputNumber] = edges;

    // The consumer was already registered on the IntermediateResult; here each
    // IntermediateResultPartition gets its consumer, i.e. is linked to the ExecutionEdge
    for (ExecutionEdge ee : edges) {
        ee.getSource().addConsumer(ee, consumerNumber);
    }
}
```
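And the connectAllToAll method:
```java
private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
    // One ExecutionEdge per partition of the source IntermediateResult
    ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
    for (int i = 0; i < sourcePartitions.length; i++) {
        IntermediateResultPartition irp = sourcePartitions[i];
        edges[i] = new ExecutionEdge(irp, this, inputNumber);
    }
    return edges;
}
```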
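To understand this method, note that the inputEdges field of ExecutionVertex is a two-dimensional array. For each input of the ExecutionVertex, it holds that input's array of ExecutionEdges.
For example, suppose an ExecutionVertex has two different inputs, A and B, both connected ALL_TO_ALL, where A has 1 partition and B has 8. Then inputEdges looks like this (irp is short for IntermediateResultPartition):
```
inputEdges[0] = [ ExecutionEdge(A.irp[0]) ]
inputEdges[1] = [ ExecutionEdge(B.irp[0]), ExecutionEdge(B.irp[1]), ..., ExecutionEdge(B.irp[7]) ]
```
With that in mind, the code above is easy to follow.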
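The two patterns differ only in which source partitions a consumer subtask reads. The sketch below returns, for one consumer subtask, the indices of the source partitions it gets an ExecutionEdge to. The ALL_TO_ALL branch mirrors connectAllToAll above. This article does not show connectPointwise, so the POINTWISE branch is an assumed proportional mapping. When consumers outnumber partitions, each consumer reads one partition; otherwise each reads a contiguous range. Flink's actual index arithmetic may round differently.

```java
import java.util.Arrays;

final class EdgeSketch {
    enum DistributionPattern { POINTWISE, ALL_TO_ALL }

    /** Indices of the source partitions that consumer subtask `subtask` gets an ExecutionEdge to. */
    static int[] connect(DistributionPattern pattern, int numSources, int parallelism, int subtask) {
        switch (pattern) {
            case ALL_TO_ALL: {
                // Like connectAllToAll: one edge per source partition.
                int[] all = new int[numSources];
                for (int i = 0; i < numSources; i++) {
                    all[i] = i;
                }
                return all;
            }
            case POINTWISE: {
                // Assumed proportional mapping; long arithmetic avoids int overflow.
                int start = (int) ((long) subtask * numSources / parallelism);
                if (numSources <= parallelism) {
                    // Each consumer reads exactly one partition; neighbours may share it.
                    return new int[] { start };
                }
                // More partitions than consumers: read a contiguous range.
                int end = (int) ((long) (subtask + 1) * numSources / parallelism);
                int[] range = new int[end - start];
                for (int i = 0; i < range.length; i++) {
                    range[i] = start + i;
                }
                return range;
            }
            default:
                throw new IllegalArgumentException("Unrecognized distribution pattern: " + pattern);
        }
    }

    public static void main(String[] args) {
        // The A/B example: A has 1 partition, B has 8, both connected ALL_TO_ALL.
        int parallelism = 4;
        int[][] inputEdges = {
            connect(DistributionPattern.ALL_TO_ALL, 1, parallelism, 0),   // input A
            connect(DistributionPattern.ALL_TO_ALL, 8, parallelism, 0),   // input B
        };
        for (int[] edges : inputEdges) {
            System.out.println(Arrays.toString(edges));
        }
    }
}
```

Running main prints [0] and then [0, 1, 2, 3, 4, 5, 6, 7], the two rows of inputEdges from the A/B example.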
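At this point the ExecutionGraph has been built. Next, let's see how it is turned into Tasks and executed.
We use the simplest setup, the mini cluster, to explain how Tasks are scheduled and executed. We skip client-side job submission, the StreamGraph-to-JobGraph translation, and the ExecutionGraph construction described above.
After submission, the job flows roughly through the following calls: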
```
env.execute("<job name>")
  --> MiniCluster.runJobBlocking(jobGraph)
  --> MiniClusterDispatcher.runJobBlocking(jobGraph)
  --> MiniClusterDispatcher.startJobRunners
  --> JobManagerRunner.start
  --> JobMaster.<init> (build ExecutionGraph)
```
Once the JobMaster has been created, it takes part in leader election. When leadership is granted, the grantLeadership callback is invoked, which calls jobManager.start(leaderSessionID) to start running the job:
```
JobMaster.start
  --> JobMaster.startJobExecution (the job is not actually executing yet)
  --> resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
```
The key step is the resourceManagerLeaderRetriever.start call. Once the ResourceManager leader has been found, ResourceManagerLeaderListener.notifyLeaderAddress is called back. The full call flow is:
```
ResourceManagerLeaderListener.notifyLeaderAddress
  --> JobMaster.notifyOfNewResourceManagerLeader
  --> ResourceManagerConnection.start
  --> ResourceManagerConnection.onRegistrationSuccess (callback, sent and invoked via Flink's RPC framework)
  --> JobMaster.onResourceManagerRegistrationSuccess
```
We finally reach the core scheduling code, in JobMaster.onResourceManagerRegistrationSuccess:
```java
executionContext.execute(new Runnable() {
    @Override
    public void run() {
        try {
            executionGraph.restoreExternalCheckpointedStore();
            executionGraph.setQueuedSchedulingAllowed(true);
            executionGraph.scheduleForExecution(slotPool.getSlotProvider());
        }
        catch (Throwable t) {
            executionGraph.fail(t);
        }
    }
});
```
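Because the schedule mode is EAGER, ExecutionGraph.scheduleForExecution calls ExecutionGraph.scheduleEager.
scheduleEager counts all ExecutionVertices and allocates a SimpleSlot for each one (ignoring slot sharing for now). It then wraps each pair into an ExecutionAndSlot. As the name suggests, an ExecutionAndSlot is an ExecutionVertex plus a Slot (more precisely, an Execution attempt plus a Slot).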
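The sketch below models eager scheduling with CompletableFuture, assuming slot allocation is asynchronous. It issues one slot request per ExecutionVertex and deploys only after every request has completed. The wait-for-all behaviour is our reading of scheduleEager; the article itself only describes the pairing. ExecutionAndSlot here is a simplified hypothetical class holding strings, and allocateSlot is a hypothetical stand-in for the SlotProvider. Neither matches Flink's real signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

final class EagerScheduleSketch {
    /** Hypothetical pairing of an execution attempt with the slot it will run in. */
    static final class ExecutionAndSlot {
        final String execution;
        final String slot;
        ExecutionAndSlot(String execution, String slot) {
            this.execution = execution;
            this.slot = slot;
        }
    }

    /**
     * Requests one slot per execution, waits until every request has completed, then
     * "deploys" each pair. allocateSlot is a hypothetical stand-in for the slot provider.
     */
    static CompletableFuture<List<String>> scheduleEager(
            List<String> executions, IntFunction<CompletableFuture<String>> allocateSlot) {
        List<CompletableFuture<ExecutionAndSlot>> pending = new ArrayList<>();
        for (int i = 0; i < executions.size(); i++) {
            String execution = executions.get(i);
            pending.add(allocateSlot.apply(i).thenApply(slot -> new ExecutionAndSlot(execution, slot)));
        }
        // allOf completes when all requests are done, exceptionally if any of them failed.
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<String> deployed = new ArrayList<>();
                    for (CompletableFuture<ExecutionAndSlot> f : pending) {
                        ExecutionAndSlot pair = f.join();  // already complete at this point
                        // Stand-in for execAndSlot.executionAttempt.deployToSlot(slot)
                        deployed.add(pair.execution + "@" + pair.slot);
                    }
                    return deployed;
                });
    }
}
```

Waiting for all slots before deploying anything means a streaming job either starts all of its tasks or none. If a single allocation fails, the returned future fails and no task is deployed.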
scheduleEager then deploys each pair by calling execAndSlot.executionAttempt.deployToSlot(slot), i.e. Execution.deployToSlot.
Execution.deployToSlot first performs a series of state transitions and checks, and then deploys. Its core code is:
```java
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
    attemptId,
    slot,
    taskState,
    attemptNumber);

// register this execution at the execution graph, to receive call backs
vertex.getExecutionGraph().registerExecution(this);

final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
```
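ExecutionVertex.createDeploymentDescriptor performs the conversion from the ExecutionGraph to the actual physical execution graph. For example, IntermediateResultPartitions are turned into ResultPartitions, and ExecutionEdges into InputChannelDeploymentDescriptors, which eventually become InputGates at execution time.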
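On the input side, that conversion follows the shape of inputEdges, to our understanding of createDeploymentDescriptor. Each input of the ExecutionVertex becomes one input gate, and each ExecutionEdge of that input becomes one input channel in the gate. The sketch below uses hypothetical descriptor classes that keep only the source partition name. Flink's InputGateDeploymentDescriptor and InputChannelDeploymentDescriptor carry much more, such as partition IDs and producer locations.

```java
import java.util.ArrayList;
import java.util.List;

final class DeploymentSketch {
    /** Hypothetical, simplified channel descriptor: only the source partition it reads. */
    static final class InputChannelDescriptor {
        final String sourcePartition;
        InputChannelDescriptor(String sourcePartition) { this.sourcePartition = sourcePartition; }
    }

    /** Hypothetical, simplified gate descriptor: one per input of the ExecutionVertex. */
    static final class InputGateDescriptor {
        final List<InputChannelDescriptor> channels = new ArrayList<>();
    }

    /**
     * inputEdges[i][j] names the source partition of the j-th ExecutionEdge of input i.
     * Each input becomes one gate; each edge becomes one channel in that gate.
     */
    static List<InputGateDescriptor> createInputGates(String[][] inputEdges) {
        List<InputGateDescriptor> gates = new ArrayList<>();
        for (String[] edges : inputEdges) {
            InputGateDescriptor gate = new InputGateDescriptor();
            for (String sourcePartition : edges) {
                gate.channels.add(new InputChannelDescriptor(sourcePartition));
            }
            gates.add(gate);
        }
        return gates;
    }
}
```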
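Finally, the task is submitted through an RPC call, which ends up in TaskExecutor.submitTask. There, task.startTaskThread() starts the task's execution.
The Task constructor uses its input parameters to create the InputGates, ResultPartitions, ResultPartitionWriters, and so on.
The startTaskThread method calls executingThread.start, which runs the Task.run method. Its core code is: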
```java
// ...
// now load the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
// ...
invokable.setEnvironment(env);
// ...
this.invokable = invokable;
invokable.invoke();
// task finishes or fails, do cleanup
// ...
```
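Here, invokable is the task object that runs the operators, created via reflection: concretely a OneInputStreamTask, SourceStreamTask, and so on. Where does nameOfInvokableClass come from? It is already fixed when the StreamGraph is generated; see StreamGraph.addOperator: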
```java
if (operatorObject instanceof StoppableStreamSource) {
    addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
} else if (operatorObject instanceof StreamSource) {
    addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
} else {
    addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
}
```
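The class passed here, such as OneInputStreamTask.class, becomes the vertexClass of the generated StreamNode, and the value is then carried forward. When the StreamGraph is translated into a JobGraph, it becomes the JobVertex's invokableClass. When the JobGraph is turned into an ExecutionGraph, it goes into ExecutionJobVertex's TaskInformation.invokableClassName, and from there all the way to the Task.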
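The whole chain comes down to passing a class name as a string and instantiating it by reflection inside the Task. The sketch below shows both ends, using a hypothetical Invokable interface in place of AbstractInvokable and two toy task classes. chooseInvokableClassName mimics the branch in StreamGraph.addOperator. loadAndInstantiate mimics loadAndInstantiateInvokable by calling Class.forName with the given class loader and then a no-arg constructor. A no-arg constructor fits the Task.run code above, where the environment is set afterwards via setEnvironment, but the real Flink signatures differ.

```java
final class InvokableSketch {
    /** Hypothetical stand-in for AbstractInvokable. */
    public interface Invokable {
        String invoke();
    }

    /** Toy task classes; reflection needs a public no-arg constructor. */
    public static final class SourceTask implements Invokable {
        public SourceTask() {}
        @Override public String invoke() { return "source task running"; }
    }

    public static final class OneInputTask implements Invokable {
        public OneInputTask() {}
        @Override public String invoke() { return "one-input task running"; }
    }

    /** Like the branch in StreamGraph.addOperator: choose the task class by operator kind. */
    static String chooseInvokableClassName(boolean isSource) {
        return (isSource ? SourceTask.class : OneInputTask.class).getName();
    }

    /** Like loadAndInstantiateInvokable: class name -> Class -> new instance. */
    static Invokable loadAndInstantiate(ClassLoader loader, String className)
            throws ReflectiveOperationException {
        Class<? extends Invokable> clazz =
                Class.forName(className, true, loader).asSubclass(Invokable.class);
        return clazz.getDeclaredConstructor().newInstance();
    }
}
```

Only the string crosses the client/cluster boundary. The class itself is loaded on the TaskManager through the user-code class loader, which is why the name, not an instance, travels inside TaskInformation.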
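So where does the logic that the user actually writes end up? For example, where did the Tokenizer from word count go?
OneInputStreamTask's base class, StreamTask, holds a headOperator and an operatorChain. Calling dataStream.flatMap(new Tokenizer()) creates a StreamFlatMap operator. This operator is an AbstractUdfStreamOperator, and the user code, new Tokenizer(), becomes its userFunction.
To tie this together, take OneInputStreamTask as the example. The Task's core execution code is OneInputStreamTask.invoke (implemented in StreamTask), which calls StreamTask.run. StreamTask.run is abstract, so the method that actually runs is the subclass's run, e.g. OneInputStreamTask or SourceStreamTask.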
The run method of OneInputStreamTask is:
```java
final OneInputStreamOperator<IN, OUT> operator = this.headOperator;
final StreamInputProcessor<IN> inputProcessor = this.inputProcessor;
final Object lock = getCheckpointLock();

while (running && inputProcessor.processInput(operator, lock)) {
    // all the work happens in the "processInput" method
}
```
It simply calls inputProcessor.processInput(operator, lock) in a loop, which runs the StreamInputProcessor.processInput method:
```java
public boolean processInput(OneInputStreamOperator<IN, ?> streamOperator, final Object lock) throws Exception {
    // ...
    while (true) {
        if (currentRecordDeserializer != null) {
            // ...
            if (result.isFullRecord()) {
                StreamElement recordOrMark = deserializationDelegate.getInstance();

                if (recordOrMark.isWatermark()) {
                    // Watermarks are handled by the framework
                    // ...
                    continue;
                } else if (recordOrMark.isLatencyMarker()) {
                    // Latency markers are also handled by the framework
                    synchronized (lock) {
                        streamOperator.processLatencyMarker(recordOrMark.asLatencyMarker());
                    }
                    continue;
                } else {
                    // ***** This is where the actual user logic runs *****
                    StreamRecord<IN> record = recordOrMark.asRecord();
                    synchronized (lock) {
                        numRecordsIn.inc();
                        streamOperator.setKeyContextElement1(record);
                        streamOperator.processElement(record);
                    }
                    return true;
                }
            }
        }
        // other processing logic
        // ...
    }
}
```
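Without deserialization, buffering, and checkpoint barriers, run and processInput reduce to a simple control flow. processInput pulls elements until one record has been handed to the operator, letting the framework handle watermarks and latency markers along the way. It returns false once input is exhausted, which ends the run loop. The sketch below uses hypothetical element and operator types, and every operator call happens under a lock standing in for the checkpoint lock. It simplifies watermark handling: unlike Flink, it forwards each watermark directly instead of first combining watermarks across input channels.

```java
import java.util.Iterator;

final class InputLoopSketch {
    // Hypothetical stand-ins for StreamElement and its subtypes.
    interface StreamElement {}

    static final class Record<T> implements StreamElement {
        final T value;
        Record(T value) { this.value = value; }
    }

    static final class Watermark implements StreamElement {
        final long timestamp;
        Watermark(long timestamp) { this.timestamp = timestamp; }
    }

    static final class LatencyMarker implements StreamElement {}

    interface Operator<T> {
        void processElement(Record<T> record);
        void processWatermark(Watermark mark);
        void processLatencyMarker(LatencyMarker marker);
    }

    /** Returns true after handing one record to the operator, false once input is exhausted. */
    static <T> boolean processInput(Iterator<StreamElement> input, Operator<T> operator, Object lock) {
        while (input.hasNext()) {
            StreamElement element = input.next();
            if (element instanceof Watermark) {
                // Framework-level; simplified: no combining across input channels.
                synchronized (lock) {
                    operator.processWatermark((Watermark) element);
                }
                continue;
            }
            if (element instanceof LatencyMarker) {
                synchronized (lock) {
                    operator.processLatencyMarker((LatencyMarker) element);
                }
                continue;
            }
            @SuppressWarnings("unchecked")
            Record<T> record = (Record<T>) element;
            synchronized (lock) {
                operator.processElement(record);  // the user logic runs here
            }
            return true;
        }
        return false;
    }

    /** Like OneInputStreamTask.run: loop until processInput reports end of input. */
    static <T> void run(Iterator<StreamElement> input, Operator<T> operator) {
        final Object lock = new Object();  // stands in for the checkpoint lock
        while (processInput(input, operator, lock)) {
            // all the work happens in processInput
        }
    }
}
```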
In processInput, the call streamOperator.processElement(record) is what actually runs the user logic. For StreamFlatMap, that is its processElement method:
```java
public void processElement(StreamRecord<IN> element) throws Exception {
    collector.setTimestamp(element);
    userFunction.flatMap(element.getValue(), collector);
}
```
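The StreamFlatMap operator itself is thin: processElement just hands the value and a collector to the user function. The sketch below re-declares minimal Collector and FlatMapFunction interfaces shaped like Flink's, so it runs without Flink on the classpath. It also drops the timestamp handling. Its Tokenizer is a simplified stand-in that emits plain words rather than the (word, 1) tuples of the word count example.

```java
final class UdfOperatorSketch {
    // Minimal local interfaces shaped like Flink's Collector and FlatMapFunction,
    // re-declared here so the sketch compiles without Flink.
    interface Collector<T> {
        void collect(T record);
    }

    interface FlatMapFunction<I, O> {
        void flatMap(I value, Collector<O> out) throws Exception;
    }

    /** Simplified StreamFlatMap: wraps the user function and forwards each element to it. */
    static final class StreamFlatMap<I, O> {
        private final FlatMapFunction<I, O> userFunction;
        private final Collector<O> collector;

        StreamFlatMap(FlatMapFunction<I, O> userFunction, Collector<O> output) {
            this.userFunction = userFunction;
            this.collector = output;
        }

        void processElement(I element) throws Exception {
            userFunction.flatMap(element, collector);  // the user's code runs here
        }
    }

    /** Simplified word-count Tokenizer (the user code): emits lower-cased words. */
    static final class Tokenizer implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String line, Collector<String> out) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    out.collect(word);
                }
            }
        }
    }
}
```

Feeding "To be, or not" through a StreamFlatMap that wraps a Tokenizer collects to, be, or, not. The operator itself knows nothing about words; all of that logic lives in the user function.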
With that, the entire scheduling and execution path has been traced end to end.
Source: https://yq.aliyun.com/articles/225618