数据输入输出格式

数据输入格式

数据输入格式(InputFormat)用于描述MR作业的输入规范,主要功能:输入规范检查(比如输入文件目录的检查)、对数据文件进行输入切分和从输入分块中将数据记录逐一读取出来、并转化为Map的输入键值对。

Hadoop中最常用的数据输入格式包括:TextInputFormat 和 KeyValueInputFormat。

1)TextInputFormat 是系统默认的数据输入格式,可以将文件的每一行解析成一个键值对。其中,Key是当前行在整个文件中的字节偏移量,而Value就是该行的内容。默认的RecordReader是LineRecordReader。

2)KeyValueInputFormat是将一个按照<key,value>格式存放的文本文件逐行读出,并自动解析生成相应的key和value。默认是KeyValueLineRecordReader。

定制数据输入格式

用户可以从基类InputFormat和RecordReader开始定制过程,主要实现InputFormat中的createRecordReader()和getSplits()两个抽象方法,而RecordReader中则需要实现gerCurrentKey()和getCurrentValue()几个抽象方法。

需求:为了能更细粒的记录每个单词在文档中出现时的行位置信息[email protected]。

  • 方法一:使用默认的TextInputFormat和LineRecordReader

public static class IIMapper extends Mapper<Text, Text, Text, Text>{
        @Override
        //输出key:word         输出value:[email protected]
        protected void map(Text key, Text value,Context context)
                throws IOException, InterruptedException {

            //得到输入文件的文件名FileName
            FileSplit fileSplit = (FileSplit)context.getInputSplit();
            String name = fileSplit.getPath().getName();

            //组装拼接Value: [email protected]
            Text fileName_lineOffset=new Text(name+"@"+key.toString());

            String[] splited = value.toString().split("\t");
            for(String word : splited){
                context.write(new Text(word), fileName_lineOffset);
            }
   }
   }    

  • 方法二:定制FileNameInputFormat和FileNameRecordReader。本例是基于已有的TextInputFormat和LineRecordReader两个类来完成的。

package invertedIndex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;

public class FileNameRecordReader extends RecordReader<Text, Text> {

