通过DeveloperApi获取spark程序执行进度及异常

效果显示:

代码:

package org.apache.spark.zpc.listener

import org.apache.spark.Logging
import org.apache.spark.scheduler._

import scala.collection.mutable

/**
  * Spark 的 DeveloperApi 提供针对app, job, task的执行监听。
  * 通过该监听,可以实现:
  * 1.任务执行进度的粗略计算。
  * 2.执行异常失败时,获取异常信息。
  * 3.获取app启动的appId,从而可以控制杀死任务。
  * 4.自定义进度和异常的handle处理(如控制台打印,保存db,或jms传输到web等终端
  *
  * @param jobNum Application中Job个数。可以通过代码的提交查看spark日志查看到。
  */

abstract class SparkAppListener(jobNum: Int) extends SparkListener with Logging {

  //Job和Job信息(包括总task数,当前完成task数,当前Job百分比)的映射
  private val jobToJobInfo = new mutable.HashMap[Int, (Int, Int, Int)]
  //stageId和Job的映射,用户获取task对应的job
  private val stageToJob = new mutable.HashMap[Int, Int]
  //完成的job数量
  private var finishJobNum = 0
  private var hasException: Boolean = false

  override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = synchronized {
    val appId = applicationStart.appId
    //记录app的Id,用于后续处理:
    //如:yarn application  -kill  appId
    //handleAppId(appId)
  }

  //获取job的task数量,初始化job信息
  override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
    val jobId = jobStart.jobId
    val tasks = jobStart.stageInfos.map(stageInfo => stageInfo.numTasks).sum
    jobToJobInfo += (jobId ->(tasks, 0, 0))
    jobStart.stageIds.map(stageId => stageToJob(stageId) = jobId)
  }

  //task结束时,粗略估计当前app执行进度。
  //估算方法:当前完成task数量/总task数量。总完成task数量按(job总数*当前job的task数。)
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
    val stageId = taskEnd.stageId
    val jobId = stageToJob.get(stageId).get
    val (totalTaskNum: Int, finishTaskNum: Int, percent: Int) = jobToJobInfo.get(jobId).get
    val currentFinishTaskNum = finishTaskNum + 1
    val newPercent = currentFinishTaskNum * 100 / (totalTaskNum * jobNum)
    jobToJobInfo(jobId) = (totalTaskNum, currentFinishTaskNum, newPercent)

    if (newPercent > percent) {
      //hanlde application progress
      val totalPercent = jobToJobInfo.values.map(_._3).sum
      if (totalPercent <= 100){
//        handleAppProgress(totalPercent)
      }
    }
  }

  //job 结束,获取job结束的状态,异常结束可以将异常的类型返回处理。
  // handle处理自定义,比如返回给web端,显示异常log。
  override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
    jobEnd.jobResult match {
      case JobSucceeded => finishJobNum += 1
      case JobFailed(exception) if !hasException =>
        hasException = true

        // handle application failure
//        handleAppFailure(exception)
      case _ =>
    }
  }

  //app结束时,将程序执行进度标记为 100%。
  //缺陷:SparkListenerApplicationEnd没有提供app的Exception的获取。这样,当程序在driver端出错时,
  //获取不到出错的具体原因返回给前端,自定义提示。比如(driver对app中的sql解析异常,还没有开始job的运行)

  /*** driver 端异常可通过主程序代码里 try catch获取到 ***/
  override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) = synchronized {
    val totalJobNum = jobToJobInfo.keySet.size
    val totalPercent = jobToJobInfo.values.map(_._3).sum
    //handle precision lose
    if (!hasException && totalPercent == 99) {
//      handleAppProgress(100)
    }
    val msg = "执行失败"
    if(totalJobNum == 0){
      handleAppFailure(new Exception(msg))
    }
  }
}
时间: 2024-10-29 19:12:12

通过DeveloperApi获取spark程序执行进度及异常的相关文章

Spark的应用程序执行模型

今天看了一篇名为Top 3 Troubleshooting Tips to Keep You Sparking的文章,讲述了一些编写Spark程序需要注意的地方,看完之后想要总结一下. Spark执行模型,总结为官方的架构图: 本文主要讨论Driver和Worker. 我们知道,对于Spark开发的分布式应用程序,和写普通的scala程序基本类似.所以这时往往会陷入一些误区: 在Spark开发的应用程序的对象里,我给他们分成2类对象: 1.闭包内的对象:即在类似map, filter, redu

如何将 MapReduce 程序转化为 Spark 程序

