MapReduce编程实践

一、MapReduce编程思想

学些MapRedcue主要是学习它的编程思想,在MR的编程模型中,主要思想是把对数据的运算流程分成map和reduce两个阶段:

Map阶段:读取原始数据,形成key-value数据(map方法)

Reduce阶段:把map阶段的key-value数据按照相同的key进行分组聚合(reduce方法)

它其实是一种数据逻辑运算模型,对于这样的运算模型,有一些成熟的具体软件实现,比如hadoop中的mapreduce框架、spark等,例如在hadoop的mr框架中,对map阶段的具体实现是map task,对reduce阶段的实现是reduce task。这些框架已经为我们提供了一些通用功能的实现,让我们专注于数据处理的逻辑,而不考虑分布式的具体实现,比如读取文件、写文件、数据分发等。我们要做的工作就是在这些编程框架下,来实现我们的具体需求。

下面我们先介绍一些map task和reduce task中的一些具体实现:

二、MapTask和ReduceTask

2.1 Map Task

读数据:利用InputFormat组件完成数据的读取。

    InputFormat-->TextInputFormat 读取文本文件的具体实现

            -->SequenceFileInputFormat 读取Sequence文件

            -->DBInputFormat 读数据库

处理数据:这一阶段将读取到的数据按照规则进行处理,生成key-value形式的结果。maptask通过调用用Mapper类的map方法实现对数据的处理。

分区:这一阶段主要是把map阶段产生的key-value数据进行分区,以分发给不同的reduce task来处理,使用的是Partitioner类。maptask通过调用Partitioner类的getPartition()方法来决定如何划分数据给不同的reduce task。

排序:这一阶段,对key-value数据做排序。maptask会按照key对数据进行排序,排序时调用key.compareTo()方法来实现对key-value数据排序。

2.2 Reduce Task

读数据:这一阶段通过http方式从maptask产生的数据文件中下载属于自己的“区”的数据。由于一个区的数据可能来自多个maptask,所以reduce还要把这些分散的数据进行合并(归并排序)

处理数据:一个reduce task中,处理刚才下载到自己本地的数据。通过调用GroupingComparator的compare()方法来判断文件中的哪些key-value属于同一组。然后将这一组数传给Reducer类的reduce()方法聚合一次。

