CDH version: 5.10.0
IDE environment: Windows 7 64-bit, MyEclipse 2015
Spark mode: YARN
Submit mode: yarn-client
In this same IDE environment I had been submitting jobs to a standalone-mode Spark cluster without any trouble. Today I tried Spark on YARN; from the IDE the job can only be submitted in yarn-client mode (the driver runs inside the IDE). Everything else stayed basically the same, only the master was switched, yet I got the following error:
java.io.IOException: Cannot run program "/etc/hadoop/conf.cloudera.yarn/topology.py" (in directory "D:\workspace2015\5_10_0cdh"): CreateProcess error=2, The system cannot find the file specified
After five hours of fiddling I finally solved it. The cause: the core-site.xml under src/main/resources (taken from the cluster configuration) sets net.topology.script.file.name to the cluster's rack-awareness script, and the Hadoop client running inside the IDE tries to execute that Linux script locally on Windows, where it does not exist. The fix: open the project's core-site.xml, find the net.topology.script.file.name property, and comment out its value, as shown below:
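A sketch of the relevant section of core-site.xml after the change. The script path is taken from the error message above, so your value may differ; commenting out the entire property block should work just as well:

    <!-- Rack-awareness script: disabled for submission from the Windows IDE,
         because the local client cannot run this Linux script. -->
    <property>
      <name>net.topology.script.file.name</name>
      <!-- <value>/etc/hadoop/conf.cloudera.yarn/topology.py</value> -->
    </property>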
Running it again produced the correct result.
The code is as follows:
package com.byit.test;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

import com.byit.getinfo.IGetConf;

/**
 * Word count submitted to YARN in yarn-client mode.
 *
 * @author 耿廑
 * In yarn-client mode, remember to comment out the value of
 * net.topology.script.file.name in core-site.xml under src/main/resources!
 */
public class SparkYarnClientTest implements IGetConf {

    // Delimiter used to split each input line into words.
    static String sendString = " ";

    public static void main(String[] args) throws Exception {
        SparkYarnClientTest.run();
    }

    public static void run() throws Exception {
        // Submit as the configured Spark user (provided by IGetConf).
        System.setProperty("HADOOP_USER_NAME", sparkUser);
        SparkConf conf = new SparkConf()
                .setAppName("SparkYarnClientTest")
                .setMaster("yarn-client")
                .set("spark.yarn.jar", hdfsURL + "/test/spark/libs/spark-assembly-1.6.0-cdh5.10.0-hadoop2.6.0-cdh5.10.0.jar");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Ship the application jar to the executors.
        String jar = jarsPath + "/SparkYarnClientTest.jar";
        sc.addJar(jar);

        String inPath = hdfsURL + "/test/spark/input";
        String outPath = hdfsURL + "/test/spark/output";

        // Classic word count: split into words, map to (word, 1), sum by key, format the result.
        JavaRDD<String> word = sc.textFile(inPath).flatMap(new toWord());
        JavaPairRDD<String, Integer> wordPair = word.mapToPair(new myMapper());
        JavaPairRDD<String, Integer> count = wordPair.reduceByKey(new myReducer());
        JavaRDD<String> result = count.map(new toString());
        result.saveAsTextFile(outPath);
        sc.close();
    }

    /** Splits a line into words on the delimiter. */
    @SuppressWarnings("serial")
    private static class toWord implements FlatMapFunction<String, String> {
        public Iterable<String> call(String words) throws Exception {
            return Arrays.asList(words.split(sendString));
        }
    }

    /** Maps each word to a (word, 1) pair. */
    @SuppressWarnings("serial")
    private static class myMapper implements PairFunction<String, String, Integer> {
        public Tuple2<String, Integer> call(String word) throws Exception {
            return new Tuple2<String, Integer>(word, 1);
        }
    }

    /** Sums the counts for a word. */
    @SuppressWarnings("serial")
    private static class myReducer implements Function2<Integer, Integer, Integer> {
        public Integer call(Integer value1, Integer value2) throws Exception {
            return value1 + value2;
        }
    }

    /** Formats a (word, count) pair as "word<TAB>count". */
    @SuppressWarnings("serial")
    private static class toString implements Function<Tuple2<String, Integer>, String> {
        public String call(Tuple2<String, Integer> result) throws Exception {
            return result._1 + "\t" + result._2;
        }
    }
}
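The constants sparkUser, hdfsURL and jarsPath come from the project's own IGetConf interface, which is not shown in this post. A minimal sketch of what such an interface could look like, with placeholder values rather than the real cluster settings:

    package com.byit.getinfo;

    /** Hypothetical sketch of the configuration interface used above; replace the values with your own cluster settings. */
    public interface IGetConf {
        // User the job is submitted as (set into HADOOP_USER_NAME).
        String sparkUser = "hdfs";
        // HDFS NameNode URL used to build the input, output and spark.yarn.jar paths.
        String hdfsURL = "hdfs://namenode.example.com:8020";
        // Directory (local or HDFS) that holds SparkYarnClientTest.jar for sc.addJar().
        String jarsPath = "D:/workspace2015/5_10_0cdh/jars";
    }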