【转】ChainMapper 实例理解二

package com.oncedq.code;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapred.lib.ChainMapper;

import com.oncedq.code.util.DateUtil;

public class ProcessSample {
    /**
     * First stage of the mapper chain: parses one semicolon-separated text
     * line into a {@link Conn1} record and emits it keyed by its order key.
     *
     * <p>Field layout read by {@link #map}:
     * {@code orderKey;customer;state;price;orderDate}. The date field is
     * handed to {@code DateUtil.getDateFromString} with the pattern
     * {@code yyyy-MM-dd}.
     *
     * <p>Lines with too few fields or with non-numeric values in the numeric
     * fields are skipped and counted instead of failing the whole map task.
     */
    public static class ExtractMappper extends MapReduceBase implements
            Mapper<LongWritable, Text, LongWritable, Conn1> {

        /** Number of fields a well-formed input line must contain. */
        private static final int FIELD_COUNT = 5;

        /** Counter group under which skipped lines are reported. */
        private static final String COUNTER_GROUP = "ProcessSample";

        /** Counter name for lines that could not be parsed. */
        private static final String MALFORMED_COUNTER = "MALFORMED_LINES";

        /**
         * Parses a single input line and emits the resulting record.
         *
         * @param arg0 input key supplied by the input format (not used)
         * @param arg1 the raw text line
         * @param arg2 collector receiving {@code (orderKey, record)}
         * @param arg3 reporter used to count skipped lines
         * @throws IOException if the collector fails
         */
        @Override
        public void map(LongWritable arg0, Text arg1,
                OutputCollector<LongWritable, Conn1> arg2, Reporter arg3)
                throws IOException {
            String[] strs = arg1.toString().split(";");
            // A blank or truncated line would otherwise raise
            // ArrayIndexOutOfBoundsException and kill the task.
            if (strs.length < FIELD_COUNT) {
                arg3.incrCounter(COUNTER_GROUP, MALFORMED_COUNTER, 1);
                return;
            }

            Conn1 conn1 = new Conn1();
            try {
                conn1.orderKey = Long.parseLong(strs[0]);
                conn1.customer = Long.parseLong(strs[1]);
                conn1.price = Double.parseDouble(strs[3]);
            } catch (NumberFormatException e) {
                arg3.incrCounter(COUNTER_GROUP, MALFORMED_COUNTER, 1);
                return;
            }
            conn1.state = strs[2];
            // NOTE(review): behavior of DateUtil on an unparsable date is not
            // visible here - confirm it cannot return null, since write()
            // formats this value unconditionally.
            conn1.orderDate = DateUtil.getDateFromString(strs[4], "yyyy-MM-dd");

            arg2.collect(new LongWritable(conn1.orderKey), conn1);
        }

    }

    /**
     * Order record emitted by {@link ExtractMappper} and consumed by
     * {@link Filter1Mapper}.
     *
     * <p>Serialized form, in order: {@code orderKey} (long), {@code customer}
     * (long), {@code state} (string), {@code price} (double) and
     * {@code orderDate} formatted as a {@code yyyy-MM-dd} string.
     */
    private static class Conn1 implements WritableComparable<Conn1> {
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        /**
         * Restores all fields from {@code in}, in the order used by
         * {@link #write(DataOutput)}.
         *
         * @param in source of the serialized record
         * @throws IOException if reading fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            orderDate = DateUtil.getDateFromString(Text.readString(in),
                    "yyyy-MM-dd");
        }

        /**
         * Writes all fields to {@code out}. The date is written as a
         * {@code yyyy-MM-dd} string produced by {@code DateUtil.getDateStr}.
         *
         * @param out destination of the serialized record
         * @throws IOException if writing fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
        }

        /**
         * Orders records by {@code orderKey}. Only the order key takes part
         * in the comparison; {@code equals} is not overridden by this class.
         *
         * @param arg0 the record to compare with
         * @return a negative value, zero or a positive value when this
         *         record's order key is less than, equal to or greater than
         *         the other one's
         */
        @Override
        public int compareTo(Conn1 arg0) {
            if (orderKey < arg0.orderKey) {
                return -1;
            }
            if (orderKey > arg0.orderKey) {
                return 1;
            }
            return 0;
        }

    }

