MapReduce Programming Series 4: Sorting

1. Project name:

2. Program code:

package com.sort;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Sort {
    // The map phase converts each input value to an IntWritable and emits it as the
    // output key, so the framework's shuffle sorts the numbers for us.
    public static class Map extends Mapper<Object, Text, IntWritable, IntWritable> {
        public static IntWritable data = new IntWritable();

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("Mapper.................");
            System.out.println("key:"+key+"  value:"+value);

            String line = value.toString().trim();
            data.set(Integer.parseInt(line));
            context.write(data, new IntWritable(1));
            System.out.println("data:"+data+" context:"+context);
        }
    }

    // The reduce phase copies the input key to the output value; the number of elements
    // in the value list determines how many times the key is written out.
    // The global linenum tracks the key's rank, i.e. the output line number.
    public static class Reduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        public static IntWritable linenum = new IntWritable(1);

        @Override
        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            System.out.println("Reducer.................");
            System.out.println("key:"+key+"  value:"+values);

            for(IntWritable val : values){
                context.write(linenum, key);
                System.out.println("linenum:" + linenum +"  key:"+key+" context:"+context);
                linenum.set(linenum.get() + 1);
            }
        }
    }
    public static void main(String [] args) throws Exception{
        Configuration conf = new Configuration();
        String [] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
        if(otherArgs.length != 2){
            System.err.println("Usage: sort <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf,"sort");
        job.setJarByClass(Sort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true)? 0 : 1);
    }
}
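Conceptually, the job relies on the fact that the shuffle phase sorts map output keys (IntWritable compares numerically) before they reach the single reducer, which then numbers each occurrence. The same logic can be sketched in plain Java with a TreeMap, no Hadoop required; the class and method names below are made up for illustration only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SortSimulation {
    // "Shuffle": group duplicate keys and sort them, as the framework does;
    // "reduce": emit each key once per occurrence with a running line number.
    static List<String> rank(int[] input) {
        TreeMap<Integer, Integer> grouped = new TreeMap<>();
        for (int n : input) {
            grouped.merge(n, 1, Integer::sum);  // count occurrences, keys kept sorted
        }
        List<String> out = new ArrayList<>();
        int linenum = 1;
        for (Map.Entry<Integer, Integer> e : grouped.entrySet()) {
            for (int i = 0; i < e.getValue(); i++) {
                out.add(linenum++ + "\t" + e.getKey());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] input = {2, 32, 654, 32, 15, 756, 65223,  // file1
                       5956, 22, 650, 92,               // file2
                       26, 54, 6};                      // file3
        rank(input).forEach(System.out::println);
    }
}
```

Running this on the test data below reproduces the job's final output, which makes it a handy way to sanity-check the MapReduce version.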

3. Test data:

file1:

2
32
654
32
15
756
65223

file2:

5956
22
650
92

file3:

26
54
6

4. Run log:

14/09/21 17:44:27 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/09/21 17:44:27 WARN mapred.JobClient: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
14/09/21 17:44:28 INFO input.FileInputFormat: Total input paths to process : 3
14/09/21 17:44:28 WARN snappy.LoadSnappy: Snappy native library not loaded
14/09/21 17:44:28 INFO mapred.JobClient: Running job: job_local_0001
14/09/21 17:44:28 INFO util.ProcessTree: setsid exited with exit code 0
14/09/21 17:44:28 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@...
14/09/21 17:44:28 INFO mapred.MapTask: io.sort.mb = 100
14/09/21 17:44:28 INFO mapred.MapTask: data buffer = 79691776/99614720
14/09/21 17:44:28 INFO mapred.MapTask: record buffer = 262144/327680
Mapper.................
key:0  value:2
data:2 context:org.apache.hadoop.mapreduce.Mapper$Context@...
Mapper.................
key:2  value:32
data:32 context:org.apache.hadoop.mapreduce.Mapper$Context@...
Mapper.................
key:5  value:654
data:654 context:org.apache.hadoop.mapreduce.Mapper$Context@...
Mapper.................
key:9  value:32
data:32 context:org.apache.hadoop.mapreduce.Mapper$Context@...
Mapper.................
key:12  value:15
data:15 context:org.apache.hadoop.mapreduce.Mapper$Context@...
Mapper.................
key:15  value:756
data:756 context:org.apache.hadoop.mapreduce.Mapper$Context@...
Mapper.................
key:19  value:65223
data:65223 context:org.apache.hadoop.mapreduce.Mapper$Context@...
14/09/21 17:44:28 INFO mapred.MapTask: Starting flush of map output
14/09/21 17:44:28 INFO mapred.MapTask: Finished spill 0
14/09/21 17:44:28 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
14/09/21 17:44:29 INFO mapred.JobClient:  map 0% reduce 0%
14/09/21 17:44:31 INFO mapred.LocalJobRunner:
14/09/21 17:44:31 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
14/09/21 17:44:31 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@...
14/09/21 17:44:31 INFO mapred.MapTask: io.sort.mb = 100
14/09/21 17:44:31 INFO mapred.MapTask: data buffer = 79691776/99614720
14/09/21 17:44:31 INFO mapred.MapTask: record buffer = 262144/327680
Mapper.................
key:0  value:5956
data:5956 context:org.apache.hadoop.mapreduce.Mapper$Context@...
Mapper.................
key:5  value:22
data:22 context:org.apache.hadoop.mapreduce.Mapper$Context@...
Mapper.................
key:8  value:650
data:650 context:org.apache.hadoop.mapreduce.Mapper$Context@...
Mapper.................
key:12  value:92
data:92 context:org.apache.hadoop.mapreduce.Mapper$Context@...
14/09/21 17:44:31 INFO mapred.MapTask: Starting flush of map output
14/09/21 17:44:31 INFO mapred.MapTask: Finished spill 0
14/09/21 17:44:31 INFO mapred.Task: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
14/09/21 17:44:32 INFO mapred.JobClient:  map 100% reduce 0%
14/09/21 17:44:34 INFO mapred.LocalJobRunner:
14/09/21 17:44:34 INFO mapred.Task: Task 'attempt_local_0001_m_000001_0' done.
14/09/21 17:44:34 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@...
14/09/21 17:44:34 INFO mapred.MapTask: io.sort.mb = 100
14/09/21 17:44:34 INFO mapred.MapTask: data buffer = 79691776/99614720
14/09/21 17:44:34 INFO mapred.MapTask: record buffer = 262144/327680
Mapper.................
key:0  value:26
data:26 context:org.apache.hadoop.mapreduce.Mapper$Context@...
Mapper.................
key:3  value:54
data:54 context:org.apache.hadoop.mapreduce.Mapper$Context@...
Mapper.................
key:6  value:6
data:6 context:org.apache.hadoop.mapreduce.Mapper$Context@...
14/09/21 17:44:34 INFO mapred.MapTask: Starting flush of map output
14/09/21 17:44:34 INFO mapred.MapTask: Finished spill 0
14/09/21 17:44:34 INFO mapred.Task: Task:attempt_local_0001_m_000002_0 is done. And is in the process of commiting
14/09/21 17:44:37 INFO mapred.LocalJobRunner:
14/09/21 17:44:37 INFO mapred.Task: Task 'attempt_local_0001_m_000002_0' done.
14/09/21 17:44:37 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@...
14/09/21 17:44:37 INFO mapred.LocalJobRunner:
14/09/21 17:44:37 INFO mapred.Merger: Merging 3 sorted segments
14/09/21 17:44:37 INFO mapred.Merger: Down to the last merge-pass, with 3 segments left of total size: 146 bytes
14/09/21 17:44:37 INFO mapred.LocalJobRunner:
Reducer.................
key:2  value:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@...
linenum:1  key:2 context:org.apache.hadoop.mapreduce.Reducer$Context@...
Reducer.................
key:6  value:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@...
linenum:2  key:6 context:org.apache.hadoop.mapreduce.Reducer$Context@...
Reducer.................
key:15  value:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@...
linenum:3  key:15 context:org.apache.hadoop.mapreduce.Reducer$Context@...
Reducer.................
key:22  value:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@...
linenum:4  key:22 context:org.apache.hadoop.mapreduce.Reducer$Context@...
Reducer.................
key:26  value:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@...
linenum:5  key:26 context:org.apache.hadoop.mapreduce.Reducer$Context@...
Reducer.................
key:32  value:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@...
linenum:6  key:32 context:org.apache.hadoop.mapreduce.Reducer$Context@...
linenum:7  key:32 context:org.apache.hadoop.mapreduce.Reducer$Context@...
Reducer.................
key:54  value:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@...
linenum:8  key:54 context:org.apache.hadoop.mapreduce.Reducer$Context@...
Reducer.................
key:92  value:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@...
linenum:9  key:92 context:org.apache.hadoop.mapreduce.Reducer$Context@...
Reducer.................
key:650  value:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@...
linenum:10  key:650 context:org.apache.hadoop.mapreduce.Reducer$Context@...
Reducer.................
key:654  value:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@...
linenum:11  key:654 context:org.apache.hadoop.mapreduce.Reducer$Context@...
Reducer.................
key:756  value:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@...
linenum:12  key:756 context:org.apache.hadoop.mapreduce.Reducer$Context@...
Reducer.................
key:5956  value:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@...
linenum:13  key:5956 context:org.apache.hadoop.mapreduce.Reducer$Context@...
Reducer.................
key:65223  value:org.apache.hadoop.mapreduce.ReduceContext$ValueIterable@...
linenum:14  key:65223 context:org.apache.hadoop.mapreduce.Reducer$Context@...
14/09/21 17:44:37 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
14/09/21 17:44:37 INFO mapred.LocalJobRunner:
14/09/21 17:44:37 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
14/09/21 17:44:37 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://localhost:9000/user/hadoop/sort_output
14/09/21 17:44:40 INFO mapred.LocalJobRunner: reduce > reduce
14/09/21 17:44:40 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
14/09/21 17:44:41 INFO mapred.JobClient:  map 100% reduce 100%
14/09/21 17:44:41 INFO mapred.JobClient: Job complete: job_local_0001
14/09/21 17:44:41 INFO mapred.JobClient: Counters: 22
14/09/21 17:44:41 INFO mapred.JobClient:   Map-Reduce Framework
14/09/21 17:44:41 INFO mapred.JobClient:     Spilled Records=28
14/09/21 17:44:41 INFO mapred.JobClient:     Map output materialized bytes=158
14/09/21 17:44:41 INFO mapred.JobClient:     Reduce input records=14
14/09/21 17:44:41 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
14/09/21 17:44:41 INFO mapred.JobClient:     Map input records=14
14/09/21 17:44:41 INFO mapred.JobClient:     SPLIT_RAW_BYTES=345
14/09/21 17:44:41 INFO mapred.JobClient:     Map output bytes=112
14/09/21 17:44:41 INFO mapred.JobClient:     Reduce shuffle bytes=0
14/09/21 17:44:41 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
14/09/21 17:44:41 INFO mapred.JobClient:     Reduce input groups=13
14/09/21 17:44:41 INFO mapred.JobClient:     Combine output records=0
14/09/21 17:44:41 INFO mapred.JobClient:     Reduce output records=14
14/09/21 17:44:41 INFO mapred.JobClient:     Map output records=14
14/09/21 17:44:41 INFO mapred.JobClient:     Combine input records=0
14/09/21 17:44:41 INFO mapred.JobClient:     CPU time spent (ms)=0
14/09/21 17:44:41 INFO mapred.JobClient:     Total committed heap usage (bytes)=1325400064
14/09/21 17:44:41 INFO mapred.JobClient:   File Input Format Counters
14/09/21 17:44:41 INFO mapred.JobClient:     Bytes Read=48
14/09/21 17:44:41 INFO mapred.JobClient:   FileSystemCounters
14/09/21 17:44:41 INFO mapred.JobClient:     HDFS_BYTES_READ=161
14/09/21 17:44:41 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=162878
14/09/21 17:44:41 INFO mapred.JobClient:     FILE_BYTES_READ=3682
14/09/21 17:44:41 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=81
14/09/21 17:44:41 INFO mapred.JobClient:   File Output Format Counters
14/09/21 17:44:41 INFO mapred.JobClient:     Bytes Written=81
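One detail worth noting in the counters: Map input records and Reduce input/output records are all 14 (one per number across the three files), while Reduce input groups is only 13, because the value 32 appears twice and duplicate keys are merged into a single group before the reducer sees them. That arithmetic can be double-checked in a few lines of plain Java (the class name here is made up for illustration):

```java
import java.util.Arrays;

public class CounterCheck {
    public static void main(String[] args) {
        // All 14 numbers from file1, file2 and file3.
        int[] input = {2, 32, 654, 32, 15, 756, 65223, 5956, 22, 650, 92, 26, 54, 6};
        long records = input.length;                             // matches "Map input records"
        long groups  = Arrays.stream(input).distinct().count();  // matches "Reduce input groups"
        System.out.println("records=" + records + " groups=" + groups);
    }
}
```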

5. Output:

1    2
2    6
3    15
4    22
5    26
6    32
7    32
8    54
9    92
10    650
11    654
12    756
13    5956
14    65223
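The output above should satisfy two invariants: the first column is a dense 1-based line number, and the second column (the sorted merge of all three input files) is non-decreasing. A small plain-Java validator sketch, a hypothetical helper rather than part of the job itself:

```java
public class ResultCheck {
    // Returns true when ranks are the consecutive sequence 1..n
    // and the values are in non-decreasing order.
    static boolean isValid(int[][] result) {
        for (int i = 0; i < result.length; i++) {
            if (result[i][0] != i + 1) return false;                    // dense line numbers
            if (i > 0 && result[i][1] < result[i - 1][1]) return false; // sorted values
        }
        return true;
    }

    public static void main(String[] args) {
        // The job's output, as (linenum, value) pairs.
        int[][] result = {
            {1, 2}, {2, 6}, {3, 15}, {4, 22}, {5, 26}, {6, 32}, {7, 32},
            {8, 54}, {9, 92}, {10, 650}, {11, 654}, {12, 756},
            {13, 5956}, {14, 65223}
        };
        System.out.println(isValid(result));  // true
    }
}
```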

Posted: 2024-08-09 23:56:25