输出结果:调用OutputFormat组件将结果key-value数据写出去。

    Outputformat --> TextOutputFormat 写文本文件(会把一个key-value对写一行,分隔符为制表符\t

          --> SequenceFileOutputFormat 写Sequence文件(直接将key-value对象序列化到文件中)

          --> DBOutputFormat

下面介绍下利用MapReduce框架下的一般编程过程。我们要做的 工作就是把我们对数据的处理逻辑加入到框架的业务逻辑中。我们编写的MapReduce的job客户端主要包括三个部分,Mapper 、 Reducer和JobSubmitter,三个部分分别完成MR程序的map逻辑、reduce逻辑以及将我们编写的job程序提交给集群。下面分别介绍这三个部分如何实现。

三、Hadoop中MapReduce框架下的一般编程步骤

Mapper:创建类,该类要实现Mapper父类,复写read()方法,在方法内实现当前工程中的map逻辑。

Reducer:创建类,继承Reducer父类,复写reduce()方法,方法内实现当前工程中的reduce逻辑。

jobSubmitter:这是job在集群上实际运行的类,主要是通过main方法,封装job相关参数,并把job提交。jobsubmitter内一般包括以下操作

step1:创建Configuration对象,并通过创建的对象对集群进行配置,同时支持用户自定义一些变量并配置。这一步有些像我们集群搭建的时候对$haoop_home/etc/hadoop/*下的一些文件进行的配置。

step2:获得job对象,并通过job对象对我们job运行进行一些配置。例如,设置集群运行的jar文件、设置实际执行map和reduce的类等,下面列出一些必要设置和可选设置。

        Configuration conf = new Configuration(); //创建集群配置对象。
        Job job = Job.getInstance(conf);//根据配置对象获取一个job客户端实例。
        job.setJarByClass(JobSubmitter.class);//设置集群上job执行的类
        job.setMapperClass(FlowCountMapper.class);//设置job执行时使用的Mapper类
        job.setReducerClass(FlowCountReducer.class);//设置job执行时使用的Reducer类

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path("I:\\hadooptest\\input"));
        FileOutputFormat.setOutputPath(job, new Path("I:\\hadooptest\\output_pri"));

        //设置maptask做数据分发时使用的分发逻辑类,如果不指定,默认使用hashpar
        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(4);//自定义的分发逻辑下,可能产生n个分区,所以reducetask的数量需要是n

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0:-1);

一般实践中,可以定义一个类,其中添加main方法对job进行提交,并在其中定义静态内部类maper和reduce类。

四、MapReduce框架中的可自定义项

遇到一些复杂的需求,需要我们自定义实现一些组件

2.1 自定义排序规则

2.2 自定义序列化数据类型

五、MR程序的调试、执行方式

5.1 提交到linux运行

5.2 Win本地执行

原文地址:https://www.cnblogs.com/Jing-Wang/p/10886890.html

时间: 2024-08-27 18:09:23

MapReduce编程实践的相关文章

Hadoop 实践(二) Mapreduce 编程

Mapreduce 编程,本文以WordCount  为例:实现文件字符统计 在eclipse 里面搭建一个java项目,引入hadoop lib目录下的jar,和 hadoop主目录下的jar. 新建WordCount 类: package org.scf.wordcount; import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.co

MapReduce 编程 系列五 MapReduce 主要过程梳理

前面4篇文章介绍了如何编写一个简单的日志提取程序,读取HDFS share/logs目录下的所有csv日志文件,然后提取数据后,最终输出到share/output目录下. 本篇停留一下,梳理一下主要过程,然后提出新的改进目标. 首先声明一下,所有的代码都是maven工程的,没有使用任何IDE.  这是我一贯的编程风格,用Emacs + JDEE开发.需要使用IDE的只需要学习如何在IDE中使用maven即可. 可比较的序列化 第一个是序列化,这是各种编程技术中常用的.MapReduce的特别之处

Storm实时计算:流操作入门编程实践

转自:http://shiyanjun.cn/archives/977.html Storm实时计算:流操作入门编程实践 Storm是一个分布式是实时计算系统,它设计了一种对流和计算的抽象,概念比较简单,实际编程开发起来相对容易.下面,简单介绍编程实践过程中需要理解的Storm中的几个概念: Topology Storm中Topology的概念类似于Hadoop中的MapReduce Job,是一个用来编排.容纳一组计算逻辑组件(Spout.Bolt)的对象(Hadoop MapReduce中一

MapReduce编程模型及其在Hadoop上的实现

转自:https://www.zybuluo.com/frank-shaw/note/206604 MapReduce基本过程 关于MapReduce中数据流的传输过程,下图是一个经典演示:  关于上图,可以做出以下逐步分析: 输入数据(待处理)首先会被切割分片,每一个分片都会复制多份到HDFS中.上图默认的是分片已经存在于HDFS中. Hadoop会在存储有输入数据分片(HDFS中的数据)的节点上运行map任务,可以获得最佳性能(数据TaskTracker优化,节省带宽). 在运行完map任务

MapReduce编程实战之“高级特性”

本篇介绍MapReduce的一些高级特性,如计数器.数据集的排序和连接.计数器是一种收集作业统计信息的有效手段,排序是MapReduce的核心技术,MapReduce也能够执行大型数据集间的""连接(join)操作. 计数器 计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计.计数器还可用于辅助诊断系统故障.对于大型分布式系统来说,获取计数器比分析日志文件容易的多. 示例一:气温缺失及不规则数据计数器 import java.io.IOException; import

Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter; import java.net.URI; import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Co

Linux shell一行流编程实践

Linux下很多命令用起来真相当方便,尤其是进行批处理操作时.(话说感觉这种程序也不复杂,windows咋一直不搞一个好用的shell呢) 这里列出一些常用shell操作的应用,具体命令的用法与解释就不列了,网上有很多很好的教程. 批量重命名 假如当前目录下有若干.wma文件,我希望把它们批量转成.mp3文件 例: 001.wma -> 001.mp3 解决方案: awk ? 1 ls * | awk -F '.' '{print "mv "$0" "$1&q

Socket编程实践(10) --select的限制与poll的使用

select的限制 用select实现的并发服务器,能达到的并发数一般受两方面限制: 1)一个进程能打开的最大文件描述符限制.这可以通过调整内核参数.可以通过ulimit -n(number)来调整或者使用setrlimit函数设置,但一个系统所能打开的最大数也是有限的,跟内存大小有关,可以通过cat /proc/sys/fs/file-max 查看 /**示例: getrlimit/setrlimit获取/设置进程打开文件数目**/ int main() { struct rlimit rl;

试读《JavaScript语言精髓与编程实践》

有幸看到iteye的活动,有幸读到<JavaScript语言精髓与编程实践_第2版>的试读版本,希望更有幸能完整的读到此书. 说来读这本书的冲动,来得很诡异,写一篇读后感,赢一本书,其实奖励并不大,依靠纯粹的物质奖励,很显然,不会强烈的促使我去读这本书.而原因在于,一方面对javascript的极大兴趣,另一方面之前已经拜读过如<javascript高级程序设计><高性能javascript><javascript设计模式>等书,那我就有了要看看这本书都写了