Hadoop之——自定义计数器

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/46057909

1、Mapper类的实现

	/**
	 * KEYIN	即k1		表示行的偏移量
	 * VALUEIN	即v1		表示行文本内容
	 * KEYOUT	即k2		表示行中出现的单词
	 * VALUEOUT	即v2		表示行中出现的单词的次数,固定值1
	 */
	static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
		protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
			Counter counter = context.getCounter("Sensitive word", "hello");
			String line = v1.toString();
			if(line.contains("hello")){
				//记录敏感词汇出现在一行中
				counter.increment(1);
			}
			final String[] splited = line.split("\t");
			for (String word : splited) {
				context.write(new Text(word), new LongWritable(1));
			}
		};
	}

2、Reducer类的实现

	/**
	 * KEYIN	即k2		表示行中出现的单词
	 * VALUEIN	即v2		表示行中出现的单词的次数
	 * KEYOUT	即k3		表示文本中出现的不同单词
	 * VALUEOUT	即v3		表示文本中出现的不同单词的总次数
	 *
	 */
	static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
		protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
			long times = 0L;
			for (LongWritable count : v2s) {
				times += count.get();
			}
			ctx.write(k2, new LongWritable(times));
		};
	}

3、程序入口Main方法

	//程序入口Mainfan
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		//如果已经存在输出文件,则先删除已存在的输出文件
		if(fileSystem.exists(outPath)){
			fileSystem.delete(outPath, true);
		}

		final Job job = new Job(conf , WordCount.class.getSimpleName());
		//1.1指定读取的文件位于哪里
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
		job.setInputFormatClass(TextInputFormat.class);

		//1.2 指定自定义的map类
		job.setMapperClass(MyMapper.class);
		//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,下面两行代码可以省略
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);

		//1.3 分区
		job.setPartitionerClass(HashPartitioner.class);
		//有一个reduce任务运行
		job.setNumReduceTasks(1);

		//1.4 TODO 排序、分组

		//1.5 TODO 规约

		//2.2 指定自定义reduce类
		job.setReducerClass(MyReducer.class);
		//指定reduce的输出类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		//2.3 指定写出到哪里
		FileOutputFormat.setOutputPath(job, outPath);
		//指定输出文件的格式化类
		job.setOutputFormatClass(TextOutputFormat.class);

		//把job提交给JobTracker运行
		job.waitForCompletion(true);
	}

4、完整程序

package com.lyz.hadoop.count;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
 * 利用Hadoop MapReduce统计文本中每个单词的数量
 * 自定义计数器
 * @author liuyazhuang
 */
public class WordCount{
	//要统计的文件位置
	static final String INPUT_PATH = "hdfs://liuyazhuang:9000/d1/hello";
	//统计结果输出的位置
	static final String OUT_PATH = "hdfs://liuyazhuang:9000/out";
	//程序入口Mainfan
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		//如果已经存在输出文件,则先删除已存在的输出文件
		if(fileSystem.exists(outPath)){
			fileSystem.delete(outPath, true);
		}

		final Job job = new Job(conf , WordCount.class.getSimpleName());
		//1.1指定读取的文件位于哪里
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
		job.setInputFormatClass(TextInputFormat.class);

		//1.2 指定自定义的map类
		job.setMapperClass(MyMapper.class);
		//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,下面两行代码可以省略
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);

		//1.3 分区
		job.setPartitionerClass(HashPartitioner.class);
		//有一个reduce任务运行
		job.setNumReduceTasks(1);

		//1.4 TODO 排序、分组

		//1.5 TODO 规约

		//2.2 指定自定义reduce类
		job.setReducerClass(MyReducer.class);
		//指定reduce的输出类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);

		//2.3 指定写出到哪里
		FileOutputFormat.setOutputPath(job, outPath);
		//指定输出文件的格式化类
		job.setOutputFormatClass(TextOutputFormat.class);

		//把job提交给JobTracker运行
		job.waitForCompletion(true);
	}

	/**
	 * KEYIN	即k1		表示行的偏移量
	 * VALUEIN	即v1		表示行文本内容
	 * KEYOUT	即k2		表示行中出现的单词
	 * VALUEOUT	即v2		表示行中出现的单词的次数,固定值1
	 */
	static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
		protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
			Counter counter = context.getCounter("Sensitive word", "hello");
			String line = v1.toString();
			if(line.contains("hello")){
				//记录敏感词汇出现在一行中
				counter.increment(1);
			}
			final String[] splited = line.split("\t");
			for (String word : splited) {
				context.write(new Text(word), new LongWritable(1));
			}
		};
	}

	/**
	 * KEYIN	即k2		表示行中出现的单词
	 * VALUEIN	即v2		表示行中出现的单词的次数
	 * KEYOUT	即k3		表示文本中出现的不同单词
	 * VALUEOUT	即v3		表示文本中出现的不同单词的总次数
	 *
	 */
	static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
		protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
			long times = 0L;
			for (LongWritable count : v2s) {
				times += count.get();
			}
			ctx.write(k2, new LongWritable(times));
		};
	}

}

