hadoop MapReduce自定义分区Partition输出各运营商的手机号码

MapReduce和自定义Partition

MobileDriver主类
package Partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

public class MobileDriver {
    public static void main(String[] args) {
        String[] paths = {"F:\\mobile.txt", "F:\\output"};

        JobUtils.commit(paths, true, 3, MobileDriver.class,
                MobileMapper.class, Text.class, NullWritable.class, MobilePartition.class,
                MobileReduce.class, Text.class, NullWritable.class);

    }
}
JobUtils工具类
package Partition;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

public class JobUtils {
    private static Configuration conf;

    static {
        conf = new Configuration();
    }

    /**
     * 提交job
     *
     * @param paths        输入输出路径数组
     * @param isPartition  是否包含自定义分区类
     * @param reduceNumber reduce数量(若自定义分区为true,则此项必须>=自定义分区数)
     * @param params       可变参数
     */
    public static void commit(String[] paths, boolean isPartition, int reduceNumber, Class... params) {
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(params[0]);

            job.setMapperClass(params[1]);
            job.setOutputKeyClass(params[2]);
            job.setOutputValueClass(params[3]);

            if (isPartition) {
                job.setPartitionerClass(params[4]);//设置自定义分区;
            }

            if (reduceNumber > 0) {
                job.setNumReduceTasks(reduceNumber);
                job.setReducerClass(params[5]);
                job.setOutputKeyClass(params[6]);
                job.setOutputValueClass(params[7]);
            } else {
                job.setNumReduceTasks(0);
            }
            deleteDirectory(paths[1]);
            FileInputFormat.setInputPaths(job, new Path(paths[0]));
            FileOutputFormat.setOutputPath(job, new Path(paths[1]));
            job.waitForCompletion(true);
        } catch (InterruptedException | ClassNotFoundException | IOException e) {
            e.printStackTrace();
        }
    }

    //输出目录存在则删除
    public static void deleteDirectory(String path) {
        File pFile = new File(path);
        if (!pFile.exists()) {
            return;
        }
        if ((pFile.isDirectory() && pFile.listFiles().length == 0) || pFile.isFile()) {
            pFile.delete();
        } else {
            for (File file : pFile.listFiles()) {
                if (file.isDirectory()) {
                    deleteDirectory(file.getAbsolutePath());
                } else {
                    file.delete();
                }
            }
        }
        pFile.delete();
    }
}
Map自定义类
package Partition;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MobileMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] mobiles = line.split("\t");
        for (String mobile : mobiles) {
            //不满足11位手机号进行过滤
            if (mobile.length() == 11) {
                context.write(new Text(mobile), NullWritable.get());
            }
        }
    }
}
Reduce自定义类
package Partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MobileReduce extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
Partition自定义分区类
package Partition;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.Arrays;

public class MobilePartition extends Partitioner<Text, NullWritable> {
    @Override
    public int getPartition(Text text, NullWritable nullWritable, int i) {
        String line = text.toString();
        String flag = line.substring(0, 3);
        if (Arrays.asList(Mobile.CHINA_MOBILE).contains(flag)) {
            return 0;//移动
        } else if (Arrays.asList(Mobile.CHINA_UNICOM).contains(flag)) {
            return 1;//联通
        } else {
            return 2;//电信
        }
    }
}

原文地址:https://www.cnblogs.com/ShadowFiend/p/11370881.html

时间: 2024-10-10 23:54:46

hadoop MapReduce自定义分区Partition输出各运营商的手机号码的相关文章

Hadoop mapreduce自定义分组RawComparator

