spark学习总结

Spark总结

Spark Engine

RDD

弹性分布式数据集

partitons组成的,partition一定是一个具体的概念,就是一段连续的数据在某个物理节点

1,由一组partitions组成

2,应用在RDD上面的算子,会被应用到每一个partitions上面去

3,每一个RDD需要有依赖

4,如果RDD是k,v键值对的,就可以有一些重新partition的功能,比如说有些算子,groupByKey,reduceByKey,countByKey

5,有些RDD有最佳计算位置,比如HadoopRDD,反例比如是本地集合演化过来的RDD,那没有最佳计算位置(数据本地性)

算子操作

Transformations

map、mapPartition、flatMap、reduceByKey、groupByKey、filter、sortByKey、mapValues、sample ...

本质就是生成新的RDD,new MapPartitionsRDD()

Actions

collect慎用、reduce、count、take、foreach、foreachPartition...

本质会提交一个JOB去集群里面去运算,sc.runJob()

RDD容错

1,血统,Lineage,重算!! 重算会找依赖的RDD,如果一直都没有过持久化,重新从数据源来读取数据

2,cache() persist() 默认的持久化策略 MEMORY_ONLY,存不下就不存了,下次重新算 _2 _SER

着重要区别开的就是MEMORY_AND_DISK,这个东西是存不下就存在本地磁盘

OFF_HEAP,默认会去找Tachyon

3,checkpoint

做checkpoint需要首先在sc.setCheckpointDir("hdfs://") 存在分布式文件系统里面!

Spark Cluster --> Worker Nodes --> Executors --> Threads

Yarn Cluster --> Node Managers --> Containers --> Threads

ApplicationMaster是这个Driver驱动程序和ResourceManager沟通的中间人或者桥梁

Application(Driver DAGScheduler TaskScheduler) --> Jobs(Action操作) --> Stages(宽窄依赖/Shuffle) -->

Tasks(Pipeline/看一个Stage里面最后一个finalRDD上面有几个Partitions,其实就有Tasks被划分出来)

DAGScheduler会划分JOB到Tasks,DAGScheduler会计算每一个Task的最佳计算位置,它是倒着往前来推的,也就是推到

pipeline一条线最前面一个RDD,如果这条线上没有做过持久化,最前面一个RDD如果譬如是HadoopRDD,

那么最近位置就是由Block所在的位置决定,如果做过持久化,那么最近计算位置,就是做persist的位置!

最后如果没有持久化,如果没有Block位置,那么就没有最佳位置,那么Task就会扔到资源列表里面的一个空闲的Executor里面,

数据就是走网络传输!

TaskScheduler在初始化的时候会申请到一堆Executors/Containers,TaskScheduler就接收DAGScheduler发送过来的TaskSet(对应一个Stage)

说白了,就是发送Stage的顺序是DAGScheduler来决定的,TaskScheduler会把TaskSet里面的task抽出来一个个的发送到从节点里面去执行,

真正到从节点里面,才会开始读取数据!!!!

Tasks在从节点运行完了之后会把Results返回给Driver,所以这个地方也就是Driver在哪里,去哪里看结果

Standalone --deploy client cluster的区别

Yarn --master yarn-client yarn-cluster的区别

Spark Core

new SparkContext(conf)

算子操作

TopN

GroupTopN (Collection.sort(list) 插入排序)

二次排序 (构建自定义的key)

PageRank (需要注意的是,迭代次数多了,DAG很复杂,可以对每次迭代的RDD进行checkpoint)

SparkPi

Spark SQL

new SQLContext/HiveContext()

DataFrame 里面除了有数据还有schema

RDD 转成 DataFrame

1,反射 JavaBean

2,动态的方式, 需要去构建StructFiled StructType

几种数据源

JSON

MySQL spark-default.conf

Hive (注意的就是运行代码的时候,如果是yarn-cluster模式,需要--jars 3个jar包)

