MapReduce程序开发

通过API操作之前要先了解几个基本知识

基本数据类型

Hadoop的基本数据类型和Java的基本数据类型是不一样的,但是都存在对应的关系

如下图

如果需要定义自己的数据类型,则必须实现Writable

hadoop的数据类型可以通过get方法获得对应的java数据类型

而java的数据类型可以通过hadoop数据类名的构造函数,或者set方法转换

关于Hadoop的Writable接口,详情请看Hadoop I/O中的序列化部分

MapReduce执行的基本步骤

Hadoop提交作业的的步骤分为八个,可以理解为天龙八步

Map端工作

  • 1.1 读取要操作的文件–这步会将文件的内容格式化成键值对的形式,键为每一行的起始位置偏移,值为每一行的内容。
  • 1.2 调用map进行处理–在这步使用自定义的Mapper类来实现自己的逻辑,输入的数据为1.1格式化的键值对,输入的数据也是键值对的形式。
  • 1.3 对map的处理结果进行分区–map处理完毕之后可以根据自己的业务需求来对键值对进行分区处理,比如,将类型不同的结果保存在不同的文件中等。这里设置几个分区,后面就会有对应的几个Reducer来处理相应分区中的内容。
  • 1.4 分区之后,对每个分区的数据进行排序,分组–排序按照从小到大进行排列,排序完毕之后,会将键值对中,key相同的选项 的value进行合并。如,所有的键值对中,可能存在

    hello 1

    hello 1

    key都是hello,进行合并之后变成

    hello 2

    可以根据自己的业务需求对排序和合并的处理进行干涉和实现。

  • 1.5 归约(combiner)–简单的说就是在map端进行一次reduce处理,但是和真正的reduce处理不同之处在于:combiner只能处理本地数据,不能跨网络处理。通过map端的combiner处理可以减少输出的数据,因为数据都是通过网络传输的,其目的是为了减轻网络传输的压力和后边reduce的工作量。并不能取代reduce。

Reduce端工作

  • 2.1 通过网络将数据copy到各个reduce。
  • 2.2 调用reduce进行处理–reduce接收的数据是整个map端处理完毕之后的键值对,输出的也是键值对的集合,是最终的结果。
  • 2.3 将结果输出到hdfs文件系统的路径中。

程序开发

开发流程

一般情况下我们不可能直接在生产环境中直接拿海量数据进行程序开发,不说程序有没有bug,能不能跑起来都是一个问题

所以通常步骤是:

1.本地程序开发测试:从海量数据中抽取小部分数据到本地,使用IDE等工具进行开发,并在测试数据集上进行程序运行、逻辑等测试

2.集群环境运行:本地测试通过后,就可以将代码运行在海量数据中,这时候90%的小bug已经得到修复了,剩余要解决的就是生产环境中的问题了

3.调优:程序能够在集群中运行起来并不代表成功,通过一些集群、程序调优方式可以让你的代码跑的更好、更快

导包

新建一个java项目,并导入hadoop包,在项目选项上右键,如图选择

找到hadoop的安装目录,选择所有的包

在找到hadoop安装目录下的lib,导入其中的所有包

代码

新建JMapper类为自定义的Mapper类

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;  

//自定义的Mapper类必须继承Mapper类,并重写map方法实现自己的逻辑
public class JMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    //处理输入文件的每一行都会调用一次map方法,文件有多少行就会调用多少次
    protected void map(
            LongWritable key,
            Text value,
            org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, LongWritable>.Context context)
            throws java.io.IOException, InterruptedException {
        //key为每一行的起始偏移量
        //value为每一行的内容  

        //每一行的内容分割,如hello   world,分割成一个String数组有两个数据,分别是hello,world
        String[] ss = value.toString().toString().split("\t");
        //循环数组,将其中的每个数据当做输出的键,值为1,表示这个键出现一次
        for (String s : ss) {
            //context.write方法可以将map得到的键值对输出
            context.write(new Text(s), new LongWritable(1));
        }
    };
}  

新建JReducer类为自定义的Reducer

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;  

