Java Web提交任务到Spark

相关软件版本:

Spark1.4.1 ,Hadoop2.6,Scala2.10.5 , MyEclipse2014,intelliJ IDEA14,JDK1.8,Tomcat7

机器:

windows7 (包含JDK1.8,MyEclipse2014,IntelliJ IDEA14,TOmcat7);

centos6.6虚拟机(Hadoop伪分布式集群,Spark standAlone集群,JDK1.8);

centos7虚拟机(Tomcat,JDK1.8);

1. 场景:

1. windows简单java程序调用Spark,执行Scala开发的Spark程序,这里包含两种模式:

1> 提交任务到Spark集群,使用standAlone模式执行;

2> 提交任务到Yarn集群,使用yarn-client的模式;

2. windows 开发java web程序调用Spark,执行Scala开发的Spark程序,同样包含两种模式,参考1.

3. linux运行java web程序调用Spark,执行Scala开发的Spark程序,包含两种模式,参考1.

2. 实现:

1. 简单Scala程序,该程序的功能是读取HDFS中的log日志文件,过滤log文件中的WARN和ERROR的记录,最后把过滤后的记录写入到HDFS中,代码如下:

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by Administrator on 2015/8/23.
 */
object Scala_Test {
  def main(args:Array[String]): Unit ={
    if(args.length!=2){
      System.err.println("Usage:Scala_Test <input> <output>")
    }
    // 初始化SparkConf
    val conf = new SparkConf().setAppName("Scala filter")
    val sc = new SparkContext(conf)

    //  读入数据
    val lines = sc.textFile(args(0))

    // 转换
    val errorsRDD = lines.filter(line => line.contains("ERROR"))
    val warningsRDD = lines.filter(line => line.contains("WARN"))
    val  badLinesRDD = errorsRDD.union(warningsRDD)

    // 写入数据
    badLinesRDD.saveAsTextFile(args(1))

    // 关闭SparkConf
    sc.stop()
  }
}

使用IntelliJ IDEA 并打成jar包备用(lz这里命名为spark_filter.jar);

2.  java调用spark_filter.jar中的Scala_Test 文件,并采用Spark standAlone模式,java代码如下:

package test;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.spark.deploy.SparkSubmit;
/**
 * @author fansy
 *
 */
public class SubmitScalaJobToSpark {

	public static void main(String[] args) {
		SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
		String filename = dateFormat.format(new Date());
		String tmp=Thread.currentThread().getContextClassLoader().getResource("").getPath();
		tmp =tmp.substring(0, tmp.length()-8);
		String[] arg0=new String[]{
				"--master","spark://node101:7077",
				"--deploy-mode","client",
				"--name","test java submit job to spark",
				"--class","Scala_Test",
				"--executor-memory","1G",
//				"spark_filter.jar",
				tmp+"lib/spark_filter.jar",//
				"hdfs://node101:8020/user/root/log.txt",
				"hdfs://node101:8020/user/root/badLines_spark_"+filename
		};

		SparkSubmit.main(arg0);
	}
}

具体操作,使用MyEclipse新建java web工程,把spark_filter.jar 以及spark-assembly-1.4.1-hadoop2.6.0.jar(该文件在Spark压缩文件的lib目录中,同时该文件较大,拷贝需要一定时间) 拷贝到WebRoot/WEB-INF/lib目录。(注意:这里可以直接建立java web项目,在测试java调用时,直接运行java代码即可,在测试web项目时,开启tomcat即可)

java调用spark_filter.jar中的Scala_Test 文件,并采用Yarn模式。采用Yarn模式,不能使用简单的修改master为“yarn-client”或“yarn-cluster”,在使用Spark-shell或者spark-submit的时候,使用这个,同时配置HADOOP_CONF_DIR路径是可以的,但是在这里,读取不到HADOOP的配置,所以这里采用其他方式,使用yarn.Clent提交的方式,java代码如下:

package test;

import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;

public class SubmitScalaJobToYarn {

