今天我们来学习spark,spark是一种快速,通用,可扩展的大数据分析引擎,现已成为Apache顶级项目,Spark是MapReduce的替代方案,而且兼容HDFS,Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足,下面我们来介绍这样的一门语言:
一.spark安装
1.上传spark环境所需要的压缩包,
这里面请记住我们要的是预编译包(prebuild版本),而不是源码包,即解压就可以使用的压缩包
我们这个里面使用的是spark-1.6.1-bin-hadoop2.6.tgz,首先我们把它上传到Linux之中,并解压到系统中
2.配置Spark
进入Spark安装目录,进入conf目录并重命名spark-evn.sh.template文件
cd conf/
mv spark-env.sh.template spark-env.sh
vi spark-env.sh
在配置文件中添加如下配置
export JAVA_HOME=/usr/java/jdk1.7.0_45(JAVA的安装路径)
export SPARK_MASTER_IP=node1.itcast.cn(master的那台机器的IP,这个里面我们可以写ip:192.168.109.136)
export SPARK_MASTER_PORT=7077
保存退出
重命名并修改slaves.template文件
mv slave.template slaves
vi slaves
在该文件中添加子节点所在的位置(Worker节点)
192.168.109.137
192.168.109.138
192.168.109.139
保存退出
将配置好的Spark拷贝到其他节点上
scp -r spark-1.6.1 weekday02:/home/xxx/apps
scp -r spark-1.6.1 weekday03:/home/xxx/apps
scp -r spark-1.6.1 weekday04:/home/xxx/apps
3.Spark集群配置完毕,目前是1个Master,3个Worker,在Master的那台主机上启动Spark集群
/home/xxx/apps/spark-1.6.1/sbin/start-all.sh
启动后执行jps命令,主节点上有Master进程,其他节点上有Work进行,登录Spark管理界面查看集群状态(主节点):http://mast1的ip:8080即可
注意,启动集群的时候,注意要关闭防火墙
二,我的第一个spark程序
这个里面我们只使用spark程序,这个算法是利用蒙特-卡罗算法求PI
/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://192.168.109.136:7077 --executor-memory 1G --total-executor-cores 2 /usr/local/spark-1.5.2-bin-hadoop2.6/lib/spark-examples-1.5.2-hadoop2.6.0.jar 100
这个也不需要hadoop的启动,只需要我们有了spark的环境即可
参数说明
-master spark://192.168.109.136:7077 指定Master地址
-executor-memory 2g 指定每一个worker可以使用的内存是2G
-total-executor-cores 2 指定整个集群使用的cpu核数是2
注意:我们一般使用spark-shell,这个是spark自带交互式shell程序,方便用户进行交互式编程,用户可以在该命令下面使用scala编写spark程序
例如:/usr/local/spark-1.5.2-bin-hadoop2.6/bin/spark-shell,如果不写后面的,这个spark只能说是单机的,应为它没有与master的机器
相连,跟别提和别人的连接
三.在spark shell中编写WordCount程序
1.首先启动HDFS
2.向hdfs上传一个文件到hdfs://192.168.109.136:9000/words.txt
3.在spark shell 中用scala语言写spark程序
则此时这个结果在页面直接显示出来
sc.textFile("hdfs://192.168.109.136:9000/words.txt").flatMap(_.split(" ")) .map(_,1).reduceByKey(_+_).collect
则此时这个结果在hdfs的文件中显示出来
sc.textFile("hdfs://192.168.109.136:9000/words.txt").flatMap(_.split(" ")) .map(_,1).reduceByKey(_+_).saveAsTextFile("hdfs://192.168.109.136:9000/wc/out")
则此时因为我们有三个子节点,所以就想hadoop的mapreduce,中三个reduce,则此时我们将其让
最终的结果都读取到一个文件中去,此时我们就修改了一个地方,在reduceByKey(_+_)改为了reduceByKey(_+_,1)
sc.textFile("hdfs://192.168.109.136:9000/words.txt").flatMap(_.split(" ")) .map(_,1).reduceByKey(_+_,1).saveAsTextFile("hdfs://192.168.109.136:9000/wc/out")
四.spark算子
spark算子一共分为两类
一类叫做Transformation(转换),一类叫做Action(动作)
Transformation延迟执行,Transformation会记录元数据信息,当任务触犯到Action才开始真正的执行