Spark学习散点总结

使用Spark 时,通常会有两种模式。一、在交互式编程环境(REPL, a.k.a spark-shell)下实现一些代码,测试一些功能点。二、像MapReduce 那样提前编写好源代码并编译打包(仅限 Java 或 Scala,Python 不需要),然后将程序代码通过spark-submit 命令提交到 YARN 集群完成计算。

spark-shell

启动 spark-shell 通常需要指定 master、executor 内存、executor 数量等参数。由于 YARN 集群有审计机制,每个人提交的 spark application 需要指定 name 参数,同时确保 name 是以个人的 LDAP 用户名为后缀。另外,如果你不确定 driver 是否有足够的内存能容纳一个 RDD 的计算结果,建议不要使用 RDD 的 collect 方法而使用其 take 方法,否则会使 driver 发生 OOM。

  1.scala交互式编程环境

  通过命令启动sprak-shell

/opt/tige/spark2/bin/spark-shell --master yarn-client --queue root.default --driver-memory 4g --executor-memory 8g--conf spark.dynamicAllocation.maxExecutors=10 --name spark_test_{your username} 

启动spark后系统自动创建sc和sqlContext(HiveContext实例),可以使用它们来创建RDD或者DataFarme

  2.使用Python交互式编程环境

  通过命令pyspark

/opt/tiger/spark_deploy/spark2/bin/ipyspark --master yarn-client --queue root.default --driver-memory 4g --executor-memory 8g --num-executors 8 --name spark_test_${your LDAP user name}

spark-submit

首先我们需要使用 Spark 的 API 实现一个拥有入口(main)的程序,然后通过 spark-submit 提交到 YARN 集群。
  1. Scala 版本的 WordCount

    import org.apache.spark.{SparkConf, SparkContext}
    
    object WordCount extends App {
        val sparkConf = new SparkConf()
        sparkConf.setAppName("spark_test_${your LDAP user name}")
        sparkConf.setMaster("yarn-client")
        sparkConf.set("spark.driver.memory", "4g")
        sparkConf.set("spark.executor.memory", "8g")
        sparkConf.set("spark.dynamicAllocation.initialExecutors", "3")
        sparkConf.set("spark.dynamicAllocation.maxExecutors", "10")
        val sc = new SparkContext(sparkConf)
        val words = sc.textFile("/path/to/text/file")
        val wordCount = words.map(word => (word, 1)).reduceByKey(_ + _).collect()
        wordCount.foreach(println)
    }

    完成代码编写与编译打包之后就可以通过 spark-submit 来提交应用了,命令如下:

    /opt/tiger/spark_deploy/spark2/bin/spark-submit --master yarn-client --class WordCount your_spark_test.jar
  2. python版本的WordCount

    from pyspark import SparkContext, SparkConf
    from operator import add
    
    if __name__ == ‘__main__‘:
        conf = SparkConf()
        conf.setMaster(‘yarn-client‘)
        conf.setAppName(‘spark_test_${your LDAP user name}‘)
        conf.set("spark.driver.memory", "4g")
        conf.set("spark.executor.memory", "8g")
        conf.set("spark.dynamicAllocation.initialExecutors", "3")
        conf.set("spark.dynamicAllocation.maxExecutors", "10")
        sc = SparkContext(conf=conf)
    
        words = sc.textFile("/path/to/text/file")
        wordCount = words.map(lambda word: (word, 1)).reduceByKey(add).collect()
        for key, value in wordCount:
            print key, value
    假设上面这段 Python 代码的文件名为 your_spark_test.py,那么提交这段代码到 YARN 集群的命令如下:
    /opt/tiger/spark_deploy/spark2/bin/spark-submit --master yarn-client your_spark_test.py
时间: 2024-12-19 13:51:07

Spark学习散点总结的相关文章

Spark 学习: spark 原理简述与 shuffle 过程介绍

Spark学习: 简述总结 Spark 是使用 scala 实现的基于内存计算的大数据开源集群计算环境.提供了 java,scala, python,R 等语言的调用接口. Spark学习 简述总结 引言 1 Hadoop 和 Spark 的关系 Spark 系统架构 1 spark 运行原理 RDD 初识 shuffle 和 stage 性能优化 1 缓存机制和 cache 的意义 2 shuffle 的优化 3 资源参数调优 4 小结 本地搭建 Spark 开发环境 1 Spark-Scal

