如何通过Java程序提交yarn的mapreduce计算任务

由于项目需求,需要通过Java程序提交Yarn的MapReduce的计算任务。与一般的通过Jar包提交MapReduce任务不同,通过程序提交MapReduce任务需要有点小变动,详见以下代码。

以下为MapReduce主程序,有几点需要提一下:

1、在程序中,我将文件读入格式设定为WholeFileInputFormat,即不对文件进行切分。

2、为了控制reduce的处理过程,map的输出键的格式为组合键格式。与常规的<key,value>不同,这里变为了<TextPair,Value>,TextPair的格式为<key1,key2>。

3、为了适应组合键,重新设定了分组函数,即GroupComparator。分组规则为,只要TextPair中的key1相同(不要求key2相同),则数据被分配到一个reduce容器中。这样,当相同key1的数据进入reduce容器后,key2起到了一个数据标识的作用。

package web.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import util.Utils;

public class GEMIMain {

	public GEMIMain(){
		job = null;
	}

	public Job job;
	public static class NamePartitioner extends
			Partitioner<TextPair, BytesWritable> {
		@Override
		public int getPartition(TextPair key, BytesWritable value,
				int numPartitions) {
			return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
		}
	}

	/**
	 * 分组设置类,只要两个TextPair的第一个key相同,他们就属于同一组。他们的Value就放到一个Value迭代器中,
	 * 然后进入Reducer的reduce方法中。
	 *
	 * @author hduser
	 *
	 */
	public static class GroupComparator extends WritableComparator {
		public GroupComparator() {
			super(TextPair.class, true);
		}

		@Override
		public int compare(WritableComparable a, WritableComparable b) {
			TextPair t1 = (TextPair) a;
			TextPair t2 = (TextPair) b;
			// 比较相同则返回0,比较不同则返回-1
			return t1.getFirst().compareTo(t2.getFirst()); // 只要是第一个字段相同的就分成为同一组
		}
	}

	public  boolean runJob(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {

		Configuration conf = new Configuration();
		// 在conf中设置outputath变量,以在reduce函数中可以获取到该参数的值
		conf.set("outputPath", args[args.length - 1].toString());
		//设置HDFS中,每次任务生成产品的质量文件所在文件夹。args数组的倒数第二个原数为质量文件所在文件夹
		conf.set("qualityFolder", args[args.length - 2].toString());
		//如果在Server中运行,则需要获取web项目的根路径;如果以java应用方式调试,则读取/opt/hadoop-2.5.0/etc/hadoop/目录下的配置文件
		//MapReduceProgress mprogress = new MapReduceProgress();
		//String rootPath= mprogress.rootPath;
		String rootPath="/opt/hadoop-2.5.0/etc/hadoop/";
		conf.addResource(new Path(rootPath+"yarn-site.xml"));
		conf.addResource(new Path(rootPath+"core-site.xml"));
		conf.addResource(new Path(rootPath+"hdfs-site.xml"));
		conf.addResource(new Path(rootPath+"mapred-site.xml"));
		this.job = new Job(conf);

		job.setJobName("Job name:" + args[0]);
		job.setJarByClass(GEMIMain.class);

		job.setMapperClass(GEMIMapper.class);
		job.setMapOutputKeyClass(TextPair.class);
		job.setMapOutputValueClass(BytesWritable.class);
		// 设置partition
		job.setPartitionerClass(NamePartitioner.class);
		// 在分区之后按照指定的条件分组
		job.setGroupingComparatorClass(GroupComparator.class);

		job.setReducerClass(GEMIReducer.class);

		job.setInputFormatClass(WholeFileInputFormat.class);
		job.setOutputFormatClass(NullOutputFormat.class);
		// job.setOutputKeyClass(NullWritable.class);
		// job.setOutputValueClass(Text.class);
		job.setNumReduceTasks(8);

		// 设置计算输入数据的路径
		for (int i = 1; i < args.length - 2; i++) {
			FileInputFormat.addInputPath(job, new Path(args[i]));
		}
		// args数组的最后一个元素为输出路径
		FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
		boolean flag = job.waitForCompletion(true);
		return flag;
	}

	@SuppressWarnings("static-access")
	public static void main(String[] args) throws ClassNotFoundException,
			IOException, InterruptedException {	

		String[] inputPaths = new String[] { "normalizeJob",
				"hdfs://192.168.168.101:9000/user/hduser/red1/",
				"hdfs://192.168.168.101:9000/user/hduser/nir1/","quality11111",
				"hdfs://192.168.168.101:9000/user/hduser/test" };
		GEMIMain test = new GEMIMain();
		boolean result = test.runJob(inputPaths);
	}
}

以下为TextPair类

public class TextPair implements WritableComparable<TextPair> {
	private Text first;
	private Text second;