5、核心

时间: 2024-10-22 17:14:38

Hadoop之——自定义计数器的相关文章

Hadoop学习笔记—7.计数器与自定义计数器

一.Hadoop中的计数器 计数器:计数器是用来记录job的执行进度和状态的.它的作用可以理解为日志.我们通常可以在程序的某个位置插入计数器,用来记录数据或者进度的变化情况,它比日志更便利进行分析. 例如,我们有一个文件,其中包含如下内容: hello you hello me 它被WordCount程序执行后显示如下日志: 在上图所示中,计数器有19个,分为四个组:File Output Format Counters.FileSystemCounters.File Input Format

Hadoop初学指南(7)--MapReduce自定义计数器

本文主要介绍了MapReduce中的自定义计数器的相关内容. 在上次的单词统计例子中,我们可以看到MapReduce在执行过程中会有很多的控制台输出信息,其中有一个很关键的内容:计数器.如下图: 可以看到最上方的关键字:Counters,这就表示计数器. 在这里,只有一个制表符缩进的表示计数器组,有两个制表符缩进的表示计数器组下的计数器.如File Output Format Counters就表示文件输出的计数器组,里面的Bytes Written表示输出的字符数,在输出的文本中,hello,

Hadoop mapreduce自定义分组RawComparator

本文发表于本人博客. 今天接着上次[Hadoop mapreduce自定义排序WritableComparable]文章写,按照顺序那么这次应该是讲解自定义分组如何实现,关于操作顺序在这里不多说了,需要了解的可以看看我在博客园的评论,现在开始. 首先我们查看下Job这个类,发现有setGroupingComparatorClass()这个方法,具体源码如下: /** * Define the comparator that controls which keys are grouped toge

commoncrawl 源码库是用于 Hadoop 的自定义 InputFormat 配送实现

commoncrawl 源码库是用于 Hadoop 的自定义 InputFormat 配送实现. Common Crawl 提供一个示例程序 BasicArcFileReaderSample.java (位于 org.commoncrawl.samples) 用来配置 InputFormat. commoncrawl / commoncrawl Watch414 Fork86 CommonCrawl Project Repository — More... http://www.commoncr

ionic3.x 自定义组件component双向绑定之自定义计数器

本文主要示例在ionic3.x环境下实现一个自定义计数器,实现后最终效果如图: 1.使用命令创建一个component ionic g component CounterInput 类似的命令还有: ionic g page YourPageName //创建新页面 ionic g directive YourPageName //创建指令 ionic g component YourComponentName //创建组件 ionic g provider YourProviderName /

Hadoop之--&gt;自定义排序

data: 3 33 23 12 22 11 1 --------------------- 需求: 1 12 12 23 13 23 3 package sort; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path

Hadoop之--&gt;自定义分组 RawComparator

data: 3 33 23 22 22 11 1 --------------------- 需求: 1 12 23 3 当第一列相同时候要第二列的最小值 package group; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org

Hadoop 学习自定义数据类型

(学习网易云课堂Hadoop大数据实战笔记) 序列化在分布式环境的两大作用:进程间通信,永久存储. Writable接口, 是根据 DataInput 和 DataOutput 实现的简单.有效的序列化对象. MR的任意Value必须实现Writable接口: MR的key必须实现WritableComparable接口,WritableComparable继承自Writable和Comparable接口: (本节先讲自定义value值,下一节再讲自定义key值,根据key值进行自定义排序) 以

hadoop 学习自定义排序

(网易云课程hadoop大数据实战学习笔记) 自定义排序,是基于k2的排序,设现有以下一组数据,分别表示矩形的长和宽,先按照面积的升序进行排序. 99 66 78 11 54 现在需要重新定义数据类型,MR的key值必须继承WritableComparable接口,因此定义RectangleWritable数据类型如下: import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import