使用Spark 时,通常会有两种模式。一、在交互式编程环境(REPL, a.k.a spark-shell)下实现一些代码,测试一些功能点。二、像MapReduce 那样提前编写好源代码并编译打包(仅限 Java 或 Scala,Python 不需要),然后将程序代码通过spark-submit 命令提交到 YARN 集群完成计算。
spark-shell
启动 spark-shell 通常需要指定 master、executor 内存、executor 数量等参数。由于 YARN 集群有审计机制,每个人提交的 spark application 需要指定 name 参数,同时确保 name 是以个人的 LDAP 用户名为后缀。另外,如果你不确定 driver 是否有足够的内存能容纳一个 RDD 的计算结果,建议不要使用 RDD 的 collect 方法而使用其 take 方法,否则会使 driver 发生 OOM。
1.scala交互式编程环境
通过命令启动sprak-shell
/opt/tige/spark2/bin/spark-shell --master yarn-client --queue root.default --driver-memory 4g --executor-memory 8g--conf spark.dynamicAllocation.maxExecutors=10 --name spark_test_{your username}
启动spark后系统自动创建sc和sqlContext(HiveContext实例),可以使用它们来创建RDD或者DataFarme
2.使用Python交互式编程环境
通过命令pyspark
/opt/tiger/spark_deploy/spark2/bin/ipyspark --master yarn-client --queue root.default --driver-memory 4g --executor-memory 8g --num-executors 8 --name spark_test_${your LDAP user name}
spark-submit
首先我们需要使用 Spark 的 API 实现一个拥有入口(main)的程序,然后通过 spark-submit 提交到 YARN 集群。
- Scala 版本的 WordCount
import org.apache.spark.{SparkConf, SparkContext} object WordCount extends App { val sparkConf = new SparkConf() sparkConf.setAppName("spark_test_${your LDAP user name}") sparkConf.setMaster("yarn-client") sparkConf.set("spark.driver.memory", "4g") sparkConf.set("spark.executor.memory", "8g") sparkConf.set("spark.dynamicAllocation.initialExecutors", "3") sparkConf.set("spark.dynamicAllocation.maxExecutors", "10") val sc = new SparkContext(sparkConf) val words = sc.textFile("/path/to/text/file") val wordCount = words.map(word => (word, 1)).reduceByKey(_ + _).collect() wordCount.foreach(println) }
完成代码编写与编译打包之后就可以通过 spark-submit 来提交应用了,命令如下:
/opt/tiger/spark_deploy/spark2/bin/spark-submit --master yarn-client --class WordCount your_spark_test.jar
- python版本的WordCount
from pyspark import SparkContext, SparkConf from operator import add if __name__ == ‘__main__‘: conf = SparkConf() conf.setMaster(‘yarn-client‘) conf.setAppName(‘spark_test_${your LDAP user name}‘) conf.set("spark.driver.memory", "4g") conf.set("spark.executor.memory", "8g") conf.set("spark.dynamicAllocation.initialExecutors", "3") conf.set("spark.dynamicAllocation.maxExecutors", "10") sc = SparkContext(conf=conf) words = sc.textFile("/path/to/text/file") wordCount = words.map(lambda word: (word, 1)).reduceByKey(add).collect() for key, value in wordCount: print key, value
假设上面这段 Python 代码的文件名为 your_spark_test.py,那么提交这段代码到 YARN 集群的命令如下:
/opt/tiger/spark_deploy/spark2/bin/spark-submit --master yarn-client your_spark_test.py
时间: 2024-12-19 13:51:07