	public static void main(String[] args) {
		SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-hh-mm-ss");
		String filename = dateFormat.format(new Date());
		String tmp=Thread.currentThread().getContextClassLoader().getResource("").getPath();
		tmp =tmp.substring(0, tmp.length()-8);
		String[] arg0=new String[]{
				"--name","test java submit job to yarn",
				"--class","Scala_Test",
				"--executor-memory","1G",
//				"WebRoot/WEB-INF/lib/spark_filter.jar",//
				"--jar",tmp+"lib/spark_filter.jar",//

				"--arg","hdfs://node101:8020/user/root/log.txt",
				"--arg","hdfs://node101:8020/user/root/badLines_yarn_"+filename,
				"--addJars","hdfs://node101:8020/user/root/servlet-api.jar",//
				"--archives","hdfs://node101:8020/user/root/servlet-api.jar"//
		};

//		SparkSubmit.main(arg0);
		Configuration conf = new Configuration();
		String os = System.getProperty("os.name");
		boolean cross_platform =false;
		if(os.contains("Windows")){
			cross_platform = true;
		}
		conf.setBoolean("mapreduce.app-submission.cross-platform", cross_platform);// 配置使用跨平台提交任务
		conf.set("fs.defaultFS", "hdfs://node101:8020");// 指定namenode
		conf.set("mapreduce.framework.name","yarn"); // 指定使用yarn框架
		conf.set("yarn.resourcemanager.address","node101:8032"); // 指定resourcemanager
		conf.set("yarn.resourcemanager.scheduler.address", "node101:8030");// 指定资源分配器
		conf.set("mapreduce.jobhistory.address","node101:10020");

		 System.setProperty("SPARK_YARN_MODE", "true");

		 SparkConf sparkConf = new SparkConf();
		 ClientArguments cArgs = new ClientArguments(arg0, sparkConf);

		new Client(cArgs,conf,sparkConf).run();
	}
}

3. java web测试 任务提交到Spark的两种模式,这里采用最简单的方式,直接配置servlet,其web.xml文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<web-app version="3.0"
    xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd">
  <servlet>
    <description>This is the description of my J2EE component</description>
    <display-name>This is the display name of my J2EE component</display-name>
    <servlet-name>SparkServlet</servlet-name>
    <servlet-class>servlet.SparkServlet</servlet-class>
  </servlet>
  <servlet>
    <description>This is the description of my J2EE component</description>
    <display-name>This is the display name of my J2EE component</display-name>
    <servlet-name>YarnServlet</servlet-name>
    <servlet-class>servlet.YarnServlet</servlet-class>
  </servlet>

  <servlet-mapping>
    <servlet-name>SparkServlet</servlet-name>
    <url-pattern>/servlet/SparkServlet</url-pattern>
  </servlet-mapping>
  <servlet-mapping>
    <servlet-name>YarnServlet</servlet-name>
    <url-pattern>/servlet/YarnServlet</url-pattern>
  </servlet-mapping>

</web-app>

SparkServlet如下:

package servlet;

import java.io.IOException;
import java.io.PrintWriter;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import test.SubmitScalaJobToSpark;

public class SparkServlet extends HttpServlet {

	/**
	 * Constructor of the object.
	 */
	public SparkServlet() {
		super();
	}

	/**
	 * Destruction of the servlet. <br>
	 */
	public void destroy() {
		super.destroy(); // Just puts "destroy" string in log
		// Put your code here
	}

	/**
	 * The doGet method of the servlet. <br>
	 *
	 * This method is called when a form has its tag value method equals to get.
	 *
	 * @param request the request send by the client to the server
	 * @param response the response send by the server to the client
	 * @throws ServletException if an error occurred
	 * @throws IOException if an error occurred
	 */
	public void doGet(HttpServletRequest request, HttpServletResponse response)
			throws ServletException, IOException {

		this.doPost(request, response);
	}

	/**
	 * The doPost method of the servlet. <br>
	 *
	 * This method is called when a form has its tag value method equals to post.
	 *
	 * @param request the request send by the client to the server
	 * @param response the response send by the server to the client
	 * @throws ServletException if an error occurred
	 * @throws IOException if an error occurred
	 */
	public void doPost(HttpServletRequest request, HttpServletResponse response)
			throws ServletException, IOException {
		System.out.println("开始SubmitScalaJobToSpark调用......");
		SubmitScalaJobToSpark.main(null);
		//YarnServlet也只是这里不同
		System.out.println("完成SubmitScalaJobToSpark调用!");
		response.setContentType("text/html");
		PrintWriter out = response.getWriter();
		out.println("<!DOCTYPE HTML PUBLIC \"-//W3C//DTD HTML 4.01 Transitional//EN\">");
		out.println("<HTML>");
		out.println("  <HEAD><TITLE>A Servlet</TITLE></HEAD>");
		out.println("  <BODY>");
		out.print("    This is ");
		out.print(this.getClass());
		out.println(", using the POST method");
		out.println("  </BODY>");
		out.println("</HTML>");
		out.flush();
		out.close();
	}

	/**
	 * Initialization of the servlet. <br>
	 *
	 * @throws ServletException if an error occurs
	 */
	public void init() throws ServletException {
		// Put your code here
	}

}

