Spark Asynchronous Jobs

What if we want to execute two actions concurrently on different RDDs? Spark actions are always synchronous: if we perform two actions one after the other, they always execute sequentially.

Let us see an example:



// Two synchronous actions: count() starts only after collect() has finished.
val rdd = sc.parallelize(List(32, 34, 2, 3, 4, 54, 3), 4)
rdd.collect().foreach(x => println("Items in the list: " + x))

val rddCount = sc.parallelize(List(434, 3, 2, 43, 45, 3, 2), 4)
println("Number of items in the list: " + rddCount.count())

In the above example, the two actions, collect and count, are performed one after the other and both execute synchronously, so count will always start only after collect has finished. The output of the above code is as follows:

Now the question is: what if we want to run Spark jobs concurrently, in an asynchronous fashion?

The answer is simple: Apache Spark also provides asynchronous actions for concurrent execution of jobs. A few of the asynchronous actions Spark provides are as follows:

collectAsync() -> Returns a future for retrieving all elements of this RDD.
countAsync() -> Returns a future for counting the number of elements in the RDD.
foreachAsync(scala.Function1<T,scala.runtime.BoxedUnit> f) -> Applies a function f to all elements of this RDD.
foreachPartitionAsync(scala.Function1<scala.collection.Iterator,scala.runtime.BoxedUnit> f) -> Applies a function f to each partition of this RDD.
takeAsync(int num) -> Returns a future for retrieving the first num elements of the RDD.
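
To give a feel for how these futures are consumed, here is a minimal sketch (not from the original post). It assumes sc is an existing SparkContext, as in the snippets above, and imports the global ExecutionContext, which the callback needs:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val nums = sc.parallelize(1 to 100, 4)

// countAsync() returns a FutureAction, which is a scala.concurrent.Future,
// so the usual Future callbacks and combinators apply.
nums.countAsync().onComplete {
  case Success(count) => println("Count finished: " + count)
  case Failure(error) => println("Count failed: " + error)
}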

Now let us see what happens when we use async actions:



// Needed to map over the returned futures (FutureAction is a scala.concurrent.Future).
import scala.concurrent.ExecutionContext.Implicits.global

val rdd = sc.parallelize(List(32, 34, 2, 3, 4, 54, 3), 4)
rdd.collectAsync().map { items => items.foreach(x => println("Items in the list: " + x)) }

val rddCount = sc.parallelize(List(434, 3, 2, 43, 45, 3, 2), 4)
rddCount.countAsync().map { x => println("Number of items in the list: " + x) }

So the output of the above code is as follows:

You can see in the above output that the result of the second job comes first, because the first action returns a future immediately and the second action is then triggered. But notice that the jobs themselves still execute one after the other: a single job uses all the resources of the cluster, so the next job is delayed.

So to take full advantage of asynchronous actions, we need to configure the job scheduler.

Job Scheduling

By default, the Spark scheduler runs jobs in FIFO (First In First Out) fashion. In the FIFO scheduler, priority is given to the first job, then to the second, and so on. If the first job does not use the whole cluster, the second job can also run in parallel, but if the first job is large, later jobs may wait a long time even if they would take very little time to execute. As a solution, Spark provides the FAIR scheduler, in which jobs execute in a "round robin" fashion.

To configure the job scheduler, we need to set its configuration as follows:

val conf = new SparkConf().setAppName("spark_auth")
.setMaster("local[*]").set("spark.scheduler.mode", "FAIR")
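
If you need finer-grained control under FAIR scheduling, jobs can also be assigned to named scheduler pools from the thread that submits them. A small sketch (the pool name "pool1" is just an illustration; pool weights and minimum shares would normally be defined in a fair scheduler allocation file):

// Jobs submitted from this thread after this call go to "pool1".
sc.setLocalProperty("spark.scheduler.pool", "pool1")

// ... trigger actions here ...

// Reset so later jobs from this thread go back to the default pool.
sc.setLocalProperty("spark.scheduler.pool", null)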

After configuring FAIR scheduling, you can see that both jobs run concurrently and share the resources of the Spark cluster.

So after this, the output of the above code is as follows:

You can see in the above result that both jobs run concurrently; the result of one action does not wait for the other.
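
Putting the pieces together, a minimal standalone sketch might look like the following. It is only an illustrative assembly of the snippets above (the object name AsyncJobsExample and the final Await are my additions, not part of the original example); the Await simply keeps the driver alive until both asynchronous jobs have completed:

import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object AsyncJobsExample extends App {
  val conf = new SparkConf().setAppName("spark_auth")
    .setMaster("local[*]").set("spark.scheduler.mode", "FAIR")
  val sc = new SparkContext(conf)

  // Both actions are triggered without blocking, so the two jobs are submitted together.
  val itemsFuture = sc.parallelize(List(32, 34, 2, 3, 4, 54, 3), 4)
    .collectAsync().map(items => items.foreach(x => println("Items in the list: " + x)))
  val countFuture = sc.parallelize(List(434, 3, 2, 43, 45, 3, 2), 4)
    .countAsync().map(c => println("Number of items in the list: " + c))

  // Wait for both asynchronous jobs to finish before stopping the context.
  Await.result(Future.sequence(Seq(itemsFuture, countFuture)), 2.minutes)
  sc.stop()
}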

For the above code you can check out: https://github.com/knoldus/spark-scala-async
