将java开发的wordcount程序部署到spark集群上运行

 1 package cn.spark.study.core;
 2
 3 import java.util.Arrays;
 4
 5 import org.apache.spark.SparkConf;
 6 import org.apache.spark.api.java.JavaPairRDD;
 7 import org.apache.spark.api.java.JavaRDD;
 8 import org.apache.spark.api.java.JavaSparkContext;
 9 import org.apache.spark.api.java.function.FlatMapFunction;
10 import org.apache.spark.api.java.function.Function2;
11 import org.apache.spark.api.java.function.PairFunction;
12 import org.apache.spark.api.java.function.VoidFunction;
13
14 import scala.Tuple2;
15
16 /**
17  * 将java开发的wordcount程序部署到spark集群上运行
18  * @author Administrator
19  *
20  */
21 public class WordCountCluster {
22     public static void main(String[] args) {
23
24
25         // 如果要在spark集群上运行,需要修改的,只有两个地方
26         // 第一,将SparkConf的setMaster()方法给删掉,默认它自己会去连接
27         // 第二,我们针对的不是本地文件了,修改为hadoop hdfs上的真正的存储大数据的文件
28
29         // 实际执行步骤:
30         // 1、将spark.txt文件上传到hdfs上去
31         // 2、使用我们最早在pom.xml里配置的maven插件,对spark工程进行打包
32         // 3、将打包后的spark工程jar包,上传到机器上执行
33         // 4、编写spark-submit脚本
34         // 5、执行spark-submit脚本,提交spark应用到集群执行
35         SparkConf conf = new SparkConf()
36         .setAppName("WordCountCluster");
37
38         JavaSparkContext sc = new JavaSparkContext(conf);
39
40
41         JavaRDD<String> lines = sc.textFile("hdfs://hadoop:9000/test/spark.txt");
42
43         JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
44
45             private static final long serialVersionUID = 1L;
46
47             @Override
48             public Iterable<String> call(String line) throws Exception {
49                 return Arrays.asList(line.split(" "));
50             }
51         });
52
53         JavaPairRDD<String, Integer> pairs = words.mapToPair(new  PairFunction<String, String, Integer>() {
54             private static final long serialVersionUID = 1L;
55
56             @Override
57             public Tuple2<String, Integer> call(String word) throws Exception {
58                 return new Tuple2<String, Integer>(word, 1);
59             }
60         });
61
62         JavaPairRDD<String, Integer> wordsCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
63
64             private static final long serialVersionUID = 1L;
65
66             @Override
67             public Integer call(Integer v1, Integer v2) throws Exception {
68                 return v1 + v2;
69             }
70         });
71
72         wordsCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {
73
74             private static final long serialVersionUID = 1L;
75
76             @Override
77             public void call(Tuple2<String, Integer> tuple) throws Exception {
78                 System.out.println(tuple._1+":"+tuple._2);
79             }
80         });
81
82         sc.close();
83
84     }
85
86 }

spark-submit脚本

1 /usr/local/spark/bin/spark-submit 2 --class cn.spark.sparktest.core.WordCountCluster 3 --num-executors 3 4 --driver-memory 100m 5 --executor-memory 100m 6 --executor-cores 3 7 /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar \

集群环境下要加

--master spark://192.168.1.107:7077

时间: 2024-10-14 12:04:42

将java开发的wordcount程序部署到spark集群上运行的相关文章

将java开发的wordcount程序提交到spark集群上运行

今天来分享下将java开发的wordcount程序提交到spark集群上运行的步骤. 第一个步骤之前,先上传文本文件,spark.txt,然用命令hadoop fs -put spark.txt /spark.txt,即可. 第一:看整个代码视图 打开WordCountCluster.java源文件,修改此处代码: 第二步: 打好jar包,步骤是右击项目文件----RunAs--Run Configurations 照图填写,然后开始拷贝工程下的jar包,如图,注意是拷贝那个依赖jar包,不是第

Spark集群上运行jar程序,状态一直Accepted且不停止不报错

