MapReduce应用案例

1 环境说明

注意:本实验是对前述实验的延续,如果直接点开始实验进入则需要按先前学习的方法启动hadoop

部署节点操作系统为CentOS,防火墙和SElinux禁用,创建了一个shiyanlou用户并在系统根目录下创建/app目录,用于存放
Hadoop等组件运行包。因为该目录用于安装hadoop等组件程序,用户对shiyanlou必须赋予rwx权限(一般做法是root用户在根目录下
创建/app目录,并修改该目录拥有者为shiyanlou(chown –R shiyanlou:shiyanlou /app)。

Hadoop搭建环境:

  • 虚拟机操作系统: CentOS6.6 64位,单核,1G内存
  • JDK:1.7.0_55 64位
  • Hadoop:1.1.2

2 准备测试数据

测试数据包括两个文件dept(部门)和emp(员工),其中各字段用逗号分隔:

dept文件内容:

  • 10,ACCOUNTING,NEW YORK
  • 20,RESEARCH,DALLAS
  • 30,SALES,CHICAGO
  • 40,OPERATIONS,BOSTON

emp文件内容:

  • 7369,SMITH,CLERK,7902,17-12月-80,800,,20
  • 7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30
  • 7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30
  • 7566,JONES,MANAGER,7839,02-4月-81,2975,,20
  • 7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30
  • 7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30
  • 7782,CLARK,MANAGER,7839,09-6月-81,2450,,10
  • 7839,KING,PRESIDENT,,17-11月-81,5000,,10
  • 7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30
  • 7900,JAMES,CLERK,7698,03-12月-81,950,,30
  • 7902,FORD,ANALYST,7566,03-12月-81,3000,,20
  • 7934,MILLER,CLERK,7782,23-1月-82,1300,,10

在/home/shiyanlou/install-pack/class6目录可以找到这两个文件,把这两个文件上传到HDFS中/class6/input目录中,执行如下命令:

  • cd /home/shiyanlou/install-pack/class6
  • hadoop fs -mkdir -p /class6/input (备注:执行此命令需先启动hadoop: start-all.sh)
  • hadoop fs -copyFromLocal dept /class6/input
  • hadoop fs -copyFromLocal emp /class6/input
  • hadoop fs -ls /class6/input

3 应用案例

3.1 测试例子1:求各个部门的总工资

3.1.1 问题分析

MapReduce中的join分为好几种,比如有最常见的 reduce side join、map side join和semi join
等。reduce join 在shuffle阶段要进行大量的数据传输,会造成大量的网络IO效率低下,而map side join
在处理多个小表关联大表时非常有用 。
Map side
join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样我们可以将小表复制多份,
让每个map task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash
table中查找是否有相同的key的记录,如果有,则连接后输出即可。为了支持文件的复制,Hadoop提供了一个类
DistributedCache,使用该类的方法如下:

  1. 用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如果是HDFS上
    的文件,可以这样:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个
    URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
  2. 用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

在下面代码中,将会把数据量小的表(部门dept)缓存在内存中,在Mapper阶段对员工部门编号映射成部门名称,该名称作为key输出到Reduce中,在Reduce中计算按照部门计算各个部门的总工资。

3.1.2 处理流程图

3.1.3 测试代码

Q1SumDeptSalary.java代码(vi编辑代码是不能存在中文):

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q1SumDeptSalary extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        // 用于缓存 dept文件中的数据private Map<String, String> deptMap = new HashMap<String, String>();
        private String[] kv;

        // 此方法会在Map方法执行之前执行且执行一次@Overrideprotected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader in = null;
            try {

                // 从当前作业中获取要缓存的文件
                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                String deptIdName = null;
                for (Path path : paths) {

                    // 对部门文件字段进行拆分并缓存到deptMap中if (path.toString().contains("dept")) {
                        in = new BufferedReader(new FileReader(path.toString()));
                        while (null != (deptIdName = in.readLine())) {

                            // 对部门文件字段进行拆分并缓存到deptMap中// 其中Map中key为部门编号,value为所在部门名称
                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // 对员工文件字段进行拆分
            kv = value.toString().split(",");

            // map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资if (deptMap.containsKey(kv[7])) {
                if (null != kv[5] && !"".equals(kv[5].toString())) {
                    context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
                }
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, LongWritable> {

public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            // 对同一部门的员工工资进行求和long sumSalary = 0;
            for (Text val : values) {
                sumSalary += Long.parseLong(val.toString());
            }

            // 输出key为部门名称和value为该部门员工工资总和
            context.write(key, new LongWritable(sumSalary));
        }
    }

    @Overridepublic int run(String[] args) throws Exception {

        // 实例化作业对象,设置作业名称、Mapper和Reduce类
        Job job = new Job(getConf(), "Q1SumDeptSalary");
        job.setJobName("Q1SumDeptSalary");
        job.setJarByClass(Q1SumDeptSalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // 设置输入格式类
        job.setInputFormatClass(TextInputFormat.class);

        // 设置输出格式
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
    String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
    DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * 主方法,执行入口
     * @param args 输入参数
     */public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args);
        System.exit(res);
    }
}

3.1.4 编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q1SumDeptSalary.java程序代码(代码页可以
使用/home/shiyanlou/install-pack/class6/Q1SumDeptSalary.java文件)

  • cd /app/hadoop-1.1.2/myclass/class6
  • vi Q1SumDeptSalary.java

编译代码

  • javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q1SumDeptSalary.java

把编译好的代码打成jar包(如果不打成jar形式运行会提示class无法找到的错误)

  • jar cvf ./Q1SumDeptSalary.jar ./Q1SumDept*.class
  • mv *.jar ../..
  • rm Q1SumDept*.class

3.1.5 运行并查看结果

运行Q1SumDeptSalary时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

  • 部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
  • 员工数据路径:hdfs://hadoop:9000/class6/input/emp
  • 输出路径:hdfs://hadoop:9000/class6/out1

运行如下命令:

  • cd /app/hadoop-1.1.2
  • hadoop jar Q1SumDeptSalary.jar Q1SumDeptSalary
    hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp
    hdfs://hadoop:9000/class6/out1

运行成功后,刷新CentOS HDFS中的输出路径/class6/out1目录,打开part-r-00000文件

  • hadoop fs -ls /class6/out1
  • hadoop fs -cat /class6/out1/part-r-00000

可以看到运行结果:

  • ACCOUNTING 8750
  • RESEARCH 6775
  • SALES 9400

3.2 测试例子2:求各个部门的人数和平均工资

3.2.1 问题分析

求各个部门的人数和平均工资,需要得到各部门工资总数和部门人数,通过两者相除获取各部门平均工资。首先和问题1类似在Mapper的Setup阶
段缓存部门数据,然后在Mapper阶段抽取出部门编号和员工工资,利用缓存部门数据把部门编号对应为部门名称,接着在Shuffle阶段把传过来的数据
处理为部门名称对应该部门所有员工工资的列表,最后在Reduce中按照部门归组,遍历部门所有员工,求出总数和员工数,输出部门名称和平均工资。

3.2.2 处理流程图

3.2.3 编写代码

Q2DeptNumberAveSalary.java代码:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q2DeptNumberAveSalary extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        // 用于缓存 dept文件中的数据private Map<String, String> deptMap = new HashMap<String, String>();
        private String[] kv;

        // 此方法会在Map方法执行之前执行且执行一次@Overrideprotected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader in = null;
            try {
                // 从当前作业中获取要缓存的文件
                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                String deptIdName = null;
                for (Path path : paths) {

                    // 对部门文件字段进行拆分并缓存到deptMap中if (path.toString().contains("dept")) {
                        in = new BufferedReader(new FileReader(path.toString()));
                        while (null != (deptIdName = in.readLine())) {

                            // 对部门文件字段进行拆分并缓存到deptMap中// 其中Map中key为部门编号,value为所在部门名称
                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            // 对员工文件字段进行拆分
            kv = value.toString().split(",");

            // map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资if (deptMap.containsKey(kv[7])) {
                if (null != kv[5] && !"".equals(kv[5].toString())) {
                    context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
                }
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            long sumSalary = 0;
            int deptNumber = 0;

            // 对同一部门的员工工资进行求和for (Text val : values) {
                sumSalary += Long.parseLong(val.toString());
                deptNumber++;
            }

            // 输出key为部门名称和value为该部门员工工资平均值
    context.write(key, new Text("Dept Number:" + deptNumber + ", Ave Salary:" + sumSalary / deptNumber));
        }
    }

    @Overridepublic int run(String[] args) throws Exception {

        // 实例化作业对象,设置作业名称、Mapper和Reduce类
        Job job = new Job(getConf(), "Q2DeptNumberAveSalary");
        job.setJobName("Q2DeptNumberAveSalary");
        job.setJarByClass(Q2DeptNumberAveSalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // 设置输入格式类
        job.setInputFormatClass(TextInputFormat.class);

        // 设置输出格式类
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
    String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * 主方法,执行入口
     * @param args 输入参数
     */public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q2DeptNumberAveSalary(), args);
        System.exit(res);
    }
}

3.2.4 编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q2DeptNumberAveSalary.java程序代码
(代码页可以使用/home/shiyanlou/install-pack/class6/Q2DeptNumberAveSalary.java文
件)

  • cd /app/hadoop-1.1.2/myclass/class6
  • vi Q2DeptNumberAveSalary.java

编译代码

  • javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q2DeptNumberAveSalary.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

  • jar cvf ./Q2DeptNumberAveSalary.jar ./Q2DeptNum*.class
  • mv *.jar ../..
  • rm Q2DeptNum*.class

3.2.5 运行并查看结果

运行Q2DeptNumberAveSalary时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

  • 部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
  • 员工数据路径:hdfs://hadoop:9000/class6/input/emp
  • 输出路径:hdfs://hadoop:9000/class6/out2

运行如下命令:

  • cd /app/hadoop-1.1.2
  • hadoop jar Q2DeptNumberAveSalary.jar Q2DeptNumberAveSalary
    hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp
    hdfs://hadoop:9000/class6/out2

运行成功后,刷新CentOS HDFS中的输出路径/class6/out2目录

  • hadoop fs -ls /class6/out2
  • hadoop fs -cat /class6/out2/part-r-00000

打开part-r-00000文件,可以看到运行结果:

  • ACCOUNTING Dept Number:3,Ave Salary:2916
  • RESEARCH Dept Number:3,Ave Salary:2258
  • SALES Dept Number:6,Ave Salary:1566

3.3 测试例子3:求每个部门最早进入公司的员工姓名

3.3.1 问题分析

求每个部门最早进入公司员工姓名,需要得到各部门所有员工的进入公司日期,通过比较获取最早进入公司员工姓名。首先和问题1类似在Mapper的
Setup阶段缓存部门数据,然后Mapper阶段抽取出key为部门名称(利用缓存部门数据把部门编号对应为部门名称),value为员工姓名和进入公
司日期,接着在Shuffle阶段把传过来的数据处理为部门名称对应该部门所有员工+进入公司日期的列表,最后在Reduce中按照部门归组,遍历部门所
有员工,找出最早进入公司的员工并输出。

3.3.2 处理流程图

3.3.3 编写代码

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q3DeptEarliestEmp extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        // 用于缓存 dept文件中的数据private Map<String, String> deptMap = new HashMap<String, String>();
        private String[] kv;

        // 此方法会在Map方法执行之前执行且执行一次@Overrideprotected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader in = null;
            try {
                // 从当前作业中获取要缓存的文件
                Path[] paths =     DistributedCache.getLocalCacheFiles(context.getConfiguration());
                String deptIdName = null;
                for (Path path : paths) {
                    if (path.toString().contains("dept")) {
                        in = new BufferedReader(new FileReader(path.toString()));
                        while (null != (deptIdName = in.readLine())) {

                            // 对部门文件字段进行拆分并缓存到deptMap中// 其中Map中key为部门编号,value为所在部门名称
                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {

            // 对员工文件字段进行拆分
            kv = value.toString().split(",");

            // map join: 在map阶段过滤掉不需要的数据// 输出key为部门名称和value为员工姓名+","+员工进入公司日期if (deptMap.containsKey(kv[7])) {
                if (null != kv[4] && !"".equals(kv[4].toString())) {
                    context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[1].trim()                     + "," + kv[4].trim()));
                }
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,         InterruptedException {

            // 员工姓名和进入公司日期
            String empName = null;
            String empEnterDate = null;

            // 设置日期转换格式和最早进入公司的员工、日期
            DateFormat df = new SimpleDateFormat("dd-MM月-yy");

            Date earliestDate = new Date();
            String earliestEmp = null;

            // 遍历该部门下所有员工,得到最早进入公司的员工信息for (Text val : values) {
                empName = val.toString().split(",")[0];
                empEnterDate = val.toString().split(",")[1].toString().trim();
                try {
                    System.out.println(df.parse(empEnterDate));
                    if (df.parse(empEnterDate).compareTo(earliestDate) < 0) {
                        earliestDate = df.parse(empEnterDate);
                        earliestEmp = empName;
                    }
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }

            // 输出key为部门名称和value为该部门最早进入公司员工
            context.write(key, new Text("The earliest emp of dept:" + earliestEmp + ", Enter             date:" + new SimpleDateFormat("yyyy-MM-dd").format(earliestDate)));
        }
    }

    @Overridepublic int run(String[] args) throws Exception {

        // 实例化作业对象,设置作业名称
        Job job = new Job(getConf(), "Q3DeptEarliestEmp");
        job.setJobName("Q3DeptEarliestEmp");

        // 设置Mapper和Reduce类
        job.setJarByClass(Q3DeptEarliestEmp.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // 设置输入格式类
        job.setInputFormatClass(TextInputFormat.class);

        // 设置输出格式类
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第三个参数为输出路径
    String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
    DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * 主方法,执行入口
     * @param args 输入参数
     */public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q3DeptEarliestEmp(), args);
        System.exit(res);
    }
}

3.3.4 编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q3DeptEarliestEmp.java程序代码(代码页
可以使用/home/shiyanlou/install-pack/class6/Q3DeptEarliestEmp.java文件)

  • cd /app/hadoop-1.1.2/myclass/class6
  • vi Q3DeptEarliestEmp.java

编译代码

  • javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q3DeptEarliestEmp.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

  • jar cvf ./Q3DeptEarliestEmp.jar ./Q3DeptEar*.class
  • mv *.jar ../..
  • rm Q3DeptEar*.class

3.3.5 运行并查看结果

运行Q3DeptEarliestEmp时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

  • 部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
  • 员工数据路径:hdfs://hadoop:9000/class6/input/emp
  • 输出路径:hdfs://hadoop:9000/class6/out3

运行如下命令:

  • cd /app/hadoop-1.1.2
  • hadoop jar Q3DeptEarliestEmp.jar Q3DeptEarliestEmp
    hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp
    hdfs://hadoop:9000/class6/out3

运行成功后,刷新CentOS HDFS中的输出路径/class6/out3目录

  • hadoop fs -ls /class6/out3
  • hadoop fs -cat /class6/out3/part-r-00000

打开part-r-00000文件,可以看到运行结果:

  • ACCOUNTING The earliest emp of dept:CLARK, Enter date:1981-06-09
  • RESEARCH The earliest emp of dept:SMITH, Enter date:1980-12-17
  • SALES The earliest emp of dept:ALLEN, Enter date:1981-02-20

3.4 测试例子4:求各个城市的员工的总工资

3.4.1 问题分析

求各个城市员工的总工资,需要得到各个城市所有员工的工资,通过对各个城市所有员工工资求和得到总工资。首先和测试例子1类似在Mapper的
Setup阶段缓存部门对应所在城市数据,然后在Mapper阶段抽取出key为城市名称(利用缓存数据把部门编号对应为所在城市名称),value为员
工工资,接着在Shuffle阶段把传过来的数据处理为城市名称对应该城市所有员工工资,最后在Reduce中按照城市归组,遍历城市所有员工,求出工资
总数并输出。

3.4.2 处理流程图

3.4.3 编写代码

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q4SumCitySalary extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        // 用于缓存 dept文件中的数据private Map<String, String> deptMap = new HashMap<String, String>();
        private String[] kv;

        // 此方法会在Map方法执行之前执行且执行一次@Overrideprotected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader in = null;
            try {
                // 从当前作业中获取要缓存的文件
                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                String deptIdName = null;
                for (Path path : paths) {
                    if (path.toString().contains("dept")) {
                        in = new BufferedReader(new FileReader(path.toString()));
                        while (null != (deptIdName = in.readLine())) {

                            // 对部门文件字段进行拆分并缓存到deptMap中// 其中Map中key为部门编号,value为所在城市名称
                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[2]);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {

            // 对员工文件字段进行拆分
            kv = value.toString().split(",");

            // map join: 在map阶段过滤掉不需要的数据,输出key为城市名称和value为员工工资if (deptMap.containsKey(kv[7])) {
                if (null != kv[5] && !"".equals(kv[5].toString())) {
                    context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
                }
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, LongWritable> {

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,         InterruptedException {

            // 对同一城市的员工工资进行求和long sumSalary = 0;
            for (Text val : values) {
                sumSalary += Long.parseLong(val.toString());
            }

            // 输出key为城市名称和value为该城市工资总和
            context.write(key, new LongWritable(sumSalary));
        }
    }

    @Overridepublic int run(String[] args) throws Exception {

        // 实例化作业对象,设置作业名称
        Job job = new Job(getConf(), "Q4SumCitySalary");
        job.setJobName("Q4SumCitySalary");

        // 设置Mapper和Reduce类
        job.setJarByClass(Q4SumCitySalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // 设置输入格式类
        job.setInputFormatClass(TextInputFormat.class);

        // 设置输出格式类
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
    String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
    DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * 主方法,执行入口
     * @param args 输入参数
     */public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q4SumCitySalary(), args);
        System.exit(res);
    }
}

3.4.4 编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q4SumCitySalary.java程序代码(代码页可以
使用/home/shiyanlou/install-pack/class6/Q4SumCitySalary.java文件)

  • cd /app/hadoop-1.1.2/myclass/class6
  • vi Q4SumCitySalary.java

编译代码

  • javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q4SumCitySalary.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

  • jar cvf ./Q4SumCitySalary.jar ./Q4SumCity*.class
  • mv *.jar ../..
  • rm Q4SumCity*.class

3.4.5 运行并查看结果

运行Q4SumCitySalary时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

  • 部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
  • 员工数据路径:hdfs://hadoop:9000/class6/input/emp
  • 输出路径:hdfs://hadoop:9000/class6/out4

运行如下命令:

  • cd /app/hadoop-1.1.2
  • hadoop jar Q4SumCitySalary.jar Q4SumCitySalary
    hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp
    hdfs://hadoop:9000/class6/out4

运行成功后,刷新CentOS HDFS中的输出路径/class6/out4目录

  • hadoop fs -ls /class6/out4
  • hadoop fs -cat /class6/out4/part-r-00000

打开part-r-00000文件,可以看到运行结果:

  • CHICAGO 9400
  • DALLAS 6775
  • NEW YORK 8750

3.5 测试例子5:列出工资比上司高的员工姓名及其工资

3.5.1 问题分析

求工资比上司高的员工姓名及工资,需要得到上司工资及上司所有下属员工,通过比较他们工资高低得到比上司工资高的员工。在Mapper阶段输出经理
数据和员工对应经理表数据,其中经理数据key为员工编号、value为"M,该员工工资",员工对应经理表数据key为经理编号、value为"E,该
员工姓名,该员工工资";然后在Shuffle阶段把传过来的经理数据和员工对应经理表数据进行归组,如编号为7698员工,value中标志M为自己工
资,value中标志E为其下属姓名及工资;最后在Reduce中遍历比较员工与经理工资高低,输出工资高于经理的员工。

3.5.2 处理流程图

3.5.3 编写代码

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q5EarnMoreThanManager extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {

            // 对员工文件字段进行拆分
            String[] kv = value.toString().split(",");

            // 输出经理表数据,其中key为员工编号和value为M+该员工工资
            context.write(new Text(kv[0].toString()), new Text("M," + kv[5]));

            // 输出员工对应经理表数据,其中key为经理编号和value为(E,该员工姓名,该员工工资)if (null != kv[3] && !"".equals(kv[3].toString())) {
                context.write(new Text(kv[3].toString()), new Text("E," + kv[1] + "," + kv[5]));
            }
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException,         InterruptedException {

            // 定义员工姓名、工资和存放部门员工Map
            String empName;
            long empSalary = 0;
            HashMap<String, Long> empMap = new HashMap<String, Long>();

            // 定义经理工资变量long mgrSalary = 0;

            for (Text val : values) {
                if (val.toString().startsWith("E")) {
                    // 当是员工标示时,获取该员工对应的姓名和工资并放入Map中
                    empName = val.toString().split(",")[1];
                    empSalary = Long.parseLong(val.toString().split(",")[2]);
                    empMap.put(empName, empSalary);
                } else {
                    // 当时经理标志时,获取该经理工资
                    mgrSalary = Long.parseLong(val.toString().split(",")[1]);
                }
            }

            // 遍历该经理下属,比较员工与经理工资高低,输出工资高于经理的员工for (java.util.Map.Entry<String, Long> entry : empMap.entrySet()) {
                if (entry.getValue() > mgrSalary) {
                    context.write(new Text(entry.getKey()), new Text("" + entry.getValue()));
                }
            }
        }
    }

    @Overridepublic int run(String[] args) throws Exception {

        // 实例化作业对象,设置作业名称
        Job job = new Job(getConf(), "Q5EarnMoreThanManager");
        job.setJobName("Q5EarnMoreThanManager");

        // 设置Mapper和Reduce类
        job.setJarByClass(Q5EarnMoreThanManager.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // 设置输入格式类
        job.setInputFormatClass(TextInputFormat.class);

        // 设置输出格式类
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 第1个参数为员工数据路径和第2个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * 主方法,执行入口
     * @param args 输入参数
     */public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q5EarnMoreThanManager(), args);
        System.exit(res);
    }
}

3.5.4 编译并打包代码
进入/app/hadoop-1.1.2/myclass/class6目录中新建Q5EarnMoreThanManager.java程序代码(代码
页可以使用/home/shiyanlou/install-pack/class6/Q5EarnMoreThanManager.java文件)

  • cd /app/hadoop-1.1.2/myclass/class6
  • vi Q5EarnMoreThanManager.java

编译代码
-javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q5EarnMoreThanManager.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

  • jar cvf ./Q5EarnMoreThanManager.jar ./Q5EarnMore*.class
  • mv *.jar ../..
  • rm Q5EarnMore*.class

3.5.5 运行并查看结果
运行Q5EarnMoreThanManager运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

  • 员工数据路径:hdfs://hadoop:9000/class6/input/emp
  • 输出路径:hdfs://hadoop:9000/class6/out5

运行如下命令:

  • cd /app/hadoop-1.1.2
  • hadoop jar Q5EarnMoreThanManager.jar Q5EarnMoreThanManager hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out5

运行成功后,刷新CentOS HDFS中的输出路径/class6/out5目录

  • hadoop fs -ls /class6/out5
  • hadoop fs -cat /class6/out5/part-r-00000

打开part-r-00000文件,可以看到运行结果:

  • FORD 3000

3.6 测试例子6:列出工资比公司平均工资要高的员工姓名及其工资

3.6.1 问题分析

求工资比公司平均工资要高的员工姓名及工资,需要得到公司的平均工资和所有员工工资,通过比较得出工资比平均工资高的员工姓名及工资。这个问题可以
分两个作业进行解决,先求出公司的平均工资,然后与所有员工进行比较得到结果;也可以在一个作业进行解决,这里就得使用作业
setNumReduceTasks方法,设置Reduce任务数为1,保证每次运行一个reduce任务,从而能先求出平均工资,然后进行比较得出结
果。
在Mapper阶段输出两份所有员工数据,其中一份key为0、value为该员工工资,另外一份key为0、value为"该员工姓名
,员工工资";然后在Shuffle阶段把传过来数据按照key进行归组,在该任务中有key值为0和1两组数据;最后在Reduce中对key值0的所
有员工求工资总数和员工数,获得平均工资;对key值1,比较员工与平均工资的大小,输出比平均工资高的员工和对应的工资。

3.6.2 处理流程图

3.6.3 编写代码

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q6HigherThanAveSalary extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {

            // 对员工文件字段进行拆分
            String[] kv = value.toString().split(",");

            // 获取所有员工数据,其中key为0和value为该员工工资
            context.write(new IntWritable(0), new Text(kv[5]));

            // 获取所有员工数据,其中key为0和value为(该员工姓名 ,员工工资)
            context.write(new IntWritable(1), new Text(kv[1] + "," + kv[5]));
        }
    }

    public static class Reduce extends Reducer<IntWritable, Text, Text, Text> {

        // 定义员工工资、员工数和平均工资private long allSalary = 0;
        private int allEmpCount = 0;
        private long aveSalary = 0;

        // 定义员工工资变量private long empSalary = 0;

        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws         IOException, InterruptedException {

            for (Text val : values) {
                if (0 == key.get()) {
                    // 获取所有员工工资和员工数
                    allSalary += Long.parseLong(val.toString());
                    allEmpCount++;
                    System.out.println("allEmpCount = " + allEmpCount);
                } else if (1 == key.get()) {
                    if (aveSalary == 0) {
                        aveSalary = allSalary / allEmpCount;
                        context.write(new Text("Average Salary = "), new Text("" + aveSalary));
                        context.write(new Text("Following employees have salarys higher than                         Average:"), new Text(""));
                    }

                    // 获取员工的平均工资
                    System.out.println("Employee salary = " + val.toString());
                    aveSalary = allSalary / allEmpCount;

                    // 比较员工与平均工资的大小,输出比平均工资高的员工和对应的工资
                    empSalary = Long.parseLong(val.toString().split(",")[1]);
                    if (empSalary > aveSalary) {
                        context.write(new Text(val.toString().split(",")[0]), new Text("" +                         empSalary));
                    }
                }
            }
        }
    }

    @Overridepublic int run(String[] args) throws Exception {

        // 实例化作业对象,设置作业名称
        Job job = new Job(getConf(), "Q6HigherThanAveSalary");
        job.setJobName("Q6HigherThanAveSalary");

        // 设置Mapper和Reduce类
        job.setJarByClass(Q6HigherThanAveSalary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // 必须设置Reduce任务数为1 # -D mapred.reduce.tasks = 1// 这是该作业设置的核心,这样才能够保证各reduce是串行的
        job.setNumReduceTasks(1);

        // 设置输出格式类
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        // 设置输出键和值类型
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 第1个参数为员工数据路径和第2个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * 主方法,执行入口
     * @param args 输入参数
     */public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q6HigherThanAveSalary(), args);
        System.exit(res);
    }
}

