spark算子介绍

1.spark的算子分为转换算子和Action算子,Action算子将形成一个job,转换算子RDD转换成另一个RDD,或者将文件系统的数据转换成一个RDD

2.Spark的算子介绍地址:http://spark.apache.org/docs/2.3.0/rdd-programming-guide.html

3.Spark操作基本步骤【java版本,其他语言可以根据官网的案例进行学习】

(1)创建配置文件,将集群的运行模式设置好,给作业起一个名字,可以使用set方法其他配置设入。

SparkConf sparkConf = new SparkConf().setAppName("Demo").setMaster("local");
这里使用的是local的运行模式,起的名字是Demo

(2)创建SparkContext

JavaSparkContext javaContext = new JavaSparkContext(sparkConf);

(3)使用算子,操作数据

     JavaRDD<String> javaRdd = sparkContext.textFile("logfile.txt",1);
        javaRdd = javaRdd.cache();//这一句必须这样写,我们在数据计算很费时的时候,将数据缓存
        long line = javaRdd.count();
        System.out.println(line);

(4)关闭资源

sparkContext.close();

上面以一个求出数据行数的例子,看一下代码操作的流程。

4.Action算子和介绍和举例

(1)map算子;将数据读取使用map进行操作,使用foreach算子计算出 结果。 每一次读取partition中的一条数据进行分析

  案例:将数据乘以10,在输出,测试算子。

package kw.test.demo;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

/*
 * 本案例:将数据 值乘以一个数,然后将数据的值返回。
 */
public class MapApp {
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setMaster("local").setAppName("MapTest");
        JavaSparkContext jsc= new JavaSparkContext(conf);
        List<Integer> list = Arrays.asList(1,2,3,4,5) ;
        JavaRDD<Integer> javaRdd = jsc.parallelize(list);
        JavaRDD<Integer> result = javaRdd.map(new Function<Integer,Integer>() {
            @Override
            public Integer call(Integer list) throws Exception {
                // TODO Auto-generated method stub
                return list*10;
            }
        });
        result.foreach(new  VoidFunction<Integer>() {
            @Override
            public void call(Integer result) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(result);
            }
        });
        jsc.close();
    }
}

(2)MapPartition:将一整块的数据放入然后处理,他和map的区别就是,map将一部分数据放入然后计算,MapPartition将一整块的数据一起放入计算。

如果数据量小的时候,可以是Mappartition中,如果数据量比较大的时候使用Map会比较好,因为可以防止内存溢出。

package kw.test.demo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

public class MapPartitionApp {
    public static void main(String[] args) {
        /*
         * 创建配置文件
         * 创建出RDD
         */
        SparkConf sparkconf = new SparkConf().setMaster("local").setAppName("mapPartition");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkconf);
        /*
         * mapPartition的使用是将一个块一起放入到算子中操作。
         *
         * 假如说RDD上的数据不是太多的时候,可以使用mapPartition 来操作,如果一个RDD的数据比较多还是使用map好
         * 返回了大量数据,容易曹成内存溢出。
         */
    /*    准备数据集*/
        List <String> list= Arrays.asList("kangwang","kang","wang");
        JavaRDD<String> javaRDD = javaSparkContext.parallelize(list);

        final Map<String,Integer> sore = new HashMap<String ,Integer>();
        sore.put("kangwang", 0);
        sore.put("kang", 13);
        sore.put("wang", 454);

        JavaRDD<Integer> sRDD= javaRDD.mapPartitions(new FlatMapFunction<Iterator<String>, Integer>() {

            @Override
            public Iterator<Integer> call(Iterator<String> it) throws Exception {
                // TODO Auto-generated method stub
                List list = new ArrayList();

                while(it.hasNext())
                {
                    String name = it.next();
                    Integer so = sore.get(name);
                    list.add(so);
                }
                Iterator i =list.iterator();
                return  i;
            }
        });
        sRDD.foreach(new VoidFunction<Integer>() {

            @Override
            public void call(Integer it) throws Exception {
                // TODO Auto-generated method stub
                System.out.println("it的值是"+it);
            }
        });
    }
}

(3)MapPartitionWithIndex:

本案例:
  查看将数据的分配到具体的快上的信息。
  我们可以指定partition的个数,默认是2
  parallelize并行集合的时候,指定了并行度,也就是partition的个数是2
  具体他们的数据怎样分,我们并不知道,由spark自己分配
 如果想要知道,就可以使用此算子,将数据的值打印出来。

