LatAndLonRange项目记录

Mapper:

package latandlonRange;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import au.com.bytecode.opencsv.CSVParser;

public class LatAndLonRangeMapper extends Mapper<LongWritable,Text,Text, Text>{
    protected void map(LongWritable key, Text value, Context context){
        if (key.get() > 0) {
            CSVParser parser = new CSVParser();
            try{
                String[] data = parser.parseLine(value.toString());
                //1:bus_line,2:start_station,3:end_station,4:stationnumber,5:stationname,
                //6:rawlongitude,7:rawlatitude,8:alter_line_code,9:alter_label,10:alter_flag,
                //11:line_code,12:rawlink
                if(data[5]!=null&&data[6]!=null){//两个字段都不为空时,执行
                    String outkey=data[0];
                    String outValue=data[5]+‘\t‘+data[6];
                    //只取出rawlongitude和rawlatitude两个
                    System.out.println("MapoutKey:"+outkey);
                    System.out.println("MapoutValue:"+outValue);
                    context.write(new Text(outkey), new Text(outValue));
                }
            }catch (IOException | InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

Reduce:

package latandlonRange;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class LatAndLonRangeReduce extends Reducer<Text, Text, Text, Text>{
    Float minlat=new Float(1000.0);
    Float maxlat=new Float(0.0);
    Float minlon=new Float(1000.0);
    Float maxlon=new Float(0.0);
    public void reduce(Text key,Iterable<Text> values, Context context){
        try {
            for (Text  val : values) {
                String inputValue = val.toString();
                   String[] valueData = inputValue.split("\t");
                   float lon= Float.parseFloat(valueData[0]);
                   float lat= Float.parseFloat(valueData[1]);
                   minlat=lon<minlat?lat:minlat;
                   maxlat=lon>maxlat?lat:maxlat;
                   minlon=lon<minlon?lon:minlon;
                   maxlon=lon>maxlon?lon:maxlon;
                   System.out.print("minlat:"+minlat+‘\t‘);
                   System.out.print("maxlat:"+maxlat+‘\t‘);
                   System.out.print("minlon:"+minlon+‘\t‘);
                   System.out.println("maxlon:"+maxlon);
                   String outkey_reduce=key.toString();
                   String outputStr=new String(minlat.toString()+‘\t‘+maxlat.toString()+‘\t‘+minlon.toString()+‘\t‘+maxlon.toString());
                   context.write(new Text(outkey_reduce), new Text(outputStr));
            }
        }catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

Job:

package latandlonRange;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver that configures and submits the latitude/longitude range job.
 *
 * <p>Usage: {@code LatAndLonRangeJob [inputPath [outputPath]]}. When an argument
 * is omitted the corresponding default HDFS location below is used.
 */
public class LatAndLonRangeJob {
    /** Input CSV used when no input path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "hdfs://172.18.32.177:9000/Amelie-ting/total_line_station_info_201508.csv";
    /** Output directory used when no output path is given on the command line. */
    private static final String DEFAULT_OUTPUT_PATH =
            "hdfs://172.18.32.177:9000/Amelie-ting/LatandlonRange/";

    /**
     * Submits the job, waits for it to finish and exits the JVM with status 0 on
     * success or 1 on failure (including failures raised while setting up the job).
     *
     * @param args optional {@code [inputPath, outputPath]}
     */
    public static void main(String[] args) {
        String inpath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        String outpath = args.length > 1 ? args[1] : DEFAULT_OUTPUT_PATH;

        // Stays non-zero unless the job is known to have succeeded.
        int exitCode = 1;
        try{
            Job job=Job.getInstance();
            job.setJarByClass(LatAndLonRangeJob.class);

            FileInputFormat.addInputPath(job, new Path(inpath));
            FileOutputFormat.setOutputPath(job, new Path(outpath));

            job.setMapperClass(LatAndLonRangeMapper.class);
            job.setReducerClass(LatAndLonRangeReduce.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);

            exitCode = job.waitForCompletion(true) ? 0 : 1;
        }catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }catch (InterruptedException e) {
            // Restore the interrupt flag before reporting the failure.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.exit(exitCode);
    }

}

此代码的运行结果不符合预期(不是我想要的结果),接下来进行修改。

时间: 2024-11-04 11:45:30

LatAndLonRange项目记录的相关文章

项目记录:spring+springmvc 项目中 @Transactional 失效的解决方法

第一步,修改spring的配置文件和springmvc的配置文件 --------------------------------applicationContext.xml <context:annotation-config/>  <context:component-scan base-package="com.xxx"> <context:exclude-filter type="annotation" expression=&

项目记录:springmvc+freemarker 实现国际化

第一步,在SpringMVC的配置文件中,添加如下支持国际化的两段配置 <bean id="messageSource" class="org.springframework.context.support.ResourceBundleMessageSource"> <property name="useCodeAsDefaultMessage" value="true" /> <property

项目记录:springmvc forward redirect 问题

@RequestMapping("/redirect")public String redirect(RedirectAttributes redirectAttributes){redirectAttributes.addFlashAttribute("test", "testdata"); //专供此种情况下使用.return "redirect:read";} 注意:此种情况下,网址会跳转的同时,还携带着一个名字为“te

开源项目记录

数据存储 MongoDb 针对大数据量.高并发.弱事务的互联网应用 MemCache 简单的key-value存储,读取内存,效率高 Redis 可存储list,持久化,内存读取效率高 Cassandra 分布式数据库,更好的扩展性,对大数据更好的支持 http://www.ibm.com/developerworks/cn/opensource/os-cn-cassandra/ 开源项目记录,布布扣,bubuko.com

项目记录 -- python调用回调函数

C源文件: 1 static int 2 get_callback(zpool_handle_t *zhp, void *data) 3 { 4 zprop_get_cbdata_t *cbp = (zprop_get_cbdata_t *)data; 5 char value[MAXNAMELEN]; 6 zprop_source_t srctype; 7 zprop_list_t *pl; 8 9 for (pl = cbp->cb_proplist; pl != NULL; pl = pl

项目记录,仿今日头条app

项目记录,仿今日头条app,五六月份主要做的项目,第一版已经完成上架,二次开发正在进行中

项目记录2:整合SSH2

本文内容来自:<传智播客-OA项目> 一,集成 Spring 与 Hibernate    1,配置SessionFactory        1,配置            ---------------------- applicationContext.xml ------------------------            <!-- 配置SessionFactory(整合Hibernate) -->            <context:property-pl

项目记录3:基础功能

本文内容来自:<传智播客-OA项目> 1,设计 BaseDao 与 BaseDaoImpl    1,设计接口 BaseDao        1,每个实体都应有一个对应的Dao接口,封装了对这个实体的数据库操作.例            实体            Dao接口                实现类            ========================================================            User        

项目记录 -- config2html 理解

html 代码: 1 <table width=1280 border=0 cellspacing=1 cellpadding=1> 2 <tr id=tblhdr> 3 <td><b>pool: {{ d['pool']}} </b></td> 4 </tr> 5 <tr id=row1><td>state: {{ d['state']}} </td></tr> 6 <