spark学习四

5.sc.textFiles() 与 sc.wholeTextFiles() 的区别

sc.textFile()是将path 里的所有文件内容读出,以文件中的每一行作为一条记录的方式,文件的每一行 相当于 列表 的一个元素,因此可以在每个partition中用for i in data的形式遍历处理数据。

sc.wholeTextFiles()返回的是[(key, val), (key, val)...]的形式,其中key是文件路径,val是文件内容,每个文件作为一个记录!这说明这里的 val 将不再是 list 的方式为你将文件每行拆成一个 list的元素,
而是将整个文本的内容以字符串的形式读进来,也就是说val = ‘...line1...\n...line2...\n‘
这时需要你自己去拆分每行!而如果你还是用for i in val的形式来便利 val那么i得到的将是每个字符。

6.filter方法过滤集合中的元素

首先你需要给filter方法一个判断条件或者返回true/false的函数,这个判断条件(函数)的输入类型要与集合元素类型一致,
返回值是布尔型的。filter方法会对集合的每一个元素调用判断条件,当条件为true的时候则元素进入新的集合否则会被过滤掉。
你还需要使用一个变量来指向新的集合
过滤偶数:
val x=List.range(1,10)
x:List[Int]=List(1,2,3,4,5,6,7,8,9)
val evens =x.filter(_%2==0)
evens:List[Int]=List(2,4,6,8)

val evens=x.filterNot(_%2==0)
evens:List[Int]=List(1,3,5,7,9)

