MapReduce 常见SQL模型解析

MapReduce应用场景

前一阵子参加炼数成金的MapReduce培训,培训中的作业例子比较有代表性,用于解释问题再好不过了。有一本国外的有关MR的教材,比较实用,点此下载

MR能解决什么问题?一般来说,用的最多的应该是日志分析,海量数据排序处理。最近一段时间公司用MR来解决大量日志的离线并行分析问题。

MapReduce机制

对于不熟悉MR工作原理的同学,推荐大家先去看一篇博文:http://blog.csdn.net/athenaer/article/details/8203990

常用计算模型

这里举一个例子,数据表在Oracle默认用户Scott下有DEPT表和EMP表。为方便,现在直接写成两个TXT文件如下:

1.部门表

DEPTNO,DNAME,LOC    // 部门号,部门名称,所在地

DEPTNO,DNAME,LOC    // 部门号,部门名称,所在地

10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON

2.员工表

EMPNO,ENAME,JOB,HIREDATE,SAL,COMM,DEPTNO,MGR // 员工号,英文名,职位,聘期,工资,奖金,所属部门,管理者

7369,SMITH,CLERK,1980-12-17 00:00:00.0,800,,20,7902
7499,ALLEN,SALESMAN,1981-02-20 00:00:00.0,1600,300,30,7698
7521,WARD,SALESMAN,1981-02-22 00:00:00.0,1250,500,30,7698
7566,JONES,MANAGER,1981-04-02 00:00:00.0,2975,,20,7839
7654,MARTIN,SALESMAN,1981-09-28 00:00:00.0,1250,1400,30,7698
7698,BLAKE,MANAGER,1981-05-01 00:00:00.0,2850,,30,7839
7782,CLARK,MANAGER,1981-06-09 00:00:00.0,2450,    ,10,7839
7839,KING,PRESIDENT,1981-11-17 00:00:00.0,5000,,10,
7844,TURNER,SALESMAN,1981-09-08 00:00:00.0,1500,0,30,7698
7900,JAMES,CLERK,1981-12-03 00:00:00.0,950,,30,7698
7902,FORD,ANALYST,1981-12-03 00:00:00.0,3000,,20,7566
7934,MILLER,CLERK,1982-01-23 00:00:00.0,1300,,10,7782

 

3.实例化为bean

这两个bean的实际作用都是分割传入的字符串,从字符串内得到所属的属性信息。

emp.java

public Emp(String inStr) {
        String[] split = inStr.split(",");
        this.empno = (split[0].isEmpty()? "" : split[0]);
        this.ename = (split[1].isEmpty() ? "" : split[1]);
        this.job = (split[2].isEmpty() ? "" : split[2]);
        this.hiredate = (split[3].isEmpty() ? "" : split[3]);
        this.sal = (split[4].isEmpty() ? "0" : split[4]);
        this.comm = (split[5].isEmpty() ? "" : split[5]);
        this.deptno = (split[6].isEmpty() ? "" : split[6]);
        try {
            this.mgr = (split[7].isEmpty() ? "" : split[7]);
        } catch (IndexOutOfBoundsException e) {     //防止最后一位为空的情况
            this.mgr = "";
        }
}

dep.java

public Dept(String string) {
        String[] split = string.split(",");
        this.deptno = split[0];
        this.dname = split[1];
        this.loc = split[2];
    }

4.模型分析

4.1 求和

求各个部门的总工资

public static class Map_1 extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {
        public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            try {
                Emp emp = new Emp(value.toString());
                output.collect(new Text(emp.getDeptno()), new IntWritable(Integer.parseInt(emp.getSal())));  // { k=部门号,v=员工薪资}
            } catch (Exception e) {
            reporter.getCounter(ErrCount.LINESKIP).increment(1);
            WriteErrLine.write("./input/" + this.getClass().getSimpleName() + "err_lines", reporter.getCounter(ErrCount.LINESKIP).getCounter() + " " + value.toString());
            }
        }
    }  

    public static class Reduce_1 extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum = sum + values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }  

    }

运行结果:

4.2 平均值

求各个部门的人数和平均工资

