Hadoop实战读书笔记(9)

如何将一个反向引用索引的程序的Reducer输出的类型改为IntWritable

public static class Reduce extends MapReduceBase

implements Reducer<Text, Text, Text, IntWritable> {

public void reduce(Text key, Iterator<Text> values,

OutputCollector<Text, IntWritable> output,

Reporter reporter) throws IOException {

int count = 0;

while (values.hasNext()) {

values.next();

count++;

}

output.collect(key, new IntWritable(count));

}

}

计算不同引用次数专利的数目

之所以选择K2、V2、K3和V3的数据为IntWritable类型,是因为它们的数据必然为整数,并且使用IntWritable比Text更高效

public class CitationHistogram extends Configured implements Tool {

public static class MapClass extends MapReduceBase

implements Mapper<Text, Text, IntWritable, IntWritable> {

private final static IntWritable uno = new IntWritable(1);

private IntWritable citationCount = new IntWritable();

public void map(Text key, Text, value,

OutputCollector<IntWritable, IntWritable> output,

Reporter reporter) throws IOException {

citationCount.set(Integer.parseInt(value.toString()));

output.collect(citationCount, uno);

}

}

public static class Reduce extends MapReduceBase

implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

public void reduce(IntWritable key, Iterator<IntWritable> values,

OutputCollector<IntWritable, IntWritable > output,

Reporter reporter) throws IOException {

int count = 0;

while (values.hasNext()) {

count += values.next().get();

}

output.collect(key, new IntWritable(count));

}

}

public int run(String[] args) throws Exception {

Configuration conf = getConf();

JobConf job = new JobConf(conf, CitationHistogram.class);

Path in = new Path(args[0]);

Path out = new Path(args[1]);

FileInputFormat.setInputPaths(job, in);

FileOutputFormat.setOutputPath(job, out);

job.setJobName("CitationHistogram");

job.setMapperClass(MapClass.class);

job.setReducerClass(Reduce.class);

job.setInputFomrat(KeyValueTextInputFormat.class);

job.setOutputFormat(TextOutputFormat.class);

job.setOutputKeyClass(IntWritable.class);

job.setOutputValueClass(IntWritable.class);

JobClient.runJob(job);

return 0;

}

public static void main(String[] args) throws Exception {

int res = ToolRunner.run(new Configuration(), new CitationHistogram(), args);

system.exit(res);

}

}

需要说明的几点:

1、类名为CitationHistogram

2、输入格式为KeyValueTextInputFormat,输出格式为TextOutputFormat

3、KeyValueTextInputFormat默认以"<tab>"制表符进行分割,可以使用job.set("key.value.separator.in.input.line", ","),修改分隔符为",",其他需要自行修改

4、MapClass类使用了两个私有成员变量uno 和citationCount,为什么要这样定义呢?

出于对效率的考虑citationCount和uno的定义被放在类中而不是方法中,有多少记录,map()方法就会被调用多少次 (对每个JVM而言,就是一个分片中的记录数)。减少在map()方法中生成的对象个数可以提高性能,并减少垃圾回收。由于citationCount和uno被传递给output.collect(),我们依赖output.collect()方法的约定不会修改这两个对象。

5、Reducer计算每个key对应的值的总数,这似乎并不高效,因为我们知道所有的值都是1,为什么我们还要去加它们呢?原因在于,它会让我们在以后能更容易地增加一个combiner来提高性能。

Hadoop API是改了又改,变了又变

1、0.20版本被视为旧API和未来稳定API之间的过渡

2、为了保持向后兼容,版本0.20以及之后的版本支持下一代API并将旧API标记为弃用

并不推荐马上转向新的API

1、0.20版本,许多Hadoop自身的类库还没有基于新的API重写,如果MapReduce代码使用0.20中的新API,这些类将无法被使用。

2、在0.20版本之前,0.18.3仍被很多人认为是最完备与最稳定的版本

3、虽然在新版本的API中有所改变,但是改变的仅仅影响基础的MapReduce模板,我们可以基于新API所做的改变,重写这个模板以备将来使用。

你可能会奇怪为什么不提0.19

一般的意见认为它的初始版本问题比较多,有许多bug,一些副版本试图解决这个问题,但社区希望直接跳到版本0.20

新版本的API做了哪些改动?

1、在新的API中org.apahce.hadoop.mapred的许多类都被移走了,多数被放入org.apache.hadoop.mapreduce中,而且类库都放在org.apache.hadoop.mapreduce.lib的一个包里。当转为使用新API时,org.apache.hadoop.mapred下所有的类的import声明(或者完全引用)就不存在了,它们都被弃用

