MapReduce设置输出文件到多个文件夹下

一:自定义OutputFormat类
MapReduce默认的OutPutFormat会将结果输出文件放置到一个我们指定的目录下,但如果想把输出文件根据某个条件,把满足不同条件的内容分别输出到不同的目录下,

就需要自定义实现OutputFormat类,且重写RecordWriter方法。在驱动类中设置job.setOutputFormatClass方法为自定义实现的OutputFormat类

下面案例是一组购物文本数据,将其中的好评和差评分别输出到对应的好评文件夹下、差评文件夹下。

二:自定义实现OutputFormat类代码实现

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
* 自定义实现OutputFormat类
*/
public class MyOutputFormat extends FileOutputFormat<Text,NullWritable> {

@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
//从这个方法里面可以获取一个configuration
Configuration configuration = context.getConfiguration();
//获取文件系统的对象
FileSystem fileSystem = FileSystem.get(configuration);
//好评文件的输出路径
Path goodComment = new Path("file:///F:\\goodComment\\1.txt");

//差评文件的输出路径
Path badComment = new Path("file:///F:\\badComment\\1.txt");

//获取到了两个输出流
FSDataOutputStream fsDataOutputStream = fileSystem.create(goodComment);
FSDataOutputStream fsDataOutputStream1 = fileSystem.create(badComment);

MyRecordWriter myRecordWriter = new MyRecordWriter(fsDataOutputStream, fsDataOutputStream1);

return myRecordWriter;
}
}

三:自定义实现RecordWriter类

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class MyRecordWriter extends RecordWriter<Text,NullWritable> {
private FSDataOutputStream goodStream;
private FSDataOutputStream badSteam;

public MyRecordWriter(){

}

public MyRecordWriter(FSDataOutputStream goodStream,FSDataOutputStream badSteam){
this.goodStream = goodStream;
this.badSteam= badSteam;

}

/**
* 重写write方法
* 这个write方法就是往外写出去数据,我们可以根据这个key,来判断文件究竟往哪个目录下面写
* goodStream:指定输出文件
* badSteam:自定输出文件
* @param key:k3
* @param value
* @throws IOException
* @throws InterruptedException
*/
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
String[] split = key.toString().split("\t");
//获取评论状态 0 好评 1 中评 2 差评
// split[9]
//判断评评论状态,如果是小于等于1,都写到好评文件里面去
if(Integer.parseInt(split[9])<=1){
//好评
goodStream.write(key.getBytes());
goodStream.write("\r\n".getBytes());
}else{
//差评
badSteam.write(key.getBytes());
badSteam.write("\r\n".getBytes());
}
}

/**
* 关闭资源
* @param context:上下文对象
* @throws IOException
* @throws InterruptedException
*/
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
IOUtils.closeStream(badSteam);
IOUtils.closeStream(goodStream);
}
}

四:自定义Map类

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyOutputMapper extends Mapper<LongWritable,Text,Text,NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value,NullWritable.get());
}
}

五:驱动程序

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyOutputMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), "ownOutputFormat");

job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///F:\\input"));

job.setMapperClass(MyOutputMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);

job.setOutputFormatClass(MyOutputFormat.class);
//由于重写了FileOutputFormat,所以下面这个指定的目录内不会有输出文件
//输出文件在MyOutputFormat中重新指定
MyOutputFormat.setOutputPath(job ,new Path("file:///F:\\output"));

boolean b = job.waitForCompletion(true);

return b?0:1;
}

public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new MyOutputMain(), args);
System.exit(run);
}

}

原文地址:https://www.cnblogs.com/zyanrong/p/11597863.html

时间: 2024-10-11 05:35:32

MapReduce设置输出文件到多个文件夹下的相关文章

MapReduce设置输出分隔符

conf.set("mapred.textoutputformat.ignoreseparator","true"); conf.set("mapred.textoutputformat.separator",","); 默认是tab

java io流 创建文件、写入数据、设置输出位置

