Spark入门(四)--Spark的map、flatMap、mapToPair

spark的RDD操作

在上一节Spark经典的单词统计中,了解了几个RDD操作,包括flatMap,map,reduceByKey,以及后面简化的方案,countByValue。那么这一节将介绍更多常用的RDD操作,并且为每一种RDD我们分解来看其运作的情况。

spark的flatMap

flatMap,有着一对多的表现,输入一输出多。并且会将每一个输入对应的多个输出整合成一个大的集合,当然不用担心这个集合会超出内存的范围,因为spark会自觉地将过多的内容溢写到磁盘。当然如果对运行的机器的内存有着足够的信心,也可以将内容存储到内存中。

为了更好地理解flatMap,我们将举一个例子来说明。当然和往常一样,会准备好例子对应的数据文本,文本名称为uv.txt,该文本和示例程序可以从github上下载。以下会用三种语言:scala、java、python去描述,同时在java中会对比采用java和java8来实现各个例子。其中java和scala程序在github能直接下载,而python则暂时不提供,后续会补上。

scala实现


import org.apache.spark.{SparkConf, SparkContext}

object SparkFlatMap {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkFlatMap")

    val sc = new SparkContext(conf)

    //设置数据路径
    val textData = sc.textFile("./uv.txt")

    //输出处理前总行数
    println("before:"+textData.count()+"行")

    //输出处理前第一行数据
    println("first line:"+textData.first())

    //进行flatMap处理
    val flatData = textData.flatMap(line => line.split(" "))

    //输出处理后总行数
    println("after:"+flatData.count())

    //输出处理后第一行数据
    println("first line:"+flatData.first())

    //将结果保存在flatResultScala文件夹中
    flatData.saveAsTextFile("./flatResultScala")

  }

}

复制代码

java实现

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import java.util.Arrays;
import java.util.Iterator;

public class SparkFlatMapJava {

    public static void main(String[] args){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //java实现
        flatMapJava(sc);

        //java8实现
        flatMapJava8(sc);

    }

    public static void flatMapJava(JavaSparkContext sc){
        //设置数据路径
        JavaRDD<String> textData = sc.textFile("./uv.txt");

        //输出处理前总行数
        System.out.println("before:"+textData.count()+"行");

        //输出处理前第一行数据
        System.out.println("first line:"+textData.first()+"行");

        //进行flatMap处理
        JavaRDD<String> flatData = textData.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        //输出处理后总行数
        System.out.println("after:"+flatData.count()+"行");

        //输出处理后第一行数据
        System.out.println("first line:"+flatData.first()+"行");

        //将结果保存在flatResultScala文件夹中
        flatData.saveAsTextFile("./flatResultJava");
    }

    public static void flatMapJava8(JavaSparkContext sc){
        sc.textFile("./uv.txt")
          .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
          .saveAsTextFile("./flatResultJava8");
    }

}
复制代码

python实现

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("FlatMapPython")

sc = SparkContext(conf=conf)

textData = sc.textFile("./uv.txt")

print("before:"+str(textData.count())+"行")

print("first line"+textData.first())

flatData = textData.flatMap(lambda line:line.split(" "))

print("after:"+str(flatData.count())+"行")

print("first line"+flatData.first())

flatData.saveAsTextFile("./resultFlatMap")
复制代码

运行任意程序,得到相同结果

before:86400行
first line:2015-08-24_00:00:00 55311 buy
after:259200
first line:2015-08-24_00:00:00
复制代码

查看文件

很显然每一行都按照空格拆分成了三行,因此总行数是拆分前的三倍,第一行的内容只剩下原第一行的第一个数据,时间。这样flatMap的作用就很明显了。

spark的map

用同样的方法来展示map操作,与flatMap不同的是,map通常是一对一,即输入一个,对应输出一个。但是输出的结果可以是一个元组,一个元组则可能包含多个数据,但是一个元组是一个整体,因此算是一个元素。这里注意到在输出的结果是元组时,scala和python能够很正常处理,而在java中则有一点不同。

scala实现


import org.apache.spark.{SparkConf, SparkContext}

object SparkMap {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkFlatMap")

    val sc = new SparkContext(conf)

    val textData = sc.textFile("./uv.txt")

    //得到一个最后一个操作值,前面的时间和次数舍弃
    val mapData1 = textData.map(line => line.split(" ")(2))

    println(mapData1.count())

    println(mapData1.first())

    mapData1.saveAsTextFile("./resultMapScala")

    //得到一个最后两个值,前面的时间舍弃
    val mapData2 = textData.map(line => (line.split(" ")(1),line.split(" ")(2)))

