0. 前言
MaxCompute 作为使用最广泛的大数据平台, 内部存储的数据以 EB 量级计算. 巨大的数据存储量以及大规模计算下高性能数据读写的需求, 对于 MaxCompute 提出了各种高要求及挑战. 处在大数据时代, 数据的来源多种多样, 开源社区经过十几年的发展, 百花齐放, 各种各样的数据格式不断的出现. 我们的用户也在各个场景上, 通过各种计算框架, 积累了各种不同格式的数据. 怎样将 MaxCompute 强大的计算能力开放给这些使用开源格式存储沉淀下来的数据, 在 MaxCompute 上挖掘这些数据中的信息, 是 MaxCompute 团队希望解决的问题.
MaxCompute 2.0 最近推出的非结构化计算框架 [公测阶段] , 旨在从存储介质和存储格式两个维度, 打通计算与存储的通道. 在之前的文章中, 我们已经介绍过怎样在 MaxCompute 上对存储在 OSS 上的文本, 音频, 图像等格式的数据, 以及 TableStore(OTS) 的 KV 数据进行计算处理. 在这里, 则将介绍对于各种流行的开源数据格式(ORC, PARQUET, SEQUENCEFILE, RCFILE, AVRO, TEXTFILE 等等), 怎样将其存储在 OSS 上面, 并通过非结构化框架在 MaxCompute 进行处理.
本着不重造轮子的原则, 对于绝大部分这些开源数据格式的解析工作, 在非结构化框架中会直接调用开源社区的实现, 并且无缝的与 MaxCompute 系统做对接.
https://www.atatech.org/articles/97752#1 1. 创建 EXTERNAL TABLE 来绑定 OSS 外部数据
MaxCompute 非结构化数据框架通过 EXTERNAL TABLE 的概念来提供 MaxCompute 与各种数据的联通, 与读取 OSS 数据的使用方法类似, 对 OSS 数据进行写操作, 首先要通过 CREATE EXTERNAL TABLE 语句创建出一个外部表, 而在读取开源数据格式时, 创建外表的 DDL 语句格式如下:
- DROP TABLE [IF EXISTS] <external_table>;
- CREATE EXTERNAL TABLE [IF NOT EXISTS] <external_table>
- (<column schemas>)
- [PARTITIONED BY (partition column schemas)]
- [ROW FORMAT SERDE '<serde class>']
- STORED AS <file format>
- LOCATION 'oss://${accessKeyId}:${accessKeySecret}@${endpoint}/${bucket}/${userPath}/'
可以看到, 这个语法与 HIVE 的语法是相当接近的, 而在这个 CREATE EXTERNAL TABLE 的 ddl 语句中, 有如下几点要说明:
首先要特别说明的是这里使用的是 STORED AS 的关键字, 而不是普通非结构化外表用的 STORED BY 关键字, 这也是目前在读取开源兼容数据时独有的.
外部表的 < column schemas> 必须与具体 OSS 上存储存储数据的 schema 相符合.
ROW FORMAT SERDE 并非必选选项, 只有在使用一些特殊的格式上, 比如 TEXTFILE 时才需要使用.
STORED AS 后面接的是文件格式名字, 比如 ORC/PARQUET/RCFILE/SEQUENCEFILE/TEXTFILE 等等.
最后还要提到的是, 在上面这个例子中, 我们在 LOCATION 上使用了 OSS 明文 AK, 这只适用于在用户对于 AK 的保密性不敏感情况下使用. 对于数据安全比较敏感的场景, 比如在多用户场景或者弹外集群上, 则推荐使用通过 STS/RAM 体系事先进行鉴权, 从而避免使用明文 AK.
https://www.atatech.org/articles/97752#2 1.1 范例 1: 关联 OSS 上存储的 PARQUET 数据
现在再来看一个具体的例子, 假设我们有一些 PARQUET 文件存放在一个 OSS 路径上, 每个文件都是 PARQUET 格式, 存放着 schema 为 16 列 (4 列 BINGINT, 4 列 DOUBLE, 8 列 STRING) 的数据, 那么可以通过如下 DDL 语句来描述:
CREATE EXTERNAL TABLE tpch_lineitem_parquet
- (
- l_orderkey bigint,
- l_partkey bigint,
- l_suppkey bigint,
- l_linenumber bigint,
- l_quantity double,
- l_extendedprice double,
- l_discount double,
- l_tax double,
- l_returnflag string,
- l_linestatus string,
- l_shipdate string,
- l_commitdate string,
- l_receiptdate string,
- l_shipinstruct string,
- l_shipmode string,
- l_comment string
- )
- STORED AS PARQUET
- LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/parquet_data/';
https://www.atatech.org/articles/97752#3 1.2 范例 2: 分区表关联 OSS 上存储的 TEXTFILE 数据
同样的数据, 如果是每行以 JSON 格式, 存储成 OSS 上 TEXTFILE 文件; 同时, 数据在 OSS 通过多个目录组织, 这时是可以使用 MaxCompute 分区表和数据关联, 则可以通过如下 DDL 语句来描述:
CREATE EXTERNAL TABLE tpch_lineitem_textfile
- (
- l_orderkey bigint,
- l_partkey bigint,
- l_suppkey bigint,
- l_linenumber bigint,
- l_quantity double,
- l_extendedprice double,
- l_discount double,
- l_tax double,
- l_returnflag string,
- l_linestatus string,
- l_shipdate string,
- l_commitdate string,
- l_receiptdate string,
- l_shipinstruct string,
- l_shipmode string,
- l_comment string
- )
- PARTITIONED BY (ds string)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
- STORED AS TEXTFILE
- LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/';
如果 OSS 表目录下面的子目录是以 Partition Name 方式组织, 比如:
- oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170102/'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data/ds=20170103/'
- ...
则可以使用以下 DDL 语句 ADD PARTITION:
- ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102");
- ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103");
如果 OSS 分区目录不是按这种方式组织, 或者根本不在表目录下, 比如:
- oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/;
- oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/;
- ...
则可以使用以下 DDL 语句 ADD PARTITION:
- ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170102")
- LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170102/';
- ALTER TABLE tpch_lineitem_textfile ADD PARTITION(ds="20170103")
- LOCATION 'oss://${accessKeyId}:${accessKeySecret}@oss-cn-hangzhou-zmf.aliyuncs.com/bucket/text_data_20170103/';
- ...
https://www.atatech.org/articles/97752#4 2. 读取以及处理 OSS 上面的开源格式数据
对比上面的两个范例, 可以看出对于不同文件类型, 只要简单修改 STORED AS 后的格式名. 在接下来的例子中, 我们将只集中描述对上面 PARQUET 数据对应的外表 (tpch_lineitem_parquet) 的处理, 如果要处理不同的文件类型, 只要在 DDL 创建外表时指定是 PARQUET/ORC/TEXTFILE/RCFILE/TEXTFILE 即可, 处理数据的语句则是一样的.
https://www.atatech.org/articles/97752#5 2.1 直接读取以及处理 OSS 上面的开源数据
在创建数据外表后, 直接对外表就可以进行与普通 MaxCompute 表的操作, 直接对存储在 OSS 上的数据进行处理, 比如:
- SELECT l_returnflag,
- l_linestatus,
- SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price,
- AVG(l_quantity) AS avg_qty,
- COUNT(*) AS count_order
- FROM tpch_lineitem_parquet
- WHERE l_shipdate <= '1998-09-02'
- GROUP BY
- l_returnflag,
- l_linestatus;
可以看到, 在这里
tpch_lineitem_parquet
这个外表被当作一个普通的内部表一样使用. 唯一不同的只是在 MaxCompute 内部计算引擎将从 OSS 上去读取对应的 PARQUET 数据来进行处理.
但是我们应该强调的是, 在这里直接使用外表, 每次读取的时候都需要涉及外部 OSS 的 IO 操作, 并且 MaxCompute 系统本身针对内部存储做的许多高性能优化都用不上了, 所以性能上会有所损失. 所以如果是需要对数据进行反复计算以及对计算的高效性比较敏感的场景上, 我们推荐下面这种用法: 先将数据导入 MaxCompute 内部, 再进行计算.
注意, 上面例子中的 tpch_lineitem_textfile 表, 因为使用了 ROW FORMAT + STORED AS, 需要手动设置 flag(只使用 STORED AS,odps.sql.hive.compatible 默认为 TRUE), 再进行读取, 否则会有报错.
- SELECT * FROM tpch_lineitem_textfile LIMIT 1;
- FAILED: ODPS-0123131:User defined function exception - Traceback:
com.aliyun.odps.udf.UDFException: java.lang.ClassNotFoundException: com.aliyun.odps.hive.wrapper.HiveStorageHandlerWrapper
-- 需要手动设置 hive 兼容 flag
- set odps.sql.hive.compatible=true;
- SELECT * FROM tpch_lineitem_textfile LIMIT 1;
- +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
| l_orderkey | l_partkey | l_suppkey | l_linenumber | l_quantity | l_extendedprice | l_discount | l_tax | l_returnflag | l_linestatus | l_shipdate | l_commitdate | l_receiptdate | l_shipinstruct | l_shipmode | l_comment |
- +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
- | 5640000001 | 174458698 | 9458733 | 1 | 14.0 | 23071.58 | 0.08 | 0.06 | N | O | 1998-01-26 | 1997-11-16 | 1998-02-18 | TAKE BACK RETURN | SHIP | cuses nag silently. quick |
- +------------+------------+------------+--------------+------------+-----------------+------------+------------+--------------+--------------+------------+--------------+---------------+----------------+------------+-----------+
https://www.atatech.org/articles/97752#6 2.2 将 OSS 上的开源数据导入 MaxCompute, 再进行计算
首先创建一个与外部表 schema 一样的内部表
tpch_lineitem_internal
, 然后将 OSS 上的开源数据导入 MaxCompute 内部表, 以 cFile 格式存储在 MaxCompute 内部:
CREATE TABLE tpch_lineitem_internal LIKE tpch_lineitem_parquet;
INSERT OVERWRITE TABLE tpch_lineitem_internal
SELECT * FROM tpch_lineitem_parquet;
直接就可以对内部表进行同样的操作:
- SELECT l_returnflag,
- l_linestatus,
- SUM(l_extendedprice*(1-l_discount)) AS sum_disc_price,
- AVG(l_quantity) AS avg_qty,
- COUNT(*) AS count_order
- FROM tpch_lineitem_internal
- WHERE l_shipdate <= '1998-09-02'
- GROUP BY
- l_returnflag,
- l_linestatus;
通过这样子将数据先导入系统的情况下, 对同样数据的计算就会更高效得多.
https://www.atatech.org/articles/97752#7
4. 结语
开源的种种数据格式往往由各种数据处理生态产生, 而 MaxCompute 非结构化数据处理框架通过实现计算与存储的互联, 希望打通阿里云核心计算平台与各种数据的通路. 在这个基础上, 各种各样依赖于不同数据格式的应用, 将能在 MaxCompute 计算平台上实现, 后继我们会对一些具体的这种应用, 比如基因计算等, 再做一些具体的 case study 以及介绍. 我们也欢迎有对开源数据进行处理分析的更多应用, 能在 MaxCompute 强大计算能力的基础上开花结果.
来源: https://yq.aliyun.com/articles/596131