记录一则Spark读写和Lost Excutor错误的分析和解决过程

一、概述

上篇blog记录了些在用spark-sql时遇到的一些问题,今天继续记录用Spark提供的RDD转化方法开发公司第一期标签分析系统(一部分scala作业逻辑代码后面blog再给大家分享)遇到的一些SPARK作业错误信息。其中有些问题可能一些数据量或者shuffle量比较小的作业时不会遇到的,我们整套标签系统的初级输入数据大概是8T左右,这里也是个参考。(下面的Spark部署模式为spark on yarn)

二、问题

1、大规模数据往HDFS中写时候,报了HDFS读写超时,具体日志看下面。

(1)具体到某个Excutor的错误日志:

(2)具体到各个数据节点DataNode的日志:

分析:

从这两个错误信息首先可以将错误定位到整个HDFS的读写过程中,其中对于读写超时可以定位到2个参数:dfs.client.socket-timeout(默认60s)、dfs.datanode.socket.write.timeout(默认80s)。在spark的程序中按照自己的实际情况设置这两个值,问题可以解决。给个例子:

val dwd_new_pc_list_patch = "/user/hive/warehouse/pc.db/dwd_new_pc_list/2015-01-*/action=play"
val sparkConf = new SparkConf().setAppName("TagSystem_compositeTag")
  .set("spark.kryoserializer.buffer.max.mb", "128").set("spark.rdd.compress","true")
val sc = new SparkContext(sparkConf)
//hdfs客户端的读写超时时间
//默认60000
sc.hadoopConfiguration.set("dfs.client.socket-timeout", "180000")
//默认80000
sc.hadoopConfiguration.set("dfs.datanode.socket.write.timeout", "180000")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val hiveSqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

//(user_id,fo,fo_2,sty,fs)
val source = sc.textFile(dwd_new_pc_list_patch).filter(p => (p.trim != "" && p.split("\\|").length >= 105)).mapPartitions({ it =>
  for {
    line <- it
  } yield (line.split("\\|")(21), line.split("\\|")(9), line.split("\\|")(104), line.split("\\|")(40), line.split("\\|")(7))
}).persist(StorageLevel.MEMORY_AND_DISK_SER)
.
.
.

另外相似问题:https://jira.spring.io/si/jira.issueviews:issue-html/SHDP-404/SHDP-404.html

2、由spark.reducer.maxMbInFlight引起的Lost Excutor问题。

这个错误主要是发生在shuffle中的fetch阶段,由于Excutor 已经lost掉了,由于容错机制另外重新启动一个Excutor,但是在之前lost掉的Excutor中保存的blockManager已经完全丢失,所以之前的stage需要重新计算。具体在dirver或者CoarseGrainedExecutorBackend的日志主要提示超时和读写文件失败,截了下超时的错误提示:

解决方法:

