Hadoop源码分析:HDFS数据读写流量控制(DataTransferThrottler类)

DataTransferThrottler类用于在Datanode读写数据时控制数据传输速率。这个类是线程安全的,可以被多个线程共享使用。使用方式是先构造DataTransferThrottler对象并设置周期period和带宽bandwidthPerSec,然后在数据实际读写前调用DataTransferThrottler.throttle()方法。如果I/O的速率相对给定的带宽太快,则该方法会将当前线程wait。

两个构造函数

  • 双参构造函数,可以设置周期period和带宽bandwidthPerSec。

                 public DataTransferThrottler( long period,
long bandwidthPerSec)

  • 单参构造函数,可以设置带宽bandwidthPerSec, 周期period默认被设置为500ms。

                 public DataTransferThrottler( long bandwidthPerSec)

重要属性

period 周期,单位毫秒
periodExtension 周期扩展时间,单位毫秒
bytesPerPeriod 一个周期内可以发送/接收的byte总数
curPeriodStart 当前周期开始时间,单位毫秒
curReserve 当前周期内还可以发送/接收的byte数
bytesAlreadyUsed 当前周期内已经使用的byte数

带宽bandWidthPerPerSec = bytesPerPeriod * period/1000

DataTransferThrottler.throttle()方法

该方法有两个入参:

  • numOfBytes是自从上次throttle被调用到现在,需要发送/接收的byte数;
  • canceler是一个外部取消器,可以用来检测是否取消流量控制。

DataTransferThrottler.throttle()方法会先将curReserve减去numOfBytes,接着执行如下逻辑。

  • 如果curReserve大于0,则说明当前周期还有余量,throttle方法直接返回。
  • 如果curReserve小于等于0,则说明当前周期已经没有余量,会执行下面逻辑。

>如果当前时间now小于当前周期结束时间curPeriodEnd,则wait到下一个周期

>如果当前时间now大于当前周期结束时间curPeriodEnd并且小于curPeriodStart+periodExtension,说明已经进入下一个周期并且throttle应该不是很长时间没有使用,则将curReserve加上下一个周期可以传输的byte总数bytesPerPeriod,并将curPeriodStart设置到下一个周期的开始。

>如果当前时间now大于curPeriodStart+periodExtension,则可能Throttler很长时间没有使用,则抛弃上一个周期。

DataTransferThrottler.throttle()方法源码如下:

public synchronized void throttle(long numOfBytes , Canceler canceler) {
  if ( numOfBytes <= 0 ) {
    return;
  }
  //当前周期余量减去需要发送/接收的byte数numOfBytes
  curReserve -= numOfBytes ;
  bytesAlreadyUsed += numOfBytes;

  //如果curReserve小于等于0,则说明当前周期已经没有余量
  while ( curReserve <= 0) {
    //如果传入了有效取消器canceler,并且取消器的取消状态isCancelled是true,则直接退出while循环
    if (canceler != null && canceler.isCancelled()) {
      return;
    }
    long now = monotonicNow();
    //计算当前周期结束时间,并存放在curPeriodEnd变量中
    long curPeriodEnd = curPeriodStart + period ;

    if ( now < curPeriodEnd ) {
      //等待下一个周期,这样curReserve就可以增加
      try {
        wait( curPeriodEnd - now );
      } catch (InterruptedException e) {
        //终止throttle, 并且重置interrupted状态来确保在调用栈中其它interrupt处理器可以正确执行
        Thread. currentThread().interrupt();
        break;
      }
    }
    //如果当前时间now比当前结束时间curPeriodEnd晚,并且小于curPeriodStart+periodExtension(周期3倍时间),则进入下一个周期
    //并增加bytesPerPeriod到curReserve
    else if ( now <  (curPeriodStart + periodExtension)) {
      curPeriodStart = curPeriodEnd;
      curReserve += bytesPerPeriod ;
    }
    //如果当前时间now大于curPeriodStart+periodExtension,则可能Throttler很长时间没有使用,抛弃上一个周期
    else {
      curPeriodStart = now;
      curReserve = bytesPerPeriod - bytesAlreadyUsed;
    }
  }

  bytesAlreadyUsed -= numOfBytes;
}

总结

