Spark 以及 spark streaming 核心原理及实践

导语

spark 已经成为广告、报表以及推荐系统等大数据计算场景中首选系统,因效率高,易用以及通用性越来越得到大家的青睐,我自己最近半年在接触spark以及spark streaming之后,对spark技术的使用有一些自己的经验积累以及心得体会,在此分享给大家。

本文依次从spark生态,原理,基本概念,spark streaming原理及实践,还有spark调优以及环境搭建等方面进行介绍,希望对大家有所帮助。

spark 生态及运行原理

Spark 特点

  1. 运行速度快 => Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计算。官方提供的数据表明,如果数据由磁盘读取,速度是Hadoop MapReduce的10倍以上,如果数据从内存中读取,速度可以高达100多倍。
  2. 适用场景广泛 => 大数据分析统计,实时数据处理,图计算及机器学习
  3. 易用性 => 编写简单,支持80种以上的高级算子,支持多种语言,数据源丰富,可部署在多种集群中
  4. 容错性高。Spark引进了弹性分布式数据集RDD (Resilient Distributed Dataset) 的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,则可以根据“血统”(即充许基于数据衍生过程)对它们进行重建。另外在RDD计算时可以通过CheckPoint来实现容错,而CheckPoint有两种方式:CheckPoint Data,和Logging The Updates,用户可以控制采用哪种方式来实现容错。

Spark的适用场景

目前大数据处理场景有以下几个类型:

  1. 复杂的批量处理(Batch Data Processing),偏重点在于处理海量数据的能力,至于处理速度可忍受,通常的时间可能是在数十分钟到数小时;
  2. 基于历史数据的交互式查询(Interactive Query),通常的时间在数十秒到数十分钟之间
  3. 基于实时数据流的数据处理(Streaming Data Processing),通常在数百毫秒到数秒之间

Spark成功案例

目前大数据在互联网公司主要应用在广告、报表、推荐系统等业务上。在广告业务方面需要大数据做应用分析、效果分析、定向优化等,在推荐系统方面则需要大数据优化相关排名、个性化推荐以及热点点击分析等。这些应用场景的普遍特点是计算量大、效率要求高。

腾讯 / yahoo / 淘宝 / 优酷土豆

spark运行架构

spark基础运行架构如下所示:

spark结合yarn集群背后的运行流程如下所示:

spark 运行流程:

Spark架构采用了分布式计算中的Master-Slave模型。Master是对应集群中的含有Master进程的节点,Slave是集群中含有Worker进程的节点。

  • Master作为整个集群的控制器,负责整个集群的正常运行;
  • Worker相当于计算节点,接收主节点命令与进行状态汇报;
  • Executor负责任务的执行;
  • Client作为用户的客户端负责提交应用;
  • Driver负责控制一个应用的执行。

Spark集群部署后,需要在主节点和从节点分别启动Master进程和Worker进程,对整个集群进行控制。在一个Spark应用的执行过程中,Driver和Worker是两个重要角色。Driver 程序是应用逻辑执行的起点,负责作业的调度,即Task任务的分发,而多个Worker用来管理计算节点和创建Executor并行处理任务。在执行阶段,Driver会将Task和Task所依赖的file和jar序列化后传递给对应的Worker机器,同时Executor对相应数据分区的任务进行处理。

  1. Excecutor /Task 每个程序自有,不同程序互相隔离,task多线程并行
  2. 集群对Spark透明,Spark只要能获取相关节点和进程
  3. Driver 与Executor保持通信,协作处理

三种集群模式:

1.Standalone 独立集群

2.Mesos, apache mesos

3.Yarn, hadoop yarn

