Hadoop内部的限流机制

前言

文章标题一開始提及到了一个令人感到有些抽象又显得有些非常"大"的词,限流.事实上这个词语在非常多行业都能够用到,比方近期春运,各大主要城市,火车站,地铁站都要做到限流吧,避免人流量过大造成事故或间接事故,这叫人流量限流,同理也能够用在车流量上.假设基于这个背景,把这里的人群和车辆抽象为数据,对数据进行限流,就是本篇文章的主题了.可能就有人疑惑了,数据为什么要做限流,怎么做限流,有什么优点呢,带着这个疑问,细致的阅读下文的分析吧.

数据的限流

数据的限流更让人理解的称呼应该是"数据流的限流".数据流指的就是传输中的源源不断的数据.这些传输数据会耗尽大量的网络带宽,bandwith.一台机器的网络带宽必定是有限的,假设带宽被这台机器上的某些任务用满的话,就会造成正常任务网络传输数据受到影响.假设带宽长时间的被打满之后,还会造成机器IO报警.所以限流的目的正在与此.可能造成网络带宽迅速被占满的不一定都是恶意的程序或服务,程序中一个疏忽的处理或小错误都可能会造成大规模数据的传输.所以与其去劝导用户规范敲代码,还不如从系统层面进行强加管理限制,把主动权掌握在自己手中.

Hadoop的数据限流

数据限流是一涉及面非常大的词,数据类型,使用场景就有非常多,所以本文仅仅分析我们所想要分析的数据限流,就是hadoop内部的限流机制.在写本文之前,我略微搜素了一下网上关于此方面的文章,发现确实相关文章少之又少.可是作为一个大型的分布式存储系统,数据的读写操作一定是非常频繁的,所以数据的传输量一定非常大.在数据量传输非常大的情况下。怎样保证避免出现个别服务把带宽占满的情况就显得格外重要。

有一点是至少明白的。在Hadoop中跑的job的读写数据操作都须要是正常的。为了方便下文的描写叙述,我们能够称此类型的传输数据为"普通任务数据流"。既然这里已经先定义一个了。那就必定还存在另外的数据流传输,并且类型比想象中的多了很多:

1.Balancer数据平衡数据流

2.Fsimage镜像文件的上传下载数据流传输

3.VolumeScanner磁盘扫描的数据读操作的数据传输数据

看完这3个结果。第一个Balance的数据流还是能想得到的,后面2个假设你没有从源代码中进行分析,非常easy会忽略掉.由于以上列举的3种属于非正常业务的数据流传输,是在系统自身内部进行的,所以hadoop对这3种操作做了限流操作.限流相关的类名叫做DataTransferThrottler,限流关系图结构例如以下:

DataTransferThrottler限流原理

data-transfer传输数据的限流原理在DataTransferThrottler中有着非常巧妙的设计.先看一下这个类的源代码凝视:

/**
 * a class to throttle the data transfers.
 * This class is thread safe. It can be shared by multiple threads.
 * The parameter bandwidthPerSec specifies the total bandwidth shared by
 * threads.
 */
public class DataTransferThrottler {

通过传入指代的bandwidthPerSec带宽速率来作为一个最大的限制值,在限制类的作用下,带宽的平均速度将会控制在这个速率之下.在这个类中,定义了以下几个变量:

  private final long period;          // period over which bw is imposed
  private final long periodExtension; // Max period over which bw accumulates.
  private long bytesPerPeriod;  // total number of bytes can be sent in each period
  private long curPeriodStart;  // current period starting time
  private long curReserve;      // remaining bytes can be sent in the period
  private long bytesAlreadyUsed;

在DataTransferThrottler类中的主要限流思想是通过单位时间段内限制指定字节数的方式来控制平均传输速度,假设发现IO传输速度过快,超过规定时间内的带宽限定字节数,则会进行等待操作,等待下一个同意带宽传输周期的到来,这个用结构图表演示样例如以下:

所以每一个period周期内的可同意传输字节数就是非常关键的变量,他是依据传入的带宽上限值进行转换.

/**
   * Constructor
   * @param period in milliseconds. Bandwidth is enforced over this
   *        period.
   * @param bandwidthPerSec bandwidth allowed in bytes per second.
   */
  public DataTransferThrottler(long period, long bandwidthPerSec) {
    this.curPeriodStart = monotonicNow();
    this.period = period;
    //将带宽依照周期做比例转化
    this.curReserve = this.bytesPerPeriod = bandwidthPerSec*period/1000;
    this.periodExtension = period*3;
  }

由于传入的带宽是以秒为单位,所以周期单位是ms,所以要除以1000.curReserve这个变量的意思可理解为可使用字节传输量.初始传输值就是一个周期的可传输字节数.DataTransferThrottler的throttle就是带宽限制的主要方法.

public synchronized void throttle(long numOfBytes, Canceler canceler) {
    if ( numOfBytes <= 0 ) {
      return;
    }

    //当前的可传输的字节数减去当前发送/接收字节数
    curReserve -= numOfBytes;
    //当前字节使用量
    bytesAlreadyUsed += numOfBytes;

    //假设curReserve<=0,说明当前周期内可使用字节数已经用完
    while (curReserve <= 0) {
      //假设设置了canceler对象,则不会进行限流操作
      if (canceler != null && canceler.isCancelled()) {
        return;
      }
      long now = monotonicNow();
      long curPeriodEnd = curPeriodStart + period;

      // 假设当前时间还在本周期时间内的话,则必须等待此周期的结束,
      // 又一次获取新的可传输字节量
      if ( now < curPeriodEnd ) {
        // Wait for next period so that curReserve can be increased.
        try {
          wait( curPeriodEnd - now );
        } catch (InterruptedException e) {
          // Abort throttle and reset interrupted status to make sure other
          // interrupt handling higher in the call stack executes.
          Thread.currentThread().interrupt();
          break;
        }
      } else if ( now <  (curPeriodStart + periodExtension)) {
        // 假设当前时间已经超过此周期的时间且不大于最大周期间隔,则添加可接受字节数,
        // 并更新周期起始时间为前一周期的末尾时间
        curPeriodStart = curPeriodEnd;
        curReserve += bytesPerPeriod;
      } else {
        // 假设当前时间超过curPeriodStart + periodExtension,则表示
        // 已经长时间没有使用Throttler,又一次重置时间
        // discard the prev period. Throttler might not have
        // been used for a long time.
        curPeriodStart = now;
        curReserve = bytesPerPeriod - bytesAlreadyUsed;
      }
    }

    //传输结束,当前字节使用量进行移除
    bytesAlreadyUsed -= numOfBytes;
  }

所以,这里能够得到一个启示,影响带宽平均传输速率的指标,不仅仅仅仅有传入的带宽速度上限值參数,相同period周期的设置也非常重要,相同的带宽周期设小了,发生wait等待的次数会相对变多,最后的带宽平均速度就会更低.这个问题在下文中还会继续提到.

数据流限流在Hadoop中的使用

了解完了DataTransferThrottler中的限流原理之后,我们有必要了解hadoop在哪些地方对数据做了限流动作,事实上答案在上文中也已经提过.

一.Balancer

数据balaner平衡的操作,当中这个throttler对象是在DataXceiverServer被调用的

//set up parameter for cluster balancing
    this.balanceThrottler = new BlockBalanceThrottler(
        conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
            DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT),
        conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
            DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT));

以下这个balancer带宽大小配置属性就是设置给Throttler对象的.

public static final String  DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
public static final long    DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;