2、新API中最有益的变化是引入了上下文对象context,最直接的影响在于替换了map()和reduce()方法中使用的OutputCollector和Reporter对象。现在将通过调用Context.writer()而不是OutputCollector.collect()输出键/值对。深远的影响是统一了应用代码和MapReduce框架之间的通信,并固定了Mapper和Reducer的API,使得添加新功能时不会改变基本方法签名,新的功能仅仅时在context对象上增加的方法,在引入这些方法之前写的程序不会感知到这些新方法,它们可以在更新的版本中继续编译与运行

3、新的map()和reduce()方法分别被包含在新的抽象Mapper和Reducer中,它们取代了原始API中的Mapper和Reducer接口(org.apache.hadoop.mapred.Mapper和org.apache.hadoop.mapred.Reducer)。新的抽象类也替换了MapReudceBase类,使之被弃用

4、新的map()和reduce()方法多了一两处细微的改变,它们可以抛出InterruptedException而非单一的IOException,而且,reduce()方法不再以Iterator而以Iterable来接受一个值的列表,这样更容易使用Java的foreach语义来实现迭代。

回顾一下原始的API中的签名

public static class MapClass extends MapReduceBase

implements Mapper<K1, V1, K2, V2> {

public void map(K1 key, V1 value,

OutputCollector<K2, V2> output,

Reporter reporter) throws IOException { }

}

public static class Reduce extends MapReduceBase

implements Reducer<K2, V2, K3, V3> {

public void reduce(K2 key, Iterator<V2> values,

OutputCollector<K3, V3> output,

Reporter reporter) throws IOException { }

}

新的API中的签名,体会与上面签名的不同

public static class MapClass extends Mapper<K1, V2, K2, V2> {

public void map(K1 key, V1 value, Context context)

throws IOException, InterruptedException { }

}

public static class Reduce extends Reducer<K2, V2, K3, V3> {

public void reduce(K2 key, Iterable<V2> values, Context context)

throws IOException, InterruptedException { }

}

你还需要改变driver中的一些内容来支持新的API

1、在新的API中JobConf和JobClient被替换了,它们的功能已经被放入Configuration类(它原本是JobConf的父类)和一个新的类Job中

2、Configuration类纯粹为了配置作业而设,而Job类负责定义和控制一个作业的执行

3、比如,setOutputKeyClass()和setOutputValueClass()等方法被从JobConf转移到了Job

4、作业的构造和提交执行现在放在Job中,原本需要使用JobConf来构造一个作业:

JobConf job = new JobConf(conf, MyJob.class);

job.setJobName("MyJob");

而现在可通过Job类完成:

Job job = new Job(conf, "MyJob");

job.setJarByClass(MyJob.class);

以前是通过JobClient提交作业去执行:

JobClient.runJob(job);

现在同样通过Job类来完成

System.exit(job.waitForCompletion(true) ? 0 : 1);

基于版本0.20及其以上的新API重写的Hadoop基础程序模板(Hadoop 1.X也适用)

public class MyJob extends Configured implements Tool {

public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

public void map(LongWirtable key, Text value, Context context)

throws IOException, InterruptedException {

String[] citation = value.toString().split(",");

context.write(new Text(citation[1]), new Text(citation[0]));

}

}

public static class Reduce extends Reducer<Text, Text, Text, Text> {

// Iterable类型允许foreach循环

public void reduce(Text key, Iterable<Text> values, Context context)

throws IOException, InterruptedException {

for (Text val : values) {

if (csv.length() > 0) csv += ",";

csv += val.toString();

}

context.write(key, new Text(csv));

}

}

public int run(String[] args) throws Exception {

Configuration conf = getConf();

Job job = new Job(conf, "MyJob");

job.setJarByClass(MyJob.class);

Path in = new Path(args[0]);

Path out = new Path(args[1]);

FileInputFormat.setInputPaths(job, in);

FileOutputFormat.setOutputPath(job, out);

job.setMapperClass(MapClass.class);

job.setReducerClass(Reduce,.class);

// 兼容InputFormat类

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

System.exit(job.waitForCompletion(true)?0:1);

return 0;

}

public static void main(String[] args) throws Exception {

int res = ToolRunner.run(new Configuration(), new MyJob(), args);

System.exit(res);

}

}

1、这段代码实现了反向索引功能

2、KeyValueTextInputFormat类未被移植到版本0.20的新API中,重写这个模板我们不得不使用TextInputFormat

时间: 2024-08-03 08:18:33

Hadoop实战读书笔记(9)的相关文章

Hadoop实战读书笔记(8)

什么是开发数据集? 一个流行的开发策略是为生产环境中的大数据集建立一个较小的.抽样的数据子集,称为开发数据集.这个开发数据集可能只有几百兆字节.当你以单机或者伪分布式模式编写程序来处理它们时,你会发现开发周期很短,在自己的机器上运行程序也很方便,而且还可以在独立的环境中进行调试. 为什么选择专利引用数据做测试? 1.因为它们与你将来会遇到的大多数数据类型相似 2.专利引用数据所构成的关系图与网页链接以及社会网络图可谓大同小异 3.专利发布以时间为序,有些特性类似于时间序列 4.每个专利关联到一个

