序
本文主要研究一下 Flink Table Schema 的定义
示例
定义字段及类型
- .withSchema(
- new Schema()
- .field("MyField1", Types.INT) // required: specify the fields of the table (in this order)
- .field("MyField2", Types.STRING)
- .field("MyField3", Types.BOOLEAN)
- )
通过 field 定义字段名及字段类型
定义字段属性
- .withSchema(
- new Schema()
- .field("MyField1", Types.SQL_TIMESTAMP)
- .proctime() // optional: declares this field as a processing-time attribute
- .field("MyField2", Types.SQL_TIMESTAMP)
- .rowtime(...) // optional: declares this field as an event-time attribute
- .field("MyField3", Types.BOOLEAN)
- .from("mf3") // optional: original field in the input that is referenced/aliased by this field
- )
通过 proctime 将字段声明为 processing-time 属性，通过 rowtime 将字段声明为 event-time 属性，通过 from 指定该字段在输入中所引用（或作为其别名）的原始字段
定义 Rowtime 属性
- // Converts an existing LONG or SQL_TIMESTAMP field in the input into the rowtime attribute.
- .rowtime(
- new Rowtime()
- .timestampsFromField("ts_field") // required: original field name in the input
- )
- // Converts the assigned timestamps from a DataStream API record into the rowtime attribute
- // and thus preserves the assigned timestamps from the source.
- // This requires a source that assigns timestamps (e.g., Kafka 0.10+).
- .rowtime(
- new Rowtime()
- .timestampsFromSource()
- )
- // Sets a custom timestamp extractor to be used for the rowtime attribute.
- // The extractor must extend `org.apache.flink.table.sources.tsextractors.TimestampExtractor`.
- .rowtime(
- new Rowtime()
- .timestampsFromExtractor(...)
- )
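通过 timestampsFromField、timestampsFromSource、timestampsFromExtractor 指定 rowtime 的时间戳来源：分别取自输入中已有的 LONG 或 SQL_TIMESTAMP 字段、取自 source 已分配给 DataStream 记录的时间戳、或使用自定义的 TimestampExtractor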
定义 watermark strategies
- // Sets a watermark strategy for ascending rowtime attributes. Emits a watermark of the maximum
- // observed timestamp so far minus 1. Rows that have a timestamp equal to the max timestamp
- // are not late.
- .rowtime(
- new Rowtime()
- .watermarksPeriodicAscending()
- )
- // Sets a built-in watermark strategy for rowtime attributes which are out-of-order by a bounded time interval.
- // Emits watermarks which are the maximum observed timestamp minus the specified delay.
- .rowtime(
- new Rowtime()
- .watermarksPeriodicBounded(2000) // delay in milliseconds
- )
- // Sets a built-in watermark strategy which indicates the watermarks should be preserved from the
- // underlying DataStream API and thus preserves the assigned watermarks from the source.
- .rowtime(
- new Rowtime()
- .watermarksFromSource()
- )
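通过 watermarksPeriodicAscending、watermarksPeriodicBounded、watermarksFromSource 定义 watermark 策略：分别适用于时间戳递增的场景（发出目前观测到的最大时间戳减 1 的 watermark）、有界乱序的场景（发出最大观测时间戳减去指定延迟的 watermark），以及直接沿用 source 已分配的 watermark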
- StreamTableEnvironment.connect
- flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/api/StreamTableEnvironment.scala
- /**
-  * Excerpt of Flink 1.7.0's StreamTableEnvironment (other members elided with //......),
-  * showing only the connect() entry point of the descriptor API.
-  */
- abstract class StreamTableEnvironment(
- private[flink] val execEnv: StreamExecutionEnvironment,
- config: TableConfig)
- extends TableEnvironment(config) {
- //......
- /**
-  * Starts a table definition: wraps the given connector descriptor in a new
-  * StreamTableDescriptor bound to this environment. Format and schema are then
-  * configured on the returned descriptor (e.g. withFormat / withSchema).
-  *
-  * @param connectorDescriptor the connector descriptor handed to the new StreamTableDescriptor
-  * @return a fresh StreamTableDescriptor; a new instance is created on every call
-  */
- def connect(connectorDescriptor: ConnectorDescriptor): StreamTableDescriptor = {
- new StreamTableDescriptor(this, connectorDescriptor)
- }
- //......
- }
StreamTableEnvironment 的 connect 方法接收一个 ConnectorDescriptor，并以此创建 StreamTableDescriptor
- StreamTableDescriptor
- flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
- /**
-  * Table descriptor for a StreamTableEnvironment (excerpt; body elided with //......).
-  * Connector/format/schema handling and table registration are inherited from
-  * ConnectTableDescriptor; it additionally mixes in StreamableDescriptor. Both are
-  * parameterized with StreamTableDescriptor itself, so ConnectTableDescriptor's builder
-  * methods (withFormat / withSchema, which return D) return this concrete type.
-  */
- class StreamTableDescriptor(
- tableEnv: StreamTableEnvironment,
- connectorDescriptor: ConnectorDescriptor)
- extends ConnectTableDescriptor[StreamTableDescriptor](
- tableEnv,
- connectorDescriptor)
- with StreamableDescriptor[StreamTableDescriptor] {
- //......
- }
StreamTableDescriptor 继承了 ConnectTableDescriptor，并混入了 StreamableDescriptor
- ConnectTableDescriptor
- flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/ConnectTableDescriptor.scala
- /**
-  * Base class for table descriptors obtained from `connect(...)`. It holds a connector
-  * descriptor plus an optional format and an optional schema, converts them into a flat
-  * property map, and registers the table source and/or sink that TableFactoryUtil finds
-  * for this descriptor.
-  *
-  * @tparam D the concrete descriptor type; `withFormat` / `withSchema` return it for chaining
-  */
- abstract class ConnectTableDescriptor[D <: ConnectTableDescriptor[D]](
- private val tableEnv: TableEnvironment,
- private val connectorDescriptor: ConnectorDescriptor)
- extends TableDescriptor
- with SchematicDescriptor[D]
- with RegistrableDescriptor { this: D =>
- // Optional parts set by withFormat / withSchema; None until the builder method is called.
- private var formatDescriptor: Option[FormatDescriptor] = None
- private var schemaDescriptor: Option[Schema] = None
- /**
-  * Searches for the specified table source, configures it accordingly, and registers it as
-  * a table under the given name.
-  *
-  * @param name table name to be registered in the table environment
-  */
- override def registerTableSource(name: String): Unit = {
- val tableSource = TableFactoryUtil.findAndCreateTableSource(tableEnv, this)
- tableEnv.registerTableSource(name, tableSource)
- }
- /**
-  * Searches for the specified table sink, configures it accordingly, and registers it as
-  * a table under the given name.
-  *
-  * @param name table name to be registered in the table environment
-  */
- override def registerTableSink(name: String): Unit = {
- val tableSink = TableFactoryUtil.findAndCreateTableSink(tableEnv, this)
- tableEnv.registerTableSink(name, tableSink)
- }
- /**
-  * Searches for the specified table source and sink, configures them accordingly, and registers
-  * them as a table under the given name.
-  *
-  * The source is registered first, then the sink, both under the same name.
-  *
-  * @param name table name to be registered in the table environment
-  */
- override def registerTableSourceAndSink(name: String): Unit = {
- registerTableSource(name)
- registerTableSink(name)
- }
- /**
-  * Specifies the format that defines how to read data from a connector.
-  *
-  * @param format the format descriptor; replaces any previously set format
-  * @return this descriptor (as type D) for further chaining
-  */
- override def withFormat(format: FormatDescriptor): D = {
- formatDescriptor = Some(format)
- this
- }
- /**
-  * Specifies the resulting table schema.
-  *
-  * @param schema the schema descriptor; replaces any previously set schema
-  * @return this descriptor (as type D) for further chaining
-  */
- override def withSchema(schema: Schema): D = {
- schemaDescriptor = Some(schema)
- this
- }
- // ----------------------------------------------------------------------------------------------
- /**
-  * Converts this descriptor into a set of properties.
-  *
-  * The result merges the connector properties with those of the format and the schema,
-  * each only if set.
-  *
-  * @throws ValidationException if the connector needs a format but none was set,
-  *                             or does not need one but one was set
-  */
- override def toProperties: util.Map[String, String] = {
- val properties = new DescriptorProperties()
- // this performs only basic validation
- // more validation can only happen within a factory
- if (connectorDescriptor.isFormatNeeded && formatDescriptor.isEmpty) {
- throw new ValidationException(
- s"The connector '$connectorDescriptor' requires a format description.")
- } else if (!connectorDescriptor.isFormatNeeded && formatDescriptor.isDefined) {
- // `.get` is safe: this branch is only reached when formatDescriptor.isDefined.
- throw new ValidationException(
- s"The connector '$connectorDescriptor' does not require a format description " +
- s"but '${formatDescriptor.get}' found.")
- }
- properties.putProperties(connectorDescriptor.toProperties)
- formatDescriptor.foreach(d => properties.putProperties(d.toProperties))
- schemaDescriptor.foreach(d => properties.putProperties(d.toProperties))
- properties.asMap()
- }
- }
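ConnectTableDescriptor 提供了 withSchema 方法：它接收一个 Schema 参数并保存下来，返回的是描述符自身（类型参数 D，这里即 StreamTableDescriptor），从而支持链式调用；toProperties 时会把 Schema 转换出的属性与 connector、format 的属性合并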
- Schema
- flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/Schema.scala
- /**
-  * Descriptor of a table schema: an ordered set of fields, each with a type and optional
-  * origin (`from`) and time-attribute (`proctime` / `rowtime`) properties.
-  *
-  * `from`, `proctime` and `rowtime` modify the field added by the most recent `field(...)` call
-  * and then reset `lastField` to None. So at most one of them can follow each `field(...)`.
-  * For example, `.field(...).from(...).proctime()` throws a ValidationException.
-  */
- class Schema extends Descriptor {
- // maps a field name (in definition order) to an ordered map of properties that describe
- // type, origin, and the time attribute
- private val tableSchema = mutable.LinkedHashMap[String, mutable.LinkedHashMap[String, String]]()
- // field that the next from/proctime/rowtime call applies to; None if there is none
- private var lastField: Option[String] = None
- /**
-  * Replaces the whole definition with the fields of the given TableSchema, in order.
-  * Any previously defined fields and their attributes are discarded.
-  */
- def schema(schema: TableSchema): Schema = {
- tableSchema.clear()
- lastField = None
- schema.getFieldNames.zip(schema.getFieldTypes).foreach { case (n, t) =>
- field(n, t)
- }
- this
- }
- /**
-  * Adds a field whose type is serialized to its string form via TypeStringUtils.writeTypeInfo.
-  * Delegates to the String overload; the trailing `this` is redundant but harmless.
-  */
- def field(fieldName: String, fieldType: TypeInformation[_]): Schema = {
- field(fieldName, TypeStringUtils.writeTypeInfo(fieldType))
- this
- }
- /**
-  * Adds a field with a type given as a type string and makes it the target of the next
-  * from/proctime/rowtime call.
-  *
-  * @throws ValidationException if a field with the same name was already defined
-  */
- def field(fieldName: String, fieldType: String): Schema = {
- if (tableSchema.contains(fieldName)) {
- throw new ValidationException(s"Duplicate field name $fieldName.")
- }
- val fieldProperties = mutable.LinkedHashMap[String, String]()
- fieldProperties += (SCHEMA_TYPE -> fieldType)
- tableSchema += (fieldName -> fieldProperties)
- lastField = Some(fieldName)
- this
- }
- /**
-  * Records the input field that the last defined field references/aliases (SCHEMA_FROM).
-  *
-  * @throws ValidationException if no field() call precedes it (or it was already consumed)
-  */
- def from(originFieldName: String): Schema = {
- lastField match {
- case None => throw new ValidationException("No field previously defined. Use field() before.")
- case Some(f) =>
- tableSchema(f) += (SCHEMA_FROM -> originFieldName)
- lastField = None
- }
- this
- }
- /**
-  * Marks the last defined field as a processing-time attribute (SCHEMA_PROCTIME = "true").
-  *
-  * @throws ValidationException if no field() call precedes it (or it was already consumed)
-  */
- def proctime(): Schema = {
- lastField match {
- case None => throw new ValidationException("No field defined previously. Use field() before.")
- case Some(f) =>
- tableSchema(f) += (SCHEMA_PROCTIME -> "true")
- lastField = None
- }
- this
- }
- /**
-  * Marks the last defined field as a rowtime (event-time) attribute by merging in all
-  * properties produced by the given Rowtime descriptor.
-  *
-  * @throws ValidationException if no field() call precedes it (or it was already consumed)
-  */
- def rowtime(rowtime: Rowtime): Schema = {
- lastField match {
- case None => throw new ValidationException("No field defined previously. Use field() before.")
- case Some(f) =>
- tableSchema(f) ++= rowtime.toProperties.asScala
- lastField = None
- }
- this
- }
- /**
-  * Emits the schema as indexed properties under the SCHEMA key: one entry per field, in
-  * definition order, each holding SCHEMA_NAME plus that field's own properties.
-  */
- final override def toProperties: util.Map[String, String] = {
- val properties = new DescriptorProperties()
- properties.putIndexedVariableProperties(
- SCHEMA,
- tableSchema.toSeq.map { case (name, props) =>
- (Map(SCHEMA_NAME -> name) ++ props).asJava
- }.asJava
- )
- properties.asMap()
- }
- }
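Schema 提供了 field、from、proctime、rowtime 方法用于定义 Schema 的相关属性。其中 from、proctime、rowtime 作用于最近一次 field 调用所定义的字段，调用后会把 lastField 置为 None，因此每次 field 之后只能跟一个这样的方法；例如 `.field(...).from(...).proctime()` 会抛出 ValidationException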
- Rowtime
- flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/descriptors/Rowtime.scala
- /**
-  * Descriptor of a rowtime (event-time) attribute. It defines where timestamps come from
-  * (timestampExtractor) and how watermarks are generated (watermarkStrategy).
-  * Each setter overwrites the previous choice for its category, so the last call wins.
-  */
- class Rowtime extends Descriptor {
- private var timestampExtractor: Option[TimestampExtractor] = None
- private var watermarkStrategy: Option[WatermarkStrategy] = None
- /** Takes timestamps from an existing input field (wrapped in an ExistingField extractor). */
- def timestampsFromField(fieldName: String): Rowtime = {
- timestampExtractor = Some(new ExistingField(fieldName))
- this
- }
- /** Takes the timestamps already assigned to the DataStream records (StreamRecordTimestamp). */
- def timestampsFromSource(): Rowtime = {
- timestampExtractor = Some(new StreamRecordTimestamp)
- this
- }
- /** Uses a custom TimestampExtractor. */
- def timestampsFromExtractor(extractor: TimestampExtractor): Rowtime = {
- timestampExtractor = Some(extractor)
- this
- }
- /** Periodic watermarks for ascending timestamps (AscendingTimestamps). */
- def watermarksPeriodicAscending(): Rowtime = {
- watermarkStrategy = Some(new AscendingTimestamps)
- this
- }
- /**
-  * Periodic watermarks for out-of-order timestamps bounded by `delay`
-  * (BoundedOutOfOrderTimestamps). The usage example passes the delay in milliseconds.
-  */
- def watermarksPeriodicBounded(delay: Long): Rowtime = {
- watermarkStrategy = Some(new BoundedOutOfOrderTimestamps(delay))
- this
- }
- /** Keeps the watermarks produced by the underlying source (PreserveWatermarks.INSTANCE). */
- def watermarksFromSource(): Rowtime = {
- watermarkStrategy = Some(PreserveWatermarks.INSTANCE)
- this
- }
- /** Uses a custom WatermarkStrategy. */
- def watermarksFromStrategy(strategy: WatermarkStrategy): Rowtime = {
- watermarkStrategy = Some(strategy)
- this
- }
- /**
-  * Converts the configured extractor and strategy into string properties.
-  * normalizeTimestampExtractor / normalizeWatermarkStrategy are defined outside this excerpt
-  * and produce the key/value pairs. An unset option contributes no properties.
-  */
- final override def toProperties: java.util.Map[String, String] = {
- val properties = new DescriptorProperties()
- timestampExtractor.foreach(normalizeTimestampExtractor(_)
- .foreach(e => properties.putString(e._1, e._2)))
- watermarkStrategy.foreach(normalizeWatermarkStrategy(_)
- .foreach(e => properties.putString(e._1, e._2)))
- properties.asMap()
- }
- }
Rowtime 提供了 timestampsFromField,timestampsFromSource,timestampsFromExtractor 方法用于定义 timestamps; 提供了 watermarksPeriodicAscending,watermarksPeriodicBounded,watermarksFromSource,watermarksFromStrategy 方法用于定义 watermark strategies
小结
StreamTableEnvironment 的 connect 方法创建 StreamTableDescriptor；StreamTableDescriptor 继承了 ConnectTableDescriptor；ConnectTableDescriptor 提供了 withSchema 方法，接收 Schema 参数并返回描述符自身以便链式调用
Schema 提供了 field、from、proctime、rowtime 方法用于定义 Schema 的相关属性；通过 proctime 定义 processing-time，通过 rowtime 定义 event-time，通过 from 定义引用或别名
Rowtime 提供了 timestampsFromField,timestampsFromSource,timestampsFromExtractor 方法用于定义 timestamps; 提供了 watermarksPeriodicAscending,watermarksPeriodicBounded,watermarksFromSource,watermarksFromStrategy 方法用于定义 watermark strategies
doc
Table Schema
来源: https://juejin.im/post/5c56615af265da2daa312977