3.6.4 编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q5EarnMoreThanManager.java程序代码
(代码页可以使用/home/shiyanlou/install-pack/class6/Q6HigherThanAveSalary.java文
件)

  • cd /app/hadoop-1.1.2/myclass/class6
  • vi Q6HigherThanAveSalary.java

编译代码

  • javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q6HigherThanAveSalary.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

  • jar cvf ./Q6HigherThanAveSalary.jar ./Q6HigherThan*.class
  • mv *.jar ../..
  • rm Q6HigherThan*.class

3.6.5 运行并查看结果

运行Q6HigherThanAveSalary运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

  • 员工数据路径:hdfs://hadoop:9000/class6/input/emp
  • 输出路径:hdfs://hadoop:9000/class6/out6

运行如下命令:

  • cd /app/hadoop-1.1.2
  • hadoop jar Q6HigherThanAveSalary.jar Q6HigherThanAveSalary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out6

运行成功后,刷新CentOS HDFS中的输出路径/class6/out6目录

  • hadoop fs -ls /class6/out6
  • hadoop fs -cat /class6/out6/part-r-00000

打开part-r-00000文件,可以看到运行结果:

  • Average Salary = 2077
  • Following employees have salarys higher than Average:
  • FORD 3000
  • CLARK 2450
  • KING 5000
  • JONES 2975
  • BLAKE 2850

