Preface
This article takes a look at Over Windows in Flink's Table API.
Example
```java
Table table = input
  .window([OverWindow w].as("w"))            // define an over window with alias w
  .select("a, b.sum over w, c.min over w");  // aggregate over the over window w
```
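Over windows are similar to SQL's OVER clause. They are ordered by an event-time or processing-time attribute, and their extent is given either as a time range or as a row count. An over window is built with the Over class, on which orderBy, preceding, and as must be called (partitionBy is optional). There are two main kinds: unbounded and bounded.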
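To make the `select` in the example above concrete, here is a plain-Java sketch (not Flink code) of what `a, b.sum over w, c.min over w` evaluates to. It assumes `w` is partitioned by `a`, ordered by `rowtime`, and spans every earlier row of the partition up to the current one, and that input rows arrive sorted by `rowtime`; the `Row`/`Result` records and the method name are hypothetical. The key property is that an over aggregation emits one result per input row, instead of one per group as GROUP BY does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model, not Flink code. Assumes the window is
// partitioned by a, ordered by rowtime, and covers all earlier rows of the
// partition up to the current row; input must already be sorted by rowtime.
class OverAggregationSketch {

    record Row(String a, long rowtime, long b, long c) {}

    record Result(String a, long sumB, long minC) {}

    static List<Result> sumAndMinOver(List<Row> sortedByRowtime) {
        Map<String, Long> sumByKey = new HashMap<>();
        Map<String, Long> minByKey = new HashMap<>();
        List<Result> out = new ArrayList<>();
        for (Row r : sortedByRowtime) {
            // update only the running aggregates of this row's partition
            long sum = sumByKey.merge(r.a(), r.b(), Long::sum);
            long min = minByKey.merge(r.a(), r.c(), Long::min);
            // one output row per input row, unlike a GROUP BY aggregation
            out.add(new Result(r.a(), sum, min));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> input = List.of(
            new Row("x", 1000L, 1, 5),
            new Row("y", 2000L, 10, 7),
            new Row("x", 3000L, 2, 3),
            new Row("x", 4000L, 3, 9));
        sumAndMinOver(input).forEach(System.out::println);
    }
}
```

For the last `x` row the sketch yields sum 6 (1 + 2 + 3) and min 3, while the `y` row only sees its own partition.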
Unbounded Over Windows examples
```java
// Unbounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"));

// Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_range").as("w"));

// Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"));

// Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_row").as("w"));
```
For time-based (event-time or processing-time) windows, unbounded_range marks the window as unbounded; for row-count windows, unbounded_row does.
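The difference between `unbounded_range` and `unbounded_row` shows up when several rows share the same order-key value. In the `as` method of `OverWindowWithPreceding` quoted later in this article, a window without an explicit following offset ends at `CURRENT_ROW` when preceding is a row-count interval and at `CURRENT_RANGE` otherwise. The plain-Java sketch below is not Flink's runtime; it assumes a single partition already sorted by timestamp and reads `CURRENT_RANGE` as including all peers that share the current row's timestamp, as SQL RANGE frames do.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model, not Flink code: one partition, rows sorted by ts.
class UnboundedRangeVsRows {

    record Row(long ts, long value) {}

    // ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW (like unbounded_row)
    static List<Long> rowsSum(List<Row> sorted) {
        List<Long> out = new ArrayList<>();
        long sum = 0;
        for (Row r : sorted) {
            sum += r.value();
            out.add(sum);
        }
        return out;
    }

    // RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW (like unbounded_range):
    // rows sharing the current timestamp (peers) are all included
    static List<Long> rangeSum(List<Row> sorted) {
        List<Long> out = new ArrayList<>();
        long sum = 0;
        int i = 0;
        while (i < sorted.size()) {
            // collect the group of peers sharing sorted.get(i).ts()
            int j = i;
            long groupSum = 0;
            while (j < sorted.size() && sorted.get(j).ts() == sorted.get(i).ts()) {
                groupSum += sorted.get(j).value();
                j++;
            }
            sum += groupSum;
            // every peer sees the same aggregate
            for (int k = i; k < j; k++) {
                out.add(sum);
            }
            i = j;
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row(1, 1), new Row(2, 10), new Row(2, 100), new Row(3, 1000));
        System.out.println("rows:  " + rowsSum(rows));  // [1, 11, 111, 1111]
        System.out.println("range: " + rangeSum(rows)); // [1, 111, 111, 1111]
    }
}
```

The two rows at timestamp 2 get different results under ROWS but the same result under RANGE.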
Bounded Over Windows examples
```java
// Bounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"));

// Bounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"));

// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"));

// Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"));
```
For time-based (event-time or processing-time) windows, the bound is a time interval such as 1.minutes; for row-count windows, it is a row count such as 10.rows.
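Bounded windows can be modeled the same way. This plain-Java sketch is not Flink code; it assumes `1.minutes` behaves like SQL `RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW` with an inclusive lower bound, and `10.rows` like `ROWS BETWEEN 10 PRECEDING AND CURRENT ROW`, i.e. at most 11 rows counting the current one. The nested loops are quadratic and only show which rows fall into each frame; the demo uses a 1-row bound instead of 10 to keep the input short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model, not Flink code: one partition sorted by timestamp.
// Assumes SQL-style frames: RANGE with an inclusive lower bound, ROWS counting
// the current row plus up to `preceding` earlier rows.
class BoundedOverSketch {

    record Row(long tsMillis, long value) {}

    // Time-bounded RANGE frame: every row whose timestamp lies in [t - rangeMillis, t]
    static List<Long> rangeSum(List<Row> sorted, long rangeMillis) {
        List<Long> out = new ArrayList<>();
        for (Row current : sorted) {
            long sum = 0;
            for (Row r : sorted) {
                long ts = r.tsMillis();
                if (ts >= current.tsMillis() - rangeMillis && ts <= current.tsMillis()) {
                    sum += r.value();
                }
            }
            out.add(sum);
        }
        return out;
    }

    // Row-count bounded ROWS frame: the current row plus up to `preceding` earlier rows
    static List<Long> rowsSum(List<Row> sorted, int preceding) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            long sum = 0;
            for (int j = Math.max(0, i - preceding); j <= i; j++) {
                sum += sorted.get(j).value();
            }
            out.add(sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(
            new Row(0, 1), new Row(30_000, 2), new Row(60_000, 4), new Row(90_000, 8));
        System.out.println(rangeSum(rows, 60_000)); // [1, 3, 7, 14]
        System.out.println(rowsSum(rows, 1));       // [1, 3, 6, 12]
    }
}
```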
Table.window
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
```scala
class Table(
    private[flink] val tableEnv: TableEnvironment,
    private[flink] val logicalPlan: LogicalNode) {

  //......

  @varargs
  def window(overWindows: OverWindow*): OverWindowedTable = {

    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
      throw new TableException("Over-windows for batch tables are currently not supported.")
    }

    if (overWindows.size != 1) {
      throw new TableException("Over-Windows are currently only supported single window.")
    }

    new OverWindowedTable(this, overWindows.toArray)
  }

  //......
}
```
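Table provides a window method that takes OverWindow arguments and performs over-window operations; it returns an OverWindowedTable. As the code shows, it currently throws a TableException for batch tables and requires exactly one over window.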
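The two guard clauses in `Table.window` can be mirrored in a few lines of plain Java. This is a hypothetical simplification, not Flink API: window aliases stand in for `OverWindow` objects, a boolean stands in for the `BatchTableEnvironment` check, and standard Java exceptions replace Flink's `TableException`.

```java
import java.util.List;

// Hypothetical mirror of the checks in Table.window(...); names are illustrative only.
class WindowArgumentCheck {

    static void checkOverWindows(boolean batchEnvironment, List<String> overWindowAliases) {
        if (batchEnvironment) {
            throw new IllegalStateException(
                "Over-windows for batch tables are currently not supported.");
        }
        if (overWindowAliases.size() != 1) {
            throw new IllegalArgumentException(
                "Over-Windows are currently only supported single window.");
        }
    }

    // Returns true if the call is accepted, false if one of the checks rejects it.
    static boolean accepts(boolean batchEnvironment, List<String> aliases) {
        try {
            checkOverWindows(batchEnvironment, aliases);
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(accepts(false, List.of("w")));        // true
        System.out.println(accepts(true, List.of("w")));         // false: batch table
        System.out.println(accepts(false, List.of("w1", "w2"))); // false: two windows
    }
}
```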
OverWindow
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/windows.scala
```scala
/**
  * Over window is similar to the traditional OVER SQL.
  */
case class OverWindow(
    private[flink] val alias: Expression,
    private[flink] val partitionBy: Seq[Expression],
    private[flink] val orderBy: Expression,
    private[flink] val preceding: Expression,
    private[flink] val following: Expression)
```
OverWindow defines the attributes alias, partitionBy, orderBy, preceding, and following.
Over
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/java/windows.scala
```scala
object Over {

  /**
    * Specifies the time attribute on which rows are grouped.
    *
    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
    *
    * For batch tables, refer to a timestamp or long attribute.
    */
  def orderBy(orderBy: String): OverWindowWithOrderBy = {
    val orderByExpr = ExpressionParser.parseExpression(orderBy)
    new OverWindowWithOrderBy(Array[Expression](), orderByExpr)
  }

  /**
    * Partitions the elements on some partition keys.
    *
    * @param partitionBy some partition keys.
    * @return A partitionedOver instance that only contains the orderBy method.
    */
  def partitionBy(partitionBy: String): PartitionedOver = {
    val partitionByExpr = ExpressionParser.parseExpressionList(partitionBy).toArray
    new PartitionedOver(partitionByExpr)
  }
}

class OverWindowWithOrderBy(
    private val partitionByExpr: Array[Expression],
    private val orderByExpr: Expression) {

  /**
    * Set the preceding offset (based on time or row-count intervals) for over window.
    *
    * @param preceding preceding offset relative to the current row.
    * @return this over window
    */
  def preceding(preceding: String): OverWindowWithPreceding = {
    val precedingExpr = ExpressionParser.parseExpression(preceding)
    new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr)
  }
}

class PartitionedOver(private val partitionByExpr: Array[Expression]) {

  /**
    * Specifies the time attribute on which rows are grouped.
    *
    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
    *
    * For batch tables, refer to a timestamp or long attribute.
    */
  def orderBy(orderBy: String): OverWindowWithOrderBy = {
    val orderByExpr = ExpressionParser.parseExpression(orderBy)
    new OverWindowWithOrderBy(partitionByExpr, orderByExpr)
  }
}

class OverWindowWithPreceding(
    private val partitionBy: Seq[Expression],
    private val orderBy: Expression,
    private val preceding: Expression) {

  private[flink] var following: Expression = _

  /**
    * Assigns an alias for this window that the following `select()` clause can refer to.
    *
    * @param alias alias for this over window
    * @return over window
    */
  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))

  /**
    * Assigns an alias for this window that the following `select()` clause can refer to.
    *
    * @param alias alias for this over window
    * @return over window
    */
  def as(alias: Expression): OverWindow = {

    // set following to CURRENT_ROW / CURRENT_RANGE if not defined
    if (null == following) {
      if (preceding.resultType.isInstanceOf[RowIntervalTypeInfo]) {
        following = CURRENT_ROW
      } else {
        following = CURRENT_RANGE
      }
    }
    OverWindow(alias, partitionBy, orderBy, preceding, following)
  }

  /**
    * Set the following offset (based on time or row-count intervals) for over window.
    *
    * @param following following offset that relative to the current row.
    * @return this over window
    */
  def following(following: String): OverWindowWithPreceding = {
    this.following(ExpressionParser.parseExpression(following))
  }

  /**
    * Set the following offset (based on time or row-count intervals) for over window.
    *
    * @param following following offset that relative to the current row.
    * @return this over window
    */
  def following(following: Expression): OverWindowWithPreceding = {
    this.following = following
    this
  }
}
```
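Over is a helper for building over windows. It provides two methods, orderBy and partitionBy, which return an OverWindowWithOrderBy and a PartitionedOver, respectively.
PartitionedOver provides an orderBy method that returns an OverWindowWithOrderBy; OverWindowWithOrderBy provides a preceding method that returns an OverWindowWithPreceding.
OverWindowWithPreceding holds the partitionBy, orderBy, and preceding attributes. Its as method creates the OverWindow, and its following method sets the following offset. If following was never set, as defaults it to CURRENT_ROW when preceding is a row-count interval (RowIntervalTypeInfo) and to CURRENT_RANGE otherwise.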
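The staged builder chain can be modeled in plain Java to show why the calls must come in the order partitionBy (optional), orderBy, preceding, then as: each step returns a type that only exposes the next step. This is a hypothetical sketch, not Flink's classes; strings replace `Expression` objects, and the `.rows`/`unbounded_row` string test is an assumed stand-in for the `RowIntervalTypeInfo` check in `as`.

```java
import java.util.List;

// Hypothetical plain-Java model of Over -> PartitionedOver -> OverWindowWithOrderBy
// -> OverWindowWithPreceding -> OverWindow. Strings stand in for Flink Expressions.
final class OverBuilderSketch {

    record OverWindow(String alias, List<String> partitionBy, String orderBy,
                      String preceding, String following) {}

    static final class Over {
        private Over() {}

        // comma-separated keys, loosely mirroring ExpressionParser.parseExpressionList
        static PartitionedOver partitionBy(String partitionBy) {
            return new PartitionedOver(List.of(partitionBy.split("\\s*,\\s*")));
        }

        static OverWindowWithOrderBy orderBy(String orderBy) {
            return new OverWindowWithOrderBy(List.of(), orderBy);
        }
    }

    record PartitionedOver(List<String> partitionBy) {
        OverWindowWithOrderBy orderBy(String orderBy) {
            return new OverWindowWithOrderBy(partitionBy, orderBy);
        }
    }

    record OverWindowWithOrderBy(List<String> partitionBy, String orderBy) {
        OverWindowWithPreceding preceding(String preceding) {
            return new OverWindowWithPreceding(partitionBy, orderBy, preceding);
        }
    }

    static final class OverWindowWithPreceding {
        private final List<String> partitionBy;
        private final String orderBy;
        private final String preceding;
        private String following; // stays null unless following(...) is called

        OverWindowWithPreceding(List<String> partitionBy, String orderBy, String preceding) {
            this.partitionBy = partitionBy;
            this.orderBy = orderBy;
            this.preceding = preceding;
        }

        OverWindowWithPreceding following(String following) {
            this.following = following;
            return this;
        }

        OverWindow as(String alias) {
            String end = following;
            if (end == null) {
                // default frame end: CURRENT_ROW for row-count intervals, CURRENT_RANGE
                // otherwise; the string test is an assumed stand-in for RowIntervalTypeInfo
                boolean rowInterval = preceding.equals("unbounded_row") || preceding.endsWith(".rows");
                end = rowInterval ? "CURRENT_ROW" : "CURRENT_RANGE";
            }
            return new OverWindow(alias, partitionBy, orderBy, preceding, end);
        }
    }

    public static void main(String[] args) {
        System.out.println(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"));
        System.out.println(Over.orderBy("proctime").preceding("unbounded_range").as("w"));
    }
}
```

Because `PartitionedOver` has no `preceding` and `OverWindowWithOrderBy` has no `as`, skipping a required step is a compile error rather than a runtime one, which is the point of this staged design.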
OverWindowedTable
flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
```scala
class OverWindowedTable(
    private[flink] val table: Table,
    private[flink] val overWindows: Array[OverWindow]) {

  def select(fields: Expression*): Table = {
    val expandedFields = expandProjectList(
      fields,
      table.logicalPlan,
      table.tableEnv)

    if (fields.exists(_.isInstanceOf[WindowProperty])) {
      throw new ValidationException(
        "Window start and end properties are not available for Over windows.")
    }

    val expandedOverFields = resolveOverWindows(expandedFields, overWindows, table.tableEnv)

    new Table(
      table.tableEnv,
      Project(
        expandedOverFields.map(UnresolvedAlias),
        table.logicalPlan,
        // required for proper projection push down
        explicitAlias = true)
        .validate(table.tableEnv)
    )
  }

  def select(fields: String): Table = {
    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    // get the correct expression for AggFunctionCall
    val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
    select(withResolvedAggFunctionCall: _*)
  }
}
```
The OverWindowedTable constructor takes the overWindows parameter. Its only operation is select, which accepts either a String or Expressions. The String overload parses the string into Expressions, resolves aggregate function calls, and then delegates to the Expression overload. That overload rejects window start/end properties and creates a new Table whose Project has the projectList expandedOverFields.map(UnresolvedAlias), where expandedOverFields comes from resolveOverWindows(expandedFields, overWindows, table.tableEnv).
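What `select(String)` plus `resolveOverWindows` accomplish can be approximated at the string level: split the field list, reject window start/end properties, and bind each `agg over alias` field to the window registered under that alias. This is a hypothetical sketch only; Flink performs these steps on `Expression` trees, and the method name, the SQL-like rendering of the resolved field, and the unknown-alias error message are all illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical string-level model of OverWindowedTable.select(String); not Flink code.
class OverSelectSketch {

    static List<String> resolve(String fields, Map<String, String> overWindowsByAlias) {
        List<String> resolved = new ArrayList<>();
        // stands in for ExpressionParser.parseExpressionList
        for (String raw : fields.split(",")) {
            String field = raw.trim();
            // stands in for the WindowProperty check
            if (field.endsWith(".start") || field.endsWith(".end")) {
                throw new IllegalArgumentException(
                    "Window start and end properties are not available for Over windows.");
            }
            int idx = field.indexOf(" over ");
            if (idx < 0) {
                resolved.add(field); // plain field, passed through unchanged
                continue;
            }
            String agg = field.substring(0, idx).trim();
            String alias = field.substring(idx + " over ".length()).trim();
            String window = overWindowsByAlias.get(alias);
            if (window == null) {
                throw new IllegalArgumentException("unknown over window alias: " + alias);
            }
            // stands in for resolveOverWindows: attach the window definition to the call
            resolved.add(agg + " OVER (" + window + ")");
        }
        return resolved;
    }

    public static void main(String[] args) {
        Map<String, String> windows = Map.of(
            "w", "PARTITION BY a ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW");
        resolve("a, b.sum over w, c.min over w", windows).forEach(System.out::println);
    }
}
```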
Summary
Over windows are similar to SQL's OVER clause. They are ordered by an event-time or processing-time attribute, and their extent is given as a time range or a row count. They are built with the Over class, on which orderBy, preceding, and as must be called. There are two kinds, unbounded and bounded: for event-time and processing-time windows, unbounded_range denotes an unbounded window and an interval such as 1.minutes a bounded one; for row-count windows, unbounded_row denotes an unbounded window and a count such as 10.rows a bounded one.
Table provides a window method that takes OverWindow arguments for over-window operations and returns an OverWindowedTable. OverWindow defines the alias, partitionBy, orderBy, preceding, and following attributes. Over is a helper for building over windows: its orderBy and partitionBy methods return an OverWindowWithOrderBy and a PartitionedOver, respectively; PartitionedOver's orderBy method returns an OverWindowWithOrderBy; OverWindowWithOrderBy's preceding method returns an OverWindowWithPreceding. OverWindowWithPreceding holds the partitionBy, orderBy, and preceding attributes, creates the OverWindow through its as method, and sets the following offset through its following method.
The OverWindowedTable constructor takes the overWindows parameter. Its only operation is select, which accepts either a String or Expressions; the String overload converts the string to Expressions and then calls the Expression overload. select creates a new Table whose Project has the projectList expandedOverFields.map(UnresolvedAlias), where expandedOverFields is obtained from resolveOverWindows(expandedFields, overWindows, table.tableEnv).
doc
Over Windows
Source: https://juejin.im/post/5c4d2a94e51d45593c3759e6