    /**
     * Filtering stage: forwards only the records whose state is exactly
     * {@code "F"}, copying each of them field by field into a new
     * {@link Conn2}. All other records are dropped.
     */
    public static class Filter1Mapper extends MapReduceBase implements
            Mapper<LongWritable, Conn1, LongWritable, Conn2> {

        /**
         * Emits a {@link Conn2} copy of {@code c2} under the unchanged key
         * when its state equals {@code "F"}; emits nothing otherwise.
         *
         * @param inKey     key of the incoming record, passed through as is
         * @param c2        the incoming record
         * @param collector collector receiving the accepted records
         * @param report    reporter (not used)
         * @throws IOException if the collector fails
         */
        @Override
        public void map(LongWritable inKey, Conn1 c2,
                OutputCollector<LongWritable, Conn2> collector, Reporter report)
                throws IOException {
            // Guard clause: anything not in state "F" is filtered out.
            if (!c2.state.equals("F")) {
                return;
            }
            collector.collect(inKey, copyOf(c2));
        }

        /** Builds a new Conn2 carrying the same field values as the source. */
        private static Conn2 copyOf(Conn1 source) {
            Conn2 target = new Conn2();
            target.orderKey = source.orderKey;
            target.customer = source.customer;
            target.state = source.state;
            target.price = source.price;
            target.orderDate = source.orderDate;
            return target;
        }

    }

    /**
     * Order record emitted by {@link Filter1Mapper} and consumed by
     * {@link RegexMapper}.
     *
     * <p>Serialized form, in order: {@code orderKey} (long), {@code customer}
     * (long), {@code state} (string), {@code price} (double) and
     * {@code orderDate} formatted as a {@code yyyy-MM-dd} string.
     */
    private static class Conn2 implements WritableComparable<Conn2> {
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        /**
         * Restores all fields from {@code in}, in the order used by
         * {@link #write(DataOutput)}.
         *
         * @param in source of the serialized record
         * @throws IOException if reading fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            orderDate = DateUtil.getDateFromString(Text.readString(in),
                    "yyyy-MM-dd");
        }

        /**
         * Writes all fields to {@code out}. The date is written as a
         * {@code yyyy-MM-dd} string produced by {@code DateUtil.getDateStr}.
         *
         * @param out destination of the serialized record
         * @throws IOException if writing fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
        }

        /**
         * Orders records by {@code orderKey}. Only the order key takes part
         * in the comparison; {@code equals} is not overridden by this class.
         *
         * @param arg0 the record to compare with
         * @return a negative value, zero or a positive value when this
         *         record's order key is less than, equal to or greater than
         *         the other one's
         */
        @Override
        public int compareTo(Conn2 arg0) {
            if (orderKey < arg0.orderKey) {
                return -1;
            }
            if (orderKey > arg0.orderKey) {
                return 1;
            }
            return 0;
        }

    }

    /**
     * Rewriting stage: replaces every {@code F} in the record's state with
     * {@code Find} and emits the result as a new {@link Conn3} under the
     * unchanged key.
     */
    public static class RegexMapper extends MapReduceBase implements
            Mapper<LongWritable, Conn2, LongWritable, Conn3> {

        /**
         * Rewrites the state of {@code c3} and emits a {@link Conn3} holding
         * the same field values.
         *
         * @param inKey     key of the incoming record, passed through as is
         * @param c3        the incoming record; its {@code state} field is
         *                  updated in place
         * @param collector collector receiving the rewritten record
         * @param report    reporter (not used)
         * @throws IOException if the collector fails
         */
        @Override
        public void map(LongWritable inKey, Conn2 c3,
                OutputCollector<LongWritable, Conn3> collector, Reporter report)
                throws IOException {
            // The first argument of replaceAll is a regular expression.
            final String rewrittenState = c3.state.replaceAll("F", "Find");
            // The incoming record is updated too, and the emitted record
            // receives the same value.
            c3.state = rewrittenState;

            final Conn3 result = new Conn3();
            result.orderKey = c3.orderKey;
            result.customer = c3.customer;
            result.state = rewrittenState;
            result.price = c3.price;
            result.orderDate = c3.orderDate;

            collector.collect(inKey, result);
        }
    }

