Hadoop学习之路(5)Mapreduce程序完成wordcount

程序使用的测试文本数据

Dear River
Dear River Bear Spark
Car Dear Car Bear Car
Dear Car River Car
Spark Spark Dear Spark 

1编写主要类

(1)Maper类

首先是自定义的Maper类代码

public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        //fields:代表着文本一行的的数据: dear bear river
        String[] words = value.toString().split("\t");
        for (String word : words) {
            // 每个单词出现1次,作为中间结果输出
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

?????这个Map类是一个泛型类型,它有四个形参类型,分别指定map()函数的输入键、输入值、输出键和输出值的类型。LongWritable:输入键类型,Text:输入值类型,Text:输出键类型,IntWritable:输出值类型.
?????String[] words = value.toString().split("\t");,words 的值为Dear River Bear River
?????输入键key是一个长整数偏移量,用来寻找第一行的数据和下一行的数据,输入值是一行文本Dear River Bear River,输出键是单词Bear ,输出值是整数1
?????Hadoop本身提供了一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。这些类型都在org.apache.hadoop.io包中。这里使用LongWritable类型(相当于Java的Long类型)、Text类型(相当于Java中的String类型)和IntWritable类型(相当于Java的Integer类型)。
?????map()方法的参数是输入键和输入值。以本程序为例,输入键LongWritable key是一个偏移量,输入值Text valueDear Car Bear Car ,我们首先将包含有一行输入的Text值转换成Java的String类型,之后使用substring()方法提取我们感兴趣的列。map()方法还提供了Context实例用于输出内容的写入。

(2)Reducer类

public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    /*
        (River, 1)
        (River, 1)
        (River, 1)
        (Spark , 1)
        (Spark , 1)
        (Spark , 1)
        (Spark , 1)

        key: River
        value: List(1, 1, 1)
        key: Spark
        value: List(1, 1, 1,1)

    */
    public void reduce(Text key, Iterable<IntWritable> values,
                          Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }
        context.write(key, new IntWritable(sum));// 输出最终结果
    };
}

Reduce任务最初按照分区号从Map端抓取数据为:
(River, 1)
(River, 1)
(River, 1)
(spark, 1)
(Spark , 1)
(Spark , 1)
(Spark , 1)
经过处理后得到的结果为:
key: hello value: List(1, 1, 1)
key: spark value: List(1, 1, 1,1)
所以reduce()函数的形参 Iterable&lt;IntWritable&gt; values 接收到的值为List(1, 1, 1)List(1, 1, 1,1)

(3)Main函数

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;

public class WordCountMain {
    //若在IDEA中本地执行MR程序,需要将mapred-site.xml中的mapreduce.framework.name值修改成local
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        if (args.length != 2 || args == null) {
            System.out.println("please input Path!");
            System.exit(0);
        }
        //System.setProperty("HADOOP_USER_NAME","hadoop2.7");
        Configuration configuration = new Configuration();
        //configuration.set("mapreduce.job.jar","/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar");
        //调用getInstance方法,生成job实例
        Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName());
        // 打jar包
        job.setJarByClass(WordCountMain.class);

        // 通过job设置输入/输出格式
        // MR的默认输入格式是TextInputFormat,所以下两行可以注释掉
        // job.setInputFormatClass(TextInputFormat.class);
        // job.setOutputFormatClass(TextOutputFormat.class);
        // 设置输入/输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 设置处理Map/Reduce阶段的类
        job.setMapperClass(WordCountMap.class);
        //map combine减少网路传出量
        job.setCombinerClass(WordCountReduce.class);
        job.setReducerClass(WordCountReduce.class);

        //如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;如果不一样,需要分别设置map, reduce的        输出的kv类型
        //job.setMapOutputKeyClass(.class)
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(IntWritable.class);

        // 设置reduce task最终输出key/value的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 提交作业
        job.waitForCompletion(true);

    }
}

2本地运行

首先更改mapred-site.xml文件配置
将mapreduce.framework.name的值设置为local

然后本地运行:

查看结果:

3集群运行

方式一:

首先打包

更改配置文件,改成yarn模式

添加本地jar包位置:

 Configuration configuration = new Configuration();
 configuration.set("mapreduce.job.jar","C:\\Users\\tanglei1\\IdeaProjects\\Hadooptang\\target");


设置允许跨平台远程调用:

configuration.set("mapreduce.app-submission.cross-platform","true");


修改输入参数:

运行结果:

方式二:

将maven项目打包,在服务器端用命令运行mr程序

hadoop jar com.kaikeba.hadoop-1.0-SNAPSHOT.jar
com.kaikeba.hadoop.wordcount.WordCountMain /tttt.txt  /wordcount11

原文地址:https://blog.51cto.com/10312890/2462281

