hadoop生态系统学习之路(六)hive的简单使用

一、hive的基本概念与原理

Hive是基于Hadoop之上的数据仓库,能够存储、查询和分析存储在 Hadoop 中的大规模数据。

Hive 定义了简单的类 SQL 查询语言,称为 HQL。它同意熟悉 SQL 的用户查询数据,同意熟悉 MapReduce 开发人员的开发自己定义的 mapper 和 reducer 来处理内建的 mapper 和 reducer 无法完毕的复杂的分析工作。

Hive 没有专门的数据格式。

hive的訪问方式:

hive的运行原理:

二、hive的经常使用命令

连接进入hive:hive

删除数据库:drop database if exists qyk_test cascade;例如以下图:

然后,我们使用create database qyk_test;创建一个qyk_test的数据库,例如以下:

接下来,我们运行create table user_info(id bigint, account string, name string, age int) row format delimited fields terminated by ‘\t’;创建一张表,例如以下:

我们能够运行describe user_info;查看表结构。例如以下:

然后。我们使用create table user_info_tmp like user_info;创建一个和user_info一样结构的暂时表,例如以下:

然后我们准备一个文件user_info.txt,以制表符分隔,例如以下

11  1200.0  qyk1    21
22  1301    qyk2    22
33  1400.0  qyk3    23
44  1500.0  qyk4    24
55  1210.0  qyk5    25
66  124 qyk6    26
77  1233    qyk7    27
88  15011   qyk8    28

接下来运行load data local inpath ‘/tmp/user_info.txt’ into table user_info;可看到例如以下:

然后运行select * from user_info;可看到:

然后,我们运行insert into table user_info_tmp select id, account, name, age from user_info;能够看到:

这里,hive将此语句的运行转为MR,最后将数据入到user_info_tmp。

然后。我们运行select count(*) from user_info_tmp;可看到:

相同的是将sql转为mr运行。

最后,运行insert overwrite table user_info select * from user_info where 1=0;清空表数据。

运行drop table user_info_tmp;便可删除表,例如以下:

好了,基本命令就说到这儿,关于外部表、分区、桶以及存储格式相关的概念大家也能够去研究下。

三、编写MR将数据直接入到hive

此MR仅仅有Mapper,没有reducer。直接在mapper输出到hive表。

pom需新增依赖:

