While learning Spark, the reference material describes three main ways to submit a Spark job:

The first approach:
Submit the job from the command line with the spark-submit tool that ships with Spark. The official documentation and most references submit jobs this way. An example submit command:

```bash
./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 ../spark-demo.jar
```

The parameters are not explained here; please refer to the official documentation.
The second approach:
Submit the job programmatically through the Java API. This approach does not need the command line at all: you can simply click Run in IDEA on the main class that contains the job. Spark provides this API with SparkLauncher as the single entry point. It is very convenient (imagine a task that has to run repeatedly but you cannot write a Linux shell script; submitting the job through the Java API solves this, and the code can also be integrated with Spring so the application runs inside Tomcat). Official example: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html

According to the official example, there are two ways to submit a job programmatically through the Java API:

The first is to call the startApplication method of a SparkLauncher instance. Even when all configuration is correct, this can fail when used naively: startApplication uses LauncherServer to start a child process that talks to the cluster, and this appears to be asynchronous, so the main thread may exit before that process has actually started, causing the submission to fail. The fix is either to put the main thread to sleep for a while after calling new SparkLauncher(...).startApplication(...), or to block on a latch as in the following example:
```java
package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;

public class LanuncherAppV {

    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap<String, String> env = new HashMap<>();
        // These two properties must be set.
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        // Optional.
        //env.put("YARN_CONF_DIR","");

        CountDownLatch countDownLatch = new CountDownLatch(1);
        // Note: even after calling setJavaHome(), the "JAVA_HOME is not set" error still
        // occurs, which is why JAVA_HOME is passed through the environment map above.
        SparkAppHandle handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true)
                .startApplication(new SparkAppHandle.Listener() {
                    // Listens for state changes: once the application ends (for whatever
                    // reason), isFinal() returns true; before that it returns false.
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        System.out.println("state:" + sparkAppHandle.getState().toString());
                    }

                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        System.out.println("Info:" + sparkAppHandle.getState().toString());
                    }
                });

        System.out.println("The task is executing, please wait ....");
        // Block the main thread until the application reaches a final state.
        countDownLatch.await();
        System.out.println("The task is finished!");
    }
}
```
Note: in cluster deploy mode, anything the job prints to standard output will not be visible here; write the results to HDFS instead. In client mode the output can be seen directly.
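As an illustration, here is a minimal sketch of a job that writes its result to HDFS instead of printing it, so the result is still retrievable in cluster mode. The class name and the input/output paths are made up for this example:

```java
package com.learn.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class HdfsOutputApp {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("HdfsOutputApp");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read an input file from HDFS (hypothetical path).
        JavaRDD<String> lines = sc.textFile("hdfs:///user/learn/input.txt");

        // A trivial transformation: keep non-empty lines.
        JavaRDD<String> result = lines.filter(line -> !line.trim().isEmpty());

        // Write the result back to HDFS so it is visible even in cluster mode.
        result.saveAsTextFile("hdfs:///user/learn/output");

        sc.stop();
    }
}
```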
The second way is to obtain a Process via SparkLauncher.launch(), then call process.waitFor() to wait for the result. With this approach you have to manage the process output yourself, which is more work, but the advantage is that everything is under your control: the captured output is as detailed as what you get when submitting from the command line. Implementation:
```java
package com.learn.spark;

import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;

public class LauncherApp {

    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap<String, String> env = new HashMap<>();
        // These two properties must be set.
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        //env.put("YARN_CONF_DIR","");

        SparkLauncher handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true);

        Process process = handle.launch();

        // Read stdout and stderr of the spark-submit child process on separate threads.
        InputStreamReaderRunnable inputStreamReaderRunnable =
                new InputStreamReaderRunnable(process.getInputStream(), "input");
        Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
        inputThread.start();

        InputStreamReaderRunnable errorStreamReaderRunnable =
                new InputStreamReaderRunnable(process.getErrorStream(), "error");
        Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
        errorThread.start();

        System.out.println("Waiting for finish...");
        // Block until the spark-submit process exits.
        int exitCode = process.waitFor();
        System.out.println("Finished! Exit code:" + exitCode);
    }
}
```
The custom InputStreamReaderRunnable class used above is implemented as follows:
```java
package com.learn.spark;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

/**
 * Reads an InputStream line by line on its own thread and echoes it to stdout.
 */
public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;
    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
```
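As a side note, newer Spark versions (1.6 and later, as far as I know) let SparkLauncher redirect the child process output itself, which removes the need for the reader threads above. A minimal sketch, assuming the redirectOutput(File)/redirectError(File) overloads are available in your Spark version (the log file paths are made up for this example):

```java
package com.learn.spark;

import org.apache.spark.launcher.SparkLauncher;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;

public class LauncherAppRedirect {

    public static void main(String[] args) throws IOException, InterruptedException {
        HashMap<String, String> env = new HashMap<>();
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");

        Process process = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                // Assumption: these redirect overloads exist in the Spark version in use.
                .redirectOutput(new File("/tmp/spark-launcher-stdout.log"))
                .redirectError(new File("/tmp/spark-launcher-stderr.log"))
                .launch();

        int exitCode = process.waitFor();
        System.out.println("Finished! Exit code:" + exitCode);
    }
}
```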
The third approach is to submit via YARN's REST API (not commonly used, but introduced here as well); a rough Java sketch of such a request follows the parameter table below.
POST request endpoint: http://<rm http address:port>/ws/v1/cluster/apps
Parameters carried in the request body:
| Item | Data Type | Description |
|---|---|---|
| application-id | string | The application id |
| application-name | string | The application name |
| queue | string | The name of the queue to which the application should be submitted |
| priority | int | The priority of the application |
| am-container-spec | object | The application master container launch context, described below |
| unmanaged-AM | boolean | Is the application using an unmanaged application master |
| max-app-attempts | int | The max number of attempts for this application |
| resource | object | The resources the application master requires, described below |
| application-type | string | The application type (MapReduce, Pig, Hive, etc.) |
| keep-containers-across-application-attempts | boolean | Should YARN keep the containers used by this application instead of destroying them |
| application-tags | object | List of application tags; see the request examples for how to specify the tags |
| log-aggregation-context | object | Represents all of the information needed by the NodeManager to handle the logs for this application |
| attempt-failures-validity-interval | long | Attempt failures that happen outside the validity interval are not counted toward the failure count |
| reservation-id | string | Represents the unique id of the corresponding reserved resource allocation in the scheduler |
| am-black-listing-requests | object | Contains blacklisting information such as "enable/disable AM blacklisting" and "disable failure threshold" |
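As a rough illustration only, a POST could be issued from Java as below. The ResourceManager host, application id, jar path and launch command are all placeholders, and a real submission also needs local-resources, environment variables, credentials, etc. (see the YARN ResourceManager REST API documentation); the application-id is normally obtained first from POST /ws/v1/cluster/apps/new-application.

```java
package com.learn.spark;

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class YarnRestSubmit {

    public static void main(String[] args) throws Exception {
        // Placeholder ResourceManager address; replace with your cluster's RM host:port.
        String rm = "http://resourcemanager:8088";

        // Minimal JSON body using fields from the table above; values are illustrative.
        String body =
                "{\n" +
                "  \"application-id\": \"application_1234567890123_0001\",\n" +
                "  \"application-name\": \"spark-demo\",\n" +
                "  \"application-type\": \"SPARK\",\n" +
                "  \"am-container-spec\": {\n" +
                "    \"commands\": { \"command\": \"/usr/local/spark/bin/spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode cluster /usr/local/spark/spark-demo.jar\" }\n" +
                "  },\n" +
                "  \"resource\": { \"memory\": 2048, \"vCores\": 1 }\n" +
                "}";

        HttpURLConnection conn =
                (HttpURLConnection) new URL(rm + "/ws/v1/cluster/apps").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        // A 202 response means the ResourceManager accepted the submission.
        System.out.println("Response code: " + conn.getResponseCode());
        conn.disconnect();
    }
}
```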
Source: http://www.bubuko.com/infodetail-2856355.html