Spark学习四:网站日志分析案例

Spark学习四:网站日志分析案例 标签(空格分隔): Spark Spark学习四网站日志分析案例 一创建maven工程 二创建模板 三日志分析案例 一,创建maven工程 1,执行maven命令创建工程 mvn archetype:generate -DarchetypeGroupId=org.scala-tools.archetypes -DarchetypeArtifactId=scala-archetype-simple -DremoteRepositories=http://scal

Spark学习三:Spark Schedule以及idea的安装和导入源码

Spark学习三:Spark Schedule以及idea的安装和导入源码 标签(空格分隔): Spark Spark学习三Spark Schedule以及idea的安装和导入源码 一RDD操作过程中的数据位置 二Spark Schedule 三Idea导入spark源码 一,RDD操作过程中的数据位置 [hadoop001@xingyunfei001 spark-1.3.0-bin-2.5.0]$ bin/spark-shell --master local[2] val rdd = sc.t

Spark学习七:spark streaming与flume集成

Spark学习七:spark streaming与flume集成 标签(空格分隔): Spark 一,启动flume flume-conf.properties文件 agent002.sources = sources002 agent002.channels = channels002 agent002.sinks = sinks002 ## define sources agent002.sources.sources002.type = exec agent002.sources.sour

Spark学习六:spark streaming

Spark学习六:spark streaming 标签(空格分隔): Spark Spark学习六spark streaming 一概述 二企业案例分析 三Spark streaming的工作原理 四textFileStreaming的应用 四企业中的开发方式 五总结 一,概述 一个简单的实例 1,安装nc nc -lk 9999 2,启动应用 ./bin/run-example streaming.NeworkWordCount localhost 9999 二,企业案例分析 需求: 实时统计

Spark学习八:spark streaming与flume和kafka集成

Spark学习八:spark streaming与flume和kafka集成 标签(空格分隔): Spark Spark学习八spark streaming与flume和kafka集成 一Kafka 二flume和kafka的集成 三kafka和spark streaming的集成方式一kafka推送 四kafka和spark streaming的集成方式一spark streaam主动获取 五spark stream的高级应用updateStateByKey实现累加功能 六spark stre

Spark学习笔记之SparkRDD

Spark学习笔记之SparkRDD 一.   基本概念 RDD(resilient distributed datasets)弹性分布式数据集. 来自于两方面 ①   内存集合和外部存储系统 ②   通过转换来自于其他RDD,如map,filter等 2.创建操作(creation operation):RDD的创建由SparkContext来负责. 3.转换操作(transformation operation):将一个RDD通过一定操作转换为另一个RDD. 4.控制操作(control o

用Spark学习矩阵分解推荐算法

在矩阵分解在协同过滤推荐算法中的应用中,我们对矩阵分解在推荐算法中的应用原理做了总结,这里我们就从实践的角度来用Spark学习矩阵分解推荐算法. 1. Spark推荐算法概述 在Spark MLlib中,推荐算法这块只实现了基于矩阵分解的协同过滤推荐算法.而基于的算法是FunkSVD算法,即将m个用户和n个物品对应的评分矩阵M分解为两个低维的矩阵:$$M_{m \times n}=P_{m \times k}^TQ_{k \times n}$$ 其中k为分解成低维的维数,一般远比m和n小.如果大

用Spark学习FP Tree算法和PrefixSpan算法

在FP Tree算法原理总结和PrefixSpan算法原理总结中,我们对FP Tree和PrefixSpan这两种关联算法的原理做了总结,这里就从实践的角度介绍如何使用这两个算法.由于scikit-learn中没有关联算法的类库,而Spark MLlib有,本文的使用以Spark MLlib作为使用环境. 1. Spark MLlib关联算法概述 在Spark MLlib中,也只实现了两种关联算法,即我们的FP Tree和PrefixSpan,而像Apriori,GSP之类的关联算法是没有的.而