Map/Reduce个人实战--生成数据测试集

背景:
  在大数据领域, 由于各方面的原因. 有时需要自己来生成测试数据集, 由于测试数据集较大, 因此采用Map/Reduce的方式去生成. 在这小编(mumuxinfei)结合自身的一些实战经历, 具体阐述下生成测试数据集的Map/Reduce程序该如何写?

场景构造:
  假设某移动电信行业的某具体业务, 其记录了通话信息(包括拨打方/接听方/通话时间点/基站 等要素). 产商是不可能提供真实的用户数据用于测试的, 但提供了基本的数据规格. 具体针对该业务场景, 我们简单规划如下:

num1            varchar(13)      -- 手机号码(130 xxxx xxxx ~ 139 xxxx xxxx)
num2            varchar(13)      -- 手机号码(130 xxxx xxxx ~ 139 xxxx xxxx)
lac             varchar(16)      -- 基站信息
timestamp       varchar(128)     -- yyyyMMdd hh:mm:ss格式

  评注: 数据的分布在时间纬度上, 相对还是如意编造, 在其他纬度上, 要模拟真实的用户行为数据, 还是有一定的难度的.

Map/Reduce理论基础:
  1). Map/Reduce的原理架构图
  
  评注: Map/Reduce的运行和流程基本如图所示(来源于网络), 这边我们对原理不再详细阐述.
  2). Map/Reduce的类体系架构
  详见如下关于Map/reduce的类系统架构的基础文章

方案分析:
  在回顾完Map/Reduce的基础架构后, 针对数据生成, 我们提供如下的两种方案.
  1). 传统的Map/Reduce的数据生成方案 
  2). 只有Map/没有Reduce的数据生成方案.
  这两者有何区别呢? 如何在Job中控制和设置?
  1). Map阶段的产出结果经过sort/shuffle到reduce的, 由此Reduce阶段后的数据是有一定的顺序性的. 而止于Map阶段的数据是呈现一定随机性. 聪明的你是否猜着了? bingo, 如果生成的数据需要一定的排序组合, 则需要传统方案. 而如果生成的数据随机即可, 则采用2方案就好.
  2). Job的任务配置, 只需要配置numReduceTasks即可

job.setNumReduceTasks(0);

  评注: 是不是很简单, 不好意思让你大跌眼镜了....^_^!
  综合实际的案例分析, 我们的测试数据是随机分布的, 由此我们选用方案2.

解决方案:
  我们选定的方案大致如下:
  通过Map阶段来生成测试数据, 自定义InputFormat规则.
  我们的目标是, 运行MapReduce程序, 生成CSV格式的数据文件, 内容组织如下:

#num1,num2,lac,timestamp
1380001234,13800005678,1,2014-08-27 10:30:00
1380002058,13800005678,1,2014-08-28 11:30:00

  1). 自定义InputForamt, 以及内部的InputSplit和RecordReader
  MyInputSplit的类定义如下:

// *) 继承与InputSplit, 通过实现Writable接口
public static class MyInputSplit
		extends InputSplit implements Writable {

	private int number;

	// 需要一个无参构造函数
	public MyInputSplit() {
	}

	public MyInputSplit(int number) {
	  this.number = number;
	}

	@Override
	public long getLength()
		throws IOException, InterruptedException {
	  return 0;
	}
	@Override
	public String[] getLocations()
		throws IOException, InterruptedException {
	  return new String[]{};
	}

	public int getNumber() {
	  return number;
	}
        // *) 反序列化
  public void readFields(DataInput in)
        throws IOException {
    	  number = WritableUtils.readVInt(in);
  }
        // *) 序列化
  public void write(DataOutput out)
        throws IOException {
         WritableUtils.writeVInt(out, number);
  }
} 

  评注: MyInputSplit必须实现Writable接口, 因为InputSplit在map/reduce过程中需要序列化/反序列化, 同时InputSplit的实现类需要提供一个无参构造函数, 因为需要反射来实例化该对象. 请不要问我为何知道的这么说, 我只想说: "请叫我活雷锋!".
  MyRecordReader的定义如下所示:

