MapReduce TopK 文件

问题描述:对于每日访问google 的ip做个记录 对应计算出当天前K个访问次数最多的ip地址。

对应此问题 先自定制一个ip格式的数据类型 继承WritableComparable接口。

package reverseIndex;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class ipAndcount implements WritableComparable<ipAndcount>{
	private Text ip;
	private IntWritable count;
	public ipAndcount(){
		this.ip = new Text("");
		this.count = new IntWritable(1);
	}
	public ipAndcount(Text ip,IntWritable count){
		this.ip =ip;
		this.count = count;
	}
	@Override
	public void readFields(DataInput input) throws IOException {
		// TODO Auto-generated method stub
		ip.readFields(input);
		count.readFields(input);

	}
	@Override
	public void write(DataOutput output) throws IOException {
		// TODO Auto-generated method stub
		ip.write(output);
		count.write(output);
	}
	@Override
	public int compareTo(ipAndcount o) {
		// TODO Auto-generated method stub
		return ((ipAndcount)o).count.compareTo(count)==0?ip.compareTo(((ipAndcount)o).ip)
				:((ipAndcount)o).count.compareTo(count);
	}
	public boolean equals(ipAndcount o){
		if(!(o instanceof ipAndcount)){
			return false;
		}
		ipAndcount other = (ipAndcount)o;
		return ip.equals(other.ip) &&(count.equals(other.count));
	}
	public String toString(){
		StringBuffer buf = new StringBuffer("IP=");
		buf.append(ip.toString());
		buf.append(",Count=");
		buf.append(count.toString());
		buf.append(";");
		return buf.toString();
	}
	public Text getIp(){
		return ip;
	}
	public IntWritable getCount(){
		return count;
	}
	public void setCount(IntWritable count){
		this.count = count;
	}
}

此问题 应该分为俩个作业进行完成,一个用于统计IP及其整合的数量(类似WordCount)另一个用于选择出前K个进行输出:

package reverseIndex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

//分为2个作业进行 完成 一个 用于统计每日的访问ip 另一个用于选择出前K个 访问高的ip
public class firstK {

	public static class FindIpMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
		private IntWritable one = new IntWritable(1);
		public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{
			context.write(value,one);
		}
	}
	public static class IpReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
		public void reduce(Text key,Iterable<IntWritable>values,Context context) throws IOException, InterruptedException{
			int sum = 0;
			for(IntWritable val : values){
				sum += val.get();
			}
			context.write(key, new IntWritable(sum));
		}
	}
	public static class beforeSortIpmapper extends Mapper<Text,Text,ipAndcount,Text>{
		public void map(Text key,Text value,Context context) throws IOException, InterruptedException{
			ipAndcount tmp = new ipAndcount(key,new IntWritable(Integer.valueOf(value.toString())));
			context.write(tmp,new Text());
		}
	}
	public static class selectTopKReducer extends Reducer<ipAndcount,Text,ipAndcount,Text>{
		int count = 0;
		int k = 10;
		public void reduce(ipAndcount key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
			if(count<k){
				context.write(key, null);
				count++;
			}
		}
	}
	public static void main(String[] args) throws IOException {
		// TODO Auto-generated method stub
		Configuration conf = new Configuration();
		Job job1 = new Job(conf,"sum ip");
		job1.setJarByClass(firstK.class);

		//默认输入输出格式
		job1.setInputFormatClass(TextInputFormat.class);
		job1.setOutputFormatClass(TextOutputFormat.class);

		//读取文件路径 和输出路径
		Path in = new Path(args[0]);
		Path out = new Path(args[1]);

		FileInputFormat.addInputPath(job1,in);
		FileOutputFormat.setOutputPath(job1,out);

		//设置map的输入输出格式
		job1.setMapOutputKeyClass(Text.class);
		job1.setMapOutputValueClass(IntWritable.class);
		job1.setOutputKeyClass(Text.class);
		job1.setOutputValueClass(IntWritable.class);
		//设置处理类
		job1.setMapperClass(FindIpMapper.class);
		job1.setReducerClass(IpReducer.class);
		//reduce任务个数
		job1.setNumReduceTasks(7);

		//作业2的配置
		Configuration conf2 = new Configuration();
		Job job2 = new Job(conf2,"select K");
		job1.setJarByClass(firstK.class);
		job1.setInputFormatClass(KeyValueTextInputFormat.class);
		job1.setOutputFormatClass(TextOutputFormat.class);

		Path in2 = new Path(args[1]);
		Path out2 = new Path(args[2]);
		FileInputFormat.addInputPath(job2,in2);
		FileOutputFormat.setOutputPath(job2,out2);

		job1.setMapOutputKeyClass(ipAndcount.class);
		job1.setMapOutputValueClass(Text.class);
		job1.setOutputKeyClass(ipAndcount.class);
		job1.setOutputValueClass(Text.class);
		job1.setMapperClass(beforeSortIpmapper.class);
		job1.setReducerClass(selectTopKReducer.class);
		job1.setNumReduceTasks(1);

		//作业的关联性  使用jobcontrol进行处理
		JobControl jc = new JobControl("select k ip");

		ControlledJob cjob1 = new ControlledJob(conf);
		cjob1.setJob(job1);
		ControlledJob cjob2 = new ControlledJob(conf2);
		cjob2.setJob(job2);

		jc.addJob(cjob1);
		jc.addJob(cjob2);
		//依赖关系
		cjob2.addDependingJob(cjob1);
		jc.run();
	}
}
时间: 2024-10-24 23:46:19

