序
本文主要研究一下 flink Table 的 Joins
实例
- Inner Join
- Table left = tableEnv.fromDataSet(ds1, "a, b, c");
- Table right = tableEnv.fromDataSet(ds2, "d, e, f");
- Table result = left.join(right).where("a = d").select("a, b, e");
join 方法即 inner join
- Outer Join
- Table left = tableEnv.fromDataSet(ds1, "a, b, c");
- Table right = tableEnv.fromDataSet(ds2, "d, e, f");
- Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
- Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
- Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");
outer join 分为 leftOuterJoin,rightOuterJoin,fullOuterJoin 三种
- Time-windowed Join
- Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
- Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");
- Table result = left.join(right)
- .where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
- .select("a, b, e, ltime");
time-windowed join 需要至少一个等值条件, 然后还需要一个与两边时间属性相关的条件 (可以使用 <, <=, >=, > 进行比较)
- Inner Join with Table Function
- // register User-Defined Table Function
- TableFunction<String> split = new MySplitUDTF();
- tableEnv.registerFunction("split", split);
- // join
- Table orders = tableEnv.scan("Orders");
- Table result = orders
- .join(new Table(tableEnv, "split(c)").as("s", "t", "v"))
- .select("a, b, s, t, v");
Table 也可以跟 table function 进行 inner join, 如果 table function 返回空, 则 table 的记录被丢弃
- Left Outer Join with Table Function
- // register User-Defined Table Function
- TableFunction<String> split = new MySplitUDTF();
- tableEnv.registerFunction("split", split);
- // join
- Table orders = tableEnv.scan("Orders");
- Table result = orders
- .leftOuterJoin(new Table(tableEnv, "split(c)").as("s", "t", "v"))
- .select("a, b, s, t, v");
Table 也可以跟 table function 进行 left outer join, 如果 table function 返回空, 则 table 的记录保留, 空的部分为 null 值
- Join with Temporal Table
- Table ratesHistory = tableEnv.scan("RatesHistory");
- // register temporal table function with a time attribute and primary key
- TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
- "r_proctime",
- "r_currency");
- tableEnv.registerFunction("rates", rates);
- // join with "Orders" based on the time attribute and key
- Table orders = tableEnv.scan("Orders");
- Table result = orders
- .join(new Table(tEnv, "rates(o_proctime)"), "o_currency = r_currency")
Table 也可以跟 Temporal tables 进行 join,Temporal tables 通过 Table 的 createTemporalTableFunction 而来, 目前仅仅支持 inner join 的方式
- Table
- flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/table.scala
- class Table(
- private[flink] val tableEnv: TableEnvironment,
- private[flink] val logicalPlan: LogicalNode) {
- //......
- def join(right: Table): Table = {
- join(right, None, JoinType.INNER)
- }
- def join(right: Table, joinPredicate: String): Table = {
- join(right, joinPredicate, JoinType.INNER)
- }
- def join(right: Table, joinPredicate: Expression): Table = {
- join(right, Some(joinPredicate), JoinType.INNER)
- }
- def leftOuterJoin(right: Table): Table = {
- join(right, None, JoinType.LEFT_OUTER)
- }
- def leftOuterJoin(right: Table, joinPredicate: String): Table = {
- join(right, joinPredicate, JoinType.LEFT_OUTER)
- }
- def leftOuterJoin(right: Table, joinPredicate: Expression): Table = {
- join(right, Some(joinPredicate), JoinType.LEFT_OUTER)
- }
- def rightOuterJoin(right: Table, joinPredicate: String): Table = {
- join(right, joinPredicate, JoinType.RIGHT_OUTER)
- }
- def rightOuterJoin(right: Table, joinPredicate: Expression): Table = {
- join(right, Some(joinPredicate), JoinType.RIGHT_OUTER)
- }
- def fullOuterJoin(right: Table, joinPredicate: String): Table = {
- join(right, joinPredicate, JoinType.FULL_OUTER)
- }
- def fullOuterJoin(right: Table, joinPredicate: Expression): Table = {
- join(right, Some(joinPredicate), JoinType.FULL_OUTER)
- }
- private def join(right: Table, joinPredicate: String, joinType: JoinType): Table = {
- val joinPredicateExpr = ExpressionParser.parseExpression(joinPredicate)
- join(right, Some(joinPredicateExpr), joinType)
- }
- private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
- // check if we join with a table or a table function
- if (!containsUnboundedUDTFCall(right.logicalPlan)) {
- // regular table-table join
- // check that the TableEnvironment of right table is not null
- // and right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
- }
- new Table(
- tableEnv,
- Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
- .validate(tableEnv))
- } else {
- // join with a table function
- // check join type
- if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) {
- throw new ValidationException(
- "TableFunctions are currently supported for join and leftOuterJoin.")
- }
- val udtf = right.logicalPlan.asInstanceOf[LogicalTableFunctionCall]
- val udtfCall = LogicalTableFunctionCall(
- udtf.functionName,
- udtf.tableFunction,
- udtf.parameters,
- udtf.resultType,
- udtf.fieldNames,
- this.logicalPlan
- ).validate(tableEnv)
- new Table(
- tableEnv,
- Join(this.logicalPlan, udtfCall, joinType, joinPredicate, correlated = true)
- .validate(tableEnv))
- }
- }
- //......
- }
Table 定义了 join,leftOuterJoin,rightOuterJoin,fullOuterJoin 方法, 其最后都是调用的私有的 join 方法, 其中 JoinType 用于表达 join 类型, 分别有 INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER 这几种; 另外接收 String 类型或者 Expression 的条件表达式, 其中 String 类型最后是被解析为 Expression 类型; join 方法最后是使用 Join 创建了新的 Table
- Join
- flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/plan/logical/operators.scala
- case class Join(
- left: LogicalNode,
- right: LogicalNode,
- joinType: JoinType,
- condition: Option[Expression],
- correlated: Boolean) extends BinaryNode {
- override def output: Seq[Attribute] = {
- left.output ++ right.output
- }
- private case class JoinFieldReference(
- name: String,
- resultType: TypeInformation[_],
- left: LogicalNode,
- right: LogicalNode) extends Attribute {
- val isFromLeftInput: Boolean = left.output.map(_.name).contains(name)
- val (indexInInput, indexInJoin) = if (isFromLeftInput) {
- val indexInLeft = left.output.map(_.name).indexOf(name)
- (indexInLeft, indexInLeft)
- } else {
- val indexInRight = right.output.map(_.name).indexOf(name)
- (indexInRight, indexInRight + left.output.length)
- }
- override def toString = s"'$name"
- override def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
- // look up type of field
- val fieldType = relBuilder.field(2, if (isFromLeftInput) 0 else 1, name).getType
- // create a new RexInputRef with index offset
- new RexInputRef(indexInJoin, fieldType)
- }
- override def withName(newName: String): Attribute = {
- if (newName == name) {
- this
- } else {
- JoinFieldReference(newName, resultType, left, right)
- }
- }
- }
- override def resolveExpressions(tableEnv: TableEnvironment): LogicalNode = {
- val node = super.resolveExpressions(tableEnv).asInstanceOf[Join]
- val partialFunction: PartialFunction[Expression, Expression] = {
- case field: ResolvedFieldReference => JoinFieldReference(
- field.name,
- field.resultType,
- left,
- right)
- }
- val resolvedCondition = node.condition.map(_.postOrderTransform(partialFunction))
- Join(node.left, node.right, node.joinType, resolvedCondition, correlated)
- }
- override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
- left.construct(relBuilder)
- right.construct(relBuilder)
- val corSet = mutable.Set[CorrelationId]()
- if (correlated) {
- corSet += relBuilder.peek().getCluster.createCorrel()
- }
- relBuilder.join(
- convertJoinType(joinType),
- condition.map(_.toRexNode(relBuilder)).getOrElse(relBuilder.literal(true)),
- corSet.asJava)
- }
- private def convertJoinType(joinType: JoinType) = joinType match {
- case JoinType.INNER => JoinRelType.INNER
- case JoinType.LEFT_OUTER => JoinRelType.LEFT
- case JoinType.RIGHT_OUTER => JoinRelType.RIGHT
- case JoinType.FULL_OUTER => JoinRelType.FULL
- }
- private def ambiguousName: Set[String] =
- left.output.map(_.name).toSet.intersect(right.output.map(_.name).toSet)
- override def validate(tableEnv: TableEnvironment): LogicalNode = {
- val resolvedJoin = super.validate(tableEnv).asInstanceOf[Join]
- if (!resolvedJoin.condition.forall(_.resultType == BOOLEAN_TYPE_INFO)) {
- failValidation(s"Filter operator requires a boolean expression as input," +
- s"but ${resolvedJoin.condition} is of type ${resolvedJoin.joinType}")
- } else if (ambiguousName.nonEmpty) {
- failValidation(s"join relations with ambiguous names: ${ambiguousName.mkString(", ")}")
- }
- resolvedJoin.condition.foreach(testJoinCondition)
- resolvedJoin
- }
- private def testJoinCondition(expression: Expression): Unit = {
- def checkIfJoinCondition(exp: BinaryComparison) = exp.children match {
- case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil
- if x.isFromLeftInput != y.isFromLeftInput => true
- case _ => false
- }
- def checkIfFilterCondition(exp: BinaryComparison) = exp.children match {
- case (x: JoinFieldReference) :: (y: JoinFieldReference) :: Nil => false
- case (x: JoinFieldReference) :: (_) :: Nil => true
- case (_) :: (y: JoinFieldReference) :: Nil => true
- case _ => false
- }
- var equiJoinPredicateFound = false
- // Whether the predicate is literal true.
- val alwaysTrue = expression match {
- case x: Literal if x.value.equals(true) => true
- case _ => false
- }
- def validateConditions(exp: Expression, isAndBranch: Boolean): Unit = exp match {
- case x: And => x.children.foreach(validateConditions(_, isAndBranch))
- case x: Or => x.children.foreach(validateConditions(_, isAndBranch = false))
- case x: EqualTo =>
- if (isAndBranch && checkIfJoinCondition(x)) {
- equiJoinPredicateFound = true
- }
- case x: BinaryComparison =>
- // The boolean literal should be a valid condition type.
- case x: Literal if x.resultType == Types.BOOLEAN =>
- case x => failValidation(
- s"Unsupported condition type: ${x.getClass.getSimpleName}. Condition: $x")
- }
- validateConditions(expression, isAndBranch = true)
- // Due to a bug in Apache Calcite (see CALCITE-2004 and FLINK-7865) we cannot accept join
- // predicates except literal true for TableFunction left outer join.
- if (correlated && right.isInstanceOf[LogicalTableFunctionCall] && joinType != JoinType.INNER ) {
- if (!alwaysTrue) failValidation("TableFunction left outer join predicate can only be" +
- "empty or literal true.")
- } else {
- if (!equiJoinPredicateFound) {
- failValidation(
- s"Invalid join condition: $expression. At least one equi-join predicate is" +
- s"required.")
- }
- }
- }
- }
Join 继承了 BinaryNode, 它内部将 flink 的 JoinType 转为 calcite 的 JoinRelType 类型, construct 方法通过 relBuilder.join 来构建 join 关系
小结
Table 支持多种形式的 join, 其中包括 Inner Join,Outer Join,Time-windowed Join,Inner Join with Table Function,Left Outer Join with Table Function,Join with Temporal Table
Table 定义了 join,leftOuterJoin,rightOuterJoin,fullOuterJoin 方法, 其最后都是调用的私有的 join 方法, 其中 JoinType 用于表达 join 类型, 分别有 INNER, LEFT_OUTER, RIGHT_OUTER, FULL_OUTER 这几种; 另外接收 String 类型或者 Expression 的条件表达式, 其中 String 类型最后是被解析为 Expression 类型; join 方法最后是使用 Join 创建了新的 Table
Join 继承了 BinaryNode, 它内部将 flink 的 JoinType 转为 calcite 的 JoinRelType 类型, construct 方法通过 relBuilder.join 来构建 join 关系
doc
Joins
来源: https://juejin.im/post/5c4fb983f265da61590c0895