Hive Streaming 追加 ORC 文件

1.概述

  在存储业务数据的时候,随着业务的增长,Hive 表存储在 HDFS 的上的数据会随时间的增加而增加,而以 Text 文本格式存储在 HDFS 上,所消耗的容量资源巨大。那么,我们需要有一种方式来减少容量的成本。而在 Hive 中,有一种 ORC 文件格式可以极大的减少存储的容量成本。今天,笔者就为大家分享如何实现流式数据追加到 Hive ORC 表中。

2.内容

2.1 ORC

  这里,我们首先需要知道 Hive 的 ORC 是什么。在此之前,Hive 中存在一种 RC 文件,而 ORC 的出现,对 RC 这种文件做了许多优化,这种文件格式可以提供一种高效的方式来存储 Hive 数据,使用 ORC 文件可以提供 Hive 的读写以及性能。其优点如下:

  • 减少 NameNode 的负载
  • 支持复杂数据类型(如 list,map,struct 等等)
  • 文件中包含索引
  • 块压缩
  • ...

  结构图(来源于 Apache ORC 官网)如下所示:

  这里笔者就不一一列举了,更多详情,可以阅读官网介绍:[入口地址]

2.2 使用

  知道了 ORC 文件的结构,以及相关作用,我们如何去使用 ORC 表,下面我们以创建一个处理 Stream 记录的表为例,其创建示例 SQL 如下所示:

create table alerts ( id int , msg string )
     partitioned by (continent string, country string)
     clustered by (id) into 5 buckets
     stored as orc tblproperties("transactional"="true"); // currently ORC is required for streaming

  需要注意的是,在使用 Streaming 的时候,创建 ORC 表,需要使用分区分桶。

  下面,我们尝试插入一下数据,来模拟 Streaming 的流程,代码如下所示:

String dbName = "testing";
String tblName = "alerts";
ArrayList<String> partitionVals = new ArrayList<String>(2);
partitionVals.add("Asia");
partitionVals.add("India");
String serdeClass = "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe";

HiveEndPoint hiveEP = new HiveEndPoint("thrift://x.y.com:9083", dbName, tblName, partitionVals);

  如果,有多个分区,我们这里可以将分区存放在分区集合中,进行加载。这里,需要开启 metastore 服务来确保 Hive 的 Thrift 服务可用。

//-------   Thread 1  -------//
StreamingConnection connection = hiveEP.newConnection(true);
DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames,",", endPt);
TransactionBatch txnBatch = connection.fetchTransactionBatch(10, writer);
///// Batch 1 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("1,Hello streaming".getBytes());
txnBatch.write("2,Welcome to streaming".getBytes());
txnBatch.commit();
if(txnBatch.remainingTransactions() > 0) {
///// Batch 1 - Second TXN
txnBatch.beginNextTransaction();
txnBatch.write("3,Roshan Naik".getBytes());
txnBatch.write("4,Alan Gates".getBytes());
txnBatch.write("5,Owen O’Malley".getBytes());
txnBatch.commit();
txnBatch.close();
connection.close();
}
txnBatch = connection.fetchTransactionBatch(10, writer);
///// Batch 2 - First TXN
txnBatch.beginNextTransaction();
txnBatch.write("6,David Schorow".getBytes());
txnBatch.write("7,Sushant Sowmyan".getBytes());
txnBatch.commit();
if(txnBatch.remainingTransactions() > 0) {
///// Batch 2 - Second TXN
txnBatch.beginNextTransaction();
txnBatch.write("8,Ashutosh Chauhan".getBytes());
txnBatch.write("9,Thejas Nair" getBytes());
txnBatch.commit();
txnBatch.close();
}
connection.close();

  接下来,我们对 Streaming 数据进行写入到 ORC 表进行存储。实现结果如下图所示:

3.案例

  下面,我们来完成一个完整的案例,有这样一个场景,每天有许多业务数据上报到指定服务器,然后有中转服务将各个业务数据按业务拆分后转发到各自的日志节点,再由 ETL 服务将数据入库到 Hive 表。这里,我们只说说入库 Hive 表的流程,拿到数据,处理后,入库到 Hive 的 ORC 表中。具体实现代码如下所示:

/**
 * @Date Nov 24, 2016
 *
 * @Author smartloli
 *
 * @Email [email protected]
 *
 * @Note TODO
 */
