hadoop知识整理(3)之MapReduce之代码编写

前面2篇文章知道了HDFS的存储原理,知道了上传和下载文件的过程,同样也知晓了MR任务的执行过程,以及部分代码也已经看到,那么下一步就是程序员最关注的关于MR的业务代码(这里不说太简单的):

一、关于MapTask的排序

  mapTask正常情况,按照key的hashcode进行从小到大的排序操作,形成map输出,交给reduce,(据某篇博文说,hashcode排序使用的是快排,这个无从考证),这里说明一下如何使用POJO类作为key,使其进行排序。

  1)POJO类实现WritableComparable<T>接口;

  2)重写compareTo(T t)方法,在此方法中返回为int,使用当前对象的排序对象,减去传入对象的排序字段,便是倒序排序(按照想要的方式)。示例为按照电影热度值倒序排序。

    @Override
    public int compareTo(Movie o) {
        return this.hot-o.hot;
    }

二、关于地址复用

  1)注意,在reduce中,关于reduce方法中的values的迭代器,一旦遍历过后,迭代器中值将不再存在;

  2)这里是因为reduceTask在反射调用reduce方法时,为节省内存空间,使用了地址复用技术;

  3)所以如果想让对象保存下来,那么必须将对象完全克隆,这里建议,在使用POJO时候,最好实现clone方法,以便方便保存迭代器中的对象;示例代码是重写clone方法,以方便克隆;

public Item clone(){
        Item o=null;
        try {
            o = (Item) super.clone();
        } catch (CloneNotSupportedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return o;
    }

三、关于多文件join

  多文件join操作,map任务也是支持的,只需将多文件放在指定的hdfs输入目录即可,那么在map方法中,只需要将关联字段提为key,其他封装入对象,然后reduce中你想要的结果就是join后的,至于保留哪些字段,去掉冗余字段,那就全凭自己操作了。

四、关于combiner

  根据前文,combine操作,即使将每一个maptask的输出结果,进行合并排序操作,如果程序员使用MR的人,指定了conbine操作的存在,那么maptask会根据spill内存缓冲溢出文件的数量进行判断是否确实需要combine操作,因为combine操作也会浪费资源,默认值中,假如spill文件的数量小于3,那么便不会进行combine操作,否则先进行combine操作,combine操作只针对于每一个maptask小任务,然后根据shuffle的原理,这些combine后的输出文件会被reduce的复制进行拿走。combine的启用方式:

job.setCombinerClass(InventReducer.class);

此段代码运行在MRDriver中,指定一个combiner的reduce类,和reduce的思路以及代码方式都一样:

public class InventReducer extends Reducer<Text, Text, Text, Text>{
    @Override
    protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {
        int count=0;
        for(Text t:values){
            count++;
        }
        String[] sss=key.toString().split(" ");
        context.write(new Text(sss[0]), new Text(sss[1]+" "+count));
    }
}

五、关于分区

  指定分区类,分区操作是在maptask中进行,当maptask输出文件结束后,maptask会根据spill文件进行排序和分区操作。

  Driver中指定分区类:

job.setPartitionerClass(AuthPartitioner.class);

  分区类代码:

  分区类中,key为maptask的输出key,int numPartitions为分区编号,有几个分区编号,将会分几个区。

public class AuthPartitioner extends Partitioner<IntWritable, IntWritable>{
    @Override
    public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
        String num=String.valueOf(key.get());
        if(num.matches("[0-9][0-9]")||num.matches("[0-9]")){
            return 0;
        }
        if(num.matches("[0-9][0-9][0-9]")){
            return 1;
        }else{
            return 2;
        }

    }
}

  分区的目的在于指定的多个reduceTask可以分别处理自己分区的数据,以便让数据均匀地落盘。

六、关于Job链

  这就是在Java代码中让多个Job串起来,实现很简单:代码写在Driver中

//判断上一个Job的完成情况
if (job.waitForCompletion(true)){
    //执行第二个Job代码
}

七、自定义格式输入(Map的输入)

  默认map方法的输入为:间隔字符数量long型、本行数据text类型;

  假如想改变这些输入,假如将第一个key输入变为intWritable

  需要一个Format类和Reader类:

public class AuthFormat extends FileInputFormat<IntWritable, Text>{
    @Override
    public RecordReader<IntWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new AuthReader();
    }
}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
public class AuthReader extends RecordReader<IntWritable, Text>{
    private FileSplit fs ;
    private LineReader lineReader ;
    private IntWritable key ;
    private Text value ;
    //--定义一个计数器,记录本次读取到了多少行
    int count = 0;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.fs = (FileSplit) split;
        //--获取文件路径
        Path path = fs.getPath();
        //--获取文件系统
        Configuration conf = context.getConfiguration();
        FileSystem fileSystem = path.getFileSystem(conf);
        //--通过文件系统读取文件得到流
        FSDataInputStream in = fileSystem.open(path);
        //--将流包装为LineReader方便按行读取
        lineReader = new LineReader(in);
    }
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        //--要返回的键,本次读取的行数
        key = new IntWritable();
        //--要返回的值,本次读取到的内容
        value = new Text();
        //--定义一个temp临时记录内容
        Text temp = new Text();
        int len = lineReader.readLine(temp);
        //--判断是否读取到了数据
        if(len == 0){
            //--表示没有行数据可读,则不再执行 nextKeyValue()方法
            return false;

        }else{
            //--读到了数据,将数据追加到value中
            //可以这样写:value=tmp;
            //也可以像下面这样写
            value.append(temp.getBytes(), 0, temp.getLength());
            //--计数器加1,表明读取到了一行内容
            count++;
            key.set(count);
            return true;
            }
    }
    @Override
    public IntWritable getCurrentKey() throws IOException, InterruptedException {
        return key;
    }
    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return value;
    }
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }
    @Override
    public void close() throws IOException {
        if(lineReader != null)lineReader.close();
    }

}