1.MapReduce和Spark比较 目前的大数据处理可以分为以下三个类型: 1.复杂的批量数据处理(batch data processing),通常的时间跨度在数十分钟到数小时之间: 2.基于历史数据的交互式查询(interactive query),通常的时间跨度在数十秒到数分钟之间: 3.基于实时数据流的数据处理(streaming data processing),通常的时间跨度在数百毫秒到数秒之间. 大数据处理势必需要依赖集群环境,而集群环境有三大挑战,分别是并行化.单点失败处理.

MapReduce程序转换为Spark程序

MapReduce和Spark比较 目前的大数据处理可以分为以下三个类型: 1.复杂的批量数据处理(batch data processing),通常的时间跨度在数十分钟到数小时之间: 2.基于历史数据的交互式查询(interactive query),通常的时间跨度在数十秒到数分钟之间: 3.基于实时数据流的数据处理(streaming data processing),通常的时间跨度在数百毫秒到数秒之间. 大数据处理势必需要依赖集群环境,而集群环境有三大挑战,分别是并行化.单点失败处理.资源

刚入前端整合的一个手机端页面适配+预加载+获取资源加载进度等的一个小模板

刚入前端不久,之前主要学的是pc端的布局,到公司之后负责的主要是移动段页面,刚开始时为了使页面适应移动端不同的屏幕大小采用的是百分比加媒体查询的方式,做完一个项目之后,感觉非常不好,虽然最后也基本使页面做到了适配.所以做完这个项目之后,我就在网上查找各种屏幕适配的方案,最终找到了一个通过js控制使页面整体缩放的方案,还有一个就是通过js实时检测屏幕大改变html根字体大小的rem布局方案.目前我在使用的是缩放的方案.整体代码基本上是整合的是大牛们分享的一些实用代码,如有什么bug欢迎提出,共同进

关于 使用python向qq好友发送消息(对爬虫的作用----当程序执行完毕或者报错无限给自己qq发送消息,直到关闭)

以前看到网上一些小程序,在处理完事物后会自动发送qq消息,但是一直搞不懂是说明原理.也在网上找过一些python登陆qq发送消息的文字,但是都太复杂了.今天偶然看到一篇文章,是用python调用win32的接口发送qq消息的,觉得不错,就先记录下来,日后肯定会用得上这些小工具. 发送qq消息要求已经登陆qq,而且qq的窗口是独立的,现在新版的qq一般都是将所有的聊天窗口聚合在一起,因此要设置将qq窗口分离,或者将需要发送消息的那个窗口单独分离出来. 上代码吧. # 原理是先将需要发送的文本放到剪

06、部署Spark程序到集群上运行

06.部署Spark程序到集群上运行 6.1 修改程序代码 修改文件加载路径 在spark集群上执行程序时,如果加载文件需要确保路径是所有节点能否访问到的路径,因此通常是hdfs路径地址.所以需要修改代码中文件加载路径为hdfs路径: ... //指定hdfs路径 sc.textFile("hdfs://mycluster/user/centos/1.txt") ... ? 修改master地址 SparkConf中需要指定master地址,如果是集群上运行,也可以不指定,运行时可以通

luigi框架--关于python运行spark程序

首先,目标是写个python脚本,跑spark程序来统计hdfs中的一些数据.参考了别人的代码,故用了luigi框架. 至于luigi的原理 底层的一些东西Google就好.本文主要就是聚焦快速使用,知其然不知其所以然. python写Spark或mapreduce还有其他的方法,google上很多,这里用luigi只是刚好有参考的代码,而且理解起来还是简单,就用了. 上代码: import luigi, sysfrom datetime import datetime, timedeltafr

第9节课笔记-彻底实战IntelliJ IDEA 下的Spark程序开发

彻底实战IntelliJ IDEA 下的Spark程序开发下载IntelliJ IDEA 下载gitSpark源码下载:git clone git://github.com/apache/spark.git导入maven 工程 IntelliJ IDEA 启动的向导中Sacal下载需要下载,这是IDEA下载的,和系统层的不一样4.指定JDK1.8.x和Scala2.10.45.file ->Project Stucture 来设置工程lib 核心是添加Spark的jar6.添加Spark jar

Spark集群模式&amp;Spark程序提交

Spark集群模式&Spark程序提交 1. 集群管理器 Spark当前支持三种集群管理方式 Standalone-Spark自带的一种集群管理方式,易于构建集群. Apache Mesos-通用的集群管理,可以在其上运行Hadoop MapReduce和一些服务应用. Hadoop YARN-Hadoop2中的资源管理器. Tip1: 在集群不是特别大,并且没有mapReduce和Spark同时运行的需求的情况下,用Standalone模式效率最高. Tip2: Spark可以在应用间(通过集