public class IPLoginStreaming extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(IPLoginStreaming.class);
    private String path = "";

    public static void main(String[] args) throws Exception {
        String[] paths = SystemConfigUtils.getPropertyArray("hive.orc.path", ",");
        for (String str : paths) {
            IPLoginStreaming ipLogin = new IPLoginStreaming();
            ipLogin.path = str;
            ipLogin.start();
        }
    }

    @Override
    public void run() {
        List<String> list = FileUtils.read(this.path);
        long start = System.currentTimeMillis();
        try {
            write(list);
        } catch (Exception e) {
            LOG.error("Write PATH[" + this.path + "] ORC has error,msg is " + e.getMessage());
        }
        System.out.println("Path[" + this.path + "] spent [" + (System.currentTimeMillis() - start) / 1000.0 + "s]");
    }

    public static void write(List<String> list)
            throws ConnectionError, InvalidPartition, InvalidTable, PartitionCreationFailed, ImpersonationFailed, InterruptedException, ClassNotFoundException, SerializationError, InvalidColumn, StreamingException {
        String dbName = "default";
        String tblName = "ip_login_orc";
        ArrayList<String> partitionVals = new ArrayList<String>(1);
        partitionVals.add(CalendarUtils.getDay());
        String[] fieldNames = new String[] { "_bpid", "_gid", "_plat", "_tm", "_uid", "ip", "latitude", "longitude", "reg", "tname" };

        StreamingConnection connection = null;
        TransactionBatch txnBatch = null;

        try {

            HiveEndPoint hiveEP = new HiveEndPoint("thrift://master:9083", dbName, tblName, partitionVals);
            HiveConf hiveConf = new HiveConf();
            hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES, true);
            hiveConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
            connection = hiveEP.newConnection(true, hiveConf);
            DelimitedInputWriter writer = new DelimitedInputWriter(fieldNames, ",", hiveEP);
            txnBatch = connection.fetchTransactionBatch(10, writer);

            // Batch 1
            txnBatch.beginNextTransaction();
            for (String json : list) {
                String ret = "";
                JSONObject object = JSON.parseObject(json);
                for (int i = 0; i < fieldNames.length; i++) {
                    if (i == (fieldNames.length - 1)) {
                        ret += object.getString(fieldNames[i]);
                    } else {
                        ret += object.getString(fieldNames[i]) + ",";
                    }
                }
                txnBatch.write(ret.getBytes());
            }
            txnBatch.commit();

        } finally {
            if (txnBatch != null) {
                txnBatch.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
}

  PS:建议使用多线程来处理数据。

4.预览

  实现结果如下所示:

  • 分区详情

  • 该分区下记录数

5.总结

  在使用 Hive Streaming 来实现 ORC 追加的时候,除了表本身需要分区分桶以外,工程本身的依赖也是复杂,会设计 Hadoop Hive 等项目的依赖包,推荐使用 Maven 工程来实现,由 Maven 工程去帮我们解决各个 JAR 包之间的依赖问题。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

时间: 2024-08-11 07:39:20

Hive Streaming 追加 ORC 文件的相关文章

Hive Hadoop 解析 orc 文件

解析 orc 格式 为 json 格式: ./hive --orcfiledump -d <hdfs-location-of-orc-file> 把解析的 json 写入 到文件 ./hive --orcfiledump -d <hdfs-location-of-orc-file> > myfile.txt

hive streaming 使用shell脚本

一.HIVE streaming 在Hive中,需要实现Hive中的函数无法实现的功能时,就可以用Streaming来实现.其原理可以理解成:用HQL语句之外的语言,如Python.Shell来实现这些功能,同时配合HQL语句,以实现特殊的功能. 二. 实例 1. 日志文件的格式 2014-02-02 01:59:02 W3SVC1 2001:da8:7007:102::244 GET /favicon.ico - 80 - 2001:da8:7007:336:ca:f74b:eede:a024

hive streaming 使用的时候的一些心得

hive streaming 报错的解决方案: 1.把使用到hive streaming 的sql 分解,例如:select transform a,b,c,d using 'python cc.py' as (e,f) from table,分解成:select a,b,c,d from table ,然后执行: hive -e "select a,b,c,d from table" | python cc.py,这样如果是语法有问题的话就会检查出来. 2.查看是否是编码问题:如果你

C#追加日志文件

追加日志文件 using System; using System.IO; class DirAppend { public static void Main() { using (StreamWriter w = File.AppendText("log.txt")) { Log("Test1", w); Log("Test2", w); } using (StreamReader r = File.OpenText("log.txt

C#文本写入文件,追加写入文件

写入文件和这个对象 StreamWriter 1 using (StreamWriter fs = new StreamWriter(path, true)) 2 { 3 fs.WriteLine(strLog); 4 } 这个看到那个蓝色的true了没,个就是追加的标记,如果不写的话,那么你所有写一次,之前的都会被覆盖掉.

Hive merge(小文件合并)

当Hive的输入由很多个小文件组成时,如果不涉及文件合并的话,那么每个小文件都会启动一个map task. 如果文件过小,以至于map任务启动和初始化的时间大于逻辑处理的时间,会造成资源浪费,甚至发生OutOfMemoryError错误. 因此,当我们启动一个任务时,如果发现输入数据量小但任务数量多时,需要注意在Map前端进行输入小文件合并操作. 同理,向一个表写数据时,注意观察reduce数量,注意输出文件大小. 1. Map输入小文件合并 #每个Map处理的最大输入文件大小(256MB) s

Hive优化之小文件问题及其解决方案

小文件是如何产生的 1.动态分区插入数据,产生大量的小文件,从而导致map数量剧增. 2.reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的). 3.数据源本身就包含大量的小文件. 小文件问题的影响 1.从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能. 2.在HDFS中,每个小文件对象约占150byte,如果小文件过多会占用大量内存.这样NameNode内存容量严重制约了集群的扩展. 小

追加模式 文件读写

1.方法一 try{ RandomAccessFile randomFile = new RandomAccessFile(path,"rw"); long fileLength = randomFile.length(): randomFile.seek(fileLength); String str; //为了解决乱码问题做如下变换 byte temp[] = str.getBytes(); randomFlie.write(temp); randomFile.close; }ca

php 字符串写入文件或追加入文件(file_put_contents)

file_put_contents() 函数用于把字符串写入文件,成功返回写入到文件内数据的字节数,失败则返回 FALSE. 使用说明: file_put_contents(file,data,mode,context) 参数说明: file要写入数据的文件名  data 要写入的数据.类型可以是 string,array(但不能为多维数组),或者是 stream 资源 mode可选,规定如何打开/写入文件.可能的值:  1.FILE_USE_INCLUDE_PATH:检查 filename 副