处理Lost Excutor问题还是花了比较长的时间,调整了很多参数都不行。最后将spark.reducer.maxMbInFlight调小或者将spark.shuffle.copier.threads调小问题解决。在家里还是详细的研究了下spark.reducer.maxMbInFlight这个参数的具体机制含义。spark.reducer.maxMbInFlight官方的配置文档的说明有些笼统:大概的意思是同事从reduce task中取出的ShuffleTask输出最大值(默认48MB)。这个从字面上理解还是不怎么容易的,从源码上search这个参数,定位到org.apache.spark.storage.BlockFetcherIterator.BasicBlockFetcherIterator#splitLocalRemoteBlocks

    protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
      // Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
      // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
      // nodes, rather than blocking on reading output from one node.
      //每个fetch线程获取的数据量大小(默认5个fetch线程)
      val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
      logInfo("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize)

      // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
      // at most maxBytesInFlight in order to limit the amount of data in flight.
      val remoteRequests = new ArrayBuffer[FetchRequest]
      var totalBlocks = 0
      for ((address, blockInfos) <- blocksByAddress) { //  address实际上是executor_id
        totalBlocks += blockInfos.size
        if (address == blockManagerId) { 
          // Filter out zero-sized blocks
          localBlocksToFetch ++= blockInfos.filter(_._2 != 0).map(_._1)
          _numBlocksToFetch += localBlocksToFetch.size
        } else {
          val iterator = blockInfos.iterator
          var curRequestSize = 0L
          var curBlocks = new ArrayBuffer[(BlockId, Long)]
          while (iterator.hasNext) {
          // blockId 是org.apache.spark.storage.ShuffleBlockId,
          // 格式:"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
            val (blockId, size) = iterator.next()
            // Skip empty blocks
            if (size > 0) { 
              curBlocks += ((blockId, size))
              remoteBlocksToFetch += blockId
              _numBlocksToFetch += 1
              curRequestSize += size
            } else if (size < 0) {
              throw new BlockException(blockId, "Negative block size " + size)
            }
             // 避免一次请求的数据量过大
            if (curRequestSize >= targetRequestSize) {
              // Add this FetchRequest
              remoteRequests += new FetchRequest(address, curBlocks)
              curBlocks = new ArrayBuffer[(BlockId, Long)]
              logDebug(s"Creating fetch request of $curRequestSize at $address")
              curRequestSize = 0
            }
          }
          // Add in the final request
          // 将剩余的请求放到最后一个request中。
          if (!curBlocks.isEmpty) { 
            remoteRequests += new FetchRequest(address, curBlocks)
          }
        }
      }
      logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " +
        totalBlocks + " blocks")
      remoteRequests
    }

从代码上看我的个人理解是在shuffle节点每个reduce task会启动5个fetch线程(可以由spark.shuffle.copier.threads配置)去最多spark.reducer.maxMbInFlight个(默认5)其他Excuctor中获取文件位置,然后去fetch它们,并且每次fetch的抓取量不会超过spark.reducer.maxMbInFlight(默认值为48MB)/5。这种机制我个人理解,第一:可以减少单个fetch连接的网络IO、第二:这种将fetch数据并行执行有助于抓取速度提高,减少请求数据的抓取时间总和。

回来结合我现在的问题分析,我将spark.reducer.maxMbInFlight调小,从而减少了每个reduce task中的每个fetch线程的抓取数据量,进而减少了每个fetch连接的持续连接时间,降低了由于reduce task过多导致每个Excutor中存在的fetch线程太多而导致的fetch超时,另外降低内存的占用。

上述分析为个人理解,如有更深入的想法欢迎交流。

时间: 2024-10-10 10:38:42

记录一则Spark读写和Lost Excutor错误的分析和解决过程的相关文章

python接口测试之401错误的分析和解决(十六)

作者 无涯 在接口的测试中,经常会遇到客户端向服务端发送一个请求,服务端返回401的错误,那么今天本文章就来说明在接口测试中如何分析以及解决该问题. 我们知道在HTTP返回的状态码中,401错误表示的是被请求的页面需要用户名和密码.401的错误详细的可以描述为:客户端发送请求抖到服务端, 页面需要验证服务端会返回401的错误,见如下的错误信息: 401 UNAUTHORIZED Headers Content-Type: application/jsonWWW-Authenticate: Bas

SQL Server 磁盘请求超时的833错误原因分析以及解决

本文出处:http://www.cnblogs.com/wy123/p/6984885.html 最近遇到一个SQL Server服务器响应极度缓慢,并且出现客户端请求报错的情况,在数据库中的errorlog中出现磁盘请求超过一定时间才完成的error消息.对于此类问题,到底是存储系统或者磁盘的故障,还是SQL Server 自己的问题,亦或是应用程序引发的呢?又要如何解决?本文将对引起此问题的某一方面的因素进行简单的分析,但是无法涵盖所有潜在的可能性,因此遇到类似问题还要做具体的分析. SQL

