Hadoop,MapReduce操作Mysql

前以前帖子介绍,怎样读取文本数据源和多个数据源的合并:http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.html

这一个博客介绍一下MapReduce怎样读取关系数据库的数据,选择的关系数据库为MySql,因为它是开源的软件,所以大家用的比较多。以前上学的时候就没有用过开源的软件,直接用盗版,也相当与免费,且比开源好用,例如向oracle,windows7等等。现在工作了,由于公司考虑成本的问题,所以都用成开源的,ubuntu,mysql等,本人现在支持开源,特别像hadoop这样的东西,真的太好了,不但可以使用软件,也可以读到源代码。话不说多了。

hadoop技术推出一首曾遭到关系数据库研究者的挑衅和批评,认为MapReduce不具有关系数据库中的结构化数据存储和处理能力。为此,hadoop社区和研究人员做了多的努力,在hadoop0.19版支持MapReduce访问关系数据库,如:mysql,MySQL、PostgreSQL、Oracle 等几个数据库系统。

1. 从Mysql读出数据

Hadoop访问关系数据库主要通过一下接口实现的:DBInputFormat类,包所在位置:org.apache.hadoop.mapred.lib.db 中。DBInputFormat 在 Hadoop 应用程序中通过数据库供应商提供的 JDBC接口来与数据库进行交互,并且可以使用标准的 SQL 来读取数据库中的记录。学习DBInputFormat首先必须知道二个条件。

  1. 在使用 DBInputFormat 之前,必须将要使用的 JDBC 驱动拷贝到分布式系统各个节点的$HADOOP_HOME/lib/目录下。
  2. MapReduce访问关系数据库时,大量频繁的从MapReduce程序中查询和读取数据,这大大的增加了数据库的访问负载,因此,DBInputFormat接口仅仅适合读取小数据量的数据,而不适合处理数据仓库。要处理数据仓库的方法有:利用数据库的Dump工具将大量待分析的数据输出为文本,并上传的Hdfs中进行处理,处理的方法可参考:http://www.cnblogs.com/liqizhou/archive/2012/05/15/2501835.html

DBInputFormat 类中包含以下三个内置类

  1. protected class DBRecordReader implementsRecordReader<LongWritable, T>:用来从一张数据库表中读取一条条元组记录。
  2. 2.public static class NullDBWritable implements DBWritable,Writable:主要用来实现 DBWritable 接口。DBWritable接口要实现二个函数,第一是write,第二是readFileds,这二个函数都不难理解,一个是写,一个是读出所有字段。原型如下:

    public void write(PreparedStatement statement) throwsSQLException;
    public void readFields(ResultSet resultSet) throws SQLException;
  3. protected static class DBInputSplit implements InputSplit:主要用来描述输入元组集合的范围,包括 start 和 end 两个属性,start 用来表示第一条记录的索引号,end 表示最后一条记录的索引号.

下面对怎样使用 DBInputFormat 读取数据库记录进行详细的介绍,具体步骤如下:

  1. DBConfiguration.configureDB (JobConf job, StringdriverClass, String dbUrl, String userName, String passwd)函数,配置JDBC 驱动,数据源,以及数据库访问的用户名和密码。MySQL 数据库的 JDBC 的驱动为“com.mysql.jdbc.Driver”,数据源为“jdbc:mysql://localhost/testDB”,其中testDB为访问的数据库。useName一般为“root”,passwd是你数据库的密码。
  2. DBInputFormat.setInput(JobConf job, Class<?extends DBWritable> inputClass, String tableName, String conditions,String orderBy, String... fieldNames),这个方法的参数很容易看懂,inputClass实现DBWritable接口。,string tableName表名, conditions表示查询的条件,orderby表示排序的条件,fieldNames是字段,这相当与把sql语句拆分的结果。当然也可以用sql语句进行重载。etInput(JobConf job, Class<?extends DBWritable> inputClass, String inputQuery, StringinputCountQuery)。
  3. 编写MapReduce函数,包括Mapper 类、Reducer 类、输入输出文件格式等,然后调用JobClient.runJob(conf)。

