Problem statement: keep a daily log of the IP addresses that access Google, and from it compute the K IP addresses with the highest access counts for that day.
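For concreteness, a tiny worked example. The mapper below treats each input line as a single IP address (a real access log would first have to be reduced to that shape), and the output format comes from ipAndcount.toString(). With K = 2:

Input (one IP per line):
5.6.7.8
1.2.3.4
5.6.7.8
1.2.3.4
5.6.7.8

Output (top K by count):
IP=5.6.7.8,Count=3;
IP=1.2.3.4,Count=2;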
To solve this, first define a custom data type that pairs an IP with its count and implements the WritableComparable interface, so it can serve as a MapReduce key and drive the sort order.
package reverseIndex;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class ipAndcount implements WritableComparable<ipAndcount> {

    private Text ip;
    private IntWritable count;

    // Hadoop instantiates Writables reflectively, so a no-arg
    // constructor that allocates the fields is required.
    public ipAndcount() {
        this.ip = new Text("");
        this.count = new IntWritable(1);
    }

    public ipAndcount(Text ip, IntWritable count) {
        this.ip = ip;
        this.count = count;
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        // Deserialize in the same order the fields were written.
        ip.readFields(input);
        count.readFields(input);
    }

    @Override
    public void write(DataOutput output) throws IOException {
        ip.write(output);
        count.write(output);
    }

    @Override
    public int compareTo(ipAndcount o) {
        // Sort by count in descending order so the largest counts
        // arrive at the reducer first; break ties on the IP string.
        int byCount = o.count.compareTo(count);
        return byCount != 0 ? byCount : ip.compareTo(o.ip);
    }

    @Override
    public boolean equals(Object o) {
        // Must take Object, not ipAndcount, to actually override
        // Object.equals(); the original signature merely overloaded it.
        if (!(o instanceof ipAndcount)) {
            return false;
        }
        ipAndcount other = (ipAndcount) o;
        return ip.equals(other.ip) && count.equals(other.count);
    }

    @Override
    public int hashCode() {
        // Keep hashCode consistent with equals(), as required when the
        // type is used as a key (e.g. by the default HashPartitioner).
        return ip.hashCode() * 163 + count.hashCode();
    }

    @Override
    public String toString() {
        return "IP=" + ip + ",Count=" + count + ";";
    }

    public Text getIp() {
        return ip;
    }

    public IntWritable getCount() {
        return count;
    }

    public void setCount(IntWritable count) {
        this.count = count;
    }
}
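As a quick local sanity check of the sort order, here is a sketch (not part of the jobs; the class name ipAndcountCheck is made up, and it assumes the Hadoop client jars are on the classpath). A TreeSet exercises compareTo() the same way the shuffle's sort phase does:

package reverseIndex;

import java.util.TreeSet;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class ipAndcountCheck {
    public static void main(String[] args) {
        // TreeSet orders elements with ipAndcount.compareTo():
        // higher counts first, ties broken by the IP string.
        TreeSet<ipAndcount> set = new TreeSet<ipAndcount>();
        set.add(new ipAndcount(new Text("1.2.3.4"), new IntWritable(2)));
        set.add(new ipAndcount(new Text("5.6.7.8"), new IntWritable(9)));
        set.add(new ipAndcount(new Text("9.9.9.9"), new IntWritable(9)));

        for (ipAndcount entry : set) {
            System.out.println(entry);
        }
        // Prints:
        // IP=5.6.7.8,Count=9;
        // IP=9.9.9.9,Count=9;
        // IP=1.2.3.4,Count=2;
    }
}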
The problem should be split into two jobs: one counts each IP and aggregates its total (essentially WordCount), the other selects the top K and writes them out. The intermediate format linking the two jobs is sketched below, followed by the full driver:
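Job 1 writes its results with TextOutputFormat, i.e. one ip<TAB>count line per IP, and job 2 reads them back with KeyValueTextInputFormat, whose default key/value separator is the tab, so the two jobs connect with no extra glue code. Using the sample data above, the intermediate files in args[1] would look like (tab-separated):

1.2.3.4	2
5.6.7.8	3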
package reverseIndex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Two chained jobs: the first counts the daily hits per IP,
// the second sorts by count and emits the top K.
public class firstK {

    // Job 1 mapper: each input line is one IP address; emit (ip, 1).
    public static class FindIpMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final IntWritable one = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(value, one);
        }
    }

    // Job 1 reducer: sum the 1s to get the total hits per IP.
    public static class IpReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Job 2 mapper: KeyValueTextInputFormat hands us (ip, count) pairs
    // from job 1's output; wrap them in ipAndcount so the shuffle sorts
    // by count (descending) via ipAndcount.compareTo().
    public static class beforeSortIpmapper
            extends Mapper<Text, Text, ipAndcount, Text> {

        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // Defensive copy: Hadoop reuses the key object between map() calls.
            ipAndcount tmp = new ipAndcount(
                    new Text(key),
                    new IntWritable(Integer.parseInt(value.toString())));
            context.write(tmp, new Text());
        }
    }

    // Job 2 reducer: a single reducer sees all keys in sorted order,
    // so it simply writes out the first K of them.
    public static class selectTopKReducer
            extends Reducer<ipAndcount, Text, ipAndcount, Text> {

        private int count = 0;
        private static final int K = 10;

        @Override
        public void reduce(ipAndcount key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            if (count < K) {
                // TextOutputFormat prints only the key when the value is null.
                context.write(key, null);
                count++;
            }
        }
    }

    public static void main(String[] args)
            throws IOException, InterruptedException {
        // Job 1: count hits per IP (a WordCount variant).
        Configuration conf = new Configuration();
        Job job1 = Job.getInstance(conf, "sum ip");
        job1.setJarByClass(firstK.class);
        // Default input/output formats.
        job1.setInputFormatClass(TextInputFormat.class);
        job1.setOutputFormatClass(TextOutputFormat.class);

        // Input and output paths.
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        // Map and job output key/value types.
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        // Processing classes.
        job1.setMapperClass(FindIpMapper.class);
        job1.setReducerClass(IpReducer.class);
        // Number of reduce tasks.
        job1.setNumReduceTasks(7);

        // Job 2: read job 1's output and select the top K.
        // Note: every setting below must go on job2 -- the original code
        // mistakenly configured job1 a second time here.
        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2, "select K");
        job2.setJarByClass(firstK.class);
        job2.setInputFormatClass(KeyValueTextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        job2.setMapOutputKeyClass(ipAndcount.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(ipAndcount.class);
        job2.setOutputValueClass(Text.class);
        job2.setMapperClass(beforeSortIpmapper.class);
        job2.setReducerClass(selectTopKReducer.class);
        // One reducer gives a single, totally ordered output file.
        job2.setNumReduceTasks(1);

        // Chain the two jobs with JobControl.
        JobControl jc = new JobControl("select k ip");
        ControlledJob cjob1 = new ControlledJob(conf);
        cjob1.setJob(job1);
        ControlledJob cjob2 = new ControlledJob(conf2);
        cjob2.setJob(job2);
        cjob2.addDependingJob(cjob1); // job2 starts only after job1 succeeds
        jc.addJob(cjob1);
        jc.addJob(cjob2);

        // JobControl.run() loops forever, so drive it from a background
        // thread, poll until all jobs have finished, then stop it.
        Thread controller = new Thread(jc);
        controller.start();
        while (!jc.allFinished()) {
            Thread.sleep(1000);
        }
        jc.stop();
    }
}