Spark小象学院笔记

---小象学院陈超视频教程笔记------陈超讲

第一节

Scala基础与实践

基于JVM的FP+OO

静态类型

和Java互操作

函数式编程和面向对象的结合,纯静态的语言。

解释器(interpreter)

值与变量(val & var)

函数(Function)

1、常量 val

2、变量 var

3、main函数要定义在object里面

实例:

object Basic{

def hello(name : String): String ={

"Hello :" + name //block的最后一行就是输出值

}

def helloScala(){

println("hello Scala!!")

}

val add = (x: Int,y :Int) => x + y

def add2(x:Int)(y:Int) = x + y

def printEveryChar(c : String*)={

c.foreach(x => println(x))

}

def main(args : Array[String]){

println("hello Scala")

//println(hello("Scala"))

helloScala()

add(1,2)

println(add2(4)(6))

printEveryChar("a","b","c","d")

//if表达式

val x=1

val a=if(x>0) 1 else 0

println(a)

//循环

var(n,r)=(10,0)

while(n>0){

r = r + n

n = n - 1

}

println(r)

for(i<- 1 to(10)){

println(i)

}

}

}

------------------------------------------------------------

条件表达式(if)

循环表达式(no continue,no break)

语句终止(;no need)

实例2

class Basic2{

}

class Person{

var name : String = _ //会生成getter和setter方法

val age = 10 //只会生成getter方法

private[this] val  gender = "male"

}

object Basic2 {

def main(args: Array[String]): Unit ={

val p = new Person //括号可省略

p.name = "Jack"

println(p.name + ":" + p.age)

}

}

----------------------------------------------------------

类(class)

声明类(一个源文件中可以包含很多类,并且都是public级别)

getter和setter

构造函数(primary constructor & auxiliary constructor)

继承(extends)

重写父类方法(override def)

重写字段(override val,override var)

实例3

class Basic2{

}

class Person{

var name : String = _ //会生成getter和setter方法

val age = 10 //只会生成getter方法

private[this] val  gender = "male"

}

object Basic2 {

def main(args: Array[String]): Unit ={

val p = new Person //括号可省略

p.name = "Jack"

println(p.name + ":" + p.age)

}

}

实例4

class Basic2{

}

//1、主构造器直接跟在类名后面,主构造器中的参数,最后会被编译成字段

//2、主构造器执行的时候,会执行类中的所有语句

//3、假设参数声明时不带val和var,那么相当于private(this)!!!

class Person(var name:String,val age:Int){

/* var name : String = _ //会生成getter和setter方法

val age = 10 //只会生成getter方法

private[this] val  gender = "male"*/

println("this is the primary constructor!")

}

object Basic2 {

def main(args: Array[String]): Unit ={

/*    val p = new Person

p.name = "Jack"

println(p.name + ":" + p.age)*/

val p = new Person("Jack",20)

println(p.name + ":" + p.age)

}

}

--------------------------------------------------------------------

抽象类(abstract class)

类的一个或者多个方法没有没完整的定义

声明抽象方法不需要加abstract关键字,只需要不写方法体

子类重写父类的抽象方法时不需要加override

父类可以声明抽象字段(没有初始值的字段)

子类重写父类的抽象字段时不需要加override

class Basic3{

}

abstract class Person1{

def speak

val name : String

var age : Int

}

abstract Student1 extends Persion1{

def speak{

print("speak!!!")

}

val name = "AAA"

var age = 100

}

trait Logger{

def log(msg : String){

println("log" + msg)

}

}

class Test extends Logger{

def test{

log("xxx")

}

}

trait logger{

def log(msg:String)

}

trait ConsoleLogger extends Logger{

def log(msg:String){

println(msg)

}

}

class Test extends ConsoleLogger{

def test{

log("PPP")

}

}

trait ConsoleLogger{

def log(ms : String){

println("save money:" + msg)

}

}

trait MessageLogger extends ConsoleLogger{

def log(msg : String){

println("save money to bank :" + msg)

}

}

abstract class Account{

def save

}

class MyAccount extends Account with ConsoleLogger{

def save{

log(100)

}

}

object Basic3 extends App{

val t = new Test

t.test

val acc = new MyAccount with MessageLogger

acc.save

var s = new Student1

s.speak

println(s.name + ":" + s.age)

}

特质(trait)-对比下JAVA8的接口

字段和行为的集合

混入类中

通过with关键字,一个类可以扩展多个特质

trait续

当做接口

带有具体实现的接口

带有特质的对象

特质从左到右被构造

