数据去重,key只输出一次
scala实现:先groupByKey(),然后SortByKey(),然后输出keys
object Reduplicate { def main(args: Array[String]): Unit = { val conf=new SparkConf().setMaster("local").setAppName("remove duplication"); val sc=new SparkContext(conf); val line=sc.textFile(""); line.filter(_.trim.length>0).map(line=>(line.trim,"")).groupByKey().sortByKey().keys.collect() } } MapReduce实现:以整个数据作为key,reduce过程中输出key就可以了
package HadoopvsSpark; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.GenericOptionsParser;import java.io.IOException; //对数据进行去重,以整个数据作为key,reduce过程中输出key就可以了 /** * Created by Administrator on 2017/5/25. */public class Duplicate { public static class Map extends Mapper<Object,Text,Text,Text>{ private static Text text=new Text( ); public void map(Object key,Text value,Context context) throws IOException, InterruptedException { text=value; context.write( new Text(text),new Text(" ") ); } } public static class Reduce extends Reducer<Text,Text,Text,Text>{ public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException { context.write( key,new Text( "" ) ); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job=new Job(); Configuration conf=new Configuration(); conf.set("mapred.job.tracker","192.169.1.101:8200"); String[] input=new String[]{"a","s"}; String[] otherArgs=new GenericOptionsParser(conf,input).getRemainingArgs(); if(otherArgs.length!=2){ System.err.println("Usage: Data Deduplication <in> <out>"); System.exit(2); } job.setJarByClass(Duplicate.class); FileInputFormat.addInputPath( job,new Path( otherArgs[0] ) ); FileOutputFormat.setOutputPath( job,new Path(otherArgs[1]) ); job.setOutputKeyClass( Text.class); job.setOutputValueClass( Text.class ); job.setMapperClass( Map.class ); job.setCombinerClass( Reduce.class ); job.setReducerClass( Reduce.class); System.out.println(job.waitForCompletion( true )? 1:0 ); } }
时间: 2024-10-24 21:41:28