hadoop生态系统学习之路(九)MR将结果输出到数据库(DB)

最开始讲MapReduce的时候,我们是指定输出目录,然后把结果直接输出到hdfs上。然后,在介绍hive的简单使用时,我们直接将结果输出到了hive表中。另外,MR还可以将结果输出到数据库以及hbase。

今天,笔者就给大家介绍MR将结果输出到db。

首先,笔者要提及一下之前MR将结果输出到hive表,这里需要注意,只能向某张表中入一次数据,再次执行MR报错:

org.apache.hive.hcatalog.common.HCatException : 2003 : Non-partitioned table already contains data : qyk_test.user_info。因为hive中的表数据实际也是放在hdfs中的。我们都知道hdfs是一次写入,多次读取的。那么,怎么解决这个问题呢?我们可以每次创建一个临时表,然后将MR的数据入到临时表,入完后再把临时表的数据使用insert into table 表名 select * from 临时表名 入到实际表中,入完后再删除临时表。

好了,接下来,笔者分以下几个步骤进行介绍:

一、pom依赖

这里,需要添加一个mysql驱动包依赖:

<!-- mysql驱动包 -->
<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>5.1.36</version>
 </dependency>

二、数据以及数据库表准备

我们还是使用之前博文中入到hive的输入文件user_info.txt,放在hdfs中的/qiyongkang/input目录下:

11  1200.0  qyk1    21
22  1301    qyk2    22
33  1400.0  qyk3    23
44  1500.0  qyk4    24
55  1210.0  qyk5    25
66  124 qyk6    26
77  1233    qyk7    27
88  15011   qyk8    28

然后,我们这里使用的是mysql数据库,在test数据库建表:

