MapReduce小文件优化与分区

一、小文件优化

1.Mapper类

package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 思路?
 * wordcount单词计数
 * <单词,1>
 *
 * 数据传输
 *
 * KEYIN:数据的起始偏移量0~10 11~20 21~30
 * VALUEIN:数据
 *
 * KEYOUT:mapper输出到reduce阶段 k的类型
 * VALUEOUT:mapper输出到reduce阶段v的类型
 * <China,1><Beijing,1><love,1>
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    //key 起始偏移量 value 数据  context 上下文
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1.读取数据
        String line = value.toString();
        // 2.切割 love Beijing
        String[] words = line.split(" ");
        // 3.循环的写到下一个阶段<love,1><Beijing,1>
        for (String w : words) {
            // 4.输出到reducer阶段
            context.write(new Text(w), new IntWritable(1));
        }
    }
}

2.Reducer类

package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        // 统计单词出现的次数
        int sum = 0;
        // 累加求和
        for (IntWritable count : values) {
            // 拿到值累加
            sum += count.get();
        }
        // 结果输出
        context.write(key, new IntWritable(sum));
    }
}

3.Driver类

package com.css.combine;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 获取jar包
        job.setJarByClass(WordCountDriver.class);
        // 获取自定义的mapper与reducer类
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // 设置map输出的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 设置reduce输出的数据类型(最终的数据类型)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 指定运行的inputformat方式  默认的方式是textinputformat(小文件优化)
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // 最大4M
        CombineTextInputFormat.setMinInputSplitSize(job, 3145728); // 最小3M
        // 设置输入存在的路径与处理后的结果路径
        FileInputFormat.setInputPaths(job, new Path("c:/in1024/"));
        FileOutputFormat.setOutputPath(job, new Path("c:/out1024/"));
        // 提交任务
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs ? 0 : 1);
    }
}

二、分区

1.Mapper类

package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * 3631279850362    13726130503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    www.itstaredu.com    教育网站    24    27    299    681    200
 *
 * 13726130503  299    681 980
 */
public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1.获取数据
        String line = value.toString();

        // 2.切割
        String[] fields = line.split("\t");

        // 3.封装对象 拿到关键字段 数据清洗
        String phoneN = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long dfFlow = Long.parseLong(fields[fields.length - 2]);

        // 4.输出到reduce端
        context.write(new Text(phoneN), new FlowBean(upFlow, dfFlow));
    }
}

2.Reducer类

package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        // 1.相同手机号 的流量使用再次汇总
        long upFlow_sum = 0;
        long dfFlow_sum = 0;

        // 2.累加
        for (FlowBean f : values) {
            upFlow_sum += f.getUpFlow();
            dfFlow_sum += f.getDfFlow();
        }
        FlowBean rs = new FlowBean(upFlow_sum, dfFlow_sum);
        // 3.输出
        context.write(key, rs);
    }
}

3.封装类

package com.css.flow.partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * 封装类 数据的传输
 */
public class FlowBean implements Writable{
    // 定义属性
    private long upFlow;
    private long dfFlow;
    private long flowSum;

    public FlowBean() {
    }

    // 流量累加
    public FlowBean(long upFlow, long dfFlow) {
        this.upFlow = upFlow;
        this.dfFlow = dfFlow;
        this.flowSum = upFlow + dfFlow;
    }

    // 反序列化
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        dfFlow = in.readLong();
        flowSum = in.readLong();
    }

    // 序列化
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(dfFlow);
        out.writeLong(flowSum);
    }

    @Override
    public String toString() {
        return upFlow + "\t" + dfFlow + "\t" + flowSum;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDfFlow() {
        return dfFlow;
    }

    public void setDfFlow(long dfFlow) {
        this.dfFlow = dfFlow;
    }

    public long getFlowSum() {
        return flowSum;
    }

    public void setFlowSum(long flowSum) {
        this.flowSum = flowSum;
    }
}

4.分区类

package com.css.flow.partition;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class PhoneNumPartitioner extends Partitioner<Text, FlowBean>{

    // 根据手机号前三位进行分区
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // 获取手机号前三位
        String phoneNum = key.toString().substring(0, 3);
        // 分区
        int partitioner = 4;

        if ("135".equals(phoneNum)) {
            return 0;
        }else if ("137".equals(phoneNum)) {
            return 1;
        }else if ("138".equals(phoneNum)) {
            return 2;
        }else if ("139".equals(phoneNum)) {
            return 3;
        }
        return partitioner;
    }
}

5.Driver类

