spark 教程三 spark Map filter flatMap union distinct intersection操作

RDD的创建 spark 所有的操作都围绕着弹性分布式数据集(RDD)进行,这是一个有容错机制的并可以被并行操作的元素集合,具有只读、分区、容错、高效、无需物化、可以缓存、RDD依赖等特征

RDD的创建基础RDD

1.并行集合(Parallelized Collections):接收一个已经存在的Scala集合,然后进行各种并行运算

 var sc=new SparkContext(conf)

 var rdd=sc.parallelize(Array(2,4,9,3,5,7,8,1,6));
 rdd.foreach(println)

 

2.hadoop 数据集(Hadoop Datasets):在一个文件的每条记录上运行函数只要文件系统是HDFS,或者hadoop支持的任意存档存储系统即可

val file=sc.textFile("hdfs://hostname:9090/path/somefile.txt")

 Map的操作

map(func) 返回一个新的RDD ,由每一个元素经过func后转换组成

    //设置conf master =local 为本地模式, appNamm 是应用程序的名称
    var conf=new SparkConf().setMaster("local[1]").setAppName("RDD")

    var sc=new SparkContext(conf)

    // 加载一个数组,然后给没一个元素乘以2, 并且输出
    var rdd=sc.parallelize(Array(2,4,9,3,5,7,8,1,6)).map(_*2).collect().foreach(println)

 filter(func) 返回一个新的RDD,由func函数计算后返回为true 的元素组成

    //返回字母为b的元素
    var strrdd=sc.parallelize(Array("a","b","c","d")).filter(_=="b").foreach(println)

 flatMap(func) 类似于Map 但是每一个输入元素被映射成0个或多个输出元素,因此func 应该返回一个序列,而不是单一元素

    //把每一个元素乘以2然后在压平,输出
    var list=sc.parallelize(List(List(1,6),List(2,7),List(3,8),List(4,9),List(5,10)))
    list.flatMap(_.map(_*2)).foreach(println)

下面用flatMap做一个wordcount的例子,新建一个文件建立了一下几个字段

/读取本地文件然后  。用split函数 把元素拆分成单一的元素,然后再用个map 给没个元素设置一个1, 在用reduceByKey把相同key的数量相加
    var txtrdd=sc.textFile("d:/517/wc.txt").flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).foreach(println)

result:
(tome,1)
(tom,1)
(xiaoli,1)
(lusi,1)
(hello,7)
(java,1)
(jame,2)

 distinct(numTasks)  返回一个数据集中不包含重复元素的新数据集 (去重)

 var la=sc.parallelize(List(("A",1),("A",6),("B",2),("B",7),("A",1)))

 var distinct=la.distinct()

