spark使用总结

背景

使用spark开发已有几个月。相比于python/hive,scala/spark学习门槛较高。尤其记得刚开时,举步维艰,进展十分缓慢。不过谢天谢地,这段苦涩(bi)的日子过去了。忆苦思甜,为了避免项目组的其他同学走弯路,决定总结和梳理spark的使用经验。

Spark基础

基石RDD

spark的核心是RDD(弹性分布式数据集),一种通用的数据抽象,封装了基础的数据操作,如map,filter,reduce等。RDD提供数据共享的抽象,相比其他大数据处理框架,如MapReduce,Pegel,DryadLINQ和HIVE等均缺乏此特性,所以RDD更为通用。

简要地概括RDD:RDD是一个不可修改的,分布的对象集合。每个RDD由多个分区组成,每个分区可以同时在集群中的不同节点上计算。RDD可以包含Python,Java和Scala中的任意对象。

Spark生态圈中应用都是基于RDD构建(下图),这一点充分说明RDD的抽象足够通用,可以描述大多数应用场景。

RDD操作类型—转换和动作

RDD的操作主要分两类:转换(transformation)和动作(action)。两类函数的主要区别是,转换接受RDD并返回RDD,而动作接受RDD但是返回非RDD。转换采用惰性调用机制,每个RDD记录父RDD转换的方法,这种调用链表称之为血缘(lineage);而动作调用会直接计算。

采用惰性调用,通过血缘连接的RDD操作可以管道化(pipeline),管道化的操作可以直接在单节点完成,避免多次转换操作之间数据同步的等待

使用血缘串联的操作可以保持每次计算相对简单,而不用担心有过多的中间数据,因为这些血缘操作都管道化了,这样也保证了逻辑的单一性,而不用像MapReduce那样,为了竟可能的减少map reduce过程,在单个map reduce中写入过多复杂的逻辑。

RDD使用模式

RDD使用具有一般的模式,可以抽象为下面的几步

  1. 加载外部数据,创建RDD对象
  2. 使用转换(如filter),创建新的RDD对象
  3. 缓存需要重用的RDD
  4. 使用动作(如count),启动并行计算

RDD高效的策略

Spark官方提供的数据是RDD在某些场景下,计算效率是Hadoop的20X。这个数据是否有水分,我们先不追究,但是RDD效率高的由一定机制保证的:

  1. RDD数据只读,不可修改。如果需要修改数据,必须从父RDD转换(transformation)到子RDD。所以,在容错策略中,RDD没有数据冗余,而是通过RDD父子依赖(血缘)关系进行重算实现容错。
  2. RDD数据在内存中,多个RDD操作之间,数据不用落地到磁盘上,避免不必要的I/O操作。
  3. RDD存放的数据可以是java对象,所以避免的不必要的对象序列化和反序列化。

总而言之,RDD高效的主要因素是尽量避免不必要的操作和牺牲数据的操作精度,用来提高计算效率。

Spark使用技巧

RDD操作闭包外部变量原则

RDD相关操作都需要传入自定义闭包函数(closure),如果这个函数需要访问外部变量,那么需要遵循一定的规则,否则会抛出运行时异常。闭包函数传入到节点时,需要经过下面的步骤:

  1. 驱动程序,通过反射,运行时找到闭包访问的所有变量,并封成一个对象,然后序列化该对象
  2. 将序列化后的对象通过网络传输到worker节点
  3. worker节点反序列化闭包对象
  4. worker节点执行闭包函数,

注意:外部变量在闭包内的修改不会被反馈到驱动程序。

简而言之,就是通过网络,传递函数,然后执行。所以,被传递的变量必须可以序列化,否则传递失败。本地执行时,仍然会执行上面四步。

广播机制也可以做到这一点,但是频繁的使用广播会使代码不够简洁,而且广播设计的初衷是将较大数据缓存到节点上,避免多次数据传输,提高计算效率,而不是用于进行外部变量访问。

RDD数据同步

RDD目前提供两个数据同步的方法:广播和累计器。

广播 broadcast

前面提到过,广播可以将变量发送到闭包中,被闭包使用。但是,广播还有一个作用是同步较大数据。比如你有一个IP库,可能有几G,在map操作中,依赖这个ip库。那么,可以通过广播将这个ip库传到闭包中,被并行的任务应用。广播通过两个方面提高数据共享效率:1,集群中每个节点(物理机器)只有一个副本,默认的闭包是每个任务一个副本;2,广播传输是通过BT下载模式实现的,也就是P2P下载,在集群多的情况下,可以极大的提高数据传输速率。广播变量修改后,不会反馈到其他节点。

