MR Case: Base Station Data 01

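Field descriptions: product_no: the user's phone number; lac_id: the base station the user is attached to; start_time: when the user's stay at this base station began; staytime: how long the user stayed at this base station. In the input file the fields are tab-separated (the code below splits each line on "\t"); the sample below is rendered with spaces.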

product_no lac_id moment start_time user_id county_id staytime city_id
13429100031 22554 8 2013-03-11 08:55:19.151754088 571 571 282 571
13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 103 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100140 26642 9 2013-03-11 09:02:19.151754088 571 571 18 571
13429100082 22691 8 2013-03-11 08:57:32.151754088 571 571 287 571
13429100189 22558 8 2013-03-11 08:56:24.139539816 571 571 48 571
13429100349 22503 8 2013-03-11 08:54:30.152622440 571 571 211 571
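The MR code below splits each line on "\t", and start_time itself contains a space between the date and the time. The following minimal sketch shows why the separator matters. It assumes the input file really is tab-separated, and LineParseDemo is a hypothetical name used only for illustration.

```java
// Hypothetical demo class: shows how one sample row splits into fields.
class LineParseDemo {
    // Split one input line the same way the MR code does
    static String[] parse(String line) {
        return line.split("\t");
    }

    public static void main(String[] args) {
        // Row 13429100082 / 22691 from the sample, written with tab separators
        String line = "13429100082\t22691\t8\t2013-03-11 08:56:37.149593624\t571\t571\t103\t571";
        String[] fields = parse(line);
        System.out.println(fields.length);               // 8
        System.out.println(fields[3]);                   // 2013-03-11 08:56:37.149593624
        System.out.println(Integer.parseInt(fields[6])); // 103 (staytime)
        // Splitting on any whitespace instead cuts start_time in two
        System.out.println(line.split("\\s+").length);   // 9
    }
}
```

If the file were space-separated instead, splitting would produce nine tokens per line and every field after start_time would shift by one.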

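Requirement: lac_id and start_time tell us where the user was at a given moment, and staytime tells us how long the user stayed at each base station. Following each user's trajectory, merge the staytime of consecutive records at the same base station, so that the final result lists, for every user and in time order, how long they stayed at each base station.
Expected output (excerpt; the single-record users 13429100031, 13429100140, 13429100189 and 13429100349 pass through unchanged and are omitted here):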

13429100082 22540 8 2013-03-11 08:58:20.152622488 571 571 270 571
13429100082 22691 8 2013-03-11 08:56:37.149593624 571 571 390 571
13429100082 22540 8 2013-03-11 08:55:38.140225200 571 571 133 571
13429100087 22705 8 2013-03-11 08:56:51.139539816 571 571 220 571
13429100087 22540 8 2013-03-11 08:55:45.150276800 571 571 66 571

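Analysis: for each product_no, sort the records by start_time (descending in this example). If two adjacent records have the same lac_id, add their staytime into the latter record and remove the former. Because the order is descending, the surviving record of a run is the earliest one, so its start_time is the moment the user entered that base station. For example, 13429100082 has two consecutive 22691 records (287 and 103), which merge into 390 under start_time 08:56:37.149593624.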
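The rule above can be tried without a cluster. The following is a minimal sketch under stated assumptions. StayMerge and its Rec class are hypothetical stand-ins for the RecordWritable shown later, and each run is written into a new list instead of removing elements from the sorted one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Hadoop-free sketch of the merge rule described above.
class StayMerge {
    // Stand-in for RecordWritable, keeping only the fields the rule needs
    static final class Rec {
        final String productNo;
        final String lacId;
        final String startTime;
        final int staytime;

        Rec(String productNo, String lacId, String startTime, int staytime) {
            this.productNo = productNo;
            this.lacId = lacId;
            this.startTime = startTime;
            this.staytime = staytime;
        }

        @Override
        public String toString() {
            return productNo + " " + lacId + " " + startTime + " " + staytime;
        }
    }

    static List<Rec> merge(List<Rec> input) {
        List<Rec> sorted = new ArrayList<>(input);
        // product_no ascending, then start_time descending; the sample timestamps
        // are fixed-width, so plain String comparison orders them chronologically
        sorted.sort((a, b) -> {
            int c = a.productNo.compareTo(b.productNo);
            return c != 0 ? c : b.startTime.compareTo(a.startTime);
        });

        List<Rec> out = new ArrayList<>();
        Rec last = null;   // record seen last in sorted order within the current run
        int runTotal = 0;  // summed staytime of the current run (same user, same station)
        for (Rec cur : sorted) {
            boolean sameRun = last != null
                    && last.productNo.equals(cur.productNo)
                    && last.lacId.equals(cur.lacId);
            if (!sameRun && last != null) {
                // Run ended: emit it under its last record, which has the earliest start_time
                out.add(new Rec(last.productNo, last.lacId, last.startTime, runTotal));
                runTotal = 0;
            }
            runTotal += cur.staytime;
            last = cur;
        }
        if (last != null) {
            out.add(new Rec(last.productNo, last.lacId, last.startTime, runTotal));
        }
        return out;
    }
}
```

Given the four 13429100082 sample records, merge() returns three records, with the two 22691 records combined into staytime 390 under start_time 2013-03-11 08:56:37.149593624, which matches the expected output. Building a new list also avoids the index bookkeeping that in-place removal requires, and a run of three or more records collapses into one.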

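Full code v1: this version uses only the Map stage. map() parses each line into a custom RecordWritable object and adds it to a List; the List is then sorted, and cleanup() adds up the staytime of adjacent records that have the same product_no and lac_id.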

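Drawback: all records are buffered in a single in-memory List, which does not work for large data volumes. Also, each map task only sees its own input split, so records of one user that fall into different splits are never merged with each other.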

package demo0902;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Demo090203 {
    final static String INPUT_PATH = "hdfs://10.16.17.182:9000/test/in/0902/";
    final static String OUT_PATH = "hdfs://10.16.17.182:9000/test/out/0902/06";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(Demo090203.class);

        // Set the mapper; this version is map-only, so no reduce tasks run
        job.setMapperClass(Demo090201Mapper.class);
        job.setNumReduceTasks(0);

        job.setMapOutputKeyClass(RecordWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(RecordWritable.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

        job.waitForCompletion(true);

    }
    //map
    public static class Demo090201Mapper extends Mapper<LongWritable, Text, RecordWritable, NullWritable>{

        // Buffers every record read by this map task
        ArrayList<RecordWritable> list = new ArrayList<RecordWritable>();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Fields are tab-separated (start_time itself contains a space)
            String[] splited = value.toString().split("\t");

            // Assemble one line into one record
            RecordWritable record = new RecordWritable();
            record.product_no=splited[0];
            record.lac_id=splited[1];
            record.moment=Integer.parseInt(splited[2]);
            record.start_time=splited[3];
            record.user_id=splited[4];
            record.county_id=splited[5];
            record.staytime=Integer.parseInt(splited[6]);
            record.city_id=splited[7];

            list.add(record);
        }

        @Override
        protected void cleanup(Context context)
                throws IOException, InterruptedException {

            // Sort once, after all input has been read (sorting inside map() re-sorts on every line)
            Collections.sort(list, new Comparator<RecordWritable>() {
                @Override
                public int compare(RecordWritable r1, RecordWritable r2) {
                    // Delegate to RecordWritable.compareTo(): product_no Asc, start_time Desc
                    return r1.compareTo(r2);
                }
            });

            // Debug: print the sorted records to the task log
            for(RecordWritable r : list){
                System.out.println(r.toString());
            }

            // Stop one before the last record, since each step looks at i and i+1
            for(int i=0; i<list.size()-1; i++){
                // Take two adjacent records
                RecordWritable record_pre = list.get(i);
                RecordWritable record_next = list.get(i+1);

                // Add staytime only when both product_no and lac_id are equal
                if(record_pre.product_no.equals(record_next.product_no) && record_pre.lac_id.equals(record_next.lac_id)){

                    // The later record receives the summed staytime
                    record_next.staytime += record_pre.staytime;

                    // Remove the earlier record by index, then step back so the merged
                    // record is compared with its new neighbour (handles runs of 3+ records)
                    list.remove(i);
                    i--;
                }
            }
            for(RecordWritable record : list){
                context.write(record, NullWritable.get());
            }
        }
    }

    // Custom serializable (Writable) record type
    public static class RecordWritable implements WritableComparable<RecordWritable>{
        String product_no;
        String lac_id;
        int moment;
        String start_time;
        String user_id;
        String county_id;
        int staytime;
        String city_id;

        @Override
        public int compareTo(RecordWritable o) {
            // First by phone number (product_no), Asc
            int value = this.product_no.compareTo(o.product_no);
            if(value==0)
                // then by start_time, Desc
                return o.start_time.compareTo(this.start_time);
            return value;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(product_no);
            out.writeUTF(lac_id);
            out.writeInt(moment);
            out.writeUTF(start_time);
            out.writeUTF(user_id);
            out.writeUTF(county_id);
            out.writeInt(staytime);
            out.writeUTF(city_id);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            product_no=in.readUTF();
            lac_id=in.readUTF();
            moment=in.readInt();
            start_time=in.readUTF();
            user_id=in.readUTF();
            county_id=in.readUTF();
            staytime=in.readInt();
            city_id=in.readUTF();
        }

        @Override
        public String toString() {
            return this.product_no+" "+this.lac_id+" "+this.moment+" "+this.start_time+" "+user_id+" "+county_id+" "+ staytime+" "+city_id;
        }
    }
}

Full code v2: in this version the Map stage emits product_no as the key and the whole line as the value. The Reduce stage does roughly what the Map stage of v1 did.

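Advantage: compared with v1, each reduce() call only handles the records of one product_no, which eases the memory pressure caused by the data volume. The records of a single user are still buffered in a List inside that call.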
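To see how v2 divides the work, the sketch below imitates the map, shuffle and reduce steps in plain Java. It is only an approximation: ShuffleDemo is a hypothetical class, a TreeMap stands in for Hadoop's shuffle (keys come out sorted, values grouped per key), and the input lines are assumed to be tab-separated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Hadoop-free sketch of the v2 data flow.
class ShuffleDemo {
    // map + shuffle: key each tab-separated line by product_no (field 0)
    static Map<String, List<String>> groupByProduct(List<String> lines) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String line : lines) {
            groups.computeIfAbsent(line.split("\t")[0], k -> new ArrayList<>()).add(line);
        }
        return groups;
    }

    // reduce: one call per product_no, so only lac_id needs comparing
    static List<String> reduceGroup(List<String> lines) {
        List<String[]> recs = new ArrayList<>();
        for (String line : lines) {
            recs.add(line.split("\t"));
        }
        // start_time (field 3) descending
        recs.sort((a, b) -> b[3].compareTo(a[3]));

        List<String> out = new ArrayList<>();
        int i = 0;
        while (i < recs.size()) {
            String[] last = recs.get(i);
            int total = Integer.parseInt(last[6]);
            // Absorb following records at the same lac_id; the last of them keeps the sum
            while (i + 1 < recs.size() && recs.get(i + 1)[1].equals(last[1])) {
                i++;
                last = recs.get(i);
                total += Integer.parseInt(last[6]);
            }
            String[] merged = last.clone();
            merged[6] = String.valueOf(total);
            // Space-separated, like RecordWritable.toString()
            out.add(String.join(" ", merged));
            i++;
        }
        return out;
    }
}
```

For the 13429100082 group, reduceGroup() returns the three lines listed in the expected output, including the merged 22691 line with staytime 390.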

package demo0902;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Demo090204 {
    final static String INPUT_PATH = "hdfs://10.16.17.182:9000/test/in/0902/";
    final static String OUT_PATH = "hdfs://10.16.17.182:9000/test/out/0902/02";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(Demo090204.class);

        job.setMapperClass(Demo090201Mapper.class);
        job.setReducerClass(Demo090201Reducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(RecordWritable.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

        job.waitForCompletion(true);

    }
    //map
    public static class Demo090201Mapper extends Mapper<LongWritable, Text, Text, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] splited = value.toString().split("\t");

            context.write(new Text(splited[0]), new Text(value));
        }
    }

    //reduce
    public static class Demo090201Reducer extends Reducer<Text, Text, RecordWritable, NullWritable>{
        @Override
        protected void reduce(Text key, Iterable<Text> v2s, Context context)
                throws IOException, InterruptedException {

            ArrayList<RecordWritable> list = new ArrayList<RecordWritable>();

            for(Text text : v2s){
                String[] splited = text.toString().split("\t");

                RecordWritable record = new RecordWritable();
                record.product_no=splited[0];
                record.lac_id=splited[1];
                record.moment=Integer.parseInt(splited[2]);
                record.start_time=splited[3];
                record.user_id=splited[4];
                record.county_id=splited[5];
                record.staytime=Integer.parseInt(splited[6]);
                record.city_id=splited[7];

                list.add(record);
            }

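            // Sort the List with a custom comparator
            Collections.sort(list, new Comparator<RecordWritable>() {
                @Override
                public int compare(RecordWritable r1, RecordWritable r2) {
                    // Delegate to RecordWritable.compareTo(); within one product_no this is start_time Desc
                    return r1.compareTo(r2);
                }
            });

            // Stop one before the last record, since each step looks at i and i+1
            for(int i=0; i<list.size()-1; i++){

                // Take two adjacent records
                RecordWritable record_pre = list.get(i);
                RecordWritable record_next = list.get(i+1);

                // All records in this call share one product_no, so comparing lac_id is enough
                if(record_pre.lac_id.equals(record_next.lac_id)){

                    // The later record receives the summed staytime
                    record_next.staytime += record_pre.staytime;

                    // Remove the earlier record by index and step back, so runs of 3+ records merge fully
                    list.remove(i);
                    i--;
                }
            }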
            for(RecordWritable record : list){
                context.write(record, NullWritable.get());
            }
        }
    }
    // Custom serializable (Writable) record type
    public static class RecordWritable implements WritableComparable<RecordWritable>{
        String product_no;
        String lac_id;
        int moment;
        String start_time;
        String user_id;
        String county_id;
        int staytime;
        String city_id;

        @Override
        public int compareTo(RecordWritable o) {
            // First by phone number (product_no), Asc
            int value = this.product_no.compareTo(o.product_no);
            if(value==0)
                // then by start_time, Desc
                return o.start_time.compareTo(this.start_time);
            return value;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(product_no);
            out.writeUTF(lac_id);
            out.writeInt(moment);
            out.writeUTF(start_time);
            out.writeUTF(user_id);
            out.writeUTF(county_id);
            out.writeInt(staytime);
            out.writeUTF(city_id);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            product_no=in.readUTF();
            lac_id=in.readUTF();
            moment=in.readInt();
            start_time=in.readUTF();
            user_id=in.readUTF();
            county_id=in.readUTF();
            staytime=in.readInt();
            city_id=in.readUTF();
        }

        @Override
        public String toString() {
            return this.product_no+" "+this.lac_id+" "+this.moment+" "+this.start_time+" "+user_id+" "+county_id+" "+ staytime+" "+city_id;
        }
    }
}
Time: 2024-10-18 02:25:15