本文发表于本人博客. 今天接着上次[Hadoop mapreduce自定义排序WritableComparable]文章写,按照顺序那么这次应该是讲解自定义分组如何实现,关于操作顺序在这里不多说了,需要了解的可以看看我在博客园的评论,现在开始. 首先我们查看下Job这个类,发现有setGroupingComparatorClass()这个方法,具体源码如下: /** * Define the comparator that controls which keys are grouped toge

Hadoop学习之路(6)MapReduce自定义分区实现

MapReduce自带的分区器是HashPartitioner原理:先对map输出的key求hash值,再模上reduce task个数,根据结果,决定此输出kv对,被匹配的reduce任务取走.自定义分分区需要继承Partitioner,复写getpariton()方法自定义分区类:注意:map的输出是<K,V>键值对其中int partitionIndex = dict.get(text.toString()),partitionIndex是获取K的值 附:被计算的的文本 Dear Dea

hadoop 学习自定义分区

(网易云课程hadoop大数据实战学习笔记) 如图所示:有三个ReducerTask,因此处理完成之后的数据存储在三个文件中: 默认情况下,numReduceTasks的数量为1,前面做的实验中,输出数据都是在一个文件中.通过自定义myPatitioner类,可以把ruduce处理后的数据分类汇总,这里MyPartitioner是Partitioner的基类,如果需要定制partitioner也需要继承该类.HashPartitioner是mapreduce的默认partitioner.计算方法

[Hadoop] - Mapreduce自定义Counter

在Hadoop的MR程序开发中,经常需要统计一些map/reduce的运行状态信息,这个时候我们可以通过自定义Counter来实现,这个实现的方式是不是通过配置信息完成的,而是通过代码运行时检查完成的. 1.创建一个自己的Counter枚举类. enum PROCESS_COUNTER { BAD_RECORDS, BAD_GROUPS; } 2.在需要统计的地方,比如map或者reduce阶段进行下列操作. context.getCounter(PROCESS_COUNTER.BAD_RECO

hadoop mapreduce 自定义InputFormat

很久以前为了满足公司的需求写过一些自定义InputFormat,今天有时间拿出来记一下     需求是这样的,如果如果使用FileInputFormat作为输入,是按照行来读取日志的,也就是按照\n来区分每一条日志的,而由于一条日志中记录着一些错误信息,比如java的异常信息,这些信息本身就带有换行符,如果还是按照\n进行区分每一条日志的话明显是错误的,由于我们日志的特殊性,将以"]@\n"作为区分日志的标识.     接下来就来看看如何自定义InputFormat,还是不画类图了,我

【Hadoop】Hadoop MR 自定义分组 Partition机制

1.概念 2.Hadoop默认分组机制--所有的Key分到一个组,一个Reduce任务处理 3.代码示例 FlowBean package com.ares.hadoop.mr.flowgroup; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class FlowBean

Hadoop mapreduce自定义排序WritableComparable

本文发表于本人博客. 今天继续写练习题,上次对分区稍微理解了一下,那根据那个步骤分区.排序.分组.规约来的话,今天应该是要写个排序有关的例子了,那好现在就开始! 说到排序我们可以查看下hadoop源码里面的WordCount例子中对LongWritable类型定义,它实现抽象接口WritableComparable,代码如下: public interface WritableComparable<T> extends Writable, Comparable<T> { } pub

一起学Hadoop——使用自定义Partition实现hadoop部分排序

排序在很多业务场景都要用到,今天本文介绍如何借助于自定义Partition类实现hadoop部分排序.本文还是使用java和python实现排序代码. 1.部分排序. 部分排序就是在每个文件中都是有序的,和其他文件没有关系,其实很多业务场景就需要到部分排序,而不需要全局排序.例如,有个水果电商网站,要对每个月的水果的销量进行排序,我们可以把reduce进程之后的文件分成12份,对应1到12月份.每个文件按照水果的销量从高到底排序,1月份的排序和其他月份的排序没有任何关系. 原始数据如下,有三个字

Hadoop读书笔记(十二)MapReduce自定义排序

Hadoop读书笔记系列文章:http://blog.csdn.net/caicongyang/article/category/2166855 1.说明: 对给出的两列数据首先按照第一列升序排列,当第一列相同时,第二列升序排列 数据格式: 3 3 3 2 3 1 2 2 2 1 1 1 2.代码 SortApp.java package sort; import java.io.DataInput; import java.io.DataOutput; import java.io.IOExc