基于SPARK SQL 读写ORACLE 的简单案例分析常见问题

该文章出自上海harli,偷偷地把女神的东西拿出来,希望女神不要介意。

一、概述

本文主要内容包含Spark SQL读写Oracle表数据的简单案例,并针对案例中比较常见的几个问题给出解决方法。

最后从常见的java.lang.ClassNotFoundException(无法找到驱动类)的异常问题出发,分析相关的几种解决方法,以及各个解决方法之间的异同点。

二、案例中比较常见问题及其解决方法

2.1 启动

首先查看Spark 官网给出的SparkSQL的编程指南部分(http://spark.apache.org/docs/latest/sql-programming-guide.html)的JDBC To Other Databases 内容。参考命令:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

对应写出访问 Oracle的命令,如下:

SPARK_CLASSPATH=$SPARK_HOME/ojdbc14.jar bin/spark-shell –master local

其中,CLASSPATH相关内容会在后一章节给出详细分析,在此仅针对其他一些常见问题给出解决方法。

启动过程如下(部分字符串已经被替换,如:$SPARK_HOME):

[[email protected] spark-1.5.2-bin-hadoop2.6]SPARKCLASSPATH=SPARK_HOME/lib/ojdbc14.jar bin/spark-shell –master local

……

Welcome to

__

/ / _ _/ /__

\ \/ \/ _ `/ _/ ‘/

// ./_,// //_\ version 1.5.2

/_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)

Type in expressions to have them evaluated.

Type :help for more information.

16/04/18 11:56:35 INFO spark.SparkContext: Running Spark version 1.5.2

16/04/18 11:56:35 WARN spark.SparkConf:

SPARK_CLASSPATH was detected (set to ‘$SPARK_HOME/lib/ojdbc14.jar’).

This is deprecated in Spark 1.0+.

Please instead use:

- ./spark-submit with –driver-class-path to augment the driver classpath

- spark.executor.extraClassPath to augment the executor classpath

16/04/18 11:56:35 WARN spark.SparkConf: Setting ‘spark.executor.extraClassPath’ to ‘SPARKHOME/lib/ojdbc14.jar′asawork?around.16/04/1811:56:35WARNspark.SparkConf:Setting‘spark.driver.extraClassPath′to‘SPARK_HOME/lib/ojdbc14.jar’ as a work-around.

……

16/04/18 11:56:51 INFO server.AbstractConnector: Started [email protected]:4040

16/04/18 11:56:51 INFO util.Utils: Successfully started service ‘SparkUI’ on port 4040.

16/04/18 11:56:51 INFO ui.SparkUI: Started SparkUI at http://192.168.149.86:4040

……

16/04/18 11:56:57 INFO repl.SparkILoop: Created sql context (with Hive support)..

SQL context available as sqlContext.

下面给出简单读写案例中的常见问题及其解决方法。

2.2 访问表数据时报无法找到驱动类

scala> val url = “jdbc:oracle:thin:@xx.xx.xx.xx:1521:dbsid”

url: String = jdbc:oracle:thin:@xx.xx.xx.xx:1521:dbsid

// 通过format指定访问jdbc,通过options指定访问时的参数,然后load加载数据

scala> val jdbcDF =sqlContext.read.format(“jdbc”).options( Map( “url” ->url, “dbtable” -> “MyTable”, “user” ->”username”, “password” -> ” password “)).load()

java.sql.SQLException: No suitable driverfound for jdbc:oracle:thin:@xx.xx.xx.xx:1521:dbsid

at java.sql.DriverManager.getConnection(DriverManager.java:596)

at java.sql.DriverManager.getConnection(DriverManager.java:187)

at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD

anonfun$getConnector$1.apply(JDBCRDD.scala:188)atorg.apache.spark.sql.execution.datasources.jdbc.JDBCRDD

anonfungetConnector1.apply(JDBCRDD.scala:181)

atorg.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.resolveTable(JDBCRDD.scala:121)atorg.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.(JDBCRelation.scala:91)atorg.apache.spark.sql.execution.datasources.jdbc.DefaultSource.createRelation(DefaultSource.scala:60)atorg.apache.spark.sql.execution.datasources.ResolvedDataSource.apply(ResolvedDataSource.scala:125)

at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:114)

……

报错信息为:

java.sql.SQLException: No suitable driver found for jdbc:oracle:thin:@xx.xx.xx.xx:1521:dbsid

即报错提示无法找到合适的驱动时,此时可以通过在options方法中指定”driver”属性为具体的驱动类来解决,如下所示:

scala> val jdbcDF =sqlContext.read.format(“jdbc”).options( Map( “url” ->url, “dbtable” -> “TEST_TABLE”, “user” ->”userName”, “password” -> “password”,”driver” -> “oracle.jdbc.driver.OracleDriver”)).load()

jdbcDF: org.apache.spark.sql.DataFrame = [ID:decimal(0,-127), XX: string, XX: string, XX: string, XX: decimal(0,-127)]

注意:在1.5版本中,当Oracle表字段为Number时,对应DataType为decimal,此时会由于scala的精度断言抛出异常——可以在stackoverflow网站查找该异常——应该是个bug,在1.6中应该已经解决。有兴趣可以试下下。——如果继续show数据的话,会抛出该异常。

补充:手动加载驱动类也可以解决。

2.3 访问表数据时报无效调用参数

另外,当属性名写错时的错误信息:

——由于在通常写jdbc的连接时,使用username标识用户名,(比如在sqoop工具中),在SparkSQL中需要使用user,否则会报以下错误(可以在官网或API的描述中查找相关属性名):

scala> val jdbcDF =sqlContext.read.format(“jdbc”).options( Map( “url” ->url, “dbtable” -> “TEST_TABLE”, “username”-> “userName”, “password”-> “password”)).load()

java.sql.SQLException: invalid arguments incall

at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:112)

at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:146)

at oracle.jdbc.driver.DatabaseError.throwSqlException(DatabaseError.java:208)

at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:236)

当出现invalid arguments in call错误,表示调用时使用了无效参数,可以检查options中的参数名是否写错,尤其是user是否写成了username。

下面解析最常见的无法找到驱动类的问题及其解决方法。

三、无法找到无法找到驱动类的问题及其解决方法

扩展知识:JVM的类加载机制——该知识是解决这一类问题的根本方法。

下面的解决方法主要从JVM进程的classpath配置出发去分析。

在Spark框架中,为JVM进程的classpath配置提供了以下几种方式:

  1. 使用环境变量SPARK_CLASSPATH:将jar包放置在类查找路径,比如前面的SPARK_CLASSPATH,设置该变量后可以查看4040(默认应用监控端口)的环境设置,此时需要在driver和executor的classpath的路径中放置所需的jar包。

由前面启动过程中的日志可知,这种方式已经废弃,对应的方式是直接设置driver和executor的classpath相关的配置属性。

  1. 使用配置属性分别设置driver和executor执行时加载jar包的路径。

    a) spark.executor.extraClassPath

    b) spark.driver.extraClassPath

取代使用环境变量进行配置的方法。由于通常情况下,driver和executor的进程不在同一个节点上,因此分别给出各自的配置属性。

和前面使用环境变量进行配置的方式一样,在设置相应的classpath路径之后,需要将对应的jar包部署到各个节点上的该classpath路径下。

说明:上述两种方法都是通过设置JVM进程的classpath属性,为进程提供jar包查找路径的。而对应的jar包,需要人为地部署到各个节点的对应路径下。

补充:另外可以通过提交命令的命令行选项–driver-class-path来设置driver端进程的classpath,原理类似,因此不作为单独一种。

注意:–conf命令项方式设置时,–confPROP=VALUE的等号中间不要加空格….

  1. 除了前面两种方式外,Spark框架还提供了第三种方法,可以通过提交命令(spark-submit或spark-shell)的命令行选项–jars,自动上传、下载所需jar包,并同时将下载的jar包放入JVM进程的classpath路径中。

说明:通过该命令行选项设置的jar包,会通过http服务(注意,该服务在driver端启动后才会启动)上传jar包,并在executor端执行时下载该jar包,然后放入classpath。即,此时不需要手动部署到各个节点上,但每个提交的应用都会增加jar包上传、下载的网络IO、磁盘IO开销。

下面针对提交时的–master选项、–deploymode的选项分别进行解析。

不同选项的简单说明如下:

  1. 当–master选项为local时,对应为in-process方式,此时仅一个进程(local-cluster时也对应一个进程,内部通过实例模拟),因此,对应的classpath实际上使用的都是driver短对应进程的classpath。即只需要配置driver的classpath即可。
  2. 当–master选项为集群的MasterURL(本文主要基于Standalone模式的集群)时,对应driver和executor是以不同的进程方式启动,因此需要分别进行设置。并且,在不同的部署模式(–deploymode)下也会有细节上的差异(本质上还是根据JVM类加载机制):

    a) –deploymode为client时:driver进程在当前提交节点上启动

    b) –deploymode为cluster时:driver进程提交到集群中,由集群调度Master负责分配节点,并在分配的节点上启动driver进程。

说明:不同的部署模式和jar包的上传、下载有关,即在使用–jars方式时会有所差异,其关键点在于,jar包的上传、下载是通过driver进程启动过程中启动的http服务来完成的,当指定的jar包是以本地文件系统的路径提供时,在另一个节点启动的driver进程中的http服务根据该路径上传jar包时,也会根据本地文件系统指定的路径去上传,所以此时必须保证由Master节点分配给driver的节点,对应的该路径上也存在需要上传的jar包。

因此,建议在使用cluster部署模式提交应用程序时,所使用的路径尽可能与节点无关,比如使用hdfs文件系统的路径等。

3.1 测试类的设计

关于下面使用的SparkTest.jar中的com.TestClass测试类,分别在driver端和executor端同时访问Oracle的表数据,即两处都需要加载Oracle的驱动器类。

通过这种方式,方便分别测试driver端和executor端与各自的classpath配置及其jar包放置等相关的内容。

3.1.1 包含Driver与Executor执行逻辑的代码

如下所示:

object TestJarwithOracle {

// 硬编码

valurl = “jdbc:oracle:thin:@xx.xx.xx.xx:1521:dbsid”

valuser = “userName”

valpassword = “password”

valdbtable = “TABLE_TEST”

defmain(args: Array[String]) {

val conf = new SparkConf().setAppName(this.getClass.getSimpleName)

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

val logRDD = sc.parallelize( List( (“name”,”action”, “date”, 1), (“name”,”action”, “date”, 2) ))

// 处理逻辑:在Driver端

deleteRecodes(“date”)

// 处理逻辑:位于Executor端

logRDD.foreachPartition(insertInto)

sc.stop()

}

defdeleteRecodes(date: String): Unit = {

var conn: Connection = null

var ps: PreparedStatement = null

val sql = s”delete from dbtablewherelogdatein(′date’)”

println(sql)

try {

Class.forName(“oracle.jdbc.driver.OracleDriver”)

conn = DriverManager.getConnection(url, user, password)

ps = conn.prepareStatement(sql)

ps.executeUpdate()

}catch {

case e: Exception => e.printStackTrace

}finally {

if (ps != null) {

ps.close()

}

if (conn != null) {

conn.close()

}

}

}

definsertInto(iterator: Iterator[(String, String, String, Int)]): Unit = {

var conn: Connection = null

var ps: PreparedStatement = null

val sql = s”insert into $dbtable( USER_ACTION, USER_NAME, LOG_DATE,ACTION_CNT) values (?, ?, ?, ?)”

try {

//conn =DriverManager.getConnection(“jdbc:mysql://localhost:3306/spark”,”root”, “123456”)

Class.forName(“oracle.jdbc.driver.OracleDriver”)

conn = DriverManager.getConnection(url, user, password)

iterator.foreach(data => {

ps = conn.prepareStatement(sql)

ps.setString(1, data._1)

ps.setString(2, data._2)

ps.setString(3, data._3)

ps.setInt(4, data._4)

// ps.setInt(3, data._3)

ps.executeUpdate()

}

)

}catch {

case e: Exception => e.printStackTrace

}finally {

if (ps != null) {

ps.close()

 }
 if (conn != null) {
   conn.close()
 }
}

}

}

3.1.2 代码说明

前面给出了从oracle(其他数据库基本一样)中读取的简单案例,对应insert的话,和普通oracle表的insert类似,需要注意的是连接的创建位置(可以参考Spark官网的流部分)。

大致原理简单描述如下:

  1. Driver端创建的对象需要序列化到Executor端才能使用,当特定对象(如数据库连接)与具体节点绑定(如hostname绑定)时,即使序列化成功,在Executor端反序列化后对象也不能使用(比如反序列化时的初始化失败或hostname不同导致连接等无法使用等)
  2. 一个分区对应一个task,Executor上的执行单位为task。

    在一个执行单位中复用,即针对分区提供一个连接——可以复用连接池。

    比如:rdd.foreachPartition(insertInto)

对应的function :insertInto,和普通的数据库insert方式是一样的(可以采用批量插入),针对每个分区Partition,然后获取或构建一个数据库连接,通过该连接将分区的Iterator数据插入到数据库表

3.2 master为local时

首先查看下SparkSubmit提交应用程序时,一些不支持的组合形式,对应代码如下所示:

private[deploy] defprepareSubmitEnvironment(args: SparkSubmitArguments)

……

case (LOCAL, CLUSTER) =>

printErrorAndExit(“Cluster deploymode is not compatible with master \”local\”“)

,,,,,,

即,在使用local与local-cluster这种local方式时,不支持以CLUSTER的部署模式提交应用程序。

因此以下对应都是在CLIENT的部署模式提交应用程序。

3.2.1 环境变量方式

  1. 命令:

    SPARK_CLASSPATH=$SPARK_HOME/ojdbc14.jar….

  2. 说明:

    指定SPARK_CLASSPATH时,相当于同时指定driver和executor的classpath,根据前面的分析,实际上local模式下只需要设置driver端的classpath即可,同时需要手动在该路径下方式所需的jar包,否则会抛出驱动类无法找到的异常。

参考前面启动章节的启动日志中的红色斜体部分,表示的是SPARK_CLASSPATH已经被废弃,建议使用斜体部分的对应配置属性进行替换。

Please instead use:

- ./spark-submit with –driver-class-pathto augment the driver classpath

-spark.executor.extraClassPath to augment the executor classpath

16/04/18 11:56:35 WARN spark.SparkConf:Setting ‘spark.executor.extraClassPath’ to ‘SPARKHOME/lib/ojdbc14.jar′asawork?around.16/04/1811:56:35WARNspark.SparkConf:Setting′spark.driver.extraClassPath′to‘SPARK_HOME/lib/ojdbc14.jar’ as awork-around.

  1. 测试方式一:在环境变量的路径下不存在所需jar包,driver和executor端加载类同时异常,执行命令及其异常日志如下所示:

[[email protected] spark-1.5.2-bin-hadoop2.6]SPARKCLASSPATH=SPARK_HOME/ojdbc14.jar $SPARK_HOME/bin/spark-submit –master local \

–deploy-mode client \

–driver-memory 2g \

–driver-cores 1 \

–total-executor-cores 2 \

–executor-memory4g \

–conf”spark.ui.port”=4081 \

–class com.mb.TestJarwithOracle \

/tmp/test/Spark15.jar

16/04/26 10:31:39 INFO spark.SparkContext: RunningSpark version 1.5.2

16/04/26 10:31:40 WARN util.NativeCodeLoader: Unableto load native-hadoop library for your platform… using builtin-java classeswhere applicable

16/04/26 10:31:40 WARN spark.SparkConf:

SPARK_CLASSPATH was detected (set to ‘/ojdbc14.jar’).

This is deprecated in Spark 1.0+.

Please instead use:

-./spark-submit with –driver-class-path to augment the driver classpath

-spark.executor.extraClassPath to augment the executor classpath

16/04/26 10:31:40 WARN spark.SparkConf: Setting’spark.executor.extraClassPath’ to ‘/ojdbc14.jar’ as a work-around.

16/04/26 10:31:40 WARN spark.SparkConf: Setting’spark.driver.extraClassPath’ to ‘/ojdbc14.jar’ as a work-around.

16/04/26 10:31:40 INFO spark.SecurityManager: Changingview acls to: hdfs

16/04/26 10:31:40 INFO spark.SecurityManager: Changingmodify acls to: hdfs

16/04/26 10:31:40 INFO spark.SecurityManager:SecurityManager: authentication disabled; ui acls disabled; users with viewpermissions: Set(hdfs); users with modify permissions: Set(hdfs)

16/04/26 10:31:41 INFO slf4j.Slf4jLogger: Slf4jLoggerstarted

16/04/26 10:31:41 INFO Remoting: Starting remoting

16/04/26 10:31:42 INFO Remoting: Remoting started;listening on addresses :[akka.tcp://[email protected]:32898]

16/04/26 10:31:42 INFO util.Utils: Successfullystarted service ‘sparkDriver’ on port 32898.

16/04/26 10:31:42 INFO spark.SparkEnv: RegisteringMapOutputTracker

16/04/26 10:31:42 INFO spark.SparkEnv: RegisteringBlockManagerMaster

16/04/26 10:31:42 INFO storage.DiskBlockManager:Created local directory at /tmp/blockmgr-43894e1f-4546-477c-91f9-766179306112

16/04/26 10:31:42 INFO storage.MemoryStore:MemoryStore started with capacity 1060.3 MB

16/04/26 10:31:42 INFO spark.HttpFileServer: HTTP Fileserver directory is/tmp/spark-0294727b-0b57-48ff-9f36-f441fa3604aa/httpd-cec03f6e-ca66-49b8-94b9-39427a86ed65

16/04/26 10:31:42 INFO spark.HttpServer: Starting HTTPServer

16/04/26 10:31:42 INFO server.Server:jetty-8.y.z-SNAPSHOT

16/04/26 10:31:42 INFO server.AbstractConnector:Started [email protected]:57115

16/04/26 10:31:42 INFO util.Utils: Successfullystarted service ‘HTTP file server’ on port 57115.

16/04/26 10:31:42 INFO spark.SparkEnv: RegisteringOutputCommitCoordinator

16/04/26 10:31:57 INFO server.Server:jetty-8.y.z-SNAPSHOT

16/04/26 10:31:57 INFO server.AbstractConnector:Started [email protected]:4081

16/04/26 10:31:57 INFO util.Utils: Successfullystarted service ‘SparkUI’ on port 4081.

16/04/26 10:31:57 INFO ui.SparkUI: Started SparkUI athttp://192.168.149.86:4081

16/04/26 10:31:57 INFO spark.SparkContext: Added JARfile:/tmp/test/Spark15.jar at http://192.168.149.86:57115/jars/Spark15.jar withtimestamp 1461637917682

16/04/26 10:31:57 WARN metrics.MetricsSystem: Usingdefault name DAGScheduler for source because spark.app.id is not set.

16/04/26 10:31:57 INFO executor.Executor: Startingexecutor ID driver on host localhost

16/04/26 10:31:58 INFO util.Utils: Successfullystarted service ‘org.apache.spark.network.netty.NettyBlockTransferService’ onport 23561.

16/04/26 10:31:58 INFOnetty.NettyBlockTransferService: Server created on 23561

16/04/26 10:31:58 INFO storage.BlockManagerMaster:Trying to register BlockManager

16/04/26 10:31:58 INFOstorage.BlockManagerMasterEndpoint: Registering block manager localhost:23561with 1060.3 MB RAM, BlockManagerId(driver, localhost, 23561)

16/04/26 10:31:58 INFO storage.BlockManagerMaster:Registered BlockManager

16/04/26 10:31:59 INFO scheduler.EventLoggingListener:Logging events to hdfs://nodemaster:8020/user/hdfs/sparklogs/local-1461637917735

delete from TEST_TABLE where log_date in (‘date’)

java.lang.ClassNotFoundException:oracle.jdbc.driver.OracleDriver

atjava.net.URLClassLoader1.run(URLClassLoader.java:366)atjava.net.URLClassLoader1.run(URLClassLoader.java:355)

atjava.security.AccessController.doPrivileged(Native Method)

atjava.net.URLClassLoader.findClass(URLClassLoader.java:354)

atjava.lang.ClassLoader.loadClass(ClassLoader.java:425)

atjava.lang.ClassLoader.loadClass(ClassLoader.java:358)

atjava.lang.Class.forName0(Native Method)

atjava.lang.Class.forName(Class.java:191)

atcom.mb.TestJarwithOracle.deleteRecodes(TestJarwithOracle.scala:38)atcom.mb.TestJarwithOracle.main(TestJarwithOracle.scala:27)

atcom.mb.TestJarwithOracle.main(TestJarwithOracle.scala)

atsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

atsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

atsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

atjava.lang.reflect.Method.invoke(Method.java:606)

atorg.apache.spark.deploy.SparkSubmit.orgapachesparkdeploySparkSubmitrunMain(SparkSubmit.scala:674)atorg.apache.spark.deploy.SparkSubmit.doRunMain1(SparkSubmit.scala:180)atorg.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:205)

atorg.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala:120)  
        atorg.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)  
16/04/26 10:31:59 INFO spark.SparkContext: Startingjob: foreachPartition at TestJarwithOracle.scala:28  
16/04/26 10:31:59 INFO scheduler.DAGScheduler: Got job0 (foreachPartition at TestJarwithOracle.scala:28) with 1 output partitions  
16/04/26 10:31:59 INFO scheduler.DAGScheduler: Finalstage: ResultStage 0(foreachPartition at TestJarwithOracle.scala:28)  
16/04/26 10:31:59 INFO scheduler.DAGScheduler: Parentsof final stage: List()  
16/04/26 10:31:59 INFO scheduler.DAGScheduler: Missingparents: List()  
16/04/26 10:32:00 INFO scheduler.DAGScheduler:Submitting ResultStage 0 (ParallelCollectionRDD[0] at parallelize atTestJarwithOracle.scala:26), which has no missing parents  
16/04/26 10:32:00 INFO storage.MemoryStore:ensureFreeSpace(1200) called with curMem=0, maxMem=1111794647  
16/04/26 10:32:00 INFO storage.MemoryStore: Blockbroadcast_0 stored as values in memory (estimated size 1200.0 B, free 1060.3MB)  
16/04/26 10:32:00 INFO storage.MemoryStore:ensureFreeSpace(851) called with curMem=1200, maxMem=1111794647  
16/04/26 10:32:00 INFO storage.MemoryStore: Blockbroadcast_0_piece0 stored as bytes in memory (estimated size 851.0 B, free1060.3 MB)  
16/04/26 10:32:00 INFO storage.BlockManagerInfo: Addedbroadcast_0_piece0 in memory on localhost:23561 (size: 851.0 B, free: 1060.3MB)  
16/04/26 10:32:00 INFO spark.SparkContext: Createdbroadcast 0 from broadcast at DAGScheduler.scala:861  
16/04/26 10:32:00 INFO scheduler.DAGScheduler:Submitting 1 missing tasks from ResultStage 0 (ParallelCollectionRDD[0] atparallelize at TestJarwithOracle.scala:26)  
16/04/26 10:32:00 INFO scheduler.TaskSchedulerImpl:Adding task set 0.0 with 1 tasks  
16/04/26 10:32:00 INFO scheduler.TaskSetManager:Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 2310 bytes)  
16/04/26 10:32:00 INFO executor.Executor: Running task0.0 in stage 0.0 (TID 0)  
16/04/26 10:32:00 INFO executor.Executor: Fetchinghttp://192.168.149.86:57115/jars/Spark15.jar with timestamp 1461637917682  
16/04/26 10:32:00 INFO util.Utils: Fetchinghttp://192.168.149.86:57115/jars/Spark15.jar to/tmp/spark-0294727b-0b57-48ff-9f36-f441fa3604aa/userFiles-9b936a62-13aa-4ac4-8c26-caabe7bd4367/fetchFileTemp8857984169855770119.tmp  
16/04/26 10:32:00 INFO executor.Executor: Addingfile:/tmp/spark-0294727b-0b57-48ff-9f36-f441fa3604aa/userFiles-9b936a62-13aa-4ac4-8c26-caabe7bd4367/Spark15.jarto class loader  
java.lang.ClassNotFoundException:oracle.jdbc.driver.OracleDriver  
        atjava.net.URLClassLoader1.run(URLClassLoader.java:366)

atjava.net.URLClassLoader1.run(URLClassLoader.java:355)atjava.security.AccessController.doPrivileged(NativeMethod)atjava.net.URLClassLoader.findClass(URLClassLoader.java:354)atjava.lang.ClassLoader.loadClass(ClassLoader.java:425)atjava.lang.ClassLoader.loadClass(ClassLoader.java:358)atjava.lang.Class.forName0(NativeMethod)atjava.lang.Class.forName(Class.java:191)atcom.mb.TestJarwithOracle.insertInto(TestJarwithOracle.scala:61)

atcom.mb.TestJarwithOracle

anonfun$main$1.apply(TestJarwithOracle.scala:28)atcom.mb.TestJarwithOracle

anonfunmain1.apply(TestJarwithOracle.scala:28)

atorg.apache.spark.rdd.RDD

anonfun$foreachPartition$1

anonfunapply29.apply(RDD.scala:902)

atorg.apache.spark.rdd.RDD

anonfun$foreachPartition$1

anonfunapply29.apply(RDD.scala:902)

atorg.apache.spark.SparkContext

anonfun$runJob$5.apply(SparkContext.scala:1850)atorg.apache.spark.SparkContext

anonfunrunJob5.apply(SparkContext.scala:1850)

atorg.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)

atorg.apache.spark.scheduler.Task.run(Task.scala:88)

atorg.apache.spark.executor.ExecutorTaskRunner.run(Executor.scala:214)atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)atjava.util.concurrent.ThreadPoolExecutorWorker.run(ThreadPoolExecutor.java:615)

atjava.lang.Thread.run(Thread.java:745)

16/04/26 10:32:00 INFO executor.Executor: Finishedtask 0.0 in stage 0.0 (TID 0). 915 bytes result sent to driver

16/04/26 10:32:00 INFO scheduler.TaskSetManager:Finished task 0.0 in stage 0.0 (TID 0) in 235 ms on localhost (1/1)

两个异常分别对应driver端和executor端。

  1. 测试方式二:在环境变量的路径下存在所需jar包,driver和executor端加载类正常。

环境变量同时设置了driver端和executor端的classpath,只要该路径下有jar包,驱动类即可加载。

下面通过配置属性分别进行配置并测试。

3.2.2 配置属性方式

  1. 使用配置属性提交应用程序的命令:

SPARKHOME/bin/spark?submit–masterspark://masternode:7078 –conf“spark.executor.extraClassPath”=”SPARK_HOME/lib/ojdbc14.jar”\

–conf”spark.driver.extraClassPath”=”$SPARK_HOME/lib/ojdbc14.jar” \

–conf “spark.ui.port”=4061 \

–class com.TestClass \

/tmp/test/SparkTest.jar

通过前面的分析,在local下,只”spark.driver.extraClassPath”有效。

  1. 说明:

    此时,配置属性只是指定jar包在classpath指定的路径下,但没有手动将所需的jar包部署到该路径下,因此加载驱动器类时会抛出java.lang.ClassNotFoundException:oracle.jdbc.driver.OracleDriver的异常。

  2. 执行日志如下所示:

16/04/14 14:23:18 INFOspark.MapOutputTrackerMaster: Size of output statuses for shuffle 0 is 154bytes

16/04/14 14:23:19 WARNscheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, 192.168.149.98):java.lang.ClassNotFoundException: oracle.jdbc.driver.OracleDriver

at java.net.URLClassLoader1.run(URLClassLoader.java:366)atjava.net.URLClassLoader1.run(URLClassLoader.java:355)

at java.security.AccessController.doPrivileged(Native Method)

at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

at java.lang.Class.forName0(Native Method)

at java.lang.Class.forName(Class.java:191)

at com.TestClass.insertInto(WebLogExtractor.scala:48)atcom.TestClassanonfunmain1.apply(WebLogExtractor.scala:43)atcom.TestClassanonfunmain1.apply(WebLogExtractor.scala:43)atorg.apache.spark.rdd.RDDanonfunforeachPartition1anonfunapply29.apply(RDD.scala:902)atorg.apache.spark.rdd.RDDanonfunforeachPartition1anonfunapply29.apply(RDD.scala:902)atorg.apache.spark.SparkContextanonfunrunJob5.apply(SparkContext.scala:1850)atorg.apache.spark.SparkContextanonfunrunJob5.apply(SparkContext.scala:1850)atorg.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)atorg.apache.spark.scheduler.Task.run(Task.scala:88)atorg.apache.spark.executor.ExecutorTaskRunner.run(Executor.scala:214)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

atjava.lang.Thread.run(Thread.java:745)

  1. 解决方法:

    将对应ojdbc14.jar拷贝到当前节点的”spark.driver.extraClassPath”路径下即可。

  2. 扩展:

    可以尝试”spark.executor.extraClassPath”与”spark.driver.extraClassPath”是否设置的几种情况分别进行测试,以验证前面针对local时,仅”spark.driver.extraClassPath”有效的分析。

再次总结:只要设置”spark.driver.extraClassPath”,并在该路径下放置了jar包即可。

3.2.3 自动上传jar包方式

当部署模式为CLIENT时,应用程序(对应Driver)会将childMainClass设置为传入的mainClass,然后启动JVM进程,对应代码如下所示:

if (deployMode == CLIENT) {

childMainClass = args.mainClass

if(isUserJar(args.primaryResource)) {

childClasspath += args.primaryResource

}

if(args.jars != null) { childClasspath ++= args.jars.split(“,”) }

if(args.childArgs != null) { childArgs ++= args.childArgs }

}

在client模式,直接启动应用程序的主类,同时,将主类的jar包和添加的jars包(如果在参数中设置的话)都添加到运行时的classpath中。即当前的driver的classpath会自动包含–jars 设置的jar包。

同时,driver通过启动的http服务上传该jar包,executor在执行时下载该jar包,同时放置到executor进程的classpath路径。

测试案例的构建:删除前面的环境变量或两个配置属性的设置,直接用–jars命令行选项指定所需的第三方jar包(即这里的驱动类jar包)即可。例如:

SPARKHOME/bin/spark?submit–masterlocal –deploy?modeclient –driver?memory2g –driver?cores1 –total?executor?cores2 –executor?memory4g –conf“spark.ui.port”=4081 –classcom.mb.TestJarwithOracle –jars”SPARK_HOME/thirdlib/ojdbc14.jar” \

/tmp/test/Spark15.jar

此时会直–jars指定的jar包加入classpath路径,因此可以成功加载驱动类。

当将应用程序提交到集群中时,对应不同的部署模式(–deploy-mode)会有不同的情况,因此下面分别针对不同的部署模式进行分析。

3.3 master为集群的MasterURL+部署模式为Client时

Client部署模式时,在提交点启动应用程序,因此对应driver端也在提交节点。此时,”spark.driver.extraClassPath”路径对应提交节点的路径。Executor则由调度分配到其他执行节点,此时”spark.executor.extraClassPath”对应的路径应是针对实际分配执行executor的节点(不是提交节点!)。

3.3.1 配置属性方式

属于:集群中jar包部署+ 配置属性的方式

通过前面的测试与分析,应该可以知道配置属性的方式只是将所需jar包放入类加载时查找的路径中,而对应的jar包需要人为去部署。

对应在分布式集群环境下,相关的JVM进程可能在各个节点上启动,包括driver和executor进程。因此当某个节点启动某类进程时,需要保证已经手动在该节点上,对应于配置属性所设置的路径下,已经存在或部署了所需的jar包。

进一步地,通常由资源调度器负责分配节点,运行进程,因此为了保证分配的节点上的进程能成功加载所需类,应该在集群的所有节点上部署所需jar包。

优点:一次部署多次使用

缺点:jar包冲突

3.3.1.1 测试1

  1. 启动命令:

SPARKHOME/bin/spark?submit–masterspark://masternode:7078 –conf”spark.executor.extraClassPath”=”SPARK_HOME/thirdlib/ojdbc14.jar”\

–conf”spark.driver.extraClassPath”=”$SPARK_HOME/thirdlib/ojdbc14.jar” \

–conf “spark.ui.port”=4071 \

–class com.mb.TestJarwithOracle \

/tmp/test/SparkTest.jar

  1. 测试说明:

    a) 通过–conf 命令行选项,设置Driver与Executor端的classpath配置属性。

    b) 在Driver端的classpath路径下放置所需的jar包。

    c) 在Executor端的classpath路径下删除所需的jar包。

补充:这里放置或删除jar包,可以简单通过修改对应配置属性的路径来模拟。

  1. 测试结果:

    a) Driver端与Oracle的操作:由于设置了classpath路径,同时该路径下放置了所需的jar包,因此操作成功,直接查看driver的终端输出日志。

    b) Executor端与Oracle的操作:虽然设置了classpath路径,但该路径下没有放置所需的jar包,因此操作失败,错误信息如下所示(查看4040-默认端口-的executor页面,找到对应的stderr日志信息):

3.3.1.2 测试2

  1. 启动命令:

SPARKHOME/bin/spark?submit–masterspark://masternode:7078 –conf”spark.executor.extraClassPath”=”SPARK_HOME/thirdlib/ojdbc14.jar”\

–conf”spark.driver.extraClassPath”=”$SPARK_HOME/thirdlib/ojdbc14.jar” \

–conf “spark.ui.port”=4071 \

–class com.mb.TestJarwithOracle \

/tmp/test/SparkTest.jar

  1. 测试说明:

    a) 通过–conf 命令行选项,设置Driver与Executor端的classpath配置属性。

    b) 在Executor端的classpath路径下放置所需的jar包。

    c) 在Driver端的classpath路径下删除所需的jar包。

  2. 测试结果:

    a) Executor端与Oracle的操作:由于设置了classpath路径,同时该路径下放置了所需的jar包,因此操作成功。

    b) Driver 端与Oracle的操作:虽然设置了classpath路径,但该路径下没有放置所需的jar包,因此操作失败,直接查看终端的错误信息,如下所示:

java.lang.ClassNotFoundException:oracle.jdbc.driver.OracleDriver

at java.net.URLClassLoader1.run(URLClassLoader.java:366)atjava.net.URLClassLoader1.run(URLClassLoader.java:355)

at java.security.AccessController.doPrivileged(Native Method)

at java.net.URLClassLoader.findClass(URLClassLoader.java:354)

at java.lang.ClassLoader.loadClass(ClassLoader.java:425)

at java.lang.ClassLoader.loadClass(ClassLoader.java:358)

at java.lang.Class.forName0(Native Method)

at java.lang.Class.forName(Class.java:191)

atcom.mb.TestJarwithOracle.deleteRecodes(TestJarwithOracle.scala:38)atcom.mb.TestJarwithOracle.main(TestJarwithOracle.scala:27)

at com.mb.TestJarwithOracle.main(TestJarwithOracle.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

atsun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

atsun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

atorg.apache.spark.deploy.SparkSubmit.orgapachesparkdeploySparkSubmitrunMain(SparkSubmit.scala:674)atorg.apache.spark.deploy.SparkSubmit.doRunMain1(SparkSubmit.scala:180)atorg.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:205)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)

atorg.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

补充说明:Driver端捕捉了异常,因此Executor可以继续执行。

3.3.1.3 扩展1

在classpath对应的这两个配置属性中,使用不同路径的结果是不同的,比如前面测试案例中的配置属性对应路径修改为hdfs文件系统路径时,driver端的驱动类加载会抛出异常,命令如下:

$SPARK_HOME/bin/spark-submit –master spark://nodemaster:7078 \

–deploy-mode client \

–driver-memory 2g \

–driver-cores 1 \

–total-executor-cores 2 \

–executor-memory 4g \

–conf “spark.ui.port”=4081 \

–conf “spark.executor.extraClassPath”=”hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar”\

–conf “spark.driver.extraClassPath”=”hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar” \

–classcom.mb.TestJarwithOracle \

/tmp/test/Spark15.jar

简单理解:

  1. Driver端classpath相关的配置:在启动应用程序(Driver)时作为JVM的Options使用,此时只能识别本地路径,使用hdfs文件系统路径的话,无法识别,因此类加载会失败。
  2. Executor端classpath相关的配置:会根据指定的路径去下载jar包,hdfs等文件系统以及被封装,因此可以下载到本地——对应默认在work路径的app目录中,然后添加到仅的classpath路径下,因此可以识别hdfs等文件系统的路径。

3.3.1.4 扩展2

由于之前有人提过几次类似的问题,再此顺便给出简单说明。

异常日志如下所示:

16/04/26 11:35:58 ERROR util.SparkUncaughtExceptionHandler:Uncaught exception in thread Thread[appclient-registration-retry-thread,5,main]

java.util.concurrent.RejectedExecutionException:Task [email protected] rejected [email protected][Running, pool size = 1, activethreads = 1, queued tasks = 0, completed tasks = 0]

atjava.util.concurrent.ThreadPoolExecutorAbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)atjava.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)atjava.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)atjava.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:110)atorg.apache.spark.deploy.client.AppClientClientEndpoint

anonfun$tryRegisterAllMasters$1.apply(AppClient.scala:96)atorg.apache.spark.deploy.client.AppClient$ClientEndpoint

anonfuntryRegisterAllMasters1.apply(AppClient.scala:95)

atscala.collection.TraversableLike

anonfun$map$1.apply(TraversableLike.scala:244)atscala.collection.TraversableLike

anonfunmap1.apply(TraversableLike.scala:244)

atscala.collection.IndexedSeqOptimizedclass.foreach(IndexedSeqOptimized.scala:33)atscala.collection.mutable.ArrayOpsofRef.foreach(ArrayOps.scala:108)

at scala.collection.TraversableLikeclass.map(TraversableLike.scala:244)atscala.collection.mutable.ArrayOpsofRef.map(ArrayOps.scala:108)

at org.apache.spark.deploy.client.AppClientClientEndpoint.tryRegisterAllMasters(AppClient.scala:95)atorg.apache.spark.deploy.client.AppClientClientEndpoint.orgapachesparkdeployclientAppClientClientEndpoint

registerWithMaster(AppClient.scala:121)atorg.apache.spark.deploy.client.AppClient$ClientEndpoint

anon2anonfunrun1.applymcVsp(AppClient.scala:132)atorg.apache.spark.util.Utils.tryOrExit(Utils.scala:1119)
atorg.apache.spark.deploy.client.AppClientClientEndpointanon2.run(AppClient.scala:124)

atjava.util.concurrent.ExecutorsRunnableAdapter.call(Executors.java:471)atjava.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)atjava.util.concurrent.ScheduledThreadPoolExecutorScheduledFutureTask.access301(ScheduledThreadPoolExecutor.java:178)atjava.util.concurrent.ScheduledThreadPoolExecutorScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)

atjava.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

atjava.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

16/04/26 11:35:58 INFOstorage.DiskBlockManager: Shutdown hook called

16/04/26 11:35:58 INFO util.ShutdownHookManager:Shutdown hook called

仅根据异常无法判断具体错误信息,需要跟踪其堆栈信息。根据堆栈信息,可以知道是AppClient(代表应用程序客户端)中的ClientEndpoint(RPC通讯终端)尝试注册到Master时被拒绝——此时,检查提交应用程序时使用的Master URL是否正确即可。

经测试验证,当使用错误的Master URL时,会抛出以上异常信息。

3.3.2 自动上传jar包方式

当部署模式为CLIENT时,应用程序(对应Driver)会将childMainClass设置为传入的mainClass,然后启动JVM进程,对应代码如下所示:

if (deployMode == CLIENT) {

childMainClass = args.mainClass

if(isUserJar(args.primaryResource)) {

childClasspath += args.primaryResource

}

if(args.jars != null) { childClasspath ++= args.jars.split(“,”) }

if(args.childArgs != null) { childArgs ++= args.childArgs }

}

在client模式,直接启动应用程序的主类,同时,将主类的jar包和添加的jars包(如果在参数中设置的话)都添加到运行时的classpath中。即当前的driver的classpath会自动包含–jars 设置的jar包。

同时,driver通过启动的http服务上传该jar包,executor在执行时下载该jar包,同时放置到executor进程的classpath路径。

因此,–jars相当于:

  1. 通过”spark.driver.extraClassPath”配置driver 端。
  2. 通过”spark.executor.extraClassPath”配置executor端,同时将指定的jar包上传到http 服务,并下载到executor端的该配置路径下。

3.4 master为集群的MasterURL+部署模式为Cluster时

CLUSTER部署模式时,Driver在某个节点提交,但却是在集群调度分配的节点上运行。

此时,可以将Driver看成是特殊的Executor,同样由分配的节点运行JVM进程,但对应进程的classpath配置信息(补充说明下,看配置属性的extra名,应该可以知道是附加的,或新增的classpath内容,而不是全部)由各自对应的配置属性进行设置。

CLUSTER部署模式在Drive分配到节点,并在节点上启动,相关属性配置及其作用等和CLIENT部署模式基本一致。这里仅基于配置属性方式针对Driver进行解析。

3.4.1 应用程序提交方式

下面给出两种形式的提交命令:

一、基于REST服务提交方式

$SPARK_HOME/bin/spark-submit –master spark://nodemaster:6066 \

–deploy-mode cluster \

–driver-memory 2g \

–driver-cores 1 \

–total-executor-cores 2 \

–executor-memory 4g \

–conf “spark.ui.port”=4081 \

–conf “spark.executor.extraClassPath”=”hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar”\

–conf “spark.driver.extraClassPath”=”hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar” \

–classcom.mb.TestJarwithOracle \

hdfs://nodemaster:8020/tmp/sptest/Spark15.jar

该方式采用REST服务作为masterurl提交应用程序。对应的值参考8080(默认)监控界面,如下所示:

日志:

Running Spark using the REST applicationsubmission protocol.

16/04/26 13:15:02 INFOrest.RestSubmissionClient: Submitting a request to launch an application inspark://nodemaster:7078.

16/04/26 13:15:03 WARNrest.RestSubmissionClient: Unable to connect to server spark://nodemaster:7078.

Warning: Master endpoint spark://nodemaster:7078was not a REST server. Falling back to legacy submission gateway instead.

16/04/26 13:15:04 WARN util.NativeCodeLoader:Unable to load native-hadoop library for your platform… using builtin-javaclasses where applicable

当REST 服务方式提交尝试失败后,会退回到传统方式进行提交。

二、传统提交方式

$SPARK_HOME/bin/spark-submit –master spark://nodemaster:7078 \

–deploy-mode cluster \

–driver-memory 2g \

–driver-cores 1 \

–total-executor-cores 2 \

–executor-memory 4g \

–conf “spark.ui.port”=4081 \

–conf “spark.executor.extraClassPath”=”hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar”\

–conf “spark.driver.extraClassPath”=”hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar” \

–classcom.mb.TestJarwithOracle \

hdfs://nodemaster:8020/tmp/sptest/Spark15.jar

日志:

Running Spark using the REST applicationsubmission protocol.

16/04/26 13:20:58 INFOrest.RestSubmissionClient: Submitting a request to launch an application inspark://nodemaster:6066.

16/04/26 13:20:58 INFOrest.RestSubmissionClient: Submission successfully created asdriver-20160426132058-0010. Polling submission state…

16/04/26 13:20:58 INFOrest.RestSubmissionClient: Submitting a request for the status of submissiondriver-20160426132058-0010 in spark://nodemaster:6066.

16/04/26 13:20:58 INFO rest.RestSubmissionClient:State of driver driver-20160426132058-0010 is now RUNNING.

16/04/26 13:20:58 INFOrest.RestSubmissionClient: Driver is running on workerworker-20160418110627-192.168.149.95-45661 at 192.168.149.95:45661.

16/04/26 13:20:58 INFO rest.RestSubmissionClient:Server responded with CreateSubmissionResponse:

{

“action” : “CreateSubmissionResponse”,

“message” : “Driver successfully submitted asdriver-20160426132058-0010”,

“serverSparkVersion” : “1.5.2”,

“submissionId” : “driver-20160426132058-0010”,

“success” : true

}

3.4.2 提交应用之后Driver的分析

两种方式都可以成功提交应用程序,对应在界面会分别增加一个Driver,同时Driver启动后,会和之前Client部署模式的流程一样,提交一个Application(这里的概念对应Executor),对应界面(8080默认端口界面-最下面)有:

两个driver提交的应用如下(8080默认端口界面):

对应下面的Application,配置及其影响和前面是一样的,只是当前的Driver在分配的节点上运行(所有相关路径等概念都改为基于该执行节点)。因此下面仅分析Driver的相关内容。

信息获取相关操作:

1. 点击Driver行所在的Worker,跳转到该Worker监控页面,到最下面,查找与Driver的SubmissionID相同的Driver信息,界面如下所示:

  1. 点击对应的stderr日志信息,可以看到Driver的启动命令及其执行日志
  2. 查看Driver的启动命令

    a) 对应传统提交方式的日志如下所示:

LaunchCommand: “/usr/java/jdk1.7.0_71/bin/java” “-cp”“hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar:SPARKHOME/sbin/../conf/:SPARK_HOME/lib/spark-assembly-1.5.2-hadoop2.6.0.jar:SPARKHOME/lib/datanucleus?api?jdo?3.2.6.jar:SPARK_HOME/lib/datanucleus-core-3.2.10.jar:SPARKHOME/lib/datanucleus?rdbms?3.2.9.jar:/etc/hadoop/conf/""?Xms2048M""?Xmx2048M""?Dspark.deploy.defaultCores=4""?Dspark.eventLog.enabled=true""?Dakka.loglevel=WARNING""?Dspark.history.fs.cleaner.maxAge=7d""?Dspark.submit.deployMode=cluster""?Dspark.executor.memory=4g""?Dspark.executor.extraClassPath=hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar""?Dspark.executor.extraJavaOptions=?XX:+PrintGCDetails""?Dspark.jars=hdfs://nodemaster:8020/tmp/sptest/Spark15.jar""?Dspark.history.fs.cleaner.enabled=true""?Dspark.master=spark://nodemaster:7078""?Dspark.driver.supervise=false""?Dspark.driver.extraClassPath=hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar""?Dspark.app.name=com.mb.TestJarwithOracle""?Dspark.history.fs.logDirectory=hdfs://nodemaster:8020/user/hdfs/sparklogs""?Dspark.driver.memory=2g""?Dspark.cores.max=2""?Dspark.rpc.askTimeout=10""?Dspark.eventLog.dir=hdfs://nodemaster:8020/user/hdfs/sparklogs""?Dspark.ui.port=4081""?Dspark.history.fs.cleaner.interval=1d""?Dspark.driver.cores=1""?XX:MaxPermSize=256m""org.apache.spark.deploy.worker.DriverWrapper""akka.tcp://sparkWorker@192.168.149.95:45661/user/Worker""SPARK_HOME/work/driver-20160426131505-0009/Spark15.jar”“com.mb.TestJarwithOracle”

b)  对应REST提交方式的日志如下所示:

Launch Command:”/usr/java/jdk1.7.0_71/bin/java” “-cp” “hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar:SPARKHOME/sbin/../conf/:SPARK_HOME/lib/spark-assembly-1.5.2-hadoop2.6.0.jar:SPARKHOME/lib/datanucleus?api?jdo?3.2.6.jar:SPARK_HOME/lib/datanucleus-core-3.2.10.jar:SPARKHOME/lib/datanucleus?rdbms?3.2.9.jar""?Xms2048M""?Xmx2048M""?Dspark.deploy.defaultCores=4""?Dspark.eventLog.enabled=true""?Dspark.history.fs.cleaner.maxAge=7d""?Dspark.submit.deployMode=cluster""?Dspark.executor.memory=4g""?Dspark.executor.extraClassPath=hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar""?Dspark.executor.extraJavaOptions=?XX:+PrintGCDetails""?Dspark.jars=hdfs://nodemaster:8020/tmp/sptest/Spark15.jar""?Dspark.history.fs.cleaner.enabled=true""?Dspark.master=spark://nodemaster:7078""?Dspark.driver.supervise=false""?Dspark.driver.extraClassPath=hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar""?Dspark.app.name=com.mb.TestJarwithOracle""?Dspark.history.fs.logDirectory=hdfs://nodemaster:8020/user/hdfs/sparklogs""?Dspark.driver.memory=2g""?Dspark.cores.max=2""?Dspark.eventLog.dir=hdfs://nodemaster:8020/user/hdfs/sparklogs""?Dspark.ui.port=4081""?Dspark.history.fs.cleaner.interval=1d""?Dspark.driver.cores=1""?XX:MaxPermSize=256m""org.apache.spark.deploy.worker.DriverWrapper""akka.tcp://sparkWorker@192.168.149.95:45661/user/Worker""SPARK_HOME/work/driver-20160426132058-0010/Spark15.jar”“com.mb.TestJarwithOracle”

(暂时不考虑两者的差异,仅关注Driver进程相关的内容。)

下面对其中比较重要的几个部分进行分析:

  1. 其中,启动的JVM进程主类为”org.apache.spark.deploy.worker.DriverWrapper”,在”-cp”后加入了hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar,这里对应的是提交命令中的–conf”spark.driver.extraClassPath”配置属性(可通过修改设置的路径进行验证)。

    在该启动命令(提交命令的配置参数)中,ojdbc14.jar路径为hdfs,因此无法识别,在driver部分的逻辑代码执行时会抛出异常。对应日志在该LaunchCommand:命令后面。因此”spark.driver.extraClassPath”配置属性中设置的路径应该对应当前节点(由于是调度分配的,对应就意味着应该是在集群中各个节点都进行部署)的路径,并且路径下有所需jar包。

  2. 对应的spark.driver.extraClassPath、spark.executor.extraClassPath等会继续作为Driver的参数传入(和Client部署模式下直接执行Driver一样)
  3. “-Dspark.jars=hdfs://nodemaster:8020/tmp/sptest/Spark15.jar”:对应提交时的主资源。
  4. “$SPARK_HOME/work/driver-20160426132058-0010/Spark15.jar”:work是默认的工作目录,driver-20160426132058-0010是当前Driver对应的Submission ID,在该目录中会下载Driver执行所需的Jar包,这里对应主资源Spark15.jar。

此时,需要注意的是,当CLUSTER部署模式时,如果使用的主资源是本地路径,如以下命令:

SPARKHOME/bin/spark?submit–masterspark://nodemaster:6066 –deploy?modecluster –driver?memory2g –driver?cores1 –total?executor?cores2 –executor?memory4g –conf“spark.ui.port”=4081 –conf“spark.executor.extraClassPath”=”hdfs://nodemaster:8020/tmp/sptest/ojdbc14.jar” –conf“spark.driver.extraClassPath”=”SPARK_HOME/thirdlib/ojdbc14.jar” \

–classcom.mb.TestJarwithOracle \

/tmp/test/Spark15.jar

其中红色部分对应为主资源jar包,采用本地文件系统的路径。执行时,Driver端输出的错误信息如下所示(对应界面的Driver状态为Error):

java.io.FileNotFoundException:/tmp/test/Spark15.jar (No such file or directory)

java.io.FileInputStream.open(NativeMethod)

java.io.FileInputStream.(FileInputStream.java:146)

org.spark-project.guava.io.FilesFileByteSource.openStream(Files.java:124)org.spark?project.guava.io.FilesFileByteSource.openStream(Files.java:114)

org.spark-project.guava.io.ByteSource.copyTo(ByteSource.java:202)

org.spark-project.guava.io.Files.copy(Files.java:436)

org.apache.spark.util.Utils.orgapachesparkutilUtilscopyRecursive(Utils.scala:514)org.apache.spark.util.Utils.copyFile(Utils.scala:485)

org.apache.spark.util.Utils.doFetchFile(Utils.scala:562)org.apache.spark.util.Utils.fetchFile(Utils.scala:369)

org.apache.spark.deploy.worker.DriverRunner.orgapachesparkdeployworkerDriverRunnerdownloadUserJar(DriverRunner.scala:150)org.apache.spark.deploy.worker.DriverRunneranon1.run(DriverRunner.scala:79)

在DriverRunner启动后,调用downloadUserJar,下载所需jar包,但此时使用本地文件系统的路径,对应的就需要在Driver当前执行节点上的该路径下存在该jar包(当前未部署),因此报异常:java.io.FileNotFoundException:/tmp/test/Spark15.jar。

因此,对应在CLUSTER部署模式时,需要注意提交应用程序对应的主资源的配置:

  1. 将主资源类似于其他的第三方jar包(如Oracle驱动类库的jar包)部署到集群中;
  2. 使用HDFS这类文件系统,可以下载到本地工作目录。

四、总结

通过配置classpath,为JVM加载类时提供搜索路径。

在分布式计算集群中,需要注意JVM进程是在哪台节点上启动,对应节点上的classpath下是否部署了所需的jar包(针对jar包以本地路径的形式,即与具体节点相关的路径)。

因此总结起来有以下两点:

1. 是否为对应JVM进程指定了classpath;

2. 在各个进程的classpath路径下是否放置了所需的jar包。放置的方式可以有两种:

a) 一种是Spark框架提供的自动放置到classpath的方式;

b) 一种手动在集群中部署的方式;

这里指的路径,表示的是该JVM进程能够读取的路径,比如本地文件系统路径、hdfs路径,其中本地文件系统路径是针对本节点的,这点在分布式集群中尤其要注意。

偷功 是一个基于开源精神,借助众人力量,分享IT技术学习和实战过程中的所感所悟,在分享中与大家一起进步 的公众号。如果您对我们的文章内容感兴趣,可以扫描下图中的二维码,选择关注我们。

时间: 2024-10-18 23:25:18

基于SPARK SQL 读写ORACLE 的简单案例分析常见问题的相关文章

[转] Oracle sql 查询突然变慢 -- 案例分析

转自:http://jingyan.baidu.com/article/8275fc868ce57946a03cf692.html 一条sql突然执行变慢,耗时9秒,应用是不能改的,只能从数据库方面下手解决 步骤思路: 1:查看sql是否走索引 2:查看索引是否失效 3:hint 强制走索引(只是用来查看hint状态下,查询是否更改,应用是不能改的) 4:收集该表所有信息(包括索引) 5:分析该表所有信息(包括索引) 6:再次执行并查看 注意:哪个用户执行较慢,就用哪个用户进行操作,这样才准确

基于svm和pca的人脸识别案例分析

数据集介绍 LFW (Labeled Faces in the Wild) 人脸数据库是由美国马萨诸塞州立大学阿默斯特分校计算机视觉实验室整理完成的数据库,主要用来研究非受限情况下的人脸识别问题.LFW 数据库主要是从互联网上搜集图像,而不是实验室,一共含有13000 多张人脸图像,每张图像都被标识出对应的人的名字,其中有1680 人对应不只一张图像,即大约1680个人包含两个以上的人脸.LFW数据集主要测试人脸识别的准确率. 代码实现 from time import time #记录时间 i

spark sql简单示例

运行环境 集群环境:CDH5.3.0 具体JAR版本如下: spark版本:1.2.0-cdh5.3.0 hive版本:0.13.1-cdh5.3.0 hadoop版本:2.5.0-cdh5.3.0 spark sql的JAVA版简单示例 spark sql直接查询JSON格式的数据 spark sql的自定义函数 spark sql查询hive上面的表 import java.util.ArrayList; import java.util.List; import org.apache.sp

Spark SQL External Data Sources JDBC简易实现

在spark1.2版本中最令我期待的功能是External Data Sources,通过该API可以直接将External Data Sources注册成一个临时表,该表可以和已经存在的表等通过sql进行查询操作.External Data Sources API代码存放于org.apache.spark.sql包中. 具体的分析可参见OopsOutOfMemory的两篇精彩博文: http://blog.csdn.net/oopsoom/article/details/42061077 ht

【原创 Hadoop&Spark 动手实践 9】Spark SQL 程序设计基础与动手实践(上)

[原创 Hadoop&Spark 动手实践 9]SparkSQL程序设计基础与动手实践(上) 目标: 1. 理解Spark SQL最基础的原理 2. 可以使用Spark SQL完成一些简单的数据分析任务 3. 可以利用Spark SQL完成一个完整的案例

Spark SQL数据加载和保存实战

一:前置知识详解: Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二:Spark SQL读写数据代码实战: import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRD

京东基于Spark的风控系统架构实践和技术细节

京东基于Spark的风控系统架构实践和技术细节 时间 2016-06-02 09:36:32  炼数成金 原文  http://www.dataguru.cn/article-9419-1.html 主题 Spark软件架构 1.背景 互联网的迅速发展,为电子商务兴起提供了肥沃的土壤.2014年,中国电子商务市场交易规模达到13.4万亿元,同比增长31.4%.其中,B2B电子商务市场交易额达到10万亿元,同比增长21.9%.这一连串高速增长的数字背后,不法分子对互联网资产的觊觎,针对电商行业的恶

Spark SQL数据载入和保存实战

一:前置知识具体解释: Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作. Load:能够创建DataFrame. Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二:Spark SQL读写数据代码实战: import org.apache.spark.SparkConf; import org.apache.spark.api.java.Java

spark sql 不等值 join

products一个商品价格变化的表,orders商品订单,记录每次购买商品和日期基于Spark SQL中的不等值join实现orders和products的匹配,统计每个订单中商品对应当时的价格 缓慢变化的商品价格表旺仔牛奶,发生过一次价格变更 scala> val products = sc.parallelize(Array( | ("旺仔牛奶", "2017-01-01", "2018-01-01", 4), | ("旺仔