spark使用02

1.rdd的初始化

  1.1 读取文件来初始化rdd(通过sparkContext的textFile方法)

    1.1.1 读取本地文件

     SparkConf conf = new SparkConf().setAppName("LocalWordCount").setMaster("local");// 指定运行在本地
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        // 返回每一行作为一个元素的rdd
        JavaRDD<String> lines = sparkContext
                .textFile("C://Users//yanglin//Desktop//bb.txt", 5); // 返回为JavaRDD[String]

    1.1.2 读取hdfs文件

      //返回每一行作为一个元素的rdd
        JavaRDD<String> lines=sparkContext.textFile("hdfs://hadoop-senior.ibeifeng.com:8020/user/yanglin/spark/wc.input", 5);//返回为JavaRDD[String]

  1.2 并行化集合来初始化rdd(通过sparkContext.)

    JavaPairRDD<Integer, String> students = context.parallelizePairs(
                Arrays.asList(new Tuple2<Integer, String>(1, "zhangsan"),
                        new Tuple2<Integer, String>(2, "lisi"),
                        new Tuple2<Integer, String>(3, "wangwu"),
                        new Tuple2<Integer, String>(4, "zhaoliu")),
                1)

2.rdd的基本操作(分为transformation和action)

  2.1 Spark支持两种RDD操作:transformation和action

    2.1.1 区别

      transformation操作会针对已有的RDD创建一个新的RDD;

      action则主要是对RDD进行最后的操作,比如遍历、reduce、保存到文件等,并可以返回结果给Driver程序。
    2.1.2 特性

      transformation的特点就是lazy特性:transformation是不会触发spark程序的执行的,它们只是记录了对RDD所做的操作,但是不会自发的执行。只有当transformation之后,接着执行了一            个action操作,那么所有的transformation才会执行。

      action操作执行,会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行。

  2.2 常用transformation操作

    2.2.1 flatMap   将有嵌套类型的集合转换为没有嵌套的一个大集合

    // 返回每一个单词为一个元素的rdd,将每行数据按空格分割后合并为一个大的集合
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 2192898403909387476L;
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

    2.2.2  map (在java中分为map和mapToPair,在scala中只有map),将一个rdd转换为另一个rdd

    // 返回每一个单词的映射
        JavaPairRDD<String, Integer> wordPairs = words
                .mapToPair(new PairFunction<String, String, Integer>() {
                    private static final long serialVersionUID = -4729349390159625475L;
                    public Tuple2<String, Integer> call(String word) throws Exception {
                        return new Tuple2<String, Integer>(word, 1);
                    }
                });

    2.2.3 reduceByKey (根据key分组和进行reduce操作)

    // 单词数的叠加
        JavaPairRDD<String, Integer> wordCountPairs = wordPairs
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = -8636811420038190538L;
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });

    2.2.4 filter (过滤符合要求的数据,生成一个新的rdd)

        context.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
                .filter(new Function<Integer, Boolean>() {
                    private static final long serialVersionUID = 1L;
                    public Boolean call(Integer val) throws Exception {
                        return val % 2 == 0;//获取偶数
                    }
                })

    2.2.5 reduce (从左到右依次执行reduce操作)

      Integer evenSum = context
                .parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))          //过滤获取偶数
                .filter(new Function<Integer, Boolean>() {
                    private static final long serialVersionUID = 1L;
                    public Boolean call(Integer val) throws Exception {
                        return val % 2 == 0;
                    }          //计算所有偶数的和 2+4=6 6+6=12 12+8=20 20+10=30
                }).reduce(new Function2<Integer, Integer, Integer>() {
                    private static final long serialVersionUID = 1L;
                    public Integer call(Integer v1, Integer v2) throws Exception {
                        return v1 + v2;
                    }
                });

    2.2.6 gropuByKey(根据key进行分组,每个key对应一个Iterable<value>)

    2.2.7 sortByKey(false)(对每个key对应的value进行排序操作。)
        默认为true:表示升序;设置为false,可以按降序排列

    2.2.8 join (对两个包含<key,value>对的RDD进行join操作,每个key join上的pair,都会传入自定义函数进行处理。返回的rdd的value为两个rdd的返回元组对)

      //students对应的数据为(id,name),scores对应的数据为(id,score),join后为(id,(name,score))      /**         *   join的结果为:          *  id:4 name:zhaoliu scores:21 ===============================         *  id:1 name:zhangsan scores:69 ===============================          *  id:1 name:zhangsan scores:68 ===============================          *  id:3 name:wangwu scores:48 ===============================          *  id:3 name:wangwu scores:52 ===============================          *  id:2 name:lisi scores:35 ===============================          *  id:2 name:lisi scores:97 ===============================         */      JavaPairRDD<Integer, String> students = context.parallelizePairs(                Arrays.asList(new Tuple2<Integer, String>(1, "zhangsan"),                        new Tuple2<Integer, String>(2, "lisi"),                        new Tuple2<Integer, String>(3, "wangwu"),                        new Tuple2<Integer, String>(4, "zhaoliu")),                1);              JavaPairRDD<Integer, Integer> scores = context.parallelizePairs(Arrays.asList(                new Tuple2<Integer, Integer>(1, 69), new Tuple2<Integer, Integer>(1, 68),                new Tuple2<Integer, Integer>(2, 35), new Tuple2<Integer, Integer>(2, 97),                new Tuple2<Integer, Integer>(3, 48), new Tuple2<Integer, Integer>(3, 52),                new Tuple2<Integer, Integer>(4, 21)));

      JavaPairRDD<Integer, Tuple2<String, Integer>> studentScorse = students.join(scores);

    2.2.9 cogroup (同join,但是是每个key对应的Iterable<value>都会传入自定义函数进行处理)   

      //会对有相同列的元素进行合并到一个Iterable中      /**       * cogroup的结果:            * id:4 name:[zhaoliu] scores:[21]            * id:1 name:[zhangsan] scores:[69, 68]            * id:3 name:[wangwu] scores:[48, 52]            * id:2 name:[lisi] scores:[35, 97]         */      JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScorse = students.cogroup(scores);

  2.3 action常用操作

    2.3.1 collect(将RDD中所有元素获取到本地客户端)

    2.3.2 count (获取RDD元素总数)

    2.3.3 take(n) (获取RDD中前n个元素)

    2.3.4 saveAsTextFile(path) (将RDD元素保存到文件中,对每个元素调用toString方法)

    2.3.5 countByKey (对每个key对应的值进行count计数)

    2.3.6 foreach (遍历RDD中的每个元素。)

