如何在MaxCompute上处理存储在OSS上的开源格式数据

前言

MaxCompute作为使用最广泛的大数据平台,内部存储的数据以EB量级计算。巨大的数据存储量以及大规模计算下高性能数据读写的需求,对于MaxCompute提出了各种高要求及挑战。处在大数据时代,数据的来源多种多样,开源社区经过十几年的发展,百花齐放,各种各样的数据格式不断的出现。 我们的用户也在各个场景上,通过各种计算框架,积累了各种不同格式的数据。怎样将MaxCompute强大的计算能力开放给这些使用开源格式存储沉淀下来的数据,在MaxCompute上挖掘这些数据中的信息,是MaxCompute团队希望解决的问题。

MaxCompute 2.0最近推出的非结构化计算框架【公测阶段】,旨在从存储介质和存储格式两个维度,打通计算与存储的通道。 在之前的文章中,我们已经介绍过怎样在MaxCompute上对存储在OSS上的文本,音频,图像等格式的数据,以及TableStore(OTS)的KV数据进行计算处理。在这里,则将介绍对于各种流行的开源数据格式(ORC, PARQUET, SEQUENCEFILE, RCFILE, AVRO, TEXTFILE等等),怎样将其存储在OSS上面,并通过非结构化框架在MaxCompute进行处理。

本着不重造轮子的原则,对于绝大部分这些开源数据格式的解析工作,在非结构化框架中会直接调用开源社区的实现,并且无缝的与MaxCompute系统做对接。

1. 创建EXTERNAL TABLE来绑定OSS外部数据

MaxCompute非结构化数据框架通过EXTERNAL TABLE的概念来提供MaxCompute与各种数据的联通,与读取OSS数据的使用方法类似,对OSS数据进行写操作,首先要通过CREATE EXTERNAL TABLE语句创建出一个外部表,而在读取开源数据格式时,创建外表的DDL语句格式如下:

DROP TABLE [IF EXISTS] <external_table>;

CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table>
(<column schemas>)
[PARTITIONED BY (partition column schemas)]
[ROW FORMAT SERDE '<serde class>']
STORED AS <file format>
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@${endpoint}/${bucket}/${userPath}/'

可以看到,这个语法与HIVE的语法是相当接近的,而在这个CREATE EXTERNAL TABLE的ddl语句中,有如下几点要说明:

  1. 首先要特别说明的是这里使用的是STORED AS的关键字,而不是普通非结构化外表用的STORED BY关键字,这也是目前在读取开源兼容数据时独有的。
  2. 外部表的<column schemas> 必须与具体OSS上存储存储数据的schema相符合。
  3. ROW FORMAT SERDE 并非必选选项,只有在使用一些特殊的格式上,比如TEXTFILE时才需要使用。
  4. STORED AS后面接的是文件格式名字, 比如 ORC/PARQUET/RCFILE/SEQUENCEFILE/TEXTFILE 等等。
  5. 最后还要提到的是,在上面这个例子中,我们在LOCATION上使用了OSS明文AK,这只适用于在用户对于AK的保密性不敏感情况下使用。 对于数据安全比较敏感的场景,比如在多用户场景或者弹外集群上,则推荐使用通过STS/RAM体系事先进行鉴权,从而避免使用明文AK。

1.1 范例1: 关联OSS上存储的PARQUET数据

现在再来看一个具体的例子,假设我们有一些PARQUET文件存放在一个OSS路径上,每个文件都是PARQUET格式,存放着schema为16列(4列BINGINT, 4列DOUBLE, 8列STRING)的数据,那么可以通过如下DDL语句来描述:

CREATE EXTERNAL TABLE tpch_lineitem_parquet
(
  l_orderkey bigint,
  l_partkey bigint,
  l_suppkey bigint,
  l_linenumber bigint,
  l_quantity double,
  l_extendedprice double,
  l_discount double,
  l_tax double,
  l_returnflag string,
  l_linestatus string,
  l_shipdate string,
  l_commitdate string,
  l_receiptdate string,
  l_shipinstruct string,
  l_shipmode string,
  l_comment string
)
STORED AS PARQUET
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/parquet_data/';

1.2 范例2:分区表关联OSS上存储的TEXTFILE数据

同样的数据,如果是每行以JSON格式,存储成OSS上TEXTFILE文件;同时,数据在OSS通过多个目录组织,这时是可以使用MaxCompute分区表和数据关联,则可以通过如下DDL语句来描述:

CREATE EXTERNAL TABLE tpch_lineitem_textfile
(
  l_orderkey bigint,
  l_partkey bigint,
  l_suppkey bigint,
  l_linenumber bigint,
  l_quantity double,
  l_extendedprice double,
  l_discount double,
  l_tax double,
  l_returnflag string,
  l_linestatus string,
  l_shipdate string,
  l_commitdate string,
  l_receiptdate string,
  l_shipinstruct string,
  l_shipmode string,
  l_comment string
)
PARTITIONED BY (ds string)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/';

