From the DFSOutputStream Pipeline Write Mechanism to the Streamer Thread Leak Problem

Preface

A while ago I wrote an article, "DataNode数据处理中心DataXceiver" (DataXceiver, the DataNode's data-processing hub), which looked at the datanode read/write path from a high level, but it did not go into the fine-grained details of those operations or the problems that can hide in them. This article is meant to fill that gap. Its scope may look narrow, but the result should still be well worth your time.

DFSOutputStream Writes and the Related Classes and Variables

This article is mainly about the process of writing data to the datanodes. The first class you run into on that path is DFSOutputStream, but it is only one of the major pieces; internally it interacts and cooperates with several other classes. The next few sections introduce them briefly.

DataStreamer

The data-streaming class, and the main one invoked during a write. DFSOutputStream's start() method starts the streamer thread's run() method, and most of DFSOutputStream's work is carried out by its internal streamer object, so the two are as tightly coupled as it gets.

ResponseProcessor

ResponseProcessor is an inner class of DataStreamer. It is a thread class whose main job is to receive the ack replies from the datanodes in the pipeline. Its source comment reads:

  //
  // Processes responses from the datanodes.  A packet is removed
  // from the ackQueue when its response arrives.
  //

DFSPacket

The packet class; both DataStreamer and DFSOutputStream use it to carry the data being transferred. Its source comment reads:

/****************************************************************
 * DFSPacket is used by DataStreamer and DFSOutputStream.
 * DFSOutputStream generates packets and then ask DatStreamer
 * to send them to datanodes.
 ****************************************************************/

Besides these three classes, a few member variables also deserve attention, because they will appear again and again in the analysis below.

1. dataQueue (LinkedList<DFSPacket>)

The queue of packets waiting to be sent.

2. ackQueue (LinkedList<DFSPacket>)

The queue of packets waiting for acknowledgement; after a packet has been sent, it is moved from dataQueue to ackQueue.

3. pipeline

"Pipeline" is a term that shows up constantly. It is often rendered literally as a pipe or conduit, but a more intuitive reading is an assembly-line model, and some people liken it to the chain-of-responsibility design pattern. No single translation conveys the original meaning perfectly, and the term will keep coming up throughout this article.
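Before diving into the internals, it helps to see where all of this sits from the application's point of view. The sketch below uses only the ordinary HDFS client API; the file path and class name are made up for illustration. fs.create() returns a stream that, on HDFS, wraps a DFSOutputStream, and every write()/hflush()/close() call below ultimately drives the DataStreamer, dataQueue and ackQueue described in this article.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // With fs.defaultFS pointing at an HDFS cluster, fs is a DistributedFileSystem
    // and the stream below wraps a DFSOutputStream, which owns a DataStreamer thread.
    try (FileSystem fs = FileSystem.get(conf);
         FSDataOutputStream out = fs.create(new Path("/tmp/streamer-demo.txt"))) {
      out.write("hello pipeline".getBytes("UTF-8"));
      out.hflush(); // push buffered packets through the datanode pipeline
    }               // close() ends up in closeImpl()/closeThreads(), covered later
  }
}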

The DataStreamer Object

To understand the details of the write path you first need to understand how DataStreamer works, because DFSOutputStream's main operations are little more than calls into it. The class-level comment in the DataStreamer source explains its job very well, so that is a good place to start.

/*********************************************************************
 *
 * The DataStreamer class is responsible for sending data packets to the
 * datanodes in the pipeline. It retrieves a new blockid and block locations
 * from the namenode, and starts streaming packets to the pipeline of
 * Datanodes. Every packet has a sequence number associated with
 * it. When all the packets for a block are sent out and acks for each
 * if them are received, the DataStreamer closes the current block.
 *
 * The DataStreamer thread picks up packets from the dataQueue, sends it to
 * the first datanode in the pipeline and moves it from the dataQueue to the
 * ackQueue. The ResponseProcessor receives acks from the datanodes. When an
 * successful ack for a packet is received from all datanodes, the
 * ResponseProcessor removes the corresponding packet from the ackQueue.
 *
 * In case of error, all outstanding packets are moved from ackQueue. A new
 * pipeline is setup by eliminating the bad datanode from the original
 * pipeline. The DataStreamer now starts sending packets from the dataQueue.
 *
 *********************************************************************/

To paraphrase the comment: DataStreamer is responsible for sending data packets to the datanodes in the pipeline. It obtains a new block ID and block locations from the namenode and then streams packets to that pipeline of datanodes. Every packet carries its own sequence number; once all packets of a block have been sent and each of them has been acknowledged, the streamer closes the current block.
The DataStreamer thread picks packets off dataQueue, sends each one to the first datanode in the pipeline, and moves it from dataQueue to ackQueue. ResponseProcessor receives the acks from the datanodes; once a packet has been successfully acknowledged by all datanodes, ResponseProcessor removes it from ackQueue.
On error, all outstanding packets are moved out of ackQueue, a new pipeline is set up that excludes the bad datanode, and DataStreamer resumes sending packets from dataQueue.

OK, with the official comment read, you should already have a rough idea of how it works. The figure below is a simple structural diagram I made:

The logic behind this figure lives in the run() method. Inside the while loop, a data packet is first taken from the queue:

one = dataQueue.getFirst(); // regular data packet

Next, the packet is moved from one queue to the other:

        // send the packet
        SpanId spanId = SpanId.INVALID;
        synchronized (dataQueue) {
          // move packet from dataQueue to ackQueue
          if (!one.isHeartbeatPacket()) {
            if (scope != null) {
              spanId = scope.getSpanId();
              scope.detach();
              one.setTraceScope(scope);
            }
            scope = null;
            dataQueue.removeFirst();
            ackQueue.addLast(one);
            dataQueue.notifyAll();
          }
        }

and then the data is written out to the remote datanode:

        // write out data to remote datanode
        try (TraceScope ignored = dfsClient.getTracer().
            newScope("DataStreamer#writeTo", spanId)) {
          one.writeTo(blockStream);
          blockStream.flush();
        } catch (IOException e) {
        ...

After the streamer has sent the packets, the ResponseProcessor thread receives the acks from the datanodes; once every packet of a block has been acknowledged by all datanodes in the pipeline, the block is complete. Pipeline construction happens in two stages, corresponding to two kinds of data transfer:

BlockConstructionStage.PIPELINE_SETUP_CREATE
BlockConstructionStage.PIPELINE_SETUP_APPEND

The first case happens when a new block is allocated: the client obtains a new block ID and its locations from the namenode and then connects to the first datanode.

        if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Allocating new block: " + this);
          }

          setPipeline(nextBlockOutputStream());
          initDataStreaming();
        }

  /**
   * Open a DataStreamer to a DataNode so that it can be written to.
   * This happens when a file is created and each time a new block is allocated.
   * Must get block ID and the IDs of the destinations from the namenode.
   * Returns the list of target datanodes.
   */
  protected LocatedBlock nextBlockOutputStream() throws IOException {
      ...

      //
      // Connect to first DataNode in the list.
      //
      success = createBlockOutputStream(nodes, storageTypes, 0L, false);
      ...

The first stage of pipeline setup can be illustrated with the following figure.

The other stage, PIPELINE_SETUP_APPEND, re-establishes the pipeline on an existing block with the remaining nodes; it is used when appending to a file or when recovering from a streaming failure:

        } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
          if (LOG.isDebugEnabled()) {
            LOG.debug("Append to block {}", block);
          }

          setupPipelineForAppendOrRecovery();
          if (streamerClosed) {
            continue;
          }
          initDataStreaming();
        }

The code that then establishes the connection:

  /**
   * Open a DataStreamer to a DataNode pipeline so that
   * it can be written to.
   * This happens when a file is appended or data streaming fails
   * It keeps on trying until a pipeline is setup
   */
  private void setupPipelineForAppendOrRecovery() throws IOException {
    // Check number of datanodes. Note that if there is no healthy datanode,
    // this must be internal error because we mark external error in striped
    // outputstream only when all the streamers are in the DATA_STREAMING stage
    ...
    setupPipelineInternal(nodes, storageTypes);
  }

  protected void setupPipelineInternal(DatanodeInfo[] datanodes,
      StorageType[] nodeStorageTypes) throws IOException {
      ...

      // set up the pipeline again with the remaining nodes
      success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery);
      ...

Shown graphically, the effect is as follows.

Pipeline re-construction after an error happens in the datanode I/O error-handling part of the code:

  public void run() {
    long lastPacket = Time.monotonicNow();
    TraceScope scope = null;
    while (!streamerClosed && dfsClient.clientRunning) {
      ...

      DFSPacket one;
      try {
        // process datanode IO errors if any
        boolean doSleep = processDatanodeOrExternalError();
        ...

  /**
   * If this stream has encountered any errors, shutdown threads
   * and mark the stream as closed.
   *
   * @return true if it should sleep for a while after returning.
   */
  private boolean processDatanodeOrExternalError() throws IOException {
    if (!errorState.hasDatanodeError() && !shouldHandleExternalError()) {
      return false;
    }

   ...

    if (response != null) {
      LOG.info("Error Recovery for " + block +
          " waiting for responder to exit. ");
      return true;
    }
    closeStream();

    // move packets from ack queue to front of the data queue
    synchronized (dataQueue) {
      dataQueue.addAll(0, ackQueue);
      ackQueue.clear();
    }

    // If we had to recover the pipeline five times in a row for the
    // same packet, this client likely has corrupt data or corrupting
    // during transmission.
    if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5) {
      LOG.warn("Error recovering pipeline for writing " +
          block + ". Already retried 5 times for the same packet.");
      lastException.set(new IOException("Failing write. Tried pipeline " +
          "recovery 5 times without success."));
      streamerClosed = true;
      return false;
    }

    setupPipelineForAppendOrRecovery();
    ...

ResponseProcessor: The Ack-Receiving Class

Let's go into ResponseProcessor's main run() method:

@Override
    public void run() {

      setName("ResponseProcessor for block " + block);
      PipelineAck ack = new PipelineAck();

      TraceScope scope = null;
      while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
        // process responses from datanodes.
        try {
          // read an ack from the pipeline
          long begin = Time.monotonicNow();
          ack.readFields(blockReplyStream);
          ...

Here the ack is read from the blockReplyStream input stream. Note that this ack is not the same kind of object as the entries in ackQueue: it is a PipelineAck, and its main use here is to provide the seqno sequence number.

long seqno = ack.getSeqno();

Check whether this is a valid ack for a data packet:

          assert seqno != PipelineAck.UNKOWN_SEQNO :
              "Ack for unknown seqno should be a failed ack: " + ack;
          if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
            continue;
          }

Then take the DFSPacket at the head of ackQueue and check that its sequence number matches:

          // a success ack for a data packet
          DFSPacket one;
          synchronized (dataQueue) {
            one = ackQueue.getFirst();
          }
          if (one.getSeqno() != seqno) {
            throw new IOException("ResponseProcessor: Expecting seqno " +
                " for block " + block +
                one.getSeqno() + " but received " + seqno);
          }

Once the ack has been validated, the corresponding packet is removed:

          synchronized (dataQueue) {
            scope = one.getTraceScope();
            if (scope != null) {
              scope.reattach();
              one.setTraceScope(null);
            }
            lastAckedSeqno = seqno;
            pipelineRecoveryCount = 0;
            ackQueue.removeFirst();
            dataQueue.notifyAll();

            one.releaseBuffer(byteArrayManager);
          }

With that, the packet in ackQueue is gone for good: it was first added to dataQueue, then moved to ackQueue, and finally removed once its ack was confirmed.
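To make this hand-off easier to picture, here is a deliberately simplified, self-contained sketch of the same dataQueue/ackQueue pattern. It is a toy model of the idea only, not HDFS code; the class and method names are invented and sequence numbers stand in for DFSPacket objects.

import java.util.LinkedList;

// Toy model of the DataStreamer / ResponseProcessor hand-off:
// a packet travels dataQueue -> ackQueue -> removed once its ack arrives.
class ToyStreamer {
  private final LinkedList<Long> dataQueue = new LinkedList<>(); // waiting to be sent
  private final LinkedList<Long> ackQueue = new LinkedList<>();  // sent, waiting for ack

  synchronized void enqueue(long seqno) {        // producer side (writeChunk and friends)
    dataQueue.addLast(seqno);
    notifyAll();
  }

  synchronized long sendNext() throws InterruptedException { // streamer: dataQueue -> ackQueue
    while (dataQueue.isEmpty()) {
      wait();
    }
    long seqno = dataQueue.removeFirst();
    ackQueue.addLast(seqno);
    return seqno;                                // the real code writes the packet out here
  }

  synchronized void ackReceived(long seqno) {    // responder: drop the acknowledged packet
    if (!ackQueue.isEmpty() && ackQueue.getFirst() == seqno) {
      ackQueue.removeFirst();
      notifyAll();
    }
  }
}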

While all this is going on, one more check is performed:

isLastPacketInBlock = one.isLastPacketInBlock();

If this packet is the last packet of the block, the ResponseProcessor exits its loop:

while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock)

Of course, if an exception occurs along the way, responderClosed is set to true, which also makes the loop exit:

          catch (Exception e) {
          if (!responderClosed) {
            lastException.set(e);
            errorState.setInternalError();
            errorState.markFirstNodeIfNotMarked();
            synchronized (dataQueue) {
              dataQueue.notifyAll();
            }
            if (!errorState.isRestartingNode()) {
              LOG.warn("Exception for " + block, e);
            }
            responderClosed = true;
          }

Again, I made a structural diagram that shows this flow.

The Relationship Between DataStreamer and DFSOutputStream

The relationship between these two classes has already come up several times. It can be summed up in four points:

1. Creator and created.

2. Starter and started.

3. Closer and closed.

4. Producer and consumer.

Let's take them one by one. The creator/created relationship shows up in DFSOutputStream's constructors:

  /** Construct a new output stream for append. */
  private DFSOutputStream(DFSClient dfsClient, String src,
      EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
      HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes)
      throws IOException {
    this(dfsClient, src, progress, stat, checksum);
    initialFileSize = stat.getLen(); // length of file when opened
    this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);

    boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK);

    this.fileEncryptionInfo = stat.getFileEncryptionInfo();

    // The last partial block of the file has to be filled.
    if (!toNewBlock && lastBlock != null) {
      // indicate that we are appending to an existing block
      streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress,
          checksum, cachingStrategy, byteArrayManager);
      getStreamer().setBytesCurBlock(lastBlock.getBlockSize());
      adjustPacketChunkSize(stat);
      getStreamer().setPipelineInConstruction(lastBlock);
    } else {
      computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
          bytesPerChecksum);
      streamer = new DataStreamer(stat,
          lastBlock != null ? lastBlock.getBlock() : null, dfsClient, src,
          progress, checksum, cachingStrategy, byteArrayManager, favoredNodes);
    }
  }

Second, the starter/started relationship.

Starting here means the start() method:

  protected synchronized void start() {
    getStreamer().start();
  }

The getStreamer() method returns the internal streamer object:

  /**
   * Returns the data streamer object.
   */
  protected DataStreamer getStreamer() {
    return streamer;
  }

Third, the closer/closed relationship:

  public void close() throws IOException {
    synchronized (this) {
      try (TraceScope ignored = dfsClient.newPathTraceScope(
          "DFSOutputStream#close", src)) {
        closeImpl();
      }
    }
    dfsClient.endFileLease(fileId);
  }

  protected synchronized void closeImpl() throws IOException {
    ...
    closeThreads(true);
    ...
  }
  // shutdown datastreamer and responseprocessor threads.
  // interrupt datastreamer if force is true
  protected void closeThreads(boolean force) throws IOException {
    try {
      getStreamer().close(force);
      getStreamer().join();
      getStreamer().closeSocket();
    } catch (InterruptedException e) {
      throw new IOException("Failed to shutdown streamer");
    } finally {
      getStreamer().setSocketToNull();
      setClosed();
    }
  }

This is where the streamer and its associated resources are shut down.

Fourth, the producer/consumer relationship, which is the most interesting one. What exactly is being consumed? DFSPacket, the objects stored in dataQueue. In other words, DFSOutputStream's methods put DFSPackets into dataQueue, and the streamer thread takes them out in its run() loop, which is exactly the scenario analyzed above. The method in DFSOutputStream that writes a packet is:

  // @see FSOutputSummer#writeChunk()
  @Override
  protected synchronized void writeChunk(byte[] b, int offset, int len,
      byte[] checksum, int ckoff, int cklen) throws IOException {
    dfsClient.checkOpen();
    checkClosed();

    if (len > bytesPerChecksum) {
      throw new IOException("writeChunk() buffer size is " + len +
                            " is larger than supported  bytesPerChecksum " +
                            bytesPerChecksum);
    }
    if (cklen != 0 && cklen != getChecksumSize()) {
      throw new IOException("writeChunk() checksum size is supposed to be " +
                            getChecksumSize() + " but found to be " + cklen);
    }

    if (currentPacket == null) {
      currentPacket = createPacket(packetSize, chunksPerPacket, getStreamer()
          .getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), false);

      ...
    }

    currentPacket.writeChecksum(checksum, ckoff, cklen);
    currentPacket.writeData(b, offset, len);
    currentPacket.incNumChunks();
    getStreamer().incBytesCurBlock(len);

    // If packet is full, enqueue it for transmission
    if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
        getStreamer().getBytesCurBlock() == blockSize) {
      enqueueCurrentPacketFull();
    }
  }

enqueueCurrentPacketFull() then hands the full packet over to dataQueue (via enqueueCurrentPacket()):

  void enqueueCurrentPacket() throws IOException {
    getStreamer().waitAndQueuePacket(currentPacket);
    currentPacket = null;
  }
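One detail worth calling out: waitAndQueuePacket() is also where write backpressure comes from. Essentially, the writing thread blocks while too many packets are in flight (unsent in dataQueue plus unacknowledged in ackQueue) and is woken up again as acks drain ackQueue. In terms of the toy model shown earlier, the producer-side enqueue would look roughly like the fragment below; the limit constant and its value are invented, and in the real client the limit comes from configuration.

  // Toy version of waitAndQueuePacket(): a bounded hand-off instead of an unbounded queue.
  private static final int MAX_PACKETS_IN_FLIGHT = 80; // invented stand-in for the real limit

  synchronized void waitAndQueuePacket(long seqno) throws InterruptedException {
    while (dataQueue.size() + ackQueue.size() >= MAX_PACKETS_IN_FLIGHT) {
      wait();                  // writer blocks until acks free up room
    }
    dataQueue.addLast(seqno);
    notifyAll();               // wake the streamer if it is waiting for work
  }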

DFSOutputStream's close() method also triggers one last flush of the remaining data to the datanodes, and it too goes through enqueueCurrentPacket():

  protected synchronized void closeImpl() throws IOException {
    ...
        flushBuffer(); // flush from all upper layers

        if (currentPacket != null) {
          enqueueCurrentPacket();
        }

        if (getStreamer().getBytesCurBlock() != 0) {
          setCurrentPacketToEmpty();
        }

        flushInternal(); // flush all data to Datanodes
        // get last block before destroying the streamer
        // If exception happened before, the last block will be null
        lastBlock = getStreamer().getBlock();
        ...

Again, I drew a diagram illustrating the four relationships above.

The Streamer Thread Leak Problem

I stumbled on the streamer thread leak while studying how DFSOutputStream works; finding it was largely accidental. A thread leak is analogous to a memory leak: with a memory leak, space that should be freed is never freed; with a thread leak, a thread that should be shut down is never shut down in time. Naturally, the place where this happens is DFSOutputStream's close() method, so let's pull that code up again.

  /**
   * Closes this output stream and releases any system
   * resources associated with this stream.
   */
  @Override
  public void close() throws IOException {
    synchronized (this) {
      try (TraceScope ignored = dfsClient.newPathTraceScope(
          "DFSOutputStream#close", src)) {
        closeImpl();
      }
    }
    dfsClient.endFileLease(fileId);
  }

Step into closeImpl(), where the real closing work happens, and look carefully at what each step might do wrong:

  protected synchronized void closeImpl() throws IOException {
    if (isClosed()) {
      getStreamer().getLastException().check(true);
      return;
    }

    try {
        flushBuffer(); // flush from all upper layers

        if (currentPacket != null) {
          enqueueCurrentPacket();
        }

        if (getStreamer().getBytesCurBlock() != 0) {
          setCurrentPacketToEmpty();
        }

        flushInternal(); // flush all data to Datanodes
        // get last block before destroying the streamer
        // If exception happened before, the last block will be null
        ExtendedBlock  lastBlock = getStreamer().getBlock();
        closeThreads(true);

      try (TraceScope ignored =
               dfsClient.getTracer().newScope("completeFile")) {
        completeFile(lastBlock);
      }
    } catch (ClosedChannelException ignored) {
    } finally {
      setClosed();
    }
  }

Since the streamer thread can end up never being closed, we need to look for code before the closeThreads() call that can go wrong. If you read carefully, you should spot it right away:

        flushBuffer(); // flush from all upper layers

        if (currentPacket != null) {
          enqueueCurrentPacket();
        }

        if (getStreamer().getBytesCurBlock() != 0) {
          setCurrentPacketToEmpty();
        }

        flushInternal(); // flush all data to Datanodes

Everything from flushBuffer() through flushInternal() can throw an IOException. As soon as one is thrown, control jumps straight to the finally block, closeThreads() in between is never reached, and the DataStreamer thread leaks. I have reported this bug to the open-source community and provided a patch, HDFS-9812. The fix is simple: wrap this code in one more try block and make sure closeThreads() still runs at the end of it. See the links at the end of the article for details.
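For completeness, here is one way to express that idea in code. This is only a minimal sketch of the fix described above, not the committed patch (see the HDFS-9812 links below for the authoritative change): the flush calls are wrapped in an inner try so that closeThreads() runs on both the normal and the exceptional path.

  // Sketch only: ensure the streamer threads are shut down even if flushing throws.
  protected synchronized void closeImpl() throws IOException {
    if (isClosed()) {
      getStreamer().getLastException().check(true);
      return;
    }
    try {
      ExtendedBlock lastBlock = null;
      try {
        flushBuffer(); // flush from all upper layers
        if (currentPacket != null) {
          enqueueCurrentPacket();
        }
        if (getStreamer().getBytesCurBlock() != 0) {
          setCurrentPacketToEmpty();
        }
        flushInternal(); // flush all data to Datanodes
        // get last block before destroying the streamer
        lastBlock = getStreamer().getBlock();
      } finally {
        // Runs whether or not the calls above threw, so the DataStreamer
        // (and its ResponseProcessor) can no longer be leaked.
        closeThreads(true);
      }
      try (TraceScope ignored =
               dfsClient.getTracer().newScope("completeFile")) {
        completeFile(lastBlock);
      }
    } catch (ClosedChannelException ignored) {
    } finally {
      setClosed();
    }
  }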

Links

Issue: https://issues.apache.org/jira/browse/HDFS-9812
GitHub patch: https://github.com/linyiqun/open-source-patch/tree/master/hdfs/HDFS-9812