	public TextPair() {
		set(new Text(), new Text());
	}

	public TextPair(String first, String second) {
		set(new Text(first), new Text(second));
	}

	public TextPair(Text first, Text second) {
		set(first, second);
	}

	public void set(Text first, Text second) {
		this.first = first;
		this.second = second;
	}

	public Text getFirst() {
		return first;
	}

	public Text getSecond() {
		return second;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		first.write(out);
		second.write(out);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		first.readFields(in);
		second.readFields(in);
	}

	@Override
	public int hashCode() {
		return first.hashCode() * 163 + second.hashCode();
	}

	@Override
	public boolean equals(Object o) {
		if (o instanceof TextPair) {
			TextPair tp = (TextPair) o;
			return first.equals(tp.first) && second.equals(tp.second);
		}
		return false;
	}

	@Override
	public String toString() {
		return first + "\t" + second;
	}

	@Override
	/**A.compareTo(B)
	 * 如果比较相同,则比较结果为0
	 * 如果A大于B,则比较结果为1
	 * 如果A小于B,则比较结果为-1
	 *
	 */
	public int compareTo(TextPair tp) {
		int cmp = first.compareTo(tp.first);
		if (cmp != 0) {
			return cmp;
		}
		//此时实现的是升序排列
		return second.compareTo(tp.second);
	}
}

以下为WholeFileInputFormat,其控制数据在mapreduce过程中不被切分

package web.hadoop;

import java.io.IOException;  

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {  

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(
            InputSplit arg0, TaskAttemptContext arg1) throws IOException,
            InterruptedException {
        // TODO Auto-generated method stub
        return new WholeFileRecordReader();
    }  

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // TODO Auto-generated method stub
        return false;
    }
}  

以下为WholeFileRecordReader类

package web.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {

	private FileSplit fileSplit;
	private FSDataInputStream fis;

	private Text key = null;
	private BytesWritable value = null;

	private boolean processed = false;

	@Override
	public void close() throws IOException {
		// TODO Auto-generated method stub
		// fis.close();
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		return this.key;
	}

	@Override
	public BytesWritable getCurrentValue() throws IOException,
			InterruptedException {
		// TODO Auto-generated method stub
		return this.value;
	}

	@Override
	public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)
			throws IOException, InterruptedException {

		fileSplit = (FileSplit) inputSplit;
		Configuration job = tacontext.getConfiguration();
		Path file = fileSplit.getPath();
		FileSystem fs = file.getFileSystem(job);
		fis = fs.open(file);
	}

	@Override
	public boolean nextKeyValue() {

		if (key == null) {
			key = new Text();
		}

		if (value == null) {
			value = new BytesWritable();
		}

		if (!processed) {
			byte[] content = new byte[(int) fileSplit.getLength()];

			Path file = fileSplit.getPath();

			System.out.println(file.getName());
			key.set(file.getName());

			try {
				IOUtils.readFully(fis, content, 0, content.length);
				// value.set(content, 0, content.length);
				value.set(new BytesWritable(content));
			} catch (IOException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} finally {
				IOUtils.closeStream(fis);
			}

			processed = true;
			return true;
		}

		return false;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		return processed ? fileSplit.getLength() : 0;
	}

}
时间: 2024-10-16 15:53:00

如何通过Java程序提交yarn的mapreduce计算任务的相关文章

编写java程序,用while循环计算1+1/2!+...1/20! 的和?

1 float x=1; 2 3 float sum=0; 4 5 float i=1; 6 7 8 while(i<=20) 9 { 10 x=x*i; 11 12 sum=sum+1/x; 13 14 i++; 15 } 16 17 System.out.println("sum="+sum); 18 // TODO 自动生成的方法存根 19 20 } 21 22 23 } sum=1.7182816