累加器 Accumulator

累加器是一个write-only的变量,用于累加各个任务中的状态,只有在驱动程序中,才能访问累加器。而且,截止到1.2版本,累加器有一个已知的缺陷,在action操作中,n个元素的RDD可以确保累加器只累加n次,但是在transformation时,spark不确保,也就是累加器可能出现n+1次累加。

目前RDD提供的同步机制粒度太粗,尤其是转换操作中变量状态不能同步,所以RDD无法做复杂的具有状态的事务操作。不过,RDD的使命是提供一个通用的并行计算框架,估计永远也不会提供细粒度的数据同步机制,因为这与其设计的初衷是违背的。

RDD优化技巧

RDD缓存

需要使用多次的数据需要cache,否则会进行不必要的重复操作。举个例子


val data = … // read from tdw

println(data.filter(_.contains("error")).count)

println(data.filter(_.contains("warning")).count)

上面三段代码中,data变量会加载两次,高效的做法是在data加载完后,立刻持久化到内存中,如下


val data = … // read from tdw

data.cache

println(data.filter(_.contains("error")).count)

println(data.filter(_.contains("warning")).count)

这样,data在第一加载后,就被缓存到内存中,后面两次操作均直接使用内存中的数据。

转换并行化

RDD的转换操作时并行化计算的,但是多个RDD的转换同样是可以并行的,参考如下


val dataList:Array[RDD[Int]] = …

val sumList = data.list.map(_.map(_.sum))

上面的例子中,第一个map是便利Array变量,串行的计算每个RDD中的每行的sum。由于每个RDD之间计算是没有逻辑联系的,所以理论上是可以将RDD的计算并行化的,在scala中可以轻松试下,如下


val dataList:Array[RDD[Int]] = …

val sumList = data.list.par.map(_.map(_.sum))

注意红色代码。

减少shuffle网络传输

一般而言,网络I/O开销是很大的,减少网络开销,可以显著加快计算效率。任意两个RDD的shuffle操作(join等)的大致过程如下,

用户数据userData和事件events数据通过用户id连接,那么会在网络中传到另外一个节点,这个过程中,有两个网络传输过程。Spark的默认是完成这两个过程。但是,如果你多告诉spark一些信息,spark可以优化,只执行一个网络传输。可以通过使用、HashPartition,在userData"本地"先分区,然后要求events直接shuffle到userData的节点上,那么就减少了一部分网络传输,减少后的效果如下,

虚线部分都是在本地完成的,没有网络传输。在数据加载时,就按照key进行partition,这样可以经一部的减少本地的HashPartition的过程,示例代码如下


val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://…")

.partitionBy(new HashPartitioner(100)) // Create 100 partitions

.persist()

注意,上面一定要persist,否则会重复计算多次。100用来指定并行数量。

Spark其他

Spark开发模式

由于spark应用程序是需要在部署到集群上运行的,导致本地调试比较麻烦,所以经过这段时间的经验累积,总结了一套开发流程,目的是为了尽可能的提高开发调试效率,同时保证开发质量。当然,这套流程可能也不是最优的,后面需要持续改进。

整个流程比较清楚,这里主要谈谈为什么需要单元测试。公司内的大多数项目,一般不提倡单元测试,而且由于项目进度压力,开发人员会非常抵触单元测试,因为会花费"额外"的精力。Bug这东西不会因为项目赶进度而消失,而且恰好相反,可能因为赶进度,而高于平均水平。所以,如果不花时间进行单元测试,那么会花同样多,甚至更多的时间调试。很多时候,往往一些很小的bug,却导致你花了很长时间去调试,而这些bug,恰好是很容易在单元测试中发现的。而且,单元测试还可以带来两个额外的好处:1)API使用范例;2)回归测试。所以,还是单元测试吧,这是一笔投资,而且ROI还挺高!不过凡事需要掌握分寸,单元测试应该根据项目紧迫程度调整粒度,做到有所为,有所不为。

Spark其他功能

前面提到了spark生态圈,spark除了核心的RDD,还提供了之上的几个很使用的应用:

  1. Spark SQL: 类似hive,使用rdd实现sql查询
  2. Spark Streaming: 流式计算,提供实时计算功能,类似storm
  3. MLLib:机器学习库,提供常用分类,聚类,回归,交叉检验等机器学习算法并行实现。
  4. GraphX:图计算框架,实现了基本的图计算功能,常用图算法和pregel图编程框架。

后面需要继续学习和使用上面的功能,尤其是与数据挖掘强相关的MLLib。

来源: <http://www.cnblogs.com/bourneli/p/4394271.html>

