DataNode数据处理中心DataXceiver

前言

最近在CSDN的首页上看到了hadoop十周年的文章,不禁感慨这真是一个伟大的系统啊.在这十年间,hadoop自身进行了许多演化和大的改变,而在其下,也孵化出了许多子项目,围绕着hadoop的生态圈现在变得越来越丰富了.所以作为一个出色的分布式系统,他有很多地方值得我们学习,最近本人在研究DataXceiver方面的代码,此篇文章算是这几天学习的一个总结吧.

为什么选择学习DataXceiver?

我们从大的层面往小说,你就知道他有多重要了.我们使用Hadoop系统,最看重的是什么,2个字,存储,存储的过程中,什么又是最看着的呢,那当然是数据了.而这些数据都是存在于各个DataNode之上的.所以掌握了解DataNode的读写操作原理就显得尤为重要了.而这个控制中心就在DataXceiver中.

DataXceiver的定义

DataXceiver是干什么用的呢,很多人只知DataNode,而不知另外一个很重要的线程DataXceiver.在Hadoop中对于DataXceiver中的注释解释如下:

/**
 * Thread for processing incoming/outgoing data stream.
 */
class DataXceiver extends Receiver implements Runnable {
  ...

中文大意为"处理输入/输出数据流的线程".我的个人理解就是数据流的处理中心.DataXceiver线程数的多少在一定程度上能反映出一个节点的忙碌程度.DataXceiver这个类中包含的变量和方法还是比较多的,我不大建议读者逐行的去详细的阅读内部的代码.我们去学习一个机制,原理的时候,主要去明白的是结构.比如我们现在要去学习DataXceiver这个类,我们的目标是去了解这个类中主要做了哪些操作,上游被哪些对象调用,下游又调用了哪些类,具体的代码细节等碰到具体的问题时再去分析即可,否则可能会被里面复杂的逻辑绕晕,毕竟这是一个成熟的分布式的程序,不是一时半会能够立刻理解的.

DataXceiver的结构

为了我们更好的去理解这个"数据处理中心",我们需要去了解这个类的整体结构,在此之前不妨去了解一下其中的内部方法:

首先,这是一个线程服务,执行入口一定是run方法,执行run方法,就可以找到与这些方法相关的联系.

/**
   * Read/write data from/to the DataXceiverServer.
   */
  @Override
  public void run() {
    int opsProcessed = 0;
    Op op = null;

      ...

      // We process requests in a loop, and stay around for a short timeout.
      // This optimistic behaviour allows the other end to reuse connections.
      // Setting keepalive timeout to 0 disable this behavior.
      do {
        updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));