如果OSS表目录下面的子目录是以Partition Name方式组织,比如:

oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170102/'
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170103/'
...

则可以使用以下DDL语句ADD PARTITION:

ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102");
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103");

如果OSS分区目录不是按这种方式组织,或者根本不在表目录下,比如:

oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/;
oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/;
...

则可以使用以下DDL语句ADD PARTITION:

ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102")
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/';
ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103")
LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/';
...

2. 读取以及处理 OSS 上面的开源格式数据

对比上面的两个范例,可以看出对于不同文件类型,只要简单修改STORED AS后的格式名。在接下来的例子中,我们将只集中描述对上面PARQUET数据对应的外表(tpch_lineitem_parquet)的处理,如果要处理不同的文件类型,只要在DDL创建外表时指定是PARQUET/ORC/TEXTFILE/RCFILE/TEXTFILE即可,处理数据的语句则是一样的。

2.1 直接读取以及处理OSS上面的开源数据

在创建数据外表后,直接对外表就可以进行与普通MaxCompute表的操作,直接对存储在OSS上的数据进行处理,比如:

SELECT l_returnflag,
    l_linestatus,
    SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price,
    AVG(l_quantity) AS avg_qty,
    COUNT(*) AS count_order
FROM tpch_lineitem_parquet
WHERE l_shipdate <= '1998-09-02'
GROUP BY
    l_returnflag,
    l_linestatus;

可以看到,在这里tpch_lineitem_parquet这个外表被当作一个普通的内部表一样使用。唯一不同的只是在MaxCompute内部计算引擎将从OSS上去读取对应的PARQUET数据来进行处理。

但是我们应该强调的是,在这里直接使用外表,每次读取的时候都需要涉及外部OSS的IO操作,并且MaxCompute系统本身针对内部存储做的许多高性能优化都用不上了,所以性能上会有所损失。 所以如果是需要对数据进行反复计算以及对计算的高效性比较敏感的场景上,我们推荐下面这种用法:先将数据导入MaxCompute内部,再进行计算。

注意,上面例子中的tpch_lineitem_textfile表,因为使用了ROW FORMAT + STORED AS,需要手动设置flag(只使用STORED AS,odps.sql.hive.compatible默认为TRUE),再进行读取,否则会有报错。

SELECT * FROM tpch_lineitem_textfile LIMIT 1;
FAILED: ODPS-0123131:User defined function exception - Traceback:
com.aliyun.odps.udf.UDFException: java.lang.ClassNotFoundException: com.aliyun.odps.hive.wrapper.HiveStorageHandlerWrapper

--需要手动设置hive兼容flag
set odps.sql.hive.compatible=true;
SELECT * FROM tpch_lineitem_textfile LIMIT 1;
+------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
| l_orderkey | l_partkey  | l_suppkey  | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax      | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment |
+------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
| 5640000001 | 174458698  | 9458733    | 1            | 14.0       | 23071.58        | 0.08       | 0.06       | N            | O            | 1998-01-26 | 1997-11-16   | 1998-02-18    | TAKE BACK RETURN | SHIP       | cuses nag silently. quick |
+------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+

2.2 将OSS上的开源数据导入MaxCompute,再进行计算

  • 首先创建一个与外部表schema一样的内部表tpch_lineitem_internal,然后将OSS上的开源数据导入MaxCompute内部表,以cFile格式存储在MaxCompute内部:
CREATE TABLE tpch_lineitem_internal LIKE tpch_lineitem_parquet;

INSERT OVERWRITE TABLE tpch_lineitem_internal
SELECT * FROM tpch_lineitem_parquet;
  • 直接就可以对内部表进行同样的操作:
SELECT l_returnflag,
    l_linestatus,
    SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price,
    AVG(l_quantity) AS avg_qty,
    COUNT(*) AS count_order
FROM tpch_lineitem_internal
WHERE l_shipdate <= '1998-09-02'
GROUP BY
    l_returnflag,
    l_linestatus;

通过这样子将数据先导入系统的情况下,对同样数据的计算就会更高效得多。

4. 结语

开源的种种数据格式往往由各种数据处理生态产生,而MaxCompute非结构化数据处理框架通过实现计算与存储的互联,希望打通阿里云核心计算平台与各种数据的通路。在这个基础上,各种各样依赖于不同数据格式的应用,将能在MaxCompute计算平台上实现,后继我们会对一些具体的这种应用,比如基因计算等,再做一些具体的case study以及介绍。我们也欢迎有对开源数据进行处理分析的更多应用,能在MaxCompute强大计算能力的基础上开花结果。

原文链接

原文地址:http://blog.51cto.com/13679539/2122514

时间: 2024-10-08 05:12:24

如何在MaxCompute上处理存储在OSS上的开源格式数据的相关文章

混合云存储网关云上部署版本介绍

