【Hadoop学习之八】MapReduce开发

环境
  虚拟机:VMware 10
  Linux版本:CentOS-6.5-x86_64
  客户端:Xshell4
  FTP:Xftp4
  jdk8
  hadoop-3.1.1

伪分布式:HDFS和YARN 伪分布式搭建,事先启动HDFS和YARN

第一步:开发WordCount示例

package test.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyWC {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf,"word count");
            job.setJarByClass(MyWC.class);

            job.setMapperClass(WordMapper.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            job.setReducerClass(WordReducer.class);
            job.setNumReduceTasks(1);

//            FileInputFormat.addInputPath(job, new Path("hdfs://node1:9820/wjy/input/text.txt"));
//            Path output = new Path("hdfs://node1:9820/wjy/output/");

            //注意这里设置的目录是从 HDFS根目录开始的
            FileInputFormat.addInputPath(job, new Path("/wjy/input/text.txt"));
            Path output = new Path("/wjy/output/");
            if (output.getFileSystem(conf).exists(output))
            {
                output.getFileSystem(conf).delete(output,true);
            }
            FileOutputFormat.setOutputPath(job, output);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
package test.mr;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // 写在外面 map循环创建会造成内存溢出
    private final static IntWritable one = new IntWritable(1);
    // map写出的数据放到buffer字节数组里 这样word可以继续使用 没有影响
    private Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
            throws IOException, InterruptedException {
        //StringTokenizer 默认按照空格 制表符 回车等空白符作为分隔符来切分传入的数据
        StringTokenizer st = new StringTokenizer(value.toString());
        while (st.hasMoreTokens()) {
            word.set(st.nextToken());
            context.write(word, one);
        }
    }
}
package test.mr;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        //key:hello
        //values:(1,1,1,1,1,1)
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }

}

第二步:程序打jar包:MyWC.jar,上传jar和测试文件

[[email protected] ~]# ls
MyWC.jar text.txt
[[email protected] ~]# hdfs dfs -mkdir /wjy/input
[[email protected] ~]# hdfs dfs -mkdir /wjy/output
[[email protected] ~]# hdfs dfs -put /root/text.txt /wjy/input

text.txt文件里面是测试数据:

hello sxt 1

hello sxt 2

hello sxt 3

...

hello sxt 1000000

第三步:运行jar:MyWC.jar

[[email protected] ~]# hadoop jar MyWC.jar test.mr.MyWC
2019-01-15 19:06:04,326 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-01-15 19:06:07,698 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2019-01-15 19:06:09,247 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2019-01-15 19:06:09,294 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1547546637762_0003
2019-01-15 19:06:10,518 INFO input.FileInputFormat: Total input files to process : 1
2019-01-15 19:06:11,078 INFO mapreduce.JobSubmitter: number of splits:1
2019-01-15 19:06:11,490 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
2019-01-15 19:06:14,280 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1547546637762_0003
2019-01-15 19:06:14,287 INFO mapreduce.JobSubmitter: Executing with tokens: []
2019-01-15 19:06:15,163 INFO conf.Configuration: resource-types.xml not found
2019-01-15 19:06:15,163 INFO resource.ResourceUtils: Unable to find ‘resource-types.xml‘.
2019-01-15 19:06:15,934 INFO impl.YarnClientImpl: Submitted application application_1547546637762_0003
2019-01-15 19:06:16,436 INFO mapreduce.Job: The url to track the job: http://node1:8088/proxy/application_1547546637762_0003/
2019-01-15 19:06:16,438 INFO mapreduce.Job: Running job: job_1547546637762_0003
2019-01-15 19:07:48,824 INFO mapreduce.Job: Job job_1547546637762_0003 running in uber mode : false
2019-01-15 19:07:49,614 INFO mapreduce.Job:  map 0% reduce 0%
2019-01-15 19:09:10,176 INFO mapreduce.Job:  map 67% reduce 0%
2019-01-15 19:09:21,123 INFO mapreduce.Job:  map 100% reduce 0%
2019-01-15 19:13:43,544 INFO mapreduce.Job:  map 100% reduce 73%
2019-01-15 19:13:49,599 INFO mapreduce.Job:  map 100% reduce 100%
2019-01-15 19:14:04,717 INFO mapreduce.Job: Job job_1547546637762_0003 completed successfully
2019-01-15 19:14:08,754 INFO mapreduce.Job: Counters: 53
    File System Counters
        FILE: Number of bytes read=34888902
        FILE: Number of bytes written=70205331
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=17888997
        HDFS: Number of bytes written=8888922
        HDFS: Number of read operations=8
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=73564
        Total time spent by all reduces in occupied slots (ms)=167987
        Total time spent by all map tasks (ms)=73564
        Total time spent by all reduce tasks (ms)=167987
        Total vcore-milliseconds taken by all map tasks=73564
        Total vcore-milliseconds taken by all reduce tasks=167987
        Total megabyte-milliseconds taken by all map tasks=75329536
        Total megabyte-milliseconds taken by all reduce tasks=172018688
    Map-Reduce Framework
        Map input records=1000000
        Map output records=3000000
        Map output bytes=28888896
        Map output materialized bytes=34888902
        Input split bytes=101
        Combine input records=0
        Combine output records=0
        Reduce input groups=1000002
        Reduce shuffle bytes=34888902
        Reduce input records=3000000
        Reduce output records=1000002
        Spilled Records=6000000
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=1134
        CPU time spent (ms)=23710
        Physical memory (bytes) snapshot=381153280
        Virtual memory (bytes) snapshot=5039456256
        Total committed heap usage (bytes)=189894656
        Peak Map Physical memory (bytes)=229081088
        Peak Map Virtual memory (bytes)=2516492288
        Peak Reduce Physical memory (bytes)=152334336
        Peak Reduce Virtual memory (bytes)=2522963968
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=17888896
    File Output Format Counters
        Bytes Written=8888922

