MapReduce序列化及分区的java代码示例

概述

  序列化(Serialization)是指把结构化对象转化为字节流。

  反序列化(Deserialization)是序列化的逆过程。把字节流转为结构化对象。

  当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节流转换为对象,就要进行反序列化。

  Java 的序列化(Serializable)是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系…),不便于在网络中高效传输;所以,hadoop 自己开发了一套序列化机制( Writable),精简,高效。不用像 java 对象类一样传输多层的父子关系,需要哪个属性就传输哪个属性值,大大的减少网络传输的开销。

  Writable是Hadoop的序列化格式,hadoop定义了这样一个Writable接口。一个类要支持可序列化只需实现这个接口即可。

public interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

  如需要将自定义的 bean 放在 key 中传输,则还需要实现 comparable 接口,因为 mapreduce 框中的 shuffle 过程一定会对 key 进行排序,此时,自定义的bean 实现的接口应该是:WritableComparable

代码示例

  1 . 需求

    统计每一个用户(手机号)所耗费的总上行流量、下行流量,总流量结果的基础之上再加一个需求:将统计结果按照总流量倒序排序。

  准备数据

1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157985066     13726230503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157995052     13826544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4            4    0    264    0    200
1363157991076     13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200
1363154400022     13926251106    5C-0E-8B-8B-B1-50:CMCC    120.197.40.4            4    0    240    0    200
1363157993044     18211575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99    iface.qiyi.com    视频网站    15    12    1527    2106    200
1363157995074     84138413    5C-0E-8B-8C-E8-20:7DaysInn    120.197.40.4    122.72.52.12        20    16    4116    1432    200
1363157993055     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
1363157995033     15920133257    5C-0E-8B-C7-BA-20:CMCC    120.197.40.4    sug.so.360.cn    信息安全    20    20    3156    2936    200
1363157983019     13719199419    68-A1-B7-03-07-B1:CMCC-EASY    120.196.100.82            4    0    240    0    200
1363157984041     13660577991    5C-0E-8B-92-5C-20:CMCC-EASY    120.197.40.4    s19.cnzz.com    站点统计    24    9    6960    690    200
1363157973098     15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜索引擎    28    27    3659    3538    200
1363157986029     15989002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站点统计    3    3    1938    180    200
1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
1363157992093     13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157986041     13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    180    180    200
1363157984040     13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    综合门户    15    12    1938    2910    200
1363157995093     13922314466    00-FD-07-A2-EC-BA:CMCC    120.196.100.82    img.qfc.cn        12    12    3008    3720    200
1363157982040     13502468823    5C-0A-5B-6A-0B-D4:CMCC-EASY    120.196.100.99    y0.ifengimg.com    综合门户    57    102    7335    110349    200
1363157986072     18320173382    84-25-DB-4F-10-1A:CMCC-EASY    120.196.100.99    input.shouji.sogou.com    搜索引擎    21    18    9531    2412    200
1363157990043     13925057413    00-1F-64-E1-E6-9A:CMCC    120.196.100.55    t3.baidu.com    搜索引擎    69    63    11058    48243    200
1363157988072     13760778710    00-FD-07-A4-7B-08:CMCC    120.196.100.82            2    2    120    120    200
1363157985066     13726238888    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
1363157993055     13560436666    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200

  2 . 分析

    实现自定义的bean 来封装流量信息,并将bean 作为 map 输出的 key 来传输

    MR 程序在处理数据的过程中会对数据排序(map 输出的 kv 对传输到 reduce之前,会排序),排序的依据是 map 输出的 key。所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到 key 中,让 key 实现接口:WritableComparable,然后重写 key 的 compareTo 方法。

  3 . 未排序的实现

    自定义JavaBean