MapReduce TopK 文件的相关文章

mapreduce 多文件输出新API续

对于上一篇hadoop mapreduce 多文件输出,有一些地方介绍的不准确,这里做个续简单更正一下,同时正好解决了上一篇的不能多文件夹输出的问题 1.针对于上一篇代码中的 MultipleOutputs.addNamedOutput(job, "errorlog",     TextOutputFormat.class, Text.class, NullWritable.class);  方法,其实第二个参数并非是这么用的,下面看代码: private MultipleOutput

浅谈hadoop中mapreduce的文件分发

最近在做数据分析的时候,需要在mapreduce中调用c语言写的接口,此时就需要把动态链接库so文件分发到hadoop的各个节点上,原来想自己来做这个分发,大概过程就是把so文件放在hdfs上面,然后做mapreduce的时候把so文件从hdfs下载到本地,但查询资料后发现hadoop有相应的组件来帮助我们完成这个操作,这个组件就是DistributedCache,分布式缓存,运用这个东西可以做到第三方文件的分发和缓存功能,下面详解: 如果我们需要在map之间共享一些数据,如果信息量不大,我们可

MapReduce TopK统计加排序

Hadoop技术内幕中指出Top K算法有两步,一是统计词频,二是找出词频最高的前K个词.在网上找了很多MapReduce的Top K案例,这些案例都只有排序功能,所以自己写了个案例. 这个案例分两个步骤,第一个是就是wordCount案例,二就是排序功能. 一,统计词频 1 package TopK; 2 import java.io.IOException; 3 import java.util.StringTokenizer; 4 5 import org.apache.hadoop.co

MapReduce TopK问题实际应用

一:背景 TopK问题应该是海量数据处理中应用最广泛的了,比如在海量日志数据处理中,对数据清洗完成之后统计某日访问网站次数最多的前K个IP.这个问题的实现方式并不难,我们完全可以利用MapReduce的Shuffle过程实现排序,然后在Reduce端进行简单的个数判断输出即可.这里还涉及到二次排序,不懂的同学可以参考我之前的文章. 二:技术实现 #我们先来看看一条Ngnix服务器的日志: [java] view plain copy 181.133.250.74 - - [06/Jan/2015

MapReduce小文件处理之CombineFileInputFormat实现

在MapReduce使用过程中.一般会遇到输入文件特别小(几百KB.几十MB).而Hadoop默认会为每一个文件向yarn申请一个container启动map,container的启动关闭是很耗时的. Hadoop提供了CombineFileInputFormat.一个抽象类.作用是将多个小文件合并到一个map中,我们仅仅需实现三个类: CompressedCombineFileInputFormat CompressedCombineFileRecordReader CompressedCom

MapReduce小文件优化与分区

一.小文件优化 1.Mapper类 package com.css.combine; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * 思路? * wordcou

mapreduce 对文件分词读取

MapReduce 实例一:(进行文件的分词读取) 1.1 首先导入架包 <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-core</artifactId> <version>2.8.2</version> </dependency> <dependency> <groupId>org.

Mapreduce的文件和hbase共同输入

package duogemap; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hba

Mapreduce TopK

思想比较简单,就是每个通过map来获取当前的数据块中的的topk个数据,然后将他们以相同的key值放到reduce中,最后通过reduce来对这n*k个数据排序并获得topk个数据.具体的就是建立一个k个大小的数组,一开始初始化为都是100(假定这里的100是最大的数),然后往里面插数据小的数据即可. PS:有几个小细节以及当时写代码的时候出错的地方. 1 map和reduce都是在每个键值对来的时候会被调用.当时觉得应该把这k的数组放在哪,以及怎么初始化.如果放在map方法里面,那每次都会被初