1. Flink Table API 的整体实现流程
主要操作流程如下:
- // 创建表的执行环境
- val tableEnv = ...
- // 创建一张表, 用于读取数据
- tableEnv.connect(...).createTemporaryTable("inputTable")
- // 注册一张表, 用于把计算结果输出
- tableEnv.connect(...).createTemporaryTable("outputTable")
- // 通过 Table API 查询算子, 得到一张结果表
- val result = tableEnv.from("inputTable").select(...)
- // 通过 SQL 查询语句, 得到一张结果表
- val sqlResult = tableEnv.sqlQuery("SELECT ... FROM inputTable ...")
- // 将结果表写入输出表中
- result.insertInto("outputTable")
2. 流处理执行环境的创建配置
创建表环境
基于流处理执行环境, 调 create 方法直接创建:
val tableEnv = StreamTableEnvironment.create(env)
表环境 (TableEnvironment) 是 flink 中集成 Table API & SQL 的核心概念. 它主要负责:
注册 catalog
在内部 catalog 中注册表
执行 SQL 查询
注册用户自定义函数
将 DataStream 或 DataSet 转换为表
保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
环境配置
创建 TableEnv 的时候, 可以通过一些参数来配置 TableEnvironment 的特性.
老版本的流式查询配置
- EnvironmentSettings settings = EnvironmentSettings.newInstance()
- .useOldPlanner() // 使用老版本 planner
- .inStreamingMode() // 流处理模式
- .build()
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings)
老版本的批处理配置
- EnvironmentSettings settings = EnvironmentSettings.newInstance()
- .useOldPlanner() // 使用老版本 planner
- .inBatchMode() // 使用老版本的流处理模式
- .build()
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings)
blink 版本的流处理配置
- EnvironmentSettings settings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
- .inStreamingMode().build()
- StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings)
blink 版本的批处理配置
- EnvironmentSettings bbSettings = EnvironmentSettings.newInstance()
- .useBlinkPlanner()
- .inBatchMode().build();
- TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
3. Catalog 的操作使用
1)Catalog 的类型:
GenericInMemoryCatalog: 内置 Catalog. 名为 default_catalog, 默认数据库名为 default_database. 默认, 如用 TableEnvironment#registerTable 注册的表, 均会注册到这个 Catalog 中.
User-Defined Catalog: 用户自定义 Catalog. 如 flink-connector-hive 中的 HiveCatalog.
注意:
GenericInMemoryCatalog 中的元数据对象名区分大小写. HiveCatalog 以小写存储所有元数据对象名.
默认使用的 Catalog: default_catalog;Database: default_database.
2)Catalog 的用法:
获取当前使用的 Catalog
tableEnv.getCurrentCatalog()
获取当前使用的 Database
tableEnv.getCurrentDatabase()
注册自定义 Catalog
tableEnv.registerCatalog("custom-catalog",new CustomCatalog("customCatalog"))
列出所有 Catalog
tableEnv.listCatalogs()
列出所有 Database
tableEnv.listDatabases()
切换 Catalog
tableEnv.useCatalog("catalogName")
切换 Database
tableEnv.useDatabase("databaseName")
4. 文件系统的读取操作实现(CSV)
POM 依赖
- <!-- 导入 csv 格式描述器的依赖包 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-CSV</artifactId>
- <version>${flink.version}</version>
- </dependency>
代码实现
- public static void main(String[] args) throws Exception {
- //1. 创建流式程序运行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- //2. 没有指定 EnviromentSettings, 默认使用的是老版本的 Planner 的流式查询
- StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
- //3. 指定读取 CSV 文件的路径
- String filePath = "./data/user.csv";
- //4. 读取 CSV 的文件, 配置属性类型
- tabEnv.connect(new FileSystem().path(filePath))// 读取指定文件目录的文件数据, 该对象一定是实现了 ConnectorDescriptor 的实现类
- .withFormat(new CSV()) // 定义从外部文件读取数据的格式化方法, 需要传入继承自 FormatDescriptor 抽象类的实现类
- .withSchema(new Schema()
- .field("id", DataTypes.INT())
- .field("name", DataTypes.STRING())
- )// 定义表的结构
- .createTemporaryTable("inputTable");
- System.out.println(tabEnv.getCurrentCatalog());
- System.out.println(tabEnv.getCurrentDatabase());
- //5. 将 table 表的数据转换成 table 对象
- Table inputTable = tabEnv.from("inputTable");
- //6. 打印测试
- tabEnv.toAppendStream(inputTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print().setParallelism(1);
- env.execute();
- }
5. 消息队列的读取操作实现(kafka)
POM 依赖
- <!-- 导入 kafka 连接器 jar 包 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka_${scala.version}</artifactId>
- <version>${flink.version}</version>
- </dependency>
- <!-- flink json 序列化 jar 包 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-JSON</artifactId>
- <version>${flink.version}</version>
- </dependency>
代码实现
- public static void main(String[] args) throws Exception {
- // 创建流式程序运行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 没有指定 EnviromentSettings, 默认使用的是老版本的 Planner 的流式查询
- StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
- // 接入 kafka 的 connect 消费数据
- tabEnv.connect(new Kafka() // 从 kafka 中读取数据
- .version("universal") // 指定当前环境采用的 kafka 的版本号:"0.8", "0.9", "0.10", "0.11", and "universal"
- .topic("rate") // 指定消费的 topic 名字
- .property("zookeeper.connect", "10.10.20.15:2181") // 指定 zookeeper 的集群地址
- .property("bootstrap.servers", "10.10.20.15:9092") // 指定消费 kafka 的集群地址
- ).withFormat(new CSV())
- .withSchema(new Schema()
- .field("timestamp", DataTypes.BIGINT())
- .field("type", DataTypes.STRING())
- .field("rate", DataTypes.INT())
- ).createTemporaryTable("RateInputTable");
- Table rateInputTable = tabEnv.from("RateInputTable");
- tabEnv.toAppendStream(rateInputTable, Rate.class).print();
- env.execute();
- }
消费测试
开启 kafka 消费端:
bin/kafka-console-producer.sh --broker-list 10.10.20.15:9092 --topic rate
发送数据:
- 1618388392479, 'REF', 9
- 1618388392480, 'USD', 4
- 1618388392580, 'HKD', 9
6. 如何进行条件查询操作
6.1 Table API 的实现方式
Table API 是集成在 Scala 和 Java 语言内的查询 API. 与 SQL 不同, Table API 的查询不会用字符串表示, 而是在宿主语言中一步一步调用完成的.
Table API 基于代表一张 "表" 的 Table 类, 并提供一整套操作处理的方法 API. 这些方法会返回一个新的 Table 对象, 这个对象就表示对输入表应用转换操作的结果. 有些关系型转换操作, 可以由多个方法调用组成, 构成链式调用结构. 例如 table.select(...).filter(...), 其中 select(...)表示选择表中指定的字段, filter(...)表示筛选条件.
代码实现:
- ...
- // 基于 TableAPI 进行数据的查询转换操作, 所以要求注册的临时表需要读取出来, 赋值给一个 Table 对象实例才可以操作
- Table resultTable = inputTable.filter("id == 1").select("id,name");
- // 使用 TableAPI 对 Table 对象进行聚合计算
- Table aggResultTable = inputTable.groupBy("id").select("id,id.count as count");
- // 打印测试
- tabEnv.toAppendStream(resultTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {
- })).print("resultTable>>>").setParallelism(1) ;
- tabEnv.toRetractStream(aggResultTable, TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {
- })).print("aggResultTable>>>").setParallelism(1) ;
- ...
输出结果:
- resultTable>>>> (1,zhangsan)
- aggResultTable>>>> (true,(2,1))
- aggResultTable>>>> (false,(2,1))
- aggResultTable>>>> (true,(2,2))
- aggResultTable>>>> (true,(1,1))
true 代表新的数据, false 代表已存在历史数据, 然后再次打印 "true,(2,2)" 进行累积统计.
6.2 SQL 的实现方式
Flink 的 SQL 集成, 基于的是 ApacheCalcite, 它实现了 SQL 标准. 在 Flink 中, 用常规字符串来定义 SQL 查询语句. SQL 查询的结果, 是一个新的 Table.
- // 使用 SQL 对表的数据进行操作
- Table resultTableBySQL = tabEnv.sqlQuery("select id,count(id) as cnt from inputTable group by id");
- tabEnv.toRetractStream(resultTableBySQL, TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {
- })).print("sql result>>>").setParallelism(1) ;
7. 实现数据的输出操作
表的输出, 是通过将数据写入 TableSink 来实现的. TableSink 是一个通用接口, 可以支持不同的文件格式, 存储数据库和消息队列.
具体实现, 输出表最直接的方法, 就是通过 Table.executeInsert() 方法将一个 Table 写入注册过的 TableSink 中.
7.1 输出到文件
代码实现:
- //1. 创建流式程序运行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- //2. 没有指定 EnviromentSettings, 默认使用的是老版本的 Planner 的流式查询
- StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
- env.setParallelism(1);
- //3. 指定读取 CSV 文件的路径
- String filePath = "./data/user.csv";
- //4. 读取 CSV 的文件, 配置属性类型
- tabEnv.connect(new FileSystem().path(filePath))// 读取指定文件目录的文件数据, 该对象一定是实现了 ConnectorDescriptor 的实现类
- .withFormat(new CSV()) // 定义从外部文件读取数据的格式化方法, 需要传入继承自 FormatDescriptor 抽象类的实现类
- .withSchema(new Schema()
- .field("id", DataTypes.INT())
- .field("name", DataTypes.STRING())
- )// 定义表的结构
- .createTemporaryTable("inputTable");
- //5. 将 table 表的数据转换成 table 对象
- Table inputTable = tabEnv.from("inputTable");
- Table resultTable = inputTable.select("id,name");
- // 定义结果表, 将文件数据写入到结果文件中
- tabEnv.connect(new FileSystem().path("./data/user.log"))
- .withFormat(new CSV())
- .withSchema(new Schema() // 这个方法一定要指定
- .field("id", DataTypes.INT())
- .field("name", DataTypes.STRING())
- )
- .createTemporaryTable("outputTable");
- resultTable.executeInsert("outputTable");
- //6. 打印测试
- tabEnv.toAppendStream(inputTable, TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {})).print().setParallelism(1);
- env.execute();
7.2 输出到 KAFKA
处理的数据可以支持输出到 Kafka, 结合前面的 Kafka 作为输入数据, 创建数据管道, 再输出至 Kafka 消息队列:
- String kafkaNode = "10.10.20.15:2181";
- String kafkaNodeServer = "10.10.20.15:9092";
- // 创建流式程序运行环境
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- // 没有指定 EnviromentSettings, 默认使用的是老版本的 Planner 的流式查询
- StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);
- // 接入 kafka 的 connect 消费数据
- tabEnv.connect(new Kafka() // 从 kafka 中读取数据
- .version("universal") // 指定当前环境采用的 kafka 的版本号:"0.8", "0.9", "0.10", "0.11", and "universal"
- .startFromEarliest()
- .topic("rate") // 指定消费的 topic 名字
- .property("zookeeper.connect", kafkaNode) // 指定 zookeeper 的集群地址
- .property("bootstrap.servers", kafkaNodeServer) // 指定消费 kafka 的集群地址
- ).withFormat(new CSV())
- .withSchema(new Schema()
- .field("timestamp", DataTypes.BIGINT())
- .field("type", DataTypes.STRING())
- .field("rate", DataTypes.INT())
- ).createTemporaryTable("RateInputTable");
- Table rateInputTable = tabEnv.from("RateInputTable");
- // 接入 kafka 的 connect 消费数据
- tabEnv.connect(new Kafka() // 从 kafka 中读取数据
- .version("universal") // 指定当前环境采用的 kafka 的版本号:"0.8", "0.9", "0.10", "0.11", and "universal"
- .topic("output_rate") // 指定消费的 topic 名字
- .property("zookeeper.connect", kafkaNode) // 指定 zookeeper 的集群地址
- .property("bootstrap.servers", kafkaNodeServer) // 指定消费 kafka 的集群地址
- ).withFormat(new CSV())
- .withSchema(new Schema()
- .field("timestamp", DataTypes.BIGINT())
- .field("type", DataTypes.STRING())
- .field("rate", DataTypes.INT())
- ).createTemporaryTable("RateOutputTable");
- // 将 table 数据写入 kafka 消息队列
- rateInputTable.executeInsert("RateOutputTable");
- // 打印数据信息
- tabEnv.toAppendStream(rateInputTable, StreamOutputKafkaApplication.Rate.class).print();
- env.execute();
本文由 mirson 创作分享, 如需进一步交流, 请加 QQ 群: 19310171 或访问 www.softart.cn
来源: https://segmentfault.com/a/1190000040133047