    //成员变量
    String fileName;
    //实例化一个LineRecordReader实例
    LineRecordReader lrr=new LineRecordReader();

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {

        //调用LineRecordReader类的初始化方法
        lrr.initialize(split, context);

        //获取当前InputSplit的文件名
        fileName=((FileSplit)split).getPath().getName();
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {

        //调用LineRecordReader类的方法,拼接key
        //其中lrr.getCurrentKey()返回:当前行在整个文本文件中的字节偏移量
        return new Text("("+fileName+"@"+lrr.getCurrentKey().toString()+")");
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {

        //调用LineRecordReader类的方法
        return lrr.getCurrentValue();
}    

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        return lrr.nextKeyValue();
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {

        return lrr.getProgress();
    }

    @Override
    public void close() throws IOException {

        lrr.close();
    }
}

package invertedIndex;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class FileNameInputFormat extends FileInputFormat<Text, Text>{

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException, InterruptedException {

        FileNameRecordReader fnrr = new FileNameRecordReader();

        //调用FileNameRecordReader的初始化方法
        fnrr.initialize(split, context);

        return fnrr;
    }
}

  • 使用自定义的FileNameInputFormat类和FileNameRcordReader:

package invertedIndex;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndex {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(InvertedIndex.class);

        //设置数据输入格式【使用自定义的InputFormat】
        job.setInputFormatClass(FileNameInputFormat.class);

        job.setMapperClass(FFMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }

    public static class FFMapper extends Mapper<Text, Text, Text, Text>{
        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {

            //分词
            StringTokenizer st = new StringTokenizer(value.toString());
            for(;st.hasMoreTokens();){

                //key:单词word        value:FileName+偏移量
                context.write(new Text(st.nextToken()), key);
            }
        }
    }
}

输出结果为:key:单词,value:[email protected]偏移量

read ([email protected])

file ([email protected])

read ([email protected])

data ([email protected])

数据输出格式

数据输出格式(OutputFormat)用于描述MR作业的数据输出规范。主要功能:输出规范检查(如检查输出目录是否存在),以及提供作业结果数据输出功能。

Hadoop默认的数据输出格式是TextOutputFormat,可以将结果以【key+\t+value】的形式逐行输出。默认的RecordWriter是LineRecordWriter。

时间: 2024-10-09 12:45:01

数据输入输出格式的相关文章

干货--Hadoop自定义数据类型和自定义输入输出格式整合项目案例

正文开始前 ,先介绍几个概念 序列化 所谓序列化,是指将结构化对象转化为字节流,以便在网络上传输或写到磁盘进行永久存储. 反序列化 是指将字节流转回到结构化对象的逆过程 序列化在分布式数据处理的两个大领域经常出现:进程间通信和永久存储 在Hadoop中,系统中多个节点上进程间的通信是通过"远程过程调用"(remote procedure call,RPC)实现的 .RPC协议将消息序列化成二进制流后发送到远程节点,远程节点接着将二进制流反序列化为原始消息 Hadoop使用了自己写的序列

Hadoop 高级程序设计(二)---自定义输入输出格式

Hadoop提供了较为丰富的数据输入输出格式,可以满足很多的设计实现,但是在某些时候需要自定义输入输出格式. 数据的输入格式用于描述MapReduce作业的数据输入规范,MapReduce框架依靠数据输入格式完后输入规范检查(比如输入文件目录的检查),对数据文件进行输入分块(InputSpilt)以及提供从输入分快中将数据逐行的读出,并转换为Map过程的输入键值对等功能.Hadoop提供了很多的输入格式,TextInputFormat和KeyValueInputFormat,对于每个输入格式都有

C语言中输入输出格式控制

1.C语言中,非零值为真,真用1表示:零值为假,假用0表示. 2.转义字符参考: \a 蜂鸣,响铃 \b 回退:向后退一格 \f 换页 \n 换行 \r 回车,光标到本行行首 \t 水平制表 \v 垂直制表 \\ 反斜杠 \' 单引号 \" 双引号 \? 问号 \ddd 三位八进制 \0 空字符(NULL),什么都不做 \xhh 二位十六进制 说明: 1)\v垂直制表和\f换页符对屏幕没有任何影响,但会影响打印机执行响应操作. 2),\n其实应该叫回车换行.换行只是换一行,不改变光标的横坐标:回

Java学习笔记之[ 利用扫描仪Scanner进行数据输入 ]

/*********数据的输入********//**利用扫描仪Scanner进行数据输入 怎么使用扫描仪Scanner *1.放在类声明之前,引入扫描仪 import java.util.Scanner; *2.声明一个新的扫描仪(即向内存申请一个空间) Scanner in *3.赋值 in=new Scanner(System.in); Scanner in=new Scanner(System.in); *4.使用扫描仪 整形数据输入:in.nextInt()来接收 双精度小数输入:in

字节流之数据输入输出流

数据输入输出流:让我们对基本数据类型操作更方便

R in action -- 2.3 数据输入

R in action -- 2.3 数据输入 1.从CSV文件导入数据 > gtades <- read.table("1.csv",header=TRUE,sep=",") > gtades ID name age 1 1 qqw 15 2 2 eew 56 3 3 rrw 43 4 4 ttw 58 2.从Excel导入数据 安装openxlsx包 > install.packages("openxlsx") Ins

C语言基础--常用的数据输入输出函数

常用的数据输入输出函数 以下只是个人学习的笔记,由于我也是刚接触,所以有可能有错误,如有错误,请指出 1.语句 语句:就是完成一定的操作任务,在编写程序时,生命部分不能算作语句,如,int a=10:   程序中包括声明部分和执行部分,其中执行部分即由语句组成 2.字符数据输入输出 (1).字符数据输出 字符数据输出使用的是putchar,其作用是向显示设备输出一个字符,其语法格式是int putchar(int ch);,其中的ch是要进行输出的字符可以是字符型变量或者整型变量,也可以是常量,

JAVA用户数据输入

数据输入 首先需要导入扫描仪 然后声明扫描仪 输出输入提示 接收用户数据的数据 输出用户数据的数据 实例: import java.util.Scanner; //导入扫描仪 public class Test{ public static void main(String[] args){ Scanner in = new Scanner(System.in); //声明扫描仪,赋值扫描仪给in System.out.println("请输入你的姓名:"); //输出输入提示 Str

R语言入门视频笔记--4--R的数据输入

R的数据输入可以大体三种: 1.键盘输出 2.从文本文件导入 3.从Excel中导入数据 一.从键盘输入 首先创建一个数据框,玩玩嘛,瞎建一个 mydata <- data.frame(age =numeric(0),gender= character(0),weight=numeric(0))    #建一个空数据框,但已经声明过元素类型 mydata <- edit(mydata)                        #可以进行编辑 fix(mydata) #跟上面一样可以进行编