MapReduce 2.x 编程 系列二 Map阶段实现

现在想从日志中提取数据,日志文件如下:

2014-05-10 13:36:40,140307000287,536dbacc4700aab274729cca,login
2014-05-10 13:37:46,140310000378,536dbae74700aab274729ccb,login
2014-05-10 13:39:20,140310000382,536dbb284700aab274729ccd,login
2014-05-10 13:39:31,140331001080,536dbb864700aab274729ccf,login
2014-05-10 13:39:45,140331001105,536dbba04700aab274729cd4,login
2014-05-10 13:39:45,140328000969,536dbba04700aab274729ce4,login
2014-05-10 13:39:45,140408001251,536dbba04700aab274729cd8,login
2014-05-10 13:39:45,140328000991,536dbba04700aab274729ce9,login
2014-05-10 13:39:45,140324000633,536dbba14700aab274729cf5,login
2014-05-10 13:39:45,140331001077,536dbba04700aab274729cdd,login
2014-05-10 13:39:45,140408001242,536dbba04700aab274729cd7,login
2014-05-10 13:39:45,140327000941,536dbba14700aab274729cf1,login
2014-05-10 13:39:45,140408001265,536dbba04700aab274729ce5,login
2014-05-10 13:39:45,140324000673,536dbba04700aab274729cd3,login
2014-05-10 13:39:45,140331001066,536dbba04700aab274729cd5,login
2014-05-10 13:39:45,140408001292,536dbba14700aab274729cee,login
2014-05-10 13:39:45,140328000966,536dbba14700aab274729cec,login
2014-05-10 13:39:45,140312000501,536dbba04700aab274729ce1,login
2014-05-10 13:39:45,140306000216,536dbba14700aab274729d02,login
2014-05-10 13:39:45,140327000856,536dbba04700aab274729ce2,login
2014-05-10 13:39:46,140328000985,536dbba14700aab274729cf7,login
2014-05-10 13:39:46,140306000245,536dbba14700aab274729d0d,login
2014-05-10 13:39:46,140326000797,536dbba14700aab274729cf6,login
2014-05-10 13:39:46,140328000993,536dbba14700aab274729d12,login
2014-05-10 13:39:46,140331001115,536dbba14700aab274729d10,login
2014-05-10 13:39:46,140325000744,536dbba04700aab274729ce0,login
2014-05-10 13:39:46,140328000982,536dbba14700aab274729d0a,login
2014-05-10 13:39:46,140331001063,536dbba04700aab274729ce3,login
2014-05-10 13:39:46,140331001067,536dbba14700aab274729d1c,login
2014-05-10 13:39:46,140401001157,536dbba04700aab274729ce8,login
2014-05-10 13:39:46,140408001216,536dbba14700aab274729cef,login
2014-05-10 13:39:46,140401001174,536dbba14700aab274729d27,login
2014-05-10 13:39:46,140306000215,536dbba04700aab274729cde,login
2014-05-10 13:39:46,140331001064,536dbba04700aab274729cdc,login
2014-05-10 13:39:46,140326000825,536dbba04700aab274729cd9,login
2014-05-10 13:39:46,140408001294,536dbba14700aab274729d0f,login

我希望将login前面的设备ID取出来,进行数量的统计,最后得到结果:

各个设备的累计登录次数

536dbba04700aab274729cdc 5
536dbba04700aab274729ce3 4

好,创建一个LogMapper类,该类负责做数据的Map,前两各模板参数用于KeyIn和ValueIn, 后两个模板参数用于KeyOut和ValueOut,都是代表类型。

假定一个<KeyIn, ValueIn>组成一个pair,输入的很多pair在一个组里面, 这些pair被一定的算法Map之后,会变成很多组pair。

官方文档:http://hadoop.apache.org/docs/r2.4.1/api/org/apache/hadoop/mapreduce/Mapper.html

Maps input key/value pairs to a set of intermediate key/value pairs.

注意,这里的Mapper类用的包是mapreduce,以前有一个老的叫mapred。

这里介绍了两者的区别:

http://stackoverflow.com/questions/7598422/is-it-better-to-use-the-mapred-or-the-mapreduce-package-to-create-a-hadoop-job

LongWritable和IntWritable是两个类,用于帮助创建可以Long和Int类型的变量。它们能够帮助将Long和Int的值序列化成字节流,因此都有两个关键方法读入和写出:

 void readFields(DataInput in) 
          Deserialize the fields of this object from in.
   
 void write(DataOutput out) 
          Serialize the fields of this object to out.

这个和Hadoop内部RPC调用时采用的序列化算法有关。

我的Mapper代码:

package org.freebird.mapper;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Describe class LogMapper here.
 *
 *
 * Created: Wed Sep 24 15:14:17 2014
 *
 * @author <a href="mailto:[email protected]"></a>
 * @version 1.0
 */
public class LogMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable ONE = new IntWritable(1);

    public void map(Object key, Text value, org.apache.hadoop.mapreduce.Mapper.Context ctx) throws IOException, InterruptedException {
        String[] line = value.toString().split(",");
        if (line.length == 4) {
            String dId = line[2];
            ctx.write(new Text(dId), ONE);
        }
    }
}

这个Mapper的子类覆盖了map函数,将字符串用,号拆开后,取出第三个元素作为设备ID, 然后作为key写入context对象。

这里value设置为1, 因为后面reduce阶段会简单的求和。

Context类文档参考: http://hadoop.apache.org/docs/r1.1.1/api/org/apache/hadoop/mapreduce/Mapper.Context.html

write方法不是一般概念的hasmap添加key,value,而是生成一个新的pair对象,里面包含了key和value。 如果多个key相同,也会产生多个pair对象,交给reduce阶段处理。

