Hadoop Pipeline详解[摘抄]

最近使用公司内部的一个框架写map  reduce发现没有封装hadoop streaming这些东西,查了下pipeline相关的东西

Hadoop Pipeline详解

一、说明
Hadoop 2.x相比较于1.x有了较大的改变,像MapReduce层面架构以及代码基本上是完全重写的,在HDFS层面加入了HA,Federation等特性,代码更加层次化和易读,同时加入的PB初期可能给阅读带来障碍,熟悉之后就没有太大问题了。
Pipeline一只是Hadoop重要的一个过程,不管是MR任务,Hive任务等等,最后都需要Pipeline将数据写入到HDFS中。所以熟悉Hadoop Pipeline过程还是很有意义的。

二、流程说明
Hadoop pipeline的建立可以由以下的流程图来说明,Pipeline建立牵扯的有NameNode,DataNode,DFSClient等结构。

以下为简要说明
1、首先DFSClient向NameNode发送RPC Create请求,NameNode在INode中生成新的INodeFile并加入到新的LeaseManager中作为稍后写入的租约管理。
2、请求成功返回后,DFSClient生成一个OutputStream,既DFSOutputStream
3、行程pipeline之前向NameNode发送AddBlock请求,NameNode生成新的Block,选择要写入的DataNode节点,并将Block注册到INodeFile中,以及BlockManager中,准备写入
4、建立pipeline,根据NameNode返回的信息,找到一个primary Datanode作为第一个节点,准备写入
5、DataNode Socket接收后,首先将数据写入buffer中,然后写入到下一个节点,写入成功后将buffer中数据写入本地磁盘,并等待ACK信息
6、与5一样,写入下一个节点然后写入本地,最后等待ACK信息
7、如果ACK都成功返回后,发回给DFSClient,本次写入成功。

三、详细说明

首先明确一下DFSClient写入DataNode的一些基本概念。在大方面来说每个文件由一个或多个Block组成,这个Block可以有两个意义,一个是NameNode,DataNode中的Block,它是由Block id,generate stamp组成。另一个是指存储在DataNode磁盘中的Block,真正占据存储空间。

Block的size一般在conf中定义,默认是64M。

Hadoop写入过程中是通过Packet以及Chunk去传送的,每个Chunk是由512Byte的数据位和4Byte的校验位组成,每个Packet是由127个Chunk组成,大小是127*(512+4)=65532Byte。最后如果写入写到了Block size的边界位置,发送一个空白的Packet。

确认上面的一些概念以后,就可以来看下Pipeline了。还是按照二中流程图的步骤一步一步解析。
1、DFSClient向NameNode发送Create请求
DFSClients首先向NameNode发送Create请求,NameNode中首先在生成新的INodeFile:

  INodeFile newNode = dir.addFile(src, permissions, replication, blockSize,
          holder, clientMachine, clientNode);
      if (newNode == null) {
        throw new IOException("Unable to add " + src +  " to namespace");
      }
      leaseManager.addLease(newNode.getFileUnderConstructionFeature()
          .getClientName(), src);

并且在LeaseManager中加入Lease,Lease用于写入过程中的租约管理。然后再DFSClient中加入一个租约续期的线程,定期向NameNode续租约。

 beginFileLease(src, result);

2、生成DFSOutputStream

final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
        flag, progress, checksum, favoredNodes);
    out.start();

3、申请新的Block
Pipeline建立时的状态为BlockConstructionStage.PIPELINE_SETUP_CREATE,向NameNode申请新的Block。主要的代码为:

 // allocate new block, record block locations in INode.
      newBlock = createNewBlock();
      saveAllocatedBlock(src, inodesInPath, newBlock, targets);
 
      dir.persistNewBlock(src, pendingFile);
      offset = pendingFile.computeFileSize();

将新的Block与INodeFile关联,并且加入到BlockManager中。

4、建立Pipeline
与第一个DataNode建立Socket连接,端口是IPC端口,准备建立pipeline。

          // send the request
          new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
              nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
              nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
              cachingStrategy.get());
 
          // receive ack for connect
          BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
              PBHelper.vintPrefixed(blockReplyStream));

