MapReduce编写的正确姿势

  • 先看一下目录结构

这里是job接口,负责参数的传递和定时的调用

下面的图是MR 程序相关的目录图片,其中MR的入口程序负责读取数据,并指定对应的Map、Reduce程序。

  • 程序的流程

首先简单的说一下,整体的流程:

  1. 首先是一个Job(定时任务),定时调用 入口程序,拼装参数。
  2. job调用 MR的入口程序,入口程序获得job传入的参数,根据参数获得所需的数据;可以去Hbase、mysql、HDFS中获取数据;这个文件会配置job名、要调用的Mapper、Reduce,添加需要的jar包
  3. 数据传入Mapper程序,Mapper程序进行数据的整合,整合完成的数据会变成:key:{value1,value2,value3,...}这个样子,所以传入Reduce的value应该是一个可迭代的参数(在这里坑了一会)。Mapper将所有的数据整合完成后,会进入Reduce程序
  4. Reduce程序,接受参数,参数类型要和Mapper的返回类型是一样的;values 的入参是 一个可迭代的类型,泛型必须与Mapper的value返回类型一致。然后根据需求进行处理。
  • 上代码

Job程序:

package com.sky.cy.mapreduce.dispatch;

import com.sky.cy.mapreduce.util.BasicMapreduceJob;
import com.ssports.util.LogFormat;
import com.ssports.util.ToolUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @描述:
 * @文件名: UserActionRecordStatistJob
 * @创建人: YangLianjun
 * @创建时间: 2019/4/2 9:58
 * @修改人:
 * @修改备注: Copyright 北京和信金谷科技有限公司 2019/4/2
 */
@Component
public class UserActionRecordStatistJob extends BasicMapreduceJob {
    public static Log log = LogFactory.getLog(UserActionRecordStatistJob.class);
    @Override
    public String[] preProcessJob(String... args) {
        log.info(LogFormat.formatMsg("UserActionRecordStatistJob.preProcessJob","","UserActionRecordStatistJob is start"));
        String statTimeStr = "";
        if (args != null && args.length == 1) {
            statTimeStr = args[0];
        } else {
            Date date = ToolUtil.addDay(new Date(), -1);
            String statTime = ToolUtil.getDateStr(date, "yyyyMMdd");
//            String statTime = "20190401";
            log.info(LogFormat.formatMsg("UserActionRecordStatistJob.preProcessJob","statTime:"+statTime,""));
            statTimeStr = statTime;
        }

        String[] retArgs = new String[2];
        // 这个参数是指定哪个 MR入口程序
        retArgs[0] = "com.sky.cy.mapreduce.job.UserActionRecordStatistMR" ;
        retArgs[1] = statTimeStr;
        return retArgs;
    }
}

MR入口程序,负责数据的读取,指定对应的Map、Reduce程序:

package com.sky.cy.mapreduce.job;

import com.sky.cy.mapreduce.job.mapper.UserActionRecordMapper;
import com.sky.cy.mapreduce.job.mapper.UserActionRecordStatistMapper;
import com.sky.cy.mapreduce.job.reducer.UserActionRecordReducer;
import com.sky.cy.mapreduce.job.reducer.UserActionRecordStatistReducer;
import com.sky.cy.mapreduce.util.BasicMapreduce;
import com.ssports.util.LogFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import java.io.IOException;

/**
 * @描述:
 * @文件名: UserActionRecordStatistMR
 * @创建人: YangLianjun
 * @创建时间: 2019/4/2 14:47
 * @修改人:
 * @修改备注: Copyright 北京和信金谷科技有限公司 2019/4/2
 */
public class UserActionRecordStatistMR extends BasicMapreduce {
    public static final Log LOG = LogFactory.getLog(UserActionRecordStatistMR.class);

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        LOG.info(LogFormat.formatMsg("UserActionRecordStatistMR.main","","start ..."));
        init();
        String statTime = args[0];
        // 将日期作为查询条件
        Filter filter = new SingleColumnValueFilter(Bytes.toBytes("f"),Bytes.toBytes("stat_time"), CompareFilter.CompareOp.EQUAL,Bytes.toBytes(statTime)) ;
        Scan scan = new Scan();
        scan.setFilter(filter);

        Configuration configuration = defaultHbaseConfiguration();
        configuration.set("statTime",statTime);

        Job job = Job.getInstance(configuration, "UserActionRecordStatistMR");

