使用ToolRunner运行Hadoop程序基本原理分析

为了简化命令行方式运行作业,Hadoop自带了一些辅助类。GenericOptionsParser是一个类,用来解释常用的Hadoop命令行选项,并根据需要,为Configuration对象设置相应的取值。通常不直接使用GenericOptionsParser,更方便的方式是:实现Tool接口,通过ToolRunner来运行应用程序,ToolRunner内部调用GenericOptionsParser。

一、相关的类及接口解释

(一)相关类及其对应关系如下:

关于ToolRunner典型的实现方法

1、定义一个类(如上图中的MyClass),继承configured,实现Tool接口。

2、在main()方法中通过ToolRunner.run(...)方法调用上述类的run(String[]方法)

见第三部分的例子。

(二)关于ToolRunner

1、ToolRunner与上图中的类、接口无任何的继承、实现关系,它只继承了Object,没实现任何接口。

2、ToolRunner可以方便的运行那些实现了Tool接口的类(调用其run(String[])方法,并通过GenericOptionsParser 可以方便的处理hadoop命令行参数。

A utility to help run Tools.

ToolRunner can be used to run classes implementing Tool interface. It works in conjunction with GenericOptionsParser to parse the generic hadoop command line arguments and modifies the Configuration of the Tool.
The application-specific options are passed along without being modified.

3、ToolRunner除了一个空的构造方法以外,只有一个方法,即run()方法,它有以下三种形式:

run

public static int run(Configuration conf,
                      Tool tool,
                      String[] args)
               throws Exception
Runs the given Tool by Tool.run(String[]), after parsing with the given generic arguments. Uses
the given Configuration, or builds one if null. Sets the Tool‘s configuration with the possibly modified version of the conf.
Parameters:
conf - Configuration for the Tool.
tool - Tool to run.
args - command-line arguments to the tool.
Returns:
exit code of the Tool.run(String[]) method.
Throws:
Exception

run

public static int run(Tool tool,
                      String[] args)
               throws Exception
Runs the Tool with its Configuration. Equivalent to run(tool.getConf(), tool, args).
Parameters:
tool - Tool to run.
args - command-line arguments to the tool.
Returns:
exit code of the Tool.run(String[]) method.
Throws:
Exception

printGenericCommandUsage

public static void printGenericCommandUsage(PrintStream out)
Prints generic command-line argurments and usage information.
Parameters:
out - stream to write usage information to.

它们均是静态方法,即可以通过类名调用。

(1)public static int run(Configuration conf,Tool tool, String[] args)

这个方法调用tool的run(String[])方法,并使用conf中的参数,以及args中的参数,而args一般来源于命令行。

(2)public static int run(Tool tool, String[] args)

这个方法调用tool的run方法,并使用tool类的参数属性,即等同于run(tool.getConf(), tool, args)。

(三)关于Configuration

1、默认情况下,hadoop会加载core-default.xml以及core-site.xml中的参数。

Unless explicitly turned off, Hadoop by default specifies two resources, loaded in-order from the classpath:

  1. core-default.xml : Read-only defaults for hadoop.
  2. core-site.xml: Site-specific configuration for a given hadoop installation.

2、在程序运行时,可以通过命令行修改参数,修改方法如下

二、示例程序一:呈现所有参数

下面是一个简单的程序:

package org.jediael.hadoopdemo.toolrunnerdemo;

import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ToolRunnerDemo extends Configured implements Tool {
	static {
		//Configuration.addDefaultResource("hdfs-default.xml");
		//Configuration.addDefaultResource("hdfs-site.xml");
		//Configuration.addDefaultResource("mapred-default.xml");
		//Configuration.addDefaultResource("mapred-site.xml");
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		for (Entry<String, String> entry : conf) {
			System.out.printf("%s=%s\n", entry.getKey(), entry.getValue());
		}
		return 0;
	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new ToolRunnerDemo(), args);
		System.exit(exitCode);
	}
}