DataNode启动的时候会启动IPC Server,其中有个DataXceiver专门监听IPC端口,将各种请求转发,代码为:

  /** Process op by the corresponding method. */
  protected final void processOp(Op op) throws IOException {
    switch(op) {
    case READ_BLOCK:
      opReadBlock();
      break;
    case WRITE_BLOCK:
      opWriteBlock(in);
      break;
    case REPLACE_BLOCK:
      opReplaceBlock(in);
      break;
    case COPY_BLOCK:
      opCopyBlock(in);
      break;
    case BLOCK_CHECKSUM:
      opBlockChecksum(in);
      break;
    case TRANSFER_BLOCK:
      opTransferBlock(in);
      break;
    case REQUEST_SHORT_CIRCUIT_FDS:
      opRequestShortCircuitFds(in);
      break;
    case RELEASE_SHORT_CIRCUIT_FDS:
      opReleaseShortCircuitFds(in);
      break;
    case REQUEST_SHORT_CIRCUIT_SHM:
      opRequestShortCircuitShm(in);
      break;
    default:
      throw new IOException("Unknown op " + op + " in data stream");
    }
  }

写入的Header它的Op是WRITE_BLOCK,通过writeBlock方法进行处理。当建立Pipeline的时候stage处于BlockConstruncionStage.PIPELINE_SETUP_CREATE阶段,这时候DataNode会将请求的数据发送到下一个节点,并等待ACK信息返回给Client,代码为:

   // read connect ack (only for clients, not for replication req)
          if (isClient) {
            BlockOpResponseProto connectAck =
              BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
            mirrorInStatus = connectAck.getStatus();
            firstBadLink = connectAck.getFirstBadLink();
            if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
              LOG.info("Datanode " + targets.length +
                       " got response for connect ack " +
                       " from downstream datanode with firstbadlink as " +
                       firstBadLink);
            }
          }

至此,Pipeline建立完成,这个阶段并不涉及数据的传输,只是尝试建立起Pipeline并进行异常处理等。

5、数据传输阶段
用户得到的OutputStream首先写入到本地的Buffer,写完一个Packet后就发送给Primary DataNode,这个时候stage处于BlockConstruncionStage.DATA_STREAMING,代码为:

    // get packet to be sent.
            if (dataQueue.isEmpty()) {
              one = new Packet();  // heartbeat packet
            } else {
              one = dataQueue.getFirst(); // regular data packet
            }

在DataNode部分,通过BlockReceiver来接收数据,代码为:

   // receive the block and mirror to the next target
      if (blockReceiver != null) {
        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
            mirrorAddr, null, targets);
 
        // send close-ack for transfer-RBW/Finalized
        if (isTransfer) {
          if (LOG.isTraceEnabled()) {
            LOG.trace("TRANSFER: send close-ack");
          }
          writeResponse(SUCCESS, null, replyOut);
        }
      }

DataNode首先在将数据接收到Buffer中,主要代码可以看BlockReceiver的receivePacket方法,首先mirror到下一个节点:

 //First write the packet to the mirror:
    if (mirrorOut != null && !mirrorError) {
      try {
        packetReceiver.mirrorPacketTo(mirrorOut);
        mirrorOut.flush();
      } catch (IOException e) {
        handleMirrorOutError(e);
      }
    }

然后写入到本地:

 // Write data to disk.
    out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);

写入到本地的时候首先在tmp目录下建立两个文件,一个是数据文件,一个是校验文件(后缀为meta),写入的时候首先写入数据文件,随后update checksum,写入到校验文件。

6、写完后的处理
如果一个Block写完后,DFSClient就会关闭这个Pipeline,代码为:

  private void endBlock() {
      if(DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("Closing old block " + block);
      }
      this.setName("DataStreamer for file " + src);
      closeResponder();
      closeStream();
      setPipeline(null, null);
      stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
    }

然后重复第四步,如果都写完了,DataNode会向NameNode汇报接收Block,DFSClient会向NameNode汇报complete,如果Block数没有到达最低副本数,complete需要等待一定时间再去汇报,至此pipeline完成。

原文地址  http://dj1211.com/?p=178

时间: 2024-11-07 14:43:39

Hadoop Pipeline详解[摘抄]的相关文章

【转】Hadoop安全模式详解及配置

原文链接 http://www.iteblog.com/archives/977 在<Hadoop 1.x中fsimage和edits合并实现>文章中提到,Hadoop的NameNode在重启的时候,将会进入到安全模式.而在安全模式,HDFS只支持访问元数据的操作才会返回成功,其他的操作诸如创建.删除文件等操作都会导致失败. NameNode在重启的时候,DataNode需要向NameNode发送块的信息,NameNode只有获取到整个文件系统中有99.9%(可以配置的)的块满足最小副本才会自