开窗函数里面的row_number()打行号

可以通过 row_number() OVER (PARTITION BY ... ORDER BY ... DESC) rank where rank <=3

来做到分组去Top3

自定义UDF 和 UDAF

udf定义 sqlconext.udf.register("", => )

UDF是多个元素进来,多个元素出去

UDAF是多个元素进来,一个元素出去

Spark Streaming

val ssc = new StreamingContext(conf)

ssc.start()

ssc.awaitTermination()

本质是微批处理,其实就是每隔一个间隔切割一个RDD,然后一个RDD提交一个JOB,本质还是用咱们的Spark Engine来处理

读取端口数据,socketTextStream(),用到Reciver的机制,会占用额外的线程

读取HDFS数据,textFileStream(),没有用Reciver,本质就是每隔一个间隔去读取Block数据

读取Kafka数据,

1,基于Receiver,KafkaUtils.createStream()

2,Direct方式,没有Receiver,KafkaUtils.createDirectStream()

Direct方式好处

1,one to one mapping,说白了就是 kafka里面partitons有几个,这边到Spark里面的RDD就对应几个

2,高效,这样的话就不许WAL,预习日志,就不需要额外的Disk IO

3,Exactly-once,计算一次尽计算一次,不多不少

SparkStreaming里面和SparkCore里面相比,比较有特点的三个transform算子操作

1,updateStateByKey()

注意的是,需要ssc.checkpoint("hdfs://"),会不断的往之前的状态上更新

2,transform()

特点是交给你一个个的RDD,咱们可以直接用Spark Core里面的所有算子操作!最后给它返回一个个RDD再往下游传递就可以

3,基于window的操作

注意要设置三个时间,第一个还是每隔多少时间切割一个RDD,第二个是每隔多少时间计算一次(slideDuration),第三个是每次计算多少的数据量(windowDuration)

我们举了一个例子就是reduceByKeyAndWindow()

这个地方有两个api,

reduceByKeyAndWindow(_+_, windowDuration,slideDuration)

有一个是优化的,这个需要首先设置checkpoint

reduceByKeyAndWindow(_+_,_-_, windowDuration,slideDuration)

并行度的计算

一开始读数据

sc.parallize(指定参数--多少partition)

sc.textFile(指定参数--至少多少partiton)

计算过程中,我们可以使用repartition或者colease算子来改变数据

计算过程中,我们可以通过譬如groupByKey([numTasks])、reduceByKey([numTasks])指定这次shuffle的reduce端reduce tasks的数据

还有在conf.set("spark.default.parallism", 100)

优先级

groupByKey([numTasks]) > conf.set("spark.default.parallism", 100)

如果么有设置conf.set("spark.default.parallism", 100),就根据上一次RDD里面的并行度来

*spark memory的使用

时间: 2024-10-14 18:29:14

spark学习总结的相关文章

Spark 学习: spark 原理简述与 shuffle 过程介绍

Spark学习: 简述总结 Spark 是使用 scala 实现的基于内存计算的大数据开源集群计算环境.提供了 java,scala, python,R 等语言的调用接口. Spark学习 简述总结 引言 1 Hadoop 和 Spark 的关系 Spark 系统架构 1 spark 运行原理 RDD 初识 shuffle 和 stage 性能优化 1 缓存机制和 cache 的意义 2 shuffle 的优化 3 资源参数调优 4 小结 本地搭建 Spark 开发环境 1 Spark-Scal

Spark学习四:网站日志分析案例

Spark学习四:网站日志分析案例 标签(空格分隔): Spark Spark学习四网站日志分析案例 一创建maven工程 二创建模板 三日志分析案例 一,创建maven工程 1,执行maven命令创建工程 mvn archetype:generate -DarchetypeGroupId=org.scala-tools.archetypes -DarchetypeArtifactId=scala-archetype-simple -DremoteRepositories=http://scal

Spark学习三:Spark Schedule以及idea的安装和导入源码