在上文的源码解析中,知道了Reader的作用在于一行一行地读取源文件给maptask任务。

这里相当于子类重写了父类的方法,在调用时,会直接调用子类的方法。

而在Driver中需要增加:

    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setInputFormatClass(AuthFormat.class);

八、自定义格式输出

  自定义格式输出的目的在于规范输出格式重量,简单来说就是可以输出想要的任意输出格式:

public class AuthOutputFormat<K,V> extends FileOutputFormat<K,V>{

    @Override
    public RecordWriter<K,V> gtetRecordWrier(TaskAttemptContext job) throws IOException, InterruptedException {

        //Get the default path and filename for the output format.
        //第二个参数:extension an extension to add to the filename
        Path path=super.getDefaultWorkFile(job, "");
        Configuration conf=job.getConfiguration();
        FileSystem fs=path.getFileSystem(conf);
        FSDataOutputStream out=fs.create(path);

        return new AuthWriter<K,V>(out,"|","\r\n");
    }

}
public class AuthWriter<K,V> extends RecordWriter<K,V>{

    private FSDataOutputStream out;
    private String keyValueSeparator;
    private String lineSeparator;

    public AuthWriter(FSDataOutputStream out, String keyValueSeparator, String lineSeparator) {
        this.out=out;
        this.keyValueSeparator=keyValueSeparator;
        this.lineSeparator=lineSeparator;
    }

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
        out.write(key.toString().getBytes());
        out.write(keyValueSeparator.getBytes());
        out.write(value.toString().getBytes());
        out.write(lineSeparator.getBytes());

    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        if(out!=null)out.close();

    }

}

同样一个format和一个writer,然后在Driver中指定即可

//有了这句话,不用再写原来的输出语句
job.setOutputFormatClass(AuthOutputFormat.class);

九、关于多输入源

  在Driver中指定:

public class ScoreDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(cn.gjm.hadoop.ScoreDriver.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setInputFormatClass(AuthInputFormat.class);
        //需要注意,如果一个Mapper代码不能通用的解决,则需要分别指定。此时,就不能去设置
        //setMapperClass()了
        MultipleInputs.addInputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/format-score.txt"),AuthInputFormat.class,ScoreMapper.class);

        MultipleInputs.addInputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/format-score-1.txt"),TextInputFormat.class,ScoreMapper2.class);

        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/result"));

        if (!job.waitForCompletion(true))
            return;
    }

十、多输出源

  

public class ScoreReducer extends Reducer<Text, Text, Text, Text>{

    private MultipleOutputs<Text, Text> mos;

    @Override
    protected void reduce(Text name, Iterable<Text> scores, Reducer<Text, Text, Text, Text>.Context context)
            throws IOException, InterruptedException {

        for(Text score:scores){
            if(name.toString().equals("jary")){
                mos.write("jary",name,score);
            }
            if(name.toString().equals("rose")){
                mos.write("rose",name,score);
            }
            if(name.toString().equals("tom")){
                mos.write("tom", name,score);
            }
        }

    }
    @Override
    protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
        mos=new MultipleOutputs<>(context);
    }

}
ScoreDriver代码:
public class ScoreDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "JobName");
        job.setJarByClass(cn.tarena.hadoop.ScoreDriver.class);

        // TODO: specify a reducer
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(ScoreReducer.class);

        job.setInputFormatClass(AuthInputFormat.class);

        MultipleInputs.addInputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/format-score.txt"),AuthInputFormat.class,ScoreMapper.class);
        MultipleInputs.addInputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/format-score-1.txt"),TextInputFormat.class,ScoreMapper2.class);

        MultipleOutputs.addNamedOutput(job, "jary", AuthOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "tom", AuthOutputFormat.class, Text.class, Text.class);
        MultipleOutputs.addNamedOutput(job, "rose", AuthOutputFormat.class, Text.class, Text.class);

        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/result"));

        if (!job.waitForCompletion(true))
            return;
    }

}

十一,多次排序

//只需在compare方法中指定多字段排序即可
@Override
    public int compareTo(Profit o) {
        int result=this.month-o.month;
        if(result!=0){
            return result;
        }else{
            return o.profit-this.profit;
        }
    }

原文地址:https://www.cnblogs.com/qfxydtk/p/11173625.html

