MapReduce编程(二) 排序

一、问题描述

文件中存储了商品id和商品价格的信息,文件中每行2列,第一列文本类型代表商品id,第二列为double类型代表商品价格。数据格式如下:

pid0 334589.41
pid1 663306.49
pid2 499226.8
pid3 130618.22
pid4 513708.8
pid5 723470.7
pid6 998579.14
pid7 831682.84
pid8 87723.96

要求使用MapReduce,按商品的价格从低到高排序,输出格式仍为原来的格式:第一列为商品id,第二列为商品价格。

为了方便测试,写了一个DataProducer类随机产生数据。

package com.javacore.hadoop;

import java.io.*;
import java.util.Random;

/**
 * Created by bee on 3/25/17.
 */
public class DataProducer {
    public static void doubleProcuder() throws Exception {
        File f = new File("input/productDouble");
        if (f.exists()) {
            f.delete();
        }

        Random generator = new Random();
        double rangeMin = 1.0;
        double rangeMax = 999999.0;

        FileOutputStream fos = new FileOutputStream(f);
        OutputStreamWriter osq = new OutputStreamWriter(fos);
        BufferedWriter bfw = new BufferedWriter(osq);

        for (int i = 0; i < 100; i++) {
            double pValue = rangeMin + (rangeMax - rangeMin) * generator.nextDouble();
            pValue = (double) Math.round(pValue * 100) / 100;
            try {
                bfw.write("pid" + i + " " + pValue + "\n");

            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        bfw.close();
        osq.close();
        fos.close();
        System.out.println("写入完成!");

    }

    public static void main(String[] args) throws Exception {
        doubleProcuder();
    }
}

二、MapReduce程序

package com.javacore.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Created by bee on 3/28/17.
 */
public class DataSortText {

    public static class Map extends Mapper<Object, Text, DoubleWritable, Text> {
        public static DoubleWritable pValue = new DoubleWritable();
        public static Text pId = new Text();

        //
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\\s+");
            pValue.set(Double.parseDouble(line[1]));
            pId.set(new Text(line[0]));
            context.write(pValue, pId);
        }

    }

    public static class Reduce extends Reducer<DoubleWritable, Text,
            Text, DoubleWritable> {

        public void reduce(DoubleWritable key,Iterable<Text> values,
                           Context context) throws IOException, InterruptedException {
            for (Text val:values){
                context.write(val,key);
            }
        }

    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        FileUtil.deleteDir("output");
        Configuration conf=new Configuration();
        conf.set("fs.defaultFS","hdfs://localhost:9000");
        String[] otherargs=new
                String[]{"input/productDouble",
                "output"};

        if (otherargs.length!=2){
            System.err.println("Usage: mergesort <in> <out>");
            System.exit(2);
        }

        Job job=Job.getInstance();
        job.setJarByClass(DataSortText.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(DoubleWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job,new Path(otherargs[0]));
        FileOutputFormat.setOutputPath(job,new Path(otherargs[1]));
        System.exit(job.waitForCompletion(true) ? 0: 1);
    }
}

三、输出

运行之后,输出结果如下。

pid8    87723.96
pid3    130618.22
pid9    171804.65
pid0    334589.41
pid10   468768.65
pid2    499226.8
pid4    513708.8
pid1    663306.49
pid5    723470.7
pid7    831682.84
pid6    998579.14

四、性能分析

为了测试MapReduce排序的性能,数据量分别用1万、10万、100万、1000万、1亿、5亿做测试,结果如下。

数量 文件大小 排序耗时
1万 177KB 6秒
10万 1.9MB 6秒
100 万 19.7MB 13秒
1000 万 206.8MB 60秒
1亿 2.17GB 9分钟
5亿 11.28GB 41分钟


附机器硬件配置:

内存:8 GB 1867 MHz DDR3
CPU:2.7 GHz Intel Core i5
磁盘:SSD
时间: 2024-10-09 22:15:08

MapReduce编程(二) 排序的相关文章

Hadoop 实践(二) Mapreduce 编程

Mapreduce 编程,本文以WordCount  为例:实现文件字符统计 在eclipse 里面搭建一个java项目,引入hadoop lib目录下的jar,和 hadoop主目录下的jar. 新建WordCount 类: package org.scf.wordcount; import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.co

MapReduce编程实战之“高级特性”

本篇介绍MapReduce的一些高级特性,如计数器.数据集的排序和连接.计数器是一种收集作业统计信息的有效手段,排序是MapReduce的核心技术,MapReduce也能够执行大型数据集间的""连接(join)操作. 计数器 计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计.计数器还可用于辅助诊断系统故障.对于大型分布式系统来说,获取计数器比分析日志文件容易的多. 示例一:气温缺失及不规则数据计数器 import java.io.IOException; import

MapReduce编程模型及优化技巧

(一)MapReduce 编程模型 (备注:如果你已经了解MapReduce 编程模型请直接进入第二部分MapReduce 的优化讲解) 在学习MapReduce 优化之前我们先来了解一下MapReduce 编程模型是怎样的? 下图中红色的标注表示没有加入Combiner和Partitioner来进行优化. 上图的流程大概分为以下几步. 第一步:假设一个文件有三行英文单词作为 MapReduce 的Input(输入),这里经过 Splitting 过程把文件分割为3块.分割后的3块数据就可以并行

MapReduce编程实例5

前提准备: 1.hadoop安装运行正常.Hadoop安装配置请参考:Ubuntu下 Hadoop 1.2.1 配置安装 2.集成开发环境正常.集成开发环境配置请参考 :Ubuntu 搭建Hadoop源码阅读环境 MapReduce编程实例: MapReduce编程实例(一),详细介绍在集成环境中运行第一个MapReduce程序 WordCount及代码分析 MapReduce编程实例(二),计算学生平均成绩 MapReduce编程实例(三),数据去重 MapReduce编程实例(四),排序 M

暴力破解MD5的实现(MapReduce编程)

本文主要介绍MapReduce编程模型的原理和基于Hadoop的MD5暴力破解思路. 一.MapReduce的基本原理 Hadoop作为一个分布式架构的实现方案,它的核心思想包括以下几个方面:HDFS文件系统,MapReduce的编程模型以及RPC框架.无论是怎样的架构,一个系统的关键无非是存储结构和业务逻辑.HDFS分布式文件系统是整个Hadoop的基础.在HDFS文件系统之中,大文件被分割成很多的数据块,每一块都有可能分布在集群的不同节点中.也就是说在HDFS文件系统中,文件的情况是这样的:

MapReduce编程实践

一.MapReduce编程思想 学些MapRedcue主要是学习它的编程思想,在MR的编程模型中,主要思想是把对数据的运算流程分成map和reduce两个阶段: Map阶段:读取原始数据,形成key-value数据(map方法) Reduce阶段:把map阶段的key-value数据按照相同的key进行分组聚合(reduce方法) 它其实是一种数据逻辑运算模型,对于这样的运算模型,有一些成熟的具体软件实现,比如hadoop中的mapreduce框架.spark等,例如在hadoop的mr框架中,

Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter; import java.net.URI; import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Co

Hadoop学习笔记—11.MapReduce中的排序和分组

一.写在之前的 1.1 回顾Map阶段四大步凑 首先,我们回顾一下在MapReduce中,排序和分组在哪里被执行: 从上图中可以清楚地看出,在Step1.4也就是第四步中,需要对不同分区中的数据进行排序和分组,默认情况下,是按照key进行排序和分组. 1.2 实验场景数据文件 在一些特定的数据文件中,不一定都是类似于WordCount单次统计这种规范的数据,比如下面这类数据,它虽然只有两列,但是却有一定的实践意义. 3 3 3 2 3 1 2 2 2 1 1 1 (1)如果按照第一列升序排列,当

MapReduce编程(六) 从HDFS导入数据到Elasticsearch

一.Elasticsearch for Hadoop安装 Elasticsearch for Hadoop并不像logstash.kibana一样是一个独立的软件,而是Hadoop和Elasticsearch交互所需要的jar包.所以,有直接下载和maven导入2种方式.安装之前确保JDK版本不要低于1.8,Elasticsearch版本不能低于1.0. 官网对声明是对Hadoop 1.1.x.1.2.x.2.2.x.2.4.x.2.6.x.2.7.x测试通过,支持较好,其它版本的也并不是不能用