        try {
          if (opsProcessed != 0) {
            assert dnConf.socketKeepaliveTimeout > 0;
            peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
          } else {
            peer.setReadTimeout(dnConf.socketTimeout);
          }
          op = readOp();
        } catch (InterruptedIOException ignored) {
          // Time out while we wait for client rpc
          break;
        } catch (IOException err) {
          // Since we optimistically expect the next op, it‘s quite normal to get EOF here.
          if (opsProcessed > 0 &&
              (err instanceof EOFException || err instanceof ClosedChannelException)) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");zhu
            }
          } else {
            incrDatanodeNetworkErrors();
            throw err;
          }
          break;
        }

        // restore normal timeout
        if (opsProcessed != 0) {
          peer.setReadTimeout(dnConf.socketTimeout);
        }

        opStartTime = monotonicNow();
        processOp(op);
        ++opsProcessed;
      } while ((peer != null) &&
          (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
      ...

在run方法中间的主循环方法中,可以看到1个readOp,对应的1个processOp.Op对应的意思是操作码.readOp会从输入流中读取操作码:

/** Read an Op.  It also checks protocol version. */
  protected final Op readOp() throws IOException {
    final short version = in.readShort();
    if (version != DataTransferProtocol.DATA_TRANSFER_VERSION) {
      throw new IOException( "Version Mismatch (Expected: " +
          DataTransferProtocol.DATA_TRANSFER_VERSION  +
          ", Received: " +  version + " )");
    }
    return Op.read(in);
  }

而processOp则会进行判断处理:

  /** Process op by the corresponding method. */
  protected final void processOp(Op op) throws IOException {
    switch(op) {
    case READ_BLOCK:
      opReadBlock();
      break;
    case WRITE_BLOCK:
      opWriteBlock(in);
      break;
    case REPLACE_BLOCK:
      opReplaceBlock(in);
      break;
    case COPY_BLOCK:
      opCopyBlock(in);
      break;
    case BLOCK_CHECKSUM:
      opBlockChecksum(in);
      break;
    case TRANSFER_BLOCK:
      opTransferBlock(in);
      break;
    case REQUEST_SHORT_CIRCUIT_FDS:
      opRequestShortCircuitFds(in);
      break;
    case RELEASE_SHORT_CIRCUIT_FDS:
      opReleaseShortCircuitFds(in);
      break;
    case REQUEST_SHORT_CIRCUIT_SHM:
      opRequestShortCircuitShm(in);
      break;
    default:
      throw new IOException("Unknown op " + op + " in data stream");
    }
  }

总共9种类型,对应着9种处理方法.到此,DataXceiver的基本结构慢慢清晰了,可以用下面的一张图展示:

左上方的Sender是什么意思在后面会解释,可以先忽略.

DataXceiver下游处理方法

从上一小节中的结构图已经看到了处理相应码的9个方法另加2个response回复方法.这个9个方法可以大致分为2大类方法:

1.普通读写block块操作方法.

划分到普通读写block块方法的有readBlock,writeBlock,transferBlock,copyBlock,replaceBlock,blockChecksum,剩下的待ShortCircuit的方法就是属于shortCircuit读相关的方法.下面对具体的这些方法做场景分析.

1.readBlock

方法名就已经体现了这个方法的操作了,自然是读取block信息操作,一般用于远程读或或者本地读操作.

2.writeBlock

写block块操作,将参数传入的数据块写入目标节点列表中.

3.transferBlock

传输指定副本到目标节点列表中,官方注释如下:

Transfer a replica to the datanode targets.

4.copyBlock

拷贝块信息数据,与readBlock原理类似,都用到了BlockSender.send方法.

5.replaceBlock

replaceBlock在DataXceiver中更接近的意思其实是moveBlock,此操作一般会在数据Balance的时候会做.

6.blockChecksum

从文件元信息头部读取校验和数据.

HDFS中的ShortCircuit读机制

这里要特地将shortCircuit读的几个方法单独分到一个模块中,因为shortCircuit读机制是HDFS在后面的版本中才引入的概念,可能有些人还不了解,这里给大家普及一下这方面的知识.

ShortCircuit的缘来

在早些时候,hadoop为了能让数据处理的更加的高效,都尽可能的让数据维持在本地,以此来避免大量的远程读操作,本地读的专业术语就是"Local Read".但是渐渐的到了后面,尽管本地读的比例确实提升了,但是好像还不是最优.因为虽说数据是在本地,但是每次客户端读取数据,还是需要走DataNode这一层,在其间还是会走网络通信的1块,能不能以类似于直接读取本地文件系统的方式去读本地的数据,而shortCircuit读就是源自于这个想法而诞生的.

shortCircuit本地读的实现

shortCircuit读俗称"短路读",后来采用了Linux操作系统中一种计数来实现这个功能,"Unix Domain Socket".他是一种进程间通信的方式,他很重要的一点是可以在进程间传递文件描述符,借此来进行进程间的通信.关于shortCircuit本地读的更细节的文章可以读此原文How Improved Short-Circuit Local Reads Bring Better Performance and Security to Hadoop.

shortCircuit机制

在HDFS中用的是short-circuit memory segments来实现数据的读操作.DfsClient客户端通过shortCircuit实现本地读的简要过程如下:

1.DfsClient客户端从DataNode请求shared memory segments共享内存片段.

2.ShortCircuitRegistry注册对象会产生并管理这些内存对象对象.

3.在本地读之前,DfsClient客户端会向DataNode请求需要的文件描述符,对应的就是requestShortCircuitFds方法.

4.block块在此期间的状态跟踪用的是slot表示.

5.如果一次本度读数据完成之后,相应的会执行释放操作.

给出源码中的官方解释:

/**
 * Manages client short-circuit memory segments on the DataNode.
 *
 * DFSClients request shared memory segments from the DataNode.  The
 * ShortCircuitRegistry generates and manages these segments.  Each segment
 * has a randomly generated 128-bit ID which uniquely identifies it.  The
 * segments each contain several "slots."
 *
 * Before performing a short-circuit read, DFSClients must request a pair of
 * file descriptors from the DataNode via the REQUEST_SHORT_CIRCUIT_FDS
 * operation.  As part of this operation, DFSClients pass the ID of the shared
 * memory segment they would like to use to communicate information about this
 * replica, as well as the slot number within that segment they would like to
 * use.  Slot allocation is always done by the client.
 *
 * Slots are used to track the state of the block on the both the client and
 * datanode. When this DataNode mlocks a block, the corresponding slots for the
 * replicas are marked as "anchorable".  Anchorable blocks can be safely read
 * without verifying the checksum.  This means that BlockReaderLocal objects
 * using these replicas can skip checksumming.  It also means that we can do
 * zero-copy reads on these replicas (the ZCR interface has no way of
 * verifying checksums.)
 *
 * When a DN needs to munlock a block, it needs to first wait for the block to
 * be unanchored by clients doing a no-checksum read or a zero-copy read. The
 * DN also marks the block‘s slots as "unanchorable" to prevent additional
 * clients from initiating these operations in the future.
 *
 * The counterpart of this class on the client is {@link DfsClientShmManager}.
 */

DataXceiver的上游调用

DataXceiver的上游调用其实就是Op操作码的输入方,通过寻找Op,XX的调用位置可以找到都是来自于同一个对象类,Sende.其中输入Op.COPY_BLOCK的例子:

@Override
  public void copyBlock(final ExtendedBlock blk,
      final Token<BlockTokenIdentifier> blockToken) throws IOException {
    OpCopyBlockProto proto = OpCopyBlockProto.newBuilder()
      .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken))
      .build();

    send(out, Op.COPY_BLOCK, proto);
  }

