Hadoop中的KeyValueInputFormat

一:背景

有时候,我们可以不以偏移量和行文本内容来作为数据源到MapTask的输入格式,而使用键值对的形式,使用KeyValueInputFormat就可以完成这种需求。

二:技术实现

数据源如下

操作代码如下:

[java] view plain copy

  1. public class MyKeyValueTextInputFormat {
  2. // 定义输入路径
  3. private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/hello";
  4. // 定义输出路径
  5. private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
  6. public static void main(String[] args) {
  7. try {
  8. // 创建配置信息
  9. Configuration conf = new Configuration();
  10. //设置行的分隔符,这里是制表符,第一个制表符前面的是Key,第一个制表符后面的内容都是value
  11. conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
  12. /**********************************************/
  13. //对Map端输出进行压缩
  14. /*conf.setBoolean("mapred.compress.map.output", true);
  15. //设置map端输出使用的压缩类
  16. conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
  17. //对reduce端输出进行压缩
  18. conf.setBoolean("mapred.output.compress", true);
  19. //设置reduce端输出使用的压缩类
  20. conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);*/
  21. // 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)
  22. /*
  23. * conf.addResource("classpath://hadoop/core-site.xml");
  24. * conf.addResource("classpath://hadoop/hdfs-site.xml");
  25. * conf.addResource("classpath://hadoop/hdfs-site.xml");
  26. */
  27. // 创建文件系统
  28. FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
  29. // 如果输出目录存在,我们就删除
  30. if (fileSystem.exists(new Path(OUT_PATH))) {
  31. fileSystem.delete(new Path(OUT_PATH), true);
  32. }
  33. // 创建任务
  34. Job job = new Job(conf, MyKeyValueTextInputFormat.class.getName());
  35. //1.1   设置输入目录和设置输入数据格式化的类
  36. FileInputFormat.setInputPaths(job, INPUT_PATH);
  37. job.setInputFormatClass(KeyValueTextInputFormat.class);
  38. //1.2   设置自定义Mapper类和设置map函数输出数据的key和value的类型
  39. job.setMapperClass(MyKeyValueInputFormatMapper.class);
  40. job.setMapOutputKeyClass(Text.class);
  41. job.setMapOutputValueClass(LongWritable.class);
  42. //1.3   设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
  43. job.setPartitionerClass(HashPartitioner.class);
  44. job.setNumReduceTasks(1);
  45. //1.4   排序、分组
  46. //1.5   归约
  47. //2.1   Shuffle把数据从Map端拷贝到Reduce端。
  48. //2.2   指定Reducer类和输出key和value的类型
  49. job.setReducerClass(MyKeyValueInputFormatReducer.class);
  50. job.setOutputKeyClass(Text.class);
  51. job.setOutputValueClass(LongWritable.class);
  52. //2.3   指定输出的路径和设置输出的格式化类
  53. FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
  54. job.setOutputFormatClass(TextOutputFormat.class);
  55. // 提交作业 退出
  56. System.exit(job.waitForCompletion(true) ? 0 : 1);
  57. } catch (Exception e) {
  58. e.printStackTrace();
  59. }
  60. }
  61. /**
  62. * 自定义Mapper类
  63. * @author 廖钟民
  64. * time : 2015年1月15日下午8:00:01
  65. * @version
  66. */
  67. public static class MyKeyValueInputFormatMapper extends Mapper<Text, Text, Text, LongWritable>{
  68. /**
  69. * 输入数据是
  70. * hello    you
  71. * hello    me
  72. * you  me  love
  73. *
  74. * 进入map的键值对应该是<hello,you> <hello,me> <you,me love>每个键值对分别调用map()函数
  75. */
  76. protected void map(Text key, Text value, Mapper<Text, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
  77. //把key和value都当做key写出去
  78. context.write(key, new LongWritable(1));
  79. context.write(value, new LongWritable(1));
  80. }
  81. }
  82. /**
  83. * map()函数的输出结果为:
  84. *<hello,1> <you,1> <hello,1> <me,1> <you,1> <me love,1>
  85. *排序分组后的结果为:
  86. *<hello,{1,1}> <me,{1}> <me love,{1}> <you,{1,1}>
  87. */
  88. /**
  89. * 自定义Reducer类
  90. * @author 廖钟民
  91. * time : 2015年1月15日下午8:00:12
  92. * @version
  93. */
  94. public static class MyKeyValueInputFormatReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
  95. @Override
  96. protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,
  97. InterruptedException {
  98. int sum = 0;
  99. //遍历统计
  100. for (LongWritable s : values){
  101. sum += s.get();
  102. }
  103. context.write(key, new LongWritable(sum));
  104. }
  105. }
  106. }

