Running a Python Job on a Spark Cluster

Today I tried running a Python job on the newly built Spark cluster and hit a few problems. Working through them gave me a somewhat better, if still shallow, understanding of how a Spark cluster operates and how it is configured; it no longer feels as foggy as before. I'm recording the process here, and there is still much more of the Hadoop ecosystem and the Spark computing framework left to explore.

First, the environment: the cluster has five nodes and was set up with Cloudera Manager, so the Hadoop in use is Cloudera's CDH. The way I understand the relationship between CDH and Hadoop is that it is like the relationship between CentOS and Linux. The other components, such as HBase and Hive, were also installed through Cloudera. A senior student in the lab had already downloaded a Spark parcel onto the server, so I used it directly without checking the version, only to find out later that it was 0.9, which was a bit awkward: Spark is already at 1.4, while 0.9 does not even have spark-submit, nor a native R API. It doesn't really matter, though; just follow the 0.9 documentation, and redeploy a newer version only if a newer feature is truly needed.

First, a note on Spark's four deployment modes (a minimal sketch of the corresponding master settings follows the list below):

  • local: single-process local mode, used for developing and testing Spark code locally
  • standalone: distributed cluster mode with a Master-Worker architecture; the Master handles scheduling and the Workers execute the actual tasks
  • on YARN/Mesos: runs on top of a resource manager such as YARN or Mesos, which provides resource management while Spark handles computation and scheduling; Spark can then share the same cluster with other computing frameworks such as MapReduce, MPI, or Storm (a cluster built with Cloudera Manager falls into this category)
  • on cloud (EC2): runs on AWS EC2.
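As a quick illustration, here is a minimal sketch of how the master setting differs between these modes when building a SparkConf. Everything except the standalone URL is a placeholder assumed for the example; only the standalone URL is actually used in the rest of this post, and PySpark's YARN support only arrived in releases newer than 0.9.

from pyspark import SparkConf

conf = SparkConf().setAppName("mode demo")

# local mode: everything runs in one local process, here with 4 worker threads
conf.setMaster("local[4]")

# standalone mode: connect directly to the cluster's Master (the form used later in this post)
conf.setMaster("spark://192.168.2.241:7077")

# mesos mode: point at the Mesos master (placeholder host, default port)
conf.setMaster("mesos://mesos-master:5050")

# yarn mode (newer releases only): the master is simply "yarn-client"/"yarn",
# and the cluster is located via HADOOP_CONF_DIR / YARN_CONF_DIR
# conf.setMaster("yarn-client")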

Below I use a simple Python job, SimpleApp.py, as an example and walk through how the script runs.

from pyspark import SparkContext, SparkConf

# connect to the standalone Master of the cluster
conf = SparkConf()
conf.setMaster("spark://192.168.2.241:7077")
conf.setAppName("test application")

# the input file is read from HDFS
logFile = "hdfs://hadoop241:8020/user/root/testfile"
sc = SparkContext(conf=conf)
logData = sc.textFile(logFile).cache()

# count the lines containing 'a' and the lines containing 'b'
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()

print "Lines with a:%i,lines with b:%i" % (numAs, numBs)

The problems here mainly concern how the connection to the cluster is configured, i.e. the conf part of the code above. First, the application has to connect to the cluster's master node; note how the master is written:

spark://192.168.2.241:7077

The spark:// prefix is mandatory; without it you get a "could not parse master URL" error, meaning the master URL cannot be parsed. As for the port number, it can be looked up via the $SPARK_MASTER_PORT variable under /etc/spark/conf (the location of the config files differs depending on how Spark was installed, so check your own setup).
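For instance, on this installation the port can be located by grepping the config directory mentioned above; exactly which file defines it (e.g. spark-env.sh) depends on how Spark was deployed, so treat the path as an assumption.

$ grep -r SPARK_MASTER_PORT /etc/spark/conf/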

Also note the input file path:

logFile="hdfs://hadoop241:8020/user/root/testfile"

I noticed that by default the file is read from HDFS, so the file to be processed has to be put onto HDFS first. Again, note how the path is written: here it is an absolute path on HDFS, but a relative path also works.
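As a sketch of that preparation step (assuming the local file is also called testfile and the target directory is the /user/root seen in the code):

$ hdfs dfs -put testfile /user/root/testfile   # upload the local file to HDFS
$ hdfs dfs -cat /user/root/testfile            # check that it is readable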

The testfile here contains only two lines, just enough to check that the job executes correctly:

stay hungery,stay foolish
steve jobs

Then run:

$ pyspark SimpleApp.py

The output is pasted below; from it you can observe how the computation is scheduled and distributed across the cluster:

[[email protected] workspace]# pyspark SimpleApp.py
15/07/31 16:22:27 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/31 16:22:27 INFO Remoting: Starting remoting
15/07/31 16:22:27 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:34248]
15/07/31 16:22:27 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:34248]
15/07/31 16:22:27 INFO spark.SparkEnv: Registering BlockManagerMaster
15/07/31 16:22:27 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20150731162227-804b
15/07/31 16:22:27 INFO storage.MemoryStore: MemoryStore started with capacity 294.9 MB.
15/07/31 16:22:27 INFO network.ConnectionManager: Bound socket to port 42522 with id = ConnectionManagerId(hadoop241,42522)
15/07/31 16:22:27 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/07/31 16:22:27 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop241:42522 with 294.9 MB RAM
15/07/31 16:22:27 INFO storage.BlockManagerMaster: Registered BlockManager
15/07/31 16:22:27 INFO spark.HttpServer: Starting HTTP Server
15/07/31 16:22:27 INFO server.Server: jetty-7.6.8.v20121106
15/07/31 16:22:27 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:42944
15/07/31 16:22:27 INFO broadcast.HttpBroadcast: Broadcast server started at http://192.168.2.241:42944
15/07/31 16:22:27 INFO spark.SparkEnv: Registering MapOutputTracker
15/07/31 16:22:27 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-c6a0d067-075c-493b-81c8-754f569a91b5
15/07/31 16:22:27 INFO spark.HttpServer: Starting HTTP Server
15/07/31 16:22:27 INFO server.Server: jetty-7.6.8.v20121106
15/07/31 16:22:27 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:33063
15/07/31 16:22:27 INFO server.Server: jetty-7.6.8.v20121106
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
15/07/31 16:22:27 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
15/07/31 16:22:27 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/07/31 16:22:27 INFO ui.SparkUI: Started Spark Web UI at http://hadoop241:4040
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Connecting to master spark://192.168.2.241:7077...
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150731162228-0018
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/0 on worker-20150728175302-hadoop246-7078 (hadoop246:7078) with 16 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/0 on hostPort hadoop246:7078 with 16 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/1 on worker-20150728175303-hadoop245-7078 (hadoop245:7078) with 8 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/1 on hostPort hadoop245:7078 with 8 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/2 on worker-20150728175303-hadoop254-7078 (hadoop254:7078) with 8 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/2 on hostPort hadoop254:7078 with 8 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/3 on worker-20150728175302-hadoop241-7078 (hadoop241:7078) with 8 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/3 on hostPort hadoop241:7078 with 8 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/4 on worker-20150728175302-hadoop217-7078 (hadoop217:7078) with 8 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/4 on hostPort hadoop217:7078 with 8 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/3 is now RUNNING
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/2 is now RUNNING
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/1 is now RUNNING
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/0 is now RUNNING
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/4 is now RUNNING
15/07/31 16:22:28 INFO storage.MemoryStore: ensureFreeSpace(125687) called with curMem=0, maxMem=309225062
15/07/31 16:22:28 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 122.7 KB, free 294.8 MB)
15/07/31 16:22:29 INFO mapred.FileInputFormat: Total input paths to process : 1
15/07/31 16:22:29 INFO spark.SparkContext: Starting job: count at SimpleApp.py:13
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Got job 0 (count at SimpleApp.py:13) with 2 output partitions (allowLocal=false)
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Final stage: Stage 0 (count at SimpleApp.py:13)
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Missing parents: List()
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Submitting Stage 0 (PythonRDD[2] at count at SimpleApp.py:13), which has no missing parents
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[2] at count at SimpleApp.py:13)
15/07/31 16:22:29 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/07/31 16:22:29 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://[email protected]:48226/user/Executor#-1281030996] with ID 0
15/07/31 16:22:29 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: hadoop246 (PROCESS_LOCAL)
15/07/31 16:22:29 INFO scheduler.TaskSetManager: Serialized task 0.0:0 as 3119 bytes in 6 ms
15/07/31 16:22:29 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 0: hadoop246 (PROCESS_LOCAL)
15/07/31 16:22:29 INFO scheduler.TaskSetManager: Serialized task 0.0:1 as 3119 bytes in 1 ms
15/07/31 16:22:29 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://[email protected]:34233/user/Executor#-994522395] with ID 4
15/07/31 16:22:30 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://[email protected]:53345/user/Executor#1663802475] with ID 3
15/07/31 16:22:30 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop246:34291 with 294.9 MB RAM
15/07/31 16:22:30 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop217:35324 with 294.9 MB RAM
15/07/31 16:22:30 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop241:54770 with 294.9 MB RAM
15/07/31 16:22:31 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://[email protected]:49492/user/Executor#-967494826] with ID 2
15/07/31 16:22:31 INFO cluster.SparkDeploySchedulerBackend: Registered executor: Actor[akka.tcp://[email protected]:34383/user/Executor#266145334] with ID 1
15/07/31 16:22:31 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop254:52135 with 294.9 MB RAM
15/07/31 16:22:31 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop245:16696 with 294.9 MB RAM
15/07/31 16:22:31 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_1_0 in memory on hadoop246:34291 (size: 208.0 B, free: 294.9 MB)
15/07/31 16:22:31 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_1_1 in memory on hadoop246:34291 (size: 176.0 B, free: 294.9 MB)
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Finished TID 1 in 2128 ms on hadoop246 (progress: 0/2)
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Finished TID 0 in 2142 ms on hadoop246 (progress: 1/2)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
15/07/31 16:22:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Stage 0 (count at SimpleApp.py:13) finished in 2.476 s
15/07/31 16:22:32 INFO spark.SparkContext: Job finished: count at SimpleApp.py:13, took 2.550761544 s
15/07/31 16:22:32 INFO spark.SparkContext: Starting job: count at SimpleApp.py:14
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Got job 1 (count at SimpleApp.py:14) with 2 output partitions (allowLocal=false)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Final stage: Stage 1 (count at SimpleApp.py:14)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Missing parents: List()
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Submitting Stage 1 (PythonRDD[3] at count at SimpleApp.py:14), which has no missing parents
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (PythonRDD[3] at count at SimpleApp.py:14)
15/07/31 16:22:32 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 2 on executor 0: hadoop246 (PROCESS_LOCAL)
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Serialized task 1.0:0 as 3121 bytes in 0 ms
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID 3 on executor 0: hadoop246 (PROCESS_LOCAL)
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Serialized task 1.0:1 as 3121 bytes in 0 ms
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Finished TID 3 in 27 ms on hadoop246 (progress: 0/2)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Completed ResultTask(1, 1)
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Finished TID 2 in 32 ms on hadoop246 (progress: 1/2)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Completed ResultTask(1, 0)
15/07/31 16:22:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Stage 1 (count at SimpleApp.py:14) finished in 0.034 s
15/07/31 16:22:32 INFO spark.SparkContext: Job finished: count at SimpleApp.py:14, took 0.04234127 s
Lines with a:1,lines with b:1
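The counts match the test file: the first line ("stay hungery,stay foolish") contains an 'a' and the second line ("steve jobs") contains a 'b', which confirms the job ran correctly on the cluster.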