3.7 测试例子7:列出名字以J开头的员工姓名及其所属部门名称

3.7.1 问题分析

求名字以J开头的员工姓名机器所属部门名称,只需判断员工姓名是否以J开头。首先和问题1类似在Mapper的Setup阶段缓存部门数据,然后在
Mapper阶段判断员工姓名是否以J开头,如果是抽取出员工姓名和员工所在部门编号,利用缓存部门数据把部门编号对应为部门名称,转换后输出结果。

3.7.2 处理流程图

3.7.3 编写代码

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q7NameDeptOfStartJ extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        // 用于缓存 dept文件中的数据private Map<String, String> deptMap = new HashMap<String, String>();
        private String[] kv;

        // 此方法会在Map方法执行之前执行且执行一次@Overrideprotected void setup(Context context) throws IOException, InterruptedException {
            BufferedReader in = null;
            try {

                // 从当前作业中获取要缓存的文件
                Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                String deptIdName = null;
                for (Path path : paths) {

                    // 对部门文件字段进行拆分并缓存到deptMap中if (path.toString().contains("dept")) {
                        in = new BufferedReader(new FileReader(path.toString()));
                        while (null != (deptIdName = in.readLine())) {

                            // 对部门文件字段进行拆分并缓存到deptMap中// 其中Map中key为部门编号,value为所在部门名称
                            deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {

            // 对员工文件字段进行拆分
            kv = value.toString().split(",");

            // 输出员工姓名为J开头的员工信息,key为员工姓名和value为员工所在部门名称if (kv[1].toString().trim().startsWith("J")) {
                context.write(new Text(kv[1].trim()), new Text(deptMap.get(kv[7].trim())));
            }
        }
    }

    @Overridepublic int run(String[] args) throws Exception {

        // 实例化作业对象,设置作业名称
        Job job = new Job(getConf(), "Q7NameDeptOfStartJ");
        job.setJobName("Q7NameDeptOfStartJ");

        // 设置Mapper和Reduce类
        job.setJarByClass(Q7NameDeptOfStartJ.class);
        job.setMapperClass(MapClass.class);

        // 设置输入格式类
        job.setInputFormatClass(TextInputFormat.class);

        // 设置输出格式类
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
    DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
        FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * 主方法,执行入口
     * @param args 输入参数
     */public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q7NameDeptOfStartJ(), args);
        System.exit(res);
    }
}