apply方法

单例对象

class ApplyTest{

def test{

println("test")

}

}

class Basic4{

}

object Basic4 extends App{

val a = ApplyTest

}

--------包(package com.xx.data)

支持嵌套,下层可以访问上层作用域中的名称

可串联

顶部标记

包对象

包可见性

包在任何地方都可以引入,作用域至该语句所在块的末尾

重命名引入成员(xx => yy)

隐藏方法(xx => _)

自动引入(java.lang._ scala._ Predef._)

--------模式匹配-----------------

标准用法(match)

使用守卫

匹配类型

class Basic5{

}

case class Book(name : String,author : String)

object Basic5 extends App{

val value = 1

value match{

case 1 => "one"

case 2 => "two"

case _ => "some other number"

}

val result = value match{

case i if i == 1 => "one"

case i if i == 2 => "two"

case _ => "some other number"

}

println("result of match is :" + result)

println("result2 of match is :" + result2)

def t(obj:Any) = obj match{

case x : Int => println("Int")

case s : String => println("String")

case _ => println("unknow type")

}

t("1")

t(1)

t(1L)

val macTalk = Book("MacTalk","CJQ")

macTalk match{

case Book(name,author) => println("this is book")

case _ =>println("unknown")

}

}

---case class(多用在模式匹配中)

构造器中的每一个类型都为val,不建议用var

不用new就可以直接生产对象(为什么?apply方法)

-----------------------------

高阶函数

匿名函数 val double = (x: Int)=> 2 * x

函数作为参数

参数类型推断

常用高阶函数

map、filter、reduce等等

----------------------------

集合

List

Set

Tuple

Map

----------------

集合操作

foreach(类似于map,但是没有返回值)

map(迭代)

filter(过滤)

zip(聚合)

partition(列表分割)

flatten(扁平化)

flatMap(map + flatten)

------------------------

泛型

泛型类

class Pair[T,S](val first:T,val second:S)

泛型方法

def computer[T](list : List[T])= ...

------------------------

class Basic6{

}

class A{

}

class Rich(a : A){

def rich{

println("rich...")

}

}

object Basic6 extends App{

implicit def a2Rich(a : A) = new Rich(a)

val a = new A

a.rich

def testParam(implicit name : String){

println(name)

}

implicit val name = "implicit!!!"

testParam

implicit class Calculator(x : Int){

def add(a : Int) : Int = a + 1

}

println(1.add(1))

}

---------------Spark概述与编程模型---------------------

Spark的快只是因为内存?

内存计算

DAG

支持三种语言的API:Scala、Python、Java

有四种运行模式:local()

--------------集群配置-----------------------------------------

spark-env.sh

export JAVA_HOME=

export SPARK_MASTER_IP=

export SPARK_WORKER_CORES=

export SPARK_WORKER_INSTANCES=

export SPARK_WORKER_MEMORY=

export SPARK_MASTER_PORT=

export SPAEK_JAVA_OPTS="-verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps"

slaves

xxx.xxx.xxx.2

xxx.xxx.xxx.3

xxx.xxx.xxx.4

xxx.xxx.xxx.5

----------------------------------------------------------------------------------------

shell 运行

MASTER=local[4] ADD_JARS=code.jar ./spark-shell

MASTER=spark://host:port

指定executor内存:export SPARK_MEM=25g

-------------------------------------------------------------------

class Analysis{

}

object Analysis{

def main(arg : Array[String]){

if(args.length !=3){

println("Usage : java -jar code.jar dependency_jars file_location save_location")

System.exit(0)

}

val jars = ListBuffer[String]()

args(0).split(‘.‘).map(jar += _)

val conf = new SparkConf()

conf.setMaster("spark://master:8080")

.setSparkHome("/usr/hadoop/spark-1.6.0-cdh5.7.1")

.setAppName("analysis")

.setJar(jars)

.set("spark.executor.memory","2g")

val sc = new SparkContext(conf)

val data = sc.textFile(args(1))

data.cache

println(data.count)

data.file(_.split(‘ ‘).length == 3).map(_.split(‘ ‘)(1)).map((_,1)).reduceByKey(_+_)

.map(x=>(x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).saveAsTextFile(args(2))

}

}

-----------------------------------------------------------------------------

java -jar /home/cloudera/IdeaProjects/scala/out/artifacts/scala_jar/scala.jar /home/cloudera/IdeaProjects/scala/out/artifacts/scala_jar/scala.jar hdfs://quickstart.cloudera:8020/spark/test.txt hdfs://quickstart.cloudera:8020/spark/

val rdd1 = sc.textFile("hdfs://192.168.1.198:8020/spark/SogouQ1.txt")

rdd1.toDebugString

val words=rdd1.flatMap(_.split(" "))

val wordscount=words.map(x=>(x,1)).reduceByKey(_+_)

wordscount.collect

wordscount.toDebugString

val rdd1 = sc.textFile("hdfs://192.168.1.198:8020/spark/test.txt",9)

val rdd2=rdd1.map(_.split("\t")).filter(_.length==6)

val rdd3=rdd2.filter(_(3).toInt==1).filter(_(4).toInt==2)

rdd3.count()

spark-shell --executor-memory 2g --driver-memory 1g --master spark://192.168.1.198:7077

val rdd = sc.textFile("hdfs://192.168.1.198:8020/spark/test1.txt")

rdd.cache()

val wordcount = rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)

