Spark中的各种action算子操作(java版)

在我看来,Spark编程中的action算子的作用就像一个触发器,用来触发之前的transformation算子。transformation操作具有懒加载的特性,你定义完操作之后并不会立即加载,只有当某个action的算子执行之后,前面所有的transformation算子才会全部执行。常用的action算子如下代码所列:(java版)

package cn.spark.study.core;

import java.util.Arrays;

import java.util.List;

import java.util.Map;

import org.apache.spark.SparkConf;

import org.apache.spark.api.java.JavaPairRDD;

import org.apache.spark.api.java.JavaRDD;

import org.apache.spark.api.java.JavaSparkContext;

import org.apache.spark.api.java.function.Function;

import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

/**

* action操作实战

* @author dd

*

*/

public class ActionOperation {

public static void main(String[] args) {

//reduceTest();

//collectTest();

//countTest();

//takeTest();

countByKeyTest();

}

/**
 * reduce算子
 * 案例:求累加和
 */
private static void reduceTest(){
    SparkConf conf = new SparkConf()
                    .setAppName("reduce")
                    .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);

    JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);

    //使用reduce操作对集合中的数字进行累加
    int sum = numbersRDD.reduce(new Function2<Integer, Integer, Integer>() {

        @Override
        public Integer call(Integer arg0, Integer arg1) throws Exception {
            return arg0+arg1;
        }
    });

    System.out.println(sum);

    sc.close();
}

/**
 * collect算子
 * 可以将集群上的数据拉取到本地进行遍历(不推荐使用)
 */
private static void collectTest(){
    SparkConf conf = new SparkConf()
    .setAppName("collect")
    .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);

    JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);

    JavaRDD<Integer> doubleNumbers = numbersRDD.map(new Function<Integer, Integer>() {

        @Override
        public Integer call(Integer arg0) throws Exception {
            // TODO Auto-generated method stub
            return arg0*2;
        }
    });

    //foreach的action操作是在远程集群上遍历rdd中的元素,而collect操作是将在分布式集群上的rdd
    //数据拉取到本地,这种方式一般不建议使用,因为如果rdd中的数据量较大的话,比如超过一万条,那么性能会
    //比较差,因为要从远程走大量的网络传输,将数据获取到本地,有时还可能发生oom异常,内存溢出
    //所以还是推荐使用foreach操作来对最终的rdd进行处理
    List<Integer> doubleNumList = doubleNumbers.collect();
    for(Integer num : doubleNumList){
        System.out.println(num);
    }
    sc.close();
}

/**
 * count算子
 * 可以统计rdd中的元素个数
 */
private static void countTest(){
    SparkConf conf = new SparkConf()
    .setAppName("count")
    .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);

    JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);

    //对rdd使用count操作统计rdd中元素的个数
    long count = numbersRDD.count();
    System.out.println(count);

    sc.close();
}

/**
 * take算子
 * 将远程rdd的前n个数据拉取到本地
 */
private static void takeTest(){
    SparkConf conf = new SparkConf()
    .setAppName("take")
    .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);

    JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);

    //take操作与collect操作类似,也是从远程集群上获取rdd数据,但是,collect操作获取的是rdd的
    //所有数据,take获取的只是前n个数据
    List<Integer> top3number = numbersRDD.take(3);
    for(Integer num : top3number){
        System.out.println(num);
    }
    sc.close();
}

/**
 * saveAsTextFile算子
 *
 */
private static void saveAsTExtFileTest(){
    SparkConf conf = new SparkConf()
    .setAppName("saveAsTextFile");

    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Integer> numberList = Arrays.asList(1,2,3,4,5,6,7,8,9,10);

    JavaRDD<Integer> numbersRDD = sc.parallelize(numberList);

    JavaRDD<Integer> doubleNumbers = numbersRDD.map(new Function<Integer, Integer>() {

        @Override
        public Integer call(Integer arg0) throws Exception {
            // TODO Auto-generated method stub
            return arg0*2;
        }
    });

    //saveAsTextFile算子可以直接将rdd中的数据保存在hdfs中
    //但是我们在这里只能指定保存的文件夹也就是目录,那么实际上,会保存为目录中的
    //  /double_number.txt/part-00000文件
    doubleNumbers.saveAsTextFile("hdfs://spark1:9000/double_number.txt");

    sc.close();
}

/**
 * countByKey算子
 */

