Spark Transformation Operators

Java version
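
The class below exercises the common transformation operators map, filter, flatMap, groupByKey, reduceByKey, sortByKey, join, and cogroup on small in-memory datasets; uncomment the corresponding call in main() to run a single example locally.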

package com.huanfion.Spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class TransformationJava {
    public static void main(String[] args) {
        //map();
//        filter();
        //flatMap();
//        groupByKey();
//        reduceByKey();
//        sortByKey();
//        join();
        cogroup();
    }
    // cogroup: for each key, gather all matching values from both RDDs into a pair of Iterables
    public static void cogroup()
    {
        List<Tuple2<Integer, String>> stulist = Arrays.asList(
                new Tuple2<>(1,"liuda"),
                new Tuple2<>(2,"lier"),
                new Tuple2<>(3,"zhangsan"),
                new Tuple2<>(4,"gousi"),
                new Tuple2<>(5,"lily"),
                new Tuple2<>(6,"lucy"));
        List<Tuple2<Integer, Integer>> scorelist = Arrays.asList(
                new Tuple2<>(1,88),
                new Tuple2<>(2,87),
                new Tuple2<>(2,88),
                new Tuple2<>(2,97),
                new Tuple2<>(3,90),
                new Tuple2<>(3,50),
                new Tuple2<>(4,100),
                new Tuple2<>(5,58),
                new Tuple2<>(6,65));
        JavaSparkContext sc=getsc();
        JavaPairRDD<Integer, String> sturdd = sc.parallelizePairs(stulist);
        JavaPairRDD<Integer, Integer> scorerdd = sc.parallelizePairs(scorelist);

        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> cogroupvalue = sturdd.cogroup(scorerdd);

        cogroupvalue.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> tuple) throws Exception {
                System.out.println(tuple._1);
                System.out.println(tuple._2._1 + ":" + tuple._2._2);
            }
        });
    }
    // join: inner-join two pair RDDs on key, producing (key, (value1, value2))
    public static void join()
    {
        List<Tuple2<Integer, String>> stulist = Arrays.asList(
                new Tuple2<>(1,"liuda"),
                new Tuple2<>(2,"lier"),
                new Tuple2<>(3,"zhangsan"),
                new Tuple2<>(4,"gousi"),
                new Tuple2<>(5,"lily"),
                new Tuple2<>(6,"lucy"));
        List<Tuple2<Integer, Integer>> scorelist = Arrays.asList(
                new Tuple2<>(1,88),
                new Tuple2<>(2,87),
                new Tuple2<>(3,90),
                new Tuple2<>(4,100),
                new Tuple2<>(5,58),
                new Tuple2<>(6,65));
        JavaSparkContext sc=getsc();
        JavaPairRDD<Integer, String> sturdd = sc.parallelizePairs(stulist);
        JavaPairRDD<Integer, Integer> scorerdd = sc.parallelizePairs(scorelist);

        JavaPairRDD<Integer,Tuple2<String,Integer>> joinvalue=sturdd.join(scorerdd);

        joinvalue.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            @Override
            public void call(Tuple2<Integer, Tuple2<String, Integer>> tuple) throws Exception {
                System.out.println(tuple._1);
                System.out.println(tuple._2._1 + ":" + tuple._2._2);
            }
        });
    }
    // sortByKey: sort a pair RDD by key; false means descending order
    public static void sortByKey(){
        List<Tuple2<Integer, String>> list = Arrays.asList(
                new Tuple2<>(91,"liuda"),
                new Tuple2<>(78,"lier"),
                new Tuple2<>(99,"zhangsan"),
                new Tuple2<>(76,"gousi"),
                new Tuple2<>(90,"lily"),
                new Tuple2<>(89,"lucy"));
        JavaPairRDD<Integer, String> rdd = getsc().parallelizePairs(list);
        JavaPairRDD<Integer,String> sortvalue=rdd.sortByKey(false);
        sortvalue.foreach(x->System.out.println(x._1+"--"+x._2));
    }
    // reduceByKey: merge the values of each key with the given function (here, sum the scores per class)
    public static void reduceByKey(){
        List<Tuple2<String, Integer>> list = Arrays.asList(
                new Tuple2<>("class_1",91),
                new Tuple2<>("class_2",78),
                new Tuple2<>("class_1",99),
                new Tuple2<>("class_2",76),
                new Tuple2<>("class_2",90),
                new Tuple2<>("class_1",86));
        JavaPairRDD<String, Integer> rdd = getsc().parallelizePairs(list);
        JavaPairRDD<String, Integer> reducevalues = rdd.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        reducevalues.foreach(x->System.out.println(x._1+"--"+x._2));
    }
    // groupByKey: group all values that share a key into an Iterable
    public static void groupByKey(){
        List<Tuple2<String, Integer>> list = Arrays.asList(
                new Tuple2<>("class_1",90),
                new Tuple2<>("class_2",78),
                new Tuple2<>("class_1",99),
                new Tuple2<>("class_2",76),
                new Tuple2<>("class_2",90),
                new Tuple2<>("class_1",86));
        JavaPairRDD<String, Integer> rdd = getsc().parallelizePairs(list);

        JavaPairRDD<String, Iterable<Integer>> groupvalue = rdd.groupByKey();
        groupvalue.foreach(x->System.out.println(x._1+"--"+x._2));
    }
    // flatMap: split each line into words and flatten the results into a single RDD
    public static void flatMap(){
        List<String> list = Arrays.asList("Hadoop Hive", "Hadoop Hbase");
        JavaRDD<String> rdd = getsc().parallelize(list);
        JavaRDD<String> flatMapValue = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String value) throws Exception {
                return Arrays.asList(value.split(" ")).iterator();
            }
        });
        flatMapValue.foreach(x->System.out.println(x));
    }
    // map: apply a function to every element (here, multiply each number by 10)
    public static void map(){
        JavaSparkContext sc = getsc();

        List<Integer> list = Arrays.asList(1, 2, 3, 4);

        JavaRDD<Integer> rdd = sc.parallelize(list);

        JavaRDD<Integer> count = rdd.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer value) throws Exception {
                return value * 10;
            }
        });
        count.foreach(x->System.out.println(x));
    }

    // filter: keep only the elements for which the predicate returns true (here, even numbers)
    public static void filter(){
        JavaSparkContext sc = getsc();

        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        JavaRDD<Integer> rdd = sc.parallelize(list);

        JavaRDD<Integer> filterValue = rdd.filter(x -> x % 2 == 0);

        filterValue.foreach(x->System.out.println(x));
    }

    // build a JavaSparkContext that runs the examples locally
    public static JavaSparkContext getsc()
    {
        SparkConf sparkconf=new SparkConf().setAppName("Transformation").setMaster("local");

        JavaSparkContext sc=new JavaSparkContext(sparkconf);

        return sc;
    }
}
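
Since the Spark Java function interfaces (Function, Function2, FlatMapFunction, VoidFunction) each declare a single abstract method, the anonymous inner classes above can also be written as Java 8 lambdas. A minimal sketch under that assumption — a hypothetical extra method that could sit alongside the ones in the class above, reusing getsc() and the same kind of sample data:

    public static void mapAndReduceWithLambdas() {
        JavaSparkContext sc = getsc();
        // map: multiply every element by 10, then print it
        sc.parallelize(Arrays.asList(1, 2, 3, 4))
                .map(v -> v * 10)
                .foreach(x -> System.out.println(x));
        // reduceByKey: sum the scores per class, then print each (class, total) pair
        sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("class_1", 91),
                new Tuple2<>("class_2", 78),
                new Tuple2<>("class_1", 99)))
                .reduceByKey((v1, v2) -> v1 + v2)
                .foreach(x -> System.out.println(x._1 + "--" + x._2));
    }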

Scala version
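
The Scala version implements the same operators. The pair operations (groupByKey, reduceByKey, join, cogroup) come from PairRDDFunctions, which Spark makes available on any RDD of two-element tuples through an implicit conversion, so no dedicated pair-RDD type is needed as in the Java API.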

package com.huanfion.Spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object TransformationScala {
  def getsc():SparkContext={
    val sparkconf=new SparkConf().setAppName("Transformation").setMaster("local")

    val sc = new SparkContext(sparkconf)

    sc
  }
  def main(args: Array[String]): Unit = {
    //filter()
    //flatMap()
//    groupByKey()
//    reduceByKey()
//    sortByKey()
    join()
//    cogroup()
  }

  def filter():Unit={
   val sc=getsc()

    val list= Array(1,2,3,4,5,6,7,8,9,10)

    val rdd=sc.parallelize(list)

    val count=rdd.filter(x=>x%2==0)

    count.foreach(x=>System.out.println(x))
  }
  def map():Unit={
    val sparkconf=new SparkConf().setAppName("Transformation").setMaster("local")

    val sc = new SparkContext(sparkconf)

    val list= Array(1,2,3,4,5)

    val rdd=sc.parallelize(list)

    val count=rdd.map(x=>x*10)

    count.foreach(x=>System.out.println(x))
  }

  def flatMap():Unit={
    val list=Array("Hadoop Hive", "Hadoop Hbase")

    val rdd=getsc().parallelize(list)

    val flatmapvalue=rdd.flatMap(x=>x.split(" "))

    flatmapvalue.foreach(x=>System.out.println(x))
  }

  def groupByKey()= {
    val list=Array(
        Tuple2("class_1",90),
       Tuple2("class_2",78),
       Tuple2("class_1",99),
       Tuple2("class_2",76),
       Tuple2("class_2",90),
       Tuple2("class_1",86))
    val rdd=getsc().parallelize(list)

    val groupvalue=rdd.groupByKey()

    groupvalue.foreach(x=>{
      System.out.println(x._1)
      x._2.foreach(y=>System.out.println(y))
    })
  }

  def reduceByKey()={
    val list=Array(
      Tuple2("class_1",90),
      Tuple2("class_2",78),
      Tuple2("class_1",99),
      Tuple2("class_2",76),
      Tuple2("class_2",90),
      Tuple2("class_1",86))
    val rdd=getsc().parallelize(list)

    val reducevalue=rdd.reduceByKey(_+_)

    reducevalue.foreach(x=>System.out.println(x._1+"--"+x._2))
  }

  def sortByKey()={
    val list=Array(
      Tuple2("liuda",90),
      Tuple2("lier",78),
      Tuple2("zhangsan",99),
      Tuple2("gousi",76),
      Tuple2("lily",90),
      Tuple2("lucy",86))
    val rdd=getsc().parallelize(list)

    // sort by the score (the second element) in descending order via sortBy
    val sortvalue=rdd.sortBy(x=>x._2,false)

    sortvalue.foreach(x=>System.out.println(x._1+":"+x._2))
  }
  def join()={
    val stulist=Array(
       Tuple2(1,"liuda"),
       Tuple2(2,"lier"),
       Tuple2(3,"zhangsan"),
       Tuple2(4,"gousi"),
       Tuple2(5,"lily"),
       Tuple2(6,"lucy"));
    val scorelist=Array(
       Tuple2(1,88),
       Tuple2(2,87),
       Tuple2(3,90),
       Tuple2(4,100),
       Tuple2(5,58),
       Tuple2(6,65))
    val sc=getsc()
    val sturdd=sc.parallelize(stulist)

    val scorerdd=sc.parallelize(scorelist)

    val joinvalue=sturdd.join(scorerdd)

    joinvalue.foreach(x=>System.out.println(x._1+"->"+x._2))
  }

  def cogroup()={
    val stulist=Array(
      Tuple2(1,"liuda"),
      Tuple2(2,"lier"),
      Tuple2(3,"zhangsan"),
      Tuple2(4,"gousi"),
      Tuple2(5,"lily"),
      Tuple2(6,"lucy"));
    val scorelist=Array(
      Tuple2(1,88),
      Tuple2(2,87),
      Tuple2(2,84),
      Tuple2(2,86),
      Tuple2(3,90),
      Tuple2(3,65),
      Tuple2(4,100),
      Tuple2(5,58),
      Tuple2(6,65))
    val sc=getsc()
    val sturdd=sc.parallelize(stulist)

    val scorerdd=sc.parallelize(scorelist)

    val cogroupvalue=sturdd.cogroup(scorerdd)

    cogroupvalue.foreach(x=>System.out.println(x._1+"->"+x._2._1.toList+":"+x._2._2.toList))
  }
}

Original source: https://www.cnblogs.com/huanfion/p/10382738.html