第四步:查看下载处理结果

[[email protected] sbin]# hdfs dfs -ls /wjy/output
2019-01-16 00:32:54,137 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
-rw-r--r--   1 root supergroup          0 2019-01-15 19:13 /wjy/output/_SUCCESS
-rw-r--r--   1 root supergroup    8888922 2019-01-15 19:13 /wjy/output/part-r-00000

[[email protected] ~]# hdfs dfs -get /wjy/output/part-r-00000 ./
[[email protected] ~]# vi part-r-00000
999980  1
999981  1
999982  1
999983  1
999984  1
999985  1
999986  1
999987  1
999988  1
999989  1
99999   1
999990  1
999991  1
999992  1
999993  1
999994  1
999995  1
999996  1
999997  1
999998  1
999999  1
hello   1000000
sxt     1000000

问题1:
[2019-01-15 17:08:05.159]Container killed on request. Exit code is 143
[2019-01-15 17:08:05.182]Container exited with a non-zero exit code 143.
2019-01-15 17:08:20,957 INFO mapreduce.Job: Task Id : attempt_1547542193692_0003_m_000000_2, Status : FAILED
[2019-01-15 17:08:18.963]Container [pid=4064,containerID=container_1547542193692_0003_01_000004] is running 210352640B beyond the ‘VIRTUAL‘ memory limit. Current usage: 26.0 MB of 1 GB physical memory used; 2.3 GB of 2.1 GB virtual memory used. Killing container.

原因:申请内存过大而被终止
解决措施:取消内存检查
配置:yarn-site.xml

<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
<description>Whether virtual memory limits will be enforced for containers</description>
</property>

问题2:
2019-01-15 18:51:11,229 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2019-01-15 18:51:12,237 INFO ipc.Client: Retrying connect to server: 0.0.0.0/0.0.0.0:10020. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
java.io.IOException: java.net.ConnectException: Your endpoint configuration is wrong; For more details see: http://wiki.apache.org/hadoop/UnsetHostnameOrPort
原因:由于没有启动historyserver引起的
解决办法:
在mapred-site.xml配置文件中添加

<property>
    <name>mapreduce.jobhistory.address</name>
    <value>node1:10020</value>
</property>

在namenode上执行命令:mr-jobhistory-daemon.sh start historyserver 
这样在,namenode上会启动JobHistoryServer服务,可以在historyserver的日志中查看运行情况

原文地址:https://www.cnblogs.com/cac2020/p/10274979.html

时间: 2024-11-09 03:57:23

【Hadoop学习之八】MapReduce开发的相关文章

Hadoop学习(五)-MapReduce架构原理