public class FlowBean implements WritableComparable<FlowBean>{
    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }
    public FlowBean(long upFlow, long downFlow, long sumFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = sumFlow;
    }
    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    /**
     * 序列化方法
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    /**
     * 反序列化方法
     * 先序列化的先反序列化
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }

    /**
     * 指定对象排序的方法
     *  如果指定的数与参数相等返回 0。
     *  如果指定的数小于参数返回 -1。
     *  如果指定的数大于参数返回 1。
     */
    @Override
    public int compareTo(FlowBean o) {
        return this.getSumFlow() > o.getSumFlow() ? -1 : 1 ;//按照指定的总流量的倒序排序
//        return this.getSumFlow() > o.getSumFlow() ? 1 : -1 ;//按照指定的总流量的正序排序
    }
}

    Mapper方法

public class FlowSumMapper extends Mapper<LongWritable,Text,Text,FlowBean> {
    Text k = new Text();
    FlowBean v = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\t");

        String phoNum = fields[1];//提前目标文件中的手机号
        long upFlow = Long.parseLong(fields[fields.length-3]);//提取目标文件中的上行流量
        long downFlow = Long.parseLong(fields[fields.length-2]);//提取目标文件中的下行流量

        k.set(phoNum);
        v.set(upFlow,downFlow);
        context.write(k,v);
    }
}

    Reducer方法

public class FlowSumReducer extends Reducer<Text,FlowBean,Text,FlowBean> {
    FlowBean v = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long sumUpFlow = 0;
        long sumDownFlowd = 0;
        for (FlowBean value : values) {
            sumUpFlow += value.getUpFlow();//获取每条记录的上行流量并计算总和
            sumDownFlowd += value.getDownFlow();//获取每条记录的下行流量并计算总和
        }
        v.set(sumUpFlow ,sumDownFlowd);
        context.write(key,v);
    }
}

    主方法

public class FlowSumRunner {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        //指定mr程序使用本地模式模拟一套环境执行mr程序,一般用于本地代码测试
        conf.set("mapreduce.framework.name","local");
        //通过job方法获得mr程序运行的实例
        Job job = Job.getInstance(conf);

        //指定本次mr程序的运行主类
        job.setJarByClass(FlowSumRunner.class);
        //指定本次mr程序使用的mapper reduce
        job.setMapperClass(FlowSumMapper.class);
        job.setReducerClass(FlowSumReducer.class);
        //指定本次mr程序map输出的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        //指定本次mr程序reduce输出的数据类型,也就是说最终的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        //指定本次mr程序待处理数据目录   输出结果存放目录
        FileInputFormat.addInputPath(job,new Path("D:\\flowsum\\input"));
        FileOutputFormat.setOutputPath(job,new Path("D:\\flowsum\\output"));

        //提交本次mr程序
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);//程序执行成功,退出状态码为0,退出程序,否则为1
    }
}

    

   3 . 排序的实现

      使用上面的输出作为该需求的输入

    Mapper方法

public class FlowSumSortMapper extends Mapper<LongWritable,Text,FlowBean,Text> {
    FlowBean k = new FlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fileds = line.split("\t");

        String phoNum = fileds[0];
        long sumUpFlow = Long.parseLong(fileds[1]);
        long sumDownFlow = Long.parseLong(fileds[2]);

        v.set(phoNum);
        k.set(sumUpFlow,sumDownFlow);
        context.write(k,v);
    }
}

    Reducer方法

public class FlowSumSortReducer extends Reducer<FlowBean,Text,Text,FlowBean> {
    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        Text phoNum = values.iterator().next();//iterator中只有一个值
        context.write(phoNum,key);
    }
}

    主方法

 1 //得出上题结果的基础之上再加一个需求:将统计结果按照总流量倒序排序
 2 public class FlowSumSortDriver {
 3     public static void main(String[] args) throws Exception{
 4         Configuration conf = new Configuration();
 5         //指定mr程序使用本地模式模拟一套环境执行mr程序,一般用于本地代码测试
 6         conf.set("mapreduce.framework.name","local");
 7
 8         //通过job方法获得mr程序运行的实例
 9         Job job = Job.getInstance(conf);
10
11         //指定本次mr程序的运行主类
12         job.setJarByClass(FlowSumSortDriver.class);
13         //指定本次mr程序使用的mapper reduce
14         job.setMapperClass(FlowSumSortMapper.class);
15         job.setReducerClass(FlowSumSortReducer.class);
16         //指定本次mr程序map输出的数据类型
17         job.setMapOutputKeyClass(FlowBean.class);
18         job.setMapOutputValueClass(Text.class);
19         //指定本次mr程序reduce输出的数据类型,也就是说最终的输出类型
20         job.setOutputKeyClass(Text.class);
21         job.setOutputValueClass(FlowBean.class);
22         //指定本次mr程序待处理数据目录   输出结果存放目录
23         FileInputFormat.addInputPath(job,new Path("D:\\flowsum\\output"));
24         FileOutputFormat.setOutputPath(job,new Path("D:\\flowsum\\outsortput"));
25
26         //提交本次mr程序
27         boolean b = job.waitForCompletion(true);
28         System.exit(b ? 0 : 1);//程序执行成功,退出状态码为0,退出程序,否则为1
29     }
30 }

