Spark学习小记1

Spark is what:

Spache Spark is an open source clustercomputing system that aims to make dataanalytics fast — both fast to run and fast towrite

BDAS:

mesos:类似于yarn

hdfs:分布式文件系统

tochyon:同时也支持mapreduce,在hadoop2.3.0中,datanode也开始支持cache,这是一个非常重要的改进

shark:相当于Hive on spark

blinkdb:用于在海量数据上运行交互式查询的sql查询的大规模并行查询引擎,允许用户权衡数据精度来提升查询响应的时间

Spark的快是因为内存计算和DAG,很多优化措施其实是想通的,譬如说delay scheduling.

通过哪些模式运?行Spark呢?有4种模式可以运行

• local(多用于测试)
• Standalone
• Mesos
• YARN

其实Spark一切都以RDD(Resilient Distributed Dataset)为基础

特点:
1.A list of partitions(分片)

2.A function for computing each split(每个分片上都有函数去迭代)

3.A list of dependencies on other RDDs(RDD之间会有一系列依赖)

4.Optionally, a Partitioner for key-value RDDs (e.g. to say that theRDD is hash-partitioned)(可以指定partitioner)

5.Optionally, a list of preferred locations to compute each split on(e.g. block locations for an HDFS file)(可以指定优先节点)

Spark runtime

Spark流程示意图:RDD可以从集合直接转换?而来,也可以由从现存的任何Hadoop

InputFormat?而来,亦或者HBase等等。

first demo!

lines = sc.textFile(“hdfs://...”)
errors = lines.filter(_.startsWith(“ERROR”))    //transformation,并不立即执行
errors.persist()  //缓存rdd 不立即执行
Mysql_errors = errors.filter(_.contains(“MySQL”)).count  //action开始执行
http_errors = errors.filter(_.contains(“Http”)).count    //action开始执行

缓存策略

class StorageLevel private(
private var useDisk_ : Boolean,
private var useMemory_ : Boolean,
private var deserialized_ : Boolean,
private var replication_ : Int = 1)

val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, true)//默认缓存策略

//默认是反序列化存在的,序列化存在的时候,由于需要序列化,CPU消耗会多一些,在内存比较紧张的时候,最好使用反序列化的形式,这样可以减少内存消耗,spark支持除jdk序列化方式以外的其他序列化
val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)

 transformation & action

map:对每个元素经过函数转换后的所有值得到一个新的分布式数据集

filter:经过函数计算后,返回值为true的那些值,...

flatMap:先压扁再map

sample:生成样本子集

首先调用sc的paralize方法将一个集合转换成rdd,并未执行;

其次对rdd中每个元素执行*2操作,并未执行;

最后collect action触发上述操作执行

实际上上述步骤可以一步执行:

sc.parallelize(List(1,2,3,4,5)).map(2*_).filter(_>5).collect

rdd.count

rdd.cache//rdd放在内存,对比前后两个count的时间会发现不同

rdd.count

-----------------------------

wordcount on spark(结果未排序):