基本概念:

  • Application =>Spark的应用程序,包含一个Driver program和若干Executor
  • SparkContext => Spark应用程序的入口,负责调度各个运算资源,协调各个Worker Node上的Executor
  • Driver Program => 运行Application的main()函数并且创建SparkContext
  • Executor => 是为Application运行在Worker node上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上。每个Application都会申请各自的Executor来处理任务
  • Cluster Manager =>在集群上获取资源的外部服务 (例如:Standalone、Mesos、Yarn)
  • Worker Node => 集群中任何可以运行Application代码的节点,运行一个或多个Executor进程
  • Task => 运行在Executor上的工作单元
  • Job => SparkContext提交的具体Action操作,常和Action对应
  • Stage => 每个Job会被拆分很多组task,每组任务被称为Stage,也称TaskSet
  • RDD => 是Resilient distributed datasets的简称,中文为弹性分布式数据集;是Spark最核心的模块和类
  • DAGScheduler => 根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler
  • TaskScheduler => 将Taskset提交给Worker node集群运行并返回结果
  • Transformations => 是Spark API的一种类型,Transformation返回值还是一个RDD,所有的Transformation采用的都是懒策略,如果只是将Transformation提交是不会执行计算的
  • Action => 是Spark API的一种类型,Action返回值不是一个RDD,而是一个scala集合;计算只有在Action被提交的时候计算才被触发。

Spark核心概念之RDD

Spark核心概念之Transformations / Actions

Transformation返回值还是一个RDD。它使用了链式调用的设计模式,对一个RDD进行计算后,变换成另外一个RDD,然后这个RDD又可以进行另外一次转换。这个过程是分布式的。 Action返回值不是一个RDD。它要么是一个Scala的普通集合,要么是一个值,要么是空,最终或返回到Driver程序,或把RDD写入到文件系统中。

Action是返回值返回给driver或者存储到文件,是RDD到result的变换,Transformation是RDD到RDD的变换。

只有action执行时,rdd才会被计算生成,这是rdd懒惰执行的根本所在。

Spark核心概念之Jobs / Stage

Job => 包含多个task的并行计算,一个action触发一个job

stage => 一个job会被拆为多组task,每组任务称为一个stage,以shuffle进行划分

Spark核心概念之Shuffle

以reduceByKey为例解释shuffle过程。