private static void countByKeyTest(){
    SparkConf conf = new SparkConf()
    .setAppName("take")
    .setMaster("local");
    JavaSparkContext sc = new JavaSparkContext(conf);

    List<Tuple2<String, String>> studentsList = Arrays.asList(
            new Tuple2<String, String>("class1","leo"),
            new Tuple2<String, String>("class2","jack"),
            new Tuple2<String, String>("class1","marry"),
            new Tuple2<String, String>("class2","tom"),
            new Tuple2<String, String>("class2","david"));

    JavaPairRDD<String, String> studentsRDD = sc.parallelizePairs(studentsList);

    //countByKey算子可以统计每个key对应元素的个数
    //countByKey返回的类型直接就是Map<String,Object>

    Map<String, Object> studentsCounts = studentsRDD.countByKey();

    for(Map.Entry<String, Object> studentsCount : studentsCounts.entrySet()){
        System.out.println(studentsCount.getKey()+" : "+studentsCount.getValue());
    }
    sc.close();
}

}

时间: 2025-01-02 18:13:18

Spark中的各种action算子操作(java版)的相关文章

python 中调用windows系统api操作剪贴版

# -*- coding: utf-8 -*- ''' Created on 2013-11-26 @author: Chengshaoling ''' import win32clipboard as w32 import win32con class OperateClipboard(object): def __init__(self): # print "OperateClipboard" pass def getText(self): w32.OpenClipboard()

Spark的transformation和action算子简介

transformation算子 map(func) 返回一个新的分布式数据集,由每个原元素经过func函数处理后的新元素组成 filter(func) 返回一个新的数据集,由经过func函数处理后返回值为true的原元素组成 flatMap(func) 类似于map,但是每一个输入元素,会被映射为0个或多个输出元素,(因此,func函数的返回值是一个seq,而不是单一元素) mapPartitions(func) 类似于map,对RDD的每个分区起作用,在类型为T的RDD上运行时,func的函

spark 中的RDD编程 -以下基于Java api

1.RDD介绍:     RDD,弹性分布式数据集,即分布式的元素集合.在spark中,对所有数据的操作不外乎是创建RDD.转化已有的RDD以及调用RDD操作进行求值.在这一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操作并行化. Spark中的RDD就是一个不可变的分布式对象集合.每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上.RDD可以包含Python,Java,Scala中任意类型的对象,甚至可以包含用户自定义的对象. 用户可以使用两种方法创建RDD:读取一个

Spark中的键值对操作-scala

1.PairRDD介绍     Spark为包含键值对类型的RDD提供了一些专有的操作.这些RDD被称为PairRDD.PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口.例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD. 2.创建Pair RDD 程序示例:对一个英语单词组成的文本行,提取其中的第一个单词作为key,将整个句子作为value,建立 PairR

Spark的transformation 和 action的操作学习笔记

一.spark的transformation 和 action区别 Spark有一些基本的transformation 和 action的操作,其中transformation形成各类型的RDD,action不形成RDD,而是对RDD进行累加.合并.保存操作. 二.transformation 有哪些 transformation有map.filter.flatMap(与map不一样).Sample.groupByKey.ReduceByKey.Union.Join.cogroup.crossP

spark中map和mapPartitions算子的区别

区别: 1.map是对rdd中每一个元素进行操作 2.mapPartitions是对rdd中每个partition的迭代器进行操作 mapPartitions优点: 1.若是普通map,比如一个partition中有一万条数据,那么function要执行一万次,而使用mapPartions,一个task只执行一次function,function一次接收所有数据,只执行一次,性能高 2.若在map中需要频繁创建额外对象(如将rdd的数据通过jdbc写入数据库,map需要为每条数据创建一个链接,m

spark中transformation操作的各种算子(java版)

package cn.spark.study.core; import java.util.Arrays; import java.util.Iterator; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark

操作系统中的几种调度算法(JAVA版)

1 import java.text.DecimalFormat; 2 import java.util.Arrays; 3 import java.util.Scanner; 4 5 /* 6 * 作者:Chensx1020 7 * 时间:2016-12-11 8 * 功能:CPU调度算法 9 * 1)先到先服务调度算法(FCFS) 10 * 2)最短作业优先调度算法,非抢占式(SJF) 11 * 3)优先级调度算法(PSA) 12 * 4)轮转法调度算法(RR) 13 * 5)最高响应比调度

Unity Game窗口中还原Scene窗口摄像机操作 强化版

之前写的那个版本看来真的是不行啊.最近研究了一下官方第一人称脚本,人家的平滑过渡真的是没得说.借鉴了一下,写出来了一个新的比较完美的控制. 之前我们的操作是通过鼠标输入的开始坐标和转动坐标.其实官方有一个函数~ 1 float yRot = Input.GetAxis("Mouse X"); 2 float xRot = Input.GetAxis("Mouse Y"); 这就分别能获取到鼠标的X轴操作和Y轴操作了. 那为什么用yRot获取X轴,xRot获取Y轴呢?