时间: 2024-11-05 06:30:13

Hadoop学习之路(5)Mapreduce程序完成wordcount的相关文章

Hadoop学习之路(7)MapReduce自定义排序

本文测试文本: tom 20 8000 nancy 22 8000 ketty 22 9000 stone 19 10000 green 19 11000 white 39 29000 socrates 30 40000    MapReduce中,根据key进行分区.排序.分组MapReduce会按照基本类型对应的key进行排序,如int类型的IntWritable,long类型的LongWritable,Text类型,默认升序排序   为什么要自定义排序规则?现有需求,需要自定义key类型,

Hadoop学习之路(6)MapReduce自定义分区实现

MapReduce自带的分区器是HashPartitioner原理:先对map输出的key求hash值,再模上reduce task个数,根据结果,决定此输出kv对,被匹配的reduce任务取走.自定义分分区需要继承Partitioner,复写getpariton()方法自定义分区类:注意:map的输出是<K,V>键值对其中int partitionIndex = dict.get(text.toString()),partitionIndex是获取K的值 附:被计算的的文本 Dear Dea

Hadoop学习基础之三:MapReduce

现在是讨论这个问题的不错的时机,因为最近媒体上到处充斥着新的革命所谓“云计算”的信息.这种模式需要利用大量的(低端)处理器并行工作来解决计算问题.实际上,这建议利用大量的低端处理器来构建数据中心,而不是利用数目少的多的高端服务器来构建. 举例来说,IBM和Google已经宣布计划用1000台处理器构建的集群提供给部分大学,传授学生们如何使用MapReduce工具在这些集群上编程.加利福尼亚大学伯克利分校甚至打算开设使用MapReduce框架编程的课程.我们对MapReduce支持者大肆炒作它如何

Hadoop学习笔记—4.初识MapReduce

一.神马是高大上的MapReduce MapReduce是Google的一项重要技术,它首先是一个编程模型,用以进行大数据量的计算.对于大数据量的计算,通常采用的处理手法就是并行计算.但对许多开发者来说,自己完完全全实现一个并行计算程序难度太大,而MapReduce就是一种简化并行计算的编程模型,它使得那些没有多有多少并行计算经验的开发人员也可以开发并行应用程序.这也就是MapReduce的价值所在,通过简化编程模型,降低了开发并行应用的入门门槛. 1.1 MapReduce是什么 Hadoop

阿里封神谈hadoop学习之路

阿里封神谈hadoop学习之路 封神 2016-04-14 16:03:51 浏览3283 评论3 发表于: 阿里云E-MapReduce >> 开源大数据周刊 hadoop 学生 spark 摘要: 在大数据时代,要想个性化实现业务的需求,还是得操纵各类的大数据软件,如:hadoop.hive.spark等.笔者(阿里封神)混迹Hadoop圈子多年,经历了云梯1.ODPS等项目,目前base在E-Mapreduce.在这,笔者尽可能梳理下hadoop的学习之路. 引言 当前,越来越多的同学进

Hadoop学习之路(十四)MapReduce的核心运行机制

概述 一个完整的 MapReduce 程序在分布式运行时有两类实例进程: 1.MRAppMaster:负责整个程序的过程调度及状态协调 2.Yarnchild:负责 map 阶段的整个数据处理流程 3.Yarnchild:负责 reduce 阶段的整个数据处理流程 以上两个阶段 MapTask 和 ReduceTask 的进程都是 YarnChild,并不是说这 MapTask 和 ReduceTask 就跑在同一个 YarnChild 进行里 MapReduce 套路图 MapReduce 程

Hadoop学习之路(十五)MapReduce的多Job串联和全局计数器

MapReduce 多 Job 串联 需求 一个稍复杂点的处理逻辑往往需要多个 MapReduce 程序串联处理,多 job 的串联可以借助 MapReduce 框架的 JobControl 实现 实例 以下有两个 MapReduce 任务,分别是 Flow 的 SumMR 和 SortMR,其中有依赖关系:SumMR 的输出是 SortMR 的输入,所以 SortMR 的启动得在 SumMR 完成之后 Configuration conf1 = new Configuration(); Con

Hadoop学习之路(一)——Hadoop家族学习路线图

主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等. 从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘.开源界及厂商,所有数据软件,无

Hadoop学习笔记(2) 关于MapReduce

1. 查找历年最高的温度. MapReduce任务过程被分为两个处理阶段:map阶段和reduce阶段.每个阶段都以键/值对作为输入和输出,并由程序员选择它们的类型.程序员还需具体定义两个函数:map函数和reduce函数. 对应的Java MapReduce代码如下: public class MaxTemperature{ static class MaxTemperatureMapper extends Mapper<LongWritable,Text,Text,IntWritable>