导读: 本次分享的主题为 Apache Flink 新场景 --OLAP 引擎, 主要内容包括:
背景介绍
Apache Flink OLAP 引擎
案例介绍
未来计划
1. OLAP 及其分类
OLAP 是一种让用户可以用从不同视角方便快捷的分析数据的计算方法. 主流的 OLAP 可以分为 3 类: 多维 OLAP ( Multi-dimensional OLAP ), 关系型 OLAP ( Relational OLAP ) 和混合 OLAP ( Hybrid OLAP ) 三大类.
多维 OLAP ( MOLAP ):
传统的 OLAP 分析方式
数据存储在多维数据集中
关系型 OLAP ( ROLAP ):
以关系数据库为核心, 以关系型结构进行多维数据的表示
通过 SQL 的 where 条件以呈现传统 OLAP 的切片, 切块功能
混合 OLAP ( HOLAP ):
将 MOLAP 和 ROLPA 的优势结合起来, 以获得更快的性能
接下来为大家详细介绍下:
1 MOLAP
典型代表:
MOLAP 的典型代表是 Kylin 和 Druid.
处理流程:
对原始数据做数据预处理
预处理后的数据存至数据仓库
用户的请求通过 OLAP server 查询数据仓库中的数据
MOLAP 的优点和缺点:
MOLAP 的优点和缺点都来自于其数据预处理 ( pre-processing ) 环节. 数据预处理, 将原始数据按照指定的计算规则预先做聚合计算, 这样避免了查询过程中出现大量的临时计算, 提升了查询性能, 同时也为很多复杂的计算提供了支持.
但是这样的预聚合处理, 需要预先定义维度, 会限制后期数据查询的灵活性; 如果查询工作涉及新的指标, 需要重新增加预处理流程, 损失了灵活度, 存储成本也很高; 同时, 这种方式不支持明细数据的查询.
因此, MOLAP 适用于对性能非常高的场景.
2 ROLAP
典型代表:
ROLAP 的典型代表是 Presto 和 Impala.
处理流程:
用户的请求直接发送给 OLAP server
OLAP serve 将用户的请求转换成关系型操作算子:
通过 SCAN 扫描原始数据
在原始数据基础上做过滤, 聚合, 关联等处理
将计算结果返回给用户
ROLAP 的优点和缺点:
ROLAP 不需要进行数据预处理 ( pre-processing ), 因此查询灵活, 可扩展性好. 这类引擎使用 MPP 架构 ( 与 Hadoop 相似的大型并行处理架构, 可以通过扩大并发来增加计算资源 ), 可以高效处理大量数据. 但是当数据量较大或 query 较为复杂时, 查询性能也无法像 MOLAP 那样稳定. 所有计算都是临时发生 ( 没有预处理 ), 因此会耗费更多的计算资源.
因此, ROLAP 适用于对查询灵活性高的场景.
3 HOLAP
混合 OLAP, 是 MOLAP 和 ROLAP 的一种融合. 当查询聚合性数据的时候, 使用 MOLAP 技术; 当查询明细数据时, 使用 ROLAP 技术. 在给定使用场景的前提下, 以达到查询性能的最优化.
2. Apache Flink 介绍
1 当前 Apache Flink 支持的应用场景
Apache Flink 支持的 3 种典型应用场景:
事件驱动的应用
反欺诈
基于规则的监控报警
流式 Pipeline
数据 ETL
实时搜索引擎的索引
批处理 & 流处理分析
网络质量监控
消费者实时数据分析
2 Apache Flink 架构
3 Apache Flink 优势
统一框架 ( 不区分流处理和批处理 )
用户 API 统一
执行引擎统一
多层次 API
标准 SQL APL
- Table API
- DataStream API ( 灵活, 无 schema 限制 )
高性能
支持内存计算
支持代价模型优化
支持代码动态生成
方便集成
支持丰富的 Connectors
方便对接现有 catalog
灵活的 Failover 策略
在 Pipeline 下支持快速 failover
类似 MapReduce,Spark 一样支持 shuffle 数据落盘
易部署维护
灵活部署方案
支持高可用
1. Apache Flink OLAP 引擎
1 为什么 Apache Flink 可以做 ROLAP 引擎?
Flink 的核心和基础是流计算, 支持高性能, 低延迟的大规模计算
Blink 将批看作有限流, 批处理是针对有限数据集的优化, 因此批处理引擎也是构建在流引擎上 ( 已开源 )
OLAP 是响应时间要求更短的批处理, 因此 OLAP 可以看作是一种特殊的批. OLAP 引擎也可以构建在现有的批引擎上
注: Flink OLAP 引擎目前不带存储, 只是一个计算框架
2 Apache Flink 做 OLAP 引擎的优势
统一引擎: 流处理, 批处理, OLAP 统一使用 Flink 引擎
降低学习成本, 仅需要学习一个引擎
提高开发效率, 很多 SQL 是流批通用
提高维护效率, 可以更集中维护好一个引擎
既有优势: 利用 Flink 已有的很多特性, 使 OLAP 使用场景更为广泛
使用流处理的内存计算, Pipeline
支持代码动态生成
也可以支持批处理数据落盘能力
相互增强: OLAP 能享有现有引擎的优势, 同时也能增强引擎能力
无统计信息场景的优化
开发更高效的算子
使 Flink 同时兼备流, 批, OLAP 处理的能力, 成为更通用的框架
2. 性能优化
OLAP 对查询时间非常敏感, 当前很多组件的性能不满足要求, 因此我们对 Flink 做了很多相关优化.
1 服务架构的优化
客户端服务化:
下图介绍了一条 SQL 怎么在客户端一步一步变为 JobGraph, 最终提交给 JM:
在改动之前, 每次接受一个 query 时会启动一个新的 JVM 进程来进行作业的编译. 其中 JVM 的启动, Class 的加载, 代码的动态编译 ( 如 Optimizer 模块由于需要通过 Janino 动态编译进行 cost 计算 ) 等操作都非常耗时 ( 需要约 3~5s ). 因此, 我们将客户端进行服务化, 将整个 Client 做成 Service, 当接收到用户的 query 时, 无需重复各项加载工作, 可将延时降低至 100ms 左右.
自定义 CollectionTableSink:
这部分优化, 源于 OLAP 的一个特性: OLAP 会将最终计算结果发给客户端, 通过 JobManager 转发给 Client. 假如某个 query 的结果数据量很大, 会让 JobManager OOM ( OutOfMemory ); 如果同时执行多个 query, 也会相互影响. 因此, 我们从新实现了一个 CollectionTableSink, 限制数据的条数和数据大小, 避免出现 OOM, 保证多个 Query 同时运行时的稳定性.
调度优化:
在 Batch 模式下的调度存在以下问题:
使用 Lazy_from_sources 模式调度, 会导致整体运行时间较长, 也可能造成死锁.
注: 调度死锁是指在资源有限的情况下, 多个 Job 同时运行时, 如果多个 Job 都只申请到了部分资源并没有剩余资源可以申请, 导致 Job 没法继续执行, 新的 Job 也没法提交
RM ( Resource Manager ) 按 OnDemand 方式分配 Slot 需求, 也会造成死锁
RM 以单线程同步模式向 TM ( Transaction Manager ) 分配 Slot 请求, 会造成等待时间更长.
针对上述问题, 我们提出了以下几点改动:
采用 Eager 调度模式 ( 确保所有的资源都申请到后才开始运行 )
使用 FIFO ( 先进先出队 ) 模式申请资源 ( 确保当前 Job 的资源分配结束后才开始下一个 Job 的资源分配 )
将单线程同步模式改为多线程异步模式, 减少任务启动时间和执行时间
2 针对 source 的优化
在 ROLAP 的执行场景中, 所有数据都是通过扫描原始数据表后进行处理; 因此, 基于 Source 的读取性能非常关键, 直接影响 Job 的执行效率.
Project&Filter 下堆:
像 Parquet 这类的列存文件格式, 支持按需读取相所需列, 同时支持 RowGroup 级别的过滤. 利用该特性, 可以将 Project 和 Filter 下推到 TableSource, 从而只需要扫描 Query 中涉及的字段和满足条件的 RowGroup, 大大提升读取效率.
Aggregate 下堆:
这个优化也是充分利用了 TableSource 的特性: 例如 Parquet 文件的 metadata 中已经存储了每个 RowGroup 的统计信息 ( 如 max,min 等 ), 因此在做 max,min 这类聚合统计时, 可直接读取 metadata 信息, 而不需要先读取所有原始数据再计算.
3 在没有统计信息场景下做的优化
消除 CrossJoin:
CrossJoin 是没有任何 Join 条件, 将 Join 的两张表的数据做笛卡尔积, 导致 Join 的结果膨胀非常厉害, 这类 Join 应该尽量避免. 我们对含有 CrossJoin 的 Plan 进行改写: 将有 join 条件的表格先做 join ( 通常会因为一些数据 Join 不上而减少数据 ), 从而提高执行效率. 这是一个确定性的改写, 即使在没有统计信息的情况下, 也可以使用该优化.
自适应的 Local Aggregate:
通常情况下, 两阶段的 Aggregate 是非常高效的, 因为 LocalAggregate 能聚合大量数据, 导致 Shuffle 的数据量会变少. 但是当 LocalAggregate 的聚合度很低的时候, Local 聚合操作的意义不大, 反而会浪费 CPU. 在没有任何统计信息的情况下, 优化器没法决定是否要产生 LocalAggregate 算子; 因此, 我们采用运行时采样的方式来判断聚合度, 如果聚合度低于设定的阈值, 我们将关闭聚合操作, 改为仅做数据转发; 经我们测试, 部分场景有 30% 的性能提升.
3. 测试结果
上图是 Flink 和 Presto 基于 1T 数据做的 SSB ( Star Schema Benchmark ) 测试, 从图中可以看出 Flink 和 Presto 整体上不相上下, 甚至有些 Query Flink 性能优于 Presto. 注: Flink OLAP 从开始到嘉宾分享时, 只有 3 个月时间.
1. Apache Flink OLAP 在数据探查上的应用
上图描述了一个数据湖应用的完整架构, Flink OLAP 主要用于 "数据探查". 数据探查是对数据结构做智能判断, 给出数据的探查结果, 快速了解数据的信息和质量情况. 即用户可以在管控平台上了解数据湖中任意一份数据的数据特性. 用户通过 web 交互操作选择相应的表和指标后立即展示相关结果指标, 因此要求低延迟, 实时反馈. 而且数据湖中很多数据没有任何统计信息; 前述的各种查询, 聚合层面的优化, 主要为这类场景服务.
2. 整体架构
上图是这类应用的整体架构. 整套服务托管到 Kubernetes 上, 最终访问的数据是 OSS; 目前这套架构正在阿里云上做公测, 邀请广大用户试用.
推回社区: 目前所有工作都是基于内部 Flink, 希望推回社区;
资源隔离: 后期很多功能的开发和优化会围绕多 Query 运行时的 "资源隔离";
优化 & 性能: 围绕 OLAP 的特性, 在此场景下会进一步做优化和性能提升等方面的工作.
来源: http://www.tuicool.com/articles/vQJnUbz