result:
(A,1)
(A,6)
(B,7)
(B,2)

 union(otherDataset) 返回一个新的数据集,新数据集由源数据集和参数数据集共同构成

    var lb=sc.parallelize(List(("A",1),("A",6),("B",2),("B",7),("C",3)))

    var la=sc.parallelize(List(("A",1),("A",6),("D",2),("E",7),("A",1)))

    var distinct=la.union(lb)

    distinct.foreach(println

    result:

    (A,1)
    (A,6)
    (D,2)
    (E,7)
    (A,1)

    (A,1)
    (A,6)
    (B,2)
    (B,7)
    (C,3)

intersection(otherDataset) 返回一个新的数据集,新数据集由源数据集和参数数据集的交集构成

    var lb=sc.parallelize(List(("A",1),("A",6),("B",2),("B",7),("C",3)))

    var la=sc.parallelize(List(("A",1),("A",6),("D",2),("E",7),("A",1)))

    var distinct=la.intersection(lb)

    distinct.foreach(println)

    result:
    (A,1)
    (A,6)
时间: 2024-12-13 14:40:25

spark 教程三 spark Map filter flatMap union distinct intersection操作的相关文章

Spark算子:RDD基本转换操作(1)–map、flatMap、distinct

Spark算子:RDD基本转换操作(1)–map.flatMap.distinct 关键字:Spark算子.Spark RDD基本转换.map.flatMap.distinct map 将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素. 输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区. hadoop fs -cat /tmp/lxw1234/1.txt hello world hello spark hello hive //读取HDFS文件到RDD sca

Spark教程-构建Spark集群(1)

对于90%以上想学习Spark的人而言,如何构建Spark集群是其最大的难点之一,为了解决大家构建Spark集群的一切困难,家林把Spark集群的构建分为了四个步骤,从零起步,不需要任何前置知识,涵盖操作的每一个细节,构建完整的Spark集群. 从零起步,构建Spark集群经典四部曲: 第一步:搭建Hadoop单机和伪分布式环境: 第二步:构造分布式Hadoop集群: 第三步:构造分布式的Spark集群: 第四步:测试Spark集群: 本文内容为构建Spark集群经典四部曲的第一步,从零起步构建

Spark教程-构建Spark集群-运行Ubuntu系统(1)

为了简化权限等问题,下面我们以root用户的身份登录和使用Ubuntu系统,而Ubuntu在默认情况下并没有开启root用户,这需要我们做如下设置: sudo  -s进入 root用户权限模式 vim /etc/lightdm/lightdm.conf [SeatDefaults] greeter-session=unity-greeter user-session=ubuntu greeter-show-manual-login=true #手工输入登陆系统的用户名和密码 allow-gues

Spark教程(2)-Spark概述及相关组件

1.概述 Spark起源于加州大学伯克利分校RAD实验室,起初旨在解决MapReduce在迭代计算和交互计算中的效率低下问题.目前Spark已经发展成集离线计算,交互式计算,流计算,图计算,机器学习等模块于一体的通用大数据解决方案. 2.Spark组件 Spark Core Spark Core 实现了 Spark 的基本功能,包含任务调度.内存管理.错误恢复.与存储系统 交互等模块. Spark Core 中还包含了对弹性分布式数据集(resilient distributed dataset

Spark教程-构建Spark集群-安装Ubuntu系统(2)

3.点击“Finish”完成虚拟系统的创建,如下图所示: 点击我们创建的Ubuntu,如下所示 点击“CD/DVD(IDE)”进入如下界面 选择我们的Ubuntu的ISO的具体存放路径,选择好后如下图所示: 点击“OK”完成设置: 点击“Memory”,进入如下界面: 此时把我们虚拟的内存设置为2G,如下所示: 点击“OK”完成设置. Spark教程-构建Spark集群-安装Ubuntu系统(2)

Spark教程-构建Spark集群-安装Ubuntu系统(1)

Unbuntu系统是我们开发Hadoop时最常用的操作系统,下面带领大家一步步完成Vmware虚拟机下Unbuntu系统的安装 创建Vmware中的虚拟系统: 我们选择的是“Typical”的方式,点击“Next”进入下一步: 选择稍后安装操作系统,点击“Next”进入下一步: 选择安装Linux系统的Ubuntu版本,点击“Next”进入下一步: 家林这里选择了自定义系统的存放路径为“E:\VMware\Virtual Machines\Master” 如下图所示: 点击“Next”进入下一步

Spark教程-构建Spark集群-运行Ubuntu系统(2)

安装Java 1.打开终端,建立新目录“/usr/lib/java”,如下图所示: 2.把下载的JDK文件移到刚刚创建的“/usr/lib/java”中,如下图所示 3.解压JDK文件,如下图所示: 解压完成的文件目录如下图所示: 4.修改环境变量: 进入如下图所示的配置文件中: 按下“i”进入INSERT模式,把Java的环境编写信息加入其中,如下图所示: 按下“esc“键回到正常模式,保存并退出配置文件: 执行以下命令是配置文件的修改生效: 5.在终端中显示刚刚安装的Java版本,如下图所示

Spark教程-构建Spark集群-配置Hadoop单机模式并运行Wordcount(1)

安装ssh Hadoop是采用ssh进行通信的,此时我们要设置密码为空,即不需要密码登陆,这样免去每次通信时都输入秘密,安装如下: 输入“Y”进行安装并等待自动安装完成. 安装ssh完成后启动服务 以下命令验证服务是否正常启动: 可以看到ssh正常启动: 设置免密码登录,生成私钥和公钥: 在/root/.ssh中生成两个文件:id_rsa和id_rsa.pub,id_rsa为私钥,id_rsa.pub为公钥,我们将公钥id_rsa.pub追加到 authorized_keys中,因为author

Spark 教程:Spark的体系架构

最近看到一篇关于Spark架构的博文,作者是 Alexey Grishchenko.看过Alexey博文的同学应该都知道,他对Spark理解地非常深入,读完他的 “spark-architecture” 这篇博文,有种醍醐灌顶的感觉,从JVM内存分配到Spark集群的资源管理,步步深入,感触颇多(腾云科技ty300.com).因此,在周末的业余时间里,将此文的核心内容译成中文,并在这里与大家分享.如在翻译过程中有文字上的表达纰漏,还请大家指出. 首先来看一张Spark 1.3.0 官方给出的图片