3.7.4 编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q7NameDeptOfStartJ.java程序代码(代码
页可以使用/home/shiyanlou/install-pack/class6/Q7NameDeptOfStartJ.java文件)

  • cd /app/hadoop-1.1.2/myclass/class6
  • vi Q7NameDeptOfStartJ.java

编译代码

  • javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q7NameDeptOfStartJ.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

  • jar cvf ./Q7NameDeptOfStartJ.jar ./Q7NameDept*.class
  • mv *.jar ../..
  • rm Q7NameDept*.class

3.7.5 运行并查看结果

运行Q7NameDeptOfStartJ时需要输入部门数据路径、员工数据路径和输出路径三个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

  • 部门数据路径:hdfs://hadoop:9000/class6/input/dept,部门数据将缓存在各运行任务的节点内容中,可以提供处理的效率
  • 员工数据路径:hdfs://hadoop:9000/class6/input/emp
  • 输出路径:hdfs://hadoop:9000/class6/out7

运行如下命令:

  • cd /app/hadoop-1.1.2
  • hadoop jar Q7NameDeptOfStartJ.jar Q7NameDeptOfStartJ
    hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp
    hdfs://hadoop:9000/class6/out7

运行成功后,刷新CentOS HDFS中的输出路径/class6/out7目录

  • hadoop fs -ls /class6/out7
  • hadoop fs -cat /class6/out7/part-r-00000

