序
本文主要研究一下 flink Table 的 Set Operations
实例
- Union
- Table left = tableEnv.fromDataSet(ds1, "a, b, c");
- Table right = tableEnv.fromDataSet(ds2, "a, b, c");
- Table result = left.union(right);
union 方法类似 sql 的 union
- UnionAll
- Table left = tableEnv.fromDataSet(ds1, "a, b, c");
- Table right = tableEnv.fromDataSet(ds2, "a, b, c");
- Table result = left.unionAll(right);
unionAll 方法类似 sql 的 union all
- Intersect
- Table left = tableEnv.fromDataSet(ds1, "a, b, c");
- Table right = tableEnv.fromDataSet(ds2, "d, e, f");
- Table result = left.intersect(right);
intersect 方法类似 sql 的 intersect
- IntersectAll
- Table left = tableEnv.fromDataSet(ds1, "a, b, c");
- Table right = tableEnv.fromDataSet(ds2, "d, e, f");
- Table result = left.intersectAll(right);
intersectAll 方法类似 sql 的 intersect all
- Minus
- Table left = tableEnv.fromDataSet(ds1, "a, b, c");
- Table right = tableEnv.fromDataSet(ds2, "a, b, c");
- Table result = left.minus(right);
minus 方法类似 sql 的 except
- MinusAll
- Table left = tableEnv.fromDataSet(ds1, "a, b, c");
- Table right = tableEnv.fromDataSet(ds2, "a, b, c");
- Table result = left.minusAll(right);
minusAll 方法类似 sql 的 except all
- In
- Table left = ds1.toTable(tableEnv, "a, b, c");
- Table right = ds2.toTable(tableEnv, "a");
- // using implicit registration
- Table result = left.select("a, b, c").where("a.in(" + right + ")");
- // using explicit registration
- tableEnv.registerTable("RightTable", right);
- Table result = left.select("a, b, c").where("a.in(RightTable)");
in 方法类似 sql 的 in
- Table
- flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
- class Table(
- private[flink] val tableEnv: TableEnvironment,
- private[flink] val logicalPlan: LogicalNode) {
- //......
- def union(right: Table): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
- }
- new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
- }
- def unionAll(right: Table): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException("Only tables from the same TableEnvironment can be unioned.")
- }
- new Table(tableEnv, Union(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
- }
- def intersect(right: Table): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException(
- "Only tables from the same TableEnvironment can be intersected.")
- }
- new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = false).validate(tableEnv))
- }
- def intersectAll(right: Table): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException(
- "Only tables from the same TableEnvironment can be intersected.")
- }
- new Table(tableEnv, Intersect(logicalPlan, right.logicalPlan, all = true).validate(tableEnv))
- }
- def minus(right: Table): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException("Only tables from the same TableEnvironment can be" +
- "subtracted.")
- }
- new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = false)
- .validate(tableEnv))
- }
- def minusAll(right: Table): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException("Only tables from the same TableEnvironment can be" +
- "subtracted.")
- }
- new Table(tableEnv, Minus(logicalPlan, right.logicalPlan, all = true)
- .validate(tableEnv))
- }
- //......
- }
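union 及 unionAll 使用的是 Union,intersect 及 intersectAll 使用的是 Intersect,minus 及 minusAll 使用的是 Minus; 带 All 与不带 All 的方法的区别仅在于构造对应节点时传入的 all 参数分别为 true 和 false; 另外这些方法都会先校验两个 Table 是否属于同一个 TableEnvironment, 不是则抛出 ValidationException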
- Union
- flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
- case class Union(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
- override def output: Seq[Attribute] = left.output
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- left.construct(relBuilder)
- right.construct(relBuilder)
- relBuilder.union(all)
- }
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment] && !all) {
- failValidation(s"Union on stream tables is currently not supported.")
- }
- val resolvedUnion = super.validate(tableEnv).asInstanceOf[Union]
- if (left.output.length != right.output.length) {
- failValidation(s"Union two tables of different column sizes:" +
- s"${left.output.size} and ${right.output.size}")
- }
- val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
- l.resultType == r.resultType
- }
- if (!sameSchema) {
- failValidation(s"Union two tables of different schema:" +
- s"[${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
- s"[${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
- }
- resolvedUnion
- }
- }
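Union 继承了 BinaryNode, 其 construct 方法通过 relBuilder.union 来构建 union 操作; 其 validate 方法要求左右两表的列数以及对应位置的列类型一致 (不比较列名), 并且在 StreamTableEnvironment 下只有 all 为 true (即 unionAll) 时才能通过校验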
- Intersect
- flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
- case class Intersect(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
- override def output: Seq[Attribute] = left.output
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- left.construct(relBuilder)
- right.construct(relBuilder)
- relBuilder.intersect(all)
- }
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
- failValidation(s"Intersect on stream tables is currently not supported.")
- }
- val resolvedIntersect = super.validate(tableEnv).asInstanceOf[Intersect]
- if (left.output.length != right.output.length) {
- failValidation(s"Intersect two tables of different column sizes:" +
- s"${left.output.size} and ${right.output.size}")
- }
- // allow different column names between tables
- val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
- l.resultType == r.resultType
- }
- if (!sameSchema) {
- failValidation(s"Intersect two tables of different schema:" +
- s"[${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
- s"[${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
- }
- resolvedIntersect
- }
- }
Intersect 继承了 BinaryNode, 其 construct 方法通过 relBuilder.intersect 来构建 intersect 操作; 其 validate 方法在 StreamTableEnvironment 下直接校验失败 (即不支持流表), 并要求左右两表的列数以及对应位置的列类型一致, 但允许列名不同
- Minus
- flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
- case class Minus(left: LogicalNode, right: LogicalNode, all: Boolean) extends BinaryNode {
- override def output: Seq[Attribute] = left.output
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- left.construct(relBuilder)
- right.construct(relBuilder)
- relBuilder.minus(all)
- }
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
- failValidation(s"Minus on stream tables is currently not supported.")
- }
- val resolvedMinus = super.validate(tableEnv).asInstanceOf[Minus]
- if (left.output.length != right.output.length) {
- failValidation(s"Minus two table of different column sizes:" +
- s"${left.output.size} and ${right.output.size}")
- }
- val sameSchema = left.output.zip(right.output).forall { case (l, r) =>
- l.resultType == r.resultType
- }
- if (!sameSchema) {
- failValidation(s"Minus two table of different schema:" +
- s"[${left.output.map(a => (a.name, a.resultType)).mkString(", ")}] and" +
- s"[${right.output.map(a => (a.name, a.resultType)).mkString(", ")}]")
- }
- resolvedMinus
- }
- }
Minus 继承了 BinaryNode, 其 construct 方法通过 relBuilder.minus 来构建 minus 操作; 其 validate 方法在 StreamTableEnvironment 下直接校验失败 (即不支持流表), 并要求左右两表的列数以及对应位置的列类型一致
小结
Table 对 Set 提供了 union、unionAll、intersect、intersectAll、minus、minusAll、in (in 用在 where 子句中) 操作
union 及 unionAll 使用的是 Union,intersect 及 intersectAll 使用的是 Intersect,minus 及 minusAll 使用的是 Minus
Union 继承了 BinaryNode, 其 construct 方法通过 relBuilder.union 来构建 union 操作; Intersect 继承了 BinaryNode, 其 construct 方法通过 relBuilder.intersect 来构建 intersect 操作; Minus 继承了 BinaryNode, 其 construct 方法通过 relBuilder.minus 来构建 minus 操作
- doc
- Set Operations
来源: https://juejin.im/post/5c512b6a51882525812514b2