Mapreduce的分区—Partitioner

1 .  需求

    将流量汇总统计结果按照手机归属地不同省份输出到不同文件中。

2 .  分析

    Mapreduce 中会将 map 输出的 kv 对,按照相同 key 分组,然后分发给不同的 reducetask。

    默认的分发规则为:根据 key 的 hashcode%reducetask 数来分发

    所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件 Partitioner,自定义一个 CustomPartitioner 继承抽象类:Partitioner,然后在job 对象中,设置自定义 partitioner: job.setPartitionerClass(CustomPartitioner.class)

3 .  实现

    自定义partitioner类

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    public static HashMap<String, Integer>  provinceMap = new HashMap<String, Integer>();
    static{
        provinceMap.put("134", 0);
        provinceMap.put("135", 1);
        provinceMap.put("136", 2);
        provinceMap.put("137", 3);
        provinceMap.put("138", 4);
    }

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        Integer code = provinceMap.get(key.toString().substring(0, 3));
        if (code != null) {
            return code;
        }
        return 5;
    }
}

    Mapper、Reducer及主方法

 1 public class FlowSumProvince {
 2  public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
 3      Text k = new Text();
 4      FlowBean  v = new FlowBean();
 5
 6      @Override
 7      protected void map(LongWritable key, Text value,Context context)
 8              throws IOException, InterruptedException {
 9              //拿取一行文本转为String
10              String line = value.toString();
11              //按照分隔符\t进行分割
12              String[] fileds = line.split("\t");
13              //获取用户手机号
14              String phoneNum = fileds[1];
15
16              long upFlow = Long.parseLong(fileds[fileds.length-3]);
17              long downFlow = Long.parseLong(fileds[fileds.length-2]);
18
19              k.set(phoneNum);
20              v.set(upFlow, downFlow);
21              context.write(k,v);
22         }
23     }
24
25     public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
26         FlowBean  v  = new FlowBean();
27         @Override
28         protected void reduce(Text key, Iterable<FlowBean> flowBeans,Context context) throws IOException, InterruptedException {
29             long upFlowCount = 0;
30             long downFlowCount = 0;
31
32             for (FlowBean flowBean : flowBeans) {
33                 upFlowCount += flowBean.getUpFlow();
34                 downFlowCount += flowBean.getDownFlow();
35             }
36             v.set(upFlowCount, downFlowCount);
37             context.write(key, v);
38     }
39
40     public static void main(String[] args) throws Exception{
41         Configuration conf = new Configuration();
42         Job job = Job.getInstance(conf);
43
44         //指定我这个 job 所在的 jar包位置
45         job.setJarByClass(FlowSumProvince.class);
46         //指定我们使用的Mapper是那个类  reducer是哪个类
47         job.setMapperClass(FlowSumProvinceMapper.class);
48         job.setReducerClass(FlowSumProvinceReducer.class);
49         // 设置我们的业务逻辑 Mapper 类的输出 key 和 value 的数据类型
50         job.setMapOutputKeyClass(Text.class);
51         job.setMapOutputValueClass(FlowBean.class);
52         // 设置我们的业务逻辑 Reducer 类的输出 key 和 value 的数据类型
53         job.setOutputKeyClass(Text.class);
54         job.setOutputValueClass(FlowBean.class);
55
56         //这里设置运行reduceTask的个数
57         job.setNumReduceTasks(6);
58
59         //这里指定使用我们自定义的分区组件
60         job.setPartitionerClass(ProvincePartitioner.class);
61
62         FileInputFormat.setInputPaths(job, new Path("D:\\flowsum\\input"));
63         // 指定处理完成之后的结果所保存的位置
64         FileOutputFormat.setOutputPath(job, new Path("D:\\flowsum\\outputProvince"));
65         boolean res = job.waitForCompletion(true);
66         System.exit(res ? 0 : 1);
67     }
68  }
69 }

