初识Spark程序

执行第一个spark程序

普通模式提交任务:

bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://hdp-node-01:7077 \
--executor-memory 1G --total-executor-cores 2 examples/jars/spark-examples_2.11-2.0.2.jar 10

该算法是利用蒙特·卡罗算法求圆周率PI,通过计算机模拟大量的随机数,最终会计算出比较精确的π。

高可用模式提交任务

在高可用模式下,因为涉及到多个Master,所以对于应用程序的提交就有了一点变化,因为应用程序需要知道当前的Master的IP地址和端口。这种HA方案处理这种情况很简单,只需要在SparkContext指向一个Master列表就可以了,

如spark://host1:port1,host2:port2,host3:port3,应用程序会轮询列表,找到活着的Master。

bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://hdp-node-01:7077,hdp-node-02:7077,hdp-node-03:7077 \
--executor-memory 1G --total-executor-cores 2 examples/jars/spark-examples_2.11-2.0.2.jar 10

启动Spark-Shell

spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。

运行spark-shell --master local[N] 读取本地文件

单机模式:通过本地N个线程跑任务,只运行一个SparkSubmit进程。

(1)需求

读取本地文件,实现文件内的单词计数。本地文件words.txt 内容如下:


hello me

hello you

hello her

(2)运行spark-shell --master local[2]

观察启动的进程:

(3)编写scala代码

sc.textFile("file:///root///words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

代码说明:

sc:Spark-Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可。

textFile:读取数据文件

flatMap:对文件中的每一行数据进行压平切分,这里按照空格分隔。

map:对出现的每一个单词记为1(word,1)

reduceByKey:对相同的单词出现的次数进行累加

collect:触发任务执行,收集结果数据。

(4)观察结果:

运行spark-shell --master local[N] 读取HDFS上数据

(1)、整合spark和HDFS,修改配置文件

在spark-env.sh ,添加HADOOP_CONF_DIR配置,指明了hadoop的配置文件后,默认它就是使用的hdfs上的文件

export HADOOP_CONF_DIR=/opt/bigdata/hadoop-2.6.4/etc/hadoop

(2)、再启动启动hdfs,然后重启spark集群

(3)、向hdfs上传一个文件到hdfs://hdp-node-01:9000/words.txt

(4)、在spark shell中用scala语言编写spark程序

sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect


运行spark-shell 指定具体的master地址

(1)需求:

spark-shell运行时指定具体的master地址,读取HDFS上的数据,做单词计数,然后将结果保存在HDFS上。

(2)执行启动命令:

spark-shell --master spark://hdp-node-01:7077 \
--executor-memory 1g --total-executor-cores 2

参数说明:

--master spark://hdp-node-01:7077 指定Master的地址

--executor-memory 1g 指定每个worker可用内存为1g

--total-executor-cores 2 指定整个集群使用的cup核数为2个

注意:

如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。

(2)编写scala代码

sc.textFile("/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/wc")

saveAsTextFile:保存结果数据到文件中

(3)查看hdfs上结果

在IDEA中编写WordCount程序

spark-shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDEA中编写程序,然后打成jar包,最后提交到集群。最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。

(1).创建一个项目

(2).选择Maven项目,然后点击next

(3).填写maven的GAV,然后点击next

(4)填写项目名称,然后点击finish

(5).创建好maven项目后,点击Enable Auto-Import

(6)配置Maven的pom.xml

<properties>
        <scala.version>2.11.8</scala.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.0.2</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
</build>

(7)添加src/main/scala和src/test/scala,与pom.xml中的配置保持一致

(8)新建一个scala class,类型为Object

(9).编写spark程序

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.rdd.RDD

  object WordCount {
  def main(args: Array[String]): Unit = {
    //设置spark的配置文件信息
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount")

    //构建sparkcontext上下文对象,它是程序的入口,所有计算的源头
    val sc: SparkContext = new SparkContext(sparkConf)

    //读取文件
    val file: RDD[String] = sc.textFile(args(0))

    //对文件中每一行单词进行压平切分
    val words: RDD[String] = file.flatMap(_.split(" "))

    //对每一个单词计数为1 转化为(单词,1)
    val wordAndOne: RDD[(String, Int)] = words.map(x=>(x,1))

    //相同的单词进行汇总 前一个下划线表示累加数据,后一个下划线表示新数据
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_)

    //保存数据到HDFS
    result.saveAsTextFile(args(1))
    sc.stop()
  }
}

