通过50030 查看Hadoop任务的执行情况,在查看任务详细的时候可以看到有很多计数器,是hadoop框架默认提供的统计MR的详细信息,从计数器中可以看出程序运行的状态,那么我们如何自定义这些信息呢?
我们平时控制台输出的也有计数器
例子:
过滤数据中含有hello的词的数量
package counter; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 敏感词监控处理 记录敏感单词出现的次数,自定义计数器 * */ public class WordCountApp { private static final String inputPaths = "hdfs://hadoop:9000/hello"; private static final String OUT_PATH = "hdfs://hadoop:9000/out"; public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf); fileSystem.delete(new Path(OUT_PATH), true); Job job = new Job(conf, WordCountApp.class.getSimpleName()); job.setJarByClass(WordCountApp.class); FileInputFormat.setInputPaths(job, inputPaths); job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileOutputFormat.setOutputPath(job, new Path(OUT_PATH)); job.waitForCompletion(true); } public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{ @Override protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { //获得一个计数器 Counter counter = context.getCounter("Sensitive Words","hello"); final String line = value.toString(); //假设hello是敏感词 if(line.contains("hello")){ //敏感词出现一次加一 counter.increment(1L); } String[] split = line.split("\t"); for (String word : split) { context.write(new Text(word), new LongWritable(1)); } } } public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ @Override protected void reduce(Text key, Iterable<LongWritable> values, org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException { Long count=0L; for (LongWritable times : values) { count+=times.get(); } context.write(key, new LongWritable(count)); } } }
运行结果:
控制台
时间: 2024-12-06 04:49:50