Hadoop中的DBInputFormat

一:背景

为了方便MapReduce直接访问关系型数据库(MYSQL、Oracle等),Hadoop提供了DBInputFormat和DBOutputFormat两个类,通过DBInputFormat类把数据库表的数据读入到HDFS中,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库中。

二:实现

我们以MYSQL数据库为例,先建立数据库、表以及插入数据,如下,

(1):建立数据库

create database myDB;

(2):建立数据库表

[java] view plain copy

  1. create table student(id INTEGER NOT NULL PRIMARY KEY,name VARCHAR(32) NOT NULL);

(3):插入数据

[java] view plain copy

  1. insert into student values(1,"lavimer");

(4)编写MapReduce程序,我这里使用的版本是hadoop1.2.1,相关知识点都写在注释中了,如下:

[java] view plain copy

  1. /**
  2. * 使用DBInputFormat和DBOutputFormat
  3. * 要把数据库的jdbc驱动放到各个TaskTracker节点的lib目录下
  4. * 重启集群
  5. * @author 廖钟民
  6. * time : 2015年1月15日下午12:50:55
  7. * @version
  8. */
  9. public class MyDBInputFormat {
  10. //定义输出路径
  11. private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
  12. public static void main(String[] args) {
  13. try {
  14. //创建配置信息
  15. Configuration conf = new Configuration();
  16. /*//对Map端的输出进行压缩
  17. conf.setBoolean("mapred.compress.map.output", true);
  18. //设置map端输出使用的压缩类
  19. conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
  20. //对reduce端输出进行压缩
  21. conf.setBoolean("mapred.output.compress", true);
  22. //设置reduce端输出使用的压缩类
  23. conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);*/
  24. // 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)
  25. /*
  26. * conf.addResource("classpath://hadoop/core-site.xml");
  27. * conf.addResource("classpath://hadoop/hdfs-site.xml");
  28. * conf.addResource("classpath://hadoop/hdfs-site.xml");
  29. */
  30. //通过conf创建数据库配置信息
  31. DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://liaozhongmin:3306/myDB","root","134045");
  32. //创建文件系统
  33. FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
  34. //如果输出目录存在就删除
  35. if (fileSystem.exists(new Path(OUT_PATH))){
  36. fileSystem.delete(new Path(OUT_PATH),true);
  37. }
  38. //创建任务
  39. Job job = new Job(conf,MyDBInputFormat.class.getName());
  40. //1.1 设置输入数据格式化的类和设置数据来源
  41. job.setInputFormatClass(DBInputFormat.class);
  42. DBInputFormat.setInput(job, Student.class, "student", null, null, new String[]{"id","name"});
  43. //1.2 设置自定义的Mapper类和Mapper输出的key和value的类型
  44. job.setMapperClass(MyDBInputFormatMapper.class);
  45. job.setMapOutputKeyClass(Text.class);
  46. job.setMapOutputValueClass(Text.class);
  47. //1.3 设置分区和reduce数量(reduce的数量和分区的数量对应,因为分区只有一个,所以reduce的个数也设置为一个)
  48. job.setPartitionerClass(HashPartitioner.class);
  49. job.setNumReduceTasks(1);
  50. //1.4 排序、分组
  51. //1.5 归约
  52. //2.1 Shuffle把数据从Map端拷贝到Reduce端
  53. //2.2 指定Reducer类和输出key和value的类型
  54. job.setReducerClass(MyDBInputFormatReducer.class);
  55. job.setOutputKeyClass(Text.class);
  56. job.setOutputValueClass(Text.class);
  57. //2.3 指定输出的路径和设置输出的格式化类
  58. FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
  59. job.setOutputFormatClass(TextOutputFormat.class);
  60. //提交作业 然后关闭虚拟机正常退出
  61. System.exit(job.waitForCompletion(true) ? 0 : 1);
  62. } catch (Exception e) {
  63. e.printStackTrace();
  64. }
  65. }
  66. /**
  67. * 自定义Mapper类
  68. * @author 廖钟民
  69. * time : 2015年1月15日下午1:22:57
  70. * @version
  71. */
  72. public static class MyDBInputFormatMapper extends Mapper<LongWritable, Student, Text, Text>{
  73. //创建map输出时的key类型
  74. private Text mapOutKey = new Text();
  75. //创建map输出时的value类型
  76. private Text mapOutValue = new Text();
  77. @Override
  78. protected void map(LongWritable key, Student value, Mapper<LongWritable, Student, Text, Text>.Context context) throws IOException, InterruptedException {
  79. //创建输出的key:把id当做key
  80. mapOutKey.set(String.valueOf(value.getId()));
  81. //创建输出的value:把name当做value
  82. mapOutValue.set(value.getName());
  83. //通过context写出去
  84. context.write(mapOutKey, mapOutValue);
  85. }
  86. }
  87. /**
  88. * 自定义Reducer类
  89. * @author 廖钟民
  90. * time : 2015年1月15日下午1:23:28
  91. * @version
  92. */
  93. public static class MyDBInputFormatReducer extends Reducer<Text, Text, Text, Text>{
  94. @Override
  95. protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
  96. //遍历把结果写到HDFS中
  97. for (Text t : values){
  98. context.write(key, t);
  99. }
  100. }
  101. }
  102. }
  103. /**
  104. * 自定义实体类 用于对应数据库表中的字段
  105. * @author 廖钟民
  106. * time : 2015年1月15日下午12:52:58
  107. * @version
  108. */
  109. class Student implements Writable,DBWritable{
  110. //学生id字段
  111. private Integer id;
  112. //学生姓名
  113. private String name;
  114. //无参构造方法
  115. public Student() {
  116. }
  117. //有参构造方法
  118. public Student(Integer id, String name) {
  119. this.id = id;
  120. this.name = name;
  121. }
  122. public Integer getId() {
  123. return id;
  124. }
  125. public void setId(Integer id) {
  126. this.id = id;
  127. }
  128. public String getName() {
  129. return name;
  130. }
  131. public void setName(String name) {
  132. this.name = name;
  133. }
  134. //实现DBWritable接口要实现的方法
  135. public void readFields(ResultSet resultSet) throws SQLException {
  136. this.id = resultSet.getInt(1);
  137. this.name = resultSet.getString(2);
  138. }
  139. //实现DBWritable接口要实现的方法
  140. public void write(PreparedStatement preparedStatement) throws SQLException {
  141. preparedStatement.setInt(1, this.id);
  142. preparedStatement.setString(2, this.name);
  143. }
  144. //实现Writable接口要实现的方法
  145. public void readFields(DataInput dataInput) throws IOException {
  146. this.id = dataInput.readInt();
  147. this.name = Text.readString(dataInput);
  148. }
  149. //实现Writable接口要实现的方法
  150. public void write(DataOutput dataOutput) throws IOException {
  151. dataOutput.writeInt(this.id);
  152. Text.writeString(dataOutput, this.name);
  153. }
  154. @Override
  155. public int hashCode() {
  156. final int prime = 31;
  157. int result = 1;
  158. result = prime * result + ((id == null) ? 0 : id.hashCode());
  159. result = prime * result + ((name == null) ? 0 : name.hashCode());
  160. return result;
  161. }
  162. @Override
  163. public boolean equals(Object obj) {
  164. if (this == obj)
  165. return true;
  166. if (obj == null)
  167. return false;
  168. if (getClass() != obj.getClass())
  169. return false;
  170. Student other = (Student) obj;
  171. if (id == null) {
  172. if (other.id != null)
  173. return false;
  174. } else if (!id.equals(other.id))
  175. return false;
  176. if (name == null) {
  177. if (other.name != null)
  178. return false;
  179. } else if (!name.equals(other.name))
  180. return false;
  181. return true;
  182. }
  183. @Override
  184. public String toString() {
  185. return "Student [id=" + id + ", name=" + name + "]";
  186. }
  187. }