以上程序用于输出上述xml文件中定义的属性。

1、直接运行程序

[[email protected] project]#hadoop jar toolrunnerdemo.jar org.jediael.hadoopdemo.toolrunnerdemo.ToolRunnerDemo

io.seqfile.compress.blocksize=1000000

keep.failed.task.files=false

mapred.disk.healthChecker.interval=60000

dfs.df.interval=60000

dfs.datanode.failed.volumes.tolerated=0

mapreduce.reduce.input.limit=-1

mapred.task.tracker.http.address=0.0.0.0:50060

mapred.used.genericoptionsparser=true

mapred.userlog.retain.hours=24

dfs.max.objects=0

mapred.jobtracker.jobSchedulable=org.apache.hadoop.mapred.JobSchedulable

mapred.local.dir.minspacestart=0

hadoop.native.lib=true

......................

2、通过-D指定新的参数

[[email protected] project]# hadoop org.jediael.hadoopdemo.toolrunnerdemo.ToolRunnerDemo -D color=yello | grep color

color=yello

3、通过-conf增加新的配置文件

(1)原有参数数量

[[email protected] project]# hadoop jar toolrunnerdemo.jar org.jediael.hadoopdemo.toolrunnerdemo.ToolRunnerDemo | wc

67 67 2994

(2)增加配置文件后的参数数量

[[email protected] project]# hadoop jar toolrunnerdemo.jar org.jediael.hadoopdemo.toolrunnerdemo.ToolRunnerDemo-conf /opt/jediael/hadoop-1.2.0/conf/mapred-site.xml |
wc

68 68 3028

其中mapred-site.xml的内容如下:

<?xml version="1.0"?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

<property>

<name>mapred.job.tracker</name>

<value>localhost:9001</value>

</property>

</configuration>

可见此文件只有一个property,因此参数数量从67个变成了68个。

4、在代码中增加参数,如上面程序中注释掉的语句

static {

Configuration.addDefaultResource("hdfs-default.xml");

Configuration.addDefaultResource("hdfs-site.xml");

Configuration.addDefaultResource("mapred-default.xml");

Configuration.addDefaultResource("mapred-site.xml");

}

更多选项请见第Configuration的解释。

三、示例程序二:典型用法(修改wordcount程序)

修改经典的wordcount程序,参考:Hadoop入门经典:WordCount

package org.jediael.hadoopdemo.toolrunnerdemo;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCount extends Configured implements Tool{

	public static class WordCountMap extends
			Mapper<LongWritable, Text, Text, IntWritable> {

		private final IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			StringTokenizer token = new StringTokenizer(line);
			while (token.hasMoreTokens()) {
				word.set(token.nextToken());
				context.write(word, one);
			}
		}
	}

	public static class WordCountReduce extends
			Reducer<Text, IntWritable, Text, IntWritable> {

		public void reduce(Text key, Iterable<IntWritable> values,
				Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			context.write(key, new IntWritable(sum));
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = new Job(conf);
		job.setJarByClass(WordCount.class);
		job.setJobName("wordcount");

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		job.setMapperClass(WordCountMap.class);
		job.setReducerClass(WordCountReduce.class);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		return(job.waitForCompletion(true)?0:-1);

	}

	public static void main(String[] args) throws Exception {
		int exitCode = ToolRunner.run(new WordCount(), args);
		System.exit(exitCode);
	}

}

运行程序:

[[email protected] project]# hadoop fs -mkdir wcin2
[[email protected] project]# hadoop fs -copyFromLocal /opt/jediael/apache-nutch-2.2.1/CHANGES.txt wcin2
[[email protected] project]# hadoop jar wordcount2.jar org.jediael.hadoopdemo.toolrunnerdemo.WordCount wcin2 wcout2
时间: 2024-10-11 00:43:39

使用ToolRunner运行Hadoop程序基本原理分析的相关文章

【爬坑】在 IDEA 中运行 Hadoop 程序 报 winutils.exe 不存在错误解决方案

