Developing a MapReduce Program with Hadoop

Preparation:

1. Set HADOOP_HOME to point to the Hadoop installation directory; otherwise the local run fails at startup with an error complaining that the Hadoop binaries (winutils.exe) cannot be located (see the sketch after this list).

2. On Windows, replace the hadoop/bin directory with a Windows-compatible build matching your Hadoop version (one can be found online).

3. If you still get an org.apache.hadoop.io.nativeio.NativeIO$Windows.access0 error, copy hadoop.dll from that bin directory into C:\Windows\System32.
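
For purely local debugging, the same information can also be supplied from code by setting the hadoop.home.dir system property before the first Hadoop class is loaded. A minimal sketch, assuming Hadoop is unpacked at D:\hadoop-2.7.3 (the class name is only illustrative; adjust the path to your installation):

public class HadoopHomeSetup {
    public static void main(String[] args) {
        //equivalent to the HADOOP_HOME environment variable, but scoped to this JVM;
        //must run before any Hadoop class tries to resolve winutils.exe
        System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3");

        //... continue with the Job setup shown later in this article ...
    }
}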

Required jars:

1. common
  hadoop-2.7.3\share\hadoop\common\hadoop-common-2.7.3.jar
  everything under hadoop-2.7.3\share\hadoop\common\lib
2. hdfs
  hadoop-2.7.3\share\hadoop\hdfs\hadoop-hdfs-2.7.3.jar
  everything under hadoop-2.7.3\share\hadoop\hdfs\lib
3. mapreduce
  hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-app-2.7.3.jar
  hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-common-2.7.3.jar
  hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-core-2.7.3.jar
  hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-hs-2.7.3.jar
  hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-hs-plugins-2.7.3.jar
  hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-jobclient-2.7.3.jar
  hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-jobclient-2.7.3-tests.jar
  hadoop-2.7.3\share\hadoop\mapreduce\hadoop-mapreduce-client-shuffle-2.7.3.jar
  everything under hadoop-2.7.3\share\hadoop\mapreduce\lib
4. yarn
  hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-api-2.7.3.jar
  hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-applications-distributedshell-2.7.3.jar
  hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-applications-unmanaged-am-launcher-2.7.3.jar
  hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-client-2.7.3.jar
  hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-common-2.7.3.jar
  hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-registry-2.7.3.jar
  hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-applicationhistoryservice-2.7.3.jar
  hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-common-2.7.3.jar
  hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-nodemanager-2.7.3.jar
  hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-resourcemanager-2.7.3.jar
  hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-sharedcachemanager-2.7.3.jar
  hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-tests-2.7.3.jar
  hadoop-2.7.3\share\hadoop\yarn\hadoop-yarn-server-web-proxy-2.7.3.jar
  everything under hadoop-2.7.3\share\hadoop\yarn\lib

These dependencies can also be managed with Maven:

<?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>xiaol</groupId>
        <artifactId>xiaol-hadoop</artifactId>
        <version>1.0-SNAPSHOT</version>
        <description>MapReduce</description>

        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <hadoop.version>2.7.3</hadoop.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
        </dependencies>
    </project>

Writing the Mapper:

package xiaol;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Overall workflow: input -> split -> map -> shuffle -> reduce -> output
 * input:   every input line consists of space-separated words
 *          hello java
 *          hello python
 * split:   by default the input is read line by line, and each line becomes one KV pair for the next step
 *          K is the byte offset of the line's start within the file, V is the line content
 *          K:0   V:hello java
 *          K:11  V:hello python
 *          This step can be overridden by the user.
 * map:     a step the user must implement; it holds the business logic
 *          It reads the split output, counts words, and emits KEYOUT/VALUEOUT pairs to shuffle.
 *          Here the K handed to shuffle is the word and V is the number of occurrences:
 *          hello 1
 *          java 1
 * shuffle: the map output is a set of KV pairs; pairs with the same K are moved to the same node for reduce
 *          When handed to reduce, the values of each K are assembled into an Iterable<VALUEOUT>:
 *          hello 1,1
 *          This step can also be overridden by the user.
 * reduce:  a step the user must implement; it holds the business logic and aggregates the shuffle output
 *          It reads the shuffle output, sums the word counts, and emits KEYOUT/VALUEOUT pairs to output:
 *          hello 2
 */
/**
 * org.apache.hadoop.mapreduce.Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 *     KEYIN    type of the key handed to map after split
 *     VALUEIN  type of the value handed to map after split
 *     KEYOUT   type of the key handed to shuffle after map
 *     VALUEOUT type of the value handed to shuffle after map
 * org.apache.hadoop.io.LongWritable    Hadoop's own wrapper for long
 * org.apache.hadoop.io.Text            Hadoop's own text (string) type
 * org.apache.hadoop.io.IntWritable     Hadoop's own wrapper for int
 */
public class WordMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
    /**
     * Override the map method.
     * protected void map(KEYIN key, VALUEIN value, Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException
     *      KEYIN       type of the key handed to map after split, i.e. the byte offset of the current line
     *      VALUEIN     type of the value handed to map after split, i.e. the content of the current line
     *      Context     the execution context of the whole MapReduce job
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String s = value.toString();
        String[] words = s.split(" ");  //each line consists of space-separated words such as "hello java"; split it first so the words can be counted
        for(String word: words){
            /**
             * Write KEYOUT and VALUEOUT into the context as the input of the next step (shuffle).
             *
             * This step counts each word of the current line, so a literal 1 is written here.
             * Some readers may wonder what happens if the same word appears twice in one line.
             * That is not a problem: if "hello" appears twice, shuffle simply receives two "hello 1" pairs,
             * and both of them are handed to reduce together.
             */
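            //(side note: production mappers usually reuse a single Text and IntWritable instance held in fields
            //instead of allocating new objects for every word; fresh objects are kept here for clarity)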
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

Writing the Reducer:

package xiaol;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 */
public class WordReducer extends Reducer<Text, IntWritable, Text, LongWritable> {

    /**
     * Override the reduce method.
     * protected void reduce(KEYIN key, Iterable<VALUEIN> values, Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException
     *      KEYIN                   type of the key handed to reduce after shuffle; this is the map KEYOUT
     *      Iterable<VALUEIN>       the values handed to reduce after shuffle (shuffle sends all pairs with the same K to the same node, so reduce sees a collection of V); these are the map VALUEOUTs
     *      Context                 the execution context of the whole MapReduce job
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        long count = 0;
        for(IntWritable v : values) {
            count += v.get();
        }
        context.write(key, new LongWritable(count));
    }

}
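
An optional map-side optimization is a combiner, which pre-aggregates the (word, 1) pairs on each map node before shuffle. Because this WordReducer emits LongWritable while the map output value is IntWritable, it cannot be registered as the combiner directly; a class whose input and output types both match the map output is needed. A minimal sketch under that assumption (the class name WordCombiner is hypothetical; it would be registered in the driver with job.setCombinerClass(WordCombiner.class)):

package xiaol;

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for(IntWritable v : values) {
            count += v.get();   //sum the partial counts produced by the maps on this node
        }
        context.write(key, new IntWritable(count));
    }
}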

Writing the driver class:

package xiaol;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * Driver class: configures the word-count job and submits it.
 */
public class Test {
    public static void main(String[] args) throws Exception {
        //for a local run a plain new Configuration() is enough; a remote run needs the cluster-related settings (see the sketch after this class)
        Configuration conf = new Configuration();

        Job job = Job.getInstance(conf);

        //set the Mapper and Reducer classes
        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);

        //set the Mapper's output key and output value classes
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //set the Reducer's (job) output key and output value classes
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, "d:/test/test.txt");
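        //note: the output directory must not already exist, otherwise FileOutputFormat fails the job with a FileAlreadyExistsException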
        FileOutputFormat.setOutputPath(job, new Path("d:/test/out/"));

        //wait for the job to finish; true means print progress information along the way
        job.waitForCompletion(true);
    }
}
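
As the comment above the Configuration notes, running against a remote cluster requires cluster-specific settings and HDFS paths. A minimal sketch of such a driver, where the host name master, the port 9000 and the HDFS paths are assumptions to be replaced with the values from your cluster's core-site.xml and yarn-site.xml:

package xiaol;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RemoteTest {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master:9000");               //NameNode address
        conf.set("mapreduce.framework.name", "yarn");                 //submit to YARN instead of the local runner
        conf.set("yarn.resourcemanager.hostname", "master");          //ResourceManager host
        conf.set("mapreduce.app-submission.cross-platform", "true");  //needed when submitting from Windows

        Job job = Job.getInstance(conf);
        //resolve the jar that contains the Mapper/Reducer; when running from an IDE without a packaged jar,
        //job.setJar("path/to/xiaol-hadoop-1.0-SNAPSHOT.jar") may be needed instead
        job.setJarByClass(RemoteTest.class);

        job.setMapperClass(WordMapper.class);
        job.setReducerClass(WordReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //input and output now live on HDFS
        FileInputFormat.setInputPaths(job, "/test/test.txt");
        FileOutputFormat.setOutputPath(job, new Path("/test/out/"));

        job.waitForCompletion(true);
    }
}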

Original article: https://www.cnblogs.com/413xiaol/p/10054394.html