程序运行的结果是数据库中的数据成功导入到HDFS中,如下:

注:程序运行时,会碰到一个常见的数据库远程连接错误,大致如下:

[java] view plain copy

  1. Access denied for user ‘root‘@‘%‘ to database ‘xxxx’


原因:创建完数据库后,需要进行授权(在本地访问一般不会出现这个问题)

解决方法就是进行授权:

[java] view plain copy

  1. grant all on xxxx.* to ‘root‘@‘%‘ identified by ‘password‘ with grant option;
  2. xxxx代表创建的数据库;
  3. password为用户密码,在此为root的密码

另外一个常见的错误就是MYSQL驱动没有导入到hadoop/lib目录下,解决方案有两种,传统的方式我就不多说了,这里说另外一种方式:

(1):把包上传到集群上

[java] view plain copy

  1. hadoop fs -put mysql-connector-java-5.1.0- bin.jar /lib

(2):在MR程序提交job前,添加语句:

[java] view plain copy

    1. DistributedCache.addFileToClassPath(new Path("/lib/mysql- connector-java- 5.1.0-bin.jar"), conf);
时间: 2024-11-05 18:53:38

Hadoop中的DBInputFormat的相关文章

关于hadoop中的DBInputFormat试验

1.注意,需要声明为静态内部类,否则会报java.lang.NoSuchMethodException...<init>的错误public static class MySqlWritable implements Writable, DBWritable { 2.如果输出目录存在,需要先删除 3.由于需要从mysql数据取值,则需要有mysql数据库驱动包,hadoop classpath查看hadoop类加载路径,将驱动包拷贝到其中一个目录下即可: 4.解决mysql"Acces

Hadoop中的DBOutputFormat

一:背景 为了方便MapReduce直接访问关系型数据库(MYSQL.Oracle等),Hadoop提供了DBInputFormat和DBOutputFormat两个类,通过DBInputFormat类把数据库表中的数据导入到HDFS中,通过DBOutputFormat类把数MapReduce产生的结果导出到数据库表中. 二:技术实现 我们接上一篇文章即通过通过DBInputFormat将数据库表中的数据导入到HDFS中,这里我们讲的是通过DBOutputFormat类将MapReduce产生的

Hadoop 中利用 mapreduce 读写 mysql 数据

Hadoop 中利用 mapreduce 读写 mysql 数据 有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv.uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方. 好了言归正传,简单的说说背景.原理以及需要注意的地方: 1.为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInp

深度分析如何在Hadoop中控制Map的数量

深度分析如何在Hadoop中控制Map的数量 [email protected] 很多文档中描述,Mapper的数量在默认情况下不可直接控制干预,因为Mapper的数量由输入的大小和个数决定.在默认情况下,最终input 占据了多少block,就应该启动多少个Mapper.如果输入的文件数量巨大,但是每个文件的size都小于HDFS的blockSize,那么会造成 启动的Mapper等于文件的数量(即每个文件都占据了一个block),那么很可能造成启动的Mapper数量超出限制而导致崩溃.这些逻

Hadoop中常用的InputFormat、OutputFormat(转)

Hadoop中的Map Reduce框架依赖InputFormat提供数据,依赖OutputFormat输出数据,每一个Map Reduce程序都离不开它们.Hadoop提供了一系列InputFormat和OutputFormat方便开发,本文介绍几种常用的: TextInputFormat 作为默认的文件输入格式,用于读取纯文本文件,文件被分为一系列以LF或者CR结束的行,key是每一行的位置偏移量,是LongWritable类型的,value是每一行的内容,为Text类型. KeyValue

关于学习Hadoop中未总结的资料

出自:http://www.cnblogs.com/xia520pi/archive/2012/01/02/2310118.html 1)Cygwin相关资料 (1)Cygwin上安装.启动ssh服务失败.ssh localhost失败的解决方案 地址:http://blog.163.com/pwcrab/blog/static/16990382220107267443810/ (2)windows2003+cygwin+ssh 地址:http://wenku.baidu.com/view/37

hadoop中Configuration类剖析

Configuration是hadoop中五大组件的公用类,所以放在了core下,org.apache.hadoop.conf.Configruration.这个类是作业的配置信息类,任何作用的配置信息必须通过Configuration传递,因为通过Configuration可以实现在多个mapper和多个reducer任务之间共享信息. 类图 说明:Configuration实现了Iterable和Writable两个接口,其中实现Iterable是为了迭代,迭代出Configuration对

Hadoop中作业(job)、任务(task)和task attempt

hadoop中,MapReduce作业(job)ID的格式为job_201412081211_0002.这表示该作业是第二个作业(作业号从0001开始),作业开始于2014年12月8号12:11. 任务(task)属于作业,通过使用"task"替换作业ID的"job"前缀,然后在后面加上一个后缀表示哪个作业中间的任务.例如:task_201412081211_0002_m_000003,表示该任务属于job_201412081211_0002作业的第三个map任务(

hadoop中Text类 与 java中String类的区别

hadoop 中 的Text类与java中的String类感觉上用法是相似的,但两者在编码格式和访问方式上还是有些差别的,要说明这个问题,首先得了解几个概念: 字符集: 是一个系统支持的所有抽象字符的集合.字符是各种文字和符号的总称,包括各国家文字.标点符号.图形符号.数字等.例如 unicode就是一个字符集,它的目标是涵盖世界上所有国家的文字和符号: 字符编码:是一套法则,使用该法则能够对自然语言的字符的一个集合(如字母表或音节表),与其他东西的一个集合(如号码或电脉冲)进行配对.即在符号集