导入HDFS的数据到Hive

1. 通过Hive view

CREATE EXTERNAL TABLE if not exists finance.json_serde_optd_table (
  retCode string,
  retMsg string,
  data array<struct< secid:string,="" tradedate:date,="" optid:string,="" ticker:string,="" secshortname:string,="" exchangecd:string,="" presettleprice:double,="" precloseprice:double,="" openprice:double,="" highestprice:double,="" lowestprice:double,="" closeprice:double,="" settlprice:double,="" turnovervol:double,="" turnovervalue:double,="" openint:int="">>)
ROW FORMAT SERDE ‘org.apache.hive.hcatalog.data.JsonSerDe‘
LOCATION ‘hdfs://wdp.xxxxx.cn:8020/nifi/finance1/optd/‘;
create table if not exists finance.tb_optd
as
SELECT b.data.secID,
		b.data.tradeDate,
		b.data.optID,
		b.data.ticker,
		b.data.secShortName,
		b.data.exchangeCD,
		b.data.preSettlePrice,
		b.data.preClosePrice,
		b.data.openPrice,
		b.data.highestPrice,
		b.data.lowestPrice,
		b.data.closePrice,
		b.data.settlPrice,
		b.data.turnoverVol,
		b.data.turnoverValue,
		b.data.openInt
FROM finance.json_serde_optd_table LATERAL VIEW explode(json_serde_optd_table.data) b AS data;
?

2. 通过Zeppelin

?

%dep
z.load("/usr/hdp/2.4.2.0-258/hive-hcatalog/share/hcatalog/hive-hcatalog-core.jar");

?

// 定义导入的hive对象集合

case class HiveConfig(database: String, modelName: String, hdfsPath: String, schema: String, schema_tb: String);
var hiveConfigList = List[HiveConfig]();
?
// 创建equd数据结构
// 定义json结构
val schema_json_equd_serde ="""  retCode string,
                              retMsg string,
                              data array<struct< secid="" :="" string,="" tradedate="" date,="" ticker="" secshortname="" exchangecd="" precloseprice="" double,="" actprecloseprice:="" openprice="" highestprice="" lowestprice="" closeprice="" turnovervol="" turnovervalue="" dealamount="" int,="" turnoverrate="" accumadjfactor="" negmarketvalue="" marketvalue="" pe="" pe1="" pb="" isopen="" int="">>""";
var schema_equd ="""b.data.secID,
            		b.data.ticker,
            		b.data.secShortName,
            		b.data.exchangeCD,
            		b.data.tradeDate,
            		b.data.preClosePrice,
            		b.data.actPreClosePrice,
            		b.data.openPrice,
            		b.data.highestPrice,
            		b.data.lowestPrice,
            		b.data.closePrice,
            		b.data.turnoverVol,
            		b.data.turnoverValue,
            		b.data.dealAmount,
            		b.data.turnoverRate,
            		b.data.accumAdjFactor,
            		b.data.negMarketValue,
            		b.data.marketValue,
            		b.data.PE,
            		b.data.PE1,
            		b.data.PB,
            		b.data.isOpen""";