filter方法可以便利整个集合,但是其他方法只是遍历一部分元素
filter方法允许提供一个判断条件(函数),过滤集合元素
当判断逻辑复杂时,没办法一行写完,可以在filter内部使用多行的判断逻辑,也可以定义一个判断函数,filter(panduan)
也可以连续使用filter方法
io.Source.formFile(canFilename)
.toList
.filter(_.trim !="")
.filter(_.charAt(0) !=‘#‘)

7.mapValues(func)

功能:对键值对每个value都应用一个函数,但是,key不会发生变化。

val list = List("hadoop","spark","hive","spark")
val rdd = sc.parallelize(list)
val pairRdd = rdd.map(x => (x,1))
pairRdd.mapValues(_+1).collect.foreach(println)//对每个value进行+1
结果
(hadoop,2)
(spark,2)
(hive,2)
(spark,2)

完成了实验四RDD编程初级实践

2.编写独立应用程序实现数据去重
对于两个输入文件 A 和 B,编写 Spark 独立应用程序,对两个文件进行合并,并剔除其 中重复的内容,得到一个新文件 C。下面是输入文件和输出文件的一个样例,供参考。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object exercise{

         def main(args: Array[String]) {
              val conf = new SparkConf().setAppName("RemDup")
         val sc = new SparkContext(conf)
         val dataFile1 = "file:///usr/local/spark/mycode/exercise42/text1.txt,file:///usr/local/spark/mycode/exercise42/text2.txt"

         val data = sc.textFile(dataFile1,2)
         val da = data.distinct()
         da.foreach(println)

}
}

  

3.编写独立应用程序实现求平均值问题
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生 名字,第二个是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到 一个新文件中。下面是输入文件和输出文件的一个样例,供参考

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object wordcount{
    def main(args:Array[String]){
         val inputfile="file:///usr/local/spark/mycode/exercise43/data.txt"
         val conf=new SparkConf().setAppName("WordCount").setMaster("local[2]")
         val sc=new SparkContext(conf)
         val textFile=sc.textFile(inputfile)
         val wordCount=textFile.map(line=>(line.split(" ")(0),line.split(" ")(1).toInt)).mapValues(x=>(x,1)).reduceByKey((x,y)=>(x._1+y._1,x._2+y._2)).mapValues(x=>(x._1/x._2)).collect().foreach(println)
}

}

  

原文地址:https://www.cnblogs.com/zhang12345/p/12264520.html

时间: 2024-08-04 12:17:03

spark学习四的相关文章

Spark学习四:网站日志分析案例

Spark学习四:网站日志分析案例 标签(空格分隔): Spark Spark学习四网站日志分析案例 一创建maven工程 二创建模板 三日志分析案例 一,创建maven工程 1,执行maven命令创建工程 mvn archetype:generate -DarchetypeGroupId=org.scala-tools.archetypes -DarchetypeArtifactId=scala-archetype-simple -DremoteRepositories=http://scal

Spark 学习: spark 原理简述与 shuffle 过程介绍

Spark学习: 简述总结 Spark 是使用 scala 实现的基于内存计算的大数据开源集群计算环境.提供了 java,scala, python,R 等语言的调用接口. Spark学习 简述总结 引言 1 Hadoop 和 Spark 的关系 Spark 系统架构 1 spark 运行原理 RDD 初识 shuffle 和 stage 性能优化 1 缓存机制和 cache 的意义 2 shuffle 的优化 3 资源参数调优 4 小结 本地搭建 Spark 开发环境 1 Spark-Scal

Spark学习六:spark streaming

Spark学习六:spark streaming 标签(空格分隔): Spark Spark学习六spark streaming 一概述 二企业案例分析 三Spark streaming的工作原理 四textFileStreaming的应用 四企业中的开发方式 五总结 一,概述 一个简单的实例 1,安装nc nc -lk 9999 2,启动应用 ./bin/run-example streaming.NeworkWordCount localhost 9999 二,企业案例分析 需求: 实时统计

Spark学习八:spark streaming与flume和kafka集成

Spark学习八:spark streaming与flume和kafka集成 标签(空格分隔): Spark Spark学习八spark streaming与flume和kafka集成 一Kafka 二flume和kafka的集成 三kafka和spark streaming的集成方式一kafka推送 四kafka和spark streaming的集成方式一spark streaam主动获取 五spark stream的高级应用updateStateByKey实现累加功能 六spark stre

Spark学习笔记之SparkRDD

Spark学习笔记之SparkRDD 一.   基本概念 RDD(resilient distributed datasets)弹性分布式数据集. 来自于两方面 ①   内存集合和外部存储系统 ②   通过转换来自于其他RDD,如map,filter等 2.创建操作(creation operation):RDD的创建由SparkContext来负责. 3.转换操作(transformation operation):将一个RDD通过一定操作转换为另一个RDD. 4.控制操作(control o

用Spark学习FP Tree算法和PrefixSpan算法

在FP Tree算法原理总结和PrefixSpan算法原理总结中,我们对FP Tree和PrefixSpan这两种关联算法的原理做了总结,这里就从实践的角度介绍如何使用这两个算法.由于scikit-learn中没有关联算法的类库,而Spark MLlib有,本文的使用以Spark MLlib作为使用环境. 1. Spark MLlib关联算法概述 在Spark MLlib中,也只实现了两种关联算法,即我们的FP Tree和PrefixSpan,而像Apriori,GSP之类的关联算法是没有的.而

nodejs学习四 Node.js NPM

什么是NPM? 不知道大家注意没有,windows平台下的Node.js安装包大小才区区4M多,真可以用短小精悍来形容它,作为一种编程语言,像java一个SDK 就几十M,为什么node.js的运行环境这么小呢?这其中的微妙之处在于,它拥有一个庞大的第三方软件库. 在Node本身提供的包(原生)中没有我们要实现的功能模块的时候,我们可以去寻找下是否已经有人实现了这种功能.毕竟重复造轮子这种事情,很多人都不想干. 去哪里寻找我们想要的包呢?如果你还不知道报的名字,你可以去https://www.n

ZigBee学习四 无线+UART通信

ZigBee学习四 无线+UART通信 1) 协调器编程 修改coordinator.c文件 byte GenericApp_TransID; // This is the unique message ID (counter) afAddrType_t GenericApp_DstAddr; //unsigned char uartbuf[128];/********************************************************************** L

Spark学习三:Spark Schedule以及idea的安装和导入源码

Spark学习三:Spark Schedule以及idea的安装和导入源码 标签(空格分隔): Spark Spark学习三Spark Schedule以及idea的安装和导入源码 一RDD操作过程中的数据位置 二Spark Schedule 三Idea导入spark源码 一,RDD操作过程中的数据位置 [hadoop001@xingyunfei001 spark-1.3.0-bin-2.5.0]$ bin/spark-shell --master local[2] val rdd = sc.t