利用SparkLauncher 类以JAVA API 编程的方式提交Spark job

一.环境说明和使用软件的版本说明:

hadoop-version:hadoop-2.9.0.tar.gz

spark-version:spark-2.2.0-bin-hadoop2.7.tgz

java-version:jdk1.8.0_151

集群环境:单机伪分布式环境。

二.适用背景

在学习Spark过程中,资料中介绍的提交Spark Job的方式主要有两种(我所知道的):第一种是通过命令行的方式提交Job,使用spark 自带的spark-submit工具提交,官网和大多数参考资料都是已这种方式提交的,提交命令示例如下:

./spark-submit --class com.learn.spark.SimpleApp --master yarn --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3  ../spark-demo.jar

参数含义就不解释了,请参考官网资料。

第二种提交方式是已JAVA API编程的方式提交,这种方式不需要使用命令行,直接可以在IDEA中点击Run 运行包含Job的Main类就行,Spark 提供了以SparkLanuncher 作为唯一入口的API来实现。这种方式很方便(试想如果某个任务需要重复执行,但是又不会写linux 脚本怎么搞?我想到的是以JAV API的方式提交Job, 还可以和Spring整合,让应用在tomcat中运行),官网的示例:http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html

三.文章的目地

官网已有demo和API的情况下写这篇文章的目地:官网给出的demo 放在本机跑不了。出现的现象是程序结束了,什么输出都没有或者输出JAVA_HOME is not set,虽然我调用方法设置了,然而没啥用,因此把我搜索和加上在自己思考后能够运行的demo记录下来。

四.相关demo

根据官网的示例这里有两种方式:

第一种是调用SparkLanuncher实例的startApplication方法,但是这种方式在所有配置都正确的情况下使用运行都会失败的,原因是startApplication方法会调用LauncherServer启动一个进程与集群交互,这个操作貌似是异步的,所以可能结果是main主线程结束了这个进程都没有起起来,导致运行失败。解决办法是调用new SparkLanuncher().startApplication后需要让主线程休眠一定的时间后者是使用下面的例子:

package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;

public class LanuncherAppV {
    public static void main(String[] args) throws IOException, InterruptedException {

        HashMap env = new HashMap();
        //这两个属性必须设置
        env.put("HADOOP_CONF_DIR", "/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME", "/usr/local/java/jdk1.8.0_151");
        //可以不设置
        //env.put("YARN_CONF_DIR","");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        //这里调用setJavaHome()方法后,JAVA_HOME is not set 错误依然存在
        SparkAppHandle handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts", "true")
                .setVerbose(true).startApplication(new SparkAppHandle.Listener() {
                    //这里监听任务状态,当任务结束时(不管是什么原因结束),isFinal()方法会返回true,否则返回false
                    @Override
                    public void stateChanged(SparkAppHandle sparkAppHandle) {
                        if (sparkAppHandle.getState().isFinal()) {
                            countDownLatch.countDown();
                        }
                        System.out.println("state:" + sparkAppHandle.getState().toString());
                    }

                    @Override
                    public void infoChanged(SparkAppHandle sparkAppHandle) {
                        System.out.println("Info:" + sparkAppHandle.getState().toString());
                    }
                });
        System.out.println("The task is executing, please wait ....");
        //线程等待任务结束
        countDownLatch.await();
        System.out.println("The task is finished!");

    }
}

注意:如果部署模式是cluster,但是代码中有标准输出的话将看不到,需要把结果写到HDFS中,如果是client模式则可以看到输出。
第二种方式是:通过SparkLanuncher.lanunch()方法获取一个进程,然后调用进程的process.waitFor()方法等待线程返回结果,但是使用这种方式需要自己管理运行过程中的输出信息,比较麻烦,好处是一切都在掌握之中,即获取的输出信息和通过命令提交的方式一样,很详细,实现如下:

package com.learn.spark;

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

import java.io.IOException;
import java.util.HashMap;

public class LauncherApp {

    public static void main(String[] args) throws IOException, InterruptedException {

        HashMap env = new HashMap();
        //这两个属性必须设置
        env.put("HADOOP_CONF_DIR","/usr/local/hadoop/etc/overriterHaoopConf");
        env.put("JAVA_HOME","/usr/local/java/jdk1.8.0_151");
        //env.put("YARN_CONF_DIR","");

        SparkLauncher handle = new SparkLauncher(env)
                .setSparkHome("/usr/local/spark")
                .setAppResource("/usr/local/spark/spark-demo.jar")
                .setMainClass("com.learn.spark.SimpleApp")
                .setMaster("yarn")
                .setDeployMode("cluster")
                .setConf("spark.app.id", "11222")
                .setConf("spark.driver.memory", "2g")
                .setConf("spark.akka.frameSize", "200")
                .setConf("spark.executor.memory", "1g")
                .setConf("spark.executor.instances", "32")
                .setConf("spark.executor.cores", "3")
                .setConf("spark.default.parallelism", "10")
                .setConf("spark.driver.allowMultipleContexts","true")
                .setVerbose(true);

         Process process =handle.launch();
        InputStreamReaderRunnable inputStreamReaderRunnable = new InputStreamReaderRunnable(process.getInputStream(), "input");
        Thread inputThread = new Thread(inputStreamReaderRunnable, "LogStreamReader input");
        inputThread.start();

        InputStreamReaderRunnable errorStreamReaderRunnable = new InputStreamReaderRunnable(process.getErrorStream(), "error");
        Thread errorThread = new Thread(errorStreamReaderRunnable, "LogStreamReader error");
        errorThread.start();

        System.out.println("Waiting for finish...");
        int exitCode = process.waitFor();
        System.out.println("Finished! Exit code:" + exitCode);

    }
}