摘要: 随着企业信息系统的需求扩展和信息技术的发展进步,很多企业用户的信息系统已经完成了从物理环境到虚拟化环境的转变.云上部署版本的混合云存储网关在目前已实现的虚拟化环境中部署形态的基础上,帮助用户在云上的环境中无需改变现有业务系统中的应用,轻松的对接阿里云上的存储服务. 一.背景随着企业信息系统的需求扩展和信息技术的发展进步,很多企业用户的信息系统已经完成了从物理环境到虚拟化环境的转变.而现在,越来越多的企业用户已经开始了业务向云环境的迁移.如何在不淘汰现有架构的前提下,顺利实现业务转型成了很

OSS上传文件到阿里云

最近做项目,需要上传文件,因为上传到项目路径下,感觉有时候也挺不方便的,就试了一下上传文件到阿里云oss上去了, oss的使用网上有很多介绍,都是去配置一下需要的数据,然后直接调用他的api就可以了. 这里贴一段可以直接使用的oss代码,有需要的可以自己参考下. @Controller @RequestMapping("/ossfile") public class OSSFileController { @Autowired private EventidService eventi

使用阿里云OSS上传文件

本文介绍如何利用Java API操作阿里云OSS对象存储. 1.控制台操作 首先介绍一下阿里云OSS对象存储的一些基本概念. 1.1 进入对象存储界面 登录阿里云账号,进入对象存储界面,如图所示. 进入后如图所示. 1.2 OSS基本概念 这里不过多介绍如何在阿里云上传下载文件,这些操作基本上点一点都能找到. 1.2.1 Bucket Bucket实质就是阿里云OSS对象存储的一个存储空间,按照计算机理解的话可以理解为一个磁盘(不知道这样比喻是否恰当). 创建桶的过程很简单,如图所示,填写对应内

如何在CentOS 7 / Fedora 31/30/29上安装ELK Stack

原文地址:https://computingforgeeks.com/how-to-install-elk-stack-on-centos-fedora/ 原作者: Josphat Mutai 译者:高行行 如何在 CentOS 7 / Fedora 31/30/29 上安装 ELK Stack?" ELK "是 Elasticsearch, Logstash, and Kibana 的缩写. Elasticsearch:这是一个开源的.基于 REST 和 JSON 的搜索引擎.它具有

OC高级编程——深入block,如何捕获变量,如何存储在堆上

首先先看几道block相关的题目 这是一篇比较长的博文,前部分是block的测试题目,中间是block的语法.特性,block讲解block内部实现和block存储位置,请读者耐心阅读.具备block基础的同学,直接调转到block的实现 下面列出了五道题,看看能否答对两三个.主要涉及block栈上.还是堆上.怎么捕获变量.答案在博文最后一行 //-----------第一道题:-------------- void exampleA() { char a = 'A'; ^{ printf("%

STM32F412应用开发笔记之六:使用片上Flash存储参数

我们的项目中需要保存一些系统配置参数,这些数据的特点是:数量少而且不需要经常修改,但又不能定义为常量,因为每台设备可能不一样而且在以后还有修改的可能.这就需要考虑这些参数保存的问题.将这类数据存在指定的位置,需要修改时直接修改存储位置的数值,需要使用时则直接读取,会是一种方便的做法.考虑到这些数据量比较少,使用专门的存储单元既不经济,也没有必要,恰好有些MCU拥有比较大的FLASH,使用少量来存储这些参数则既方便有经济.STM32F4的Flash架构如下: 这次NUCLEO-F412ZG测试板上

数据库在磁盘上的存储布局HeapFile

----<大规模分布式存储系统:原理解析与架构实战>读书笔记 这篇依然是学习<大规模分布式存储系统:原理解析与架构实战>一书之外的一个话题.通过学习本书,知道了分布式键值系统,通常使用SSTable(一个无序的键值对集合容器)作为其磁盘上的布局.这不禁让人产生联想,传统数据库使用的是什么存储布局来存储数据呢?这就是今天要探讨的主题----HeapFile. HeapFile是什么? HeapFile是一种保存Page数据的数据结构,类似于链表,HeapFile也是一种无序容器. H

阿里云OSS上传文件模块

1 package com.hughes.bcsc.app.core.util.oss; 2 3 import java.io.ByteArrayInputStream; 4 import java.io.ByteArrayOutputStream; 5 import java.io.File; 6 import java.io.FileInputStream; 7 import java.io.FileNotFoundException; 8 import java.io.IOExceptio

magento -- 如何在magento中进行产品的批量上传

花费了好多时间,阅读了magento官方论坛上几乎所有的批量上传产品的相关帖子,分析了大量相关magento代码,终于可以完全实现指产品批量上传的功能,免除网速慢,在页面之间跳来跳去,以及重复输入数据的烦恼,你只需要在excel中编辑数据就可以轻松实现产品批量上传到magento站点. 碰到的常见问题(统统搞定,哈哈): 多图上传 上传后magento前台无法查看 上传后前台看不了图片 上传后前台只能看到部分图片 上传后前台的图片有重复 不能上传custom option(可以支持基于optio