Livy Java API
Dependencies
<dependency>
    <groupId>org.apache.livy</groupId>
    <artifactId>livy-client-http</artifactId>
    <version>0.5.0-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.2</version>
</dependency>
The job class
import org.apache.livy.Job;
import org.apache.livy.JobContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class WordCountJavaSpark implements Job<Object> {

    /**
     * call() holds the job's execution logic; Livy runs it on the remote Spark context.
     * @param jobContext gives access to the shared JavaSparkContext
     * @return the word counts, serialized back to the client
     * @throws Exception
     */
    @Override
    public Object call(JobContext jobContext) throws Exception {
        JavaSparkContext sc = jobContext.sc();
        Map<String, Integer> mp = new HashMap<String, Integer>();
        // To use an HDFS HA path here, HADOOP_CONF_DIR must be configured in Livy's livy-env.sh
        JavaRDD<String> javaRDD = sc.textFile("hdfs://myha/data/livy/zookeeper.out");
        // Split each line into words
        JavaRDD<String> flatMapedRDD = javaRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        // Pair each word with an initial count of 1
        JavaPairRDD<String, Integer> mapedRDD = flatMapedRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // Sum the counts per word
        JavaPairRDD<String, Integer> reduceJavaPariRDD = mapedRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer1, Integer integer2) throws Exception {
                return integer1 + integer2;
            }
        });
        reduceJavaPariRDD.collect().forEach((tuple2) -> {
            System.out.println(tuple2._1 + "<===>" + tuple2._2);
            mp.put(tuple2._1, tuple2._2);
        });
        return mp;
    }
}
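Since Spark 2.x's Java API accepts Java 8 lambdas for these functional interfaces, the same call() body can be written more compactly. This is just a sketch of the equivalent lambda form of the body above:

    @Override
    public Object call(JobContext jobContext) throws Exception {
        JavaSparkContext sc = jobContext.sc();
        Map<String, Integer> mp = new HashMap<>();
        sc.textFile("hdfs://myha/data/livy/zookeeper.out")
          .flatMap(s -> Arrays.asList(s.split(" ")).iterator()) // split lines into words
          .mapToPair(s -> new Tuple2<>(s, 1))                   // (word, 1)
          .reduceByKey(Integer::sum)                            // sum counts per word
          .collect()
          .forEach(t -> mp.put(t._1, t._2));
        return mp;
    }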
The launcher program
import org.apache.livy.LivyClient;
import org.apache.livy.LivyClientBuilder;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;

public class StartApp {

    private static LivyClient client = null;

    public static void main(String[] args) {
        String livyURI = "http://192.168.128.100:8998";
        // Location of the jar containing the job class
        String file = "/Users/chenzhuanglan/WorkSpace/livyjavatest/out/artifacts/livyWC/WordCountJavaSpark.jar";
        try {
            client = new LivyClientBuilder().setURI(new URI(livyURI)).build();
            System.err.printf("Uploading %s to the Spark context...\n", file);
            // Upload the Spark job's jar to the server
            client.uploadJar(new File(file)).get();
            System.err.printf("Running WordCountJavaSpark ...\n");
            // Submit the job and block until the result comes back
            HashMap<String, Integer> map = (HashMap<String, Integer>) client.submit(new WordCountJavaSpark()).get();
            map.forEach((k, v) -> {
                System.out.println(k + "===" + v);
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}
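submit() actually returns a JobHandle (a subtype of Future), so blocking on get() is not the only option. A sketch of non-blocking submission with a listener, assuming the JobHandle.Listener callbacks of the 0.5.0 client API (this snippet would replace the submit/get lines inside the try block above; it also needs import org.apache.livy.JobHandle):

    JobHandle<Object> handle = client.submit(new WordCountJavaSpark());
    handle.addListener(new JobHandle.Listener<Object>() {
        @Override public void onJobQueued(JobHandle<Object> job) { }
        @Override public void onJobStarted(JobHandle<Object> job) { }
        @Override public void onJobCancelled(JobHandle<Object> job) { }
        @Override public void onJobFailed(JobHandle<Object> job, Throwable cause) {
            cause.printStackTrace();
        }
        @Override public void onJobSucceeded(JobHandle<Object> job, Object result) {
            System.out.println("word counts: " + result);
        }
    });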
Note, note, note!
log4j:WARN No appenders could be found for logger (org.apache.livy.shaded.apache.http.client.protocol.RequestAddCookies).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Uploading /Users/chenzhuanglan/WorkSpace/testwcjava/target/testwcjava-1.0-SNAPSHOT.jar to the Spark context...
Running WordCountJavaSpark ...
java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.livy.shaded.kryo.kryo.KryoException: Unable to find class: com.czlan.wordcount.WordCountJavaSpark
org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
org.apache.livy.shaded.kryo.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
org.apache.livy.shaded.kryo.kryo.Kryo.readClass(Kryo.java:656)
org.apache.livy.shaded.kryo.kryo.Kryo.readClassAndObject(Kryo.java:767)
org.apache.livy.client.common.Serializer.deserialize(Serializer.java:63)
org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:39)
org.apache.livy.rsc.driver.BypassJob.call(BypassJob.java:27)
org.apache.livy.rsc.driver.JobWrapper.call(JobWrapper.java:57)
org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:42)
org.apache.livy.rsc.driver.BypassJobWrapper.call(BypassJobWrapper.java:27)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
at org.apache.livy.client.http.JobHandleImpl.get(JobHandleImpl.java:198)
at org.apache.livy.client.http.JobHandleImpl.get(JobHandleImpl.java:88)
at com.czlan.wordcount.StartApp2.main(StartApp2.java:32)
The error above occurs because
    client.uploadJar(new File(file)).get();
was mistakenly written as
    client.uploadFile(new File(file)).get();
uploadJar uploads a jar that is added to the Spark application's classpath.
uploadFile uploads a file that is merely passed along to the Spark application.
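Besides the two upload methods, LivyClient also exposes addJar and addFile, which take a URI the cluster can already reach instead of uploading a local file. A quick sketch of all four (the paths here are placeholders):

    // uploadJar/uploadFile ship a local file to the remote context;
    // addJar/addFile point the context at a URI it can already reach.
    client.uploadJar(new File("/local/path/job-classes.jar")).get(); // classes land on the classpath
    client.uploadFile(new File("/local/path/lookup.txt")).get();     // file is shipped, classpath untouched
    client.addJar(new URI("hdfs:///libs/job-classes.jar")).get();    // cluster-visible jar, added to classpath
    client.addFile(new URI("hdfs:///data/lookup.txt")).get();        // cluster-visible file, not on classpath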
Original post: https://www.cnblogs.com/czlan91/p/10353139.html