package com.css.flow.partition;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1.获取job信息
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2.获取jar包
        job.setJarByClass(FlowCountDriver.class);

        // 3.获取自定义的mapper与reducer类
        job.setMapperClass(FlowCountMapper.class);
        job.setReducerClass(FlowCountReducer.class);

        // 4.设置map输出的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 5.设置reduce输出的数据类型(最终的数据类型)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 设置自定义的分区类
        // 自定义分区个数要大于分区数
        job.setPartitionerClass(PhoneNumPartitioner.class);
        job.setNumReduceTasks(5);

        // 6.设置输入存在的路径与处理后的结果路径
        FileInputFormat.setInputPaths(job, new Path("c:/flow1020/in"));
        FileOutputFormat.setOutputPath(job, new Path("c:/flow1020/out"));

        // 7.提交任务
        boolean rs = job.waitForCompletion(true);
        System.out.println(rs ? 0 : 1);
    }
}

6.输入的文件HTTP_20180313143750.dat

3631279850362    13726130503    00-FD-07-A4-72-B8:CMCC    120.196.100.82    www.itstaredu.com    教育网站    24    27    299    681    200
3631279950322    13822544101    5C-0E-8B-C7-F1-E0:CMCC    120.197.40.4    www.taobao.com    淘宝网    4    0    264    0    200
3631279910362    13926435656    20-10-7A-28-CC-0A:CMCC    120.196.100.99            2    4    132    1512    200
3631244000322    13926251106    5C-0E-8B-8B-B1-50:CMCC    120.197.40.4            4    0    240    0    200
3631279930342    18212575961    94-71-AC-CD-E6-18:CMCC-EASY    120.196.100.99    iface.qiyi.com    视频网站    15    12    1527    2106    200
3631279950342    13884138413    5C-0E-8B-8C-E8-20:7DaysInn    120.197.40.4    122.72.52.12        20    16    4116    1432    200
3631279930352    13510439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1116    954    200
3631279950332    15920133257    5C-0E-8B-C7-BA-20:CMCC    120.197.40.4    sug.so.360.cn    信息安全    20    20    316    296    200
3631279830392    13719199419    68-A1-B7-03-07-B1:CMCC-EASY    120.196.100.82            4    0    240    0    200
3631279840312    13660577991    5C-0E-8B-92-5C-20:CMCC-EASY    120.197.40.4    s19.cnzz.com    站点统计    24    9    660    690    200
3631279730382    15013685858    5C-0E-8B-C7-F7-90:CMCC    120.197.40.4    rank.ie.sogou.com    搜索引擎    28    27    369    338    200
3631279860392    15889002119    E8-99-C4-4E-93-E0:CMCC-EASY    120.196.100.99    www.umeng.com    站点统计    3    3    938    380    200
3631279920332    13560439658    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            15    9    918    4938    200
3631279860312    13480253104    5C-0E-8B-C7-FC-80:CMCC-EASY    120.197.40.4            3    3    120    1320    200
3631279840302    13602846565    5C-0E-8B-8B-B6-00:CMCC    120.197.40.4    2052.flash2-http.qq.com    综合门户    15    12    198    910    200
3631279950332    13922314466    00-FD-07-A2-EC-BA:CMCC    120.196.100.82    img.qfc.cn        12    12    3008    3720    200
3631279820302    13502468823    5C-0A-5B-6A-0B-D4:CMCC-EASY    120.196.100.99    y0.ifengimg.com    综合门户    57    102    735    11349    400
3631279860322    18320173382    84-25-DB-4F-10-1A:CMCC-EASY    120.196.100.99    input.shouji.sogou.com    搜索引擎    21    18    9531    212    200
3631279900332    13925057413    00-1F-64-E1-E6-9A:CMCC    120.196.100.55    t3.baidu.com    搜索引擎    69    63    11058    4243    200
3631279880322    13760778710    00-FD-07-A4-7B-08:CMCC    120.196.100.82            2    2    120    120    200
3631279850362    13726238888    00-FD-07-A4-72-B8:CMCC    120.196.100.82    i02.c.aliimg.com        24    27    2481    24681    200
3631279930352    13560436666    C4-17-FE-BA-DE-D9:CMCC    120.196.100.99            18    15    1136    94    200
3631279930353    13560436326    C4-17-FE-BA-DE-D9:CMCC    120.196.100.77    lol.qq.com/    英雄联盟    18    15    1136    94    200

7.输出的文件

(1)part-r-00000
13502468823    735    11349    12084
13510439658    1116    954    2070
13560436326    1136    94    1230
13560436666    1136    94    1230
13560439658    918    4938    5856

(2)part-r-00001
13719199419    240    0    240
13726130503    299    681    980
13726238888    2481    24681    27162
13760778710    120    120    240

(3)part-r-00002
13822544101    264    0    264
13884138413    4116    1432    5548

(4)part-r-00003
13922314466    3008    3720    6728
13925057413    11058    4243    15301
13926251106    240    0    240
13926435656    132    1512    1644

(5)part-r-00004
13480253104    120    1320    1440
13602846565    198    910    1108
13660577991    660    690    1350
15013685858    369    338    707
15889002119    938    380    1318
15920133257    316    296    612
18212575961    1527    2106    3633
18320173382    9531    212    9743