package kw.test.demo;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
public class MapPartionWithIndex {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("MapPartitionWithIndex").setMaster("local");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        //准备数据
        List<String> list =new ArrayList<String>();
        list.add("Demo1");
        list.add("Demo2");
        list.add("Demo3");
        list.add("Demo4");
        list.add("Demo5");
        list.add("Demo6");
        list.add("Demo7");
        list.add("Demo8");
        list.add("Demo9");
        list.add("Demo10");
        list.add("Demo11");
        list.add("Demo12");
        //创建RDD,指定map的个数4
        JavaRDD<String> javaRDD = javaSparkContext.parallelize(list, 2);
        JavaRDD<String> javaRDD1 =javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

            @Override
            public Iterator<String> call(Integer index, Iterator<String> it2) throws Exception {
                // TODO Auto-generated method stub
                //index是partition的个数
                List<String> list = new ArrayList<String>();
                while(it2.hasNext())
                {
                    String name = it2.next();
                    String info = "partition是:"+index+"数据的name是:"+name;
                    list.add(info);
                }
                return list.iterator();
            }

        }, true);

        javaRDD1.foreach(new VoidFunction<String>() {

            @Override
            public void call(String infos) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(infos);
            }
        });
    }
}

(4)coalesce算子,是架构RDD的partition的数量缩减

将一定数量的partition压缩到更少的partition分区中去

使用的场景,很多时候在filter算子应用之后会优化一下到使用coalesce算子。

filter算子应用到RDD上面,说白了会应用到RDD对应到里面的每个partition上

数据倾斜,换句话说就是有可能的partition里面就剩下了一条数据 建议使用coalesce算子,

从前各个partition中 数据都更加的紧凑就可以减少它的 个数

package kw.test.demo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;

public class CoalesceOpter {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("coalesceDemo").setMaster("local");
        JavaSparkContext javaContext = new JavaSparkContext(sparkConf);

        List<String> list = Arrays.asList("kw1","djf","kw1","fgf",
                "djf","kw1","djf","sdsds","kw1","ssdu","djf");

        JavaRDD<String>javaRDD = javaContext.parallelize(list,6);
        JavaRDD<String> info = javaRDD.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

            @Override
            public Iterator<String> call(Integer arg0, Iterator<String> arg1) throws Exception {
                // TODO Auto-generated method stub
                List<String> list =new ArrayList<String>();
                while(arg1.hasNext())
                {
                    String name = arg1.next();
                    String info = arg0 +"^^^^^……………………………………………………………………"+ name;
                    list.add(info);
                }
                return list.iterator();
            }
        }, true);
        info.foreach(new VoidFunction<String>() {

            @Override
            public void call(String arg0) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(arg0);
            }
        });
        info.coalesce(3);
        JavaRDD<String> javaRDD1 = info.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

            @Override
            public Iterator<String> call(Integer arg0, Iterator<String> arg1) throws Exception {
                // TODO Auto-generated method stub
                List<String> list = new ArrayList<String>();
                while(arg1.hasNext())
                {
                    String name = arg1.next();
                    String info2 ="        " +name +"………………………………" +arg0;
                    list.add(info2);
                }
                return list.iterator();
            }
        }, true);
        javaRDD1.foreach(new VoidFunction<String>() {

            @Override
            public void call(String arg0) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(arg0);
            }
        });
    }
}
* 

(5)filter此案例将数据的值过滤出来。使用的是filter算子

package kw.test.demo;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

/*
 * 此案例将数据的值过滤出来。使用的是filter算子
 */
public class APPFilter {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setMaster("local").setAppName("Filter");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        List<Integer> list = new ArrayList<Integer>();
        list.add(2);
        list.add(26);
        list.add(23);
        list.add(24);
        list.add(256);
        list.add(278);
        list.add(2543);
        list.add(23);
        list.add(26);

        JavaRDD<Integer> javaRDD = jsc.parallelize(list,2);
        //返回值  将返回true的数据返回
        JavaRDD<Integer> num= javaRDD.filter(new Function<Integer, Boolean>() {

            @Override
            public Boolean call(Integer it) throws Exception {
                // TODO Auto-generated method stub
                return it%2==0;
            }
        });
        num.foreach(new VoidFunction<Integer>() {

            @Override
            public void call(Integer arg0) throws Exception {
                // TODO Auto-generated method stub
                System.out.println(arg0);
            }
        });
    }
}