    /**
     * Order record emitted by {@link RegexMapper} and passed through
     * {@link LoadMapper} to the job output.
     *
     * <p>Serialized form, in order: {@code orderKey} (long), {@code customer}
     * (long), {@code state} (string), {@code price} (double) and
     * {@code orderDate} formatted as a {@code yyyy-MM-dd} string.
     */
    private static class Conn3 implements WritableComparable<Conn3> {
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        /**
         * Restores all fields from {@code in}, in the order used by
         * {@link #write(DataOutput)}.
         *
         * @param in source of the serialized record
         * @throws IOException if reading fails
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            orderDate = DateUtil.getDateFromString(Text.readString(in),
                    "yyyy-MM-dd");
        }

        /**
         * Writes all fields to {@code out}. The date is written as a
         * {@code yyyy-MM-dd} string produced by {@code DateUtil.getDateStr}.
         *
         * @param out destination of the serialized record
         * @throws IOException if writing fails
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, "yyyy-MM-dd"));
        }

        /**
         * Orders records by {@code orderKey}. Only the order key takes part
         * in the comparison; {@code equals} is not overridden by this class.
         *
         * @param arg0 the record to compare with
         * @return a negative value, zero or a positive value when this
         *         record's order key is less than, equal to or greater than
         *         the other one's
         */
        @Override
        public int compareTo(Conn3 arg0) {
            if (orderKey < arg0.orderKey) {
                return -1;
            }
            if (orderKey > arg0.orderKey) {
                return 1;
            }
            return 0;
        }

        /**
         * Renders the record as
         * {@code orderKey;customer;state;price;orderDate}, with the date
         * formatted as {@code yyyy-MM-dd}. A text-based output format prints
         * values through {@code toString()}, so without this override the
         * job output would only show the default object identity string.
         *
         * @return the semicolon-separated text form of this record
         */
        @Override
        public String toString() {
            return orderKey + ";" + customer + ";" + state + ";" + price + ";"
                    + DateUtil.getDateStr(orderDate, "yyyy-MM-dd");
        }

    }

    /**
     * Pass-through stage: emits every incoming key/record pair unchanged.
     */
    public static class LoadMapper extends MapReduceBase implements
            Mapper<LongWritable, Conn3, LongWritable, Conn3> {

        /**
         * Forwards the given pair to the collector without modification.
         *
         * @param arg0 key of the incoming record
         * @param arg1 the incoming record
         * @param arg2 collector receiving the pair
         * @param arg3 reporter (not used)
         * @throws IOException if the collector fails
         */
        @Override
        public void map(LongWritable arg0, Conn3 arg1,
                OutputCollector<LongWritable, Conn3> arg2, Reporter arg3)
                throws IOException {
            final LongWritable key = arg0;
            final Conn3 record = arg1;
            arg2.collect(key, record);
        }

    }