<!-- hcatalog相关jar -->
      <dependency>
          <groupId>org.apache.hive.hcatalog</groupId>
          <artifactId>hive-hcatalog-core</artifactId>
          <version>${hive.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hive.hcatalog</groupId>
          <artifactId>hive-hcatalog-hbase-storage-handler</artifactId>
          <version>${hive.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hive.hcatalog</groupId>
          <artifactId>hive-hcatalog-server-extensions</artifactId>
          <version>${hive.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hive.hcatalog</groupId>
          <artifactId>hive-hcatalog-pig-adapter</artifactId>
          <version>${hive.version}</version>
      </dependency>
      <dependency>
          <groupId>org.apache.hive.hcatalog</groupId>
          <artifactId>hive-webhcat-java-client</artifactId>
          <version>${hive.version}</version>
      </dependency>

Mapper类:

/**
 * Project Name:mr-demo
 * File Name:HiveStoreMapper.java
 * Package Name:org.qiyongkang.mr.hivestore
 * Date:2016年4月4日下午10:02:07
 * Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
 *
*/

package org.qiyongkang.mr.hivestore;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;

/**
 * ClassName:HiveStoreMapper <br/>
 * Function: Mapper类. <br/>
 * Date:     2016年4月4日 下午10:02:07 <br/>
 * @author   qiyongkang
 * @version
 * @since    JDK 1.6
 * @see
 */
public class HiveStoreMapper extends Mapper<Object, Text, WritableComparable<Object>, HCatRecord> {
    private HCatSchema schema = null;

    //每一个mapper实例,运行一次
    @Override
    protected void setup(Mapper<Object, Text, WritableComparable<Object>, HCatRecord>.Context context)
            throws IOException, InterruptedException {
        schema = HCatOutputFormat.getTableSchema(context.getConfiguration());
    }

    @Override
    protected void map(Object key, Text value, Mapper<Object, Text, WritableComparable<Object>, HCatRecord>.Context context)
            throws IOException, InterruptedException {
        //每行以制表符分隔   id, account, name, age
        String[] strs = value.toString().split("\t");

        HCatRecord record = new DefaultHCatRecord(4);
        //id,通过列下表
        record.set(0, Long.valueOf(strs[0]));

        //account
        record.set(1, strs[1]);

        //name
        record.set(2, strs[2]);

        //age,通过字段名称
        record.set("age", schema, Integer.valueOf(strs[3]));

        //写入到hive
        context.write(null, record);
    }

    public static void main(String[] args) {
        String value = "1   1200    qyk 24";
        String[] strs = value.toString().split("\t");
        for (int i = 0; i < strs.length; i++) {
            System.out.println(strs[i]);
        }
    }
}

主类:

/**
 * Project Name:mr-demo
 * File Name:LoadDataToHiveMR.java
 * Package Name:org.qiyongkang.mr.hivestore
 * Date:2016年4月4日下午9:55:42
 * Copyright (c) 2016, CANNIKIN(http://http://code.taobao.org/p/cannikin/src/) All Rights Reserved.
 *
*/

package org.qiyongkang.mr.hivestore;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

/**
 * ClassName:LoadDataToHiveMR <br/>
 * Function: MR将数据直接入到hive. <br/>
 * Date: 2016年4月4日 下午9:55:42 <br/>
 *
 * @author qiyongkang
 * @version
 * @since JDK 1.6
 * @see
 */
public class LoadDataToHiveMR {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf, "hive store");
            job.setJarByClass(LoadDataToHiveMR.class);

            // 设置Mapper
            job.setMapperClass(HiveStoreMapper.class);

            // 由于没有reducer。这里设置为0
            job.setNumReduceTasks(0);

            // 设置输入文件路径
            FileInputFormat.addInputPath(job, new Path("/qiyongkang/input"));

            // 指定Mapper的输出
            job.setMapOutputKeyClass(WritableComparable.class); // map
            job.setMapOutputValueClass(DefaultHCatRecord.class);// map

            //设置要入到hive的数据库和表
            HCatOutputFormat.setOutput(job, OutputJobInfo.create("qyk_test", "user_info", null));
            //这里注意是使用job.getConfiguration(),不能直接使用conf
            HCatSchema hCatSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
            HCatOutputFormat.setSchema(job, hCatSchema);

            //设置输出格式类
            job.setOutputFormatClass(HCatOutputFormat.class);

            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

然后。我们使用maven打个包,上传到服务器。

然后。我们准备一个user_info.txt,上传至hdfs中的/qiyongkang/input下:

11  1200.0  qyk1    21
22  1301    qyk2    22
33  1400.0  qyk3    23
44  1500.0  qyk4    24
55  1210.0  qyk5    25
66  124 qyk6    26
77  1233    qyk7    27
88  15011   qyk8    28

注意以制表符\t分隔。

然后运行yarn jar mr-demo-0.0.1-SNAPSHOT-jar-with-dependencies.jar。在jobhistory能够看到:

事实上,hive的元数据是放在hdfs上。运行hadoop fs -ls /user/hive/warehouse能够看到:

然后,我们在hive命令行运行 select * from user_info;能够看到:

说明数据从hdfs写入到hive成功。

四、使用java jdbc连接Thrift Server查询元数据

接下来,我们使用java编写一个客户端,来查询刚才入到hive里面的数据,代码例如以下:

package org.hive.demo;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;

public class HiveStoreClient {
    private static String driverName = "org.apache.hive.jdbc.HiveDriver";
    private static String url = "jdbc:hive2://172.31.25.8:10000/qyk_test";
    private static String user = "hive";
    private static String password = "";
    private static final Logger log = Logger.getLogger(HiveStoreClient.class);

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) {
        Connection conn = null;
        Statement stmt = null;
        ResultSet res = null;
        try {
            //载入驱动
            Class.forName(driverName);
            //获取连接
            conn = DriverManager.getConnection(url, user, password);
            stmt = conn.createStatement();

            // select * query
            String sql = "select * from user_info";
            System.out.println("Running: " + sql);

            //运行查询
            res = stmt.executeQuery(sql);

            //处理结果集
            List list = convertList(res);
            System.out.println("总记录:" + list);

            //获取总个数
            sql = "select count(1) from user_info";
            System.out.println("Running: " + sql);
            res = stmt.executeQuery(sql);
            while (res.next()) {
              System.out.println("总个数:" + res.getString(1));
            }

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            log.error(driverName + " not found!", e);
            System.exit(1);
        } catch (SQLException e) {
            e.printStackTrace();
            log.error("Connection error!", e);
            System.exit(1);
        } finally {
            try {
                res.close();
                stmt.close();
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

    }

    /**
     *
     * convertList:将结果集转换成map. <br/>
     *
     * @author qiyongkang
     * @param rs
     * @return
     * @throws SQLException
     * @since JDK 1.6
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public static List convertList(ResultSet rs) throws SQLException {
        List list = new ArrayList();
        ResultSetMetaData md = rs.getMetaData();
        int columnCount = md.getColumnCount(); //Map rowData; 

        while (rs.next()) { //rowData = new HashMap(columnCount);
            Map rowData = new HashMap();
            for (int i = 1; i <= columnCount; i++) {
                rowData.put(md.getColumnName(i), rs.getObject(i));
            }
            list.add(rowData);
        }
        return list;
    }
}

运行后,能够看到控制台输出例如以下:

開始的异常能够忽略。

能够看到数据,说明是成功的。

好了,hive就说到这儿了。

事实上,hive还能够同步hbase的数据,还能够将hive的表数据同步到impala。由于它们都是使用相同的元数据,这个在后面的博文中再进行介绍。

时间: 2024-10-13 11:22:15

hadoop生态系统学习之路(六)hive的简单使用的相关文章

hadoop生态系统学习之路(九)MR将结果输出到数据库(DB)

最开始讲MapReduce的时候,我们是指定输出目录,然后把结果直接输出到hdfs上.然后,在介绍hive的简单使用时,我们直接将结果输出到了hive表中.另外,MR还可以将结果输出到数据库以及hbase. 今天,笔者就给大家介绍MR将结果输出到db. 首先,笔者要提及一下之前MR将结果输出到hive表,这里需要注意,只能向某张表中入一次数据,再次执行MR报错: org.apache.hive.hcatalog.common.HCatException : 2003 : Non-partitio

hadoop生态系统学习之路(八)hbase与hive的数据同步以及hive与impala的数据同步

在之前的博文中提到,hive的表数据是可以同步到impala中去的.一般impala是提供实时查询操作的,像比较耗时的入库操作我们可以使用hive,然后再将数据同步到impala中.另外,我们也可以在hive中创建一张表同时映射hbase中的表,实现数据同步. 下面,笔者依次进行介绍. 一.impala与hive的数据同步 首先,我们在hive命令行执行show databases;可以看到有以下几个数据库: 然后,我们在impala同样执行show databases;可以看到: 目前的数据库

hadoop生态系统学习之路(五)hbase的简单使用

最近,参与了公司的一个大数据接口平台的开发,具体的处理过程是这样的.我们公司负责数据的入库,也就是一个etl过程,使用MR将数据入到hive里面,然后同步到impala,然后此接口平台提供查询接口,前台会将sql语句以参数传过来,然后接口平台通过调用impala提供的java api接口,将数据查询出来返回给用户.另外,如果查询的数据量很大,那么前台就会传一个taskId过来,第一次只需将数据查询出来,入到impala临时表,下次再查便将数据返回.那么,如何记录此任务的状态变化呢,这里我们就使用

Hadoop生态系统学习路线

主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等. 从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘.开源界及厂商,所有数据软件,无

hadoop之学习之路

关于Hadoop系列文章 http://mageedu.blog.51cto.com/4265610/1112878 Hadoop系列之四:MapReduce进阶 http://mageedu.blog.51cto.com/4265610/1106263

react.js学习之路六

学习react中,我一直认为,总组件里面才有构造函数,但是我才发现我的观点是错误的,构造函数是可以出现在子组件里面的. 今天有一个错误是点击增加/减少input框里面 的数值 我一直在寻找input框里面的数值,也就是value值,我发现我的思维错误的很彻底.react只是view,是不能操作数据的,只能渲染原始数据的变化,换句话说就是我要改变数值,只能改变原始数据里面的值,然后重新渲染到input框里面. 我的原始数据,给了 value:1; 当点击时候,构造函数 add(e){ var nu

Java学习之路(六)

1:包及和访问权限 将类放置到一个包当中,需要使用package “包名” 编译时需要使用 -d 参数  该参数的作用是依照包名生成相应的文件夹 一个类的全民应该是  “包名” + “.” + “类名” 包名的命名规范: 要求包名所有的字母都要小写 包名一般情况下,是你的域名倒过来写 一个包中的类访问另一个包中的类的条件 1:先自己打个包 1:访问类导入包名 2:被访问类必须是公共的 在以上条件下 :如果想要一个类想访问另一个类的属性和方法时,他们必须是公共(public)的 public   

Java学习之路(六):集合

集合的由来 数组的长度是固定的,当添加的元素超过了数组的长度,就需要对数组重新定义 java内部给我们提供的集合类,能存储任意对象,长度是可以改变的.随着元素的增加而增加,随着元素的减少而减少 数组和集合的区别 数组既可以存储基本数据类型,又可以存储引用数据类型,基本数据类型存储的是值,引用数据类型存储的是地址值 集合只能存储引用数据类型(对象Object),集合中也可以存储基本数据类型,但是在存储的时候会自动装箱变成对象  eg:int==>Integer 数组长度是固定的,不能自动增长 集合

我的java学习之路六:java的常用类

一. Number & Math 类方法 下面的表中列出的是 Number & Math 类常用的一些方法: 序号 方法与描述 1 xxxValue()将 Number 对象转换为xxx数据类型的值并返回. 2 compareTo()将number对象与参数比较. 3 equals()判断number对象是否与参数相等. 4 valueOf()返回一个 Number 对象指定的内置数据类型 5 toString()以字符串形式返回值. 6 parseInt()将字符串解析为int类型. 7