hiveConfigList  = hiveConfigList :+ HiveConfig("finance", "equd", "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_equd_serde, schema_equd);

?

// 创建idxd数据结构
// 定义json结构
val schema_json_idxd_serde ="""  retCode string,
                              retMsg string,
                              data array<struct< indexid:string,="" tradedate:date,="" ticker:string,="" porgfullname:string,="" secshortname:string,="" exchangecd:string,="" precloseindex:double,="" openindex:double,="" lowestindex:double,="" highestindex:double,="" closeindex:double,="" turnovervol:double,="" turnovervalue:double,="" chg:double,="" chgpct:double="">>""";
var schema_idxd ="""b.data.indexID,
            		b.data.tradeDate,
            		b.data.ticker,
            		b.data.porgFullName,
            		b.data.secShortName,
            		b.data.exchangeCD,
            		b.data.preCloseIndex,
            		b.data.openIndex,
            		b.data.lowestIndex,
            		b.data.highestIndex,
            		b.data.closeIndex,
            		b.data.turnoverVol,
            		b.data.turnoverValue,
            		b.data.CHG,
            		b.data.CHGPct""";
hiveConfigList = hiveConfigList :+ HiveConfig("finance", "idxd", "hdfs://wdp.xxxxx.cn:8020/nifi/finance1/", schema_json_idxd_serde, schema_idxd);

?

// 循环加载数据中
  def loadDataToHive(args:HiveConfig){
    val loadPath = args.hdfsPath + args.modelName;
    val tb_json_serde = "json_serde_" + args.modelName +"_table";
    val tb= "tb_" + args.modelName;
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    if(args.database != "" && args.schema != "") {
        print("正在创建项目..." + args.modelName)
        hiveContext.sql("CREATE DATABASE IF NOT EXISTS " + args.database);
        print("正在构造扩展模型...");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb_json_serde + "(" + args.schema + ") row format serde ‘org.apache.hive.hcatalog.data.JsonSerDe‘ LOCATION " + "‘" + loadPath + "/‘");
        println("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb + " from " + args.database + "." + tb_json_serde + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
        hiveContext.sql("CREATE TABLE IF NOT EXISTS " + args.database + "." + tb + " as select " + args.schema_tb + " from " + args.database + "." + tb_json_serde + " LATERAL VIEW explode(" + tb_json_serde + ".data) b AS data");
        println(args.modelName + " 扩展模型加载已完成!");
    }
  }
  hiveConfigList.size;
  hiveConfigList.foreach { x => loadDataToHive(x) };

?

?3. 第二种取法

由于data是json数据里的一个数组,所以上面的转换复杂了一点。下面这种方法是先把json里data数组取出来放到hdfs,然后直接用下面的语句放到hive:

用splitjson 来提取、分隔 data 数组

CREATE EXTERNAL TABLE if not exists finance.awen_optd (
  secid string,
  tradedate date,
  optid string,
  ticker string,
  secshortname string,
  exchangecd string,
  presettleprice double,
  precloseprice double,
  openprice double,
  highestprice double,
  lowestprice double,
  closeprice double,
  settlprice double,
  turnovervol double,
  turnovervalue double,
  openint int)
ROW FORMAT SERDE ‘org.apache.hive.hcatalog.data.JsonSerDe‘
LOCATION ‘hdfs://wdp.xxxx.cn:8020/nifi/finance2/optd/‘;

?

时间: 2024-10-27 12:56:51

导入HDFS的数据到Hive的相关文章

Hive导入HDFS/本地数据

#创建表人信息表  person(String name,int age) hive> create table person(name STRING,age INT)ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ESCAPED BY '\\' STORED AS TEXTFILE; OK Time taken: 0.541 seconds#创建表票价信息表 ticket(int age,float price) hive> create tab

Hive使用HDFS目录数据创建Hive表分区

描述: Hive表pms.cross_sale_path建立以日期作为分区,将hdfs目录/user/pms/workspace/ouyangyewei/testUsertrack/job1Output/crossSale上的数据,写入该表的$yesterday分区上 表结构: hive -e " set mapred.job.queue.name=pms; drop table if exists pms.cross_sale_path; create external table pms.c

第3节 sqoop:4、sqoop的数据导入之导入数据到hdfs和导入数据到hive表

注意: (1)\001 是hive当中默认使用的分隔符,这个玩意儿是一个asc 码值,键盘上面打不出来 (2)linux中一行写不下,可以末尾加上 一些空格和 “ \ ”,换行继续写余下的命令: bin/sqoop import --connect jdbc:mysql://192.168.25.24:3306/userdb --username root --password admin --table \emp --fields-terminated-by '\001' \--hive-im

使用sqoop1.4.4从oracle导入数据到hive中错误记录及解决方案

在使用命令导数据过程中,出现如下错误 sqoop import --hive-import --connect jdbc:oracle:thin:@192.168.29.16:1521/testdb --username NAME --passord PASS --verbose -m 1 --table T_USERINFO 错误1:File does not exist: hdfs://opt/sqoop-1.4.4/lib/commons-io-1.4.jar FileNotFoundEx

教程 | 使用Sqoop从MySQL导入数据到Hive和HBase

基础环境 sqoop:sqoop-1.4.5+cdh5.3.6+78, hive:hive-0.13.1+cdh5.3.6+397, hbase:hbase-0.98.6+cdh5.3.6+115 Sqool和Hive.HBase简介 Sqoop Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的开源工具,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中. Hiv

Sqoop2入门之导入关系型数据库数据到HDFS上(sqoop2-1.99.4版本)

sqoop2-1.99.4和sqoop2-1.99.3版本操作略有不同:新版本中使用link代替了老版本的connection,其他使用类似. sqoop2-1.99.4环境搭建参见:Sqoop2环境搭建 sqoop2-1.99.3版本实现参见:Sqoop2入门之导入关系型数据库数据到HDFS上 启动sqoop2-1.99.4版本客户端: $SQOOP2_HOME/bin/sqoop.sh client set server --host hadoop000 --port 12000 --web

Sqoop1.4.4实现关系型数据库多表同时导入HDFS或Hive中

问题导读: 1.使用Sqoop哪个工具实现多表导入? 2.满足多表导入的三个条件是? 3.如何指定导入HDFS某个目录?如何指定导入Hive某个数据库? 一.介绍 有时候我们需要将关系型数据库中多个表一起导入到HDFS或者Hive中,这个时候可以使用Sqoop的另一个工具sqoop-import-all-tables.每个表数据被分别存储在以表名命名的HDFS上的不同目录中. 在使用多表导入之前,以下三个条件必须同时满足: 1.每个表必须都只有一个列作为主键: 2.必须将每个表中所有的数据导入,

使用sqoop从mysql导入数据到hive

前言 这篇文章主要是用sqoop从mysql导入数据到hive时遇到的坑的总结. 环境: 系统:Centos 6.5 Hadoop:Apache,2.7.3 Mysql:5.1.73 JDK:1.8 Sqoop:1.4.7 Hadoop以伪分布式模式运行. 一.使用的导入命令 我主要是参考一篇文章去测试的,Sqoop: Import Data From MySQL to Hive. 参照里面的方法,在mysql建了表,填充了数据,然后按照自己的情况输入了命令: sqoop import --co

sqoop命令,mysql导入到hdfs、hbase、hive

1.测试MySQL连接 bin/sqoop list-databases --connect jdbc:mysql://192.168.1.187:3306/trade_dev --username 'mysql' --password '111111' 2.检验SQL语句 bin/sqoop eval --connect jdbc:mysql://192.168.1.187:3306/trade_dev --username 'mysql' --password '111111' --quer