hadoop 数据采样

http://www.cnblogs.com/xuxm2007/archive/2012/03/04/2379143.html
原文地址如上:

关于Hadoop中的采样器

1.为什么要使用采样器

在这个网页上有一段描述比较靠谱 http://www.philippeadjiman.com/blog/2009/12/20/hadoop-tutorial-series-issue-2-getting-started-with-customized-partitioning/

 简单的来说就是解决"How to automatically find “good” partitioning function",因为很多时候无法直接制订固定的partitioner策略,所以需要知道实际的数据分布.糟糕的策略导致的结果就是每个reduce节点得到的数据部均匀,对效率影响挺大

2.如何使用采样器

复制代码
  conf.setPartitionerClass(TotalOrderPartitioner.class);//关于partitioner可以参考这个实现 使用采样器产生的文件

  InputSampler.RandomSampler<IntWritable, NullWritable> sampler =
   new InputSampler.RandomSampler<IntWritable, NullWritable>(0.1,10000,10);

  Path partitionFile = new Path(input,”_partitions”);
  TotalOrderPartitioner.setPartitionFile(conf, partitionFile);////
  InputSampler.writePartitionFile(conf, sampler);

//一般都将该文件做distribute cache处理
  URI partitionURI = new URI(partitionFile.toString() + “#_partitions”);
  DistributedCache.addCacheFile(partitionURI, conf);
  DistributedCache.createSymlink(conf);

//从上面可以看出 采样器是在map阶段之前进行的 在提交job的client端完成的

复制代码

3.常用的采样器介绍

http://blog.csdn.net/andyelvis/article/details/7294811

Hadoop中采样是由org.apache.hadoop.mapred.lib.InputSampler类来实现的。

InputSampler类实现了三种采样方法:RandomSampler,SplitSampler和IntervalSampler。//RandomSampler最耗时

RandomSamplerSplitSampler、RandomSampler和IntervalSampler都是InputSampler的静态内部类,它们都实现了InputSampler的内部接口Sampler接口
public interface Sampler<K,V>{
      K[] getSample(InputFormat<K,V> inf,JobConf job) throws IOException;
}

getSample方法根据job的配置信息以及输入格式获得抽样结果,三个采样类各自有不同的实现。

RandomSampler随机地从输入数据中抽取Key,是一个通用的采样器。RandomSampler类有三个属性:freq(一个Key被选中的概率),numSamples(从所有被选中的分区中获得的总共的样本数目),maxSplitsSampled(需要检查扫描的最大分区数目)。
RandomSampler中getSample方法的实现如下:
复制代码
    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples = new ArrayList<K>(numSamples);
      int splitsToSample = Math.min(maxSplitsSampled, splits.length);

      Random r = new Random();
      long seed = r.nextLong();
      r.setSeed(seed);
      LOG.debug("seed: " + seed);
      // shuffle splits
      for (int i = 0; i < splits.length; ++i) {
        InputSplit tmp = splits[i];
        int j = r.nextInt(splits.length);
        splits[i] = splits[j];
        splits[j] = tmp;
      }
      // our target rate is in terms of the maximum number of sample splits,
      // but we accept the possibility of sampling additional splits to hit
      // the target sample keyset
      for (int i = 0; i < splitsToSample ||
                     (i < splits.length && samples.size() < numSamples); ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i], job,
            Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
        while (reader.next(key, value)) {
          if (r.nextDouble() <= freq) {
            if (samples.size() < numSamples) {
              samples.add(key);
            } else {
              // When exceeding the maximum number of samples, replace a
              // random element with this one, then adjust the frequency
              // to reflect the possibility of existing elements being
              // pushed out
              int ind = r.nextInt(numSamples);
              if (ind != numSamples) {
                samples.set(ind, key);
              }
              freq *= (numSamples - 1) / (double) numSamples;
            }
            key = reader.createKey();
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
复制代码

首先通过InputFormat的getSplits方法得到所有的输入分区;然后确定需要抽样扫描的分区数目,取输入分区总数与用户输入的maxSplitsSampled两者的较小的值得到splitsToSample;然后对输入分区数组shuffle排序,打乱其原始顺序;然后循环逐个扫描每个分区中的记录进行采样,循环的条件是当前已经扫描的分区数小于splitsToSample或者当前已经扫描的分区数超过了splitsToSample但是小于输入分区总数并且当前的采样数小于最大采样数numSamples。

每个分区中记录采样的具体过程如下:

从指定分区中取出一条记录,判断得到的随机浮点数是否小于等于采样频率freq,如果大于则放弃这条记录,然后判断当前的采样数是否小于最大采样数,如果小于则这条记录被选中,被放进采样集合中,否则从【0,numSamples】中选择一个随机数,如果这个随机数不等于最大采样数numSamples,则用这条记录替换掉采样集合随机数对应位置的记录,同时采样频率freq减小变为freq*(numSamples-1)/numSamples。然后依次遍历分区中的其它记录。

SplitSampler从s个分区中采样前n个记录,是采样随机数据的一种简便方式。SplitSampler类有两个属性:numSamples(最大采样数),maxSplitsSampled(最大分区数)。其getSample方法实现如下:

复制代码
    public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples = new ArrayList<K>(numSamples);
      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
      int splitStep = splits.length / splitsToSample;
      int samplesPerSplit = numSamples / splitsToSample;
      long records = 0;
      for (int i = 0; i < splitsToSample; ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
            job, Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
        while (reader.next(key, value)) {
          samples.add(key);
          key = reader.createKey();
          ++records;
          if ((i+1) * samplesPerSplit <= records) {
            break;
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
复制代码

首先根据InputFormat得到输入分区数组;然后确定需要采样的分区数splitsToSample为最大分区数和输入分区总数之间的较小值;然后确定对分区采样时的间隔splitStep为输入分区总数除splitsToSample的商;然后确定每个分区的采样数samplesPerSplit为最大采样数除splitsToSample的商。被采样的分区下标为i*splitStep,已经采样的分区数目达到splitsToSample即停止采样。

对于每一个分区,读取一条记录,将这条记录添加到样本集合中,如果当前样本数大于当前的采样分区所需要的样本数,则停止对这个分区的采样。如此循环遍历完这个分区的所有记录。

IntervalSampler根据一定的间隔从s个分区中采样数据,非常适合对排好序的数据采样。IntervalSampler类有两个属性:freq(哪一条记录被选中的概率),maxSplitsSampled(采样的最大分区数)。其getSample方法实现如下:

复制代码
public K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException {
      InputSplit[] splits = inf.getSplits(job, job.getNumMapTasks());
      ArrayList<K> samples = new ArrayList<K>();
      int splitsToSample = Math.min(maxSplitsSampled, splits.length);
      int splitStep = splits.length / splitsToSample;
      long records = 0;
      long kept = 0;
      for (int i = 0; i < splitsToSample; ++i) {
        RecordReader<K,V> reader = inf.getRecordReader(splits[i * splitStep],
            job, Reporter.NULL);
        K key = reader.createKey();
        V value = reader.createValue();
        while (reader.next(key, value)) {
          ++records;
          if ((double) kept / records < freq) {
            ++kept;
            samples.add(key);
            key = reader.createKey();
          }
        }
        reader.close();
      }
      return (K[])samples.toArray();
    }
复制代码

首先根据InputFormat得到输入分区数组;然后确定需要采样的分区数splitsToSample为最大分区数和输入分区总数之间的较小值;然后确定对分区采样时的间隔splitStep为输入分区总数除splitsToSample的商。被采样的分区下标为i*splitStep,已经采样的分区数目达到splitsToSample即停止采样。

对于每一个分区,读取一条记录,如果当前样本数与已经读取的记录数的比值小于freq,则将这条记录添加到样本集合,否则读取下一条记录。这样依次循环遍历完这个分区的所有记录。

4.采样器在实际中的使用

  常见的例子是terasort

 http://blog.csdn.net/scutshuxue/article/details/5915697

排序的基本思想是利用了mapreduce的自动排序功能,在hadoop中,从map到reduce阶段,map出来的结构会按照各个key按照hash值分配到各个reduce中,其中,在reduce中所有的key都是有序的了。如果使用一个reduce,那么我们直接将他output出来就行了,但是这不能够体现分布式的好处,所以,我们还是要用多个reduce来跑。

      比方说我们有1000个1-10000的数据,跑10个ruduce任务, 如果我们运行进行partition的时候,能够将在1-1000中数据的分配到第一个reduce中,1001-2000的数据分配到第二个reduce中,以此类推。即第n个reduce所分配到的数据全部大于第n-1个reduce中的数据。这样,每个reduce出来之后都是有序的了,我们只要cat所有的输出文件,变成一个大的文件,就都是有序的了。

       基本思路就是这样,但是现在有一个问题,就是数据的区间如何划分,在数据量大,还有我们并不清楚数据分布的情况下。一个比较简单的方法就是采样,假如有一亿的数据,我们可以对数据进行采样,如取10000个数据采样,然后对采样数据分区间。在Hadoop中,patition我们可以用TotalOrderPartitioner替换默认的分区。然后将采样的结果传给他,就可以实现我们想要的分区。在采样时,我们可以使用hadoop的几种采样工具,RandomSampler,InputSampler,IntervalSampler。
http://www.2cto.com/kf/201403/284174.html

用基于MapReduce的程序来处理TB级的数据集,要花费的时间可能是数以小时计。仅仅是优化代码是很难达到良好的效果。

在开发和调试代码的时候,没有必要处理整个数据集。但如果在这种情况下要保证数据集能够被正确地处理,就需要用到抽样了。抽样是统计学中的一个方法。它通过一定的过程从整个数据中抽取出一个子数据集。这个子数据集能够代表整体数据集的数据分布状况。在MapReduce中,开发人员可以只针对这个子数据集进行开发调试,极大减小了系统负担,提高了开发效率。

技术23 水塘抽样(Reservoir sampling)

假设如下场景:在开发一个MapReduce作业的时候,需要反复不断地去测试一个超大数据集。当然,处理这个数据集很费时间,想要快速开发几乎不可能。

问题

在开发MapReduce作业的时候,如何能够只用处理超大数据集的一个小小的子集?

方案

在读取数据的那部分,自定义一个InputFormat来封装默认的InputFormat。在自定义的InputFormat中,将从默认的InputFormat中得到的数据按一定比例进行抽样。

讨论
由于水塘抽样可以从数据流中随机采样,它就特别适合于MapReduce。在MapReduce中,数据源的形式就是数据流。图4.16说明了水塘抽样的算法。

这里需要实现ReservoirSamplerRecordReader类来封装默认的InputFormat类和RecordReader类。InputFormat类的作用是对输入进行分块。RecordReader类的作用是读取记录。抽样功能则在ReservoirSamplerRecordReader类中实现。图4.17说明了ReservoirSamplerRecordReader类的工作机制。

以下是ReservoirSamplerRecordReader类的实现代码:

复制代码
 1 public static class ReservoirSamplerRecordReader<K extends Writable, V extends Writable> extends RecordReader {
 2
 3     private final RecordReader<K, V> rr;
 4     private final int numSamples;
 5     private final int maxRecords;
 6     private final ArrayList<K> keys;
 7     private final ArrayList<V> values;
 8
 9     @Override
10     public void initialize(InputSplit split,TaskAttemptContext context)
11         throws IOException, InterruptedException {
12
13         rr.initialize(split, context);
14         Random rand = new Random();
15
16         for (int i = 0; i < maxRecords; i++) {
17             if (!rr.nextKeyValue()) {
18                 break;
19             }
20
21             K key = rr.getCurrentKey();
22             V val = rr.getCurrentValue();
23
24             if (keys.size() < numSamples) {
25                 keys.add(WritableUtils.clone(key, conf));
26                 values.add(WritableUtils.clone(val, conf));
27             } else {
28                 int r = rand.nextInt(i);
29                 if (r < numSamples) {
30                     keys.set(r, WritableUtils.clone(key, conf));
31                     values.set(r, WritableUtils.clone(val, conf));
32                 }
33             }
34         }
35     }
36 ...
复制代码

在使用ReservoirSamplerInputFormat类的时候,需要设置的参数包括InputFormat等。以下是设置代码:

1 ReservoirSamplerInputFormat.setInputFormat(job,TextInputFormat.class);
2 ReservoirSamplerInputFormat.setNumSamples(job, 10);
3 ReservoirSamplerInputFormat.setMaxRecordsToRead(job, 10000);
4 ReservoirSamplerInputFormat.setUseSamplesNumberPerInputSplit(job, true);

然后在batch中执行作业,输入文件是name.txt,有88799行。经过抽样后的文件只有10行了。以下是作业执行的过程:

复制代码
$ wc -l test-data/names.txt
88799 test-data/names.txt

$ hadoop fs -put test-data/names.txt names.txt

$ bin/run.sh com.manning.hip.ch4.sampler.SamplerJob names.txt output

$ hadoop fs -cat output/part* | wc -l
10
复制代码

前面设置的ReservoirSamplerInputFormat类的参数是抽样10行,最后的结果就是10行。

小结
抽样可以把数据集的尺寸变小,这对开发是很有帮助的。如果有时需要抽样,有时不需要抽样,怎么才能把抽样功能很好地整合到代码库中呢?这里有个方法,在作业的configure中加入一个开关,如下面的代码所示:

复制代码
1 if(appConfig.isSampling()) {
2     ReservoirSamplerInputFormat.setInputFormat(job,
3     TextInputFormat.class);
4 ...
5 } else {
6     job.setInputFormatClass(TextInputFormat.class);
7 }
复制代码
时间: 2024-10-12 00:16:56

hadoop 数据采样的相关文章

hadoop数据流转过程分析

hadoop:数据流转图(基于hadoop 0.18.3):通过一个最简单的例子来说明hadoop中的数据流转. hadoop:数据流转图(基于hadoop 0.18.3): 这里使用一个例子说明hadoop中的数据流转过程,这个例子是统计一些文章中词汇的总数.首先files表示这些需要统计词汇的文章. 首先,hadoop会把初始数据分配到各个机器的mapper任务中,图中的数字表示数据的依次流向步骤. 1.格式化输入,默认情况下,hadoop会使用 TextInputFormate,也就是按照

hadoop数据去重

"数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选.统计大数据集上的数据种类个数.从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重.下面就进入这个实例的MapReduce程序设计. 1.1 实例描述 对数据文件中的数据进行去重.数据文件中的每行都是一个数据. 样例输入如下所示: 1.txt 内容 1 1 2 2 2.txt内容 4 4 3 3 样例输出如下: 1 2 3 4 将测试数据上传到hdfs上/input目录下,同时确保输出目录不存在. 1.2 设计思

数据采样与处理算法

数据采样与处理算法 这里特别说明,输入量是作为周期函数的算法 ? 均方根算法(参考馈线终端单元的设计与实现-郭阳春) 均方根将所采集的离散型的采样点值计算得出电压电流的幅值,主要用于设备的测量功能,该算法不区分系统中的谐波干扰分量,得到是整体的有效值. 算法可以采用多个周期计算一次,在一周期计算一次的情况下,将采样点的采样值先进行平方计算,然后讲一个周期的N点采样值的平方顺序相加,然后将所得的N点采样值的平方和除以N再开放,就可以得到电气量的有效值. 傅立叶算法 傅立叶算法主要是用于将采集到的离

关系数据库数据与hadoop数据进行转换的工具 - Sqoop

Sqoop 本文所使用的Sqoop版本为1.4.6 1.官网 http://sqoop.apache.org 2.作用 A:可以把hadoop数据导入到关系数据库里面(e.g. Hive -> Mysql) B:可以把关系数据库数据导入到hadoop里面(e.g. Mysql -> Hive) 3.下载 http://archive.apache.org/dist/sqoop/1.4.6/sqoop-1.4.6.bin__hadoop-1.0.0.tar.gz 4.安装 --上传到node1(

Hadoop数据存储—Hbase

大家都知道Hadoop是一个数据库,其实说的的就是Hbase.它和我们平常理解的关系型数据库有什么区别呢? 1.它是NoSQL的,它没有SQL的接口,有自己的一套API. 2.关系型数据库可以做汇总,可以进行常规的分析,但是Hbase不可以,它不能做汇总.那么Hbase操作不方便,不能做汇总,不能做分析,有什么作用呢?它的随机读写效率很高,可以存储海量数据,基于某个网点,某个城市,某个机器随机去查询速度快.或者去存储基于时间序列的数据,比如微信.微博.日志的数据,效率很高. 3.它的存储是列式的

详解APM数据采样与端到端

高驰涛 云智慧首席架构师 本文整理自GOPS2016全球运维大会 上海站APM专场 云智慧首席架构师高驰涛的演讲. 高驰涛:今天咱们的专场是APM专场,我相信在座的其实对APM这个东西肯定是了解的,要不然不会过来,APM我今天会从几个层面聊一下,因为今天的时间非常有限,也不占用大家太多的时间,我只就APM里面的一个小点和大家说一下,这个其实是我们的一些应用的实践,就是我们在APM这个行业里面的一些实践,其实像刚才这两位男神和丹姐聊的这个范围没有提到APM,其实就是做APM的事情.这个二维码是我的

hadoop数据容易出现错误的地方

最近在搞关于数据分析的项目,做了一点总结. 下图是系统的数据流向.容易出现错误的地方.1.数据进入hadoop仓库有四种来源,这四种是最基本的数据,简称ods,original data source,后续 的数据都是有这些组合而来a.日志文件b.http接口c.DB查询d.建表指向最后数据都是以hadoop文件的形式存放在hadoop中. 日志文件: 新增机器没有通知数据分析组抓日志 根据约定获取日志是遇到错误,例如:约定获取gz的压缩日志,结果没有 数据提供方rsync日志出现问题 http

Hadoop数据操作系统YARN全解析

“ Hadoop 2.0引入YARN,大大提高了集群的资源利用率并降低了集群管理成本.其在异构集群中是怎样应用的?Hulu又有哪些成功实践可以分享? 为了能够对集群中的资源进行统一管理和调度,Hadoop 2.0引入了数据操作系统YARN.YARN的引入,大大提高了集群的资源利用率,并降低了集群管理成本.首先,YARN允许多个应用程序运行在一个集群中,并将资源按需分配给它们,这大大提高了资源利用率,其次,YARN允许各类短作业和长服务混合部署在一个集群中,并提供了容错.资源隔离及负载均衡等方面的

Hadoop数据收集与入库系统Flume与Sqoop

Hadoop提供了一个中央化的存储系统,其有利于进行集中式的数据分析与数据共享. Hadoop对存储格式没有要求.可以存储用户访问日志.产品信息以及网页数据等数据. 常见的两种数据来源.一种是分散的数据源:机器产生的数据.用户访问日志以及用户购买日志.另一种是传统系统中的数据:传统关系型数据库(MySQL.Oracle).磁盘阵列以及磁带. Flume由三部分构成.Master负责负责通信及配置管理,是集群的控制器.Collector用于对数据进行聚合.往往会产生一个更大的数据流.然后加载到HD