这里只是调用了java编写的任务调用类而已。同时,SparServlet和YarnServlet也只是在调用的地方不同而已。

在web测试时,首先直接在MyEclipse上测试,然后拷贝工程WebRoot到centos7,再次运行tomcat,进行测试。

3. 总结及问题

1. 测试结果:

1> java代码直接提交任务到Spark和Yarn,进行日志文件的过滤,测试是成功运行的。可以在Yarn和Spark的监控中看到相关信息:

同时,在HDFS可以看到输出的文件:

2> java web 提交任务到Spark和Yarn,首先需要把spark-assembly-1.4.1-hadoop2.6.0.jar中的javax.servlet文件夹删掉,因为会和tomcat的servlet-api.jar冲突。

a. 在windows和linux上启动tomcat,提交任务到Spark standAlone,测试成功运行;

b. 在windows和linux上启动tomcat,提交任务到Yarn,测试失败;

2. 遇到的问题:

1> java web 提交任务到Yarn,会失败,失败的主要日志如下:

15/08/25 11:35:48 ERROR yarn.ApplicationMaster: User class threw exception: java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse
java.lang.NoClassDefFoundError: javax/servlet/http/HttpServletResponse

这个是因为javax.servlet的包被删掉了,和tomcat的冲突。

同时,在日志中还可以看到:

15/08/26 12:39:27 INFO Client: Setting up container launch context for our AM
15/08/26 12:39:27 INFO Client: Preparing resources for our AM container
15/08/26 12:39:27 INFO Client: Uploading resource file:/D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/lib/spark-assembly-1.4.1-hadoop2.6.0.jar -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/spark-assembly-1.4.1-hadoop2.6.0.jar
15/08/26 12:39:32 INFO Client: Uploading resource file:/D:/workspase_scala/SparkWebTest/WebRoot/WEB-INF/lib/spark_filter.jar -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/spark_filter.jar
15/08/26 12:39:33 INFO Client: Uploading resource file:/C:/Users/Administrator/AppData/Local/Temp/spark-46820caf-06e0-4c51-a479-3bb35666573f/__hadoop_conf__5465819424276830228.zip -> hdfs://node101:8020/user/Administrator/.sparkStaging/application_1440464833795_0012/__hadoop_conf__5465819424276830228.zip
15/08/26 12:39:33 INFO Client: Source and destination file systems are the same. Not copying hdfs://node101:8020/user/root/servlet-api.jar
15/08/26 12:39:33 WARN Client: Resource hdfs://node101:8020/user/root/servlet-api.jar added multiple times to distributed cache.

这里在环境初始化的时候,上传了两个jar,一个就是spark-assembly-1.4.1-hadoop2.6.0.jar 还有一个就是我们自定义的jar。上传的spark-assembly-1.4.1-hadoop2.6.0.jar 里面没有javax.servlet的文件夹,所以会报错。在java中直接调用(没有删除javax.servlet的时候)同样会看到这样的日志,同样的上传,那时是可以的,也就是说这里确实是删除了包文件夹的关系。那么如何修复呢?

上传servlet-api到hdfs,同时在使用yarn.Client提交任务的时候,添加相关的参数,这里查看参数,发现两个比较相关的参数,--addJars以及--archive 参数,把这两个参数都添加后,看到日志中确实把这个jar包作为了job的共享文件,但是java web提交任务到yarn 还是报这个类找不到的错误。所以这个办法也是行不通!

使用yarn.Client提交任务到Yarn参考http://blog.sequenceiq.com/blog/2014/08/22/spark-submit-in-java/ 。

分享,成长,快乐

脚踏实地,专注

转载请注明blog地址:http://blog.csdn.net/fansy1990

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-08-28 21:05:10

Java Web提交任务到Spark的相关文章

Java Web HTTP协议的提交方式

TTP/1.1协议中共定义了八种方法(有时也叫“动作”)来表明Request-URI指定的资源的不同操作方式: OPTIONS 返回服务器针对特定资源所支持的HTTP请求方法.也可以利用向Web服务器发送'*'的请求来测试服务器的功能性. HEAD 向服务器索要与GET请求相一致的响应,只不过响应体将不会被返回.这一方法可以在不必传输整个响应内容的情况下,就可以获取包含在响应消息头中的元信息. GET 向特定的资源发出请求.注意:GET方法不应当被用于产生“副作用”的操作中,例如在Web App

提交任务到spark(以wordcount为例)

