Given n input numbers, return the top 5.
Scala implementation: map each number to a (number, "") pair, using the empty string as a placeholder value, sort by key in descending order, and take the first five.
import org.apache.spark.{SparkConf, SparkContext}

object Top5 {
  def main(args: Array[String]): Unit = {
    // Master URL left blank in the original; e.g. "local[*]" for a local run.
    val conf = new SparkConf().setAppName("Top5").setMaster("")
    val sc = new SparkContext(conf)
    val one = sc.textFile("/spark/test")
    var index = 0
    // Keep non-empty lines with exactly 4 comma-separated fields,
    // extract the third field as an Int, pair it with a dummy value,
    // sort by key in descending order, and print the top 5.
    one.filter(x => x.trim.length > 0 && x.split(",").length == 4)
      .map(_.split(",")(2).toInt)
      .map(x => (x, ""))
      .sortByKey(false)
      .map(_._1)
      .take(5)
      .foreach { x =>
        index += 1
        println("top index:" + index + "\t" + x)
      }
    sc.stop()
  }
}
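As an aside, the dummy-value pairing can be skipped entirely: Spark's RDD provides top(n), which returns the n largest elements under the natural ordering. A minimal sketch, reusing sc and one from the block above:

// Alternative sketch: RDD.top(n) returns the n largest elements
// without the (key, "") pairing or a full sortByKey pass.
val top5 = one
  .filter(x => x.trim.length > 0 && x.split(",").length == 4)
  .map(_.split(",")(2).toInt)
  .top(5) // descending by natural Int ordering
top5.zipWithIndex.foreach { case (v, i) => println(s"top ${i + 1}\t$v") }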
MapReduce implementation: the mapper emits (number, "") and the reducer emits (index, number). Hadoop's IntWritable sorts keys in ascending order by default, so to get the largest numbers first you implement your own MyIntWritable with the comparison reversed.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class MyIntWritable implements WritableComparable<MyIntWritable> {
    private Integer num;

    public MyIntWritable(Integer num) {
        this.num = num;
    }

    public MyIntWritable() {
    }

    public void write(DataOutput output) throws IOException {
        output.writeInt(num);
    }

    public void readFields(DataInput input) throws IOException {
        this.num = input.readInt();
    }

    // Reverse the natural ascending order so that larger numbers sort first.
    public int compareTo(MyIntWritable o) {
        return -Integer.compare(this.num, o.num);
    }

    @Override
    public int hashCode() {
        return this.num.hashCode();
    }

    @Override
    public String toString() {
        return this.num + "";
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof MyIntWritable)) {
            return false;
        }
        MyIntWritable other = (MyIntWritable) obj;
        return this.num.equals(other.num);
    }
}
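A quick way to sanity-check the reversed ordering from Scala (a sketch; it assumes MyIntWritable is compiled and on the classpath):

// Sort a few wrapped values with MyIntWritable.compareTo and
// confirm they come out largest-first.
val ws = Seq(3, 1, 5).map(n => new MyIntWritable(n))
val sorted = ws.sortWith(_.compareTo(_) < 0) // compareTo < 0 means "sorts first"
println(sorted.mkString(", ")) // expected: 5, 3, 1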
The complete MapReduce job:

package HadoopvsSpark;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TopN {

    public static class TopNMapper extends Mapper<LongWritable, Text, MyIntWritable, Text> {
        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Emit (number, "") for every non-empty line with exactly 4 comma-separated fields.
            if (line.trim().length() > 0) {
                String[] str = line.split(",");
                if (str.length == 4) {
                    context.write(new MyIntWritable(Integer.parseInt(str[2])), new Text(""));
                }
            }
        }
    }

    public static class TopNReducer extends Reducer<MyIntWritable, Text, Text, MyIntWritable> {
        private int index = 0;

        @Override
        public void reduce(MyIntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Keys arrive in descending order thanks to MyIntWritable.compareTo,
            // so the first five reduce calls are the top 5.
            index++;
            if (index <= 5) {
                context.write(new Text(index + " "), key);
            }
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "topn");
        job.setJarByClass(TopN.class);
        job.setMapperClass(TopNMapper.class);
        job.setMapOutputKeyClass(MyIntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(TopNReducer.class);
        job.setNumReduceTasks(1); // a single reducer, so the top 5 is global
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MyIntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path outputdir = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        // Delete the output directory if it already exists.
        if (fs.exists(outputdir)) {
            fs.delete(outputdir, true);
        }
        FileOutputFormat.setOutputPath(job, outputdir);
        System.out.println(job.waitForCompletion(true) ? 1 : 0);
    }
}
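For illustration only (the input file is not shown in the original), suppose /spark/test holds hypothetical 4-field CSV lines where the third field is the number to rank:

u01,20170526,87,beijing
u02,20170526,93,shanghai
u03,20170526,61,tianjin
u04,20170526,93,hangzhou
u05,20170526,45,guangzhou

The Spark version prints the five largest values with duplicates kept (93, 93, 87, 61, 45). The MapReduce reducer is called once per distinct key, so duplicate values collapse and it would emit 93, 87, 61, 45.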