这篇文章主要介绍从命令行到任务在 Driver 端运行的过程
通过 flink run 命令提交 jar 包运行程序
以 yarn 模式提交任务命令类似于: flink run -m yarn-cluster XXX.jar
先来看一下脚本中的调用类
在 flink.sh 脚本中可以看到提交的命令走到了这样一个外观类上, 用于提交 job 解析用户命令行参数
在其 main 方法中
先会解析对应需要的 flink 参数包括 flink-conf-dir 等, 接着
1 处会根据是否有 hadoop 权限安全控制走对应的 doas(), 具体的执行逻辑为 2 处解析对应的用户参数
拿到参数后会先将参数中的第一个先取出来作为 action
这里我们只看 job 提交的, 解析出来也就是 run, 然后将剩余的参数用于 job 运行
在 job 运行前会先解析剩余的参数, 比如运行的 jar 文件地址, 运行的主类名 (没有后面回去 Manifest 里面找) 作为 entryPoint 入口, 并行度等参数
接着
就用得到的这些参数构建 program 了, 这里其实就是拿到了入口运行类的全额限定名, 然后通过类加载器加载运行主类
接着, 会根据运行时用户的主类是否为 Program 的实现类 (用户可以直接返回 plan) 来设置对应的 packageProgram 的属性 program 是否为空
那我们常规的提交 main 方法主类的这里就是空的, 如果是主类实现 progarm 的就反射实例化了一个以后赋给它
接着, 就是运行并且提交任务了
这里比较重要, yarn 模式提交的话这里会调度整个集群, 提交常见的异常
Couldn't deploy Yarn session cluster
就是从这个方法里面抛出的, 与 yarn 有关
这里只看 yarn 的调度集群, 因为 standalone 模式的话 Jobmanager 和 TaskManager 是已经启动好的了不需要这里
其中走到了这个方法 deployInternal()
可以看到这里就是申请 AppMaster 并且传入了 yarn 模式启动集群的类的全额限定名, 其实就是这个类
用于启动 jobmanager, 和 standalone 的入口类
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
功能差不多, 但是还有有区别, 当这个 yarnsourceManager 类申请到 contain 的时候就会
就会去起对应的 taskManager 了
回到最开始, 当集群调度完以后
运行用户程序
其实就是调用了用户的 main 方法, 结束
后面就是 job 往 jobmanager 提交了, 前面的文章有
总结:
通过一个外观类解析用户参数, 拿到类名
调度集群启动申请 AppMaster,Contaion 起 JM,TM
然后类名通过类加载器加载类, 然后反射实例调用用户的 main 方法启动 Job
来源: https://www.cnblogs.com/ljygz/p/12381826.html