(10).使用Maven打包:

点击idea右侧的Maven Project选项

点击Lifecycle,选择package,然后点击Run Maven Build

(11).选择编译成功的jar包,并将该jar上传到Spark集群中的某个节点上

(12).首先启动hdfs和Spark集群

如果采用HA模式,先启动zookeeper集群

启动hdfs

/opt/bigdata/hadoop-2.6.4/sbin/start-dfs.sh

启动spark

/opt/bigdata/spark/sbin/start-all.sh

(13).使用spark-submit命令提交Spark应用(注意参数的顺序)

spark-submit --class cn.test.spark.WordCount --master spark://hdp-node-01:7077 \
--executor-memory 1g --total-executor-cores 2 /root/spark-1.0-SNAPSHOT.jar /words.txt /spark_out

这里通过spark-submit提交任务到集群上。用的是spark的Standalone模式

Standalone模式是Spark内部默认实现的一种集群管理模式,这种模式是通过集群中的Master来统一管理资源。

1)       查看Spark的web管理界面

地址: 192.168.200.160:8080

2)       查看HDFS上的结果文件

hdfs dfs -cat /spark_out/part*

(hello,3)

(me,1)

(you,1)

(her,1)

使用java语言编写spark wordcount程序

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;

  /**
 * java代码实现spark的WordCount
 */
  public class WordCountJava {
    public static void main(String[] args) {
        //todo:1、构建sparkconf,设置配置信息
        SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]");

        //todo:2、构建java版的sparkContext
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        //todo:3、读取数据文件
        JavaRDD<String> dataRDD = sc.textFile("d:/data/words1.txt");

        //todo:4、对每一行单词进行切分
        JavaRDD<String> wordsRDD = dataRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
               String[] words = s.split(" ");
                return Arrays.asList(words).iterator();
            }
        });

        //todo:5、给每个单词计为 1
       // Spark为包含键值对类型的RDD提供了一些专有的操作。这些RDD被称为PairRDD。
        // mapToPair函数会对一个RDD中的每个元素调用f函数,其中原来RDD中的每一个元素都是T类型的,
        // 调用f函数后会进行一定的操作把每个元素都转换成一个<K2,V2>类型的对象,其中Tuple2为多元组
        JavaPairRDD<String, Integer> wordAndOnePairRDD = wordsRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String,Integer>(word, 1);
            }
        });

        //todo:6、相同单词出现的次数累加
        JavaPairRDD<String, Integer> resultJavaPairRDD = wordAndOnePairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //todo:7、反转顺序
        JavaPairRDD<Integer, String> reverseJavaPairRDD = resultJavaPairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            @Override
            public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception {
                return new Tuple2<Integer, String>(tuple._2, tuple._1);
            }
        });

        //todo:8、把每个单词出现的次数作为key,进行排序,并且在通过mapToPair进行反转顺序后输出
        JavaPairRDD<String, Integer> sortJavaPairRDD = reverseJavaPairRDD.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception {
                return  new Tuple2<String, Integer>(tuple._2,tuple._1);
                //或者使用tuple.swap() 实现位置互换,生成新的tuple;
            }
        });

        //todo:执行输出
        System.out.println(sortJavaPairRDD.collect());

        //todo:关闭sparkcontext
        sc.stop();
    }
}

原文地址:https://www.cnblogs.com/jifengblog/p/9335872.html

时间: 2024-11-01 19:20:56

初识Spark程序的相关文章

底层战详解使用Java开发Spark程序(DT大数据梦工厂)

Scala开发Spark很多,为什么还要用Java开发原因:1.一般Spark作为数据处理引擎,一般会跟IT其它系统配合,现在业界里面处于霸主地位的是Java,有利于团队的组建,易于移交:2.Scala学习角度讲,比Java难.找Scala的高手比Java难,项目的维护和二次开发比较困难:3.很多人员有Java的基础,确保对Scala不是很熟悉的人可以编写课程中的案例预测:2016年Spark取代Map Reduce,拯救HadoopHadoop+Spark = A winning combat

luigi框架--关于python运行spark程序