public static class MyRecordReader
      extends RecordReader<NullWritable, Text> {

  private int current = 0;
  private int number = 0;
  private Text valueText = new Text();

  // *) 初始化工作
  @Override
  public void initialize(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException {
    this.number = ((MyInputSplit)split).getNumber();
  }
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
    if ( current++ < number ) {
      valueText.set(DataGeneratorUtility.genetateData());
      return true;
    }
    return false;
  }
  @Override
  public NullWritable getCurrentKey() throws IOException,
      InterruptedException {
    return NullWritable.get();  
  }
  @Override
  public Text getCurrentValue() throws IOException,
    InterruptedException {
    return valueText;  
  }
  // *) 汇报进度
  @Override
  public float getProgress() throws IOException, InterruptedException {
    return current * 1.0f / number;
  }
  @Override
  public void close() throws IOException {
  }
}

  评注: MyRecordReader相对就简单了, 由于map默认是单线程生成, 因此采用带状态的函数nextKeyValue(), getCurrentKey(), getCurrentValue(). 不合理阿, 老师!!!
  最后来展示下MyInputFormat的实现, 其整合了之上的InputSplit和RecordReader.

public class MyInputFormat
	  extends InputFormat<NullWritable, Text> {
	@Override
	public List<InputSplit> getSplits(JobContext context)
			throws IOException, InterruptedException {
		int splitNumber = Integer.parseInt(
				context.getConfiguration().get("data.split_number"));
		int dataNumber = Integer.parseInt(
				context.getConfiguration().get("data.data_number"));
		List<InputSplit> results = new ArrayList<InputSplit>();
		for ( int i = 0; i < splitNumber; i++  ) {
			results.add(new MyInputSplit(dataNumber));
		}
		return results;
	}

	@Override
	public RecordReader<NullWritable, Text> createRecordReader(
			InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		return new MyRecordReader();
	}
}

  评注: MyInputFormat的实现, 就是获取分片信息, 以及提供对应的RecordReader, 对于Map/Reduce程序而言, 起到一个桥梁的作用.

  2). Map的定义处理

public class MyMap extends Mapper<NullWritable, Text, NullWritable, Text> {

  @Override
  protected void map(NullWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    context.write(key, value);
  }

}

  评注: MyMap的工作非常的简单, 就是单纯的write key/value对
  3). Job配置选项

public class MyJob extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {

      Job job = Job.getInstance(getConf());

      Path outputDir = new Path(args[0]);

      FileOutputFormat.setOutputPath(job, outputDir);
      job.setJobName("MyJob");
      job.setJarByClass(MyJob.class);
      job.setMapperClass(MyMap.class);
      // *) 设置reducer task 为0
      job.setNumReduceTasks(0);
      job.setOutputKeyClass(NullWritable.class);
      job.setOutputValueClass(Text.class);
      // *) 设置MyInputFormat
      job.setInputFormatClass(MyInputFormat.class);
      // *) 传入相关参数
      job.getConfiguration().set("data.split_number",  args[1]);
      job.getConfiguration().set("data.data_number",  args[2]);

      return job.waitForCompletion(true) ? 0 : 1;
	}

	public static void main(String[] args) throws Exception {
	    int res = ToolRunner.run(new Configuration(), new MyJob(), args);
	    System.exit(res);
	}
}

  评注: 这边省略了部分参数的校验. 大致要点就是设置 NumReduceTasks(0), 然后设置InputFormatClass类MyInputFormat. Ok Let it go!!!

测试:
  编译成jar之后, 在mapreduce跑

  结果: mapreduce运行成功, 总共2个map, 每个map生成10行记录
  验证map的文件个数

  评注: part-m-00000, part-m-00001表明是map阶段生成的输出文件
  对文件内容进行验证:

  评注: 数据结果符合预期

总结:
  这边讲述了利用map/reduce生成测试集的一个流程, 同时也是写给自己, 希望自己对mapreduced的内部机制,有个更清晰的理解.

时间: 2024-08-10 22:37:46

Map/Reduce个人实战--生成数据测试集的相关文章

Python经常使用内置函数介绍【filter,map,reduce,apply,zip】

Python是一门非常简洁,非常优雅的语言,其非常多内置函数结合起来使用,能够使用非常少的代码来实现非常多复杂的功能,假设相同的功能要让C/C++/Java来实现的话,可能会头大,事实上Python是将复杂的数据结构隐藏在内置函数中,用C语言来实现,所以仅仅要写出自己的业务逻辑Python会自己主动得出你想要的结果.这方面的内置函数主要有,filter,map,reduce,apply,结合匿名函数,列表解析一起使用,功能更加强大.使用内置函数最显而易见的优点是: 1. 速度快,使用内置函数,比

