为了方便 MapReduce 直接访问关系型数据库(MySQL、Oracle),Hadoop 提供了 DBInputFormat 和 DBOutputFormat 两个类:通过 DBInputFormat 类把数据库表数据读入到 HDFS,通过 DBOutputFormat 类把 MapReduce 产生的结果集导入到数据库表中。
如果执行 MapReduce 时报错:java.io.IOException: com.mysql.jdbc.Driver,通常是因为程序找不到 mysql 驱动包。解决方法是让每一个 tasktracker 执行 MapReduce 程序时都能够找到该驱动包。
添加驱动包有两种方式:
(1)在每一个节点的 ${HADOOP_HOME}/lib 下添加该包,然后重新启动集群。这是比较原始的方法。
(2)a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0-bin.jar /hdfsPath/
b)在 mr 程序提交 job 前,添加语句:DistributedCache.addFileToClassPath(new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"), conf);
mysql数据库存储到hadoop hdfs
mysql表创建和数据初始化
-- Recreate the demo table used by the Hadoop <-> MySQL examples.
-- Each statement is on its own line so that the "--" comment lines below
-- do not comment out the INSERT statements that follow them.
DROP TABLE IF EXISTS `wu_testhadoop`;
CREATE TABLE `wu_testhadoop` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of wu_testhadoop
-- ----------------------------
-- String literals use plain ASCII single quotes.
INSERT INTO `wu_testhadoop` VALUES ('1', '123', '122312');
INSERT INTO `wu_testhadoop` VALUES ('2', '123', '123456');
定义hadoop数据访问
mysql表创建完成后,我们需要定义hadoop访问mysql的规则。
hadoop提供了org.apache.hadoop.io.Writable接口来实现简单、高效的可序列化协议,该接口基于DataInput和DataOutput来实现相关的功能。
hadoop对数据库访问也提供了org.apache.hadoop.mapred.lib.db.DBWritable接口,其中write方法用于对PreparedStatement对象设定值,readFields方法用于对从数据库读取出来的对象进行列的值绑定。
以上两个接口的使用方法如下(内容摘自源代码)
writable
public class MyWritable implements Writable { // Some data private int counter; private long timestamp; public void write(DataOutput out) throws IOException { out.writeInt(counter); out.writeLong(timestamp); } public void readFields(DataInput in) throws IOException { counter = in.readInt(); timestamp = in.readLong(); } public static MyWritable read(DataInput in) throws IOException { MyWritable w = new MyWritable(); w.readFields(in); return w; } }
DBWritable
public class MyWritable implements Writable, DBWritable { // Some data private int counter; private long timestamp; //Writable#write() implementation public void write(DataOutput out) throws IOException { out.writeInt(counter); out.writeLong(timestamp); } //Writable#readFields() implementation public void readFields(DataInput in) throws IOException { counter = in.readInt(); timestamp = in.readLong(); } public void write(PreparedStatement statement) throws SQLException { statement.setInt(1, counter); statement.setLong(2, timestamp); } public void readFields(ResultSet resultSet) throws SQLException { counter = resultSet.getInt(1); timestamp = resultSet.getLong(2); } }
数据库对应的实现
package com.wyg.hadoop.mysql.bean; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.lib.db.DBWritable; public class DBRecord implements Writable, DBWritable{ private int id; private String title; private String content; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } @Override public void readFields(ResultSet set) throws SQLException { this.id = set.getInt("id"); this.title = set.getString("title"); this.content = set.getString("content"); } @Override public void write(PreparedStatement pst) throws SQLException { pst.setInt(1, id); pst.setString(2, title); pst.setString(3, content); } @Override public void readFields(DataInput in) throws IOException { this.id = in.readInt(); this.title = Text.readString(in); this.content = Text.readString(in); } @Override public void write(DataOutput out) throws IOException { out.writeInt(this.id); Text.writeString(out, this.title); Text.writeString(out, this.content); } @Override public String toString() { return this.id + " " + this.title + " " + this.content; } }
实现Map/Reduce
package com.wyg.hadoop.mysql.mapper; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; import com.wyg.hadoop.mysql.bean.DBRecord; @SuppressWarnings("deprecation") public class DBRecordMapper extends MapReduceBase implements Mapper<LongWritable, DBRecord, LongWritable, Text>{ @Override public void map(LongWritable key, DBRecord value, OutputCollector<LongWritable, Text> collector, Reporter reporter) throws IOException { collector.collect(new LongWritable(value.getId()), new Text(value.toString())); } }
测试hadoop连接mysql并将数据存储到hdfs
package com.wyg.hadoop.mysql.db; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapred.lib.db.DBConfiguration; import org.apache.hadoop.mapred.lib.db.DBInputFormat; import com.wyg.hadoop.mysql.bean.DBRecord; import com.wyg.hadoop.mysql.mapper.DBRecordMapper; public class DBAccess { public static void main(String[] args) throws IOException { JobConf conf = new JobConf(DBAccess.class); conf.setOutputKeyClass(LongWritable.class); conf.setOutputValueClass(Text.class); conf.setInputFormat(DBInputFormat.class); Path path = new Path("hdfs://192.168.44.129:9000/user/root/dbout"); FileOutputFormat.setOutputPath(conf, path); DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver", "jdbc:mysql://你的ip:3306/数据库名","username","password"); String [] fields = {"id", "title", "content"}; DBInputFormat.setInput(conf, DBRecord.class, "wu_testhadoop", null, "id", fields); conf.setMapperClass(DBRecordMapper.class); conf.setReducerClass(IdentityReducer.class); JobClient.runJob(conf); } }
运行程序,结果如下:
15/08/11 16:46:18 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 15/08/11 16:46:18 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 15/08/11 16:46:18 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). 15/08/11 16:46:19 INFO mapred.JobClient: Running job: job_local_0001 15/08/11 16:46:19 INFO mapred.MapTask: numReduceTasks: 1 15/08/11 16:46:19 INFO mapred.MapTask: io.sort.mb = 100 15/08/11 16:46:19 INFO mapred.MapTask: data buffer = 79691776/99614720 15/08/11 16:46:19 INFO mapred.MapTask: record buffer = 262144/327680 15/08/11 16:46:19 INFO mapred.MapTask: Starting flush of map output 15/08/11 16:46:19 INFO mapred.MapTask: Finished spill 0 15/08/11 16:46:19 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting 15/08/11 16:46:19 INFO mapred.LocalJobRunner: 15/08/11 16:46:19 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000000_0‘ done. 15/08/11 16:46:19 INFO mapred.LocalJobRunner: 15/08/11 16:46:19 INFO mapred.Merger: Merging 1 sorted segments 15/08/11 16:46:19 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 48 bytes 15/08/11 16:46:19 INFO mapred.LocalJobRunner: 15/08/11 16:46:19 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting 15/08/11 16:46:19 INFO mapred.LocalJobRunner: 15/08/11 16:46:19 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now 15/08/11 16:46:19 INFO mapred.FileOutputCommitter: Saved output of task ‘attempt_local_0001_r_000000_0‘ to hdfs://192.168.44.129:9000/user/root/dbout 15/08/11 16:46:19 INFO mapred.LocalJobRunner: reduce > reduce 15/08/11 16:46:19 INFO mapred.TaskRunner: Task ‘attempt_local_0001_r_000000_0‘ done. 
15/08/11 16:46:20 INFO mapred.JobClient: map 100% reduce 100% 15/08/11 16:46:20 INFO mapred.JobClient: Job complete: job_local_0001 15/08/11 16:46:20 INFO mapred.JobClient: Counters: 14 15/08/11 16:46:20 INFO mapred.JobClient: FileSystemCounters 15/08/11 16:46:20 INFO mapred.JobClient: FILE_BYTES_READ=34606 15/08/11 16:46:20 INFO mapred.JobClient: FILE_BYTES_WRITTEN=69844 15/08/11 16:46:20 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=30 15/08/11 16:46:20 INFO mapred.JobClient: Map-Reduce Framework 15/08/11 16:46:20 INFO mapred.JobClient: Reduce input groups=2 15/08/11 16:46:20 INFO mapred.JobClient: Combine output records=0 15/08/11 16:46:20 INFO mapred.JobClient: Map input records=2 15/08/11 16:46:20 INFO mapred.JobClient: Reduce shuffle bytes=0 15/08/11 16:46:20 INFO mapred.JobClient: Reduce output records=2 15/08/11 16:46:20 INFO mapred.JobClient: Spilled Records=4 15/08/11 16:46:20 INFO mapred.JobClient: Map output bytes=42 15/08/11 16:46:20 INFO mapred.JobClient: Map input bytes=2 15/08/11 16:46:20 INFO mapred.JobClient: Combine input records=0 15/08/11 16:46:20 INFO mapred.JobClient: Map output records=2 15/08/11 16:46:20 INFO mapred.JobClient: Reduce input records=2
同时可以看到hdfs文件系统多了一个dbout的文件夹,里边的文件保存了数据库对应的数据,内容如下:
1	1 123 122312
2	2 123 123456
(每行开头的键与其后的记录之间以制表符分隔,记录内部各字段以空格分隔)
hdfs数据导入到mysql
将hdfs文件存储到mysql,也需要上边的DBRecord类作为辅助,因为数据库的操作都是通过DBInputFormat和DBOutputFormat来进行的。
首先需要定义map和reduce的实现(map用以对hdfs的文档进行解析,reduce解析map的输出并输出)
package com.wyg.hadoop.mysql.mapper; import java.io.IOException; import java.io.DataInput; import java.io.DataOutput; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Iterator; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import com.wyg.hadoop.mysql.bean.DBRecord; public class WriteDB { // Map处理过程 public static class Map extends MapReduceBase implements Mapper<Object, Text, Text, DBRecord> { private final static DBRecord one = new DBRecord(); private Text word = new Text(); @Override public void map(Object key, Text value, OutputCollector<Text, DBRecord> output, Reporter reporter) throws IOException { String line = value.toString(); String[] infos = line.split(" "); String id = infos[0].split(" ")[1]; one.setId(new Integer(id)); one.setTitle(infos[1]); one.setContent(infos[2]); word.set(id); output.collect(word, one); } } public static class Reduce extends MapReduceBase implements Reducer<Text, DBRecord, DBRecord, Text> { @Override public void reduce(Text key, Iterator<DBRecord> values, OutputCollector<DBRecord, Text> collector, Reporter reporter) throws IOException { DBRecord record = values.next(); collector.collect(record, new Text()); } } }
测试hdfs导入数据到数据库
package com.wyg.hadoop.mysql.db; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.lib.db.DBConfiguration; import org.apache.hadoop.mapred.lib.db.DBInputFormat; import org.apache.hadoop.mapred.lib.db.DBOutputFormat; import com.wyg.hadoop.mysql.bean.DBRecord; import com.wyg.hadoop.mysql.mapper.WriteDB; public class DBInsert { public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WriteDB.class); // 设置输入输出类型 conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(DBOutputFormat.class); // 不加这两句,通只是,可是网上给的样例没有这两句。 //Text, DBRecord conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(DBRecord.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(DBRecord.class); // 设置Map和Reduce类 conf.setMapperClass(WriteDB.Map.class); conf.setReducerClass(WriteDB.Reduce.class); // 设置输如文件夹 FileInputFormat.setInputPaths(conf, new Path("hdfs://192.168.44.129:9000/user/root/dbout")); // 建立数据库连接 DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver", "jdbc:mysql://数据库ip:3306/数据库名称","username","password"); String[] fields = {"id","title","content" }; DBOutputFormat.setOutput(conf, "wu_testhadoop", fields); JobClient.runJob(conf); } }
测试结果如下
15/08/11 18:10:15 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 15/08/11 18:10:15 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 15/08/11 18:10:15 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). 15/08/11 18:10:15 INFO mapred.FileInputFormat: Total input paths to process : 1 15/08/11 18:10:15 INFO mapred.JobClient: Running job: job_local_0001 15/08/11 18:10:15 INFO mapred.FileInputFormat: Total input paths to process : 1 15/08/11 18:10:15 INFO mapred.MapTask: numReduceTasks: 1 15/08/11 18:10:15 INFO mapred.MapTask: io.sort.mb = 100 15/08/11 18:10:15 INFO mapred.MapTask: data buffer = 79691776/99614720 15/08/11 18:10:15 INFO mapred.MapTask: record buffer = 262144/327680 15/08/11 18:10:15 INFO mapred.MapTask: Starting flush of map output 15/08/11 18:10:16 INFO mapred.MapTask: Finished spill 0 15/08/11 18:10:16 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting 15/08/11 18:10:16 INFO mapred.LocalJobRunner: hdfs://192.168.44.129:9000/user/root/dbout/part-00000:0+30 15/08/11 18:10:16 INFO mapred.TaskRunner: Task ‘attempt_local_0001_m_000000_0‘ done. 15/08/11 18:10:16 INFO mapred.LocalJobRunner: 15/08/11 18:10:16 INFO mapred.Merger: Merging 1 sorted segments 15/08/11 18:10:16 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 40 bytes 15/08/11 18:10:16 INFO mapred.LocalJobRunner: 15/08/11 18:10:16 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting 15/08/11 18:10:16 INFO mapred.LocalJobRunner: reduce > reduce 15/08/11 18:10:16 INFO mapred.TaskRunner: Task ‘attempt_local_0001_r_000000_0‘ done. 
15/08/11 18:10:16 INFO mapred.JobClient: map 100% reduce 100% 15/08/11 18:10:16 INFO mapred.JobClient: Job complete: job_local_0001 15/08/11 18:10:16 INFO mapred.JobClient: Counters: 14 15/08/11 18:10:16 INFO mapred.JobClient: FileSystemCounters 15/08/11 18:10:16 INFO mapred.JobClient: FILE_BYTES_READ=34932 15/08/11 18:10:16 INFO mapred.JobClient: HDFS_BYTES_READ=60 15/08/11 18:10:16 INFO mapred.JobClient: FILE_BYTES_WRITTEN=70694 15/08/11 18:10:16 INFO mapred.JobClient: Map-Reduce Framework 15/08/11 18:10:16 INFO mapred.JobClient: Reduce input groups=2 15/08/11 18:10:16 INFO mapred.JobClient: Combine output records=0 15/08/11 18:10:16 INFO mapred.JobClient: Map input records=2 15/08/11 18:10:16 INFO mapred.JobClient: Reduce shuffle bytes=0 15/08/11 18:10:16 INFO mapred.JobClient: Reduce output records=2 15/08/11 18:10:16 INFO mapred.JobClient: Spilled Records=4 15/08/11 18:10:16 INFO mapred.JobClient: Map output bytes=34 15/08/11 18:10:16 INFO mapred.JobClient: Map input bytes=30 15/08/11 18:10:16 INFO mapred.JobClient: Combine input records=0 15/08/11 18:10:16 INFO mapred.JobClient: Map output records=2 15/08/11 18:10:16 INFO mapred.JobClient: Reduce input records=2
测试之前我对原有表进行了清空处理,可以看到运行后数据库里边添加了两条记录。
下次再运行的时候会报错(主键冲突),属于正常情况,原因在于我们导入数据的时候对id进行了赋值;如果忽略id,是可以一直添加的。
源代码下载地址
源代码已上传,下载地址为download.csdn.net/detail/wuyinggui10000/8974585