来自为知笔记(Wiz)

时间: 2024-10-11 12:48:57

spark使用总结的相关文章

基于Spark MLlib平台的协同过滤算法---电影推荐系统

基于Spark MLlib平台的协同过滤算法---电影推荐系统 又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用. 说到推荐系统,大家可能立马会想到协同过滤算法.本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用.其中,主要包括三部分内容: 协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,

Spark SQL 之 Join 实现

原文地址:Spark SQL 之 Join 实现 Spark SQL 之 Join 实现 涂小刚 2017-07-19 217标签: spark , 数据库 Join作为SQL中一个重要语法特性,几乎所有稍微复杂一点的数据分析场景都离不开Join,如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者,我们有必要了解Join在Spark中是如何组织运行的. SparkSQL总体流程介绍 在阐述Join实现之前,我们首先简单介绍SparkSQL

spark性能调优之资源调优

转https://tech.meituan.com/spark-tuning-basic.html spark作业原理 使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程.根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动.Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core.而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Stand

Spark 整合hive 实现数据的读取输出

实验环境: linux centOS 6.7 vmware虚拟机 spark-1.5.1-bin-hadoop-2.1.0 apache-hive-1.2.1 eclipse 或IntelJIDea 本次使用eclipse. 代码: import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import o

spark 教程三 spark Map filter flatMap union distinct intersection操作

RDD的创建 spark 所有的操作都围绕着弹性分布式数据集(RDD)进行,这是一个有容错机制的并可以被并行操作的元素集合,具有只读.分区.容错.高效.无需物化.可以缓存.RDD依赖等特征 RDD的创建基础RDD 1.并行集合(Parallelized Collections):接收一个已经存在的Scala集合,然后进行各种并行运算 var sc=new SparkContext(conf) var rdd=sc.parallelize(Array(2,4,9,3,5,7,8,1,6)); rd

Spark运行命令示例

local单机模式:结果xshell可见:./bin/spark-submit --class org.apache.spark.examples.SparkPi --master local[1] ./lib/spark-examples-1.3.1-hadoop2.4.0.jar 100 standalone集群模式:需要的配置项1, slaves文件2, spark-env.shexport JAVA_HOME=/usr/soft/jdk1.7.0_71export SPARK_MASTE

Spark Job具体的物理执行

即使采用pipeline的方式,函数f对依赖的RDD中的数据集合的操作也会有两种方式: 1.f(record),f作用于集合的每一条记录,每次只作用于一条记录 2.f(records),f一次性作用于集合的全部数据: Spark采用的是第一种方式,因为: 1.无需等待,可以最大化的使用集群的计算资源 2.减少OOM的产生 3.最大化的有利于并发 4.可以精准的控制每一个Partition本身(Dependency)及其内部的计算(compute) 5.基于lineage的算子流动式函数式计算,可

Dataflow编程模型和spark streaming结合

Dataflow编程模型和spark streaming结合 主要介绍一下Dataflow编程模型的基本思想,后面再简单比较一下Spark  streaming的编程模型 == 是什么 == 为用户提供以流式或批量模式处理海量数据的能力,该服务的编程接口模型(或者说计算框架)也就是下面要讨论的dataflow model 流式计算框架处理框架很多,也有大量的模型/框架号称能较好的处理流式和批量计算场景,比如Lambda模型,比如Spark等等,那么dataflow模型有什么特别的呢? 这就要要从

Spark性能优化指南——高级篇

Spark性能优化指南--高级篇 [TOC] 前言 继基础篇讲解了每个Spark开发人员都必须熟知的开发调优与资源调优之后,本文作为<Spark性能优化指南>的高级篇,将深入分析数据倾斜调优与shuffle调优,以解决更加棘手的性能问题. 数据倾斜调优 调优概述 有的时候,我们可能会遇到大数据计算中一个最棘手的问题--数据倾斜,此时Spark作业的性能会比期望差很多.数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能. 数据倾斜发生时的现象 绝大多数tas

【Spark深入学习 -14】Spark应用经验与程序调优

----本节内容------- 1.遗留问题解答 2.Spark调优初体验 2.1 利用WebUI分析程序瓶颈 2.2 设置合适的资源 2.3 调整任务的并发度 2.4 修改存储格式 3.Spark调优经验 3.1 Spark原理及调优工具 3.2 运行环境优化 3.2.1 防止不必要的分发 3.2.2 提高数据本地性 3.2.3 存储格式选择 3.2.4 选择高配机器 3.3 优化操作符 3.3.1 过滤操作导致多小任务 3.3.2 降低单条记录开销 3.3.3 处理数据倾斜或者任务倾斜 3.