使用老版本的java api提交hadoop作业

还是使用之前的单词计数的例子

自定义Mapper类

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

//自定义的Mapper类必须继承MapReduceBase 并且实现Mapper接口
public class JMapper extends MapReduceBase implements
		Mapper<LongWritable, Text, Text, LongWritable> {

	@Override
	public void map(LongWritable key, Text value,
			OutputCollector<Text, LongWritable> collector, Reporter reporter)
			throws IOException {
		String[] ss = value.toString().split("\t");
		for (String s : ss) {
			//使用collector.collect而不是context.write
			collector.collect(new Text(s), new LongWritable(1));
		}
	}

}

自定义Reducer类

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

//自定义的Reducer类必须继承MapReduceBase 并且实现Reducer接口
public class JReducer extends MapReduceBase implements
		Reducer<Text, LongWritable, Text, LongWritable> {

	@Override
	public void reduce(Text key, Iterator<LongWritable> value,
			OutputCollector<Text, LongWritable> collector, Reporter reporter)
			throws IOException {
		long sum = 0;
		//由于value不在可以用foreach循环,所以用while代替
		while (value.hasNext()) {
			sum += value.next().get();
		}
		collector.collect(key, new LongWritable(sum));
	}

}

运行提交代码的类JSubmit

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class JSubmit {
	public static void main(String[] args) throws IOException,
			URISyntaxException, InterruptedException, ClassNotFoundException {
		Path outPath = new Path("hdfs://localhost:9000/out");
		Path inPath = new Path("/home/hadoop/word");
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf);
		if (fs.exists(outPath)) {
			fs.delete(outPath, true);
		}
		// 使用JobConf 而不是Job
		JobConf job = new JobConf(conf, JSubmit.class);
		FileInputFormat.setInputPaths(job, inPath);
		job.setInputFormat(TextInputFormat.class);
		job.setMapperClass(JMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(LongWritable.class);
		job.setReducerClass(JReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		FileOutputFormat.setOutputPath(job, outPath);
		job.setOutputFormat(TextOutputFormat.class);
		// 使用JobClient.runJob而不是job.waitForCompletion
		JobClient.runJob(job);
	}
}

可以看到

其实老版本的api差别不大,只是用了少数几个类替换了而已

注意老版本api的类虽然和新版本api的类名字很多都是一模一样的

但是所在的包不同,老版本所在的包都是mapred的,而新版本的都在mapreduce

时间: 2024-10-22 21:22:15

使用老版本的java api提交hadoop作业的相关文章

使用java api操作Hadoop文件 Robbin

1 package cn.hadoop.fs; 2 3 import java.io.IOException; 4 import java.io.InputStream; 5 import java.net.URI; 6 import java.net.URISyntaxException; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.FSDataOutputStream; 10 i

Java API实现Hadoop文件系统增删改查

Java API实现Hadoop文件系统增删改查 Hadoop文件系统可以通过shell命令hadoop fs -xx进行操作,同时也提供了Java编程接口 maven配置 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.ap

通过java api提交自定义hadoop 作业

通过API操作之前要先了解几个基本知识 一.hadoop的基本数据类型和java的基本数据类型是不一样的,但是都存在对应的关系 如下图 如果需要定义自己的数据类型,则必须实现Writable hadoop的数据类型可以通过get方法获得对应的java数据类型 而java的数据类型可以通过hadoop数据类名的构造函数,或者set方法转换 二.hadoop提交作业的的步骤分为八个,可以理解为天龙八步 如下: map端工作: 1.1 读取要操作的文件--这步会将文件的内容格式化成键值对的形式,键为每

hadoop Java API、 hadoop Streaming 、hadoop Pipes 三者比较学习

1.hadoop  Java  API Hadoop的主要编程语言是Java,因而,Java API是最基本的对外编程接口. 2. hadoop    Streaming             1.概述 它是为方便非java用户编写Mapreduce程序而设计的工具包. Hadoop Streaming是Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或者脚本文件作为Mapper和Reducer, 例如: 采用shell脚本语言中的一些命令作为mapper和reducer(cat作

使用HDFS客户端java api读取hadoop集群上的信息

本文介绍使用hdfs java api的配置方法. 1.先解决依赖,pom <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.2</version> <scope>provided</scope> </dependency> 2.配置文

补装老版本的Java SE

到oracle注册账号下载bin文件 在bin文件下载目录打开终端复制下面到命令 # chmod +x jdk-6u21-linux-i586.bin 注意版本不同,比如我这次下载到是6u45 chmod +x jdk-6u45-linux-x64.bin chmod命令 usage: chmod [-fhv] [-R [-H | -L | -P]] [-a | +a | =a  [i][# [ n]]] mode|entry file ... chmod [-fhv] [-R [-H | -L

使用java api操作Hadoop文件

1. 概述 2. 文件操作 2.1  上传本地文件到hadoop fs 2.2 在hadoop fs中新建文件,并写入 2.3 删除hadoop fs上的文件 2.4  读取文件 3. 目录操作 3.1 在hadoop fs上创建目录 3.2 删除目录 3.3 读取某个目录下的所有文件 4. 参考资料接代码下载 <1>. 概述  hadoop中关于文件操作类基本上全部是在org.apache.hadoop.fs包中,这些api能够支持的操作包含:打开文件,读写文件,删除文件等. hadoop类

使用JAVA API获取hadoop集群的FileSystem

所需要配置的参数:  Configuration conf = new Configuration();   conf.set("fs.defaultFS", "hdfs://hadoop2cluster");   conf.set("dfs.nameservices", "hadoop2cluster");   conf.set("dfs.ha.namenodes.hadoop2cluster", &qu

提交hadoop作业到spark上运行

1.引入spark包:spark-assembly-1.4.0-hadoop2.6.0,在spark的lib目录下 File-->project structure 2.用IDEA建立一个scala项目,新建一个WordCount的object 3.WordCount代码如下: import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.SparkContext._