在没有task的文件分片合并下的shuffle过程如下:(spark.shuffle.consolidateFiles=false

fetch 来的数据存放到哪里?

刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,经过处理后的数据放在内存 + 磁盘上。这里我们主要讨论处理后的数据,可以灵活设置这些数据是“只用内存”还是“内存+磁盘”。如果spark.shuffle.spill = false就只用内存。由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。

shuffle之所以需要把中间结果放到磁盘文件中,是因为虽然上一批task结束了,下一批task还需要使用内存。如果全部放在内存中,内存会不够。另外一方面为了容错,防止任务挂掉。

存在问题如下:

  1. 产生的 FileSegment 过多。每个 ShuffleMapTask 产生 R(reducer 个数)个 FileSegment,M 个 ShuffleMapTask 就会产生 M * R 个文件。一般 Spark job 的 M 和 R 都很大,因此磁盘上会存在大量的数据文件。
  2. 缓冲区占用内存空间大。每个 ShuffleMapTask 需要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 MR 个 bucket。虽然一个 ShuffleMapTask 结束后,对应的缓冲区可以被回收,但一个 worker node 上同时存在的 bucket 个数可以达到 cores R 个(一般 worker 同时可以运行 cores 个 ShuffleMapTask),占用的内存空间也就达到了cores× R × 32 KB。对于 8 核 1000 个 reducer 来说,占用内存就是 256MB。

为了解决上述问题,我们可以使用文件合并的功能。

在进行task的文件分片合并下的shuffle过程如下:(spark.shuffle.consolidateFiles=true

可以明显看出,在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。先执行完的 ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i‘,每个 ShuffleBlock 被称为 FileSegment。下一个 stage 的 reducer 只需要 fetch 整个 ShuffleFile 就行了。这样,每个 worker 持有的文件数降为 cores× R。FileConsolidation 功能可以通过spark.shuffle.consolidateFiles=true来开启。

Spark核心概念之Cache

val rdd1 = ... // 读取hdfs数据,加载成RDD
rdd1.cache

val rdd2 = rdd1.map(...)
val rdd3 = rdd1.filter(...)

rdd2.take(10).foreach(println)
rdd3.take(10).foreach(println)

rdd1.unpersist

cache和unpersisit两个操作比较特殊,他们既不是action也不是transformation。cache会将标记需要缓存的rdd,真正缓存是在第一次被相关action调用后才缓存;unpersisit是抹掉该标记,并且立刻释放内存。只有action执行时,rdd1才会开始创建并进行后续的rdd变换计算。

cache其实也是调用的persist持久化函数,只是选择的持久化级别为MEMORY_ONLY

persist支持的RDD持久化级别如下:

需要注意的问题:

Cache或shuffle场景序列化时, spark序列化不支持protobuf message,需要java 可以serializable的对象。一旦在序列化用到不支持java serializable的对象就会出现上述错误。

Spark只要写磁盘,就会用到序列化。除了shuffle阶段和persist会序列化,其他时候RDD处理都在内存中,不会用到序列化。

Spark Streaming运行原理

spark程序是使用一个spark应用实例一次性对一批历史数据进行处理,spark streaming是将持续不断输入的数据流转换成多个batch分片,使用一批spark应用实例进行处理。

从原理上看,把传统的spark批处理程序变成streaming程序,spark需要构建什么?

需要构建4个东西:

  1. 一个静态的 RDD DAG 的模板,来表示处理逻辑;
  2. 一个动态的工作控制器,将连续的 streaming data 切分数据片段,并按照模板复制出新的 RDD ;
  3. DAG 的实例,对数据片段进行处理;
  4. Receiver进行原始数据的产生和导入;Receiver将接收到的数据合并为数据块并存到内存或硬盘中,供后续batch RDD进行消费;
  5. 对长时运行任务的保障,包括输入数据的失效后的重构,处理任务的失败后的重调。

具体streaming的详细原理可以参考广点通出品的源码解析文章:

https://github.com/lw-lin/CoolplaySpark/blob/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97/0.1%20Spark%20Streaming%20%E5%AE%9E%E7%8E%B0%E6%80%9D%E8%B7%AF%E4%B8%8E%E6%A8%A1%E5%9D%97%E6%A6%82%E8%BF%B0.md#24

对于spark streaming需要注意以下三点:

  1. 尽量保证每个work节点中的数据不要落盘,以提升执行效率。

  1. 保证每个batch的数据能够在batch interval时间内处理完毕,以免造成数据堆积。

  1. 使用steven提供的框架进行数据接收时的预处理,减少不必要数据的存储和传输。从tdbank中接收后转储前进行过滤,而不是在task具体处理时才进行过滤。

Spark 资源调优

内存管理:

Executor的内存主要分为三块:

第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;

第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;

第三块是让RDD持久化时使用,默认占Executor总内存的60%。

每个task以及每个executor占用的内存需要分析一下。每个task处理一个partiiton的数据,分片太少,会造成内存不够。

其他资源配置:

具体调优可以参考美团点评出品的调优文章:

http://tech.meituan.com/spark-tuning-basic.html

http://tech.meituan.com/spark-tuning-pro.html

Spark 环境搭建

spark tdw以及tdbank api文档:

http://git.code.oa.com/tdw/tdw-spark-common/wikis/api

其他学习资料:

http://km.oa.com/group/2430/articles/show/257492



原文链接:https://www.qcloud.com/community/article/770164

原文地址:https://www.cnblogs.com/kdy11/p/10943547.html

时间: 2024-10-12 12:14:08

Spark 以及 spark streaming 核心原理及实践的相关文章

黑客大追踪:网络取证核心原理与实践

第一部分 基础篇 第1章 实用调查策略1.1 真实的案例1.1.1 医院里被盗的笔记本电脑1.1.2 发现公司的网络被用于传播盗版1.1.3 被黑的政府服务器 1.2 足迹1.3 电子证据的概念1.3.1 实物证据1.3.2 最佳证据1.3.3 直接证据1.3.4 情况证据1.3.5 传闻证据1.3.6 经营记录1.3.7 电子证据1.3.8 基于网络的电子证据1.4 关于网络证据相关的挑战 14 1.5 网络取证调查方法(OSCAR) 15 1.5.1 获取信息 15 1.5.2 制订方案 1

【原创 Hadoop&Spark 动手实践 11】Spark Streaming 应用与动手实践

[原创 Hadoop&Spark 动手实践 11]Spark Streaming 应用与动手实践 目标: 1. 掌握Spark Streaming的基本原理 2. 完成Spark Streaming最简单的演练和动手实验 3. 完成一个完整的Spark Streaming的实际案例(用户手机信息实时分析系统)

《Spark大数据分析:核心概念、技术及实践》大数据技术一览

本节书摘来自华章出版社<Spark大数据分析:核心概念.技术及实践>一书中的第1章,第1节,作者穆罕默德·古勒(Mohammed Guller)更多章节内容可以访问云栖社区"华章计算机"公众号查看. 大数据技术一览 我们正处在大数据时代.数据不仅是任何组织的命脉,而且在指数级增长.今天所产生的数据比过去几年所产生的数据大好几个数量级.挑战在于如何从数据中获取商业价值.这就是大数据相关技术想要解决的问题.因此,大数据已成为过去几年最热门的技术趋势之一.一些非常活跃的开源项目都

Spark 学习: spark 原理简述与 shuffle 过程介绍

Spark学习: 简述总结 Spark 是使用 scala 实现的基于内存计算的大数据开源集群计算环境.提供了 java,scala, python,R 等语言的调用接口. Spark学习 简述总结 引言 1 Hadoop 和 Spark 的关系 Spark 系统架构 1 spark 运行原理 RDD 初识 shuffle 和 stage 性能优化 1 缓存机制和 cache 的意义 2 shuffle 的优化 3 资源参数调优 4 小结 本地搭建 Spark 开发环境 1 Spark-Scal

Spark(一): 基本架构及原理

Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一,与Hadoop和Storm等其他大数据和MapReduce技术相比,Spark有如下优势: Spark提供了一个全面.统一的框架用于管理各种有着不同性质(文本数据.图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求 官方资料介绍Spark可以将Hadoop集群中的应用在内存中的运行速度提升100倍

Spark SQL下的Parquet使用最佳实践和代码实战

一:Spark SQL下的Parquet使用最佳实践 1,过去整个业界对大数据的分析的技术栈的Pipeline一般分为一下两种方式: A)Data Source -> HDFS -> MR/Hive/Spark(相当于ETL) -> HDFS Parquet -> SparkSQL/impala -> Result Service(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用): B)Data Source -> Real time update

大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池

第0章 预备知识0.1 Scala0.1.1 Scala 操作符0.1.2 拉链操作0.2 Spark Core0.2.1 Spark RDD 持久化0.2.2 Spark 共享变量0.3 Spark SQL0.3.1 RDD.DataFrame 与 DataSet0.3.2 DataSet 与 RDD 互操作0.3.3 RDD.DataFrame 与 DataSet 之间的转换0.3.4 用户自定义聚合函数(UDAF)0.3.5 开窗函数0.4 Spark Streaming0.4.1 Dst

Real Time Credit Card Fraud Detection with Apache Spark and Event Streaming

https://mapr.com/blog/real-time-credit-card-fraud-detection-apache-spark-and-event-streaming/ Editor's Note: Have questions about the topics discussed in this post? Search for answers and post questions in the Converge Community. In this post we are

【转】科普Spark,Spark是什么,如何使用Spark

本博文是转自如下链接,为了方便自己查阅学习和他人交流.感谢原博主的提供! http://www.aboutyun.com/thread-6849-1-1.html http://www.aboutyun.com/thread-6850-1-1.html 科普Spark,Spark核心是什么,如何使用Spark(1) 阅读本文章可以带着下面问题: 1.Spark基于什么算法的分布式计算(很简单) 2.Spark与MapReduce不同在什么地方 3.Spark为什么比Hadoop灵活 4.Spar