原文地址:https://www.cnblogs.com/jifengblog/p/9277425.html

时间: 2024-10-15 12:11:52

MapReduce序列化及分区的java代码示例的相关文章

jxl创建Excel文件java代码示例

记得要下载 并 导入 jxl.jar 包,免积分下载地址:http://download.csdn.net/detail/u010011052/7561041 package Test; import java.io.*; import jxl.*; import jxl.format.Colour; import jxl.write.*; public class JXLTest { private static WritableWorkbook book; private static Wr

kafka集群和zookeeper集群的部署,kafka的java代码示例

来自:http://doc.okbase.net/QING____/archive/19447.html 也可参考: http://blog.csdn.net/21aspnet/article/details/19325373 http://blog.csdn.net/unix21/article/details/18990123 kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它.kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下

spark使用KryoRegistrator java代码示例

转载引用自:http://www.cnblogs.com/tovin/p/3833985.html 最近在使用spark开发过程中发现当数据量很大时,如果cache数据将消耗很多的内存.为了减少内存的消耗,测试了一下 Kryo serialization的使用 代码包含三个类,KryoTest.MyRegistrator.Qualify. 我们知道在Spark默认使用的是Java自带的序列化机制.如果想使用Kryo serialization,只需要添加KryoTest类中的红色部分,指定spa

服务端发送xml请求java代码示例

/** * */ package com.autoyol.pay.cmb.core; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.SocketTimeoutException; import ja

android webView开发之js调用java代码示例

1.webView设置 webView.getSettings().setJavaScriptEnabled(true);//设置支持js webView.addJavascriptInterface(new JsOperation(),"client");//设置js调用的java类 2.声明js要调用java类 class JsOperation { // 测试方法 @JavascriptInterface//这句标识必须要写上否则会出问题 public void test(Str

数据对象如何定义为Java代码示例

想将数据保存为这样子: [{ "subject": { "code": "B123", "words": [{ "key": "gjc1", "wight": 9.8, "ct": 1575126920 }, { "key": "gjc1", "wight": 9.8, "ct&

Java代码示例

https://codeforces.com/contest/1209/problem/H import java.io.OutputStream; import java.io.IOException; import java.io.InputStream; import java.io.PrintWriter; import java.util.SortedSet; import java.util.Set; import java.util.NavigableSet; import jav

四种java代码静态检查工具

[转载]常用 Java 静态代码分析工具的分析与比较 转载自 开源中国社区 http://www.oschina.net/question/129540_23043 1月16日厦门 OSC 源创会火热报名中,奖品多多哦 »   简介: 本文首先介绍了静态代码分析的基本概念及主要技术,随后分别介绍了现有 4 种主流 Java 静态代码分析工具 (Checkstyle,FindBugs,PMD,Jtest),最后从功能.特性等方面对它们进行分析和比较,希望能够帮助 Java 软件开发人员了解静态代码

native关键字初识--java调用非java代码的接口

Java基础知识--JNI入门介绍(上) Java? 本机接口(Java Native Interface,JNI)是一个标准的 Java API,它支持将 Java 代码与使用其他编程语言编写的代码相集成.如果您希望利用已有的代码资源,那么可以使用 JNI 作为您工具包中的关键组件 -- 比如在面向服务架构(SOA)和基于云的系统中.但是,如果在使用时未注意某些事项,则 JNI 会迅速导致应用程序性能低下且不稳定. JNI 的发展JNI 自从 JDK 1.1 发行版以来一直是 Java 平台的