import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import com.hadoop.compression.lzo.LzoIndexer;
import com.hadoop.compression.lzo.LzopCodec;
import com.hadoop.mapreduce.LzoTextInputFormat;

// Constants, ETLTldMain, ETLTldMapper and ETLTldReducer are project classes;
// Constants.MAO_HAO is presumably the ":" separator (pinyin for 冒号).
public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapred.job.tracker",
            Constants.HADOOP_MAIN_IP + Constants.MAO_HAO + Constants.HADOOP_MAIN_PORT);
    if (args.length != 3) {
        System.err.println("Usage: Data Deduplication <in> <out> <reduceNum>");
        System.exit(2);
    }
    Job job = new Job(conf, "ETLTld Job");
    job.setJarByClass(ETLTldMain.class);
    job.setMapperClass(ETLTldMapper.class);
    job.setReducerClass(ETLTldReducer.class);
    // Read LZO-compressed input; with .index files present, the input is splittable
    job.setInputFormatClass(LzoTextInputFormat.class);
    job.setNumReduceTasks(Integer.parseInt(args[2]));
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Compress the job output as .lzo files
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
    int result = job.waitForCompletion(true) ? 0 : 1;
    if (result == 0) {
        // Index the freshly written .lzo output so downstream jobs can split it
        LzoIndexer lzoIndexer = new LzoIndexer(conf);
        lzoIndexer.index(new Path(args[1]));
    }
    System.exit(result);
}
If you already have .lzo files, you can add an index to them as follows:
bin/yarn jar /module/cloudera/parcels/GPLEXTRAS-5.4.0-1.cdh5.4.0.p0.27/lib/hadoop/lib/hadoop-lzo-0.4.15-cdh5.4.0.jar com.hadoop.compression.lzo.DistributedLzoIndexer /user/hive/warehouse/cndns.db/ods_cndns_log/dt=20160803/node=alicn/part-r-00000.lzo
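If only a handful of files need indexing, the same thing can also be done from the client without submitting the indexing MapReduce job above: the LzoIndexer class already used in the driver works directly against a file or directory. A minimal sketch, reusing the example path from the command above (the class name IndexExistingLzo is just for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import com.hadoop.compression.lzo.LzoIndexer;

public class IndexExistingLzo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Writes part-r-00000.lzo.index next to the .lzo file;
        // passing a directory should index the .lzo files underneath it.
        new LzoIndexer(conf).index(new Path(
            "/user/hive/warehouse/cndns.db/ods_cndns_log/dt=20160803/node=alicn/part-r-00000.lzo"));
    }
}

For large partitions with many files, the DistributedLzoIndexer command shown above is preferable, since it builds the indexes with a MapReduce job instead of a single client process.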
The LZO format is not splittable by default; an index file has to be generated for it before multiple map tasks can process an LZO file in parallel.
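Without the index, LzoTextInputFormat treats the whole .lzo file as a single split, so only one map task reads it; once the companion .index file exists, the file can be split across several maps. A small sketch of that idea, using a hypothetical ensureLzoIndex helper that indexes a file only when its index is missing:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.hadoop.compression.lzo.LzoIndexer;

// Hypothetical helper: build the .index file for an .lzo file only if it does not exist yet,
// so that LzoTextInputFormat can split the file across multiple map tasks.
public static void ensureLzoIndex(Configuration conf, Path lzoFile) throws Exception {
    FileSystem fs = lzoFile.getFileSystem(conf);
    Path indexFile = new Path(lzoFile.toString() + ".index"); // naming convention used by hadoop-lzo
    if (!fs.exists(indexFile)) {
        new LzoIndexer(conf).index(lzoFile); // without this, the whole file goes to a single map
    }
}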
[Reference] http://blog.csdn.net/wisgood/article/details/17080361