概述 hadoop主要是用于应对海量数据的存储和计算的,前面hdfs文件系统,则重点是用于海量数据的存储.那么有了这么多数据,我们又该怎么在这些数据里面来提取我们需要的信息呢?这个时候hadoop中出现了一个非常重要的成员MapReduce.MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算.注意这个并行运算,它不再是我们传统意义上的串行计算,它可以在不同的进程中进行并行的运算.概念"Map(映射)"和"Reduce(归约)",是它们的主要思

Hadoop学习之MapReduce

结构介绍 MapReduce是Hadoop提供的一种处理海量数据的并行编程模型和计算框架,用于对大规模的数据进行并行计算.主要由ResourceManager和NodeManager两类节点构成. ResourceManager主要负责集群资源管理,NodeManager负责节点的资源管理. 当运行MapReduce任务的时候,后产生ApplicationMaster和Container,其中ApplicationMaster负责向ResourceManager节点进行资源的申请并控制任务的执行

Hadoop学习之MapReduce执行过程详解

转自:http://my.oschina.net/itblog/blog/275294 分析MapReduce执行过程 MapReduce运行的时候,会通过Mapper运行的任务读取HDFS中的数据文件,然后调用自己的方法,处理数据,最后输出.Reducer任务会接收Mapper任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS的文件中.整个流程如图: Mapper任务的执行过程详解 每个Mapper任务是一个java进程,它会读取HDFS中的文件,解析成很多的键值对,经过我

大数据学习之八——MapReduce工作机制

1.MapReduce的特点 软件框架.并行处理.可靠且容错.大规模集群.海量数据集 2.mapper和reducer mapper负责"分":把复杂的任务分解为若干个"简单的任务"来处理.简单的任务包含三层含义: (1)数据或计算的规模相对原任务要大大缩小: (2)就近计算原则,任务会分配到存放着所需数据的节点上进行计算: (3)这些小任务可以并行计算,彼此间几乎没有依赖关系. reducer负责对map阶段的结果进行汇总. 3.MapReduce的工作机制 (1

Hadoop 学习之MapReduce

MapReduce充分利用了分而治之,主要就是将一个数据量比较大的作业拆分为多个小作业的框架,而用户需要做的就是决定拆成多少份,以及定义作业本身,用户所要做的操作少了又少,真是Very Good! 一.MapReduce执行流程 下面的是MapReduce的执行过程: 最上方的用户程序链接了底层的MapReduce库,并实现了最基本的Map函数和Reduce函数. 由用户来决定将任务划分为K块(这里设为5),假设为64MB,如图左方所示分成了split0~4(文件块):然后使用fork将用户程序

Hadoop学习笔记(4) ——搭建开发环境及编写Hello World

Hadoop学习笔记(4) ——搭建开发环境及编写Hello World 整个Hadoop是基于Java开发的,所以要开发Hadoop相应的程序就得用JAVA.在linux下开发JAVA还数eclipse方便. 下载 进入官网:http://eclipse.org/downloads/. 找到相应的版本进行下载,我这里用的是eclipse-SDK-3.7.1-linux-gtk版本. 解压 下载下来一般是tar.gz文件,运行: $tar -zxvf eclipse-SDK-3.7.1-linu

hadoop 学习笔记:mapreduce框架详解

hadoop 学习笔记:mapreduce框架详解 开始聊mapreduce,mapreduce是hadoop的计算框架,我 学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的 思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习 hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不

hadoop学习;自定义Input/OutputFormat;类引用mapreduce.mapper;三种模式

hadoop分割与读取输入文件的方式被定义在InputFormat接口的一个实现中,TextInputFormat是默认的实现,当你想要一次获取一行内容作为输入数据时又没有确定的键,从TextInputFormat返回的键为每行的字节偏移量,但目前没看到用过 以前在mapper中曾使用LongWritable(键)和Text(值),在TextInputFormat中,因为键是字节偏移量,可以是LongWritable类型,而当使用KeyValueTextInputFormat时,第一个分隔符前后

Hadoop那些事儿(二)---MapReduce开发环境搭建

上一篇文章介绍了在ubuntu系统中安装Hadoop的伪分布式环境,这篇文章主要为MapReduce开发环境的搭建流程. 1.HDFS伪分布式配置 使用MapReduce时,如果需要与HDFS建立连接,及使用HDFS中的文件,还需要做一些配置. 首先进入Hadoop的安装目录 cd /usr/local/hadoop/hadoop2 在HDFS中创建用户目录 ./bin/hdfs dfs -mkdir -p /user/hadoop 创建input目录,并将./etc/hadoop中的xml文件