1.首先需要搭建好hadoop+spark环境,并保证服务正常.本文以wordcount为例. 2.创建源文件,即输入源.hello.txt文件,内容如下: tom jerry henry jim suse lusy 注:以空格为分隔符 3.然后执行如下命令: hadoop fs -mkdir -p /Hadoop/Input(在HDFS创建目录) hadoop fs -put hello.txt /Hadoop/Input(将hello.txt文件上传到HDFS) hadoop fs -ls

JAVA web 框架集合

“框架”犹如滔滔江水连绵不绝, 知道有它就好,先掌握自己工作和主流的框架: 在研究好用和新框架. 主流框架教程分享在Java帮帮-免费资源网 其他教程需要时间制作,会陆续分享!!! 152款框架,你还知道其他的吗? 留言你用过的web框架 Java开源web框架汇总 1 Struts2 Struts2是一个web应用框架.它不是一个Struts的新的发布版本,而是一个全新的框架.Struts2 是第二代基于Model-View-Controller (MVC)模型的web应用框架. Struts

如何通过Java程序提交yarn的mapreduce计算任务

由于项目需求,需要通过Java程序提交Yarn的MapReduce的计算任务.与一般的通过Jar包提交MapReduce任务不同,通过程序提交MapReduce任务需要有点小变动,详见以下代码. 以下为MapReduce主程序,有几点需要提一下: 1.在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分. 2.为了控制reduce的处理过程,map的输出键的格式为组合键格式.与常规的<key,value>不同,这里变为了<TextPair,Valu

MVC模式在Java Web应用程序中的实例分析

MVC作为软件架构中及其重要的一种架构思想,在实际的java web项目开发中被开发者们广泛的应用.遵循MVC思想的所产生的解决方法也可以因实际情况的不同来进行不同的选择.这里以一个应用struts+hibernate+jsp的实例来进一步认识MVC. 学生管理系统中添加学生模块的的MVC架构: Model(系统的业务逻辑):Hibernate进行管理的数据实体+定义的业务逻辑的体现--------数据库操作类,也就是通常所说的Dao层 数据实体(PO,Entity): package PO;

Java Web学习总结

一. Java WEB开发中几个重要的概念 1.HTTP请求:客户端连接上服务器后,向服务器请求某个WEB资源,称之为客户端向服务器发送了一个HTTP请求.一个完整的HTTP请求包括:一个请求头.若干消息头.以及实体内容. 2.Servlet:Servlet是sun公司提供的一门用于开发动态WEB资源的技术.sun公司在其API中提供了一个Servlet接口,用户若想开发一个动态WEB资源(即开发一个Java程序向浏览器输出数据),需要完成两个步骤:1.编写一个Java类,实现Servlet接口

java web中请求和响应中包含中文出现乱码解析

说明:在计算机中保存的一切文本信息是以一定的编码表(0,1,0,1)来保存我们所认识的字符(汉字或英文字符),由字符到计算机存储的二进制过程是编码,由读取二进制到文本的过程称为解码.而字符编码有多种不同的编码表,所以,如果编码格式和解码格式不是同一个码表就会出现乱码.想要避免出现乱码,需要使保存和读取时使用相同的码表. 在java web编程中经常会出现乱码,现在详细讲解一下如何进行设置,避免乱码 1 网页编码 在编写网页的时候,需要指定网页的编码格式,使用<meta http-equiv=&quo

Java Web学习系列——Maven Web项目中集成使用Spring、MyBatis实现对MySQL的数据访问

本篇内容还是建立在上一篇Java Web学习系列——Maven Web项目中集成使用Spring基础之上,对之前的Maven Web项目进行升级改造,实现对MySQL的数据访问. 添加依赖Jar包 这部分内容需要以下Jar包支持 mysql-connector:MySQL数据库连接驱动,架起服务端与数据库沟通的桥梁: MyBatis:一个支持普通SQL查询,存储过程和高级映射的优秀持久层框架: log4j:Apache的开源项目,一个功能强大的日志组件,提供方便的日志记录: 修改后的pom.xm

6_12 java WEB 第一节课

java web 工作原理 动态网页与静态网页最本质的区别是: 能否进行数据库操作 动态网页最大的特点就是具有交互性 ,  交互性指: 服务端会根据不同的用户请求显示不同的结果 OSI 7层参考模型 1.应用层 2.表示层 3.会话层 4.传输层 5 网络层 6 .数据链路层 7 .物理层 HTTP协议(hypertext  transfer protocol) : 超文本传输协议 ,是面向应用层的协议 TCP/IP  四层模型 1 . 应用层                 HTTP 2. 传