打开part-r-00000文件,可以看到运行结果:

  • JAMES SALES
  • JONES RESEARCH

3.8 测试例子8:列出工资最高的头三名员工姓名及其工资

3.8.1 问题分析

求工资最高的头三名员工姓名及工资,可以通过冒泡法得到。在Mapper阶段输出经理数据和员工对应经理表数据,其中经理数据key为0值、value为"员工姓名,员工工资";最后在Reduce中通过冒泡法遍历所有员工,比较员工工资多少,求出前三名。

3.8.2 处理流程图

3.8.3 编写代码

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q8SalaryTop3Salary extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {

            // 对员工文件字段进行拆分
            String[] kv = value.toString().split(",");

            // 输出key为0和value为员工姓名+","+员工工资
            context.write(new IntWritable(0), new Text(kv[1].trim() + "," + kv[5].trim()));
        }
    }

    public static class Reduce extends Reducer<IntWritable, Text, Text, Text> {

        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws         IOException, InterruptedException {

            // 定义工资前三员工姓名
            String empName;
            String firstEmpName = "";
            String secondEmpName = "";
            String thirdEmpName = "";

            // 定义工资前三工资long empSalary = 0;
            long firstEmpSalary = 0;
            long secondEmpSalary = 0;
            long thirdEmpSalary = 0;

            // 通过冒泡法遍历所有员工,比较员工工资多少,求出前三名for (Text val : values) {
                empName = val.toString().split(",")[0];
                empSalary = Long.parseLong(val.toString().split(",")[1]);

                if(empSalary > firstEmpSalary) {
                    thirdEmpName = secondEmpName;
                    thirdEmpSalary = secondEmpSalary;
                    secondEmpName = firstEmpName;
                    secondEmpSalary = firstEmpSalary;
                    firstEmpName = empName;
                    firstEmpSalary = empSalary;
                } else if (empSalary > secondEmpSalary) {
                    thirdEmpName = secondEmpName;
                    thirdEmpSalary = secondEmpSalary;
                    secondEmpName = empName;
                    secondEmpSalary = empSalary;
                } else if (empSalary > thirdEmpSalary) {
                    thirdEmpName = empName;
                    thirdEmpSalary = empSalary;
                }
            }

            // 输出工资前三名信息
            context.write(new Text( "First employee name:" + firstEmpName), new Text("Salary:"             + firstEmpSalary));
            context.write(new Text( "Second employee name:" + secondEmpName), new                     Text("Salary:" + secondEmpSalary));
            context.write(new Text( "Third employee name:" + thirdEmpName), new Text("Salary:"             + thirdEmpSalary));
        }
    }

    @Overridepublic int run(String[] args) throws Exception {

        // 实例化作业对象,设置作业名称
        Job job = new Job(getConf(), "Q8SalaryTop3Salary");
        job.setJobName("Q8SalaryTop3Salary");

        // 设置Mapper和Reduce类
        job.setJarByClass(Q8SalaryTop3Salary.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        // 设置输入格式类
        job.setInputFormatClass(TextInputFormat.class);

        // 设置输出格式类
        job.setOutputKeyClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputValueClass(Text.class);

        // 第1个参数为员工数据路径和第2个参数为输出路径
        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),                     args).getRemainingArgs();
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * 主方法,执行入口
     * @param args 输入参数
     */public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q8SalaryTop3Salary(), args);
        System.exit(res);
    }
}

