Hadoop通过HCatalog编写Mapreduce任务

1、dirver

package com.kangaroo.hadoop.drive;

import java.util.Map;
import java.util.Properties;

import com.kangaroo.hadoop.mapper.AggregateMapper;
import com.kangaroo.hadoop.reducer.AggregateReducer;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.kangaroo.hadoop.utils.PropertiesUtil;

public class DriveMain extends Configured implements Tool {

    private static final Logger logger = LoggerFactory.getLogger(DriveMain.class);
    private Configuration conf;
    private PropertiesUtil propUtil;

    public DriveMain() {
        this.conf = new Configuration();
        this.propUtil = new PropertiesUtil("configure.properties");
    }

    public int run(String[] args) throws Exception {
        try {
            logger.info("MapReduce Job Beginning.");
            String dbName = args[0];
            String tableName = args[1];
            String partition = args[2];
            String sumField = args[3];
            String outPath = args[4];
            String partFilter = partitionFormat(partition);
            logger.info("[Params] dbName:{}; tableName:{}, partition:{}, sumField:{}, outPath:{}, partFilter:{}",
                    dbName, tableName, partition, sumField, outPath, partFilter);
            this.conf.set("sumField", sumField);
            this.setMapRedConfiguration();
            Job job = this.setJobConfiguration(this.conf);
            HCatInputFormat.setInput(job, dbName, tableName, partFilter);
            logger.info("setInput successfully.");
            FileOutputFormat.setOutputPath(job, new Path(outPath));
            logger.info("setOutput successfully.");
            return (job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception ex) {
            logger.error(ex.getMessage());
            throw ex;
        }
    }

    private Job setJobConfiguration(Configuration conf) throws Exception {
        try {
            logger.info("enter setJobConfiguration");
            Job job = Job.getInstance(conf);
            job.setJarByClass(DriveMain.class);
            job.setInputFormatClass(HCatInputFormat.class);
            job.setMapperClass(AggregateMapper.class);
            job.setReducerClass(AggregateReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setNumReduceTasks(1);
            logger.info("setJobConfiguration successfully.");
            return job;
        } catch (Exception ex) {
            logger.error("setJobConfiguration: " + ex.getMessage());
            throw new Exception(ex);
        }
    }

    private void setMapRedConfiguration() {
        try {
            Properties properties = propUtil.getProperties();
            logger.info("Load MapReduce Configuration Successfully.");
            for (Map.Entry entry : properties.entrySet()) {
                if (entry.getKey().toString().startsWith("mapred")) {
                    conf.set(entry.getKey().toString(), entry.getValue().toString());
                    logger.info("[MR][Config] key:{}, value:{}", entry.getKey().toString(), entry.getValue().toString());
                }
            }
            logger.info("[MR][Config] Set MapReduce Configuration Successfully.");
        } catch (Exception e) {

        }

    }

    private String partitionFormat(String partition) {
        String format = "";
        if(!partition.contains("pt") && ! partition.contains("dt")) {
            String[] items = partition.split("/");
            String[] keys = {"year","month","day", "hour"};
            for(int i=0; i<items.length; i++) {
                if (i == items.length-1) {
                    format += keys[i] + "=‘" + items[i] + "‘";
                } else {
                    format += keys[i] + "=‘" + items[i] + "‘ and ";
                }
            }
        } else {
            format = partition;
        }
        return format;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new DriveMain(), args);
        System.exit(exitCode);
    }

}

2、Mapper

package com.kangaroo.hadoop.mapper;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@SuppressWarnings("rawtypes")
public class AggregateMapper extends Mapper<WritableComparable, HCatRecord, Text, Text> {

    private static final Logger logger = LoggerFactory.getLogger(AggregateMapper.class);

    private HCatSchema schema;
    private Text outKey;
    private Text outValue;
    private IntWritable one;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        outKey = new Text();
        outValue = new Text();
        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
    }

    @Override
    protected void map(WritableComparable key, HCatRecord value, Context context) throws IOException, InterruptedException {
        String sumField = context.getConfiguration().get("sumField");
        Map<String, String> recordMap = new HashMap<String, String>();
        for (String fieldName : schema.getFieldNames()) {
            logger.info("fieldName={}", fieldName);
            String fieldValue = value.get(fieldName, schema).toString();
            logger.info("fieldName={}, fieldValue={}", fieldName, fieldValue);
            recordMap.put(fieldName, fieldValue);
            logger.info("recordMap={}", recordMap.toString());
        }
        outKey.set(recordMap.get(sumField));
        outValue.set("1");
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(outKey, outValue);
    }
}

3、Reducer

package com.kangaroo.hadoop.reducer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

@SuppressWarnings("rawtypes")
public class AggregateReducer extends Reducer<Text, Text, Text, Text> {
    protected static final Logger logger = LoggerFactory.getLogger(AggregateReducer.class);
    HCatSchema schema;
    Text outKey;
    Text outValue;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        schema = HCatInputFormat.getTableSchema(context.getConfiguration());
    }

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,InterruptedException {
        outKey.set(key);
        int sum = 0;
        for (Text value : values) {
            sum += Integer.parseInt(value.toString());
        }
        outValue.set(String.valueOf(sum));
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
        context.write(outKey, outValue);
    }
}