        TableMapReduceUtil.initTableMapperJob(UserActionRecordStatistMR.Constants.HBASE_FILTER_TABLE, scan, UserActionRecordStatistMapper.class, Text.class, Text.class, job);
        job.setReducerClass(UserActionRecordStatistReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputFormatClass(MultiTableOutputFormat.class);
        //添加mysql驱动包
        job.addFileToClassPath(new Path(UserActionMR.Constants.MYSQL_JAR_PATH));
        //添加spring的jar
        job.addFileToClassPath(new Path(ActionFilterStatistMR.Constants.SPRING_CONTEXT_JAR_PATH));
        job.addFileToClassPath(new Path(ActionFilterStatistMR.Constants.SPRING_TX_JAR_PATH));
        job.addFileToClassPath(new Path(ActionFilterStatistMR.Constants.SPRING_AOP_JAR_PATH));
        job.addFileToClassPath(new Path(ActionFilterStatistMR.Constants.SPRING_BEAN_HADOOP_PATH));
        job.addFileToClassPath(new Path(ActionFilterStatistMR.Constants.SPRING_DATA_HADOOP_PATH));

        job.waitForCompletion(true);

    }

    public static class Constants {
        //需要获取的hbase数据的表名
        public static final String HBASE_FILTER_TABLE = "sky_user_action_record";
    }

}

Map程序:

 1 package com.sky.cy.mapreduce.job.mapper;
 2
 3 import com.sky.cy.mapreduce.util.HbaseUtil;
 4 import com.sky.cy.mapreduce.util.RegexUtil;
 5 import com.ssports.util.LogFormat;
 6 import com.ssports.util.SpringHelper;
 7 import com.ssports.util.ToolUtil;
 8 import org.apache.commons.logging.Log;
 9 import org.apache.commons.logging.LogFactory;
10 import org.apache.hadoop.hbase.client.Result;
11 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12 import org.apache.hadoop.hbase.mapreduce.TableMapper;
13 import org.apache.hadoop.io.Text;
14
15 import java.io.IOException;
16
17
18 /**
19  * @描述:
20  * @文件名: UserActionRecordStatistMapper
21  * @创建人: YangLianjun
22  * @创建时间: 2019/4/2 14:46
23  * @修改人:
24  * @修改备注: Copyright 北京和信金谷科技有限公司 2019/4/2
25  */
26 public class UserActionRecordStatistMapper extends TableMapper<Text, Text> {
27     private static final Log log = LogFactory.getLog(UserActionRecordStatistMapper.class);
28     public static final String FAMILY_F = "f";
29
30
31
32
33     protected void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
34         log.info(LogFormat.formatMsg("UserActionRecordStatistMapper.map", "", "mapper start ..."));
35         String userId = HbaseUtil.getValue(FAMILY_F, "user_id", result);
36         String actionName = HbaseUtil.getValue(FAMILY_F,"action_name",result) ;
37         String actionId = HbaseUtil.getValue(FAMILY_F,"action_id",result) ;
38         String statTime = context.getConfiguration().get("statTime");
39         String keyOut = userId + ":" + statTime;
40         log.info(LogFormat.formatMsg("UserActionRecordStatistMapper.map", "keyOut:"+keyOut, ""));
41         Text text = new Text(keyOut);
42         String valueOut = actionId + ":"+actionName;
43         log.info(LogFormat.formatMsg("UserActionRecordStatistMapper.map", "valueOut:"+valueOut, ""));
44         Text value = new Text(valueOut);
45         context.write(text, value);
46         log.info(LogFormat.formatMsg("UserActionRecordStatistMapper.map", "", "mapper end ..."));
47     }
48 }

Reduce程序:

package com.sky.cy.mapreduce.job.reducer;

import com.sky.cy.log.base.dao.SkyUserActionRecordStatistMapper;
import com.ssports.util.LogFormat;
import com.ssports.util.SpringHelper;
import com.ssports.util.ToolUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;
import java.util.*;

/**
 * @描述:
 * @文件名: UserActionRecordStatistReducer
 * @创建人: YangLianjun
 * @创建时间: 2019/4/2 14:47
 * @修改人:
 * @修改备注: Copyright 北京和信金谷科技有限公司 2019/4/2
 */
public class UserActionRecordStatistReducer extends TableReducer<Text, Text, NullWritable> {
    private static final Log log = LogFactory.getLog(UserActionRecordStatistReducer.class);
    private static final Integer ALL = 1 ;  // 统计状态,全部统计

    protected void setup(Context context) throws IOException, InterruptedException {
        SpringHelper.init("classpath*:spring/*.xml");
    }