//自定义的Reducer类必须继承Reducer,并重写reduce方法实现自己的逻辑,泛型参数分别为输入的键类型,值类型;输出的键类型,值类型;之后的reduce类似
public class JReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    //处理每一个键值对都会调用一次reduce方法,有多少个键值对就调用多少次
    protected void reduce(
            Text key,
            java.lang.Iterable<LongWritable> value,
            org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)
            throws java.io.IOException, InterruptedException {
        //key为每一个单独的单词,如:hello,world,you,me等
        //value为这个单词在文本中出现的次数集合,如{1,1,1},表示总共出现了三次
        long sum = 0;
        //循环value,将其中的值相加,得到总次数
        for (LongWritable v : value) {
            sum += v.get();
        }
        //context.write输入新的键值对(结果)
        context.write(key, new LongWritable(sum));
    };
}  

新建执行提交作业的类,取名JSubmit

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;  

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  

public class JSubmit {
    public static void main(String[] args) throws IOException,
            URISyntaxException, InterruptedException, ClassNotFoundException {
        //Path类为hadoop API定义,创建两个Path对象,一个输入文件的路径,一个输入结果的路径
        Path outPath = new Path("hdfs://localhost:9000/out");
        //输入文件的路径为本地linux系统的文件路径
        Path inPath = new Path("/home/hadoop/word");
        //创建默认的Configuration对象
        Configuration conf = new Configuration();
        //根据地址和conf得到hadoop的文件系统独享
        //如果输入路径已经存在则删除
        FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        //根据conf创建一个新的Job对象,代表要提交的作业,作业名为JSubmit.class.getSimpleName()
        Job job = new Job(conf, JSubmit.class.getSimpleName());
        //1.1
        //FileInputFormat类设置要读取的文件路径
        FileInputFormat.setInputPaths(job, inPath);
        //setInputFormatClass设置读取文件时使用的格式化类
        job.setInputFormatClass(TextInputFormat.class);  

        //1.2调用自定义的Mapper类的map方法进行操作
        //设置处理的Mapper类
        job.setMapperClass(JMapper.class);
        //设置Mapper类处理完毕之后输出的键值对 的 数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);  

        //1.3分区,下面的两行代码写和没写都一样,默认的设置
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        //1.4排序,分组  

        //1.5归约,这三步都有默认的设置,如果没有特殊的需求可以不管
        //2.1将数据传输到对应的Reducer  

        //2.2使用自定义的Reducer类操作
        //设置Reducer类
        job.setReducerClass(JReducer.class);
        //设置Reducer处理完之后 输出的键值对 的数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);  

        //2.3将结果输出
        //FileOutputFormat设置输出的路径
        FileOutputFormat.setOutputPath(job, outPath);
        //setOutputFormatClass设置输出时的格式化类
        job.setOutputFormatClass(TextOutputFormat.class);  

        //将当前的job对象提交
        job.waitForCompletion(true);
    }  

运行

运行java程序,可以再控制台看到提交作业的提示

在hdfs中查看输出的文件

程序调试

在集群上调试程序是十分困难的,就算采用最常见的println打印信息你也不知道该信息会在集群上哪个节点打印出来

在MapReduce中,可以使用系统错误信息+计数器的组合来进行调试,在map或者reduce函数中,我们可以这样做:

System.out.println("一些错误信息");
context.setStatus("关于错误的一些提示")
context.getCounter(计数器组名一般为枚举类型).increment(1);

在CLI中可以通过以下命令查看计数器:

mapred job -counter jobId counterGroup counterName

counterGroup:计数器组名,一般为枚举类型的全类名

counterName:计数器名,一般为枚举类型的值

性能调优

参考:MapReduce性能调优记录

JobControl

开发MapReduce程序的时候,我们需要考虑如何把需求转换为MapReduce模型来解决问题

对于一些复杂的场景,我们通常是使用多个Job来完成任务,而不是一个非常复杂的单一Job

所以一个完成的任务就可能会有多个Job,一个Job有可能有多个Mapper的情况,参考:

多个Mapper和Reducer处理多个输入

开发技巧

参考:MapReduce开发技巧

更多应用场景

下面是自己在学习过程中收集整理的一些MapReduce场景Demo,可以提供参考和帮助:

GitHub - chubbyjiang/MapReduce: MapReduce Demo

作者:@小黑

时间: 2024-08-30 14:31:17

MapReduce程序开发的相关文章

linux上安装eclipse并配置mapreduce程序开发环境

我们打算在linux(centos)上安装eclipse,并配置好mapreduce程序开发环境. 第一步:下载并安装eclipse(前提是已经安装好JDK) 在linux系统中打开浏览器,输入网址:http://archive.eclipse.org/eclipse/downloads/我们选择3.7.2版本. 下载下来后,文件存在于: [[email protected] Downloads]$ ll total 178052 -rw-rw-r--. 1 liuqingjie liuqing

Hadoop之MapReduce程序开发流程

摘要:MapReduce程序开发流程遵循算法思路.Mapper.Reducer.作业执行的步骤. 关键词:MapReduce 程序   开发流程 对于一个数据处理问题,若需要MapReduce,那么如何设计和实现?MapReduce程序基础模板,包含两个部分,一个是map,一个是reduce.map和reduce的设计取决解决问题的算法思路:而map和reduce的执行需要作业的调度. 因此,MapReduce程序开发可以遵循以下流程. 第一步:清楚问题是什么,确定解决问题的算法思路. 第二步:

基于HBase Hadoop 分布式集群环境下的MapReduce程序开发

HBase分布式集群环境搭建成功后,连续4.5天实验客户端Map/Reduce程序开发,这方面的代码网上多得是,写个测试代码非常容易,可是真正运行起来可说是历经挫折.下面就是我最终调通并让程序在集群上运行起来的一些经验教训. 一.首先说一下我的环境: 1,集群的环境配置请见这篇博文. 2,开发客户机环境:操作系统是CentOS6.5,JDK版本是1.7.0-60,开发工具是Eclipse(原始安装是从google的ADT网站下载的ADT专用开发环境,后来加装了Java企业开发的工具,启动Flas

Hadoop之MapReduce程序应用一

摘要:MapReduce程序处理专利数据集. 关键词:MapReduce程序   专利数据集 数据源:专利引用数据集cite75_99.txt.(该数据集可以从网址http://www.nber.org/patents/下载) 问题描述: 读取专利引用数据集并对它进行倒排.对于每一个专利,找到那些引用它的专利并进行合并.top5输出结果如下: 1                                3964859, 4647229 10000                      

Windows平台开发Mapreduce程序远程调用运行在Hadoop集群—Yarn调度引擎异常

共享原因:虽然用一篇博文写问题感觉有点奢侈,但是搜索百度,相关文章太少了,苦苦探寻日志才找到解决方案. 遇到问题:在windows平台上开发的mapreduce程序,运行迟迟没有结果. Mapreduce程序 public class Test { public static void main(String [] args) throws Exception{ Configuration conf = new Configuration(); conf.set("fs.defaultFS&qu

在Eclipse中开发MapReduce程序

一.Eclipse的安装与设置 1.在Eclipse官网上下载eclipse-jee-oxygen-3a-linux-gtk-x86_64.tar.gz文件并将其拷贝到/home/jun/Resources下,然后再将文件拷贝到/home/jun下并解压. [[email protected] ~]$ cp /home/jun/Resources/eclipse-jee-oxygen-3a-linux-gtk-x86_64.tar.gz /home/jun/ [[email protected]

本地idea开发mapreduce程序提交到远程hadoop集群执行

https://www.codetd.com/article/664330 https://blog.csdn.net/dream_an/article/details/84342770 通过idea开发mapreduce程序并直接run,提交到远程hadoop集群执行mapreduce. 简要流程:本地开发mapreduce程序–>设置yarn 模式 --> 直接本地run–>远程集群执行mapreduce程序: 完整的流程:本地开发mapreduce程序--> 设置yarn模式

使用Eclipse运行Hadoop 2.x MapReduce程序常见问题

1. 当我们编写好MapReduce程序,点击Run on Hadoop的时候,Eclipse控制台输出如下内容: 这个信息告诉我们没有找到log4j.properties文件.如果没有这个文件,程序运行出错的时候,就没有打印日志,因此我们会很难调试. 解决方法:复制$HADOOP_HOME/etc/hadoop/目录下的log4j.properties文件到MapReduce项目 src文件夹下. 2.当执行MapReduce程序的时候,Eclipse可能会报告堆益处的错误. 此时,MapRe

Mapreduce程序运行的多模式

Mapreduce程序可在多种模式下运行: 本地模式: 1)         本地文件,本地处理:将MR的输入输出路径设置为本地路径: 2)         集群文件,本地处理:将MR的输入输出设置为HDFS的路径,job在本地进行处理; 2.集群模式:集群文件,集群处理:将MR的输入输出设置为HDFS的路径,并将Job提交到集群里面(Yarn)处理:其中以集群模式运行的时候还可通过以下几种方式对Job作业进行提交(前提是在集群里面已经启动HDFS以及Yarn): 1)         在Ecl