3.8.4 编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q8SalaryTop3Salary.java程序代码(代码
页可以使用/home/shiyanlou/install-pack/class6/Q8SalaryTop3Salary.java文件)

  • cd /app/hadoop-1.1.2/myclass/class6
  • vi Q8SalaryTop3Salary.java

编译代码

  • javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q8SalaryTop3Salary.java
    把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误
  • jar cvf ./Q8SalaryTop3Salary.jar ./Q8SalaryTop3*.class
  • mv *.jar ../..
  • rm Q8SalaryTop3*.class

3.8.5 运行并查看结果

运行Q8SalaryTop3Salary运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

  • 员工数据路径:hdfs://hadoop:9000/class6/input/emp
  • 输出路径:hdfs://hadoop:9000/class6/out8

运行如下命令:

  • cd /app/hadoop-1.1.2
  • hadoop jar Q8SalaryTop3Salary.jar Q8SalaryTop3Salary hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out8

运行成功后,刷新CentOS HDFS中的输出路径/class6/out8目录

  • hadoop fs -ls /class6/out8
  • hadoop fs -cat /class6/out8/part-r-00000

打开part-r-00000文件,可以看到运行结果:

  • First employee name:KING Salary:5000
  • Second employee name:FORD Salary:3000
  • Third employee name:JONES Salary:2975

3.9 测试例子9:将全体员工按照总收入(工资+提成)从高到低排列

3.9.1 问题分析

求全体员工总收入降序排列,获得所有员工总收入并降序排列即可。在Mapper阶段输出所有员工总工资数据,其中key为员工总工资、value为
员工姓名,在Mapper阶段的最后会先调用job.setPartitionerClass对数据进行分区,每个分区映射到一个reducer,每个分
区内又调用job.setSortComparatorClass设置的key比较函数类排序。由于在本作业中Map的key只有0值,故能实现对所有数
据进行排序。

3.9.2 处理流程图

3.9.3 编写代码

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q9EmpSalarySort extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {

            // 对员工文件字段进行拆分
            String[] kv = value.toString().split(",");

            // 输出key为员工所有工资和value为员工姓名int empAllSalary = "".equals(kv[6]) ? Integer.parseInt(kv[5]) :                             Integer.parseInt(kv[5]) + Integer.parseInt(kv[6]);
            context.write(new IntWritable(empAllSalary), new Text(kv[1]));
        }
    }

    /**
     * 递减排序算法
     */public static class DecreaseComparator extends IntWritable.Comparator {
        public int compare(WritableComparable a, WritableComparable b) {
            return -super.compare(a, b);
        }

        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    @Overridepublic int run(String[] args) throws Exception {

        // 实例化作业对象,设置作业名称
        Job job = new Job(getConf(), "Q9EmpSalarySort");
        job.setJobName("Q9EmpSalarySort");

        // 设置Mapper和Reduce类
        job.setJarByClass(Q9EmpSalarySort.class);
        job.setMapperClass(MapClass.class);

        // 设置输出格式类
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setSortComparatorClass(DecreaseComparator.class);

        // 第1个参数为员工数据路径和第2个参数为输出路径
        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),                     args).getRemainingArgs();
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * 主方法,执行入口
     * @param args 输入参数
     */public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q9EmpSalarySort(), args);
        System.exit(res);
    }
}

3.9.4 编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建Q9EmpSalarySort.java程序代码(代码页可以
使用/home/shiyanlou/install-pack/class6/Q9EmpSalarySort.java文件)

  • cd /app/hadoop-1.1.2/myclass/class6
  • vi Q9EmpSalarySort.java

