线上Spark处理Bzip2引出Hadoop Bzip2线程安全问题

我们的Hadoop生产环境有两个版本,其中一个是1.0.3,为了支持日志压缩和split,我们添加了hadoop-1.2中关于Bzip2压缩的feature. 一切运行良好。

为了满足公司对迭代计算的需求(复杂HiveSQL,广告推荐算法,机器学习 etc), 我们构建了自己的Spark集群,最初是Standalone Mode,版本spark-0.9.1,支持Shark。

上线后,问题接踵而来,最为致命的是,shark在处理Hadooop bzip2文件时计算结果通常会有偏差,有时差的特别离谱(比如,用shark统计1个5kw行的日志,结果只有

3kw行).

显然shark+hive+spark+hadoop的某个环节出了bug。第一次面对这么复杂的系统,着实头疼。

于是,开始蛮干,部署shark+hive+spark+hadoop开发环境,debug,查看出问题的环节。(这个过程中把Spark-core的源码也缕了一遍),始终没有发现什么问题。

后来,参加了Spark技术大会,和同行交流的过程中,幡然悔悟: Spark的task是线程级并发的,而Hadoop MR的task是进程级并发的,那么,会不会是Bzip2存在线程安全问题呢?

回来后,查看Bzip2Codec相关的代码,终于发现了问题所在。(话说,凌晨3点,没有eclipse,用vim 改的), 迫不及待的重新编译Hadoop和Spark,测试,发现处理Bzip2结果OK了!

由于最近比较忙,向社区提交path需要漫长的过程,暂时没有提交社区。具体的patch如下,如有同行遇到同类问题,请借鉴.

Index: src/core/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java
===================================================================
--- src/core/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java	(版本 525)
+++ src/core/org/apache/hadoop/io/compress/bzip2/CBZip2InputStream.java	(版本 510)
@@ -129,9 +129,7 @@
   private int computedBlockCRC, computedCombinedCRC;

   private boolean skipResult = false;// used by skipToNextMarker
-  //modified by jicheng.song
-  //private static boolean skipDecompression = false;
-  private  boolean skipDecompression = false;
+  private static boolean skipDecompression = false;

   // Variables used by setup* methods exclusively

@@ -317,18 +315,13 @@
  * @throws IOException
    *
    */