时间: 2024-10-05 23:25:17

hadoop知识整理(3)之MapReduce之代码编写的相关文章

hadoop知识整理(1)之HDFS

一.HDFS是一个分布式文件系统 体系架构: hdfs主要包含了3部分,namenode.datanode和secondaryNameNode namenode主要作用和运行方式: 1)管理hdfs的元数据信息,文件名字,大小,切成几块,有几个副本,切成块和副本分别存储在datanode的位置,块id.大小: 2)通过rpc心跳机制,来检测datanode的运行状态: 3)简单说,元数据的存储信息都放在namenode之上,为了快速查取,所以内存中有一份,但是为了保证元数据信息不丢,所以磁盘还要

【甘道夫】官网MapReduce实例代码详细批注

引言 1.本文不描述MapReduce入门知识,这类知识网上很多,请自行查阅 2.本文的实例代码来自官网 http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html 最后的WordCount v2.0,该代码相比源码中的org.apache.hadoop.examples.WordCount要复杂和完整,更适合作为MapReduc

Java基础知识整理(一)

概述 公司业务需要,产品既要有.NET又需要Java,没得选择,只能业余时间学习Java,整体觉得Java也.NET还是很相似的,只是语法有差别,差别也不是很大,这就将学习Java的基础知识整理下,以便于自己的学习.作为个.NET程序猿也可以学习Java ,毕竟技多不压身,学习多也要精通. 开发工具 eclipse ,开发java类似.NET 需要装JDK类似.NET Framework. Java开发工具eclipse设置 1.设置字体:window设置: 2.设置快捷键:window--ke

Linux动态库相关知识整理

动态库和静态库在C/C++开发中很常见,相比静态库直接被编译到可执行程序, 动态库运行时加载使得可执行程序的体积更小,更新动态库可以不用重新编译可执 行程序等诸多好处.作者是一个Linux后台开发,这些知识经常用到,所以 整理了一下这方面的知识.静态库相对简单,本文只关心Linux平台下的动态库. 创建动态库 这里我把一个短小却很有用的哈希函数编译成动态库做为示例,ELFhash用于对字符串做哈希,返回一个无符号整数. //elfhash.h #include <stdio.h> unsign

DIV+CSS网页布局常用的一些基础知识整理

CSS命名规范一.文件命名规范 全局样式:global.css:框架布局:layout.css:字体样式:font.css:链接样式:link.css:打印样式:print.css: 二.常用类/ID命名规范页 眉:header内 容:content容 器:container页 脚:footer 版 权:copyright 导 航:menu主导航:mainMenu子导航:subMenu 标 志:logo标 语:banner标 题:title侧边栏:sidebar 图 标:Icon注 释:note

Linux进程管理知识整理

Linux进程管理知识整理 1.进程有哪些状态?什么是进程的可中断等待状态?进程退出后为什么要等待调度器删除其task_struct结构?进程的退出状态有哪些? TASK_RUNNING(可运行状态) TASK_INTERRUPTIBLE(可中断等待状态) TASK_UNINTERRUPTIBLE(不可中断等待状态) TASK_STOPPED(进程被其它进程设置为暂停状态) TASK_TRACED(进程被调试器设置为暂停状态) TASK_DEAD(退出状态) 进程由于所需资源得不到满足,从而进入

JAVA基础知识整理

一.首先先明白get与post的基本定义和区别: 这是两种在客户端和服务器端进行请求-响应的方法. 1get:从指定的资源请求数据. 2post:向指定的资源提交要处理的数据. get基本上用于从服务器取回数据,注意:get方法可能返回缓存数据. post可以从服务器上获取数据,不过,post方法不会缓存数据,并且常用语连同请求一起发送数据. 二. Jquery $.get()方法. $.get()方法通过Http Get发起请求,从服务器上请求数据. 语法:&.get(URL,callback

js事件(Event)知识整理

事件(Event)知识整理,本文由网上资料整理而来,需要的朋友可以参考下 鼠标事件 鼠标移动到目标元素上的那一刻,首先触发mouseover 之后如果光标继续在元素上移动,则不断触发mousemove 如果按下鼠标上的设备(左键,右键,滚轮--),则触发mousedown 当设备弹起的时候触发mouseup 目标元素的滚动条发生移动时(滚动滚轮/拖动滚动条..)触发scroll 滚动滚轮触发mousewheel,这个要区别于scroll 鼠标移出元素的那一刻,触发mouseout 事件注册 平常

Aprior算法Hadoop上实现思路与关键部分代码

本人最近研究Aprior算法,由于要实现海量数据的分析挖掘,需要在hadoop平台加以实现. 在网上看过一些Aprior算法Mapreduce的代码,感觉拿过来都不好直接用,而且,多数都不是原味的Aprior,或者经过改进,是FP-growth算法,或者是将数据分块,各块独立运行Aprior算法,不是严格意义上的Aprior算法. 本人也是几经实验,终于自己也实现了一种基于Mapreduce的原汁原味的Aprior算法的分布式实现. Aprior分布式实现的问题:输入有多个,一个是事务数据库,一