基本流程是, 框架会使用FileInputFormat读取文件,默认会根据文件大小的进行记录拆分,这里拆分器叫做InputSplitter。通过InputSplitter将文件拆成若干块,后面也就有若干个mapper与之对应。

InputSplitter里面使用RecordReader对文件快的记录进行读取,生成key/value的pair,调用mapper的map函数去处理。

当然这些流程中有些可以定制,比如InputSplitter的算法可以修改,RecordReader也是可以定制。

而且还有一个非常有效的方法,可以避免mapper将过多的数据传递给reducer。

比如前面的例子都是1, 其实可以先用一个HashMap对key做分组,有则value加1, 无则添加到HashMap中。

最后将分组统计后的key/value数据通过context.write方法发送给reducer,能够大大提高效率。

时间: 2024-12-09 17:09:05

MapReduce 2.x 编程 系列二 Map阶段实现的相关文章

MapReduce 编程 系列十一 Map阶段的调优

MapOutputBuffer 对于每一个Map,都有一个内存buffer用来缓存中间结果,这不仅可以缓存,而且还可以用来排序,被称为MapOutputBuffer, 设置这个buffer大小的配置是 io.sort.mb 默认值是100MB. 一般当buffer被使用到一定比例,就会将Map的中间结果往磁盘上写,这个比例的配置是: io.sort.spill.percent 默认值是80%或者0.8. 在内存中排序缓存的过程叫做sort,而当超过上面的比例在磁盘上写入中间结果的过程称之为spi

MapReduce 1.x 编程 系列三 Reduce阶段实现

Reduce代码就是做加和统计, package org.freebird.reducer; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Reducer.Context; import org.apache.hadoop.mapreduce.Reducer; public class LogReducer<Key> extends

swoft| 源码解读系列二: 启动阶段, swoft 都干了些啥?

date: 2018-8-01 14:22:17title: swoft| 源码解读系列二: 启动阶段, swoft 都干了些啥?description: 阅读 sowft 框架源码, 了解 sowft 启动阶段的那些事儿 小伙伴刚接触 swoft 的时候会感觉 压力有点大, 更直观的说法是 难. 开发组是不赞成 难 这个说法的, swoft 的代码都是 php 实现的, 而 php 又是 世界上最好的语言, swoft 的代码阅读起来是很轻松的. 之后开发组会用 系列源码 解读文章, 深入解析

MapReduce 1.x 编程 系列一 搭建基本的Maven工程

这是一个maven 工程,安装完mvn 3.2.2后, mvn --version Apache Maven 3.2.3 (33f8c3e1027c3ddde99d3cdebad2656a31e8fdf4; 2014-08-12T04:58:10+08:00) Maven home: /opt/apache-maven-3.2.3 Java version: 1.7.0_09, vendor: Oracle Corporation Java home: /data/hadoop/data1/us

学习ASP.NET Core Razor 编程系列五——Asp.Net Core Razor新建模板页面

学习ASP.NET Core Razor 编程系列目录 学习ASP.NET Core Razor 编程系列一 学习ASP.NET Core Razor 编程系列二——添加一个实体 学习ASP.NET Core Razor 编程系列三——创建数据表及创建项目基本页面 学习ASP.NET Core Razor 编程系列四——Asp.Net Core Razor列表模板页面 上一篇文章中我们学习了列表页面的结构,@page与@model两个关键Razor指令,以及页面布局应该修改哪里.这一篇文章我们来

学习ASP.NET Core Razor 编程系列七——修改列表页面

学习ASP.NET Core Razor 编程系列目录 学习ASP.NET Core Razor 编程系列一 学习ASP.NET Core Razor 编程系列二——添加一个实体 学习ASP.NET Core Razor 编程系列三——创建数据表及创建项目基本页面 学习ASP.NET Core Razor 编程系列四——Asp.Net Core Razor列表模板页面 学习ASP.NET Core Razor 编程系列五——Asp.Net Core Razor新建模板页面 学习ASP.NET C

学习ASP.NET Core Razor 编程系列九——增加查询功能

学习ASP.NET Core Razor 编程系列目录 学习ASP.NET Core Razor 编程系列一 学习ASP.NET Core Razor 编程系列二——添加一个实体 学习ASP.NET Core Razor 编程系列三——创建数据表及创建项目基本页面 学习ASP.NET Core Razor 编程系列四——Asp.Net Core Razor列表模板页面 学习ASP.NET Core Razor 编程系列五——Asp.Net Core Razor新建模板页面 学习ASP.NET C

学习ASP.NET Core Razor 编程系列十——添加新字段

学习ASP.NET Core Razor 编程系列目录 学习ASP.NET Core Razor 编程系列一 学习ASP.NET Core Razor 编程系列二——添加一个实体 学习ASP.NET Core Razor 编程系列三——创建数据表及创建项目基本页面 学习ASP.NET Core Razor 编程系列四——Asp.Net Core Razor列表模板页面 学习ASP.NET Core Razor 编程系列五——Asp.Net Core Razor新建模板页面 学习ASP.NET C

MapReduce 编程 系列十二 Reduce阶段内部细节和调节参数

Reduce计算分为若干阶段 1. copy(或者叫shuffle)阶段和merge阶段并行 之前Map产生的结果被存放在本地磁盘上,这时需要从reduce节点将数据从map节点复制过来.放得下进内存,比较大的则写到本地磁盘. 同时,有两个线程对已经获得的内存中和磁盘上的数据进行merge操作. 具体细节是: 通过RPC调用询问task tracker已经完成的map task列表,shuffle(洗牌)是对所有的task tracker host的洗牌操作,这样可以打乱copy数据的顺序,防止