    protected void reduce(Text key, Iterable<Text> values, Context context){
        log.info(LogFormat.formatMsg("UserActionRecordStatistReducer.reduce","","start ..."));
        String keyRow = new String(key.getBytes());
        String[] keys = keyRow.split(":");
        String userId = keys[0] ;
        log.info(LogFormat.formatMsg("UserActionRecordStatistReducer.reduce","userId:"+userId,""));
        String statTime = keys[1] ;
        log.info(LogFormat.formatMsg("UserActionRecordStatistReducer.reduce", "key:"+keyRow, ""));
        List<String> actionList = new ArrayList<>();
        for (Text value : values) {
            log.info(LogFormat.formatMsg("UserActionRecordStatistReducer.reduce", "value:"+value.toString(), ""));
            actionList.add(value.toString()) ;
        }
        SkyUserActionRecordStatistMapper mapper = SpringHelper.getBean("skyUserActionRecordStatistMapper") ;
        com.sky.cy.log.base.bean.SkyUserActionRecordStatistEntity entity = new com.sky.cy.log.base.bean.SkyUserActionRecordStatistEntity() ;

        Set<String> uniqueSet = new HashSet<>(actionList) ;
        for (String s : uniqueSet) {
            String actionId = s.split(":")[0] ;
            log.info(LogFormat.formatMsg("UserActionRecordStatistReducer.reduce","actionId:"+actionId,""));
            String actionName = s.split(":")[1] ;
            int actionNumber = Collections.frequency(actionList, s) ;  //统计出来数量
            entity = mapper.selectTotalByIdAndType(userId,actionId,ALL) ;  //查询全部统计 的信息
            if (null == entity){ //不存在 这个信息,插入
                log.info(LogFormat.formatMsg("UserActionRecordStatistReducer.reduce","","insert start ..."));
                com.sky.cy.log.base.bean.SkyUserActionRecordStatistEntity entity1 = new com.sky.cy.log.base.bean.SkyUserActionRecordStatistEntity() ;
                entity1.setUserId(userId);
                entity1.setActionId(actionId);
                entity1.setActionName(actionName);
                entity1.setStatistTotal(actionNumber);
                entity1.setStatistTime(statTime);
                entity1.setStatistType(ALL);
                int insert = mapper.insertSelective(entity1) ;
                log.info(LogFormat.formatMsg("UserActionRecordStatistReducer.reduce","insert number:"+insert,""));
            }else { //存在,进行更新
                log.info(LogFormat.formatMsg("UserActionRecordStatistReducer.reduce","","update start..."));
                int update = mapper.updateTotalAndDays(userId,actionId,actionNumber,ALL) ;  //更新
                log.info(LogFormat.formatMsg("UserActionRecordStatistReducer.reduce","update number:"+update,"update end..."));
            }
        }

    }
}

说明:

利用集群进行整合、计算、归纳,本身是一个特别复杂的事情,Hadoop中的MR框架可以让我们从复杂的操作中解脱出来,只关注于逻辑本身,无疑是程序员的福音。只要理解了MR的流程和基本的运作原理,就可以像写java程序那样简单的对数据进行处理,但是却比单机的java程序效率高得多。当然既然使用到了MR,数据量应该是巨大的,如果只是对单机mysql中的数据进行统计与计算,建议还是使用普通的方式,毕竟最适合自己才是最好的!!!

原文地址:https://www.cnblogs.com/theskying/p/10658812.html

时间: 2024-11-04 02:57:42

MapReduce编写的正确姿势的相关文章

使用layoutinflater的正确姿势

使用layoutinflater的正确姿势 一开始接触安卓开发的时候,知道layoutinflater是用来将布局文件生成对应的View.那时候还是懵懵懂懂知道需要传递一个layoutId一个parent参数和一个false参数.那时候就这样用,初初还是好好的.直到后来随着进一步学习安卓开发发现layoutinflater的这两个参数是有大大的门道在里面. 然后这一篇博客可以说是我对layoutinflater使用的一个总结. 怎么添加一个View到ViewGroup? 在讨论怎么使用layou

Dagger2 使用正确姿势。

Dagger2 使用正确姿势. 上一篇文章<Dagger2 这次入门就不用放弃了>中介绍了Dagger2的一些显浅的使用方式,我觉得是非常适合入门Dagger2的傻瓜式讲解,后来发现有些内容讲的不够仔细,有些细节没有详细解释清楚.参考了以下三篇文章后,对之前的内容进行一些补充. Android:dagger2让你爱不释手-基础依赖注入框架篇 Android:dagger2让你爱不释手-重点概念讲解.融合篇 Android:dagger2让你爱不释手-终结篇 以上这三篇文章对于Dagger2的思