时间: 2024-08-06 18:36:30

spark使用02的相关文章

Spark API编程动手实战-08-基于IDEA使用Spark API开发Spark程序-02

接下来进行打包,使用Project  Structure的Artifacts: 使用From modules with dependencies: 选择Main Class: 点击“OK”: 把名称改为SparkDemoJar: 因为每台机器上都安装了Scala和Spark,所以可以把Scala和Spark相关的jar文件都删除掉: 接下来进行Build: 选择“Build Artifacts”: 剩下的操作,就是jar包上传到服务器上,然后执行spark-submit命令,我在之前的文章已经详

spark学习02天-scala读取文件,词频统计

1.在本地安装jdk环境和scala环境 2.读取本地文件: scala> import scala.io.Source import scala.io.Source scala> val lines=Source.fromFile("F:/ziyuan_badou/file.txt").getLines().toList lines: List[String] = List("With the development of civilization, it is

Spark 1.60的executor schedule

第一次看源码还是Spark 1.02.这次看新源码发现调度方式有了一些新的特征,在这里随便写一下. 不变的是,master还是接收Appclient和worker的消息,并且在接收RegisterApplication等消息后会执行一遍schedule().schedule()依旧会先找到空闲的worker用以执行waitingDrivers.但是调度Executor的方式有了一点变化. 1 private def startExecutorsOnWorkers(): Unit = { 2 //

《Apache Spark源码剖析》

Spark Contributor,Databricks工程师连城,华为大数据平台开发部部长陈亮,网易杭州研究院副院长汪源,TalkingData首席数据科学家张夏天联袂力荐1.本书全面.系统地介绍了Spark源码,深入浅出,细致入微2.提供给读者一系列分析源码的实用技巧,并给出一个合理的阅读顺序3.始终抓住资源分配.消息传递.容错处理等基本问题,抽丝拨茧4.一步步寻找答案,所有问题迎刃而解,使读者知其然更知其所以然 内容简介 书籍计算机书籍 <Apache Spark源码剖析>以Spark

Spark读取Hbase中的数据_云帆大数据分享

Spark读取Hbase中的数据 大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1).调用parallelize函数直接从集合中获取数据,并存入RDD中:Java版本如下: 1 JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3)); Scala版本如下: 1 val myRDD= sc.parallelize(List(1,2,3)) 这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初

Spark入门系列视频教程

 视频目录: Spark入门| 01 Spark概念架构 Spark入门| 02 Spark集群搭建 Spark入门| 03 Spark Shell算子操作 Spark入门| 04 Spark单词计数Shell操作 Spark入门| 05 IDEA中编写Spark单词计数程序 Spark入门| 06 SparkSQL单词计数程序编写 视频截图:  关注下面公众号进行观看: 原文地址:https://www.cnblogs.com/dreamboy/p/11609979.html

百度刚放假啊数据库风口浪尖萨拉疯了

http://www.ebay.com/cln/l_x5585/2015.02.11/176746639012 http://www.ebay.com/cln/jiacha_boryk/2015.02.11/176837188016 http://www.ebay.com/cln/gbnlin0/2015.02.11/176837189016 http://www.ebay.com/cln/j_j2841/2015.02.11/177066749015 http://www.ebay.com/c

百度房间爱师傅卡卡是快乐疯了;爱死

http://www.ebay.com/cln/shx9479/-/177007606013/2015.02.11 http://www.ebay.com/cln/genqi12/-/176846034010/2015.02.11 http://www.ebay.com/cln/seyyon2/-/176906811016/2015.02.11 http://www.ebay.com/cln/wcn5971/-/176846032010/2015.02.11 http://www.ebay.co

百度和房价是否健康教案上开发

http://www.ebay.com/cln/l.kuan2/-/167247714018/2015.02.10 http://www.ebay.com/cln/setlia-3616/-/167086016019/2015.02.10 http://www.ebay.com/cln/pen-y77/-/167086017019/2015.02.10 http://www.ebay.com/cln/yua-me2/-/167399441016/2015.02.10 http://www.eba