Spark学习三:Spark Schedule以及idea的安装和导入源码 标签(空格分隔): Spark Spark学习三Spark Schedule以及idea的安装和导入源码 一RDD操作过程中的数据位置 二Spark Schedule 三Idea导入spark源码 一,RDD操作过程中的数据位置 [hadoop001@xingyunfei001 spark-1.3.0-bin-2.5.0]$ bin/spark-shell --master local[2] val rdd = sc.t

Spark学习七:spark streaming与flume集成

Spark学习七:spark streaming与flume集成 标签(空格分隔): Spark 一,启动flume flume-conf.properties文件 agent002.sources = sources002 agent002.channels = channels002 agent002.sinks = sinks002 ## define sources agent002.sources.sources002.type = exec agent002.sources.sour

Spark学习六:spark streaming

Spark学习六:spark streaming 标签(空格分隔): Spark Spark学习六spark streaming 一概述 二企业案例分析 三Spark streaming的工作原理 四textFileStreaming的应用 四企业中的开发方式 五总结 一,概述 一个简单的实例 1,安装nc nc -lk 9999 2,启动应用 ./bin/run-example streaming.NeworkWordCount localhost 9999 二,企业案例分析 需求: 实时统计

Spark学习八:spark streaming与flume和kafka集成

Spark学习八:spark streaming与flume和kafka集成 标签(空格分隔): Spark Spark学习八spark streaming与flume和kafka集成 一Kafka 二flume和kafka的集成 三kafka和spark streaming的集成方式一kafka推送 四kafka和spark streaming的集成方式一spark streaam主动获取 五spark stream的高级应用updateStateByKey实现累加功能 六spark stre

Spark学习笔记之SparkRDD

Spark学习笔记之SparkRDD 一.   基本概念 RDD(resilient distributed datasets)弹性分布式数据集. 来自于两方面 ①   内存集合和外部存储系统 ②   通过转换来自于其他RDD,如map,filter等 2.创建操作(creation operation):RDD的创建由SparkContext来负责. 3.转换操作(transformation operation):将一个RDD通过一定操作转换为另一个RDD. 4.控制操作(control o

用Spark学习矩阵分解推荐算法

在矩阵分解在协同过滤推荐算法中的应用中,我们对矩阵分解在推荐算法中的应用原理做了总结,这里我们就从实践的角度来用Spark学习矩阵分解推荐算法. 1. Spark推荐算法概述 在Spark MLlib中,推荐算法这块只实现了基于矩阵分解的协同过滤推荐算法.而基于的算法是FunkSVD算法,即将m个用户和n个物品对应的评分矩阵M分解为两个低维的矩阵:$$M_{m \times n}=P_{m \times k}^TQ_{k \times n}$$ 其中k为分解成低维的维数,一般远比m和n小.如果大

用Spark学习FP Tree算法和PrefixSpan算法

在FP Tree算法原理总结和PrefixSpan算法原理总结中,我们对FP Tree和PrefixSpan这两种关联算法的原理做了总结,这里就从实践的角度介绍如何使用这两个算法.由于scikit-learn中没有关联算法的类库,而Spark MLlib有,本文的使用以Spark MLlib作为使用环境. 1. Spark MLlib关联算法概述 在Spark MLlib中,也只实现了两种关联算法,即我们的FP Tree和PrefixSpan,而像Apriori,GSP之类的关联算法是没有的.而

【转载】Spark学习 &amp; 机器学习

继续Spark学习,开始的文章:http://www.cnblogs.com/charlesblc/p/6106603.html 参考了这个系列的文章: http://www.cnblogs.com/shishanyuan/p/4699644.html <倾情大奉送--Spark入门实战系列>实验数据下载在上面那篇开始的文章有说明. 先看了上手实验的一部分,因为之前Spark已经安装好了,见 http://www.cnblogs.com/charlesblc/p/6014158.html 上手