0. 问题说明 环境为 Windows 10 在 IDEA 中运行 Hadoop 程序报   winutils.exe 不存在  错误 1. 解决方案 [1.1 解压] 解压 hadoop-2.7.3.zip 文件到自定义目录 [1.2 配置 Hadoop 环境变量] 新建HADOOP_HOME,变量值为D:\program\hadoop-2.7.3 添加PATH,添加%HADOOP_HOME%\bin;%HADOOP_HOME%\sbin [1.3 重启 IDEA] [ 1.4 测试配置 ]

用java运行Hadoop程序报错:org.apache.hadoop.fs.LocalFileSystem cannot be cast to org.apache.

用java运行Hadoop例程报错:org.apache.hadoop.fs.LocalFileSystem cannot be cast to org.apache.所写代码如下: package com.pcitc.hadoop; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.h

Ubuntu中使用终端运行Hadoop程序

接上一篇<Ubuntu Kylin系统下安装Hadoop2.6.0> 通过上一篇,Hadoop伪分布式基本配好了. 下一步是运行一个MapReduce程序,以WordCount为例: 1. 构建实现类: cd /usr/local/hadoop mkdir workspacecd workspacegedit WordCount.java 将代码复制粘贴. import java.io.IOException; import java.util.StringTokenizer; import

在Eclipse中运行hadoop程序

1.下载hadoop-eclipse-plugin-1.2.1.jar,并将之复制到eclipse/plugins下. 2.打开map-reduce视图 在eclipse中,打开window-->open perspetive-->other,选择map/reduce. 3.选择Map/Reduce Locations标签页,新建一个Location 4.在project exploer中,可以浏览刚才定义站点的文件系统 5.准备测试数据,并上传到hdfs中. liaoliuqingdeMac

eclipse运行hadoop程序报错:Connection refused: no further information

log4j:WARN No appenders could be found for logger (org.apache.hadoop.conf.Configuration.deprecation). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Exceptio

Hadoop源码分析—— Job任务的程序入口

这篇文章大致介绍Hadoop Job的程序是如何启动的. 通常用Java编写的Hadoop MapReduce程序是通过一个main方法作为程序的整个入口,如下: public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new CalculateSumJob(),args); System.exit(res);} 可以看到这个Job任务的MapR

原生态在Hadoop上运行Java程序

第一种:原生态运行jar包1,利用eclipse编写Map-Reduce方法,一般引入Hadoop-core-1.1.2.jar.注意这里eclipse里没有安装hadoop的插件,只是引入其匝包,该eclipse可以安装在windows或者linux中,如果是在windows中安装的,且在其虚拟机安装的linux,可以通过共享文件夹来实现传递.2,编写要测试的数据,如命名为tempdata3,利用eclipse的export来打包已编写好的,在利用eclipse打包jar的时候,只需要选择sr

运行Hadoop自带的wordcount单词统计程序

0.前言 前面一篇<Hadoop初体验:快速搭建Hadoop伪分布式环境>搭建了一个Hadoop的环境,现在就使用Hadoop自带的wordcount程序来做单词统计的案例. 1.使用示例程序实现单词统计 (1)wordcount程序 wordcount程序在hadoop的share目录下,如下: [[email protected] mapreduce]# pwd /usr/local/hadoop/share/hadoop/mapreduce [[email protected] mapr

在ubuntu上安装eclipse同时连接hadoop运行wordcount程序

起先我是在win7 64位上远程连接hadoop运行wordcount程序的,但是这总是需要网络,考虑到这一情况,我决定将这个环境转移到unbuntu上 需要准备的东西 一个hadoop的jar包,一个连接eclipse的插件(在解压的jar包里有这个东西),一个hadoop-core-*.jar(考虑到连接的权限问题) 一个eclipse的.tar.gz包(其它类型的包也可以,eclipse本身就是不需要安装的,这里就不多说了) 因为我之前在win7上搭建过这个环境,所以一切很顺利,但还是要在