关于MapReduce中自定义Combine类(一)

MRJobConfig

public static fina COMBINE_CLASS_ATTR

属性COMBINE_CLASS_ATTR = "mapreduce.job.combine.class"

————子接口(F4) JobContent

方法getCombinerClass

————子实现类 JobContextImpl

实现getCombinerClass方法:

public Class<? extends Reducer<?,?,?,?>> getCombinerClass()

throws ClassNotFoundException {

return (Class<? extends Reducer<?,?,?,?>>)

conf.getClass(COMBINE_CLASS_ATTR, null);

}

因为JobContextImpl是MRJobConfig子类

所以得到了父类MRJobConfig的COMBINE_CLASS_ATTR属性

————子类Job

public void setCombinerClass(Class<? extends Reducer> cls

) throws IllegalStateException {

ensureState(JobState.DEFINE);

conf.setClass(COMBINE_CLASS_ATTR, cls, Reducer.class);

}

因为JobContextImpl是MRJobConfig子类,

而Job是JobContextImpl的子类

所以也有COMBINE_CLASS_ATTR属性

通过setCombinerClass设置了父类MRJobConfig的属性

MRJobConfig

————子接口JobContent

方法getCombinerClass

————子实现类 JobContextImpl

————子类 Job

————子实现类 TaskAttemptContext

继承了方法getCombinerClass

Task

$CombinerRunner(Task的内部类)

该内部类有方法create:

public static <K,V> CombinerRunner<K,V> create(JobConf job,

TaskAttemptID taskId,

Counters.Counter inputCounter,

TaskReporter reporter,

org.apache.hadoop.mapreduce.OutputCommitter committer

) throws ClassNotFoundException

{

Class<? extends Reducer<K,V,K,V>> cls =

(Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();

if (cls != null) {

return new OldCombinerRunner(cls, job, inputCounter, reporter);

}

// make a task context so we can get the classes

org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =

new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,

reporter);

Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =

(Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)

taskContext.getCombinerClass();

if (newcls != null) {

return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,

inputCounter, reporter, committer);

}

return null;

}

其中这一段应该是旧的API

Class<? extends Reducer<K,V,K,V>> cls =

(Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();

if (cls != null) {

return new OldCombinerRunner(cls, job, inputCounter, reporter);

}

而这个是新的API

org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =

new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, taskId,

reporter);

Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls =

(Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)

taskContext.getCombinerClass();

if (newcls != null) {

return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext,

inputCounter, reporter, committer);

}

return null;

(不知道为什么要写全名,去掉那些包名、向上/下转型和各种泛型的话,看起来就会清晰很多?)

而TaskAttemptContext是JobContent的子实现类,所以继承了getCombinerClass方法

而且,这里用的是多态,其调用的是子实现类TaskAttemptContextImpl的getCombinerClass方法

(TaskAttemptContextImpl继承了JobContextImpl,而JobContextImpl实现了该方法)

所以最终get到了属性COMBINE_CLASS_ATTR,即得到了我们通过job.setCombinerClass的xxxC

而这个xxxC是给了newcls,而newcls是给了NewCombinerRunner的构造函数的reducerClassc参数

NewCombinerRunner(Class reducerClass,

JobConf job,

org.apache.hadoop.mapreduce.TaskAttemptID taskId,

org.apache.hadoop.mapreduce.TaskAttemptContext context,

Counters.Counter inputCounter,

TaskReporter reporter,

org.apache.hadoop.mapreduce.OutputCommitter committer)

{

super(inputCounter, job, reporter);

this.reducerClass = reducerClass;

this.taskId = taskId;

keyClass = (Class<K>) context.getMapOutputKeyClass();

valueClass = (Class<V>) context.getMapOutputValueClass();

comparator = (RawComparator<K>) context.getCombinerKeyGroupingComparator();

this.committer = committer;

}

Task

MapTask

$MapOutputBuffer

private CombinerRunner<K,V> combinerRunner;

$SpillThread类($表示内部类)

combinerRunner = CombinerRunner.create(job, getTaskID(),

combineInputCounter,

reporter, null);