编译代码

  • javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q9EmpSalarySort.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

  • jar cvf ./Q9EmpSalarySort.jar ./Q9EmpSalary*.class
  • mv *.jar ../..
  • rm Q9EmpSalary*.class

3.9.5 运行并查看结果

运行Q9EmpSalarySort运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

  • 员工数据路径:hdfs://hadoop:9000/class6/input/emp
  • 输出路径:hdfs://hadoop:9000/class6/out9

运行如下命令:

  • cd /app/hadoop-1.1.2
  • hadoop jar Q9EmpSalarySort.jar Q9EmpSalarySort hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out9

运行成功后,刷新CentOS HDFS中的输出路径/class6/out9目录

  • hadoop fs -ls /class6/out9
  • hadoop fs -cat /class6/out9/part-r-00000

打开part-r-00000文件,可以看到运行结果:

  • 5000 KING
  • 3000 FORD
  • 2975 JONES
  • 2850 BLAKE
  • ......

3.10 测试例子10:求任何两名员工信息传递所需要经过的中间节点数

3.10.1 问题分析

该公司所有员工可以形成入下图的树形结构,求两个员工的沟通的中间节点数,可转换在员工树中求两个节点连通所经过的节点数,即从其中一节点到汇合节
点经过节点数加上另一节点到汇合节点经过节点数。例如求M到Q所需节点数,可以先找出M到A经过的节点数,然后找出Q到A经过的节点数,两者相加得到M到
Q所需节点数。

在作业中首先在Mapper阶段所有员工数据,其中经理数据key为0值、value为"员工编号,员工经理编号",然后在Reduce阶段把所有
员工放到员工列表和员工对应经理链表Map中,最后在Reduce的Cleanup中按照上面说所算法对任意两个员工计算出沟通的路径长度并输出。

3.10.2 处理流程图

3.10.3 编写代码

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Q10MiddlePersonsCountForComm extends Configured implements Tool {

    public static class MapClass extends Mapper<LongWritable, Text, IntWritable, Text> {

        public void map(LongWritable key, Text value, Context context) throws IOException,         InterruptedException {

            // 对员工文件字段进行拆分
            String[] kv = value.toString().split(",");

            // 输出key为0和value为员工编号+","+员工经理编号
            context.write(new IntWritable(0), new Text(kv[0] + "," + ("".equals(kv[3]) ? " " : kv[3])));
        }
    }

    public static class Reduce extends Reducer<IntWritable, Text, NullWritable, Text> {

        // 定义员工列表和员工对应经理Map
        List<String> employeeList = new ArrayList<String>();
        Map<String, String> employeeToManagerMap = new HashMap<String, String>();

        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws         IOException, InterruptedException {

            // 在reduce阶段把所有员工放到员工列表和员工对应经理Map中for (Text value : values) {
                employeeList.add(value.toString().split(",")[0].trim());
                employeeToManagerMap.put(value.toString().split(",")[0].trim(),                             value.toString().split(",")[1].trim());
            }
        }

        @Overrideprotected void cleanup(Context context) throws IOException, InterruptedException {
            int totalEmployee = employeeList.size();
            int i, j;
            int distance;
            System.out.println(employeeList);
            System.out.println(employeeToManagerMap);

            // 对任意两个员工计算出沟通的路径长度并输出for (i = 0; i < (totalEmployee - 1); i++) {
                for (j = (i + 1); j < totalEmployee; j++) {
                    distance = calculateDistance(i, j);
                    String value = employeeList.get(i) + " and " + employeeList.get(j) + " =                     " + distance;
                    context.write(NullWritable.get(), new Text(value));
                }
            }
        }

        /**
         * 该公司可以由所有员工形成树形结构,求两个员工的沟通的中间节点数,可以转换在员工树中两员工之间的距离
         * 由于在树中任意两点都会在某上级节点汇合,根据该情况设计了如下算法
         */private int calculateDistance(int i, int j) {
            String employeeA = employeeList.get(i);
            String employeeB = employeeList.get(j);
            int distance = 0;

            // 如果A是B的经理,反之亦然if (employeeToManagerMap.get(employeeA).equals(employeeB) ||                                     employeeToManagerMap.get(employeeB).equals(employeeA)) {
                distance = 0;
            }
            // A和B在同一经理下else if  (employeeToManagerMap.get(employeeA).equals(
                    employeeToManagerMap.get(employeeB))) {
                distance = 0;
            } else {
                // 定义A和B对应经理链表
                List<String> employeeA_ManagerList = new ArrayList<String>();
                List<String> employeeB_ManagerList = new ArrayList<String>();

                // 获取从A开始经理链表
                employeeA_ManagerList.add(employeeA);
                String current = employeeA;
                while (false == employeeToManagerMap.get(current).isEmpty()) {
                    current = employeeToManagerMap.get(current);
                    employeeA_ManagerList.add(current);
                }

                // 获取从B开始经理链表
                employeeB_ManagerList.add(employeeB);
                current = employeeB;
                while (false == employeeToManagerMap.get(current).isEmpty()) {
                    current = employeeToManagerMap.get(current);
                    employeeB_ManagerList.add(current);
                }

                int ii = 0, jj = 0;
                String currentA_manager, currentB_manager;
                boolean found = false;

                // 遍历A与B开始经理链表,找出汇合点计算for (ii = 0; ii < employeeA_ManagerList.size(); ii++) {
                    currentA_manager = employeeA_ManagerList.get(ii);
                    for (jj = 0; jj < employeeB_ManagerList.size(); jj++) {
                        currentB_manager = employeeB_ManagerList.get(jj);
                        if (currentA_manager.equals(currentB_manager)) {
                            found = true;
                            break;
                        }
                    }

                    if (found) {
                        break;
                    }
                }

                // 最后获取两只之前的路径
                distance = ii + jj - 1;
            }

            return distance;
        }
    }

    @Overridepublic int run(String[] args) throws Exception {

        // 实例化作业对象,设置作业名称
        Job job = new Job(getConf(), "Q10MiddlePersonsCountForComm");
        job.setJobName("Q10MiddlePersonsCountForComm");

        // 设置Mapper和Reduce类
        job.setJarByClass(Q10MiddlePersonsCountForComm.class);
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        // 设置Mapper输出格式类
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        // 设置Reduce输出键和值类型
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        // 第1个参数为员工数据路径和第2个参数为输出路径
        String[] otherArgs = new GenericOptionsParser(job.getConfiguration(),                     args).getRemainingArgs();
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    /**
     * 主方法,执行入口
     * @param args 输入参数
     */public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new Q10MiddlePersonsCountForComm(), args);
        System.exit(res);
    }
}