java io流 创建文件 写入数据 改变system.out.print的输出位置 //创建文件 //写入数据 //改变system.out.print的输出位置 import java.io.*; public class Index{ public static void main(String[] args) throws Exception{ /** * 存储为二进制,给计算机看的 */ //创建文件 DataOutputStream sjl = new DataOutputStrea

response设置输出文件编码

在java后台的Action代码或者Servlet代码中用response的方法来设置输出内容的编码方式,有以下三个方法: 1.response.setCharacterEncoding("UTF-8"); // 只能用来设置out输出流中所采用的编码,但是它的优先权最高,可以覆盖后面两种方法中的设置 2.response.setContentType("text/html;charset=UTF-8"); // 可以设置out输出流中字符的编码方式,也可以设置浏览

jenkin控制台输出乱码,jenkins.xml文件设置utf-8都没有用

Jenkins控制台显示乱码 Jenkins控制台显示乱码,我们需要改两个地方,见下: 1.Jenkins系统设置中修改 点击左侧"系统管理"--右侧选择"系统设置"--"全局属性",选择第一项:Environment variables,键值对列表,点击增加: 键:LANG 值:zh.CH.UTF-8 点击保存,见下图: 2.Jenkins.xml文件修改 在Jenkins安装目录下找到jenkins.xml文件 (1)找到<argume

log4j日志输出到web项目指定文件夹

感谢 eric2500 的这篇文章:http://www.cxyclub.cn/n/27860/ 摘要:尝试将log4j的文件日志输出到web工程制定目录,遇到了很多问题,最终在eric2500的指导下搞定,下面是记录. 其原理在于log4j的配置文件支持服务器的vm的环境变量,如${oss.log4j.path},在log4j加载配置文件之前,先用 System.setProperty("","")设置好日志文件路径,这一操作通过一个初始的servlet来实现.

在web上逐行输出较大的txt文件

在某些场景下,需要在web上展示一些日志文件,这些日志文件是放在文件服务器上的一些txt. 当日志文件很大时,下载日志会导致页面长时间卡住,一直在loading状态,而且下载完日志之后分析日志并生成dom,瞬间大量的dom渲染可能导致页面崩溃. 于是想着优化一下日志的输出方式,开始下载即在页面上一行一行打印日志,就像一些IDE中输出程序的编译过程一样. 最终实现的方法如下: 在下载文件的时候,让请求过一层代理,代理写输出流的时候分段输出: ? int l; byte[] buffer = new

log4j配置输出到多个日志文件

通常我们项目里,有一些重要的日志想单独的输出到指定的文件,而不是全总输出到系统的日志文件中.那么我们log4j为我们提供了这种功能,以下我们来一步一步看是怎么做的.这里以property的配置方式写.xml方式类似,想了解的,能够看官方文档. 这里測试的项目包结构例如以下: log4j 主要由三部分组成:Loggers, Appenders 和Layouts,(appender 能够理解为输出的目的地) 咱们的log4j.properties或log4j.xml 里能够配置多个logger, 每

log4j配置输出到多个日志文件(转)

參考资料:http://logging.apache.org/log4j/1.2/manual.html 通常我们项目里,有一些重要的日志想单独的输出到指定的文件,而不是全总输出到系统的日志文件中.那么我们log4j为我们提供了这种功能,以下我们来一步一步看是怎么做的.这里以property的配置方式写.xml方式类似,想了解的,能够看官方文档. 这里測试的项目包结构例如以下: log4j 主要由三部分组成:Loggers, Appenders 和Layouts,(appender 能够理解为输

Logback 输出 JPA SQL日志 到文件

使用Spring Boot 配置 JPA 时可以指定如下配置在控制台查看执行的SQL语句 spring.jpa.show-sql=true Spring Boot 默认的日志配置不会输出到文件,若要打印日志到文件,可以使用如下配置: llogging.level后跟要打印日志的包名或类的全限定名,设置打印级别 日志级别:TRACE < DEBUG < INFO < WARN < ERROR < FATAL logging.level.com.example=INFO 配置日志