默认带宽大小1M.这个Throttler对象在DataXceiver.replaceBlock和DataXceiver.copyBlock中被调用.

  @Override
  public void copyBlock(final ExtendedBlock block,
      final Token<BlockTokenIdentifier> blockToken) throws IOException {
      ...

      long beginRead = Time.monotonicNow();
      // send block content to the target
      long read = blockSender.sendBlock(reply, baseStream,
                                        dataXceiverServer.balanceThrottler);
      long duration = Time.monotonicNow() - beginRead;
      datanode.metrics.incrBytesRead((int) read);
      datanode.metrics.incrBlocksRead();
      datanode.metrics.incrTotalReadTime(duration);
      ...
@Override
  public void replaceBlock(final ExtendedBlock block,
      final StorageType storageType,
      final Token<BlockTokenIdentifier> blockToken,
      final String delHint,
      final DatanodeInfo proxySource) throws IOException {
    ...

        // receive a block
        blockReceiver.receiveBlock(null, null, replyOut, null,
            dataXceiverServer.balanceThrottler, null, true);

        // notify name node
        datanode.notifyNamenodeReceivedBlock(
            block, delHint, blockReceiver.getStorageUuid());

        LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
            + ", delHint=" + delHint);
      }
    ...

最后会调用到BlockSender.sendPacket和BlockReceive.receivePacket方法.分别在相应类的以下2个方法中调用到了throttle(bytes)的方法

  private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
      boolean transferTo, DataTransferThrottler throttler) throws IOException {
    int dataLen = (int) Math.min(endOffset - offset,
                             (chunkSize * (long) maxChunks));

    int numChunks = numberOfChunks(dataLen); // Number of chunks be sent in the packet
    int checksumDataLen = numChunks * checksumSize;
    int packetLen = dataLen + checksumDataLen + 4;
    boolean lastDataPacket = offset + dataLen == endOffset && dataLen > 0;

    ...

    if (throttler != null) { // rebalancing so throttle
      throttler.throttle(packetLen);
    }

    return dataLen;
  }
  /**
   * Receives and processes a packet. It can contain many chunks.
   * returns the number of data bytes that the packet has.
   */
  private int receivePacket() throws IOException {
    // read the next packet
    packetReceiver.receiveNextPacket(in);

    ...

    if (throttler != null) { // throttle I/O
      throttler.throttle(len);
    }

    return lastPacketInBlock?

-1:len;
  }

所以我们能够从側面了解到DataXceiver的copyBlock和replaceBlock方法都是用于处理balancer相关程序时使用的.

二.TransferFsImage

transferFsImage指的是fsImage中的镜像文件的上传下载的过程.可能是hadoop的设计者考虑到常常性的fsImage文件的传输对集群短时间内的带宽也会有所影响,因此也进行了带宽限制的操作.上传下载fsImage样例比較相似,举当中下载镜像文件为样例:

@Override
  protected void doPut(final HttpServletRequest request,
      final HttpServletResponse response) throws ServletException, IOException {
    try {
      ServletContext context = getServletContext();
      final FSImage nnImage = NameNodeHttpServer.getFsImageFromContext(context);
      final Configuration conf = (Configuration) getServletContext()
          .getAttribute(JspHelper.CURRENT_CONF);
      final PutImageParams parsedParams = new PutImageParams(request, response,
          conf);
      final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();

      validateRequest(context, conf, request, response, nnImage,
          parsedParams.getStorageInfoString());

      UserGroupInformation.getCurrentUser().doAs(
          new PrivilegedExceptionAction<Void>() {

            @Override
            public Void run() throws Exception {

              ...
                InputStream stream = request.getInputStream();
                try {
                  long start = monotonicNow();
                  MD5Hash downloadImageDigest = TransferFsImage
                      .handleUploadImageRequest(request, txid,
                          nnImage.getStorage(), stream,
                          parsedParams.getFileSize(), getThrottler(conf));
...

当中getThrottler方法会从配置文件的相关属性中得到此实例对象

/**
   * Construct a throttler from conf
   * @param conf configuration
   * @return a data transfer throttler
   */
  public final static DataTransferThrottler getThrottler(Configuration conf) {
    long transferBandwidth =
      conf.getLong(DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_KEY,
                   DFSConfigKeys.DFS_IMAGE_TRANSFER_RATE_DEFAULT);
    DataTransferThrottler throttler = null;
    if (transferBandwidth > 0) {
      throttler = new DataTransferThrottler(transferBandwidth);
    }
    return throttler;
  }

默认是返回throttler对象为null的,由于限制带宽默觉得0

public static final String DFS_IMAGE_TRANSFER_RATE_KEY =
                                           "dfs.image.transfer.bandwidthPerSec";
  public static final long DFS_IMAGE_TRANSFER_RATE_DEFAULT = 0;  //no throttling

终于在receiveFile方法中调用了throttle方法

private static MD5Hash receiveFile(String url, List<File> localPaths,
      Storage dstStorage, boolean getChecksum, long advertisedSize,
      MD5Hash advertisedDigest, String fsImageName, InputStream stream,
      DataTransferThrottler throttler) throws IOException {
      ...

      int num = 1;
      byte[] buf = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
      while (num > 0) {
        num = stream.read(buf);
        if (num > 0) {
          received += num;
          for (FileOutputStream fos : outputStreams) {
            fos.write(buf, 0, num);
          }
          if (throttler != null) {
            throttler.throttle(num);
          }
        }
      }

默认此throttler是不开启的.

三.VolumeScanner

volume-scanner的意思是磁盘扫描.而磁盘扫描的目的则是为了发现坏的块.坏的块一般发生在读操作异常的情况下,所以这个阶段写的block块会被列为suspectBlock可疑块.hadoop设计者为了确保本节点的IO不受影响,特意对磁盘扫描的带宽做了预先限制,防止这一个附属操作程序影响正常业务.方法在VolumeScanner.scanBlock方法中调用:

  /**
   * Scan a block.
   *
   * @param cblock               The block to scan.
   * @param bytesPerSec          The bytes per second to scan at.
   *
   * @return                     The length of the block that was scanned, or
   *                               -1 if the block could not be scanned.
   */
  private long scanBlock(ExtendedBlock cblock, long bytesPerSec) {
    ...
    BlockSender blockSender = null;
    try {
      blockSender = new BlockSender(block, 0, -1,
          false, true, true, datanode, null,
          CachingStrategy.newDropBehind());
      throttler.setBandwidth(bytesPerSec);
      long bytesRead = blockSender.sendBlock(nullStream, null, throttler);
      resultHandler.handle(block, null);
      return bytesRead;
      //...

bytesPerSec在以下这种方法被设置

@SuppressWarnings("unchecked")
    Conf(Configuration conf) {
      this.targetBytesPerSec = Math.max(0L, conf.getLong(
          DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
          DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT));
...
public static final String  DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
  public static final long    DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;

默认1M.

限流部分的操作就是上述3个部分,结构图例如以下:

思维发散

学习了整个限流部分的代码之后,还是能够看到非常多设计的巧妙之处.可是相同存在美中不足之处,个人总结出了2点.

1.第一个是DataTransferThrottler的period存在"hard-code",period的周期长短的设置对于带宽的影响也不容忽视,原因在上文已经提到过.眼下hard-code的设置的是500ms.

/** Constructor
   * @param bandwidthPerSec bandwidth allowed in bytes per second.
   */
  public DataTransferThrottler(long bandwidthPerSec) {
    this(500, bandwidthPerSec);  // by default throttling period is 500ms
  }

优化建议是make property configurable,这点我在学习过程中已经完毕了,已建立Issue,提交开源社区,编号HDFS-9756.

2.上述带宽限制的场景都一个共同点,都还仅仅是在非Job层面做的,并没有在正常的read,write block操作做限制,这种话,Job的传输数据将会使用光已有带宽,个人感觉能够把这方面的限制也加上,做出可配,默认不开启正常的读写带宽限制,原理与balancer的coplyBlcok和replaceBlock操作相似.这种话,readBlock和writeBlock会变得更灵活,眼下readBlock传入的throttler为null.

read = blockSender.sendBlock(out, baseStream, null); // send data

这样做的优点能够依据机器带宽资源不同,从而进行总带宽速率的限制.有兴趣的同学能够自己试一试.

相似对照

Throttler限流方案是hadoop中限制资源使用的一种手段.事实上在Hadoop中,还有相似其它的相似限制资源滥用的方法,比方Quota配额机制.HDFS中的配额机制指的是对每一个文件夹下,我能够设置该文件夹下的space count存储空间使用,和namespace count,命名空间使用计数,能够理解为子文件数,通过Quota就能够限制文件夹下创建过多的文件或写入过量饿数据.否则,就会抛出异常.相关代码的定义例如以下:

/**
 * Counters for namespace, storage space and storage type space quota and usage.
 */
public class QuotaCounts {
  // Name space and storage space counts (HDFS-7775 refactors the original disk
  // space count to storage space counts)
  private EnumCounters<Quota> nsSsCounts;
  // Storage type space counts
  private EnumCounters<StorageType> tsCounts;

这里仅仅做概况的叙述,假设同学们想深入了解细节,可自行阅读相关源代码.

相关链接

Issue 链接: https://issues.apache.org/jira/browse/HDFS-9756

Github patch链接:https://github.com/linyiqun/open-source-patch/tree/master/hdfs/HDFS-9756

时间: 2024-10-20 12:15:59

Hadoop内部的限流机制的相关文章

架构设计之「服务限流」

上一篇我们聊过了架构设计中的「服务隔离」模式,今天我们继续来探索一下在分布式系统架构中的另一个常用的设计:服务限流.那么,什么是「服务限流」呢? 在解释「服务限流」之前,我们来看一下前些时间网上很火的一个段子,说的是新浪微博的一名工程师正在家里办婚礼,突然接到公司的电话要紧急处理线上流量激增的问题,那天应该是某当红明星突然在微博上公布恋情,微博流量突增好几倍,导致系统功能出现不稳定,用户访问不畅.然后这名工程师就只好晾开新娘,在婚礼现场穿着西装打开笔记本调试代码了.当时这名工程师内心肯定是崩溃的

快速入门系列--WCF--06并发限流、可靠会话和队列服务

这部分将介绍一些相对深入的知识点,包括通过并发限流来保证服务的可用性,通过可靠会话机制保证会话信息的可靠性,通过队列服务来解耦客户端和服务端,提高系统的可服务数量并可以起到削峰的作用,最后还会对之前的事务知识做一定补充. 对于WCF服务来说,其寄宿在一个资源有限的环境中,为了实现服务性能最大化,需要提高其吞吐量即服务的并发性.然而在不进行流量控制的情况下,并发量过多,会使整个服务由于资源耗尽而崩溃.因此为相对平衡的并发数和系统可用性,需要设计一个闸门(Throttling)控制并发的数量. 由于

鹰眼跟踪、限流降级,EDAS的微服务解决之道

本文主要从服务化的起源开始讲起,围绕EDAS介绍这些年来,随着阿里庞大的电商技术平台流量和并发不断攀升过程中,中间件的微服务技术面临的一系列挑战及解决方法.同时,也会向读者介绍历次双十一背后,EDAS服务化技术的演进历程. 服务化的起源 微服务的解决之道 海量微服务的挑战 关于作者 以下为精彩内容整理: 服务化的起源 阿里巴巴前期技术现状 当时阿里巴巴技术团队规模有500人左右,整个技术网站使用单一War应用,基于传统应用开发架构,业务每年翻倍增长. 我们面临着非常多的问题: 上百人维护一个核心

从构建分布式秒杀系统聊聊限流的多种实现

前言 俗话说的好,冰冻三尺非一日之寒,滴水穿石非一日之功,罗马也不是一天就建成的.两周前秒杀案例初步成型,分享到了中国最大的同×××友网站-码云.同时也收到了不少小伙伴的建议和投诉.我从不认为分布式.集群.秒杀这些就应该是大厂的专利,在互联网的今天无论什么时候都要时刻武装自己,只有这样,也许你的春天就在明天. 在开发秒杀系统案例的过程中,前面主要分享了队列.缓存.锁和分布式锁以及静态化等等.缓存的目的是为了提升系统访问速度和增强系统的处理能力:分布式锁解决了集群下数据的安全一致性问题:静态化无疑

从构建分布式秒杀系统聊聊限流特技

前言 俗话说的好,冰冻三尺非一日之寒,滴水穿石非一日之功,罗马也不是一天就建成的.两周前秒杀案例初步成型,分享到了中国最大的同性交友网站-码云.同时也收到了不少小伙伴的建议和投诉.我从不认为分布式.集群.秒杀这些就应该是大厂的专利,在互联网的今天无论什么时候都要时刻武装自己,只有这样,也许你的春天就在明天. 在开发秒杀系统案例的过程中,前面主要分享了队列.缓存.锁和分布式锁以及静态化等等.缓存的目的是为了提升系统访问速度和增强系统的处理能力:分布式锁解决了集群下数据的安全一致性问题:静态化无疑是

Sentinel 发布0.2.0,异步调用支持、热点参数限流等成产品新亮点

Sentinel 是阿里中间件团队开源的,面向分布式服务架构的轻量级流量控制组件,主要以流量为切入点,从流量控制.熔断降级.系统负载保护等多个维度来帮助用户保护服务的稳定性. 近日,Sentinel 0.2.0 正式发布.作为一个重要的里程碑版本,Sentinel 0.2.0 释放了多项产品新特性,如 异步调用支持.热点参数限流 等,并包括了大量的体验优化与 bug 修复.下面我们来看一下 Sentinel 0.2.0 的重要新特性. 异步调用支持 未来各种 RPC 框架.Web 框架都朝着异步

Spring Cloud Alibaba | Sentinel: 服务限流高级篇

目录 Spring Cloud Alibaba | Sentinel: 服务限流高级篇 1. 熔断降级 1.1 降级策略 2. 热点参数限流 2.1 项目依赖 2.2 热点参数规则 3. 系统自适应限流 3.1 背景 3.2 系统规则 3.3 原理 3.4 示例 4. 黑白名单控制 4.1 规则配置 4.2 示例 Spring Cloud Alibaba | Sentinel: 服务限流高级篇 Springboot: 2.1.6.RELEASE SpringCloud: Greenwich.SR

流量调整和限流技术

在早期的计算机领域,限流技术(time limiting)被用作控制网络接口收发通信数据的速率. 可以用来优化性能,减少延迟和提高带宽等. 现在在互联网领域,也借鉴了这个概念, 用来为服务控制请求的速率, 如果双十一的限流, 12306的抢票等. 即使在细粒度的软件架构中,也有类似的概念. 两种常用算法 令牌桶(Token Bucket)和漏桶(leaky bucket)是 最常用的两种限流的算法. 漏桶算法 它的主要目的是控制数据注入到网络的速率,平滑网络上的突发流量.漏桶算法提供了一种机制,

笔记:XML-解析文档-流机制解析器(SAX、StAX)

DOM 解析器完整的读入XML文档,然后将其转换成一个树型的数据结构,对于大多数应用,DOM 都运行很好,但是,如果文档很大,并且处理算法又非常简单,可以在运行时解析节点,而不必看到完整的树形结构,那么我们应该使用流机制解析器(streaming parser),Java 类库提供的流解析机制有 SAX 解析器和 StAX 解析器,SAX 解析器是基于事件回调机制,而 StAX解析器提供了解析事件的迭代器. 使用SAX解析器 SAX 解析器在解析XML 输入的组成部分时会报告事件,在使用 SAX