Hadoop实战读书笔记(7)

输入数据概要 输入数据通常驻留在较大的文件中,通常几十或者数百GB,甚至更大.MapReduce处理的基本原则之一是将输入数据分割成块.这些块可以在多台计算机上并行处理,在Hadoop的术语中这些块被称为输入分片(Input Split).每个分片应该足够小以实现更细粒度的并行.(如果所有的输入数据都在一个分片中,那就没有并行了.) 另一方面,每个分片也不能太小,否则启动与停止各个分片处理所需的开销将占去很大一部分执行时间. 所以说: 1.单个文件要足够的大,这样才能被分片,才会有并行. 2.分

Hadoop实战读书笔记(6)

putmerge程序的大体流程是? 1.根据用户定义的参数设置本地目录和HDFS的目录文件 2.提取本地输入目录中每个文件的信息 3.创建一个输出流写入到HDF文件 4.遍历本地目录中的每个文件,打开一个输入流来读取该文件,剩下就是一个标准的Java文件复制过程了 具体程序如下: public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); FileSys

Hadoop实战读书笔记(5)

HDFS文件操作 你可以把一个大数据集(100TB)在HDFS中存储为单个文件,而大多数其他的文件系统无力实现这一点.虽然该文件存在多个副本分布在多台机器上来支持并行处理,你也不必考虑这些细节. HDFS (Hadoop Distribution File System)文件系统到底是一个怎样的文件系统? 并不是一个Unix文件系统,不支持像ls和cp这种标准的Unix文件命令,也不支持如fopen()和fread()这样的标准文件读写操作.但是Hadoop提供了一套与Linux文件命令类似的命

R实战读书笔记四

第三章 图形入门 本章概要 1 创建和保存图形 2 定义符号.线.颜色和坐标轴 3 文本标注 4 掌控图形维数 5 多幅图合在一起 本章所介绍内容概括如下. 一图胜千字,人们从视觉层更易获取和理解信息. 图形工作 R具有非常强大的绘图功能,看下面代码. > attach(mtcars) > plot(wt, mpg) > abline(lm(mpg~wt)) > title("Regression of MPG on Weight") > detach(m

JAVA并发编程实战 读书笔记(二)对象的共享

<java并发编程实战>读书摘要 birdhack 2015年1月2日 对象的共享 JAVA并发编程实战读书笔记 我们已经知道了同步代码块和同步方法可以确保以原子的方式执行操作,但一种常见的误解是,认为关键之synchronized只能用于实现原子性或者确定临界区.同步还有另一个重要的方面:内存可见性. 1.可见性 为了确保多个线程之间对内存写入操作的可见性,必须使用同步机制. 在没有同步的情况下,编译器.处理器以及运行时等都可能对操作的执行顺序进行一些意想不到的调整.在缺乏足够同步的多线程程

Spring3.x企业开发应用实战读书笔记 —— 第三章IoC容器概述

声明:    本篇博客绝大多数内容为<Spring3.x企业开发应用实战>一书原内容,所有版权归原书作者所有!,仅供学习参考,勿作他用! 3.2 相关Java基础知识 Java语言允许通过程序化的方式间接对Class对象实例操作,Class文件由类装载器装在后,在JVM(Java虚拟机)中将形成一份描述Class结构的元信息对象,通过该元信息对象可以获知Class的结构信息: 如构造函数.属性和方法等.Java允许用户借由这个Class相关的元信息对象间接调用Class对象的功能,这就为使用程

机器学习实战读书笔记(三)决策树

3.1 决策树的构造 优点:计算复杂度不高,输出结果易于理解,对中间值的缺失不敏感,可以处理不相关特征数据. 缺点:可能会产生过度匹配问题. 适用数据类型:数值型和标称型. 一般流程: 1.收集数据 2.准备数据 3.分析数据 4.训练算法 5.测试算法 6.使用算法 3.1.1 信息增益 创建数据集 def createDataSet(): dataSet = [[1, 1, 'yes'], [1, 1, 'yes'], [1, 0, 'no'], [0, 1, 'no'], [0, 1, '

R语言实战读书笔记(二)创建数据集

2.2.2 矩阵 matrix(vector,nrow,ncol,byrow,dimnames,char_vector_rownames,char_vector_colnames) 其中: byrow=TRUE/FALSE,表示按行填充还是按列填充,默认情况下是按列填充 2.2.4 数据框 1.attach,detach()和with() attach():将数据框加入搜索路径 detach():将数据框移除出搜索路径 with():赋值仅在括号内有效,如果想在括号外生效也可以,用<<- 2.