记录spark的Wordcount小程序:
前提:hdfs已经打开
创建一个name为wc.input的文件,上传到hdfs中的/user/hadoop/spark/中,内容如上图
[[email protected] hadoop-2.6.0-cdh5.4.0]# bin/hdfs dfs -put wc.input /user/hadoop/spark/ 上传
[[email protected] hadoop-2.6.0-cdh5.4.0]# bin/hdfs dfs -ls /user/hadoop/spark/ 查看文件
[[email protected] hadoop-2.6.0-cdh5.4.0]# bin/hdfs dfs -text /uesr/hadoop/spark/wc.input 查看文件内容
[[email protected] spark-1.3.1]# bin/spark-shell 打开spark的客户端
scala> val rdd=sc.textFile("hdfs://spark00:8020/user/hadoop/spark/wc.input") 读取dfs中的文件wc.input
val wordcount = rdd.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((a,b)=>a+b) 进行mapreduce
wordcount.collect 查看所有
rdd.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((a,b)=>a+b).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).collect
sc.textFile("hdfs:spark00:8020/user/hadoop/spark/wc.input").flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey((a,b)=>a+b).collect
sc.textFile(.....).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
排序:
val wordsort = wordcount.sortByKey(true)
val wordsort = wordcount.sortByKey(false)
wordsort.collect
RDD的认识:
在spark中,一个应用程序中包含多个job任务
在mapreduce中,一个job任务就是一个应用
RDD 的特点:
1》 分区 partitioned,split
2》 计算 compute
3》 依赖
rdd特点官网上的翻译及理解:
1,A list of partitions
一系列的分片:比如64M一片,类似于Hadoop中的split
2,A function for computing each split
在每个分片上都有一个函数去迭代/执行/计算 它
3,A list of dependencies on other RDDs
一系列的依赖:RDDa转换成RDDb,RDDb转换成RDDc,那么RDDc就依赖于RDDb,RDDb就依赖于RDDa
4,Optionally,a Partitioner for key-value RDDs(e.g. to say that the RDD is hash-partitioned)
对于key-value的RDD可指定一个partitioner,告诉它如何分片;常用的有hash,range
5,Optionally,a list of preferred location(s) to compute each split on (e.g. block locations for an HDFS file)
要运行的计算/执行最好在哪几个机器上运行,数据本地性
为什么会有哪几个呢?
比如:hadoop默认有三个位置,或者spark cache到内存是可能通过StorageLevel设置多个副本,所以一个partition可能返回多个最佳位置