4、propertyUtil

package com.kangaroo.hadoop.utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtil {
    private String filePath;

    public PropertiesUtil() {
        this.filePath = "configure.properties";
    }

    public PropertiesUtil(String filePath) {
        this.filePath = filePath;
    }

    public Properties getProperties() throws IOException {
        Properties prop;
        InputStream inStream = null;
        try {
            inStream = PropertiesUtil.class.getClassLoader()
                    .getResourceAsStream(this.filePath);
            prop = new Properties();
            prop.load(inStream);

            return prop;
        } finally {
            if (inStream != null)
                inStream.close();
        }
    }
}

5、配置

mapred.job.queue.name=root.dashujudidiyanjiuyuan-zhinengpingtaibu.datapolicy-develop
mapred.jar=./rulecheck.jar
mapred.map.tasks=300
mapred.reduce.tasks=100
#mapred.map.capacity=1
#mapred.reduce.capacity=1
mapred.job.priority=HIGH
mapred.job.name=bigdata_qa_data_monitor
时间: 2024-12-18 18:16:11

Hadoop通过HCatalog编写Mapreduce任务的相关文章

【Hadoop测试程序】编写MapReduce测试Hadoop环境

我们使用之前搭建好的Hadoop环境,可参见: <[Hadoop环境搭建]Centos6.8搭建hadoop伪分布模式>http://www.cnblogs.com/ssslinppp/p/5923793.html 示例程序为<Hadoop权威指南3>中的获取最高温度的示例程序: 数据准备 输入数据为:sample.txt 0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN99

在hadoop上进行编写mapreduce程序,统计关键词在text出现次数

mapreduce的处理过程分为2个阶段,map阶段,和reduce阶段.在要求统计指定文件中的所有单词的出现次数时, map阶段把每个关键词写到一行上以逗号进行分隔,并初始化数量为1(相同的单词hadoop中的map会自动放到一行中) reduce阶段是把每个单词出现的频率统计出来重新写回去. 如代码: package com.clq.hadoop2; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Lo

hive--构建于hadoop之上、让你像写SQL一样编写MapReduce程序

hive介绍 什么是hive? hive:由Facebook开源用于解决海量结构化日志的数据统计 hive是基于hadoop的一个数据仓库工具,可以将结构化的数据映射为数据库的一张表,并提供类SQL查询功能.本质就是将HQL(hive sql)转化为MapReduce程序 我们使用MapReduce开发会很麻烦,但是程序员很熟悉sql,于是hive就出现了,可以让我们像写sql一样来编写MapReduce程序,会自动将我们写的sql进行转化.但底层使用的肯定还是MapReduce. hive处理

hadoop 学习笔记:mapreduce框架详解

hadoop 学习笔记:mapreduce框架详解 开始聊mapreduce,mapreduce是hadoop的计算框架,我 学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的 思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习 hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不

Hadoop初学指南(6)--MapReduce的简单实例及分析

本文在上一节的基础上通过一个简单的MR示例对MapReduce的运行流程进行分析. 假设有两行数据,分别是hello you,hello me,我们要统计其中出现的单词以及每个单词出现的次数. 所得的结果为 hello   2 you     1 me      1 (1)大致运行流畅 1.解析成2个<k,v>,分别是<0, hello you><10, hello me>.调用2次map函数. 2.执行map任务 3.map输出后的数据是:<hello,1>

【Big Data - Hadoop - MapReduce】hadoop 学习笔记:MapReduce框架详解

开始聊MapReduce,MapReduce是Hadoop的计算框架,我学Hadoop是从Hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不可分,所以当我写分布式文件系统时候,总是感觉自己的理解肤浅

Hadoop学习笔记:MapReduce框架详解

原文出处: 夏天的森林 开始聊mapreduce,mapreduce是hadoop的计算框架,我学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不可分,所以当我写分布式文件系统时候

[0004] Hadoop 版hello word mapreduce wordcount 运行

目的: 初步感受一下hadoop mapreduce 环境: hadoop 2.6.4 1 准备输入文件 paper.txt 内容一般为英文文章,随便弄点什么进去 [email protected]:~$ hadoop fs -mkdir /input [email protected]:~$ ls Desktop Documents Downloads examples.desktop hadoop-2.6.4.tar.gz Music paper.txt Pictures Public Te

大数据时代之hadoop(五):hadoop 分布式计算框架(MapReduce)

hadoop的核心分为两块,一是分布式存储系统-hdfs,这个我已经在上一章节大致讲了一下,另一个就是hadoop的计算框架-mapreduce. mapreduce其实就是一个移动式的基于key-value形式的分布式计算框架. 其计算分为两个阶段,map阶段和reduce阶段,都是对数据的处理,由于其入门非常简单,但是若想理解其中各个环节及实现细节还是有一定程度的困难,因此我计划在本文中只是挑几个mapreduce的核心来进行分析讲解. 1.MapReduce驱动程序默认值 编写mapred