    println(mapData2.count())

    println(mapData2.first())

    //将所有值存到元组中去
    val mapData3 = textData.map(line => (line.split(" ")(1),line.split(" ")(1),line.split(" ")(2)))

    println(mapData3.count())

    println(mapData3.first())

  }

}
复制代码

java实现

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.codehaus.janino.Java;
import scala.Tuple2;
import scala.Tuple3;

public class SparkMapJava {

    public static void main(String[] args){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        //java实现
        mapJava(sc);

        //java8实现
        mapJava8(sc);

    }

    public static void mapJava(JavaSparkContext sc){
        JavaRDD<String> txtData = sc.textFile("./uv.txt");

        //保留最后一个值
        JavaRDD<String> mapData1 = txtData.map(new Function<String, String>() {
            @Override
            public String call(String s) throws Exception {
                return s.split(" ")[2];
            }
        });

        System.out.println(mapData1.count());
        System.out.println(mapData1.first());

        //保留最后两个值
        JavaRDD<Tuple2<String,String>> mapData2 = txtData.map(new Function<String, Tuple2<String,String>>() {
            @Override
            public Tuple2<String,String> call(String s) throws Exception {
                return new Tuple2<>(s.split(" ")[1],s.split(" ")[2]);
            }
        });

        System.out.println(mapData2.count());
        System.out.println(mapData2.first());

        //保留最后三个值
        JavaRDD<Tuple3<String,String,String>> mapData3 = txtData.map(new Function<String, Tuple3<String,String,String>>() {
            @Override
            public Tuple3<String,String,String> call(String s) throws Exception {
                return new Tuple3<>(s.split(" ")[0],s.split(" ")[1],s.split(" ")[2]);
            }
        });

        System.out.println(mapData2.count());
        System.out.println(mapData2.first());

    }

    public static void mapJava8(JavaSparkContext sc){
        JavaRDD<String> mapData1 = sc.textFile("./uv.txt").map(line -> line.split(" ")[2]);
        System.out.println(mapData1.count());
        System.out.println(mapData1.first());

        JavaRDD<Tuple2<String,String>> mapData2 = sc.textFile("./uv.txt").map(line -> new Tuple2<String, String>(line.split(" ")[1],line.split(" ")[2]));
        System.out.println(mapData2.count());
        System.out.println(mapData2.first());

        JavaRDD<Tuple3<String,String,String>> mapData3 = sc.textFile("./uv.txt").map(line -> new Tuple3<String, String, String>(line.split(" ")[0],line.split(" ")[1],line.split(" ")[2]));
        System.out.println(mapData3.count());
        System.out.println(mapData3.first());

    }

}

复制代码

python实现

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("FlatMapPython")

sc = SparkContext(conf=conf)

textData = sc.textFile("./uv.txt")

mapData1 = textData.map(lambda line : line.split(" ")[2])

print(mapData1.count())
print(mapData1.first())

mapData2 = textData.map(lambda line : (line.split(" ")[1],line.split(" ")[2]))

print(mapData2.count())
print(mapData2.first())

mapData3 = textData.map(lambda line : (line.split(" ")[0],line.split(" ")[1],line.split(" ")[2]))

print(mapData3.count())
print(mapData3.first())
复制代码

运行任意程序,得到相同结果

86400
buy
86400
(55311,buy)
86400
(55311,55311,buy)
复制代码

Java中独有的mapToPair

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class SparkMapToPair {

    public static void main(String[] args){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkFlatMapJava");
        JavaSparkContext sc = new JavaSparkContext(conf);

        mapToPairJava(sc);

        mapToPairJava8(sc);

    }

    public static void mapToPairJava(JavaSparkContext sc){

        JavaPairRDD<String,String> pairRDD = sc.textFile("./uv.txt").mapToPair(new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String s) throws Exception {
                return new Tuple2<>(s.split(" ")[1],s.split(" ")[2]);
            }
        });

        System.out.println(pairRDD.count());

        System.out.println(pairRDD.first());

    }

    public static void mapToPairJava8(JavaSparkContext sc){
        JavaPairRDD<String,String> pairRDD = sc.textFile("./uv.txt").mapToPair(line -> new Tuple2<>(line.split(" ")[1],line.split(" ")[2]));

        System.out.println(pairRDD.count());

        System.out.println(pairRDD.first());
    }
}

复制代码

运行得到结果

86400
(55311,buy)
复制代码