public static class Map_2 extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {
        public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            try {
                Emp emp = new Emp(value.toString());
                output.collect(new Text(emp.getDeptno()), new IntWritable(Integer.parseInt(emp.getSal())));  //{ k=部门号,v=薪资}
            } catch (Exception e) {
                reporter.getCounter(ErrCount.LINESKIP).increment(1);
                WriteErrLine.write("./input/" + this.getClass().getSimpleName() + "err_lines", reporter.getCounter(ErrCount.LINESKIP).getCounter() + " " + value.toString());
            }  

        }
    }  

    public static class Reduce_2 extends MapReduceBase implements Reducer<Text, IntWritable, Text, Text> {
        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            double sum = 0; //部门工资
            int count =0 ; //人数
            while (values.hasNext()) {
                count++;
                sum = sum + values.next().get();
            }
            output.collect(key, new Text( count+" "+sum/count));
        }  

    }

运行结果

4.3 分组排序

求每个部门最早进入公司的员工姓名

public static class Map_3 extends MapReduceBase implements Mapper<Object, Text, Text, Text> {
    public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        try {
            Emp emp = new Emp(value.toString());
            output.collect(new Text(emp.getDeptno()), new Text(emp.getHiredate() + "~" + emp.getEname())); // { k=部门号,v=聘期}
        } catch (Exception e) {
            reporter.getCounter(ErrCount.LINESKIP).increment(1);
            WriteErrLine.write("./input/" + this.getClass().getSimpleName() + "err_lines", reporter.getCounter(ErrCount.LINESKIP).getCounter() + " " + value.toString());
        }  

    }
}  

public static class Reduce_3 extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        DateFormat sdf = DateFormat.getDateInstance();
        Date minDate = new Date(9999, 12, 30);
        Date d;
        String[] strings = null;
        while (values.hasNext()) {
            try {
                strings = values.next().toString().split("~"); // 获取名字和日期
                d = sdf.parse(strings[0].toString().substring(0, 10));
                if (d.before(minDate)) {
                    minDate = d;
                }
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        output.collect(key, new Text(minDate.toLocaleString() + " " + strings[1]));  

    }  

}

运行结果

4.4 多表关联

求各个城市的员工的总工资

public static class Map_4 extends MapReduceBase implements Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            try {
                String fileName = ((FileSplit) reporter.getInputSplit()).getPath().getName();
                if (fileName.equalsIgnoreCase("emp.txt")) {
                    Emp emp = new Emp(value.toString());
                    output.collect(new Text(emp.getDeptno()), new Text("A#" + emp.getSal()));
                }
                if (fileName.equalsIgnoreCase("dept.txt")) {
                    Dept dept = new Dept(value.toString());
                    output.collect(new Text(dept.getDeptno()), new Text("B#" + dept.getLoc()));
                }
            } catch (Exception e) {
                reporter.getCounter(ErrCount.LINESKIP).increment(1);
                WriteErrLine.write("./input/" + this.getClass().getSimpleName() + "err_lines", reporter.getCounter(ErrCount.LINESKIP).getCounter() + " " + value.toString());
            }  

        }
    }  

    public static class Reduce_4 extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String deptV;
            Vector<String> empList = new Vector<String>(); // 保存EMP表的工资数据
            Vector<String> deptList = new Vector<String>(); // 保存DEPT表的位置数据
            while (values.hasNext()) {
                deptV = values.next().toString();
                if (deptV.startsWith("A#")) {
                    empList.add(deptV.substring(2));
                }
                if (deptV.startsWith("B#")) {
                    deptList.add(deptV.substring(2));
                }
            }
            double sumSal = 0;
            for (String location : deptList) {
                for (String salary : empList) {
                    //每个城市员工工资总和
                    sumSal = Integer.parseInt(salary) + sumSal;
                }
                output.collect(new Text(location), new Text(Double.toString(sumSal)));
            }
        }  

    }

运行结果

4.5 单表关联

工资比上司高的员工姓名及其工资

public static class Map_5 extends MapReduceBase implements Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            try {
                Emp emp = new Emp(value.toString());
                output.collect(new Text(emp.getMgr()), new Text("A#" + emp.getEname() + "~" + emp.getSal()));  // 员工表 { k=上司名,v=员工工资}
                output.collect(new Text(emp.getEmpno()), new Text("B#" + emp.getEname() + "~" + emp.getSal()));// “经理表” { k=员工名,v=员工工资}
            } catch (Exception e) {
                reporter.getCounter(ErrCount.LINESKIP).increment(1);
                WriteErrLine.write("./input/" + this.getClass().getSimpleName() + "err_lines", reporter.getCounter(ErrCount.LINESKIP).getCounter() + " " + value.toString());
            }
        }
    }  

    public static class Reduce_5 extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            String value;
            Vector<String> empList = new Vector<String>(); // 员工表
            Vector<String> mgrList = new Vector<String>(); // 经理表
            while (values.hasNext()) {
                value = values.next().toString();
                if (value.startsWith("A#")) {
                    empList.add(value.substring(2));
                }
                if (value.startsWith("B#")) {
                    mgrList.add(value.substring(2));
                }
            }
            String empName, empSal, mgrSal;  

            for (String emploee : empList) {
                for (String mgr : mgrList) {
                    String[] empInfo = emploee.split("~");
                    empName = empInfo[0];
                    empSal = empInfo[1];
                    String[] mgrInfo = mgr.split("~");
                    mgrSal = mgrInfo[1];
                    if (Integer.parseInt(empSal) > Integer.parseInt(mgrSal)) {
                        output.collect(key, new Text(empName + " " + empSal));
                    }
                }
            }
        }  

    }

