Spark-MapReduce编程-自连接(Scala)

关于SQL和Hadoop的实现参考这里 MapReduce编程-自连接

这里用相同的原理,使用spark实现。本人也是刚学Scala,可能写的不好,还请指正。

object SelfUion {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SelfUnion")
    val sc = new SparkContext(conf)
    val cpFile = sc.textFile("cp.txt")
    //val strs =  Array[String]("a", "b")

    // 1) 生成两张表
    val res = cpFile.flatMap(line => {
      val strs: Array[String] = line.split(" ");
      if (strs.length == 2) Array(strs(0) + " parent_" + strs(1), strs(1) + " child_" + strs(0)) else Array[String]()
    })
      // 2) 转化为key-value形式
      .map(line => {
      val kv = line.split(" ")
      (kv(0), kv(1))
    })
      // 3) 列的相等匹配
      .groupByKey()
      // 4) 解析value,得到结果
      .flatMapValues(values => {
      val childList = new ArrayBuffer[String]();
      val parentList = new ArrayBuffer[String]();
      values.foreach(
        name => {
          if (name.startsWith("child_")) childList += name
          else if (name.startsWith("parent_")) parentList += name;
        })
      for (c <- childList; p <- parentList) yield
        c.substring(6) + " " + p.substring(7)
      }).values
      .saveAsTextFile("selfunion_out")

  }
}

cp.txt的内容为:

Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Tom

输出为:

Terry Lucy
Terry Jack
Tom Mary
Tom Ben
Jone Mary
Jone Ben
Tom Alice
Tom Jesse
Jone Alice
Jone Jesse

纯Scala实现

另外,附带上使用Scala的函数式编程实现。注意,下面是用的Scala原有的map,flatMap() 等方法,而不是RDD。

使用一个ArrayBuffer[String]带入输入文件,每一行为一个item.

  def main(args: Array[String]) {
    val list = new ArrayBuffer[String]();
    list+="Tom Lucy"
    list += "Tom Jack"
    list += "Jone Lucy"
    list += "Jone Jack"
    list += "Lucy Mary"
    list += "Lucy Ben"
    list += "Jack Alice"
    list += "Jack Jesse"
    list += "Terry Tom"

    val list2 = list.flatMap(line => {
      val strs: Array[String] = line.split(" ");
      if (strs.length == 2) Array(strs(0) + " parent_" + strs(1), strs(1) + " child_" + strs(0)) else Array[String]()
    } )
    //println("list2 : " + list2)
    val mapout = list2.map(line => line.split(" "))
    //println("mapout : " + mapout)
    //for(item <- mapout) print(item(0) + " " + item(1) + " ; " )
    // println()
    val groupbyout = mapout.groupBy(_(0)) //same as groupByKey, The first element of the Array
    val res = groupbyout.values.flatMap(values => {
      val childList = new ArrayBuffer[String]();
      val parentList = new ArrayBuffer[String]();
      values.foreach(
        names => {
          if (names(1).startsWith("child_")) childList += names(1)
          else if (names(1).startsWith("parent_")) parentList += names(1);
        })
      for (c <- childList; p <- parentList) yield
         Array[String](c,p)
    })

    for(arr <- res){
      println(arr(0).substring(6) + " " + arr(1).substring(7))
    }

  }

可以看出,MR和函数式编程有很多相似之处

时间: 2024-10-11 22:18:34

Spark-MapReduce编程-自连接(Scala)的相关文章

MapReduce编程-自连接

SQL自连接 SQL自身连接,可以解决很多问题.下面举的一个例子,就是使用了SQL自身连接,它解决了列与列之间的逻辑关系问题,准确的讲是列与列之间的层次关系. 对于下面的表cp(存储的孩子和父母的关系),用一个SQL,找出所有的 grandchild 和 grandparent,就是找出所有的 孙子 -> 祖父母 +-------+--------+ | child | parent | +-------+--------+ | tom | jack | | hello | jack | | t

Spark Streaming编程示例