开发函数计算的正确姿势 —— 使用 Fun Local 本地运行与调试

前言首先介绍下在本文出现的几个比较重要的概念: 函数计算(Function Compute): 函数计算是一个事件驱动的服务,通过函数计算,用户无需管理服务器等运行情况,只需编写代码并上传.函数计算准备计算资源,并以弹性伸缩的方式运行用户代码,而用户只需根据实际代码运行所消耗的资源进行付费.函数计算更多信息 参考. Fun: Fun 是一个用于支持 Serverless 应用部署的工具,能帮助您便捷地管理函数计算.API 网关.日志服务等资源.它通过一个资源配置文件(template.yml),

开发函数计算的正确姿势 —— 爬虫

在 <函数计算本地运行与调试 - Fun Local 基本用法> 中,我们介绍了利用 Fun Local 本地运行.调试函数的方法.但如果仅仅这样简单的介绍,并不能展现 Fun Local 对函数计算开发的巨大效率的提升. 这一次,我们拿一个简单的场景来举例子--开发一个简单的爬虫函数(代码参考函数计算控制台模板),介绍如何以正确姿势,从零开始,开发一个自动伸缩.按调用次数收费的 serverless 爬虫应用. 开发步骤我们将这个完整的应用拆分成多步,并且在每一步完成后,我们都会进行相应的运

Git 提交的正确姿势

Git 提交的正确姿势:Commit message 编写指南 SCOP范围 middleware core config plugin test type范围 Git 每次提交代码,都要写 Commit message(提交说明),否则就不允许提交. $ git commit -m "hello world" 上面代码的-m参数,就是用来指定 commit mesage 的. 如果一行不够,可以只执行git commit,就会跳出文本编译器,让你写多行. $ git commit 基

揭秘“撩”大数据的正确姿势:生动示例解说大数据“三驾马车”

我是我:"缘起于美丽,相识于邂逅,厮守到白头!" 众听众:"呃,难道今天是要分享如何作诗?!" 我是我:"大家不要误会,今天主要的分享不是如何作诗,而是<揭秘:'撩'大数据的正确姿势>,下面进入正题." 话说当下技术圈的朋友,一起聚个会聊个天,如果不会点大数据的知识,感觉都融入不了圈子,为了以后聚会时让你有聊有料,接下来就跟随我的讲述,一起与大数据混个脸熟吧,不过在"撩"大数据之前,还是先揭秘一下研发这些年我们都经

开发函数计算的正确姿势——轻松解决大依赖部署

<a name="1"></a> 前言 首先介绍下在本文出现的几个比较重要的概念: 函数计算(Function Compute): 函数计算是一个事件驱动的服务,通过函数计算,用户无需管理服务器等运行情况,只需编写代码并上传.函数计算准备计算资源,并以弹性伸缩的方式运行用户代码,而用户只需根据实际代码运行所消耗的资源进行付费.函数计算更多信息 参考.Fun: Fun 是一个用于支持 Serverless 应用部署的工具,能帮助您便捷地管理函数计算.API 网关.

程序员取悦女朋友的正确姿势---Tips(iOS美容篇)

前言 女孩子都喜欢用美图工具进行图片美容,近来无事时,特意为某人写了个自定义图片滤镜生成器,安装到手机即可完成自定义滤镜渲染照片.app独一无二,虽简亦繁. JH定律:魔镜:最漂亮的女人是你老婆魔镜:程序员不是木头人 核心技术 图片滤镜核心技术的基本思路如下: 核心技术流程 具体流程 1.创建一个图像处理工具类 注:该类实例包括一个图像处理方法,该方法在传入原始图像和一个颜色矩阵后生成一个处理好的图像. @interface JHFeilterManager : NSObject @proper

docker centos rpm离线安装1.8.2及pull的正确姿势

1.离线安装 本次只针对1.8.2版本,所需要的rpm包如下 docker-engine-1.8.2-1.el7.centos.x86_64.rpm(已上传51cto) libcgroup-0.41-8.el7.x86_64.rpm libcgroup-devel-0.41-8.el7.x86_64.rpm libcgroup-pam-0.41-8.el7.x86_64.rpm libcgroup-tools-0.41-8.el7.x86_64.rpm(libcgroup请参考开源镜像库) 简单