运行结果

4.6 TOP N

列出工资最高的头三名员工姓名及其工资

public static class Map_8 extends MapReduceBase implements Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            try {
                Emp emp = new Emp(value.toString());
                output.collect(new Text("1"), new Text(emp.getEname() + "~" + emp.getSal()));    // { k=随意字符串或数字,v=员工名字+薪资}
            } catch (Exception e) {
                reporter.getCounter(ErrCount.LINESKIP).increment(1);
                WriteErrLine.write("./input/" + this.getClass().getSimpleName() + "err_lines", reporter.getCounter(ErrCount.LINESKIP).getCounter() + " " + value.toString());
            }  

        }
    }  

    public static class Reduce_8 extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            Map<Integer, String> emp = new TreeMap<Integer, String>();   // TreeMap默认key升序排列,巧妙利用这点可以实现top N
            while (values.hasNext()) {
                String[] valStrings = values.next().toString().split("~");
                emp.put(Integer.parseInt(valStrings[1]), valStrings[0]);
            }
            int count = 0; // 计数器
            for (Iterator<Integer> keySet = emp.keySet().iterator(); keySet.hasNext();) {
                if (count < 3) {  //  N =3
                    Integer current_key = keySet.next();
                    output.collect(new Text(emp.get(current_key)), new Text(current_key.toString())); // 迭代key,即SAL
                    count++;
                } else {
                    break;
                }
            }
        }
    }

运算结果

4.7 降序排序

将全体员工按照总收入(工资+提成)从高到低排列,要求列出姓名及其总收入

public static class Map_9 extends MapReduceBase implements Mapper<Object, Text, Text, Text> {
        public void map(Object key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            try {
                Emp emp = new Emp(value.toString());
                int totalSal = Integer.parseInt(emp.getComm()) + Integer.parseInt(emp.getSal());
                output.collect(new Text("1"), new Text(emp.getEname() + "~" + totalSal));
            } catch (Exception e) {
                reporter.getCounter(ErrCount.LINESKIP).increment(1);
                WriteErrLine.write("./input/" + this.getClass().getSimpleName() + "err_lines", reporter.getCounter(ErrCount.LINESKIP).getCounter() + " " + value.toString());
            }  

        }
    }  

    public static class Reduce_9 extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            Map<Integer, String> emp = new TreeMap<Integer, String>(
            // 重写比较器,使降序排列
                    new Comparator<Integer>() {
                        public int compare(Integer o1, Integer o2) {
                            return o2.compareTo(o1);
                        }
                    });
            while (values.hasNext()) {
                String[] valStrings = values.next().toString().split("~");
                emp.put(Integer.parseInt(valStrings[1]), valStrings[0]);
            }
            for (Iterator<Integer> keySet = emp.keySet().iterator(); keySet.hasNext();) {
                Integer current_key = keySet.next();
                output.collect(new Text(emp.get(current_key)), new Text(current_key.toString())); // 迭代key,即SAL
            }
        }
    }

运行结果

总结

把sql里常用的计算模型写成MR是一件比较麻烦的事,因为很多情况下一行sql估计要十几甚至几十行代码来实现,略显笨拙。但是从数据计算速度来说,MR跟sql不是一个级别的。

但不可否认的一点是,无论是什么技术都有各自的适用范围,MR不是万能的,具体要看使用场景再选择适当的技术。

时间: 2024-08-29 13:44:17

MapReduce 常见SQL模型解析的相关文章

SQL点滴26—常见T-SQL面试解析

原文:SQL点滴26-常见T-SQL面试解析 它山之石可以攻玉,这一篇是读别人的博客后写下的,不是原原本本的转载,加入了自己的分析过程和演练.sql语句可以解决很多的复杂业务,避免过多的项目代码,下面几个语句很值得玩味. 1. 已经知道原表year salary2000 10002001 20002002 30002003 4000怎么查询的到下面的结果,就是累积工资year salary2000 10002001 30002002 60002003 10000 思路:这个需要两个表交叉查询得到