显然我们发现这个结果,和用map处理保留后两个的结果是一致的。灵活使用map、flatMap、mapToPair将非常重要,后面还将有运用多种操作去处理复杂的数据。以上所有程序的代码都能够在GitHub上下载

转自:https://juejin.im/post/5c77e383f265da2d8f474e29

原文地址:https://www.cnblogs.com/tjp40922/p/12181626.html

时间: 2024-10-08 13:52:45

Spark入门(四)--Spark的map、flatMap、mapToPair的相关文章

一、spark入门之spark shell:wordcount

1.安装完spark,进入spark中bin目录: bin/spark-shell scala> val textFile = sc.textFile("/Users/admin/spark/spark-1.6.1-bin-hadoop2.6/README.md") scala> textFile.flatMap(_.split(" ")).filter(!_.isEmpty).map((_,1)).reduceByKey(_+_).collect().

二、spark入门之spark shell:文本中发现5个最常用的word

scala> val textFile = sc.textFile("/Users/admin/spark-1.5.1-bin-hadoop2.4/README.md") scala> val topWord = textFile.flatMap(_.split(" ")).filter(!_.isEmpty).map((_,1)).reduceByKey(_+_).map{case (word,count) =>(count,word)}.sor

Spark入门系列视频教程

 视频目录: Spark入门| 01 Spark概念架构 Spark入门| 02 Spark集群搭建 Spark入门| 03 Spark Shell算子操作 Spark入门| 04 Spark单词计数Shell操作 Spark入门| 05 IDEA中编写Spark单词计数程序 Spark入门| 06 SparkSQL单词计数程序编写 视频截图:  关注下面公众号进行观看: 原文地址:https://www.cnblogs.com/dreamboy/p/11609979.html

Spark入门实战系列--2.Spark编译与部署(下)--Spark编译安装

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.编译Spark Spark可以通过SBT和Maven两种方式进行编译,再通过make-distribution.sh脚本生成部署包.SBT编译需要安装git工具,而Maven安装则需要maven工具,两种方式均需要在联网下进行,通过比较发现SBT编译速度较慢(原因有可能是1.时间不一样,SBT是白天编译,Maven是深夜进行的,获取依赖包速度不同 2.maven下载大文件是多线程进行,而SBT是

Spark入门实战系列--6.SparkSQL(上)--SparkSQL简介

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.SparkSQL的发展历程 1.1 Hive and Shark SparkSQL的前身是Shark,给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应运而生,它是当时唯一运行在Hadoop上的SQL-on-Hadoop工具.但是MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低的运行效率,为了提高SQL-on-Hadoop的效率,大量的

Spark入门实战系列--3.Spark编程模型(上)--概念及SparkShell实战

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 Spark编程模型 1.1 术语定义 应用程序(Application): 基于Spark的用户程序,包含了一个Driver Program 和集群中多个的Executor: 驱动程序(Driver Program):运行Application的main()函数并且创建SparkContext,通常用SparkContext代表Driver Program: 执行单元(Executor): 是为某

Spark入门实战系列--1.Spark及其生态圈简介

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架.Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处,Spark以其先进的设计理念,迅速成为社区的热门项目,围绕着Spark推出了Spark SQL.Spark St

Apache spark入门精品文章

本文聚焦 Apache Spark 入门,了解其在大数据领域的地位,覆盖 Apache Spark 的安装及应用程序的建立,并解释一些常见的行为和操作. 一. 为什么要使用 Apache Spark 时下,我们正处在一个“大数据”的时代,每时每刻,都有各种类型的数据被生产.而在此紫外,数据增幅的速度也在显著增加.从广义上看,这些数据包含交易数据.社交媒体内容(比如文本.图像和视频)以及传感器数据.那么,为什么要在这些内容上投入如此多精力,其原因无非就是从海量数据中提取洞见可以对生活和生产实践进行

新手入门:Spark部署实战入门

Spark简介 整体认识 Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架.最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一. Spark在整个大数据系统中处于中间偏上层的地位,如下图,对hadoop起到了补充作用: 基本概念 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 第一步分割任务.首先我们需要有一个fo

Spark入门实战系列--8.Spark MLlib(上)--机器学习及SparkMLlib简介

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.机器学习概念 1.1 机器学习的定义 在维基百科上对机器学习提出以下几种定义: l“机器学习是一门人工智能的科学,该领域的主要研究对象是人工智能,特别是如何在经验学习中改善具体算法的性能”. l“机器学习是对能通过经验自动改进的计算机算法的研究”. l“机器学习是用数据或以往的经验,以此优化计算机程序的性能标准.” 一种经常引用的英文定义是:A computer program is said