wordcount.take(10)

val wordsort = wordcount.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))

wordsort.take(10)

val data=sc.textFile("hdfs://192.168.1.198:8020/spark/SogouQ1.txt")

data.count

data.map(_.split(‘\t‘)(0)).filter(_ < "20111230010101").count

data.map(_.split(‘\t‘)(3)).filter(_.toInt == 1).count

data.map(_.split(‘\t‘)).filter(_(3).toInt == 1).filter(_(4).toInt == 1).count

data.map(_.split(‘\t‘)).filter(_(2).contains("baidu")).count

--------maven--依赖-----包----------

<project>

<dependencies>

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-core_2.10</artifactId>

<version>1.5.0-incubating</version>

</dependency>

<dependency>

<groupId>org.apache.hadoop</groupId>

<artifactId>hadoop-client</artifactId>

<version>2.6.0</version>

</dependency>

<dependencies>

</project>

-----------------------------------------------------

??学习下maven

------深入Spark内核-----------

术语解释

Application 基于Spark的用户,包含了driver程序

Driver Program 运行main函数并且新建SparkContext的程序

Cluster Manager 在集群上获取资源的外部服务(例如:standalone.Mesos,Yarn)

Worker Node 集群中任何可以运行应用代码的节点

Executor 是在一个worker node上为某应用启动的一个进展,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用都有各自独立的executors

Task 被送到某个executor上的工作单元

Job 包含很多任务的并行计算,可以看做和Spark的action对应

Stage 一个Job会被拆分很多组任务,每组任务被称为Stage(就像Marpreduce分map任务和reduce任务一样)

|--------------------------------------------------------------------------|

|Cluster Overview                                                          |

|--------------------------------------------------------------------------|

|Driver Program                                                            |

|SparkContext <---------->   Cluster Manager  <-----------> Worker Node    |

|                                                           Executor Cache |

|                    Task Task      |

|                                                                          |

|                    Worker Node    |

|                    Executor Cache |

|                    Task Task      |

|--------------------------------------------------------------------------|

---------数据本地性---------------

第一次运行时数据不在内存中,所以从HDFS上取,任务最好运行在数据所在的节点上         文件系统本地性

第二次运行,数据已经在内存中,所以任务最好运行在该数据所在内存的节点上             内存本地性

万一有数据被置换出内存,则任然从HDFS上取                                           LRU置换

-------------------------------------------------------------------------------------------

再看RDD

分区 protected def getPartitions:Array[Partition]

依赖 protected def getDependencies:Seq[Dependency[]] = deps

函数 def computes(split:Partition,context:TaskContext):Iterator[T]

最佳位置(可选) protected def getPreferredLocations(split:Partition):Seq[String]=Nil

分区策略(可选)@transient val partitioner: Option[Partitioner] = None

------------------------------------------------------------------------------------------

最常见的HadoopRDD

分区:每个HDFS block

依赖:无

函数:读取每一个block

最佳位置:HDFS block所在位置

分区策略:无

----------------------------------------------------------------------

FilteredRDD

分区:与父RDD一致

依赖:与父RDD一对一

函数:计算父RDD的每个分区并过滤

最佳位置:无(与父RDD一致)

分区策略:无

------------------------------------------------------

JoinedRdd

分区:每个reduce任务一个分区

依赖:依赖所有父RDD

函数:读取suffle数据并计算

最佳位置:无

分区策略:HashPartitioner(partitions:Int)

---------------------------------------------

细看DAG Scheduler

目标RDD    计算每个分区的函数     结果监听器

DAG Scheduler

基于Stage构建DAG,决定每个任务的最佳位置