程序运行结果:

时间: 2024-08-04 10:11:21

Hadoop中的KeyValueInputFormat的相关文章

hadoop中Configuration类剖析

Configuration是hadoop中五大组件的公用类,所以放在了core下,org.apache.hadoop.conf.Configruration.这个类是作业的配置信息类,任何作用的配置信息必须通过Configuration传递,因为通过Configuration可以实现在多个mapper和多个reducer任务之间共享信息. 类图 说明:Configuration实现了Iterable和Writable两个接口,其中实现Iterable是为了迭代,迭代出Configuration对

Hadoop中作业(job)、任务(task)和task attempt

hadoop中,MapReduce作业(job)ID的格式为job_201412081211_0002.这表示该作业是第二个作业(作业号从0001开始),作业开始于2014年12月8号12:11. 任务(task)属于作业,通过使用"task"替换作业ID的"job"前缀,然后在后面加上一个后缀表示哪个作业中间的任务.例如:task_201412081211_0002_m_000003,表示该任务属于job_201412081211_0002作业的第三个map任务(

hadoop中Text类 与 java中String类的区别

hadoop 中 的Text类与java中的String类感觉上用法是相似的,但两者在编码格式和访问方式上还是有些差别的,要说明这个问题,首先得了解几个概念: 字符集: 是一个系统支持的所有抽象字符的集合.字符是各种文字和符号的总称,包括各国家文字.标点符号.图形符号.数字等.例如 unicode就是一个字符集,它的目标是涵盖世界上所有国家的文字和符号: 字符编码:是一套法则,使用该法则能够对自然语言的字符的一个集合(如字母表或音节表),与其他东西的一个集合(如号码或电脉冲)进行配对.即在符号集

再次整理关于hadoop中yarn的原理及运行

关于hadoop中yarn的运行原理整理 一:对yarn的理解 1.关于yarn的组成 大约分成主要的四个. Resourcemanager,Nodemanager,Applicationmaster,container 2.Resourcemanager(RM)的理解 RM是全局资源管理器,负责整个系统的资源管理和分配. 主要由两个组件组成:调度器和应用程序管理器(ASM) 调度器:根据容量,队列等限制条件,将系统中的资源分配给各个正在运行的应用程序,不负责具体应用程序的相关工作,比如监控或跟

Hadoop 中利用 mapreduce 读写 mysql 数据

Hadoop 中利用 mapreduce 读写 mysql 数据 有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv.uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方. 好了言归正传,简单的说说背景.原理以及需要注意的地方: 1.为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInp

解决hadoop中 bin/hadoop fs -ls ls: `.&#39;: No such file or directory问题

出现这样的问题确实很苦恼...使用的是2.7版本..一般论坛上的都是1.x的教程,搞死人 在现在的2.x版本上的使用bin/hadoop fs -ls  /就有用 应该使用绝对路径就不会有问题....mkdir也是一样的..具体原因不知,我使用相对路径会出现错误.... 解决hadoop中 bin/hadoop fs -ls ls: `.': No such file or directory问题

结合手机上网流量业务来说明Hadoop中的自定义数据类型(序列化、反序列化机制)

大家都知道,Hadoop中为Key的数据类型必须实现WritableComparable接口,而Value的数据类型只需要实现Writable接口即可:能做Key的一定可以做Value,能做Value的未必能做Key.但是具体应该怎么应用呢?--本篇文章将结合手机上网流量业务进行分析. 先介绍一下业务场景:统计每个用户的上行流量和,下行流量和,以及总流量和. 本次描述所用数据: 日志格式描述: 日志flowdata.txt中的具体数据: 接下来贴出详细代码,代码中含有详细注释,从代码中可以看出,

Hadoop中WordCount代码-直接加载hadoop的配置文件

Hadoop中WordCount代码-直接加载hadoop的配置文件 在Myeclipse中,直接编写WordCount代码,代码中直接调用core-site.xml,hdfs-site.xml,mapred-site.xml配置文件 package com.apache.hadoop.function; import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import 

深度分析如何在Hadoop中控制Map的数量

深度分析如何在Hadoop中控制Map的数量 [email protected] 很多文档中描述,Mapper的数量在默认情况下不可直接控制干预,因为Mapper的数量由输入的大小和个数决定.在默认情况下,最终input 占据了多少block,就应该启动多少个Mapper.如果输入的文件数量巨大,但是每个文件的size都小于HDFS的blockSize,那么会造成 启动的Mapper等于文件的数量(即每个文件都占据了一个block),那么很可能造成启动的Mapper数量超出限制而导致崩溃.这些逻