val rdd=sc.textFile(hdfs://..)

val wordcount=rdd.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

wordcount.collect

wordcount.saveAsTextFile("路径")

-----------------------------------

val rdd1=sc.parallelize(List((‘a‘,1),(‘a‘,2)))

val rdd2=sc.parallelize(List((‘b‘,1),(‘b‘,2)))

val res=rdd1 union rdd2

res.collect

---------------------------------------

val rdd1=sc.parallelize(List((‘a‘,1),(‘a‘,2),(‘b‘,3),(‘b‘,4)))

val rdd2=sc.parallelize(List((‘a‘,5),(‘a‘,6),(‘b‘,7),(‘b‘,8)))

val res=rdd1 join rdd2//笛卡尔积

res.collect

---------------------------------------

val rdd=sc.parallelize(List(1,2,3,4,5))

rdd.reduce(_+_)//直接对rdd中元素进行制定操作,而且立即执行

--------------------------------------

val rdd1=sc.parallelize(List((‘a‘,1),(‘a‘,2),(‘b‘,3),(‘b‘,4)))

rdd1.lookup(‘a‘)//将Key为a的value放在数组中呈现出来,立即执行

--------------------------------------------------------

wordcount on spark(结果排序):

val wordcount=sc.textFile(hdfs://..).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).saveAsTextFile("路径")

---------------------------------------

LineAge容错

每一个都看做RDD,每个RDD都会记录自己依赖于哪个(哪些)RDD,万一某个RDD的某些partition挂了,可以通过其它RDD并行计算迅速恢复出来。但是假如每次都快到进化完的时候就挂了,那岂不是每次都要从头进化?何不在中间制作个拷贝呢?!

-----------------------------

依赖

上图中的空心举行表示的是rdd,实心小矩形指的是partition,若干个partition组成了rdd.

在窄依赖中,父rdd的每个partition至多被子rdd的一个partition使用;(map,filter,copartition导致窄依赖)

在宽依赖中,父rdd的每个partition可以被子rdd的多个partition使用;(join导致宽依赖)

---------------------------

集群配置:

spark-env.sh

export JAVA_HOME=
export SPARK_MASTER_IP=
export SPARK_WORKER_CORES=//几个cpu
export SPARK_WORKER_INSTANCES=1//几个实例,常用默认值1
export SPARK_WORKER_MEMORY=//分配内存
export SPARK_MASTER_PORT=8080//端口
export SPARK_JAVA_OPTS="-verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps”

slaves

xx.xx.xx.2
xx.xx.xx.3

--------------------------------------

分析搜狗实验室提供的用户访问日志

1.加载 val rdd=sc.

2.统计时间小于某个时间点的访问数:rdd.map(_.split(‘\t‘)(0)).filter(_<"20111010000000").count//map将函数处理后的信息返回,这里返回第一列

3.统计有多少搜索链接是排名第一位的

rdd.map(_split(‘\t‘)(3)).filter(_.toInt==1).count

4.统计有多少排名第一的搜索链接是第一次就被搜索到的

rdd.map(_.split(‘\t‘)).filter(_(0).toInt==1).filter(_(1).toInt==1).count

时间: 2024-10-05 12:08:43

Spark学习小记1的相关文章

git 学习小记之记住https方式推送密码

昨天刚刚学了点git基础操作,但是不幸的是[email protected]给出公告说尽量使用 https 进行操作.可是在用 https 进行 push 时,都需要输入帐号和密码. 各种百度谷歌之后在[email protected]官网找到了解决方法<https方式使用[email protected]设置密码的方式>文中给出了几个方法,并且都非常简单. 关于 cache 缓存方式,我不太喜欢,因为要设置时间,而且会过期.而 store 相应的非常方便,设置全局后,方便多个库使用.当然如果

Spark 学习: spark 原理简述与 shuffle 过程介绍

Spark学习: 简述总结 Spark 是使用 scala 实现的基于内存计算的大数据开源集群计算环境.提供了 java,scala, python,R 等语言的调用接口. Spark学习 简述总结 引言 1 Hadoop 和 Spark 的关系 Spark 系统架构 1 spark 运行原理 RDD 初识 shuffle 和 stage 性能优化 1 缓存机制和 cache 的意义 2 shuffle 的优化 3 资源参数调优 4 小结 本地搭建 Spark 开发环境 1 Spark-Scal

linux学习小记 (一 )

shell 学习小记: 注意:多看系统脚本  多模仿    su切换用户时需要输入目标用户密码,root(superuser)切换到任何用户都不需要输入密码,- 参数必须要是最后一个(su huhu -) sudo需要输入当前用户密码,拥有sudo特权的用户可以执行 "sudo su -"命令,使用自己的密码切换到root用户 , 所以应该在/etc/sudoers 文件中禁止 sudo 执行su命令 linux文件与颜色: /etc/DIR_COLORS   (命令dircolors

Spark学习四:网站日志分析案例

Spark学习四:网站日志分析案例 标签(空格分隔): Spark Spark学习四网站日志分析案例 一创建maven工程 二创建模板 三日志分析案例 一,创建maven工程 1,执行maven命令创建工程 mvn archetype:generate -DarchetypeGroupId=org.scala-tools.archetypes -DarchetypeArtifactId=scala-archetype-simple -DremoteRepositories=http://scal

Spark学习三:Spark Schedule以及idea的安装和导入源码

Spark学习三:Spark Schedule以及idea的安装和导入源码 标签(空格分隔): Spark Spark学习三Spark Schedule以及idea的安装和导入源码 一RDD操作过程中的数据位置 二Spark Schedule 三Idea导入spark源码 一,RDD操作过程中的数据位置 [hadoop001@xingyunfei001 spark-1.3.0-bin-2.5.0]$ bin/spark-shell --master local[2] val rdd = sc.t

Spark学习七:spark streaming与flume集成

Spark学习七:spark streaming与flume集成 标签(空格分隔): Spark 一,启动flume flume-conf.properties文件 agent002.sources = sources002 agent002.channels = channels002 agent002.sinks = sinks002 ## define sources agent002.sources.sources002.type = exec agent002.sources.sour

Spark学习六:spark streaming

Spark学习六:spark streaming 标签(空格分隔): Spark Spark学习六spark streaming 一概述 二企业案例分析 三Spark streaming的工作原理 四textFileStreaming的应用 四企业中的开发方式 五总结 一,概述 一个简单的实例 1,安装nc nc -lk 9999 2,启动应用 ./bin/run-example streaming.NeworkWordCount localhost 9999 二,企业案例分析 需求: 实时统计

Spark学习八:spark streaming与flume和kafka集成

Spark学习八:spark streaming与flume和kafka集成 标签(空格分隔): Spark Spark学习八spark streaming与flume和kafka集成 一Kafka 二flume和kafka的集成 三kafka和spark streaming的集成方式一kafka推送 四kafka和spark streaming的集成方式一spark streaam主动获取 五spark stream的高级应用updateStateByKey实现累加功能 六spark stre

Spark学习笔记之SparkRDD

Spark学习笔记之SparkRDD 一.   基本概念 RDD(resilient distributed datasets)弹性分布式数据集. 来自于两方面 ①   内存集合和外部存储系统 ②   通过转换来自于其他RDD,如map,filter等 2.创建操作(creation operation):RDD的创建由SparkContext来负责. 3.转换操作(transformation operation):将一个RDD通过一定操作转换为另一个RDD. 4.控制操作(control o