Hadoop DistributedCache详解

DistributedCache是Hadoop提供的文件缓存工具,它能够自动将指定的文件分发到各个节点上,缓存到本地,供用户程序读取使用.它具有以下几个特点:缓存的文件是只读的,修改这些文件内容没有意义:用户可以调整文件可见范围(比如只能用户自己使用,所有用户都可以使用等),进而防止重复拷贝现象:按需拷贝,文件是通过HDFS作为共享数据中心分发到各节点的,且只发给任务被调度到的节点.本文将介绍DistributedCache在Hadoop 1.0和2.0中的使用方法及实现原理. Hadoop D

深入理解Java中的流---结合Hadoop进行详解

在JavaSe的基础课程当中,可以说流是一个非常重要的概念,并且在Hadoop中得到了广泛的应用,本篇博客将围绕流进行深入的详解. (一)JavaSe中流的相关概念 1.流的定义 ①在Java当中,若一个类专门用于数据传输,则这个类称为流 ②流就是程序和设备之间嫁接以来的一根用于数据传输的管道,这个设备可以是本地硬盘,可以是内存条,也可以是网络所关联的另外一台计算机等等,其中不同管道上有不同的按钮,按下不同的按钮相当于调用不同的方法,这根带按钮的用于数据传输的管道就是流,即流就是一根管道 ③流一

Hadoop WordCount详解(二)

Hadoop集群WordCount详解(二) 源代码程序 WordCount处理过程 具体代码讲解 1.源代码程序 package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.examples.WordCount.Token

Hadoop实战之二~ hadoop作业调度详解(1)

前言 对Hadoop的最感兴趣的地方,也就在于Hadoop的作业调度了,在正式介绍如何搭建Hadoop之前,深入理解一下Hadoop的作业调度很有必要.我们不一定能用得上Hadoop,但是如果理通顺Hadoop的分布式调度原理,在有需要的时候未必不能自己写一个Mini Hadoop~: ) 开始 本文转载自:http://www.cnblogs.com/shipengzhi/articles/2487429.html Map/Reduce是一个用于大规模数据处理的分布式计算模型,它最初是由Goo

hadoop文件系统详解--(1)

Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现.Java抽象类 org.apache.hadoop.fs.FileSystem展示了Hadoop的一个文件系统,而且有几个具体实现,如表 3-1所示. 文件系统 URI 方案 Java实 现(全部在 org.apache.hadoop) 描述 Local file fs.LocalFileSystem 针对有客户端校验和 的本地连接磁盘使用 的文件系统.针对没 有校验和的本 地文件系统使用 RawLocalFileSystem.

Hadoop文件系统详解-----(一)

Hadoop有一个抽象的文件系统概念,HDFS只是其中的一个实现.Java抽象类 org.apache.hadoop.fs.FileSystem展示了Hadoop的一个文件系统,而且有几个具体实现,如表 3-1所示. 文件系统 URI  方案 Java实 现(全部在  org.apache.hadoop) 描述 Local file fs.LocalFileSystem 针对有客户端校验和 的本地连接磁盘使用 的文件系统.针对没 有校验和的本 地文件系统使用 RawLocalFileSystem

Hadoop安装详解及安装错误的解决方案

特此声明:本内容所有详细内容一下面提供的附件形式出现,了解详细内容可查看附件 从4月28号搭建hadoop平台,到今天已经足足5天了,不断地改配置,敲命令,可以说真是人生一大快事吗?好了废话不多说,正式进入我的安装过程: 目录: 第一篇:Ubuntu13.1安装 第二篇:Sublime Text2安装 第三篇:hadoop+jdk1.8.0安装+ssh无密码登录 第四篇:出现的问题及解决办法(见另一篇博客) 正文: 第一篇: 我用的是Ubuntu13.1,大家可以到我的云盘下载,下面是链接: h

hadoop InputFormat详解

1. 概述 我们在设置MapReduce输入格式的时候,会调用这样一条语句: job.setInputFormatClass(KeyValueTextInputFormat.class); 这条语句保证了输入文件会按照我们预设的格式被读取.KeyValueTextInputFormat即为我们设定的数据读取格式. 所有的输入格式类都继承自InputFormat,这是一个抽象类.其子类有例如专门用于读取普通文件的FileInputFormat,还有用来读取数据库的DBInputFormat等等.相