原文地址:https://www.cnblogs.com/areyouready/p/9853651.html

时间: 2024-11-05 23:24:04

MapReduce小文件优化与分区的相关文章

大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化(重中之重)+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客共同粉丝案例+常见错误及解决方案

第6章 Hadoop企业优化(重中之重)6.1 MapReduce 跑的慢的原因6.2 MapReduce优化方法6.2.1 数据输入6.2.2 Map阶段6.2.3 Reduce阶段6.2.4 I/O传输6.2.5 数据倾斜问题6.2.6 常用的调优参数6.3 HDFS小文件优化方法6.3.1 HDFS小文件弊端6.3.2 HDFS小文件解决方案第7章 MapReduce扩展案例7.1 倒排索引案例(多job串联)7.2 TopN案例7.3 找博客共同粉丝案例第8章 常见错误及解决方案 第6章

MapReduce小文件处理之CombineFileInputFormat实现

在MapReduce使用过程中.一般会遇到输入文件特别小(几百KB.几十MB).而Hadoop默认会为每一个文件向yarn申请一个container启动map,container的启动关闭是很耗时的. Hadoop提供了CombineFileInputFormat.一个抽象类.作用是将多个小文件合并到一个map中,我们仅仅需实现三个类: CompressedCombineFileInputFormat CompressedCombineFileRecordReader CompressedCom

LOSF 海量小文件问题综述

1.LOSF问题概述 在互联网(尤其是移动互联网).物联网.云计算.大数据等高速发展的大背景下,数据呈现爆炸式地增长.根据IDC的预测,到2020年产生的数据量 将达到40ZB,而之前2011年6月的预测是35ZB.然而,社会化网络.移动通信.网络视频音频.电子商务.传感器网络.科学实验等各种应用产生的数 据,不仅存储容量巨大,而且还具有数据类型繁多.数据大小变化大.流动快等显著特点,往往能够产生千万级.亿级甚至十亿.百亿级的海量小文件,而且更多地 是海量大小文件混合存储.由于在元数据管理.访问

Hadoop之小文件处理与调优经验

HDFS上每个文件都要在namenode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用namenode的内存空间,另一方面就是索引文件过大是的索引速度变慢. 解决的方式:   1:Hadoop本身提供了一些文件压缩的方案 2:从系统层面改变现有HDFS存在的问题,其实主要还是小文件的合并,然后建立比较快速的索引. Hadoop自带小文件解决方案 1:Hadoop Archive: 是一个高效地将小文件放入HDFS块中的文件存档

Hive优化之小文件问题及其解决方案

小文件是如何产生的 1.动态分区插入数据,产生大量的小文件,从而导致map数量剧增. 2.reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的). 3.数据源本身就包含大量的小文件. 小文件问题的影响 1.从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能. 2.在HDFS中,每个小文件对象约占150byte,如果小文件过多会占用大量内存.这样NameNode内存容量严重制约了集群的扩展. 小

[转载]mapreduce合并小文件成sequencefile

mapreduce合并小文件成sequencefile http://blog.csdn.net/xiao_jun_0820/article/details/42747537

mapreduce 关于小文件导致任务缓慢的问题

小文件导致任务执行缓慢的原因: 1.很容易想到的是map task 任务启动太多,而每个文件的实际输入量很小,所以导致了任务缓慢 这个可以通过 CombineTextInputFormat,解决,主要需要设置 mapreduce.input.fileinputformat.split.maxsize(单位byte) 2.其次是set input 文件太多,需要一个一个set ,所以花费的时间很多,导致任务启动就很慢了 这个只能提前merge好小文件,组成大文件,可能还有更好的办法,需要再研究

FastDFS文件名策略及对小文件的优化

0.简介 FastDFS是一个应用级分布式文件存储服务,其采用中心型结构(类似GFS.HDFS.TFS等),主要用于大中型网站存储资源文件.FastDFS具有轻量级,支持高并发放访问,负载均衡,可扩展等优点.而FastDFS最大的亮点就是对小文件的存储性能较好,这主要来自于其文件名策略. 1.小文件存储性能优化 小文件的性能瓶颈主要来自于对元数据服务器(如FastDFS中的TrackerServer或TFS中的NameServer)的访问,因为当文件本身大小很小时,元数据存储所占空间与文件内容存

HDFS小文件合并问题的优化:copyMerge的改进

1.问题分析 用fsck命令统计 查看HDFS上在某一天日志的大小,分块情况以及平均的块大小,即 [[email protected] jar]$ hadoop fsck /wcc/da/kafka/report/2015-01-11 DEPRECATED: Use of this script to execute hdfs command is deprecated. Instead use the hdfs command for it. 15/01/13 18:57:23 WARN ut