执行 sign ice put stat exce == pla ...
hadoop api提供了一些遍历文件的api,通过该api可以实现遍历文件目录:
- import java.io.FileNotFoundException;
- import java.io.IOException;
- import java.net.URI;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.List;
- import java.util.concurrent.CountDownLatch;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- public class BatchSubmitMain {
- public static void main(String[] args) throws Exception {
- String mrTableName = args[0];
- String fglibTableName = args[1];
- Configuration conf = new Configuration();
- /*
- * <property> <name>fs.defaultFS</name> <value>hdfs://hcluster</value>
- * </property>
- */
- conf.set("fs.defaultFS", "hdfs://hcluster");
- FileSystem fileSystem = FileSystem.get(conf);
- String mrFilePath = "/myuser/hivedb/" + mrTableName;
- String fglibFilePath = "/myuser/hivedb/" + fglibTableName;
- System.out.println(mrFilePath);
- List < String > mrObjectIdItems = getObjectIdItems(fileSystem, mrFilePath);
- System.out.println(fglibFilePath);
- List < String > fglibObjectIdItems = getObjectIdItems(fileSystem, fglibFilePath);
- List < String > objectIdItems = new ArrayList < >();
- for (String mrObjectId: mrObjectIdItems) {
- for (String fglibObjectId: fglibObjectIdItems) {
- if (mrObjectId == fglibObjectId) {
- objectIdItems.add(mrObjectId);
- }
- }
- }
- String submitShPath = "/app/myaccount/service/submitsparkjob.sh";
- CountDownLatch threadSignal = new CountDownLatch(objectIdItems.size());
- for (int ii = 0; ii < objectIdItems.size(); ii++) {
- String objectId = objectIdItems.get(ii);
- Thread thread = new ImportThread(objectId, submitShPath, threadSignal);
- thread.start();
- }
- threadSignal.await();
- System.out.println(Thread.currentThread().getName() + "complete");
- }
- private static List < String > getObjectIdItems(FileSystem fileSystem, String filePath) throws FileNotFoundException,
- IOException {
- List < String > objectItems = new ArrayList < >();
- Path path = new Path(filePath);
- // 获取文件列表
- FileStatus[] files = fileSystem.listStatus(path);
- // 展示文件信息
- for (int i = 0; i < files.length; i++) {
- try {
- if (files[i].isDirectory()) {
- String[] fileItems = files[i].getPath().getName().split("/");
- String objectId = fileItems[fileItems.length - 1].replace("objectid=", "");
- objectItems.add(objectId);
- System.out.println(objectId);
- }
- } catch(Exception e) {
- e.printStackTrace();
- }
- }
- return objectItems;
- }
- /**
- * @param hdfs
- * FileSystem 对象
- * @param path
- * 文件路径
- */
- public static void iteratorShowFiles(FileSystem hdfs, Path path) {
- try {
- if (hdfs == null || path == null) {
- return;
- }
- // 获取文件列表
- FileStatus[] files = hdfs.listStatus(path);
- // 展示文件信息
- for (int i = 0; i < files.length; i++) {
- try {
- if (files[i].isDirectory()) {
- System.out.print(">>>" + files[i].getPath() + ", dir owner:" + files[i].getOwner());
- // 递归调用
- iteratorShowFiles(hdfs, files[i].getPath());
- } else if (files[i].isFile()) {
- System.out.print(" " + files[i].getPath() + ",length:" + files[i].getLen() + ", owner:" + files[i].getOwner());
- }
- } catch(Exception e) {
- e.printStackTrace();
- }
- }
- } catch(Exception e) {
- e.printStackTrace();
- }
- }
- }
并行执行sh的线程:
- import java.util.concurrent.CountDownLatch;
- public class ImportThread extends Thread {
- private final JavaShellInvoker javaShellInvoker = new JavaShellInvoker();
- private CountDownLatch countDownLatch;
- private String objectId;
- private String submitShPath;
- public ImportThread(String objectId, String submitShPath, CountDownLatch countDownLatch) {
- this.objectId = objectId;
- this.submitShPath = submitShPath;
- this.countDownLatch = countDownLatch;
- }
- @Override public void run() {
- System.out.println(Thread.currentThread().getName() + "start... " + this.submitShPath + " " + this.objectId.toString()); // 打印开始标记
- try {
- int result = this.javaShellInvoker.executeShell("mrraster", this.submitShPath, this.objectId);
- if (result != 0) {
- System.out.println(Thread.currentThread().getName() + " result type is error");
- }
- } catch(Exception e) {
- e.printStackTrace();
- System.out.println(Thread.currentThread().getName() + "-error:" + e.getMessage());
- }
- this.countDownLatch.countDown(); // 计时器减1
- System.out.println(Thread.currentThread().getName() + " complete,last " + this.countDownLatch.getCount() + " threads"); // 打印结束标记
- }
- }
执行sh的java代码:
- import java.io.File;
- import java.text.SimpleDateFormat;
- import java.util.Date;
/**
 * Runs a shell script via "sh", appending its stdout/stderr to a per-type,
 * per-day log file in the working directory.
 */
public class JavaShellInvoker {
    private static final String executeShellLogFile = "./executeShell_%s_%s.log";

    /**
     * Executes {@code sh shellCommand args} and waits for it to finish.
     *
     * @param shellCommandType label used in the log file name
     * @param shellCommand     script path (or sh option) to run
     * @param args             single argument passed to the script (null -> "")
     * @return the process exit code, or 2 when starting/waiting failed
     * @throws Exception rethrown from ProcessBuilder.start()/waitFor()
     */
    public int executeShell(String shellCommandType, String shellCommand, String args) throws Exception {
        int success = 0;
        args = (args == null) ? "" : args;
        String now = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
        File logFile = new File(String.format(executeShellLogFile, shellCommandType, now));
        ProcessBuilder pb = new ProcessBuilder("sh", shellCommand, args);
        pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
        pb.redirectError(ProcessBuilder.Redirect.appendTo(logFile));
        Process pid = null;
        try {
            pid = pb.start();
            success = pid.waitFor();
        } catch (Exception ex) {
            success = 2;
            System.out.println("executeShell-error:" + ex.getMessage());
            throw ex;
        } finally {
            // BUG FIX: the original called pid.isAlive() without a null check (NPE when
            // start() throws, masking the real exception) and then pid.exitValue() on a
            // still-alive process, which always throws IllegalThreadStateException.
            // Just make sure a still-running child is killed.
            if (pid != null && pid.isAlive()) {
                pid.destroy();
            }
        }
        return success;
    }
}
submitsparkjob.sh
#!/bin/sh
# Submit the spark job for one partition; the objectid arrives as $1.
source ../login.sh
# BUG FIX: the command was split across two lines with no continuation
# ("--num-executors" / "20"), which sh parses as two broken commands.
spark-submit --master yarn-cluster \
  --class MySparkJobMainClass \
  --driver-class-path /app/myaccount/service/jars/ojdbc7.jar \
  --jars /app/myaccount/service/jars/ojdbc7.jar \
  --num-executors 20 \
  --driver-memory 6g \
  --executor-cores 1 \
  --executor-memory 8g \
  MySparkJobJar.jar "$1"
执行BatchSubmit.jar的命令:
- hadoop jar BatchSubmit.jar <mr表名> <fglib表名>   （main 需要两个参数：args[0] 和 args[1]）
Hadoop API:遍历文件分区目录,并根据目录下的数据进行并行提交spark任务
执行 sign ice put stat exce == pla ...
原文:http://www.cnblogs.com/yy3b2007com/p/7816917.html
来源: http://www.bubuko.com/infodetail-2390025.html