近期也有开始研究使用spark streaming来实现流式处理.本文以流式计算word count为例,简单描述如何进行spark streaming编程. 1. 依赖的jar包 参考<分别用Eclipse和IDEA搭建Scala+Spark开发环境>一文,pom.xml中指定依赖库spark-streaming_2.10.jar. <dependency> <groupId>org.scala-lang</groupId> <artifactId&

MapReduce编程(六) 从HDFS导入数据到Elasticsearch

一.Elasticsearch for Hadoop安装 Elasticsearch for Hadoop并不像logstash.kibana一样是一个独立的软件,而是Hadoop和Elasticsearch交互所需要的jar包.所以,有直接下载和maven导入2种方式.安装之前确保JDK版本不要低于1.8,Elasticsearch版本不能低于1.0. 官网对声明是对Hadoop 1.1.x.1.2.x.2.2.x.2.4.x.2.6.x.2.7.x测试通过,支持较好,其它版本的也并不是不能用

Spark Graphx编程指南

问题导读 1.GraphX提供了几种方式从RDD或者磁盘上的顶点和边集合构造图?2.PageRank算法在图中发挥什么作用?3.三角形计数算法的作用是什么? Spark中文手册-编程指南Spark之一个快速的例子Spark之基本概念Spark之基本概念Spark之基本概念(2)Spark之基本概念(3)Spark-sql由入门到精通Spark-sql由入门到精通续spark GraphX编程指南(1) Pregel API 图本身是递归数据结构,顶点的属性依赖于它们邻居的属性,这些邻居的属性又依

Apache Spark 2.2.0 中文文档 - Spark Streaming 编程指南 | ApacheCN

Spark Streaming 编程指南 概述 一个入门示例 基础概念 依赖 初始化 StreamingContext Discretized Streams (DStreams)(离散化流) Input DStreams 和 Receivers(接收器) DStreams 上的 Transformations(转换) DStreams 上的输出操作 DataFrame 和 SQL 操作 MLlib 操作 缓存 / 持久性 Checkpointing Accumulators, Broadcas

Spark RDD编程(二)

转载请注明出处:http://blog.csdn.net/gamer_gyt @高阳团 博主微博:http://weibo.com/234654758 Github:https://github.com/thinkgamer ============================================================ SparkRDD编程(一) Spark 的键值对(pair RDD)操作,Scala实现 RDD的分区函数 目前Spark中实现的分区函数包括两种 Ha

Spark API编程动手实战-01-以本地模式进行Spark API实战map、filter和co

首先以spark的本地模式测试spark API,以local的方式运行spark-shell: 先从parallelize入手吧: map操作后结果: 下面看下 filter操作: filter执行结果: 我们用最正宗的scala函数式编程的风格: 执行结果: 从结果 可以看出来,与之前那种分步奏方式结果是一样的 但采用这种方式,即是复合scala风格的写法,也是符合spark的应用程序风格的写法,在spark的编程中,大多数功能的实现都是只要一行代码即可完成.

Spark API编程动手实战-01-以本地模式进行Spark API实战map、filter和collect

首先以spark的本地模式测试spark API,以local的方式运行spark-shell: 先从parallelize入手吧: map操作后结果: 下面看下 filter操作: filter执行结果: 我们用最正宗的scala函数式编程的风格: 执行结果: 从结果 可以看出来,与之前那种分步奏方式结果是一样的 但采用这种方式,即是复合scala风格的写法,也是符合spark的应用程序风格的写法,在spark的编程中,大多数功能的实现都是只要一行代码即可完成.

MapReduce编程实例5

前提准备: 1.hadoop安装运行正常.Hadoop安装配置请参考:Ubuntu下 Hadoop 1.2.1 配置安装 2.集成开发环境正常.集成开发环境配置请参考 :Ubuntu 搭建Hadoop源码阅读环境 MapReduce编程实例: MapReduce编程实例(一),详细介绍在集成环境中运行第一个MapReduce程序 WordCount及代码分析 MapReduce编程实例(二),计算学生平均成绩 MapReduce编程实例(三),数据去重 MapReduce编程实例(四),排序 M