MapReduce编程初步(WordCount,TopN)

在配置Hadoop集群成功后,利用官方自带的例子简单测试了一下MapReduce程序WordCount,这个例子也就相当于编程入门的HelloWorld程序了,结构清晰容易理解,并且方便说明MapReduce的工作过程。这篇随笔主要想记录下在Eclipse中编写简单的MapReduce程序的上手过程。原创代码的内容不会很多,更多的是参考和借鉴现有的优秀代码。

一、Hello ‘MapReduce‘ World——WordCount程序

1、在Eclipse中建立Java项目WordCount

  

2、导入相关包(可以在Eclispe中为这三个包建立User Library以便使用)

  ①commons-cli-1.2.jar

  ②hadoop-common-2.7.3.jar

  ③hadoop-mapreduce-client-core-2.7.3.jar

3、配置好Build Path,确保项目中引入了上述三个包

4、新建包名为zmt.test,在其下建立新的Class名为WordCount,并键入官方源码

package zmt.test;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    public static class TokenizerMapper
        extends Mapper<Object, Text, Text, IntWritable>{

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException
        {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()){
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }

    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{

            int sum = 0;
            for (IntWritable val : values){
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if(otherArgs.length < 2){

            System.err.println("用法:wordcount <int> [<in>...] <out>");
            System.exit(2);

        }

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i=0; i<otherArgs.length-1; ++i){
            FileInputFormat.addInputPath(job,  new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }

}

5、项目右键导出为jar文件,命名为WC.jar

6、将WC.jar复制到虚拟机Master主机中,虚拟机安装了 VMWare Tools可以直接拖拽进行复制。此处复制到/home/admin/Documents/

7、准备好待统计词频的文本文件,此处沿用之前搭建Hadoop时的README.txt。

  上传文件至Hadoop:hdfs dfs -put README.txt /data/input

8、执行任务命令

  hadoop jar /home/admin/Documents/WC.jar zmt.test.WordCount /data/input/WC /data/output/WC

  需要关注的是入口类的路径zmt.test.WordCount,在更复杂的任务开发中需要指明MapReduce程序入口

9、查看结果,命令行中会直接给出结果,也可以去/data/output/WC/part-r-00000查看文件内容

10、任务跟踪,查看MapReduce程序运行情况

  http://192.168.222.134:8088/cluster

二、TopN问题——找到前N个数

  TopN问题也是入门的一个很好的例子,可以更好地理解MapReduce程序的工作流程,更重要的是了解程序中哪些是模式,是可以更改的,是可以不这么写的。

  与WordCount重复的步骤就不再描述,直接给出关键代码和操作。

1、生成随机数

  

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.util.Random;

public class Num_Generator {

    public static void main(String[] args) {

        FileOutputStream fos;

        OutputStreamWriter osw;

        BufferedWriter bw;

        Random random = new Random();

        String filename = "random_num";

        for (int i = 0; i < 10; i++) {

            String tmp_name = filename+""+i+".txt";

            File file = new File(tmp_name);

            try {
                fos = new FileOutputStream(file);

                osw = new OutputStreamWriter(fos,"UTF-8");

                bw = new BufferedWriter(osw);

                for (int j = 0; j < 1000000; j++) {

                    int rm = random.nextInt();

                    bw.write(rm+"");

                    bw.newLine();

                }

                bw.flush();

            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (UnsupportedEncodingException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            System.out.println(i+":Complete.");

        }

    }

}

  该程序生成了10个文件,每个文件包括一百万个Integer范围的随机数,生成完成后将其复制并上传到虚拟机的Hadoop文件系统HDFS中

2、TopN程序编写(该程序是参考另一篇博客的,很惭愧,链接忘了(;′⌒`))

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TopN {

    public static class MyMapper extends Mapper<Object, Text, NullWritable, IntWritable>
    {

        private TreeMap<Integer, Integer> tree = new TreeMap<Integer, Integer>();

//        private final static IntWritable one = new IntWritable(1);

//        private Text number = new Text();

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
//            super.setup(context);
            System.out.println("Mapper("+context.getConfiguration().getInt("N", 10)+"):in setup...");
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // TODO Auto-generated method stub
//            super.cleanup(context);
            System.out.println("Mapper("+context.getConfiguration().getInt("N", 10)+"):in cleanup...");
            for(Integer text : tree.values()){

                context.write(NullWritable.get(), new IntWritable(text));

            }
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException{

            String key_num = value.toString();

            int num = Integer.parseInt(key_num);

            tree.put(num, num);

            if(tree.size() > context.getConfiguration().getInt("N", 10))
                tree.remove(tree.firstKey());

//            System.out.println("Mapper("+context.getConfiguration().getInt("N", 10)+"):"+key.toString()+"/"+value.toString());

//            number.set(key_num);

//            context.write(number, one);

        }

    }

    public static class MyReducer extends Reducer<NullWritable, IntWritable, NullWritable, IntWritable>
    {

//        private IntWritable kk = new IntWritable();

        private TreeMap<Integer, Integer> tree = new TreeMap<Integer, Integer>();

//        private IntWritable result = new IntWritable();

        @Override
        public void reduce(NullWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{

            for (IntWritable value : values){

                tree.put(value.get(), value.get());

                if(tree.size() > context.getConfiguration().getInt("N", 10))
                {
                    tree.remove(tree.firstKey());
                }

            }

//            System.out.println("Reducer("+context.getConfiguration().getInt("N", 10)+"):"+key.toString()+"/"+result.get());

        }

        @Override
        protected void cleanup(
                org.apache.hadoop.mapreduce.Reducer.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
//            super.cleanup(context);

            for(Integer val : tree.descendingKeySet()){

                context.write(NullWritable.get(), new IntWritable(val));

            }

        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if(otherArgs.length < 3){

            System.err.println("heheda");

            System.exit(2);

        }

        conf.setInt("N", new Integer(otherArgs[0]));

        System.out.println("N:"+otherArgs[0]);

        Job job = Job.getInstance(conf, "TopN");
        job.setJarByClass(TopN.class);
        job.setMapperClass(MyMapper.class);
//        job.setCombinerClass(MyReducer.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(IntWritable.class);

        for (int i = 1; i < otherArgs.length-1; i++) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }

        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length-1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

    }

}

3、运行测试,需要输入参数N

  hadoop jar /home/hadoop/hadoop-2.7.3/share/hadoop/mapreduce/TopN.jar TopN 12 /data/input/test1 /data/output/TT

4、查看结果

  hdfs dfs -cat /data/output/TT/part-r-00000

  

[[email protected] myscript]# hdfs dfs -cat /data/output/TT/part-r-00000
2147483194
2147483070
2147483066
2147482879
2147482835
2147482469
2147482152
2147481212
2147481174
2147480379
2147479927
2147479795
时间: 2024-11-05 11:17:24

MapReduce编程初步(WordCount,TopN)的相关文章

Hadoop 实践(二) Mapreduce 编程

Mapreduce 编程,本文以WordCount  为例:实现文件字符统计 在eclipse 里面搭建一个java项目,引入hadoop lib目录下的jar,和 hadoop主目录下的jar. 新建WordCount 类: package org.scf.wordcount; import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.co

MapReduce编程模型及优化技巧

(一)MapReduce 编程模型 (备注:如果你已经了解MapReduce 编程模型请直接进入第二部分MapReduce 的优化讲解) 在学习MapReduce 优化之前我们先来了解一下MapReduce 编程模型是怎样的? 下图中红色的标注表示没有加入Combiner和Partitioner来进行优化. 上图的流程大概分为以下几步. 第一步:假设一个文件有三行英文单词作为 MapReduce 的Input(输入),这里经过 Splitting 过程把文件分割为3块.分割后的3块数据就可以并行

MapReduce编程实例5

前提准备: 1.hadoop安装运行正常.Hadoop安装配置请参考:Ubuntu下 Hadoop 1.2.1 配置安装 2.集成开发环境正常.集成开发环境配置请参考 :Ubuntu 搭建Hadoop源码阅读环境 MapReduce编程实例: MapReduce编程实例(一),详细介绍在集成环境中运行第一个MapReduce程序 WordCount及代码分析 MapReduce编程实例(二),计算学生平均成绩 MapReduce编程实例(三),数据去重 MapReduce编程实例(四),排序 M

MapReduce 编程模板编写【分析网站基本指标UV】程序

1.网站基本指标的几个概念 PV: page view 浏览量 页面的浏览次数,用户每打开一次页面就记录一次. UV:unique visitor 独立访客数 一天内访问某站点的人数(以cookie为例) 但是如果用户把浏览器cookie给删了之后再次访问会影响记录. VV: visit view 访客的访问次数 记录所有访客一天内访问了多少次网站,访客完成访问直到浏览器关闭算一次. IP:独立ip数 指一天内使用不同ip地址的用户访问网站的数量. 2.编写MapReduce编程模板 Drive

大数据MapReduce 编程实战

MapReduce 编程实战 一.大数据的起源1.举例:(1)商品推荐 问题1:大量订单如何存储?问题2:大量订单如何计算?(2)天气预报: 问题1:大量的天气数据如何存储?问题2:大量的天气数据如何计算? 2.大数据核心的问题: (1)数据的存储:分布式文件系统 (2)数据的计算:分布式计算(MapReduce) 3.MapReduce的计算模型的来源 (1)根据Google发表一篇论文:MapReduce (2)问题:PageRank(网页排名)---> 举例 二.MapReduce的编程模

Mapreduce概述和WordCount程序

一.Mapreduce概述 Mapreduce是分布式程序编程框架,也是分布式计算框架,它简化了开发! Mapreduce将用户编写的业务逻辑代码和自带默认组合整合成一个完整的分布式运算程序,并发的运行在hadoop集群上. 二.Mapreduce优缺点 优点:1.易于编程:只用实现几个接口即可完成一个并发的程序. 2.良好的拓展性:再不行当前程序运行的情况下,可以通过增加节点来解决用户/数据扩展,计算量增加的问题. 3.高容错性:可以运行在廉价的集群机器上. 4.适合处理PB级别以上的离线处理

MapReduce编程实战之“高级特性”

本篇介绍MapReduce的一些高级特性,如计数器.数据集的排序和连接.计数器是一种收集作业统计信息的有效手段,排序是MapReduce的核心技术,MapReduce也能够执行大型数据集间的""连接(join)操作. 计数器 计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计.计数器还可用于辅助诊断系统故障.对于大型分布式系统来说,获取计数器比分析日志文件容易的多. 示例一:气温缺失及不规则数据计数器 import java.io.IOException; import

Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter; import java.net.URI; import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Co

编程初步————学渣再学C语言之路

内存: 程序在执行时,程序的指令和数据都必须存储到主内存中,也可以说存在RAM(随机访问存储器)中,RAM是易失性存储器,PC关闭,RAM内容丢失: 变量是什么? 变量就是计算机的一块特定的内存,有一个或多个连续的字节组成,当然每个变量都会有一个名字叫变量名,就像一个ID,一个身份证号码,这个变量名独一无二的代表着这块内存空间,编译器就可以通过这个ID来直接调用这块内存里存储的数据了,这也很好的解释了变量名不占内存空间,编译器直接就把他翻译成了数据: 变量的声明其实也可以称为变量的定义,按照声明