spark程序可以在本地运行,也可以在集群中运行,可以大成jar,放到真实的集群环境中运行程序。

 

原文地址:https://www.cnblogs.com/kw28188151/p/8570728.html

时间: 2024-10-10 04:24:52

spark算子介绍的相关文章

UserView--第二种方式(避免第一种方式Set饱和),基于Spark算子的java代码实现

UserView--第二种方式(避免第一种方式Set饱和),基于Spark算子的java代码实现 测试数据 java代码 1 package com.hzf.spark.study; 2 3 import java.util.Map; 4 import java.util.Set; 5 6 import org.apache.spark.SparkConf; 7 import org.apache.spark.api.java.JavaPairRDD; 8 import org.apache.s

Spark MLlib介绍

Spark MLlib介绍 Spark之所以在机器学习方面具有得天独厚的优势,有以下几点原因: (1)机器学习算法一般都有很多个步骤迭代计算的过程,机器学习的计算需要在多次迭代后获得足够小的误差或者足够收敛才会停止,迭代时如果使用Hadoop的MapReduce计算框架,每次计算都要读/写磁盘以及任务的启动等工作,这回导致非常大的I/O和CPU消耗.而Spark基于内存的计算模型天生就擅长迭代计算,多个步骤计算直接在内存中完成,只有在必要时才会操作磁盘和网络,所以说Spark正是机器学习的理想的

Spark算子---实战应用

Spark算子实战应用 数据集 :http://grouplens.org/datasets/movielens/ MovieLens 1M Datase 相关数据文件 : users.dat ---UserID::Gender::Age::Occupation::Zip-code movies.dat --- MovieID::Title::Genres ratings.dat ---UserID::MovieID::Rating::Timestamp SogouQ.mini 完成以下业务需求

Spark算子:RDD基本转换操作(1)–map、flatMap、distinct

Spark算子:RDD基本转换操作(1)–map.flatMap.distinct 关键字:Spark算子.Spark RDD基本转换.map.flatMap.distinct map 将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素. 输入分区与输出分区一对一,即:有多少个输入分区,就有多少个输出分区. hadoop fs -cat /tmp/lxw1234/1.txt hello world hello spark hello hive //读取HDFS文件到RDD sca

Spark的介绍:前世今生

spark的 前世今生 标签(空格分隔): Spark的部分 一:大数据的spark概述 二:大数据的spark学习 一: 大数据的概述 1.1 Spark是什么? Spark,是一种通用的大数据计算框架,正如传统大数据技术Hadoop的MapReduce.Hive引擎,以及Storm流式实时计算引擎等. Spark包含了大数据领域常见的各种计算框架:比如Spark Core用于离线计算,Spark SQL用于交互式查询,Spark Streaming用于实时流式计算,Spark MLlib用于

Apache Spark开发介绍

Databricks的工程师,Apache Spark Committer介绍了Databricks和Spark的历史,包括了Spark 1.4中的重要特性和进展,涵盖了Spark早期版本的主要功能和使用方法,讲了大数据领域近些年的发展,也介绍了Spark从这些年其它理论或者技术中吸取的灵感,当然,更多介绍了Spark的基本组件的使用方法,可以看作非常好的Spark教学教程. 篇幅过长 点击下载资源https://www.slidestalk.com/s/IntroductiontoSparkd

Spark概念介绍

Spark概念介绍:spark应用程序在集群中以一系列独立的线程运行,通过驱动器程序(Driver Program)发起一系列的并行操作.SparkContext对象作为中间的连接对象,通过SparkContext对象连接集群.SparkContext对象可以连接集群管理器(YARN,Mesos.standalone等) 目前Spark集群支持以下集群管理模式:(1)本地模式(2)Mesos模式: 一种通用的集群管理模式,可以运行Hadoop Mapreduce和应用服务 (3)YARN模式:H

spark参数介绍

spark参数介绍 https://endymecy.gitbooks.io/spark-config-and-tuning/content/config.html 原文地址:https://www.cnblogs.com/pengwang52/p/12101910.html

Spark算子选择策略

摘要  1.使用reduceByKey/aggregateByKey替代groupByKey 2.使用mapPartitions替代普通map 3.使用foreachPartitions替代foreach 4.使用filter之后进行coalesce操作 5.使用repartitionAndSortWithinPartitions替代repartition与sort类操作 6.使用broadcast使各task共享同一Executor的集合替代算子函数中各task传送一份集合 7.使用相同分区方