记录哪个RDD或者Stage输出被物化

将taskset传给底层调度器TaskScheduler

重新提交shuffle输出丢失的stage

--------------------------------------------

调度器优化

一个Stage内的窄依赖进行pipeline操作

1+1+1+1=4  1+1=2 2+1=3 3+1=4

基于partition选择最优的join算法使shuffle的数据最小化

重用已经缓存过的数据

--------------------------------------------

Task细节

外部存储           Task

shuffle数据      f1->f2->f3  ---------> 输出文件

Stage边界只出现在外部输入及取shuffle数据的时候

为了容错,会把suffle输出写在磁盘或者内存

任何一个任务可以运行在任何一个节点

允许任务使用那些被缓存但是已经被置换出去的数据

--------------------------------------------

TaskScheduler

提交taskset(一组task)到集群运行并汇报结果

出现shuffle输出lost要报告fetch failed错误

碰到straggle任务需要放到别的节点上重试

为每一个TaskSet维护一个TaskSetManager(追踪本地性及错误信息)

------------------------------------------

广播变量 Broadcast variables

BT形式的广播变量

使用场景:lookup表,mapside join

注意点:只读、存于每台worker的cache,不随task发送

使用方式:val broadcastVar = sc.broadcast(Array(1,2,3))

broadcastVar.value

--------------------------------------------------

累加器Accumulators

只增

类似与MapReduce中的counter

用法:val accum = sc.accumulator(0)

sc.parallelize(Array(1,2,3,4)).foreach(x=>accum +=x)

accum.value

----------------------------------------------

性能调优

优化点1

问题:Task序列化后太大

解决:使用广播变量

优化点2

问题:val rdd=data.filter(f1).filter(f2).reduceBy...

经过以上语句会有很多空任务或者小任务

解决:使用coalesce或者repartition去减少RDD中partition数量

--------------------------------------------------------

优化点3

问题:每个记录的开销太大

rdd.map{x=conn=getDBConn;conn.write(x.toString);conn.close}

解决:rdd.mapPartitions(records=>conn.getDBConn;for(item <- records))

write(item:toString);

conn.close)

-------------------------------------------------------

优化点4

问题:任务执行速度倾斜

解决:1、数据倾斜(一般是partition key取的不好) 考虑其他的并行处理方式 中间可以加入一步aggregation

2、Worker倾斜(在某些worker上的executor不给力)

设置spark.speculation=true把哪些持续不给力的node去掉

------------------------------------------------------------

优化点5

问题:不设置spark.local.dir 这是spark写shuffle输出的地方

解决:设置一组磁盘

spark.local.dir=/mn1/spark,/mnt2/spark,/mnt3/spark

--------------------------------------------------------

优化点6

问题:reducer数量不合适

解决:需要按照实际情况调整

太多的reducer,造成很多的小任务,以此产生很多启动任务的开销

太少的reducer,任务执行慢

--------------------------------------------------------

优化7

问题:collect输出大量结果慢

解决:直接输出到分布式文件系统

-------------------------------------------

优化8

问题:序列化 Spark默认使用JDK自带的ObjectOutputStream

解决:使用Kryo serialization

---开源框架---------------------------------------

-----DB---

Cassandra

HBase

MongoDB

Terrastore

Redis

SSDB

MySQL

-----SQL on Hadoop(Spark)----------------------

Hive

Shark(Catalyst)

Impala

------------日志(数据)收集------------------

Sqoop

Flume

Chukwa

Kafka

DataX

Dbsync

TimeTunnel

------------ML---------------------

Mahout

MLib

--------other---------------

Zookeeper

Ooize

Hue

-----------------------

时间: 2024-10-17 13:33:09

Spark小象学院笔记的相关文章

小象学院Python机器学习和算法高级版视频教程

下载地址:百度网盘下载 ├─00.课程介绍│      <机器学习·升级版II>常见问题FAQ - 小象问答-hadoop,spark,storm,R,hi.jpg│      <机器学习>升级版II,11月4日开课 - 小象学院 - 中国最专业的Hadoop,Spark大数据.jpg│      ├─01.机器学习的数学基础1 - 数学分析│  │  01 数学分析与概率论.mp4│  │  1.数学分析与概率论.pdf│  │  笔记.jpg│  │  │  └─参考文献资料│

百度前端技术学院—-小薇学院(HTML+CSS课程任务)