首先,目标是写个python脚本,跑spark程序来统计hdfs中的一些数据.参考了别人的代码,故用了luigi框架. 至于luigi的原理 底层的一些东西Google就好.本文主要就是聚焦快速使用,知其然不知其所以然. python写Spark或mapreduce还有其他的方法,google上很多,这里用luigi只是刚好有参考的代码,而且理解起来还是简单,就用了. 上代码: import luigi, sysfrom datetime import datetime, timedeltafr

第9节课笔记-彻底实战IntelliJ IDEA 下的Spark程序开发

彻底实战IntelliJ IDEA 下的Spark程序开发下载IntelliJ IDEA 下载gitSpark源码下载:git clone git://github.com/apache/spark.git导入maven 工程 IntelliJ IDEA 启动的向导中Sacal下载需要下载,这是IDEA下载的,和系统层的不一样4.指定JDK1.8.x和Scala2.10.45.file ->Project Stucture 来设置工程lib 核心是添加Spark的jar6.添加Spark jar

Spark集群模式&amp;Spark程序提交

Spark集群模式&Spark程序提交 1. 集群管理器 Spark当前支持三种集群管理方式 Standalone-Spark自带的一种集群管理方式,易于构建集群. Apache Mesos-通用的集群管理,可以在其上运行Hadoop MapReduce和一些服务应用. Hadoop YARN-Hadoop2中的资源管理器. Tip1: 在集群不是特别大,并且没有mapReduce和Spark同时运行的需求的情况下,用Standalone模式效率最高. Tip2: Spark可以在应用间(通过集

在Spark程序中使用压缩

当大片连续区域进行数据存储并且存储区域中数据重复性高的状况下,数据适合进行压缩.数组或者对象序列化后的数据块可以考虑压缩.所以序列化后的数据可以压缩,使数据紧缩,减少空间开销. 1. Spark对压缩方式的选择 压缩采用了两种算法:Snappy和LZF,底层分别采用了两个第三方库实现,同时可以自定义其他压缩库对Spark进行扩展.Snappy提供了更高的压缩速度,LZF提供了更高的压缩比,用户可以根据具体需求选择压缩方式.压缩格式及解编码器如下.·LZF:org.apache.spark.io.

基于IDEA使用Spark API开发Spark程序

清明假期折腾了两天,总结了两种方式使用IDE进行spark程序,记录一下: 第一种方法比较简单,两种方式都是采用SBT进行编译的. 注意:本地不需要安装Scala程序,否则在编译程序时有版本兼容性问题. 一.基于Non-SBT方式 创建一个Scala IDEA工程 我们使用Non-SBT的方式,点击"Next" 命名工程,其他按照默认 点击"Finish"完成工程的创建 修改项目的属性 首先修改Modules选项 在src下创建两个文件夹,并把其属性改为source

搭建scala 开发spark程序环境及实例演示

上一篇博文已经介绍了搭建scala的开发环境,现在进入正题.如何开发我们的第一个spark程序. 下载spark安装包,下载地址http://spark.apache.org/downloads.html(因为开发环境需要引用spark的jar包) 我下载的是spark-2.1.0-bin-hadoop2.6.tgz,因为我的scalaIDE版本是scala-SDK-4.5.0-vfinal-2.11-win32.win32.x86_64.zip 最好,IDE版本和spark版本要匹配,否则,开

如何运行Spark程序

[[email protected] spark-2.0.2-bin-hadoop2.6]# ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master local examples/jars/spark-examples_2.11-2.0.2.jar 注意在hxsyl下,在root下运行提示hdfs上的historyserverforSpark没有权限,擦,好奇怪啊,另外运行后在hdfs上查看结果spark的用户是

基于YARN的Spark程序工作过程

一. YARN的理解 YARN是Hadoop 2.x版本的产物,它最基本的设计思想是将JobTracker的两个主要功能,即资源管理,作业调度和监控分解成为两个独立的进程.再详细介绍Spark程序工作过程前,先简单的介绍一下YARN,即Hadoop的操作系统,不仅支持MapReduce计算框架,而且还支持流式计算框架,迭代计算框架,MPI并行计算框架等,实现时采用了基于事件的驱动机制. YARN的架构图,如下所示: 1. ResourceManager ResourceManager类似JobT