从 WordCount 到 MapReduce 计算模型

概述 虽然现在都在说大内存时代,不过内存的发展怎么也跟不上数据的步伐吧.所以,我们就要想办法减小数据量.这里说的减小可不是真的减小数据量,而是让数据分散开来.分开存储.分开计算.这就是 MapReduce 分布式的核心. 版权说明 著作权归作者所有. 商业转载请联系作者获得授权,非商业转载请注明出处. 本文作者:Coding-Naga 发表日期: 2016年5月10日 本文链接:http://blog.csdn.net/lemon_tree12138/article/details/513677

本地idea开发mapreduce程序提交到远程hadoop集群执行

https://www.codetd.com/article/664330 https://blog.csdn.net/dream_an/article/details/84342770 通过idea开发mapreduce程序并直接run,提交到远程hadoop集群执行mapreduce. 简要流程:本地开发mapreduce程序–>设置yarn 模式 --> 直接本地run–>远程集群执行mapreduce程序: 完整的流程:本地开发mapreduce程序--> 设置yarn模式

十六:mapreduce程序在yarn集群中的调度过程

mapreduce程序在yarn集群中的调度过程: 1.客户端想ResouceManager提交一个job作业,申请运行一个MR的程序,RPC调用 2.ResourceManager返回一个由创建的jobid目录. 3.在HDFS该目录下有一个以jobid命名的目录并,写入job.xml和job分片数据,job.jar,jobConfinger 4.通知RM,job的资源文件提交完毕. 5.初始化一个任务 然后放到队列中去 6.nodemanager 和ResouceManager 保持心跳进行

将java开发的wordcount程序提交到spark集群上运行

今天来分享下将java开发的wordcount程序提交到spark集群上运行的步骤. 第一个步骤之前,先上传文本文件,spark.txt,然用命令hadoop fs -put spark.txt /spark.txt,即可. 第一:看整个代码视图 打开WordCountCluster.java源文件,修改此处代码: 第二步: 打好jar包,步骤是右击项目文件----RunAs--Run Configurations 照图填写,然后开始拷贝工程下的jar包,如图,注意是拷贝那个依赖jar包,不是第

Mapreduce提交YARN集群运行

Eclipse项目打包1.export2.通过maven打包,切入到项目目录下执行命令mvn clean package Mapreduce提交YARN集群运行 将jar包传到hadoop目录下运行格式:bin/hadoop jar  jar包名   包名(代码的包名).类名 +参数(输入路径输出路径)就可以在集群上运行了 原文地址:https://www.cnblogs.com/libin123/p/10330330.html

Java Web提交任务到Spark

相关软件版本: Spark1.4.1 ,Hadoop2.6,Scala2.10.5 , MyEclipse2014,intelliJ IDEA14,JDK1.8,Tomcat7 机器: windows7 (包含JDK1.8,MyEclipse2014,IntelliJ IDEA14,TOmcat7): centos6.6虚拟机(Hadoop伪分布式集群,Spark standAlone集群,JDK1.8): centos7虚拟机(Tomcat,JDK1.8): 1. 场景: 1. windows

Spark集群模式&amp;Spark程序提交

Spark集群模式&Spark程序提交 1. 集群管理器 Spark当前支持三种集群管理方式 Standalone-Spark自带的一种集群管理方式,易于构建集群. Apache Mesos-通用的集群管理,可以在其上运行Hadoop MapReduce和一些服务应用. Hadoop YARN-Hadoop2中的资源管理器. Tip1: 在集群不是特别大,并且没有mapReduce和Spark同时运行的需求的情况下,用Standalone模式效率最高. Tip2: Spark可以在应用间(通过集

HDFS、YARN、Mapreduce简介

一. HDFS介绍: Hadoop2介绍 HDFS概述 HDFS读写流程   1.  Hadoop2介绍 Hadoop是Apache软件基金会旗下的一个分布式系统基础架构.Hadoop2的框架最核心的设计就是HDFS.MapReduce和YARN,为海量的数据提供了存储和计算. HDFS主要是Hadoop的存储,用于海量数据的存储: MapReduce主要运用于分布式计算: YARN是Hadoop2中的资源管理系统. Hadoop1和Hadoop2的结构对比: Hadoop2主要改进: YARN