本文转自个人微信公众号, 原文链接 https://mp.weixin.qq.com/s/KZckw0wCuRt2O_NhmwbFkA . 本博客评论系统需要梯子, 大家关注下公众号方便交流.
本文基于 Flink 1.7.
随着 Hadoop 的发展, 有了 Hive, 使用 HQL 即可完成原来繁琐的 Map Reduce 程序.
随着 Spark 的发展, 引入了 Spark SQL.
随着 Flink 版本的更迭, Flink 也提供了 Flink SQL, 以及 Table APIs.
注意: 截止 Flink 1.7,Table API 和 SQL 还没有完成, 有些操作还不支持.
1. 基本概念
1.1 Why
那么, 为什么要推出 Table APIs 和 SQL?
首先, 来看下 Flink 的编程模型. 如下图所示(图片来源于官网),DataStream API 和 DataSet API 是分开的, 但是对于应用开发者来说, 为什么要关注这一点? 对于相同的数据, 批处理与流计算居然要写两套代码, 简直不可思议. Table APIs 和 SQL 的推出, 实现了流处理和批处理的统一.
flink-abstract.PNG
其次, 降低了学习和使用门槛, 基于 DataStream/DataSet APIs 的 Scala 或 Java 程序开发, 对于 BI / 分析师来说, 还是有一定门槛的, 而 SQL 则简单太多了.
1.2 Dynamic Tables
Dynamic Tables 是 Flink Table API 和 SQL 的核心概念, 与大家熟知的 Static Tables 相比, Dynamic Tables 随着时间一直在变化. 可以查询 Dynamic Table, 查询 Dynamic Table 时会产生一个持续的查询, 持续的查询不会终止, 产生的结果也是 Dynamic Table, 根据输入, 输出也会不断变化. 熟悉关系型数据库的可以将 Dynamic Tables 的查询跟关系型数据库里查询物化视图对比起来, 需要注意的是, Dynamic Tables 是一个逻辑概念, 不需要 (全部) 物化.
另外, 需要注意, 在 Dynamic Table 上的持续查询的结果语义上是跟在 Dynamic Table 的快照上执行查询相同.
flink_dynamic_table.PNG
如上图所示:
Stream 转化为 Dynamic Table
在 Dynamic Table 上执行查询, 得到的结果是一个新的 Dynamic Table
最终的 Dynamic Table 结果, 被转化为 Stream
1.3 Update Queries VS Append Queries
Append Queries: 只会对查询结果进行追加的查询.
Update Queries: 会更新查询结果的查询, 一般需要维护更多的 state.
1.4 查询限制
有些 Stream 上的查询需要花费巨大的代价:
需要维护的 state 太大. 持续查询可能会运行非常长的时间, 处理的数据量会非常大, 对于一些需要更新原来结果的查询, 需要维护原来的结果, 维护的 state 会很大.
更新计算代价高昂: 输入数据的一小点变化, 可能有些查询需要重新计算大量的数据, 这种计算就不适合做持续查询.
1.5 Table 到 Stream 的转化
就像普通的数据库 Table 一样, Dynamic Table 也支持 insert,update,delete 等对它的更新. 当需要将 Dynamic Table 转化为 Stream 或者输出到外部系统时, 需要对这些更新进行 encode.
Append-only Stream: 仅有 Insert 更新的 Dynamic Table, 可通过 emit 插入的数据行转化为 stream.
Retract Stream:Retract Stream 是支持 add 消息 和 retract 消息两类消息的流. 将 insert 编码为 add 消息, 将 delete 编码为 retract 消息, 将 update 编码为对之前消息的 retarct 消息和对新消息的 add 消息.
Upsert Stream:Upsert Stream 是支持 upsert 消息和 delete 消息两类消息的流. 如果一个 Dynamic Table 需要转化为一个 Upsert Stream, 这个 Table 必须要有 unique key, 可以将 insert/update 编码为 upsert 消息, 将 delete 编码为 delete 消息. Upsert Stream 与 Retract Stream 的主要区别是 update 操作只需要一条消息, 所以会更高效.
Append-only Stream 和 Retarct Stream 支持将 Dynamic Table 转化为 DataStream.
2. 实战
下面引入一个简单的例子, 从 stream 开始, 转化为 Table, 然后查询 Table, 最后将 Table 转化为 Stream.
从例子可以很容易的看出, Stream 和 Table APIs / SQL 可以很容易的混用, 这也给我们带来了极大的便利性.
2.1 引入依赖
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-table_2.11</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <dependency>
- <!-- for batch query -->
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <!-- 上线时用 provided, 避免 build 的 jar 包太大, 更避免冲突 -->
- <!--<scope>provided</scope>-->
- <!-- IDEA 里用 compile, 否则 in-ide execution 会失败 -->
- <scope>compile</scope>
- </dependency>
- <dependency>
- <!-- for streaming query -->
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
- <version>${flink.version}</version>
- <!-- 上线时用 provided, 避免 build 的 jar 包太大, 更避免冲突 -->
- <!--<scope>provided</scope>-->
- <!-- IDEA 里用 compile, 否则 in-ide execution 会失败 -->
- <scope>compile</scope>
- </dependency>
2.2 隐式转换
Flink 的 Scala Table APIs 用了隐式转换, 所以, 需要 import 进来.
- import org.apache.flink.table.API.scala._
- import org.apache.flink.API.scala._
2.3 创建 TableEnvironment
TableEnvironment 是 Table APIs 和 SQL 的核心, 可以用于:
注册 Table
执行 SQL 查询
注册 UDF
将 DataStream / DataSet 转化为 Table
维护一个到 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用.
Table 总是绑定到一个 TableEnvironment 的, 在使用时, 在同一个 SQL 中不能联合使用不同 TableEnvironment 的表, 比如 join 或 union.
下面创建一个用于 Stream 的 StreamTableEnvironment. 另外, 创建一个简单的 stream.
- // 创建 StreamTableEnvironment
- val senv = StreamExecutionEnvironment.getExecutionEnvironment
- val stableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(senv)
- // 创建一个用于实验的 Stream[ObjPrice]
- case class ObjPrice (name: String, price: Long)
- val stream: DataStream[ObjPrice] = senv.fromCollection(List(ObjPrice("car", 100000), ObjPrice("house", 2000000), ObjPrice("book", 100), ObjPrice("car", 900210)))
2.4 将 Stream 转化为 Table
val sTable1Rename: Table = stableEnv.fromDataStream(stream, 'myName,'myPrice)
将上面的 stream 转化为 Table, 同时对字段进行重命名.
2.5 查询 Table
- // 采用 Table API 的方式进行查询
- val sTableResult: Table = sTable1Rename
- .filter('myPrice> 1000)
- .groupBy('myName)
- .select('myName,'myPrice.sum as 'mySumPrice)
2.6 将 Table 转化为 Stream
val sResultDataStream: DataStream[(Boolean, ObjPrice)] = stableEnv.toRetractStream[ObjPrice](sTableResult)
3. 总结
本文仅涉及一些基础知识和最常见的使用, 其他的比如注册 Table / TableSink / TableSource / External Catalog , 数据类型与 Table Schema 的映射, 查询优化等并不涉及, 可以参考官网 进行查阅.
为了方便交流, 请扫描下方二维码关注我.
wxqr.jpg
来源: http://www.jianshu.com/p/49e8556416f7