任务一:零基础HTML编码 课程概述 作业提交截止时间:04-24 重要说明 百度前端技术学院的课程任务是由百度前端工程师专为对前端不同掌握程度的同学设计.我们尽力保证课程内容的质量以及学习难度的合理性,但即使如此,真正决定课程效果的,还是你的每一次思考和实践. 课程多数题目的解决方案都不是唯一的,这和我们在实际工作中的情况也是一致的.因此,我们的要求不仅仅是实现设计稿的效果,更是要多去思考不同的解决方案,评估不同方案的优劣,然后使用在该场景下最优雅的方式去实现.那些最终没有被我们采纳的方案,同

(赵小明RHCE笔记)linux基础之一

默认情况下,linux有一个图形界面,五个文本虚拟终端[email protected]# tty  用来查看当前是在哪个tty下ctrl+alt+F1-F7用来切换不同终端startx命令用来启动图形化界面一.图形界面从图形界面切换到文字界面需要ctrl+alt+F1-F7从文字界面切换到其他文字界面仅需alt+F1-F7ctrl+shift+t创建多个命令终端shift+Pgup/Pgdn在有很多输出的时候翻页查看ctrl+shift+c复制选中文字ctrl+shift+v粘贴ctrl+Pg

(赵小明RHCE笔记)linux基础之二 vim的使用

一.introducing vim 1.vim是vi的新版本,是unix的标准文本编辑器  默认情况下执行vi运行的是vim 2.优点  速度:  简便化:  高可用性3.缺点  比一般的编辑器稍难二.VIM的使用1.VIM有多种模式2.三种主要模式  a.命令模式(默认):移动光标.剪切/粘帖文本,更改模式  b.插入编辑模式:修改文本内容  c.扩展模式:保存.退出等等3.按多次Esc会返回至命令模式4.进入文件 vim /tmp/passwd  q!:不保存退出  e!:重新读取该文件,之

(赵小明RHCE笔记)linux基础之三 用户、组及权限

一.user1.每个用户将指派唯一用户ID(UID)  root的ID为0  普通用户ID从500开始(0-500系统使用)2.用户名和用户ID存在 /etc/passwd中3.当用户登陆时系统自动为其分配一个用户家目录4.用户无法读.写.执行其他用的文件二.changing file ownership1.only root can change a file's owner2.only root or the owner can change a file's group3.ownershi

(赵小明RHCE笔记)linux基础之四 权限详解

一.special permissions for executables1.special permissions for executables:  -suid:command run with permissions of the owner of the command,not executor of   the command  -sgid:command runs with group affiliation of the group of the commandeg:file:us

小猪猪C++笔记基础篇(五)表达式、语句

小猪猪C++笔记基础篇(五) 关键词:表达式.语句 本章的内容比较简单,基本上没有什么理解上的困难,都是知识上的问题.先开始想要不要写呢,本来是不准备写的,但是既然读了书就要做笔记,还是写一写,毕竟还是有点点收获的东西.那么,我只就一些容易弄糊涂和忽略的地方提出来,为日后变成作参考. 一.表达式 (一)概念 表达式时由一个或者多个运算对象组成的,对一个表达式求值将得到一个结果.把一个运算发和一个或者多个运算对象组合起来可以生成较为复杂的表达式. 作用于一个对象的运算符是一元运算符例如:“&”,“

小猪猪C++笔记基础篇(六)参数传递、函数重载、函数指针、调试帮助

小猪猪C++笔记基础篇(六) ————参数传递.函数重载.函数指针.调试帮助 关键词:参数传递.函数重载.函数指针.调试帮助 因为一些事情以及自己的懒惰,大概有一个星期没有继续读书了,已经不行了,赶紧写一篇压压惊.把我文章抱走的同学留个言嘛. 函数在变成里面是一个非常重要的组成部分,那么这一部分我们先简单的介绍一下参数是如何传递进入函数,函数如何返回结果的.然后我们再来看看函数重载是个什么样的机制,最后在介绍一下所谓的函数指针到底是个什么东西.那么直接开始正题吧: 一.函数的参数传递 我们知道函

七牛云存储C#例用小例子 C#笔记

最近有需求要把网站的图片传到云存储上去.于是就找到了七牛.看上面的SDK...看了才发现里面注释很少.当时看的头大.在网上搜一下,基本上没有好的例子.给初学者带来很大的不便!不过最后还是做出来了.在这里把一些基本的例子放出来.希望给初学者一些帮助. 第一步.首选你得赋值,代码如下(应该都能看懂) Config.ACCESS_KEY = ""; //AK Config.SECRET_KEY = "";//SK Bucket = "";//空间名 D