CREATE TABLE `user_info` (
  `id` bigint(20) DEFAULT NULL,
  `account` varchar(50) DEFAULT NULL,
  `name` varchar(50) DEFAULT NULL,
  `age` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

三、MR编写

首先,我们来看主类LoadDataToDbMR:

/**
 * Project Name:mr-demo
 * File Name:LoadDataToDbMR.java
 * Package Name:org.qiyongkang.mr.dbstore
 * Date:2016年4月10日下午3:16:05
 * Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
 *
*/

package org.qiyongkang.mr.dbstore;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * ClassName:LoadDataToDbMR <br/>
 * Function: TODO ADD FUNCTION. <br/>
 * Reason:   TODO ADD REASON. <br/>
 * Date:     2016年4月10日 下午3:16:05 <br/>
 * @author   qiyongkang
 * @version
 * @since    JDK 1.6
 * @see
 */
public class LoadDataToDbMR {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(); 

        //数据库配置
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver","jdbc:mysql://192.168.52.31:3306/test","root", "root");

        Job job = Job.getInstance(conf, "db store");
        job.setJarByClass(LoadDataToDbMR.class);

        // 设置Mapper
        job.setMapperClass(DbStoreMapper.class);

        // 由于没有reducer,这里设置为0
        job.setNumReduceTasks(0);

        // 设置输入文件路径
        FileInputFormat.addInputPath(job, new Path("/qiyongkang/input"));

        DBOutputFormat.setOutput(job, "user_info", "id", "account", "name", "age");
        job.setOutputFormatClass(DBOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

然后,我们再来看Mapper:

/**
 * Project Name:mr-demo
 * File Name:DbStoreMapper.java
 * Package Name:org.qiyongkang.mr.dbstore
 * Date:2016年4月10日下午3:15:46
 * Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
 *
*/

package org.qiyongkang.mr.dbstore;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * ClassName:DbStoreMapper <br/>
 * Function: TODO ADD FUNCTION. <br/>
 * Reason: TODO ADD REASON. <br/>
 * Date: 2016年4月10日 下午3:15:46 <br/>
 *
 * @author qiyongkang
 * @version
 * @since JDK 1.6
 * @see
 */
public class DbStoreMapper extends Mapper<LongWritable, Text, UserInfoDBWritable, UserInfoDBWritable> {
    private UserInfo userInfo = new UserInfo();

    private UserInfoDBWritable userInfoDBWritable = null;

    @Override
    protected void map(LongWritable key, Text value,
            Mapper<LongWritable, Text, UserInfoDBWritable, UserInfoDBWritable>.Context context)
                    throws IOException, InterruptedException {

        // 每行以制表符分隔 id, account, name, age
        String[] strs = value.toString().split("\t");

        // id,
        userInfo.setId(Long.valueOf(strs[0]));

        // account
        userInfo.setAccount(strs[1]);

        // name
        userInfo.setName(strs[2]);

        // age
        userInfo.setAge(Integer.valueOf(strs[3]));

        // 写入到db,放在key
        userInfoDBWritable = new UserInfoDBWritable(userInfo);
        context.write(userInfoDBWritable , null);
    }

}

这里,我们准备了一个Model,UserInfo:

/**
 * Project Name:mr-demo
 * File Name:UserInfo.java
 * Package Name:org.qiyongkang.mr.dbstore
 * Date:2016年4月10日下午3:30:01
 * Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
 *
*/

package org.qiyongkang.mr.dbstore;
/**
 * ClassName:UserInfo <br/>
 * Function: TODO ADD FUNCTION. <br/>
 * Reason:   TODO ADD REASON. <br/>
 * Date:     2016年4月10日 下午3:30:01 <br/>
 * @author   qiyongkang
 * @version
 * @since    JDK 1.6
 * @see
 */
public class UserInfo {
    private long id;

    private String account;

    private String name;

    private int age;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getAccount() {
        return account;
    }

    public void setAccount(String account) {
        this.account = account;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

}

然后,我们要想MR输出到Db,那么此类必须实现DBWritable,如下:

/**
 * Project Name:mr-demo
 * File Name:UserInfoDBWritable.java
 * Package Name:org.qiyongkang.mr.dbstore
 * Date:2016年4月10日下午3:27:32
 * Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
 *
*/

package org.qiyongkang.mr.dbstore;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.mapreduce.lib.db.DBWritable;

/**
 * ClassName:UserInfoDBWritable <br/>
 * Function: TODO ADD FUNCTION. <br/>
 * Reason:   TODO ADD REASON. <br/>
 * Date:     2016年4月10日 下午3:27:32 <br/>
 * @author   qiyongkang
 * @version
 * @since    JDK 1.6
 * @see
 */
public class UserInfoDBWritable implements DBWritable {
    private UserInfo userInfo;

    public UserInfoDBWritable() {}

    public UserInfoDBWritable(UserInfo userInfo) {
        this.userInfo = userInfo;
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(1, userInfo.getId());
        statement.setString(2, userInfo.getAccount());
        statement.setString(3, userInfo.getName());
        statement.setInt(4, userInfo.getAge());
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {

    }

}

这里面的参数设置顺序与主类中设置DBOutputFormat时的字段顺序一致。

四、执行并查看结果

下面,还是同样的打包方式,只需修改下main函数所在的类即可。然后,上传到主节点,使用hdfs用户执行,注意此jar的权限设置。

接下来,执行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar,日志如下:

bash-4.1$ yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar
16/04/10 16:14:11 INFO client.RMProxy: Connecting to ResourceManager at massdata8/172.31.25.8:8032
16/04/10 16:14:12 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
16/04/10 16:14:13 INFO input.FileInputFormat: Total input paths to process : 1
16/04/10 16:14:13 INFO mapreduce.JobSubmitter: number of splits:1
16/04/10 16:14:14 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1458262657013_0982
16/04/10 16:14:14 INFO impl.YarnClientImpl: Submitted application application_1458262657013_0982
16/04/10 16:14:14 INFO mapreduce.Job: The url to track the job: http://massdata8:8088/proxy/application_1458262657013_0982/
16/04/10 16:14:14 INFO mapreduce.Job: Running job: job_1458262657013_0982
16/04/10 16:14:21 INFO mapreduce.Job: Job job_1458262657013_0982 running in uber mode : false
16/04/10 16:14:21 INFO mapreduce.Job:  map 0% reduce 0%
16/04/10 16:14:29 INFO mapreduce.Job:  map 100% reduce 0%
16/04/10 16:14:29 INFO mapreduce.Job: Job job_1458262657013_0982 completed successfully
16/04/10 16:14:29 INFO mapreduce.Job: Counters: 30
    File System Counters
        FILE: Number of bytes read=0
        FILE: Number of bytes written=91506
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=259
        HDFS: Number of bytes written=0
        HDFS: Number of read operations=2
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=0
    Job Counters
        Launched map tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=4459
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=4459
        Total vcore-seconds taken by all map tasks=4459
        Total megabyte-seconds taken by all map tasks=4566016
    Map-Reduce Framework
        Map input records=8
        Map output records=8
        Input split bytes=117
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=21
        CPU time spent (ms)=1530
        Physical memory (bytes) snapshot=321511424
        Virtual memory (bytes) snapshot=1579036672
        Total committed heap usage (bytes)=792199168
    File Input Format Counters
        Bytes Read=142
    File Output Format Counters
        Bytes Written=0

然后,我们在数据库执行查询SELECT * FROM user_info;可以看到:

说明入库成功!

好了,就介绍到这儿了。

时间: 2024-10-25 21:59:15

hadoop生态系统学习之路(九)MR将结果输出到数据库(DB)的相关文章

hadoop生态系统学习之路(五)hbase的简单使用

最近,参与了公司的一个大数据接口平台的开发,具体的处理过程是这样的.我们公司负责数据的入库,也就是一个etl过程,使用MR将数据入到hive里面,然后同步到impala,然后此接口平台提供查询接口,前台会将sql语句以参数传过来,然后接口平台通过调用impala提供的java api接口,将数据查询出来返回给用户.另外,如果查询的数据量很大,那么前台就会传一个taskId过来,第一次只需将数据查询出来,入到impala临时表,下次再查便将数据返回.那么,如何记录此任务的状态变化呢,这里我们就使用

hadoop生态系统学习之路(六)hive的简单使用

一.hive的基本概念与原理 Hive是基于Hadoop之上的数据仓库,能够存储.查询和分析存储在 Hadoop 中的大规模数据. Hive 定义了简单的类 SQL 查询语言,称为 HQL.它同意熟悉 SQL 的用户查询数据,同意熟悉 MapReduce 开发人员的开发自己定义的 mapper 和 reducer 来处理内建的 mapper 和 reducer 无法完毕的复杂的分析工作. Hive 没有专门的数据格式. hive的訪问方式: hive的运行原理: 二.hive的经常使用命令 连接

hadoop生态系统学习之路(八)hbase与hive的数据同步以及hive与impala的数据同步

在之前的博文中提到,hive的表数据是可以同步到impala中去的.一般impala是提供实时查询操作的,像比较耗时的入库操作我们可以使用hive,然后再将数据同步到impala中.另外,我们也可以在hive中创建一张表同时映射hbase中的表,实现数据同步. 下面,笔者依次进行介绍. 一.impala与hive的数据同步 首先,我们在hive命令行执行show databases;可以看到有以下几个数据库: 然后,我们在impala同样执行show databases;可以看到: 目前的数据库

Hadoop生态系统学习路线

主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等. 从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘.开源界及厂商,所有数据软件,无

hadoop之学习之路

关于Hadoop系列文章 http://mageedu.blog.51cto.com/4265610/1112878 Hadoop系列之四:MapReduce进阶 http://mageedu.blog.51cto.com/4265610/1106263

【python3的学习之路九】函数式编程

变量作用域 变量的作用域决定了在哪一部分程序可以访问哪个特定的变量名称.Python的作用域一共有4种,分别是: L (Local) 局部作用域 E (Enclosing) 闭包函数外的函数中 G (Global) 全局作用域 B (Built-in) 内建作用域 以 L –> E –> G –>B 的规则查找,即:在局部找不到,便会去局部外的局部找(例如闭包),再找不到就会去全局找,再者去内建中找. x = int(2.9) # 内建作用域 g_count = 0 # 全局作用域 de

cortex_m3_stm32嵌入式学习笔记(九):PWM 输出实验

PWM 简介 脉冲宽度调制(PWM),是英文"Pulse Width Modulation" 的缩写,简称脉宽调制,是利用微处理器的数字输出来对模拟电路进行控制的一种非常有效的技术.简单一点,就是对脉冲宽度的控制. STM32 的定时器除了 TIM6 和 7.其他的定时器都可以用来产生 PWM 输出.其中高级定时器 TIM1 和 TIM8 可以同时产生多达 7 路的 PWM 输出.而通用定时器也能同时产生多达 4路的 PWM 输出,这样, STM32 最多可以同时产生 30 路 PWM

Python学习之路第一天&mdash;&mdash;代码的输出与执行

1.打印输出Hello World: Python2打印方法: >>> print "hello world"hello world Python3打印方法: >>> print("hello world") hello world 注:Python3与Pytho2的区别是加了小括号. 2.以文件形式执行代码: [[email protected] s1]# vim hello.py打开一个文件hello.py文件内写入以下内容:

【python3的学习之路一】输入和输出

标识符 第一个字符必须是字母或下划线 标识符的其他的部分由字母.数字和下划线组成 标识符对大小写敏感 Python保留字 即关键字,不能把它们用作任何标识符名称 >>> import keyword >>> keyword.kwlist ['False', 'None', 'True', 'and', 'as', 'assert', 'break', 'class', 'continue', 'def', 'del', 'elif', 'else', 'except',