上面讲了理论,下面举个例子:假设 MySQL 数据库中有数据库student,假设数据库中的字段有“id”,“name”,“gender","number"。

第一步要实现DBwrite和write数据接口。代码如下:

        public class StudentRecord implements Writable, DBWritable{
            int id;
            String name;
            String gender;
            String number;
            @Override
        public void readFields(DataInput in) throws IOException {
            // TODO Auto-generated method stub
            this.id = in.readInt();
            this.gender = Text.readString(in);
            this.name = in.readString();
            this.number = in.readString();
        }
            @Override
        public void write(DataOutput out) throws IOException {
            // TODO Auto-generated method stub
            out.writeInt(this.id);
            Text.writeString(out,this.name);
            out.writeInt(this.gender);
            out.writeInt(this.number);
        }

            @Override
        public void readFields(ResultSet result) throws SQLException {
            // TODO Auto-generated method stub
            this.id = result.getInt(1);
            this.name = result.getString(2);
            this.gender = result.getString(3);
            this.number = result.getString(4);
        }

            @Override
        public void write(PreparedStatement stmt) throws SQLException{
            // TODO Auto-generated method stub
            stmt.setInt(1, this.id);
            stmt.setString(2, this.name);
            stmt.setString(3, this.gender);
            stmt.setString(4, this.number);
        }
            @Override
        public String toString() {
            // TODO Auto-generated method stub
            return new String(this.name + " " + this.gender + " " +this.number);
        }

第二步,实现Map和Reduce类

    public class DBAccessMapper extends MapReduceBase implements
            Mapper<LongWritable, TeacherRecord, LongWritable, Text> {
        @Override
        public void map(LongWritable key, TeacherRecord value,
                OutputCollector<LongWritable, Text> collector, Reporter reporter)
                throws IOException {
            // TODO Auto-generated method stub
            new collector.collect(new LongWritable(value.id), new Text(value
                    .toString()));
        }
    }

第三步:主函数的实现,函数

public class DBAccessReader {

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(DBAccessReader.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setInputFormat(DBInputFormat.class);
        FileOutputFormat.setOutputPath(conf, new Path("dboutput"));
        DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver",
        "jdbc:mysql://localhost/school","root","123456");
        String [] fields = {"id", "name", "gender", "number"};
        DBInputFormat.setInput(conf, StudentRecord.class,"Student",null "id", fields);
        conf.setMapperClass(DBAccessMapper.class);
        conf.setReducerClass(IdentityReducer.class);
        JobClient.runJob(conf);
        }

}

2.写数据

往往对于数据处理的结果的数据量一般不会太大,可能适合hadoop直接写入数据库中。hadoop提供了相应的数据库直接输出的计算发结果。

  1. DBOutFormat: 提供数据库写入接口。
  2. DBRecordWriter:提供向数据库中写入的数据记录的接口。
  3. DBConfiguration:提供数据库配置和创建链接的接口。

DBOutFormat提供一个静态方法setOutput(job,String table,String ...filedNames);该方法的参数很容易看懂。假设要插入一个Student的数据,其代码为

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        JobConf conf = new JobConf();
        conf.setOutputFormat(DBOutputFormat.class);
        DBConfiguration.configureDB(conf,"com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost/school","root","123456");
        DBOutputFormat.setOutput(conf,"Student", 456, "liqizhou", "man", "20004154578");
        JobClient.runJob(conf); 



Hadoop,MapReduce操作Mysql

时间: 2024-11-05 18:18:59

Hadoop,MapReduce操作Mysql的相关文章

本地通过Eclipse链接Hadoop操作Mysql数据库问题小结

前一段时间,在上一篇博文中描述了自己抽时间在构建的完全分布式Hadoop环境过程中遇到的一些问题以及构建成功后,通过Eclipse操作HDFS的时候遇到的一些问题,最近又想进一步学习学习Hadoop操作Mysql数据库的一些知识,在这里网上存在很多分歧,很多人可能会笑话,用那么“笨重”的Hadoop来操作数据库,脑子有问题吧,Hadoop的HDFS优势在于处理分布式文件系统,这种说法没有任何错误,数据库的操作讲究“安全.轻便.快捷”,用Hadoop操作完全是不符合常理啊,那为啥还要学习这个东西呢

