Mapper:
package latandlonRange;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import au.com.bytecode.opencsv.CSVParser;

public class LatAndLonRangeMapper extends Mapper<LongWritable, Text, Text, Text> {

    @Override
    protected void map(LongWritable key, Text value, Context context) {
        if (key.get() > 0) { // skip the header row at byte offset 0
            CSVParser parser = new CSVParser();
            try {
                String[] data = parser.parseLine(value.toString());
                // Fields: 1:bus_line, 2:start_station, 3:end_station, 4:stationnumber,
                // 5:stationname, 6:rawlongitude, 7:rawlatitude, 8:alter_line_code,
                // 9:alter_label, 10:alter_flag, 11:line_code, 12:rawlink
                if (data[5] != null && data[6] != null) { // proceed only when both fields are present
                    String outkey = data[0];
                    // keep only rawlongitude and rawlatitude
                    String outValue = data[5] + '\t' + data[6];
                    System.out.println("MapoutKey:" + outkey);
                    System.out.println("MapoutValue:" + outValue);
                    context.write(new Text(outkey), new Text(outValue));
                }
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
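As an aside, the reason for using opencsv's CSVParser rather than a plain split(",") is that a CSV field may itself contain a quoted comma. A small illustration (the sample record below is made up):

import au.com.bytecode.opencsv.CSVParser;

public class CsvParseDemo {
    public static void main(String[] args) throws Exception {
        CSVParser parser = new CSVParser();
        // hypothetical record whose second field contains a comma inside quotes
        String[] fields = parser.parseLine("1,\"East Gate, North\",116.40,39.90");
        System.out.println(fields[1]); // prints: East Gate, North
        // value.toString().split(",") would incorrectly cut this field in two
    }
}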
Reduce:
package latandlonRange;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LatAndLonRangeReduce extends Reducer<Text, Text, Text, Text> {

    // NOTE: instance fields, so the running min/max carry over from one key
    // to the next instead of being reset for each bus line
    Float minlat = new Float(1000.0);
    Float maxlat = new Float(0.0);
    Float minlon = new Float(1000.0);
    Float maxlon = new Float(0.0);

    public void reduce(Text key, Iterable<Text> values, Context context) {
        try {
            for (Text val : values) {
                String inputValue = val.toString();
                String[] valueData = inputValue.split("\t");
                float lon = Float.parseFloat(valueData[0]);
                float lat = Float.parseFloat(valueData[1]);
                // BUG: the latitude bounds are decided by comparing lon, not lat
                minlat = lon < minlat ? lat : minlat;
                maxlat = lon > maxlat ? lat : maxlat;
                minlon = lon < minlon ? lon : minlon;
                maxlon = lon > maxlon ? lon : maxlon;
                System.out.print("minlat:" + minlat + '\t');
                System.out.print("maxlat:" + maxlat + '\t');
                System.out.print("minlon:" + minlon + '\t');
                System.out.println("maxlon:" + maxlon);
                // BUG: writing inside the loop emits one record per input value
                // rather than a single range record per key
                String outkey_reduce = key.toString();
                String outputStr = minlat + "\t" + maxlat + "\t" + minlon + "\t" + maxlon;
                context.write(new Text(outkey_reduce), new Text(outputStr));
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Job:
package latandlonRange;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LatAndLonRangeJob {

    public static void main(String[] args) {
        try {
            Job job = new Job();
            job.setJarByClass(LatAndLonRangeJob.class);

            String inpath = "hdfs://172.18.32.177:9000/Amelie-ting/total_line_station_info_201508.csv";
            String outpath = "hdfs://172.18.32.177:9000/Amelie-ting/LatandlonRange/";
            FileInputFormat.addInputPath(job, new Path(inpath));
            FileOutputFormat.setOutputPath(job, new Path(outpath));

            job.setMapperClass(LatAndLonRangeMapper.class);
            job.setReducerClass(LatAndLonRangeReduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
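One small note on the driver: the no-argument Job constructor is deprecated in Hadoop 2.x. A sketch of the recommended setup (the class name here is illustrative):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class JobSetupSketch {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Job.getInstance replaces the deprecated new Job() constructor
        Job job = Job.getInstance(conf, "LatAndLonRange");
        job.setJarByClass(JobSetupSketch.class);
        // ...the rest of the mapper/reducer/path setup is identical to the driver above
    }
}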
This code does not run successfully; it is not what I want. The modifications come next:
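As a pointer toward the fix, here is a minimal sketch of a corrected reduce method, assuming the intent is a single min/max record per bus line. The class name and initial bounds are illustrative, not the eventual revision:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LatAndLonRangeReduceFixed extends Reducer<Text, Text, Text, Text> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Local variables, reset for every key, so ranges cannot leak between bus lines
        float minLat = Float.MAX_VALUE, maxLat = -Float.MAX_VALUE;
        float minLon = Float.MAX_VALUE, maxLon = -Float.MAX_VALUE;

        for (Text val : values) {
            String[] parts = val.toString().split("\t");
            float lon = Float.parseFloat(parts[0]);
            float lat = Float.parseFloat(parts[1]);
            // compare latitude with latitude and longitude with longitude
            minLat = Math.min(minLat, lat);
            maxLat = Math.max(maxLat, lat);
            minLon = Math.min(minLon, lon);
            maxLon = Math.max(maxLon, lon);
        }

        // emit exactly one range record per bus line, after the loop finishes
        context.write(key, new Text(minLat + "\t" + maxLat + "\t" + minLon + "\t" + maxLon));
    }
}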