3.10.4 编译并打包代码

进入/app/hadoop-1.1.2/myclass/class6目录中新建
Q10MiddlePersonsCountForComm.java程序代码(代码页可以使用/home/shiyanlou/install-
pack/class6/Q10MiddlePersonsCountForComm.java文件)

  • cd /app/hadoop-1.1.2/myclass/class6
  • vi Q10MiddlePersonsCountForComm.java

编译代码

  • javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q10MiddlePersonsCountForComm.java

把编译好的代码打成jar包,如果不打成jar形式运行会提示class无法找到的错误

  • jar cvf ./Q10MiddlePersonsCountForComm.jar ./Q10MiddlePersons*.class
  • mv *.jar ../..
  • rm Q10MiddlePersons*.class

3.10.5 运行并查看结果

运行Q10MiddlePersonsCountForComm运行的员工数据路径和输出路径两个参数,需要注意的是hdfs的路径参数路径需要全路径,否则运行会报错:

  • 员工数据路径:hdfs://hadoop:9000/class6/input/emp
  • 输出路径:hdfs://hadoop:9000/class6/out10

运行如下命令:

  • cd /app/hadoop-1.1.2
  • hadoop jar Q10MiddlePersonsCountForComm.jar
    Q10MiddlePersonsCountForComm hdfs://hadoop:9000/class6/input/emp
    hdfs://hadoop:9000/class6/out10

运行成功后,刷新CentOS HDFS中的输出路径/class6/out10目录

  • hadoop fs -ls /class6/out10
  • hadoop fs -cat /class6/out10/part-r-00000

打开part-r-00000文件,可以看到运行结果:

  • 7369 and 7499 = 4
  • 7369 and 7521 = 4
  • 7369 and 7566 = 1
  • 7369 and 7654 = 4
  • 7369 and 7698 = 3
  • ......

来源: <https://www.shiyanlou.com/courses/running/1034>

时间: 2024-10-04 11:43:26

MapReduce应用案例的相关文章

大数据技术之_05_Hadoop学习_04_MapReduce_Hadoop企业优化(重中之重)+HDFS小文件优化方法+MapReduce扩展案例+倒排索引案例(多job串联)+TopN案例+找博客共同粉丝案例+常见错误及解决方案

第6章 Hadoop企业优化(重中之重)6.1 MapReduce 跑的慢的原因6.2 MapReduce优化方法6.2.1 数据输入6.2.2 Map阶段6.2.3 Reduce阶段6.2.4 I/O传输6.2.5 数据倾斜问题6.2.6 常用的调优参数6.3 HDFS小文件优化方法6.3.1 HDFS小文件弊端6.3.2 HDFS小文件解决方案第7章 MapReduce扩展案例7.1 倒排索引案例(多job串联)7.2 TopN案例7.3 找博客共同粉丝案例第8章 常见错误及解决方案 第6章

MapReduce 应用案例分析 - 单词计数

需求 计算出文件中每个单词的频数.要求输出结果按照单词的字母顺序进行排序.每个单词和其频数占一行,单词和频数之间有间隔. 比如,输入一个文件,其内容如下: hello world hello hadoop hello mapreduce 对应上面给出的输入样例,其输出样例为: hadoop 1 hello 3 mapreduce 1 world 1 方案制定 对该案例,可设计出如下的MapReduce方案: 1. Map阶段各节点完成由输入数据到单词切分的工作 2. shuffle阶段完成相同单

mapreduce 高级案例倒排索引

理解[倒排索引]的功能 熟悉mapreduce 中的combine 功能 根据需求编码实现[倒排索引]的功能,旨在理解mapreduce 的功能. 一:理解[倒排索引]的功能 1.1 倒排索引: 由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,因而称为倒排索引 简单来说根据单词,返回它在哪个文件中出现过,而且频率是多少的结果.例如:就像百度里的搜索,你输入一个关键字,那么百度引擎就迅速的在它的服务器里找到有该关键字的文件,并根据频率和其他一些策略(如页面点击投票率)等来给你返回结果

MapReduce初级案例

1.数据去重  "数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选.统计大数据集上的数据种类个数.从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重.下面就进入这个实例的MapReduce程序设计. 1.1 实例描述 对数据文件中的数据进行去重.数据文件中的每行都是一个数据. 样例输入如下所示: 1)file1: 2012-3-1 a 2012-3-2 b 2012-3-3 c 2012-3-4 d 2012-3-5 a 2012-3-6 b 2012-3-7

MapReduce应用案例--简单排序

1. 设计思路 在MapReduce过程中自带有排序,可以使用这个默认的排序达到我们的目的. MapReduce 是按照key值进行排序的,我们在Map过程中将读入的数据转化成IntWritable类型,然后作为Map的key值输出. Reduce 阶段拿到的就是按照key值排序好的<key,value list>,将key值输出,并根据value list 中元素的个数决定key的输出次数. 2. 实现 2.1 程序代码 package sort; import java.io.IOExce

MapReduce应用案例--单表关联

1. 实例描述 单表关联这个实例要求从给出的数据中寻找出所关心的数据,它是对原始数据所包含信息的挖掘. 实例中给出child-parent 表, 求出grandchild-grandparent表. 输入数据 file01: child parent Tom Lucy Tom Jack Jone Lucy Jone Jack Lucy Marry Lucy Ben Jack Alice Jack Jesse Terry Alice Terry Jesse Philip Terry Philip

大数据学习之MapReduce编程案例一单词计数 10

一:单词计数 1:单词计数总流程图 2:代码实现 1:Map阶段 package it.dawn.YARNPra.wc_hdfs; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapp

Hadoop之MapReduce程序应用三

摘要:MapReduce程序进行数据去重. 关键词:MapReduce   数据去重 数据源:人工构造日志数据集log-file1.txt和log-file2.txt. log-file1.txt内容 2014-1-1    wangluqing 2014-1-2    root 2014-1-3   root 2014-1-4  wangluqing 2014-1-5  root 2014-1-6  wangluqing log-file2.txt内容 2014-1-1  root 2014-

HADOOP之MAPREDUCE程序应用二

摘要:MapReduce程序进行单词计数. 关键词:MapReduce程序  单词计数 数据源:人工构造英文文档file1.txt,file2.txt. file1.txt 内容 Hello   Hadoop I   am  studying   the   Hadoop  technology file2.txt内容 Hello  world The  world  is  very  beautiful I   love    the   Hadoop    and    world 问题描