如果运行Spark集群时状态一直为Accepted且不停止不报错,比如像下面这样的情况: 15/06/14 11:33:33 INFO yarn.Client: Application report for application_1434263747091_0023 (state: ACCEPTED) 15/06/14 11:33:34 INFO yarn.Client: Application report for application_1434263747091_0023 (state:

编写Spark的WordCount程序并提交到集群运行[含scala和java两个版本]

编写Spark的WordCount程序并提交到集群运行[含scala和java两个版本] 1. 开发环境 Jdk 1.7.0_72 Maven 3.2.1 Scala 2.10.6 Spark 1.6.2 Hadoop 2.6.4 IntelliJ IDEA 2016.1.1 2. 创建项目1) 新建Maven项目 2) 在pom文件中导入依赖pom.xml文件内容如下: <?xml version="1.0" encoding="UTF-8"?> &l

06、部署Spark程序到集群上运行

06.部署Spark程序到集群上运行 6.1 修改程序代码 修改文件加载路径 在spark集群上执行程序时,如果加载文件需要确保路径是所有节点能否访问到的路径,因此通常是hdfs路径地址.所以需要修改代码中文件加载路径为hdfs路径: ... //指定hdfs路径 sc.textFile("hdfs://mycluster/user/centos/1.txt") ... ? 修改master地址 SparkConf中需要指定master地址,如果是集群上运行,也可以不指定,运行时可以通

用python + hadoop streaming 编写分布式程序(二) -- 在集群上运行与监控

写在前面 前文:用python + hadoop streaming 编写分布式程序(一) -- 原理介绍,样例程序与本地调试 为了方便,这篇文章里的例子均为伪分布式运行,一般来说只要集群配置得当,在伪分布式下能够运行的程序,在真实集群上也不会有什么问题. 为了更好地模拟集群环境,我们可以在mapred-site.xml中增设reducer和mapper的最大数目(默认为2,实际可用数目大约是CPU核数-1). 假设你为Hadoop安装路径添加的环境变量叫$HADOOP_HOME(如果是$HAD

从认证到调度,K8s 集群上运行的小程序到底经历了什么?

作者 | 声东? 阿里云售后技术专家 导读:不知道大家有没有意识到一个现实:大部分时候,我们已经不像以前一样,通过命令行,或者可视窗口来使用一个系统了. 前言 现在我们上微博.或者网购,操作的其实不是眼前这台设备,而是一个又一个集群.通常,这样的集群拥有成百上千个节点,每个节点是一台物理机或虚拟机.集群一般远离用户,坐落在数据中心.为了让这些节点互相协作,对外提供一致且高效的服务,集群需要操作系统.Kubernetes 就是这样的操作系统. 比较 Kubernetes 和单机操作系统,Kuber

在集群上运行caffe程序时如何避免Out of Memory

不少同学抱怨,在集群的GPU节点上运行caffe程序时,经常出现"Out of Memory"的情况.实际上,如果我们在提交caffe程序到某个GPU节点的同时,指定该节点某个比较空闲的gpu id,便可以避免"Out of Memory"的情况.步骤如下: 1. 在提交任务前,制作一个带有“nvidia-smi”命令的run_gpu.sh文件 #!/bin/bash #$ -V #$ -cwd #$ -j y #$ -S /bin/bash nvidia-smi

在Hadoop集群上运行R程序--安装RHadoop

RHadoop是由Revolution Analytics发起的一个开源项目,它可以将统计语言R与Hadoop结合起来.目前该项目包括三个R packages,分别为支持用R来编写MapReduce应用的rmr.用于R语言访问HDFS的rhdfs以及用于R语言访问HBASE的rhbase.下载网址为https://github.com/RevolutionAnalytics/RHadoop/wiki/Downloads. 说明:下面的记录是在安装成功后的总结,中间的过程描述及解决方法可能并不精确

Java swing五子棋源码及实现之Java 开发图形界面程序五子棋的实现方式

今天给大家介绍一下如何用Java swing实现五子棋的开发即用Java开发图形界面程序五子棋,代码由于太多,只贴部分,最下面会附上下载地址,废话不多说,下面我们先看一下运行结果: 接下来我们看代码: 首先是创建主frame框架界面: package org.liky.game.frame; import java.awt.Color; import java.awt.Font; import java.awt.Graphics; import java.awt.Toolkit; import