Hadoop API:遍历文件分区目录,并根据目录下的数据进行并行提交spark任务

hadoop api提供了一些遍历文件的api,通过该api可以实现遍历文件目录:

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BatchSubmitMain {
    public static void main(String[] args) throws Exception {
        String mrTableName = args[0];
        String fglibTableName = args[1];

        Configuration conf = new Configuration();
        /*
         * <property> <name>fs.defaultFS</name> <value>hdfs://hcluster</value>
         * </property>
         */
        conf.set("fs.defaultFS", "hdfs://hcluster");
        FileSystem fileSystem = FileSystem.get(conf);

        String mrFilePath = "/myuser/hivedb/" + mrTableName;
        String fglibFilePath = "/myuser/hivedb/" + fglibTableName;

        System.out.println(mrFilePath);
        List<String> mrObjectIdItems = getObjectIdItems(fileSystem, mrFilePath);

        System.out.println(fglibFilePath);
        List<String> fglibObjectIdItems = getObjectIdItems(fileSystem, fglibFilePath);

        List<String> objectIdItems = new ArrayList<>();

        for (String mrObjectId : mrObjectIdItems) {
            for (String fglibObjectId : fglibObjectIdItems) {
                if (mrObjectId == fglibObjectId) {
                    objectIdItems.add(mrObjectId);
                }
            }
        }

        String submitShPath = "/app/myaccount/service/submitsparkjob.sh";

        CountDownLatch threadSignal = new CountDownLatch(objectIdItems.size());

        for (int ii = 0; ii < objectIdItems.size(); ii++) {
            String objectId = objectIdItems.get(ii);
            Thread thread = new ImportThread(objectId, submitShPath, threadSignal);
            thread.start();
        }

        threadSignal.await();

        System.out.println(Thread.currentThread().getName() + "complete");
    }

    private static List<String> getObjectIdItems(FileSystem fileSystem, String filePath) throws FileNotFoundException, IOException {
        List<String> objectItems = new ArrayList<>();

        Path path = new Path(filePath);
        // 获取文件列表
        FileStatus[] files = fileSystem.listStatus(path);
        // 展示文件信息
        for (int i = 0; i < files.length; i++) {
            try {
                if (files[i].isDirectory()) {
                    String[] fileItems = files[i].getPath().getName().split("/");
                    String objectId = fileItems[fileItems.length - 1].replace("objectid=", "");
                    objectItems.add(objectId);
                    System.out.println(objectId);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        return objectItems;
    }

    /**
     * @param hdfs
     *            FileSystem 对象
     * @param path
     *            文件路径
     */
    public static void iteratorShowFiles(FileSystem hdfs, Path path) {
        try {
            if (hdfs == null || path == null) {
                return;
            }

            // 获取文件列表
            FileStatus[] files = hdfs.listStatus(path);

            // 展示文件信息
            for (int i = 0; i < files.length; i++) {
                try {
                    if (files[i].isDirectory()) {
                        System.out.print(">>>" + files[i].getPath() + ", dir owner:" + files[i].getOwner());
                        // 递归调用
                        iteratorShowFiles(hdfs, files[i].getPath());
                    } else if (files[i].isFile()) {
                        System.out.print(" " + files[i].getPath() + ",length:" + files[i].getLen() + ", owner:" + files[i].getOwner());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

并行执行sh的线程:

import java.util.concurrent.CountDownLatch;

public class ImportThread extends Thread {
    private final JavaShellInvoker javaShellInvoker = new JavaShellInvoker();

    private CountDownLatch countDownLatch;
    private String objectId;
    private String submitShPath;

    public ImportThread(String objectId, String submitShPath, CountDownLatch countDownLatch) {
        this.objectId = objectId;
        this.submitShPath = submitShPath;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + "start... " + this.submitShPath + " " + this.objectId.toString());// 打印开始标记

        try {
            int result = this.javaShellInvoker.executeShell("mrraster", this.submitShPath, this.objectId);
            if (result != 0) {
                System.out.println(Thread.currentThread().getName() + " result type is error");
            }
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(Thread.currentThread().getName() + "-error:" + e.getMessage());
        }

        this.countDownLatch.countDown();// 计时器减1
        System.out.println(Thread.currentThread().getName() + " complete,last " + this.countDownLatch.getCount() + " threads");// 打印结束标记
    }
}

执行sh的java代码:

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;

public class JavaShellInvoker {
    private static final String executeShellLogFile = "./executeShell_%s_%s.log";

    public int executeShell(String shellCommandType, String shellCommand, String args) throws Exception {
        int success = 0;

        args = (args == null) ? "" : args;

        String now = new SimpleDateFormat("yyyy-MM-dd").format(new Date());
        File logFile = new File(String.format(executeShellLogFile, shellCommandType, now));

        ProcessBuilder pb = new ProcessBuilder("sh", shellCommand, args);
        pb.redirectOutput(ProcessBuilder.Redirect.appendTo(logFile));
        pb.redirectError(ProcessBuilder.Redirect.appendTo(logFile));

        Process pid = null;

        try {
            pid = pb.start();
            success = pid.waitFor();
        } catch (Exception ex) {
            success = 2;
            System.out.println("executeShell-error:" + ex.getMessage());
            throw ex;
        } finally {
            if (pid.isAlive()) {
                success = pid.exitValue();
                pid.destroy();
            }
        }

        return success;
    }
}

submitsparkjob.sh

#!/bin/sh
source ../login.sh
spark-submit --master yarn-cluster --class MySparkJobMainClass --driver-class-path /app/myaccount/service/jars/ojdbc7.jar --jars /app/myaccount/service/jars/ojdbc7.jar --num-executors
 20 --driver-memory 6g --executor-cores 1 --executor-memory 8g MySparkJobJar.jar $1

执行BatchSubmit.jar的命令:

hadoop jar BatchSubmit.jar
时间: 2024-10-19 15:33:33

Hadoop API:遍历文件分区目录,并根据目录下的数据进行并行提交spark任务的相关文章

C#遍历文件夹(包括子目录)下的所有文件

前提现在一个分区下建立bb.txt文件. 1 using System; 2 using System.Collections.Generic; 3 using System.ComponentModel; 4 using System.Data; 5 using System.Drawing; 6 using System.Linq; 7 using System.Text; 8 using System.Windows.Forms; 9 using System.IO; 10 11 name

遍历文件夹及其子文件夹下的.pdf文件,并解压文件夹下所有的压缩包

List<PDFPATH> pdfpath = new List<PDFPATH>(); List<string> ziplist = new List<string>(); public Form1() { InitializeComponent(); } private void button1_Click(object sender, EventArgs e) { FolderBrowserDialog dialog = new FolderBrows

BAT 遍历文件夹和子文件夹下所有文件

echo off & color 0A ::指定起始文件夹 set DIR="%cd%" echo DIR=%DIR% :: 参数 /R 表示需要遍历子文件夹,去掉表示不遍历子文件夹 :: %%f 是一个变量,类似于迭代器,但是这个变量只能由一个字母组成,前面带上%% :: 括号中是通配符,可以指定后缀名,*.*表示所有文件 for /R %DIR% %%f in (*.txt) do ( echo %%f ) pause

php高效遍历文件夹、高效读取文件

/** * PHP高效遍历文件夹 * @param string $path 目录路径 * @param integer $level 目录深度 */ function fn_scandir($path = './', $level = 0) { $file = new FilesystemIterator($path); $filename = ''; $prefix = ''; $url = ''; foreach ($file as $fileinfo) { $filename = $fi

C++下遍历文件夹

编写程序遍历文件夹及其子文件夹下所有文件,并输出到标准输出流或者文件流. 1. 先考虑在单层目录下,遍历所有文件.以C:\WINDOWS为例: 用到数据结构_finddata_t,文件信息结构体的指针. struct _finddata_t{ unsigned attrib; //文件属性 time_t time_create; //文件创建时间 time_t time_access; //文件上一次访问时间 time_t time_write; //文件上一次修改时间 _fsize_t siz

C语言遍历文件和目录——————【Badboy】

[cpp] #include #include #include #include #include #include #include #define MAX_PATH_LENGTH 512 #define MAX_FILE_EXTENSION 9 unsigned long visit_dirs = 0; unsigned long visit_files = 0; void listdir(char *path){ DIR *ptr_dir; struct dirent *dir_entr

Hadoop之HDFS文件操作

摘要:Hadoop之HDFS文件操作常有两种方式,命令行方式和JavaAPI方式.本文介绍如何利用这两种方式对HDFS文件进行操作. 关键词:HDFS文件    命令行     Java API HDFS是一种分布式文件系统,为MapReduce这种框架下的海量数据分布式处理而设计. Hadoop之HDFS文件操作常有两种方式,一种是命令行方式,即Hadoop提供了一套与Linux文件命令类似的命令行工具:另一种是JavaAPI,即利用Hadoop的Java库,采用编程的方式操作HDFS的文件.

java File基本操作,以及递归遍历文件夹

java 的文件操作,相对来说是比较重要的,无论是编写CS还是BS程序,都避免不了要与文件打交道,例如读写配置文件等.虽然现在很多框架都直接帮你做好了这一步! java.io.File 底层是调用与c语言接的接口,所以我们暂时不需要知道底层是怎么实现的,再说了,也看不见,最多就是看见一个接口而已.我们只需要知道java.io.File提供给我们对文件的一些操作就行了. 1.文件的创建 :java的文件创建可以直接通过new一个对象来实现. File file = new File(String

Hadoop HDFS分布式文件系统设计要点与架构

Hadoop简介:一个分布式系统基础架构,由Apache基金会开发.用户可以在不了解分布式底层细节的情况下,开发分布式程序.充分利用集群的威力高速运算和存储.Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS.HDFS有着高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上.而且它提供高传输率(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序.HDF