yarn client中的一个BUG的修复

org.apache.spark.deploy.yarn.Client.scala中的monitorApplication方法:

/**

   * Report the state of an application until it has exited, either successfully or

   * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,

   * KILLED, or RUNNING) and the final application state (UNDEFINED, SUCCEEDED, FAILED,

   * or KILLED).

   *

   * @param appId ID of the application to monitor.

   * @param returnOnRunning Whether to also return the application state when it is RUNNING.

   * @param logApplicationReport Whether to log details of the application report every iteration.

   * @return A pair of the yarn application state and the final application state.

   */

  def monitorApplication(

      appId: ApplicationId,

      returnOnRunning: Boolean = false,

      logApplicationReport: Boolean = true): (YarnApplicationState, FinalApplicationStatus) = {

    val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)

    var lastState: YarnApplicationState = null

    while (true) {

      Thread.sleep(interval)

      val report: ApplicationReport =

        try {

          getApplicationReport(appId)

        } catch {

          case e: ApplicationNotFoundException =>

            logError(s"Application $appId not found.")

            return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)

          case NonFatal(e) =>

            logError(s"Failed to contact YARN for application $appId.", e)

            return (YarnApplicationState.FAILED, FinalApplicationStatus.FAILED)

        }

      val state = report.getYarnApplicationState

      if (logApplicationReport) {

        logInfo(s"Application report for $appId (state: $state)")

        // If DEBUG is enabled, log report details every iteration

        // Otherwise, log them every time the application changes state

        if (log.isDebugEnabled) {

          logDebug(formatReportDetails(report))

        } else if (lastState != state) {

          logInfo(formatReportDetails(report))

        }

      }

      if (lastState != state) {

        state match {

          case YarnApplicationState.RUNNING =>

            reportLauncherState(SparkAppHandle.State.RUNNING)

          case YarnApplicationState.FINISHED =>

//            reportLauncherState(SparkAppHandle.State.FINISHED)

            report.getFinalApplicationStatus match {

              case FinalApplicationStatus.FAILED =>

                reportLauncherState(SparkAppHandle.State.FAILED)

              case FinalApplicationStatus.KILLED =>

                reportLauncherState(SparkAppHandle.State.KILLED)

              case _ =>

                reportLauncherState(SparkAppHandle.State.FINISHED)

            }

          case YarnApplicationState.FAILED =>

            reportLauncherState(SparkAppHandle.State.FAILED)

          case YarnApplicationState.KILLED =>

            reportLauncherState(SparkAppHandle.State.KILLED)

          case _ =>

        }

      }

      if (state == YarnApplicationState.FINISHED ||

        state == YarnApplicationState.FAILED ||

        state == YarnApplicationState.KILLED) {

        cleanupStagingDir(appId)

        return (state, report.getFinalApplicationStatus)

      }

      if (returnOnRunning && state == YarnApplicationState.RUNNING) {

        return (state, report.getFinalApplicationStatus)

      }

      lastState = state

    }

    // Never reached, but keeps compiler happy

    throw new SparkException("While loop is depleted! This should never happen...")

  }

其中:

      if (lastState != state) {

        state match {

          case YarnApplicationState.RUNNING =>

            reportLauncherState(SparkAppHandle.State.RUNNING)

          case YarnApplicationState.FINISHED =>

//            reportLauncherState(SparkAppHandle.State.FINISHED)

            report.getFinalApplicationStatus match {

              case FinalApplicationStatus.FAILED =>

                reportLauncherState(SparkAppHandle.State.FAILED)

              case FinalApplicationStatus.KILLED =>

                reportLauncherState(SparkAppHandle.State.KILLED)

              case _ =>

                reportLauncherState(SparkAppHandle.State.FINISHED)

            }

          case YarnApplicationState.FAILED =>

            reportLauncherState(SparkAppHandle.State.FAILED)

          case YarnApplicationState.KILLED =>

            reportLauncherState(SparkAppHandle.State.KILLED)

          case _ =>

        }

      }

yarn state为finished的时候的状态细分不够明确,将原来的 reportLauncherState(SparkAppHandle.State.FAILED)注释掉,改成:

report.getFinalApplicationStatus match {

              case FinalApplicationStatus.FAILED =>

                reportLauncherState(SparkAppHandle.State.FAILED)

              case FinalApplicationStatus.KILLED =>

                reportLauncherState(SparkAppHandle.State.KILLED)

              case _ =>

                reportLauncherState(SparkAppHandle.State.FINISHED)

            }