Hadoop 中利用 mapreduce 读写 mysql 数据

Hadoop 中利用 mapreduce 读写 mysql 数据 有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv.uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方. 好了言归正传,简单的说说背景.原理以及需要注意的地方: 1.为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInp

【Big Data - Hadoop - MapReduce】初学Hadoop之图解MapReduce与WordCount示例分析

Hadoop的框架最核心的设计就是:HDFS和MapReduce.HDFS为海量的数据提供了存储,MapReduce则为海量的数据提供了计算. HDFS是Google File System(GFS)的开源实现. MapReduce是Google MapReduce的开源实现. HDFS和MapReduce实现是完全分离的,并不是没有HDFS就不能MapReduce运算. 本文主要参考了以下三篇博客学习整理而成. 1. Hadoop示例程序WordCount详解及实例 2. hadoop 学习笔

Spark:超越Hadoop MapReduce

引言:和 Hadoop 一样,Spark 提供了一个 Map/Reduce API(分布式计算)和分布式存储.二者主要的不同点是,Spark 在集群的内存中保存数据,而 Hadoop 在集群的磁盘中存储数据. 本文选自<SparkGraphX实战>. 大数据对一些数据科学团队来说是 主要的挑战,因为在要求的可扩展性方面单机没有能力和容量来运行大规模数据处 理.此外,即使专为大数据设计的系统,如 Hadoop,由于一些数据的属性问题也很难有效地处理图数据,我们将在本章的其他部分看到这方面的内容.

【Big Data - Hadoop - MapReduce】hadoop 学习笔记:MapReduce框架详解

开始聊MapReduce,MapReduce是Hadoop的计算框架,我学Hadoop是从Hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密.这个可能是我做技术研究的思路有关,我开始学习某一套技术总是想着这套技术到底能干什么,只有当我真正理解了这套技术解决了什么问题时候,我后续的学习就能逐步的加快,而学习hdfs时候我就发现,要理解hadoop框架的意义,hdfs和mapreduce是密不可分,所以当我写分布式文件系统时候,总是感觉自己的理解肤浅

Hadoop MapReduce执行过程详解(带hadoop例子)

https://my.oschina.net/itblog/blog/275294 摘要: 本文通过一个例子,详细介绍Hadoop 的 MapReduce过程. 分析MapReduce执行过程 MapReduce运行的时候,会通过Mapper运行的任务读取HDFS中的数据文件,然后调用自己的方法,处理数据,最后输出.Reducer任务会接收Mapper任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到HDFS的文件中.整个流程如图: Mapper任务的执行过程详解 每个Mapper任

Hadoop MapReduce原理及实例

MapReduce是用于数据处理的一种编程模型,简单但足够强大,专门为并行处理大数据而设计. 1. 通俗理解MapReduce MapReduce的处理过程分为两个步骤:map和reduce.每个阶段的输入输出都是key-value的形式,key和value的类型可以自行指定.map阶段对切分好的数据进行并行处理,处理结果传输给reduce,由reduce函数完成最后的汇总. 例如从大量历史数据中找出往年最高气温,NCDC公开了过去每一年的所有气温等天气数据的检测,每一行记录一条观测记录,格式如

Hadoop MapReduce编程学习

一直在搞spark,也没时间弄hadoop,不过Hadoop基本的编程我觉得我还是要会吧,看到一篇不错的文章,不过应该应用于hadoop2.0以前,因为代码中有  conf.set("mapred.job.tracker", "192.168.1.2:9001");新框架中已改为 Yarn-site.xml 中的 resouceManager 及 nodeManager 具体配置项,新框架中历史 job 的查询已从 Job tracker 剥离,归入单独的mapre

hadoop MapReduce实例解析

1.MapReduce理论简介 1.1 MapReduce编程模型 MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的中间结果,得到最终结果.简单地说,MapReduce就是"任务的分解与结果的汇总". 在Hadoop中,用于执行MapReduce任务的机器角色有两个:一个是JobTracker:另一个是TaskTracker,JobTracker是用于调度工作的,TaskTracke