Map/Reduce 工作机制分析 --- 作业的执行流程

前言 从运行我们的 Map/Reduce 程序,到结果的提交,Hadoop 平台其实做了很多事情. 那么 Hadoop 平台到底做了什么事情,让 Map/Reduce 程序可以如此 "轻易" 地实现分布式运行? Map/Reduce 任务执行总流程 经过之前的学习,我们已经知道一个 Map/Reduce 作业的总流程为: 代码编写  -->  作业配置  -->  作业提交  -->  Map任务的分配和执行  -->  处理中间结果(Shuffle)  --&

分布式基础学习(2)分布式计算系统(Map/Reduce)

二. 分布式计算(Map/Reduce) 分 布式式计算,同样是一个宽泛的概念,在这里,它狭义的指代,按Google Map/Reduce框架所设计的分布式框架.在Hadoop中,分布式文件 系统,很大程度上,是为各种分布式计算需求所服务的.我们说分布式文件系统就是加了分布式的文件系统,类似的定义推广到分布式计算上,我们可以将其视为增 加了分布式支持的计算函数.从计算的角度上看,Map/Reduce框架接受各种格式的键值对文件作为输入,读取计算后,最终生成自定义格式的输出文件. 而从分布式的角度

一步一步跟我学习hadoop(5)----hadoop Map/Reduce教程(2)

Map/Reduce用户界面 本节为用户採用框架要面对的各个环节提供了具体的描写叙述,旨在与帮助用户对实现.配置和调优进行具体的设置.然而,开发时候还是要相应着API进行相关操作. 首先我们须要了解Mapper和Reducer接口,应用通常须要提供map和reduce方法以实现他们. 接着我们须要对JobConf, JobClient,Partitioner,OutputCollector,Reporter,InputFormat,OutputFormat,OutputCommitter等进行讨

hadoop学习WordCount+Block+Split+Shuffle+Map+Reduce技术详解

转自:http://blog.csdn.net/yczws1/article/details/21899007 纯干货:通过WourdCount程序示例:详细讲解MapReduce之Block+Split+Shuffle+Map+Reduce的区别及数据处理流程. Shuffle过程是MapReduce的核心,集中了MR过程最关键的部分.要想了解MR,Shuffle是必须要理解的.了解Shuffle的过程,更有利于我们在对MapReduce job性能调优的工作有帮助,以及进一步加深我们对MR内

第九篇:Map/Reduce 工作机制分析 - 作业的执行流程

前言 从运行我们的 Map/Reduce 程序,到结果的提交,Hadoop 平台其实做了很多事情. 那么 Hadoop 平台到底做了什么事情,让 Map/Reduce 程序可以如此 "轻易" 地实现分布式运行? Map/Reduce 任务执行总流程 经过之前的学习,我们已经知道一个 Map/Reduce 作业的总流程为: 代码编写  -->  作业配置  -->  作业提交  -->  Map任务的分配和执行  -->  处理中间结果(Shuffle)  --&

Hadoop Map/Reduce

Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集.一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们.框架会对map的输出先进行排序, 然后把结果输入给reduce任务.通常作业的输入和输出都会被存储在文件系统中. 整个框架负责任务的调度和监控,以及重新执行已经失败的任务.通常,Map/R

王亟亟的Python学习之路(八)-函数式编程,map(),reduce(),filter()

转载请注明出处:王亟亟的大牛之路 首先在这里祝愿大家,新年快乐,工作顺利,BUG少少!!! 本来说是在春节假期内继续维持着写文章的进度,但是还是偷懒了几天(打了4天SC2哈哈哈) 今天上的是关于Python的文章,毕竟在亲戚家拜年,懒得插各类手机调试什么的,况且确实好久没有弄Python了,就写了,废话不多,开始正题!! 函数式编程 函数是什么? 把复杂的操作化为简单的函数分解成简单的操作,这种操作就是面向过程,也就是C这类的实现的大体概念. 函数式是什么? 函数没有变量,任意一个函数,只要输入

python 之 map/reduce

Python内建了map()和reduce()函数. 如果你读过Google的那篇大名鼎鼎的论文"MapReduce: Simplified Data Processing on Large Clusters",你就能大概明白map/reduce的概念. 我们先看map.map()函数接收两个参数,一个是函数,一个是Iterable,map将传入的函数依次作用到序列的每个元素,并把结果作为新的Iterator返回. 举例说明,比如我们有一个函数f(x)=x2,要把这个函数作用在一个li