本讲主要内容:环境安装、配置、本地模式、集群模式、自动化脚本、web状态监控
==========单机============
开发工具开发
下载最新版Scala For Eclipse
1、建立工程,修改scala编译版本
2、加入Spark1.6.0的jar文件依赖
下载 http://apache.opencas.org/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz
spark-assembly-1.6.0-hadoop2.6.0.jar
3、找到依赖的spark jar文件并导入到eclipse中的jar依赖中
4、src下建立spark工程包com.dt.spark
5、创建scala入口类
6、把class编程object,并编写main入口方法
找不到类crtl+shift+o
例子代码:
package com.dt.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* 使用Scala开发本地测试的spark wordcount程序
* @author DT_大数据梦工厂
* 新浪微博:http://weibo.com/ilovepains
* */
object WordCount {
def main(args:Array[String]){
/**
* 1、创建Spark配置对象SparkConf,设置Spark程序的运行时的程序配置信息
* 例如:通过setMaster来设置程序要连接的Spark集群的url,
* 如果设置为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如只有1G的内存)的初学者
*/
val conf = new SparkConf()//创建SparkConf对象
conf.setAppName("My First Spark App!")//设置应用程序的名称,在程序运行的监控界面可以看到名称
conf.setMaster("local")//此时程序在本地运行,不需要安装Spark集群
/**
* 2、创建SparkContext对象
* SparkContext是Spark程序所有功能的唯一入口,无论采用Scala、Java、Python、R等都必须要
* SparkContext核心作用:初始化Spark应用程序所运行所需要的核心组件,包括DAGScheduler、TaskScheduler、SchedulerBackend
* 同时还会负责Spark程序往Master注册程序等
* SparkContext是整个Spark应用程序中最为至关重要的一个对象
*/
val sc = new SparkContext(conf)//通过创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息
/**
* 3、根据具体的数据来源(HDFS、HBase、Local FS、DB、S3等)通过SparkContext来创建RDD
* RDD创建基本有三种方式:根据外部的数据来源(例如HDFS),根据Scala集合、由其它的RDD操作
* 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴
*/
//sc.textFile(文件路径,最小并行度)
//val lines: RDD[String],通过类型推断得到lines是String类型的RDD
//val lines: RDD[String] = sc.textFile("F:/安装文件/操作系统/spark-1.6.0-bin-hadoop2.6/README.md", 1)
val lines = sc.textFile("F:/安装文件/操作系统/spark-1.6.0-bin-hadoop2.6/README.md", 1)
/**
* 4、对初始的RDD进行transformation级别的处理,例如map、filter等高阶函数的编程,来进行具体的数据计算
* 4.1、将每一行的字符串拆分成单个的单词
*/
val words = lines.flatMap { line => line.split(" ") }//对每一行的字符串进行单次拆分并把所有行的拆分结果通过flat合并成一个大的单次集合
/**
* 4.2、在单词拆分的基础上对每个单词实例计数为1,也就是word=>(word,1)
*/
val pairs = words.map { word => (word,1) }//其实编程了Tuple (word,1)
/**
* 4.3、在每个单词实例计数为1的基础上统计每个单词在文件中出现的总次数
*/
val wordCounts = pairs.reduceByKey(_+_)//对相同的key,进行value的累加(包括local和Reducer级别同时reduce)
wordCounts.foreach(wordNumberPair=>println(wordNumberPair._1+":"+wordNumberPair._2))
/**
* 5、释放相关资源
*/
sc.stop()
}
}
我的结果不是老师的结果,暂时不管
另外里面开始报错了,一定会报错,这个错误是正确的,因为运行的时候要找Hadoop,没有找到,但是这不是程序错误,也不影响我们的任何功能
==========集群============
package com.dt.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* 使用Scala开发集群测试的spark wordcount程序
* @author DT_大数据梦工厂
* 新浪微博:http://weibo.com/ilovepains
* */
object WordCount_Cluster {
def main(args:Array[String]){
/**
* 1、创建Spark配置对象SparkConf,设置Spark程序的运行时的程序配置信息
* 例如:通过setMaster来设置程序要连接的Spark集群的url,
* 如果设置为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如只有1G的内存)的初学者
*/
val conf = new SparkConf()//创建SparkConf对象
conf.setAppName("My First Spark App!")//设置应用程序的名称,在程序运行的监控界面可以看到名称
//conf.setMaster("spark://Master:7077")//此时必须要运行在Master:7077上,之后运行可手工配置
/**
* 2、创建SparkContext对象
* SparkContext是Spark程序所有功能的唯一入口,无论采用Scala、Java、Python、R等都必须要
* SparkContext核心作用:初始化Spark应用程序所运行所需要的核心组件,包括DAGScheduler、TaskScheduler、SchedulerBackend
* 同时还会负责Spark程序往Master注册程序等
* SparkContext是整个Spark应用程序中最为至关重要的一个对象
*/
val sc = new SparkContext(conf)//通过创建SparkContext对象,通过传入SparkConf实例来定制Spark运行的具体参数和配置信息
/**
* 3、根据具体的数据来源(HDFS、HBase、Local FS、DB、S3等)通过SparkContext来创建RDD
* RDD创建基本有三种方式:根据外部的数据来源(例如HDFS),根据Scala集合、由其它的RDD操作
* 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴
*/
//sc.textFile(文件路径,最小并行度)
//val lines: RDD[String],通过类型推断得到lines是String类型的RDD
//val lines = sc.textFile("/library/wordcount/input/Data", 1)//读取HDFS文件,并切分成不同的Partition
val lines = sc.textFile("/historyserverforSpark/README.md", 1)
/**
* 4、对初始的RDD进行transformation级别的处理,例如map、filter等高阶函数的编程,来进行具体的数据计算
* 4.1、将每一行的字符串拆分成单个的单词
*/
val words = lines.flatMap { line => line.split(" ") }//对每一行的字符串进行单次拆分并把所有行的拆分结果通过flat合并成一个大的单次集合
/**
* 4.2、在单词拆分的基础上对每个单词实例计数为1,也就是word=>(word,1)
*/
val pairs = words.map { word => (word,1) }//其实编程了Tuple (word,1)
/**
* 4.3、在每个单词实例计数为1的基础上统计每个单词在文件中出现的总次数
*/
val wordCounts = pairs.reduceByKey(_+_)//对相同的key,进行value的累加(包括local和Reducer级别同时reduce)
//集群中wordCounts.collect
wordCounts.collect.foreach(wordNumberPair=>println(wordNumberPair._1+":"+wordNumberPair._2))
/**
* 5、释放相关资源
*/
sc.stop()
}
}
打包:jar
WordCount.jar
http://spark.apache.org/docs/latest/submitting-applications.html
./bin/spark-submit \
--class <main-class>
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
./spark-submit --class com.dt.spark.WordCount_Cluster --master spark://Master:7077 /root/Documents/SparkApps/WordCount.jar
hadoop fs -put /usr/local/spark-1.6.0-bin-hadoop2.6/README.md hdfs://192.168.145.131:9000/historyserverforSpark/ 可以上传文件 但是不能用??
作业:在eclipse中写好广告点击排名的程序并测试好
王家林老师名片:
中国Spark第一人
新浪微博:http://weibo.com/ilovepains
微信公众号:DT_Spark
博客:http://blog.sina.com.cn/ilovepains
手机:18610086859
QQ:1740415547
邮箱:[email protected]