剩余的8个方法均与DataXceiver中的相对应.现在就可以很好的解释上图中Sender存在的原因了,大家可以仔细理解一下.Sender对象虽然是操作码的直接传入类,但不是方法最初始的调用方,我们需要从这个点往上寻找,找到最开始的触发者.为了节省篇幅,直接给出结果:

最后Dispatcher类是用在Balancer操作中的.如上图所显示的,真正读写数据的发起方就是我们经常碰到的DfsClient,DfsOutputStream,BlcokReader这些对象类.这样的话,DataXceiver的上游以及下游处理就打通了.

DataXceiver与DataXceiverServer

提到DataXceiver,就不得不提DataXceiverServer.在DataXceiverServer会保存记录每次新启动的DataXceiver线程.在他的主循环方法中,会进行DataXceiver的创建

@Override
  public void run() {
    Peer peer = null;

    while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
      try {
        peer = peerServer.accept();

        // Make sure the xceiver count is not exceeded
        int curXceiverCount = datanode.getXceiverCount();
        if (curXceiverCount > maxXceiverCount) {
          throw new IOException("Xceiver count " + curXceiverCount
              + " exceeds the limit of concurrent xcievers: "
              + maxXceiverCount);
        }

        new Daemon(datanode.threadGroup,
            DataXceiver.create(peer, datanode, this))
            .start();
      } catch (SocketTimeoutException ignored) {

随之会加入DataXceiverServer的2个map对象中:

  /**
   * Read/write data from/to the DataXceiverServer.
   */
  @Override
  public void run() {
    int opsProcessed = 0;
    Op op = null;

    try {
      dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
      ...
synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver)
      throws IOException {
    if (closed) {
      throw new IOException("Server closed.");
    }
    peers.put(peer, t);
    peersXceiver.put(peer, xceiver);
  }

所以DataXceiver与DataXceiverServer的关系图可用下面的关系结构表示:

补充

添加一下额外的补充,最近阅读了DataXceiver的源码,发现里面的代码比较乱,多处异常日志级别输出不准确,都是INFO级别,不利于发现异常日志记录,于是向社区提交Issue, HDFS-9727.

相关链接

Issue链接: https://issues.apache.org/jira/browse/HDFS-9727

Github patch链接: https://github.com/linyiqun/open-source-patch/tree/master/hdfs/HDFS-9727

时间: 2024-08-08 21:47:51

DataNode数据处理中心DataXceiver的相关文章

从DFSOutputStream的pipeline写机制到Streamer线程泄漏问题

前言 之前一段时间写了篇文章DataNode数据处理中心DataXceiver从大的方向了解了下datanode读写操作的过程.但是并没有具体细粒度的去关注读写操作中的细节以及可能存在的问题,本篇文章算是对这方面的一个补充吧.尽管本文所涉及的范围面看起来很窄,但是所呈现出来的结果一定会让你有所收获的. DFSOutputStream写数据以及周边相关类,变量 本文主要阐述的datanode写数据的过程,而写数据过程中,第一个联系到的就是DFSOutputStream对象类.但其实这只是其中的一个

hadoop(2.5,2.6) HDFS偶发性心跳异常以及大量DataXceiver线程被Blocked故障处理分享

一.概要 公司近期Storm清洗程序那边反应HDFS会出现偶发性的异常导致数据写不进HDFS,另外一些Spark作业在大规模往HDFS灌数据时客户端会出现各种"all datanode bad.."以及服务端出现各种timeout,值得注意的是出现这样的问题是各个datanode节点的负载并不高! 二.故障分析 首先,当我们在HDFS客户端看到各种timeOut...什么waiting for reading等信息的时候,我们第一反应是为什么在往HDFS写数据时组成pipeline的各

HDFS内部的认证机制

前言 数据的安全性是一直被大家所重视的.对于一个存有大规模数据量的成熟企业来说,如何做到数据不丢失,不损坏,不窃取就显得格外重要了.而HDFS恰恰满足了"海量数据规模"的特点,所以如果我们用HDFS存储大量的非结构化的数据,我们如何保证其中数据的安全性呢?在之前的文章中,有提到过一个"Encryption Zone"数据加密空间的概念.Encryption Zone可以保证用户在指定的加密空间路径下,数据是被加/解密的,而且对于用户来说完全透明.详细信息可点击HDF

九爷 带你走向职场 百度篇

 转眼即逝,步入IT行业的我不知不觉已经在这个领域打拼了三个年头.虽然称不上什么技术大牛,但一路也是乘风破浪.                             畅想未来;      起初的我刚刚大学毕业,拿着一份简历四处求职,那个时候可以说互联网行业已经步入正轨.转眼2017年已经过了三分之一 ,回头看去中国IT行业正在逐步攀升.从大型机械时代 个人PC时代 互联网时代 云计算时代 人工智能化时代 机器人时代 每个时代的变迁都考验着IT精英的迅速递进.随着云计算大数据的来临每个IT精英都

SOA EDA 事件驱动架构 (Event-Driven Architecture,EDA) 简介

事件驱动架构 (Event-Driven Architecture,EDA) 简介 可以从两个方面来理解 EDA: EDA 是一种侧重于以生成/消费为基础的异步通信的架构模式.这主要对照于传统的基于线程的同步系统. EDA 是一种以事件 (event)为核心,提供事件产生,路由,消费已经结果回调等机制的架构模式. 简单地说, 面向服务架构 (Service-Oriented Architecture, SOA) 是一种 IT 架构策略,其基于面向服务的概念之上.自从 2002 开始为大家熟知以来

从本地上传到hdfs上出现异常

hdfs dfs -put  从本地上传到hdfs上出现异常 与namenode  同台机器的datanode错误日志信息如下: 2015-12-03 09:54:03,083 WARN org.apache.hadoop.hdfs.server.datanode.DataNode: Slow BlockReceiver write data to disk cost:727ms (threshold=300ms) 2015-12-03 09:54:03,991 INFO org.apache.

从xfire谈WebService接口化编程

前段时间有博友在看我的博文<WebService入门案例>后,发邮件问我关于WebService 接口在java中的开发,以及在实际生产环境中的应用.想想自己入职也有一段时间了,似乎也该总结一下自己对于接口化开发的认识,以及如何利用java开发一些可以供其他应用程序使用的web服务. 其实最开始对Web服务的认识还是在课堂上,老师反复地在讲SOAP协议,其实也不懂究竟是什么意思,无缘无故就在大脑中形成了条件反射,SOAP协议的中文意思就是简单对象访问协议:而且,更加巧合的是自己在求职面试时就被

【转】几种现代GPS测量方法和技术

随着科技的发展,GPS测量技术和方法也在不断的改进和更新,目前用得最多的GPS测量技术方法有如下几种:静态和快速静态定位,差分GPS,RTK,网络RTK技术等等,下面将逐一介绍: 1.静态与快速静态定位技术 所谓静态定位,就是在进行 GPS 定位时,认为接收机的天线在整个观测进程中的位置是保持不变的.也就是说,在数据处理时,将接收机天线的位置作为一个不随时间的改变而改变的量.在测量 中,静态定位一般用于高精度的测量定位,其具体观测模式是多台接收机在不同的观测站上进行静止同步观测,观察时间有几分钟

航伴项目介绍

几个月前写了一个航伴项目的想法,虽然没有人认可开发,但感觉还是有一些值得思考和纪念的价值.内容如下: 航伴即:航空伴侣.航空伙伴,航伴这个模式和名字的项目是根据我的互联网经验“凭空想象”并反复思考出来的,市面上未发现有类试产品,该项目的初衷和想法可能有问题或缺陷. 一.背景: 航信作为国内的航空客运相关业务的服务提供商,拥有全球最大的BSP数据处理中心,是全球第四大GDS旅游分销系统提供商,目前是国内领先的航旅旅游业信息技术以及商务服务提供商.拥有大量“高端”用户的数据和信息,如果有一些好的创意