-  //modified by jicheng.song
-  //public static long numberOfBytesTillNextMarker(final InputStream in) throws IOException{
-  public  long numberOfBytesTillNextMarker(final InputStream in) throws IOException{
-    this.skipDecompression = true;
-    //
-    this.in = new BufferedInputStream(in, 1024 * 9);// >1 MB buffer
-    this.readMode = readMode;
-    //CBZip2InputStream anObject = null;
+  public static long numberOfBytesTillNextMarker(final InputStream in) throws IOException{
+    CBZip2InputStream.skipDecompression = true;
+    CBZip2InputStream anObject = null;

-    //anObject = new CBZip2InputStream(in, READ_MODE.BYBLOCK);
+    anObject = new CBZip2InputStream(in, READ_MODE.BYBLOCK);

-    return this.getProcessedByteCount();
+    return anObject.getProcessedByteCount();
   }

   public CBZip2InputStream(final InputStream in) throws IOException {
@@ -402,9 +395,7 @@

     if(skipDecompression){
       changeStateToProcessABlock();
-      //modified by jicheng.song
-      //CBZip2InputStream.skipDecompression = false;
-      this.skipDecompression = false;
+      CBZip2InputStream.skipDecompression = false;
     }

     final int hi = offs + len;

.

时间: 2024-11-13 07:58:17

线上Spark处理Bzip2引出Hadoop Bzip2线程安全问题的相关文章

尖峰7月线上技术分享--Hadoop、MySQL

7月2号晚20:30-22:30 东大博士Dasight分享主题<大数据与Hadoop漫谈> 7月5号晚20:30-22:30  原支付宝MySQL首席DBA分享主题<MySQL发展趋势,MySQL各个分支介绍>.<MySQL 5.6版本特性介绍及如何从MySQL 5.5向MySQL 5.6> 7月10号晚20:30-22:30 东大博士Dasight分享主题<Hadoop与Nosql技术的适用性分析> 7月12号晚20:30-22:30  原支付宝MySQ

记Hadoop2.5.0线上mapreduce任务执行map任务划分的一次问题解决

前言 近日在线上发现有些mapreduce作业的执行时间很长,我们需要解决这个问题.输入文件的大小是5G,采用了lzo压缩,整个集群的默认block大小是128M.本文将详细描述这次线上问题的排查过程. 现象 线上有一个脚本,为了便于展示,我将这个脚本重新copy了一份并重命名为zzz.这个脚本实际是使用Hadoop streaming运行一个mapreduce任务,在线上执行它的部分输出内容如下: 可以看到map任务划分为1个.这个执行过程十分漫长,我将中间的一些信息省略,map与reduce

Spark是否会替代Hadoop?

我经常会从客户或者网上听到这个问题,尤其是最近几年.那么关于spark哪些被我们神化了,哪些又是真实的,以及它在"大数据"的生态系统中又是怎样的? 说实话,其实我把这个问题作为标题是有问题的,但是我们经常会这样问.Hadoop并不是一个单独的产品而是一个生态系统,而spark也是一样的.下面让我们来一个一个解释.目前Hadoop生态系统主要包括: HDFS-Hadoop分布式文件系统.它是一个分布式的.面向块的.不可更新的.高度伸缩性的.可运行在集群中普通硬盘上的文件系统.此外,HDF

关于线上优化服务器视频笔记1-----调优线上服务器

linux服务器调优的经验 目录: 1.系统故障排除思路 重视报错信息 永远不要忘记日志文件 分析.定位.解决问题 2.影响linux性能的因素 服务器硬件因素 操作系统的相关因素 程序因素 3.系统性能优化工具 Cpu性能优化工具 vmstat,iosta,sar 内存性能检测工具 free,top,sar,pidstat 磁盘性能评估工具 iostat,sar 网络性能分析工具 ping,mtr,netstat 4.系统性能分析与标准 5.性能调优的思路与技巧分享 几个故障鼓励案例和性能优化

[转]线上GC故障解决过程记录

排查了三四个小时,终于解决了这个GC问题,记录解决过程于此,希望对大家有所帮助.本文假定读者已具备基本的GC常识和JVM调优知识,关于JVM调优工具使用可以查看我在同一分类下的另一篇文章: http://my.oschina.net/feichexia/blog/196575 背景说明 发生问题的系统部署在Unix上,发生问题前已经跑了两周多了. 其中我用到了Hadoop源码中的CountingBloomFilter,并将其修改成了线程安全的实现(详情见:AdjustedCountingBloo

Spark 1.0.0 横空出世 Spark on yarn 部署(hadoop 2.4)

就在昨天,北京时间5月30日20点多.Spark 1.0.0终于发布了:Spark 1.0.0 released 根据官网描述,Spark 1.0.0支持SQL编写:Spark SQL Programming Guide 个人觉得这个功能对Hive的市场的影响很小,但对Shark冲击很大,就像win7和winXP的关系,自相残杀嘛? 这么着急的发布1.x 版是商业行为还是货真价实的体现,让我们拭目以待吧~~~~ 本文是CSDN-撸大湿原创,如要转载请注明出处,谢谢:http://blog.csd

【转】java线上程序排错经验2 - 线程堆栈分析

前言 在线上的程序中,我们可能经常会碰到程序卡死或者执行很慢的情况,这时候我们希望知道是代码哪里的问题,我们或许迫切希望得到代码运行到哪里了,是哪一步很慢,是否是进入了死循环,或者是否哪一段代码有问题导致程序很慢,或者出现了线程不安全的情况,或者是某些连接数或者打开文件数太多等问题,总之我们想知道程序卡在哪里了,哪块占用了大量的资源. 此时,或许通过线程堆栈的分析就能定位出问题. 如果能深入掌握堆栈分析的技术,很多问题都能迎刃而解,但是线程堆栈分析并不简单,设计到线上的排错问题,需要有一定的知识

一次线上GC故障解决过程记录

排查了三四个小时,终于解决了这个GC问题,记录解决过程于此,希望对大家有所帮助.本文假定读者已具备基本的GC常识和JVM调优知识,关于JVM调优工具使用可以查看我在同一分类下的另一篇文章: http://my.oschina.net/feichexia/blog/196575 背景说明 发生问题的系统部署在Unix上,发生问题前已经跑了两周多了. 其中我用到了Hadoop源码中的CountingBloomFilter,并将其修改成了线程安全的实现(详情见:AdjustedCountingBloo

线上服务应急与技术攻关方法论

海恩法则和墨菲定律 海恩法则指出: 每一起严重事故的背后,必然有29次轻微事故和300起未遂先兆以及1000起事故隐患. 海恩法则强调两点: (1)事故的发生是量的积累的结果: (2)再好的技术,再完美的规章,在实际操作层面,也无法取代人自身的素质和责任心. 根据海恩法则,一起重大事故发生之后,我们要在处理事故和解决问题的同事,还要及时的对同类问题的「事故征兆」和「事故苗头」进行排查并处理,以防止类似问题的再次发生,将问题在萌芽状态就将其解决掉,这可以作为互联网企业线上应急的指导思想. 墨菲定律