    /**
     * Configures and runs the map-only "ProcessSample" job.
     *
     * <p>The job reads text lines from {@code orderData}, pushes them through
     * the chain {@link ExtractMappper} -> {@link Filter1Mapper} ->
     * {@link RegexMapper} -> {@link LoadMapper} and writes the result to
     * {@code orderDataOutput}. The process exits with status 1 when the job
     * cannot be set up, when waiting is interrupted or when the job fails.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        JobConf job = new JobConf(ProcessSample.class);
        job.setJobName("ProcessSample");
        job.setNumReduceTasks(0);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        // addMapper is a static method, so no ChainMapper instance is needed.
        // Each mapper gets its own private configuration created without the
        // default resources, as the ChainMapper documentation recommends.
        ChainMapper.addMapper(job, ExtractMappper.class, LongWritable.class,
                Text.class, LongWritable.class, Conn1.class, true,
                new JobConf(false));
        ChainMapper.addMapper(job, Filter1Mapper.class, LongWritable.class,
                Conn1.class, LongWritable.class, Conn2.class, true,
                new JobConf(false));
        ChainMapper.addMapper(job, RegexMapper.class, LongWritable.class,
                Conn2.class, LongWritable.class, Conn3.class, true,
                new JobConf(false));
        ChainMapper.addMapper(job, LoadMapper.class, LongWritable.class,
                Conn3.class, LongWritable.class, Conn3.class, true,
                new JobConf(false));

        FileInputFormat.setInputPaths(job, new Path("orderData"));
        FileOutputFormat.setOutputPath(job, new Path("orderDataOutput"));

        JobControl jc = new JobControl("test");
        try {
            jc.addJob(new Job(job));
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }

        // JobControl.run() keeps looping until stop() is called, so it is run
        // on its own thread while this thread waits for completion.
        Thread runner = new Thread(jc, "ProcessSample-JobControl");
        runner.start();
        boolean interrupted = false;
        try {
            while (!jc.allFinished()) {
                Thread.sleep(500);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever inspects it next.
            Thread.currentThread().interrupt();
            interrupted = true;
        } finally {
            jc.stop();
        }

        if (interrupted || !jc.getFailedJobs().isEmpty()) {
            System.err.println("ProcessSample did not complete successfully");
            System.exit(1);
        }
    }
}
时间: 2024-08-20 10:19:45

【转】ChainMapper 实例理解二的相关文章

spring深入理解二(关于容器工作源码)

spring基本工作原理如下: 1.查找bean配置文件 2.加载bean配置文件并解析生成中间表示BeanDefinition 3.注册beanDefinition 4.如果是单例或lazy-init=false,则直接生成bean spring将 1.查找bean配置文件 2.加载bean配置文件并解析生成中间表示BeanDefinition 3.注册beanDefinition 这三部分分开,可以提供更多的定制给用户. spring将配置文件之类的文件资源抽象成一个Resource,封装了g

HTML5 本地文件操作之FileSystemAPI实例(二)

文件操作实例整理二 1.删除文件.复制文件.移动文件 //获取请求权限 window.requestFileSystem = window.requestFileSystem || window.webkitRequestFileSystem; window.requestFileSystem(window.TEMPORARY, 5 * 1024, initFs, errorHandler); function initFs(fs) { //删除文件 fileEntry.remove() fs.

matlab文件读写处理实例(二)——textread批量读取文件

问题:对文件夹下所有文件进行批量读取,跳过文件头部分,读取每个文件数据部分的7,8,9列,保存到变量并且输出到文件. 数据: 文件夹11m\  单个文件格式: DAV1                                                        MARKER NAME66010M001                                                   MARKER NUMBER     7    PR    TD    HR  

C语言库函数大全及应用实例十二

原文:C语言库函数大全及应用实例十二                                          [编程资料]C语言库函数大全及应用实例十二 函数名: setrgbpalette 功 能: 定义IBM8514图形卡的颜色 用 法: void far setrgbpalette(int colornum, int red, int green, int blue); 程序例: #i nclude #i nclude #i nclude #i nclude int main(v

iOS 键值观察(KVO)简述及实例理解

KVO概述: KVO,即:Key-Value Observing,直译为:基于键值的观察者.  它提供一种机制,当指定的对象的属性被修改后,则对象就会接受到通知. 简单的说就是每次指定的被观察的对象的属性被修改后,KVO就会自动通知相应的观察者了.KVO的优点: 当有属性改变,KVO会提供自动的消息通知.这样开发人员不需要自己去实现这样的方案:每次属性改变了就发送消息通知. 这是KVO机制提供的最大的优点.因为这个方案已经被明确定义,获得框架级支持,可以方便地采用. 开发人员不需要添加任何代码,

highcharts实例教程二:结合php与mysql生成饼图

上回我们分析了用highcharts结合php和mysql生成折线图的实例,这次我们以技术cto网站搜索引擎流量为例利用highcharts生成饼图. 饼图通常用在我们需要直观地显示各个部分所占的比例的时候,比如我们需要统计各大搜索引擎来的流量比例. 第一步:创建数据库保存各搜索引擎流量的pv数 CREATE TABLE `pie` (   `id` int(10) NOT NULL AUTO_INCREMENT,   `title` varchar(30) NOT NULL,   `pv` i

基于Android2.3.5系统:JNI与HAL实例解析[二]

*************************************************************************************************************************** 作者:EasyWave                                                                                                           时间:2015.

一些有用的javascript实例分析(二)

原文:一些有用的javascript实例分析(二) 1 5 求出数组中所有数字的和 2 window.onload = function () 3 { 4 var oBtn = document.getElementsByTagName("button")[0]; 5 var oInput = document.getElementsByTagName("input")[0] 6 var oStrong = document.getElementsByTagName

Selenium2学习-022-WebUI自动化实战实例-020-JavaScript 在 Selenium 自动化中的应用实例之二(获取浏览器显示区域大小)

前几篇文章中简略概述了,如何获取.设置浏览器窗口大小,那么我们该如何获取浏览器显示区域的大小呢?此文将对此进行简略概述,敬请各位小主参阅.若有不足之处,敬请各位大神指正,不胜感激! 获取浏览器显示区域的方法,我目前想到的只有以下两种方法: 1.通过 JavaScript  获取浏览器显示区域的大小 2.通过 WebDriver 截图,获取截图的大小,从而获得浏览器显示区域的大小 此文主要以第一种方法示例演示,第二种方法进行后续更新,敬请期待!谢谢! 1 /** 2 * Get width and