【Android开发精要笔记】Android组件模型解析

Android组件模型解析 Android中的Mashup 将应用切分成不同类别的组件,通过统一的定位模型和接口标准将他们整合在一起,来共同完成某项任务.在Android的Mashup模式下,每个组件的功能都可以被充分的复用.来自不同应用的组件可以有机地结合在一起,共同完成任务. 基于Mashup的Android应用模型 三个基本要素:组件.连接.配置 接口就是实现单元.从代码来看,组件就是派生自特定接口或基类的子类的实现,如界面组件Activity就是指派生自android.app.Activ

Apache Spark源码走读之11 -- sql的解析与执行

欢迎转载,转载请注明出处,徽沪一郎. 概要 在即将发布的spark 1.0中有一个新增的功能,即对sql的支持,也就是说可以用sql来对数据进行查询,这对于DBA来说无疑是一大福音,因为以前的知识继续生效,而无须去学什么scala或其它script. 一般来说任意一个sql子系统都需要有parser,optimizer,execution三大功能模块,在spark中这些又都是如何实现的呢,这些实现又有哪些亮点和问题?带着这些疑问,本文准备做一些比较深入的分析. SQL模块分析有几大难点,分别为

Spark SQL应用解析

一  Spark SQL概述 1.1 什么是Spark SQL Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用. Hive是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢.所有Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快! 1.易整合 2

mysql之SQL模型

SQL模型(SQL mode):    通过定义某些规定,限制用户行为,并定义对应的处理机制. 常见的模型:        ANSI            宽松模式,对插入数据进行校验,如果不符合定义类型或长度,对数据类型调整或截断保存,报warning警告. TRADITIONAL            严格模式,当向mysql数据库插入数据时,进行数据的严格校验,保证错误数据不能插入,报error错误.用于事物时,会进行事物的回滚. STRICT_TRANS_TABLES          

Web安全测试中常见逻辑漏洞解析(实战篇)

Web安全测试中常见逻辑漏洞解析(实战篇) 简要: 越权漏洞是比较常见的漏洞类型,越权漏洞可以理解为,一个正常的用户A通常只能够对自己的一些信息进行增删改查,但是由于程序员的一时疏忽,对信息进行增删改查的时候没有进行一个判断,判断所需要操作的信息是否属于对应的用户,导致用户A可以操作其他人的信息.? 逻辑漏洞挖掘一直是安全测试中"经久不衰"的话题.相比SQL注入.XSS漏洞等传统安全漏洞,现在的攻击者更倾向于利用业务逻辑层的应用安全问题,这类问题往往危害巨大,可能造成了企业的资产损失和

MySQL &#183; 性能优化 &#183; MySQL常见SQL错误用法

前言 MySQL在2016年仍然保持强劲的数据库流行度增长趋势.越来越多的客户将自己的应用建立在MySQL数据库之上,甚至是从Oracle迁移到MySQL上来.但也存在部分客户在使用MySQL数据库的过程中遇到一些比如响应时间慢,CPU打满等情况.阿里云RDS专家服务团队帮助云上客户解决过很多紧急问题.现将<ApsaraDB专家诊断报告>中出现的部分常见SQL问题总结如下,供大家参考. 常见SQL错误用法 1. LIMIT 语句 分页查询是最常用的场景之一,但也通常也是最容易出问题的地方.比如

常见SQL Server导入导出数据的几个工具

摘自:http://www.cnblogs.com/chenxizhang/archive/2011/06/09/2076542.html 在我们的日常工作中,与数据库打交道的机会越来越多.这一篇文章我整理一下常见的SQL Server导入导出数据的几个工具 1. 数据导入导出向导 这是一个可视化的工具,我放在首位,是由于它可以极大灵活地满足导入导出功能,而且是所见即所得的,易于使用. 启动数据导入导出向导的方式有好多种,我自己习惯直接通过如下的命令启动(开始=>运行) dtswizard(顾名

几种常见SQL分页方式效率比较-转

原文地址:几种常见SQL分页方式效率比较 分页很重要,面试会遇到.不妨再回顾总结一下. 1.创建测试环境,(插入100万条数据大概耗时5分钟). create database DBTestuse DBTest --创建测试表create table pagetest(id int identity(1,1) not null,col01 int null,col02 nvarchar(50) null,col03 datetime null) --1万记录集declare @i intset