sql server数据库附加错误 / 数据库无法附加解决过程

故障描述本案例中涉及一台装有SqlServer数据库的某品牌r520型号的服务器存储,这台存储中又包含有两组磁盘阵列,raid级别都是raid5.正常情况下用户的SqlServer数据库存放在D盘中,后因为数据量大导致D盘容量不足,管理员便在E盘中生成了一个.ndf的文件并且将数据库路径指向E盘继续进行使用.但是大约半个月后数据库突然出现故障报错,连接失效,SqlServer数据库无法附加查询.管理员于是进行尝试性数据恢复操作(管理员在原环境下进行了多次尝试性恢复,导致了原始数据库文件被重复的更

记录一次MySQL两千万数据的大表优化解决过程,提供三种解决方案

问题概述 使用阿里云rds for MySQL数据库(就是MySQL5.6版本),有个用户上网记录表6个月的数据量近2000万,保留最近一年的数据量达到4000万,查询速度极慢,日常卡死.严重影响业务. 问题前提:老系统,当时设计系统的人大概是大学没毕业,表设计和sql语句写的不仅仅是垃圾,简直无法直视.原开发人员都已离职,到我来维护,这就是传说中的维护不了就跑路,然后我就是掉坑的那个!!! 我尝试解决该问题,so,有个这个日志. 方案概述 方案一:优化现有mysql数据库.优点:不影响现有业务

Struts2中的Unable to load configuration错误的分析与解决方法

当我们遇到 Unable to load configuration. 这样的错误时,可以根据具体的错误提示找出错误的原因. Unable to load configuration. - interceptor-ref - file:/D:/Java/apache-tomcat-6.0.32/webapps/examquestions/WEB-INF/classes/struts.xml:28:49 at com.opensymphony.xwork2.config.Configuration

Spark集群无法停止的原因分析和解决

今天想停止spark集群,发现执行stop-all.sh的时候spark的相关进程都无法停止.提示: no org.apache.spark.deploy.master.Master to stop no org.apache.spark.deploy.worker.Worker to stop 上网查了一些资料,再翻看了一下stop-all.sh,stop-master.sh,stop-slaves.sh,spark-daemon.sh,spark-daemons.sh等脚本,发现很有可能是由

记录一奇葩scp问题的分析与解决过程

引 入 linux是一个庞大复杂的系统.整天跟它打交道的运维或开发人员难免不遇到什么问题.这里,本人聊聊遇到的一个跟scp相关的奇葩问题. 一.问题描述 两台centos6.7主机(主机一:172.16.13.62和主机二:172.16.13.72),都安装好ssh,相关的ssh连接配置也没问题,但是在用scp进行两主机之间的文件传输复制时,出现以下情况: 在主机一上操作scp,成功从主机二传输文件到主机一.当然也能成功把文件从主机一成功传输到主机二.如: [[email protected] 

Spring Boot 2.0下配置Log4j2下的错误问题分析与解决

环境介绍 Spring Boot 2.0.2 Java 8 任务描述 由于Spring Boot 2.0 默认情况下是使用logback作为日志系统的,这里希望切换到log4j2. pom.xml内容定义 这里在pom.xml新增了spring-boot中的日志组件 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</

定长记录采用数据库读写并非最佳解决方案

对于有些应用场合如仪器仪表的采样数据,不需要对数据排序.插入和修改,只需要对数据写和读操作,在这种情况下,使用数据库来存取这样的记录数据,未必是最佳的选择,本文根据工作实践,采用文件的分块记录的方法,来处理采样这样的定长记录数据,实践证明,通过文件的分块存储方法,比数据库存储方法读写速度更快,尤其是在处理大批量的记录数据的读写的时候,这种速度上的优势更为显著.下面是分块记录的具体实现方法: 首先,假设我们的记录数据为:记录id号,电流,电压,温度,电阻,用结构体表示为: [html] view