//此时,我们得到了设置好的合并类

if (combinerRunner == null) {

// spill directly

DataInputBuffer key = new DataInputBuffer();

while (spindex < mend &&

kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {

final int kvoff = offsetFor(spindex % maxRec);

int keystart = kvmeta.get(kvoff + KEYSTART);

int valstart = kvmeta.get(kvoff + VALSTART);

key.reset(kvbuffer, keystart, valstart - keystart);

getVBytesForOffset(kvoff, value);

writer.append(key, value);

++spindex;

}

} else {

int spstart = spindex;

while (spindex < mend &&

kvmeta.get(offsetFor(spindex % maxRec)

+ PARTITION) == i) {

++spindex;

}

// Note: we would like to avoid the combiner if we‘ve fewer

// than some threshold of records for a partition

if (spstart != spindex) {

combineCollector.setWriter(writer);

RawKeyValueIterator kvIter =

new MRResultIterator(spstart, spindex);

combinerRunner.combine(kvIter, combineCollector);

}

}

再查看combine函数

在Task的内部类NewCombinerRunner下

public void combine(RawKeyValueIterator iterator,

OutputCollector<K,V> collector)

throws IOException, InterruptedException,ClassNotFoundException

{

// make a reducer

org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =

(org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)

ReflectionUtils.newInstance(reducerClass, job);

org.apache.hadoop.mapreduce.Reducer.Context

reducerContext = createReduceContext(reducer, job, taskId,

iterator, null, inputCounter,

new OutputConverter(collector),

committer,

reporter, comparator, keyClass,

valueClass);

reducer.run(reducerContext);

}

上面的reducerClass就是我们传入的xxxC

最终是通过反射创建了一个xxxC对象,并将其强制向上转型为Reducer实例对象,

然后调用了向上转型后对象的run方法(当前的xxxC没有run方法,调用的是父类Reduce的run)

在类Reducer中,run方法如下

/**

* Advanced application writers can use the

* {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to

* control how the reduce task works.

*/

public void run(Context context) throws IOException, InterruptedException {

setup(context);

try {

while (context.nextKey()) {

reduce(context.getCurrentKey(), context.getValues(), context);

// If a back up store is used, reset it

Iterator<VALUEIN> iter = context.getValues().iterator();

if(iter instanceof ReduceContext.ValueIterator) {

((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();

}

}

} finally {

cleanup(context);

}

}

有由于多态,此时调用的reduce是子类xxxC中的reduce方法

(多态态性质:子类复写了该方法,则实际上执行的是子类中的该方法)

所以说,我们自定义combine用的类的时候,应该继承Reducer类,并且复写reduce方法

且其输入形式:(以wordcount为例)

reduce(Text key, Iterable<IntWritable> values, Context context)

其中key是单词个数,而values是个数列表,也就是value1、value2........

注意,此时已经是列表,即<键,list<值1、值2、值3.....>>

(之所以得到这个结论,是因为我当时使用的combine类是WCReduce,

即Reduce和combine所用的类是一样的,通过对代码的分析,传入值的结构如果是<lkey,value>的话,是不可能做到combine的啊——即所谓的对相同值合并,求计数的累积和,这根本就是两个步骤,对key相同的键值对在map端就进行了一次合并了,合并成了<key,value list>,然后才轮到combine接受直接换个形式的输入,并处理——我们的处理是求和,然后再输出到context,进入reduce端的shuffle过程。

然后我在reduce中遍历了用syso输出

结果发现是0,而这实际上是因为经过一次遍历,我的指针指向的位置就不对了啊,

)

嗯,自己反复使用以下的代码,不断的组合、注释,去测试吧~就会得出这样的结论了

  1. /reduce
  2.     publicstaticclassWCReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
  3.         private final IntWritableValueOut=newIntWritable();
  4.         @Override
  5.         protectedvoid reduce(Text key,Iterable<IntWritable> values,
  6.                 Context context)  throws IOException,InterruptedException{
  7.             for(IntWritable value : values){
  8.                 System.out.println(value.get()+"--");
  9.             }
  10.  
  11. //            int total = 0 ;
  12. //            for (IntWritable value : values) {
  13. //                total += value.get();
  14. //            }
  15. //            ValueOut.set(total);
  16. //            context.write(key, ValueOut);
  17.         }
  18.  
  19.     }
  20.           
  21. job.setCombinerClass(WCReduce.class);

来自为知笔记(Wiz)

附件列表

时间: 2024-10-20 21:03:53

关于MapReduce中自定义Combine类(一)的相关文章

关于MapReduce中自定义分区类(四)

MapTask类 在MapTask类中找到run函数 if(useNewApi){       runNewMapper(job, splitMetaInfo, umbilical, reporter);     } 再找到runNewMapper @SuppressWarnings("unchecked")   private<INKEY,INVALUE,OUTKEY,OUTVALUE>   void runNewMapper(final JobConf job,    

关于MapReduce中自定义分组类(三)

Job类  /**    * Define the comparator that controls which keys are grouped together    * for a single call to    * {@link Reducer#reduce(Object, Iterable,    *                       org.apache.hadoop.mapreduce.Reducer.Context)}    * @param cls the raw

读取SequenceFile中自定义Writable类型值

1)hadoop允许程序员创建自定义的数据类型,如果是key则必须要继承WritableComparable,因为key要参与排序,而value只需要继承Writable就可以了.以下定义一个DoubleArrayWritable,继承自ArrayWritable.代码如下: 1 package matrix; 2 import org.apache.hadoop.io.*; 3 public class DoubleArrayWritable extends ArrayWritable { 4

Mapreduce中自定义分区

Reducer任务的数据来自于Mapper任务,也就说Mapper任务要划分数据,对于不同的数据分配给不同的Reducer任务运行.Mapper任务划分数据的过程就称作Partition.负责实现划分数据的类称作Partitioner. 默认的分区类是HashPartitioner,是处理Mapper任务输出的,getPartition()方法有三个形参,key.value分别指的是Mapper任务的输出,numReduceTasks指的是设置的Reducer任务数量,默认值是1.那么任何整数与

c#(winform)中自定义ListItem类方便ComboBox添加Item项

1.定义ListItem类 public class ListItem { private string _key = string.Empty; private string _value = string.Empty; public ListItem(string pKey, string pValue) { _key = pKey; _value = pValue; } public override string ToString() { return this._value; } pu

Hadoop Mapreduce 中的FileInputFormat类的文件切分算法和host选择算法

文件切分算法 文件切分算法主要用于确定InputSplit的个数以及每个InputSplit对应的数据段. FileInputFormat以文件为单位切分成InputSplit.对于每个文件,由以下三个属性值确定其对应的InputSplit的个数. goalSize:根据用户期望的InputSplit数据计算,即totalSize/numSplit.totalSize为文件总大小:numSplit为用户设定的Map Task个数,默认情况下是1. minSize:InputSplit的最小值,由

【MVC】中自定义扩展类实现客户端验证

照图片一步一步做,你就会成功的~ 哈哈 注意细节哦~

024_MapReduce中的基类Mapper和基类Reducer

内容提纲 1) MapReduce中的基类Mapper类,自定义Mapper类的父类. 2) MapReduce中的基类Reducer类,自定义Reducer类的父类. 1.Mapper类 API文档 1) InputSplit输入分片,InputFormat输入格式化 2) 对Mapper输出结果进行Sorted排序和Group分组 3) 对Mapper输出结果依据Reducer个数进行分区Patition 4) 对Mapper输出数据进行Combiner 在Hadoop官方文档的Mapper

MapReduce中combine、partition、shuffle的作用是什么

http://www.aboutyun.com/thread-8927-1-1.html Mapreduce在hadoop中是一个比較难以的概念.以下须要用心看,然后自己就能总结出来了. 概括: combine和partition都是函数.中间的步骤应该仅仅有shuffle! 1.combine combine分为map端和reduce端,作用是把同一个key的键值对合并在一起,能够自己定义的. combine函数把一个map函数产生的<key,value>对(多个key,value)合并成一