使用的自定义InputStreamReaderRunnable类实现如下:

package com.learn.spark;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class InputStreamReaderRunnable implements Runnable {

    private BufferedReader reader;

    private String name;

    public InputStreamReaderRunnable(InputStream is, String name) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.name = name;
    }

    public void run() {
        System.out.println("InputStream " + name + ":");
        try {
            String line = reader.readLine();
            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

原文地址:https://www.cnblogs.com/itboys/p/10015968.html

时间: 2024-10-29 03:58:30

利用SparkLauncher 类以JAVA API 编程的方式提交Spark job的相关文章

Java API操作HA方式下的Hadoop

通过java api连接Hadoop集群时,如果集群支持HA方式,那么可以通过如下方式设置来自动切换到活动的master节点上.其中,ClusterName 是可以任意指定的,跟集群配置无关,dfs.ha.namenodes.ClusterName也可以任意指定名称,有几个master就写几个,后面根据相应的设置添加master节点地址即可. private static String ClusterName = "nsstargate"; private static final S

HBase新版本Java API编程实战及基本操作方法封装

我的HBase版本是0.98 首先说明一下,如果用eclipse操作hbase时,如果报Unknown host错误,找不到主机,是因为你没有配IP地址的映射 方法是 找到你的系统盘里面的C:\Windows\System32\drivers\etc下的hosts文件,打开,增加一个映射 加一个映射 192.168.52.140 master 话不多说,直接看代码,注释很详细 import java.io.IOException; import java.util.Arrays; import

分布式数据库HBase安装与使用(shell+Java API)

本指南介绍了HBase,详细指导大家安装配置HBase及其使用.本教程在Ubuntu14.04下测试通过. 一.HBase介绍 HBase是一个分布式的.面向列的开源数据库,源于Google的一篇论文<BigTable:一个结构化数据的分布式存储系统>.HBase以表的形式存储数据,表有行和列组成,列划分为若干个列族/列簇(column family).欲了解HBase的官方资讯,请访问HBase官方网站. HBase的运行有三种模式:单机模式.伪分布式模式.分布式模式. 单机模式:在一台计算

Java API 读取HDFS的单文件

HDFS上的单文件: -bash-3.2$ hadoop fs -ls /user/pms/ouyangyewei/data/input/combineorder/repeat_rec_category Found 1 items -rw-r--r-- 2 deploy supergroup 520 2014-08-14 17:03 /user/pms/ouyangyewei/data/input/combineorder/repeat_rec_category/repeatRecCategor

Spark Java API 之 CountVectorizer

Spark Java API 之 CountVectorizer 由于在Spark中文本处理与分析的一些机器学习算法的输入并不是文本数据,而是数值型向量.因此,需要进行转换.而将文本数据转换成数值型的向量有很多种方法,CountVectorizer是其中之一. A CountVectorizer converts a collection of text documents into a vector representing the word count of text documents.

java网络编程:InteAddress类API

apache的http协议jar地址:http://hc.apache.org/downloads.cgi 类 InetAddress java.lang.Object java.net.InetAddress 所有已实现的接口: Serializable 直接已知子类: Inet4Address, Inet6Address 此类表示互联网协议 (IP) 地址.IP 地址是 IP 使用的 32 位或 128 位无符号数字,它是一种低级协议,UDP 和 TCP 协议都是在它的基础上构建的.Inet

Java日期时间API系列5-----Jdk7及以前的日期时间类TimeUnit在并发编程中的应用

TimeUnit是一个时间单位枚举类,主要用于并发编程,时间单元表示给定粒度单元的时间持续时间,并提供实用程序方法来跨单元转换,以及在这些单元中执行计时和延迟操作. 1.时间单位换算 (1)支持的单位 TimeUnit.DAYS //天 TimeUnit.HOURS //小时 TimeUnit.MINUTES //分钟 TimeUnit.SECONDS //秒 TimeUnit.MILLISECONDS //毫秒 TimeUnit.MICROSECONDS //微秒 TimeUnit.NANOS

1.熟练的使用Java语言进行面向对象程序设计,有良好的编程习惯,熟悉常用的Java API,包括集合框架、多线程(并发编程)、I/O(NIO)、Socket、JDBC、XML、反射等。[泛型]\

. 进程和线程之间有什么不同? 一个进程是一个独立(self contained)的运行环境,它可以被看作一个程序或者一个应用.而线程是在进程中执行的一个任务.Java运行环境是一个包含了不同的类和程序的单一进程.线程可以被称为轻量级进程.线程需要较少的资源来创建和驻留在进程中,并且可以共享进程中的资源. 2. 多线程编程的好处是什么? 在多线程程序中,多个线程被并发的执行以提高程序的效率,CPU不会因为某个线程需要等待资源而进入空闲状态. 3. 用户线程和守护线程有什么区别? 当我们在Java

《java并发编程实战》读书笔记4--基础构建模块,java中的同步容器类&amp;并发容器类&amp;同步工具类,消费者模式

上一章说道委托是创建线程安全类的一个最有效策略,只需让现有的线程安全的类管理所有的状态即可.那么这章便说的是怎么利用java平台类库的并发基础构建模块呢? 5.1 同步容器类 包括Vector和Hashtable,此外还包括在JDK1.2中添加的一些功能相似的类,这些同步的封装器类由Collections.synchronizedXxx等工厂方法创建的.这些类实现线程安全的方式是:将他们的状态封装起来,并对每个共有方法都进行同步,使得每次只能有一个线程能访问容器的状态. 关于java中的Vect