DataTransferThrottler类可以控制数据传输在一段时期内的平均速率。但可能在一个周期结束时瞬时速率会失控。

参考资料

hadoop-release-2.5.0源码

转载请附上原博客地址:http://blog.csdn.net/jeff_fangji/article/details/44258827

时间: 2024-10-03 09:39:15

Hadoop源码分析:HDFS数据读写流量控制(DataTransferThrottler类)的相关文章

Hadoop HDFS源码分析 关于数据块的类

Hadoop HDFS源码分析 关于数据块的类 1.BlocksMap 官方代码中的注释为: /** * This class maintains the map from a block to its metadata. * block's metadata currently includes blockCollection it belongs to and * the datanodes that store the block. */ BlocksMap数据块映射,管理名字节点上的数据

细水长流Hadoop源码分析(3)RPC Server初始化构造

声明:个人原创,转载请注明出处.文中引用了一些网上或书里的资料,如有不妥之处请告之. 本文是我阅读Hadoop 0.20.2第二遍时写的笔记,在阅读过程中碰到很多问题,最终通过各种途径解决了大部分.Hadoop整个系统设计精良,源码值得学习分布式的同学们阅读,以后会将所有笔记一一贴出,希望能方便大家阅读源码,少走弯路. 目录 4 RPC服务器(org.apache.hadoop,ipc.Server) 4.1 服务器初始化 4 RPC服务器(org.apache.hadoop,ipc.Serve

hadoop源码分析解读入门

hadoop 源代码分析(一) Google 的核心竞争技术是它的计算平台.HadoopGoogle的大牛们用了下面5篇文章,介绍了它们的计算设施. Google的几篇论文 GoogleCluster:http://research.google.com/archive/googlecluster.html Chubby:http://labs.google.com/papers/chubby.html GFS:http://labs.google.com/papers/gfs.html Big

Hadoop源码分析(2)——Configuration类

这篇文章主要介绍Hadoop的系统配置类Configuration. 接着上一篇文章介绍,上一篇文章中Hadoop Job的main方法为: public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CalculateSumJob(),args); System.exit(res); } 其中ToolRunner.run方法传入的第一个变量

Hadoop源码分析—— Job任务的程序入口

这篇文章大致介绍Hadoop Job的程序是如何启动的. 通常用Java编写的Hadoop MapReduce程序是通过一个main方法作为程序的整个入口,如下: public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CalculateSumJob(),args); System.exit(res);} 可以看到这个Job任务的MapR

Cordova Android源码分析系列二(CordovaWebView相关类分析)

本篇文章是Cordova Android源码分析系列文章的第二篇,主要分析CordovaWebView和CordovaWebViewClient类,通过分析代码可以知道Web网页加载的过程,错误出来,多线程处理等. CordovaWebView类分析 CordovaWebView类继承了Android WebView类,这是一个很自然的实现,共1000多行代码.包含了PluginManager pluginManager,BroadcastReceiver receiver,CordovaInt

Spring源码分析——BeanFactory体系之抽象类、类分析(二)

上一篇分析了BeanFactory体系的2个类,SimpleAliasRegistry和DefaultSingletonBeanRegistry——Spring源码分析——BeanFactory体系之抽象类.类分析(一),今天继续分析. 一.工厂Bean注册支持——FactoryBeanRegistrySupport 废话不多说,直接看我注释的源码: /* * Copyright 2002-2012 the original author or authors. * * Licensed und

springMVC源码分析--HttpMessageConverter数据转化(一)

之前的博客我们已经介绍了很多springMVC相关的模块,接下来我们介绍一下springMVC在获取参数和返回结果值方面的处理.虽然在之前的博客老田已经分别介绍了参数处理器和返回值处理器: (1)springMVC参数值处理器 springMVC源码分析--HandlerMethodArgumentResolver参数解析器(一) (2)springMVC返回值处理器 springMVC源码分析--HandlerMethodReturnValueHandler返回值解析器(一) 但对参数和返回值

Hadoop源码分析之Map输入

对于MapReduce的输入输出Hadoop的官网如下所示 Input and Output types of a MapReduce job: (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output) 这里将从源码分析 input <k1,v1>->map 的过程, Mapper 基