因为完成状态的final state可能很多种状态,KILLED、FAILED、SUCCESS都可能是final state。
如果只返回一个finished状态给SparkLauncher的SparkAppHandle的话,其实我们在自己的代码中是无法知道这个spark 任务到底是成功了还是失败了,只知道它完成了。
所以要细分一下完成状态,自己用SparkLauncher提交JOB的时候可以监控JOB在失败的时候报警。
此BUG在spark1.6.0中存在对应CDH5.7到CDH5.9的spark都有这个问题,新的版本中已经修复此BUG。
如果在使用CDH版本的spark,那么就自己改一下代码重新编译打包一下,部署一个自己的spark on yarn服务吧。

原文地址:https://www.cnblogs.com/itboys/p/9998622.html

时间: 2024-08-30 14:27:47

yarn client中的一个BUG的修复的相关文章

记录一个使用HttpClient过程中的一个bug

最近用HttpClient进行链接请求,开了多线程之后发现经常有线程hang住,查看线程dump java.lang.Thread.State: RUNNABLE at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketI

C++ 写类中的一个bug修复

 #include"wz.h"  #include"sts.h" class _string {     friend std::istream& operator>>(std::istream& is, _string& a);//bug 1 2     friend std::ostream& operator<<(std::ostream& os,_string& a);       pu

SpriteKit游戏Delve随机生成地牢地图一个Bug的修复

大熊猫猪·侯佩原创或翻译作品.欢迎转载,转载请注明出处. 如果觉得写的不好请多提意见,如果觉得不错请多多支持点赞.谢谢! hopy ;) Delve是一个很有意思的地牢探险类型的游戏,其中每一关的地图都是随机生成的. 至于如何在生成地图时兼顾随机性和一定模式,网上有很多不错的建议.你也可以从Delve的源代码中找到答案(如果学习目的需要源代码的可以Q我,或者自行度娘) 这里只是谈一下源代码中有一个小bug,我们看如何修复它! 随机生成地图是放在LevelHelper.swift文件中,其中结构t

微软BI 之SSIS 系列 - MVP 们也不解的 Scrip Task 脚本任务中的一个 Bug

开篇介绍 前些天自己在整理 SSIS 2012 资料的时候发现了一个功能设计上的疑似Bug,在 Script Task 中是可以给只读列表中的变量赋值.我记得以前在 2008 的版本中为了弄明白这个配置,还特意测试过这个细节,获取错误并理解了这个功能.但是现在回去再次测试 2008 的版本时,发现这个功能在 2008 中其实也是错误的,把我印象中的测试结果完全给推翻了,所以到现在已经搞不清楚我当时到底是如果得出这个错误的. 疑似功能 Bug 描述 在 SSIS 包中定义了用户自定义变量 - PV

mysql5.5复制环境中的一个bug

mysql5.5,主从复制环境中 binlog格式是ROW时 有表使用了mrg_myisam引擎 以上三个条件都满足时,恭喜你,当你insert一条记录时,你会看到从库不停的在重启,每次重启都做crash recovery. 这个bug存在于所有5.5版本中,bug记录:https://bugs.mysql.com/bug.php?id=73635. 解决办法: 升级到5.6版本,亲测5.6已修复该bug. 改binlog格式. 改引擎吧,不使用mrg_myisam引擎.

VS 中的一个BUG?求大神帮我解答下。

首先,希望知道这个问题的人能够帮我解答下. 来看代码: int main() { long long int result=-321; cout<<result<<endl; if(INT_MIN==-2147483648) cout<<"相等"<<endl; if( result<-2147483648) cout<<"1"<<endl; if( result<INT_MIN) c

js数据计算中的一个bug,9.44+4.8计算结果竟然是14.239999999999998

在使用js计算数据时,遇到一个很奇怪的问题,9.44+4.8计算结果竟然是14.239999999999998这个??很夸张. 于是享有经验的同事请教,这是怎么回事,原来是js的bug,在减法时,经常出现,加法时出现的少,需要修正,修正方式有两种,一种是转化为整数,另一种是使用toFixed() ,还有一种方式就是给加上一个很小的数,例如0.000000001,然后在取几位小数 第一种方法:使用整数的方法: alert( (9.44*100+4.8*100)/100.0) 第二种方法:使用toF

【Python图像特征的音乐序列生成】关于mingus一个bug的修复,兼改进情感模型

mingus在输出midi文件的时候,使用这样的函数: 1 from mingus.containers import NoteContainer 2 from mingus.midi import midi_file_out 3 4 nc = NoteContainer(["A", "C", "E"]) 5 midi_file_out.write_NoteContainer("test.mid", nc) 在输出时会报错:

解决JSONCPP 0.10.2的一个Bug

最近在使用jsoncpp 0.10.2的过程中碰到一个bug,创建的数组,无法超过5个元素,测试代码如下: int j = 0; int count = 20; Json::Value root; Json::Value item; for (int i = 0; i < count; i++) { root[i] = i; j = root.size(); } 在我的实际项目中,如果数组只有1个是元素(该元素稍微有点大的JSON对象),也有可能出现这个元素的值错误的故障,超过5个肯定出错. 在