【总结】Spark优化-多Job并发执行

Spark程序中一个Job的触发是通过一个Action算子,比如count(), saveAsTextFile()等

在这次Spark优化测试中,从Hive中读取数据,将其另外保存四份,其中两个Job采用串行方式,另外两个Job采用并行方式。将任务提交到Yarn中执行。能够明显看出串行与兵线处理的性能。

每个Job执行时间:

JobID 开始时间 结束时间 耗时
Job 0 16:59:45 17:00:34 49s
Job 1 17:00:34 17:01:13 39s
Job 2 17:01:15 17:01:55 40s
Job 3 17:01:16 17:02:12 56s

四个Job都是自执行相同操作,Job0,Job1一组采用串行方式,Job2,Job3采用并行方式。

Job0,Job1串行方式耗时等于两个Job耗时之和 49s+39s=88s

Job2,Job3并行方式耗时等于最先开始和最后结束时间只差17:02:12-17:01:15=57s

代码:

package com.cn.ctripotb;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

/**
 * Created by Administrator on 2016/9/12.
 */
public class HotelTest {
    static ResourceBundle rb = ResourceBundle.getBundle("filepath");
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("MultiJobWithThread")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        JavaSparkContext sc = new JavaSparkContext(conf);
        HiveContext hiveContext = new HiveContext(sc.sc());  //测试真实数据时要把这里放开

        final DataFrame df = getHotelInfo(hiveContext);
        //没有多线程处理的情况,连续执行两个Action操作,生成两个Job
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file1",com.hadoop.compression.lzo.LzopCodec.class);
        df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file2",com.hadoop.compression.lzo.LzopCodec.class);

        //用Executor实现多线程方式处理Job
        java.util.concurrent.ExecutorService executorService = Executors.newFixedThreadPool(2);
        executorService.submit(new Callable<Void>() {
            @Override
            public Void call(){
                df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file3",com.hadoop.compression.lzo.LzopCodec.class);
                return null;
            }
        });
        executorService.submit(new Callable<Void>() {
            @Override
            public Void call(){
                df.rdd().saveAsTextFile(rb.getString("hdfspath") + "/file4",com.hadoop.compression.lzo.LzopCodec.class);
                return null;
            }
        });

        executorService.shutdown();
    }
    public static DataFrame getHotelInfo(HiveContext hiveContext){
        String sql = "select * from common.dict_hotel_ol";
        return  hiveContext.sql(sql);
    }
}
时间: 2024-10-08 04:04:34

【总结】Spark优化-多Job并发执行的相关文章

nginx优化 突破十万并发

nginx优化 突破十万并发 一.一般来说nginx 配置文件中对优化比较有作用的为以下几项: 1. worker_processes 8; nginx 进程数,建议按照cpu 数目来指定,一般为它的倍数 (如,2个四核的cpu计为8). 2. worker_cpu_affinity 00000001 00000010 00000100 00001000 00010000 00100000 01000000 10000000; 为每个进程分配cpu,上例中将8 个进程分配到8 个cpu,当然可以

hive 实现job并发执行

hive里,同一sql里,会涉及到n个job,默认情况下,每个job是顺序执行的. 如果每个job没有前后依赖关系,可以并发执行的话,可以通过设置该参数 set hive.exec.parallel=true,实现job并发执行,该参数默认可以并发执行的job数为8. set hive.exec.parallel=true; 例如: sql=""" set hive.exec.parallel=true; use database; select a,b,c from ( s

Spark优化一则 - 减少Shuffle

Spark优化一则 - 减少Shuffle 看了Spark Summit 2014的A Deeper Understanding of Spark Internals,视频(要科学上网)详细讲解了Spark的工作原理,Slides的45页给原始算法和优化算法. 破砂锅用自己3节点的Spark集群试验了这个优化算法,并进一步找到更快的算法.测试数据是Sogou实验室的日志文件前10000000条数据.目标是对日志第2列数据,按照第一个字母合并,得到每个首字母有几条记录. 所有的方案都重新启动Spa

多线程的并发执行应用(生产者消费者模式)

在实际的开发中我们为了提高CPU的利用率,也提高程序的执行效率,我们经常使用多线程进行对数据进行并发处理,下面我举一个多线程并发执行的实例,大致意思就是 一个简单的生产者消费者模式,二个线程进行存数据,一个线程进行取数据. import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueTest { /** * @param a

试解释下列名词:程序的顺序执行,程序的并发执行。

一个计算由若干个操作组成,若这些操作必须按照某种先后次序来执行,以保证操作的结果是正确的,则这类计算过程称为程序的顺序执行过程. 所谓的程序的并发执行是指若干个程序同时在系统中运行,这些程序的执行在时间上是重叠的,一个程序的执行尚未结束,另一个程序的执行已经开始. 补充: 顺序程序的特点: 顺序性.封闭性.可再现性. 并发程序的特点: 失去程序的封闭性.程序与计算不再一一对应.程序并发执行时的相互制约关系.

python 并发执行之多线程

正常情况下,我们在启动一个程序的时候.这个程序会先启动一个进程,启动之后这个进程会拉起来一个线程.这个线程再去处理事务.也就是说真正干活的是线程,进程这玩意只负责向系统要内存,要资源但是进程自己是不干活的.默认情况下只有一个进程只会拉起来一个线程. 多线程顾名思义,就是同样在一个进程的情况同时拉起来多个线程.上面说了,真正干活的是线程.进程与线程的关系就像是工厂和工人的关系.那么现在工厂还是一个,但是干活的工人多了.那么效率自然就提高了.因为只有一个进程,所以多线程在提高效率的同时,并没有向系统

shell模拟并发执行

参考: http://www.51testing.com/html/28/116228-238978.html http://blog.chinaunix.net/uid-27571599-id-3473078.html         在bash中,使用后台任务来实现任务的多进程化.在不加控制的模式下,不管有多少任务,全部都后台执行.也就是说,在这种情况下,有多少任务就有多少"进程"在同时执行.   实例一:正常脚本(脚本功能:查看一个文件中的IP列表循环测试主机连通性) [[ema

【多线程】并发执行指定数量的线程

有时候为了控制并发规模,我们需要对每次启动的线程做个数量上的限制,可以使用Executors.newFixedThreadPool(int)这个方法. 例子 一个线程类,运行中休息几秒为了观察现象更为明显 package com.nicchagil.study.thread.cnblogs.No01启动固定数量的线程; import java.util.concurrent.TimeUnit; public class MyThread extends Thread { @Override pu

两个线程并发执行以下代码,假设a是全局变量,那么以下输出______是不可能的?

3.两个线程并发执行以下代码,假设a是全局变量,那么以下输出______是不可能的? void foo(){    ++a;    printf("%d ",a);}A.3 2    B.2 3    C.3 3    D.2 2  1.读a            5.读a2.a+1            6.a+13.写a            7.写a4.打印a          8.打印a B:12345678C:12356784(或48)D:15234678