1 package cn.spark.study.core; 2 3 import java.util.Arrays; 4 5 import org.apache.spark.SparkConf; 6 import org.apache.spark.api.java.JavaPairRDD; 7 import org.apache.spark.api.java.JavaRDD; 8 import org.apache.spark.api.java.JavaSparkContext; 9 import org.apache.spark.api.java.function.FlatMapFunction; 10 import org.apache.spark.api.java.function.Function2; 11 import org.apache.spark.api.java.function.PairFunction; 12 import org.apache.spark.api.java.function.VoidFunction; 13 14 import scala.Tuple2; 15 16 /** 17 * 将java开发的wordcount程序部署到spark集群上运行 18 * @author Administrator 19 * 20 */ 21 public class WordCountCluster { 22 public static void main(String[] args) { 23 24 25 // 如果要在spark集群上运行,需要修改的,只有两个地方 26 // 第一,将SparkConf的setMaster()方法给删掉,默认它自己会去连接 27 // 第二,我们针对的不是本地文件了,修改为hadoop hdfs上的真正的存储大数据的文件 28 29 // 实际执行步骤: 30 // 1、将spark.txt文件上传到hdfs上去 31 // 2、使用我们最早在pom.xml里配置的maven插件,对spark工程进行打包 32 // 3、将打包后的spark工程jar包,上传到机器上执行 33 // 4、编写spark-submit脚本 34 // 5、执行spark-submit脚本,提交spark应用到集群执行 35 SparkConf conf = new SparkConf() 36 .setAppName("WordCountCluster"); 37 38 JavaSparkContext sc = new JavaSparkContext(conf); 39 40 41 JavaRDD<String> lines = sc.textFile("hdfs://hadoop:9000/test/spark.txt"); 42 43 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { 44 45 private static final long serialVersionUID = 1L; 46 47 @Override 48 public Iterable<String> call(String line) throws Exception { 49 return Arrays.asList(line.split(" ")); 50 } 51 }); 52 53 JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { 54 private static final long serialVersionUID = 1L; 55 56 @Override 57 public Tuple2<String, Integer> call(String word) throws Exception { 58 return new Tuple2<String, Integer>(word, 1); 59 } 60 }); 61 62 JavaPairRDD<String, Integer> wordsCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { 63 64 private static final long serialVersionUID = 1L; 65 66 @Override 67 public Integer call(Integer v1, Integer v2) throws Exception { 68 return v1 + v2; 69 } 70 }); 71 72 wordsCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() { 73 74 private static final long serialVersionUID = 1L; 75 76 @Override 77 public void call(Tuple2<String, Integer> tuple) throws Exception { 78 System.out.println(tuple._1+":"+tuple._2); 79 } 80 }); 81 82 sc.close(); 83 84 } 85 86 }
spark-submit脚本
1 /usr/local/spark/bin/spark-submit 2 --class cn.spark.sparktest.core.WordCountCluster 3 --num-executors 3 4 --driver-memory 100m 5 --executor-memory 100m 6 --executor-cores 3 7 /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
集群环境下要加
--master spark://192.168.1.107:7077
时间: 2024-10-14 12:04:42