---小象学院陈超视频教程笔记------陈超讲
第一节
Scala基础与实践
基于JVM的FP+OO
静态类型
和Java互操作
函数式编程和面向对象的结合,纯静态的语言。
解释器(interpreter)
值与变量(val & var)
函数(Function)
1、常量 val
2、变量 var
3、main函数要定义在object里面
实例:
object Basic{
def hello(name : String): String ={
"Hello :" + name //block的最后一行就是输出值
}
def helloScala(){
println("hello Scala!!")
}
val add = (x: Int,y :Int) => x + y
def add2(x:Int)(y:Int) = x + y
def printEveryChar(c : String*)={
c.foreach(x => println(x))
}
def main(args : Array[String]){
println("hello Scala")
//println(hello("Scala"))
helloScala()
add(1,2)
println(add2(4)(6))
printEveryChar("a","b","c","d")
//if表达式
val x=1
val a=if(x>0) 1 else 0
println(a)
//循环
var(n,r)=(10,0)
while(n>0){
r = r + n
n = n - 1
}
println(r)
for(i<- 1 to(10)){
println(i)
}
}
}
------------------------------------------------------------
条件表达式(if)
循环表达式(no continue,no break)
语句终止(;no need)
实例2
class Basic2{
}
class Person{
var name : String = _ //会生成getter和setter方法
val age = 10 //只会生成getter方法
private[this] val gender = "male"
}
object Basic2 {
def main(args: Array[String]): Unit ={
val p = new Person //括号可省略
p.name = "Jack"
println(p.name + ":" + p.age)
}
}
----------------------------------------------------------
类(class)
声明类(一个源文件中可以包含很多类,并且都是public级别)
getter和setter
构造函数(primary constructor & auxiliary constructor)
继承(extends)
重写父类方法(override def)
重写字段(override val,override var)
实例3
class Basic2{
}
class Person{
var name : String = _ //会生成getter和setter方法
val age = 10 //只会生成getter方法
private[this] val gender = "male"
}
object Basic2 {
def main(args: Array[String]): Unit ={
val p = new Person //括号可省略
p.name = "Jack"
println(p.name + ":" + p.age)
}
}
实例4
class Basic2{
}
//1、主构造器直接跟在类名后面,主构造器中的参数,最后会被编译成字段
//2、主构造器执行的时候,会执行类中的所有语句
//3、假设参数声明时不带val和var,那么相当于private(this)!!!
class Person(var name:String,val age:Int){
/* var name : String = _ //会生成getter和setter方法
val age = 10 //只会生成getter方法
private[this] val gender = "male"*/
println("this is the primary constructor!")
}
object Basic2 {
def main(args: Array[String]): Unit ={
/* val p = new Person
p.name = "Jack"
println(p.name + ":" + p.age)*/
val p = new Person("Jack",20)
println(p.name + ":" + p.age)
}
}
--------------------------------------------------------------------
抽象类(abstract class)
类的一个或者多个方法没有没完整的定义
声明抽象方法不需要加abstract关键字,只需要不写方法体
子类重写父类的抽象方法时不需要加override
父类可以声明抽象字段(没有初始值的字段)
子类重写父类的抽象字段时不需要加override
class Basic3{
}
abstract class Person1{
def speak
val name : String
var age : Int
}
abstract Student1 extends Persion1{
def speak{
print("speak!!!")
}
val name = "AAA"
var age = 100
}
trait Logger{
def log(msg : String){
println("log" + msg)
}
}
class Test extends Logger{
def test{
log("xxx")
}
}
trait logger{
def log(msg:String)
}
trait ConsoleLogger extends Logger{
def log(msg:String){
println(msg)
}
}
class Test extends ConsoleLogger{
def test{
log("PPP")
}
}
trait ConsoleLogger{
def log(ms : String){
println("save money:" + msg)
}
}
trait MessageLogger extends ConsoleLogger{
def log(msg : String){
println("save money to bank :" + msg)
}
}
abstract class Account{
def save
}
class MyAccount extends Account with ConsoleLogger{
def save{
log(100)
}
}
object Basic3 extends App{
val t = new Test
t.test
val acc = new MyAccount with MessageLogger
acc.save
var s = new Student1
s.speak
println(s.name + ":" + s.age)
}
特质(trait)-对比下JAVA8的接口
字段和行为的集合
混入类中
通过with关键字,一个类可以扩展多个特质
trait续
当做接口
带有具体实现的接口
带有特质的对象
特质从左到右被构造
apply方法
单例对象
class ApplyTest{
def test{
println("test")
}
}
class Basic4{
}
object Basic4 extends App{
val a = ApplyTest
}
--------包(package com.xx.data)
支持嵌套,下层可以访问上层作用域中的名称
可串联
顶部标记
包对象
包可见性
包在任何地方都可以引入,作用域至该语句所在块的末尾
重命名引入成员(xx => yy)
隐藏方法(xx => _)
自动引入(java.lang._ scala._ Predef._)
--------模式匹配-----------------
标准用法(match)
使用守卫
匹配类型
class Basic5{
}
case class Book(name : String,author : String)
object Basic5 extends App{
val value = 1
value match{
case 1 => "one"
case 2 => "two"
case _ => "some other number"
}
val result = value match{
case i if i == 1 => "one"
case i if i == 2 => "two"
case _ => "some other number"
}
println("result of match is :" + result)
println("result2 of match is :" + result2)
def t(obj:Any) = obj match{
case x : Int => println("Int")
case s : String => println("String")
case _ => println("unknow type")
}
t("1")
t(1)
t(1L)
val macTalk = Book("MacTalk","CJQ")
macTalk match{
case Book(name,author) => println("this is book")
case _ =>println("unknown")
}
}
---case class(多用在模式匹配中)
构造器中的每一个类型都为val,不建议用var
不用new就可以直接生产对象(为什么?apply方法)
-----------------------------
高阶函数
匿名函数 val double = (x: Int)=> 2 * x
函数作为参数
参数类型推断
常用高阶函数
map、filter、reduce等等
----------------------------
集合
List
Set
Tuple
Map
----------------
集合操作
foreach(类似于map,但是没有返回值)
map(迭代)
filter(过滤)
zip(聚合)
partition(列表分割)
flatten(扁平化)
flatMap(map + flatten)
------------------------
泛型
泛型类
class Pair[T,S](val first:T,val second:S)
泛型方法
def computer[T](list : List[T])= ...
------------------------
class Basic6{
}
class A{
}
class Rich(a : A){
def rich{
println("rich...")
}
}
object Basic6 extends App{
implicit def a2Rich(a : A) = new Rich(a)
val a = new A
a.rich
def testParam(implicit name : String){
println(name)
}
implicit val name = "implicit!!!"
testParam
implicit class Calculator(x : Int){
def add(a : Int) : Int = a + 1
}
println(1.add(1))
}
---------------Spark概述与编程模型---------------------
Spark的快只是因为内存?
内存计算
DAG
支持三种语言的API:Scala、Python、Java
有四种运行模式:local()
--------------集群配置-----------------------------------------
spark-env.sh
export JAVA_HOME=
export SPARK_MASTER_IP=
export SPARK_WORKER_CORES=
export SPARK_WORKER_INSTANCES=
export SPARK_WORKER_MEMORY=
export SPARK_MASTER_PORT=
export SPAEK_JAVA_OPTS="-verbose:gc -XX:-PrintGCDetails -XX:+PrintGCTimeStamps"
slaves
xxx.xxx.xxx.2
xxx.xxx.xxx.3
xxx.xxx.xxx.4
xxx.xxx.xxx.5
----------------------------------------------------------------------------------------
shell 运行
MASTER=local[4] ADD_JARS=code.jar ./spark-shell
MASTER=spark://host:port
指定executor内存:export SPARK_MEM=25g
-------------------------------------------------------------------
class Analysis{
}
object Analysis{
def main(arg : Array[String]){
if(args.length !=3){
println("Usage : java -jar code.jar dependency_jars file_location save_location")
System.exit(0)
}
val jars = ListBuffer[String]()
args(0).split(‘.‘).map(jar += _)
val conf = new SparkConf()
conf.setMaster("spark://master:8080")
.setSparkHome("/usr/hadoop/spark-1.6.0-cdh5.7.1")
.setAppName("analysis")
.setJar(jars)
.set("spark.executor.memory","2g")
val sc = new SparkContext(conf)
val data = sc.textFile(args(1))
data.cache
println(data.count)
data.file(_.split(‘ ‘).length == 3).map(_.split(‘ ‘)(1)).map((_,1)).reduceByKey(_+_)
.map(x=>(x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).saveAsTextFile(args(2))
}
}
-----------------------------------------------------------------------------
java -jar /home/cloudera/IdeaProjects/scala/out/artifacts/scala_jar/scala.jar /home/cloudera/IdeaProjects/scala/out/artifacts/scala_jar/scala.jar hdfs://quickstart.cloudera:8020/spark/test.txt hdfs://quickstart.cloudera:8020/spark/
val rdd1 = sc.textFile("hdfs://192.168.1.198:8020/spark/SogouQ1.txt")
rdd1.toDebugString
val words=rdd1.flatMap(_.split(" "))
val wordscount=words.map(x=>(x,1)).reduceByKey(_+_)
wordscount.collect
wordscount.toDebugString
val rdd1 = sc.textFile("hdfs://192.168.1.198:8020/spark/test.txt",9)
val rdd2=rdd1.map(_.split("\t")).filter(_.length==6)
val rdd3=rdd2.filter(_(3).toInt==1).filter(_(4).toInt==2)
rdd3.count()
spark-shell --executor-memory 2g --driver-memory 1g --master spark://192.168.1.198:7077
val rdd = sc.textFile("hdfs://192.168.1.198:8020/spark/test1.txt")
rdd.cache()
val wordcount = rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)
wordcount.take(10)
val wordsort = wordcount.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
wordsort.take(10)
val data=sc.textFile("hdfs://192.168.1.198:8020/spark/SogouQ1.txt")
data.count
data.map(_.split(‘\t‘)(0)).filter(_ < "20111230010101").count
data.map(_.split(‘\t‘)(3)).filter(_.toInt == 1).count
data.map(_.split(‘\t‘)).filter(_(3).toInt == 1).filter(_(4).toInt == 1).count
data.map(_.split(‘\t‘)).filter(_(2).contains("baidu")).count
--------maven--依赖-----包----------
<project>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.0-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependencies>
</project>
-----------------------------------------------------
??学习下maven
------深入Spark内核-----------
术语解释
Application 基于Spark的用户,包含了driver程序
Driver Program 运行main函数并且新建SparkContext的程序
Cluster Manager 在集群上获取资源的外部服务(例如:standalone.Mesos,Yarn)
Worker Node 集群中任何可以运行应用代码的节点
Executor 是在一个worker node上为某应用启动的一个进展,该进程负责运行任务,并且负责将数据存在内存或者磁盘上。每个应用都有各自独立的executors
Task 被送到某个executor上的工作单元
Job 包含很多任务的并行计算,可以看做和Spark的action对应
Stage 一个Job会被拆分很多组任务,每组任务被称为Stage(就像Marpreduce分map任务和reduce任务一样)
|--------------------------------------------------------------------------|
|Cluster Overview |
|--------------------------------------------------------------------------|
|Driver Program |
|SparkContext <----------> Cluster Manager <-----------> Worker Node |
| Executor Cache |
| Task Task |
| |
| Worker Node |
| Executor Cache |
| Task Task |
|--------------------------------------------------------------------------|
---------数据本地性---------------
第一次运行时数据不在内存中,所以从HDFS上取,任务最好运行在数据所在的节点上 文件系统本地性
第二次运行,数据已经在内存中,所以任务最好运行在该数据所在内存的节点上 内存本地性
万一有数据被置换出内存,则任然从HDFS上取 LRU置换
-------------------------------------------------------------------------------------------
再看RDD
分区 protected def getPartitions:Array[Partition]
依赖 protected def getDependencies:Seq[Dependency[]] = deps
函数 def computes(split:Partition,context:TaskContext):Iterator[T]
最佳位置(可选) protected def getPreferredLocations(split:Partition):Seq[String]=Nil
分区策略(可选)@transient val partitioner: Option[Partitioner] = None
------------------------------------------------------------------------------------------
最常见的HadoopRDD
分区:每个HDFS block
依赖:无
函数:读取每一个block
最佳位置:HDFS block所在位置
分区策略:无
----------------------------------------------------------------------
FilteredRDD
分区:与父RDD一致
依赖:与父RDD一对一
函数:计算父RDD的每个分区并过滤
最佳位置:无(与父RDD一致)
分区策略:无
------------------------------------------------------
JoinedRdd
分区:每个reduce任务一个分区
依赖:依赖所有父RDD
函数:读取suffle数据并计算
最佳位置:无
分区策略:HashPartitioner(partitions:Int)
---------------------------------------------
细看DAG Scheduler
目标RDD 计算每个分区的函数 结果监听器
DAG Scheduler
基于Stage构建DAG,决定每个任务的最佳位置
记录哪个RDD或者Stage输出被物化
将taskset传给底层调度器TaskScheduler
重新提交shuffle输出丢失的stage
--------------------------------------------
调度器优化
一个Stage内的窄依赖进行pipeline操作
1+1+1+1=4 1+1=2 2+1=3 3+1=4
基于partition选择最优的join算法使shuffle的数据最小化
重用已经缓存过的数据
--------------------------------------------
Task细节
外部存储 Task
shuffle数据 f1->f2->f3 ---------> 输出文件
Stage边界只出现在外部输入及取shuffle数据的时候
为了容错,会把suffle输出写在磁盘或者内存
任何一个任务可以运行在任何一个节点
允许任务使用那些被缓存但是已经被置换出去的数据
--------------------------------------------
TaskScheduler
提交taskset(一组task)到集群运行并汇报结果
出现shuffle输出lost要报告fetch failed错误
碰到straggle任务需要放到别的节点上重试
为每一个TaskSet维护一个TaskSetManager(追踪本地性及错误信息)
------------------------------------------
广播变量 Broadcast variables
BT形式的广播变量
使用场景:lookup表,mapside join
注意点:只读、存于每台worker的cache,不随task发送
使用方式:val broadcastVar = sc.broadcast(Array(1,2,3))
broadcastVar.value
--------------------------------------------------
累加器Accumulators
只增
类似与MapReduce中的counter
用法:val accum = sc.accumulator(0)
sc.parallelize(Array(1,2,3,4)).foreach(x=>accum +=x)
accum.value
----------------------------------------------
性能调优
优化点1
问题:Task序列化后太大
解决:使用广播变量
优化点2
问题:val rdd=data.filter(f1).filter(f2).reduceBy...
经过以上语句会有很多空任务或者小任务
解决:使用coalesce或者repartition去减少RDD中partition数量
--------------------------------------------------------
优化点3
问题:每个记录的开销太大
rdd.map{x=conn=getDBConn;conn.write(x.toString);conn.close}
解决:rdd.mapPartitions(records=>conn.getDBConn;for(item <- records))
write(item:toString);
conn.close)
-------------------------------------------------------
优化点4
问题:任务执行速度倾斜
解决:1、数据倾斜(一般是partition key取的不好) 考虑其他的并行处理方式 中间可以加入一步aggregation
2、Worker倾斜(在某些worker上的executor不给力)
设置spark.speculation=true把哪些持续不给力的node去掉
------------------------------------------------------------
优化点5
问题:不设置spark.local.dir 这是spark写shuffle输出的地方
解决:设置一组磁盘
spark.local.dir=/mn1/spark,/mnt2/spark,/mnt3/spark
--------------------------------------------------------
优化点6
问题:reducer数量不合适
解决:需要按照实际情况调整
太多的reducer,造成很多的小任务,以此产生很多启动任务的开销
太少的reducer,任务执行慢
--------------------------------------------------------
优化7
问题:collect输出大量结果慢
解决:直接输出到分布式文件系统
-------------------------------------------
优化8
问题:序列化 Spark默认使用JDK自带的ObjectOutputStream
解决:使用Kryo serialization
---开源框架---------------------------------------
-----DB---
Cassandra
HBase
MongoDB
Terrastore
Redis
SSDB
MySQL
-----SQL on Hadoop(Spark)----------------------
Hive
Shark(Catalyst)
Impala
------------日志(数据)收集------------------
Sqoop
Flume
Chukwa
Kafka
DataX
Dbsync
TimeTunnel
------------ML---------------------
Mahout
MLib
--------other---------------
Zookeeper
Ooize
Hue
-----------------------