1. Table API & SQL in Practice
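Case Description
Functionality
Read data from a socket source and count the occurrences of each word.
Implementation Flow
Initialize the Table environment
Transformations:
1) Split each line on spaces
2) Assign a count of 1 to each word
3) Group by word
4) Sum the counts
5) Print the output
Execute the job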
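The transformation steps above can be illustrated without Flink. The sketch below is a plain-Java, batch version of the same logic (the class name `WordCountSketch` is hypothetical): it splits each line on single spaces, counts each word once, groups by word and sums. Unlike the Flink job, it works on a fixed list of lines instead of an unbounded socket stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Flink-free, batch illustration of the word-count steps
class WordCountSketch {
    static Map<String, Long> count(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>(); // sorted by word for stable printing
        for (String line : lines) {
            // 1) split on spaces (same split as the Flink flatMap)
            for (String word : line.split(" ")) {
                // 2)-4) count 1 for the word, grouped by word, added to the running sum
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // 5) print the result: {flink=2, spark=1}
        System.out.println(count(Arrays.asList("flink spark", "flink")));
    }
}
```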
Implementation with the Flink Table API
StreamTableApiApplication code:
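```java
import java.util.Arrays;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

public class StreamTableApiApplication {
    public static void main(String[] args) throws Exception {
        // Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        // Get the Table environment
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
        // Connect to the data source
        DataStreamSource<String> lines = env.socketTextStream("10.10.20.15", 9922);
        // Split each line into words and flatten
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                Arrays.stream(line.split(" ")).forEach(out::collect);
            }
        });
        // Convert the DataStream into a Table; the default field name is f0, renamed here to "word"
        Table table = tabEnv.fromDataStream(words, "word");
        // Group by word and aggregate
        Table resultTable = table.groupBy("word").select("word, sum(1L) as counts");
        // In a stream, data keeps arriving and earlier counts get updated, so toRetractStream is required
        // DataStream<WordCount> wordCountDataStream = tabEnv.toAppendStream(resultTable, WordCount.class);
        // wordCountDataStream.printToErr("toAppendStream>>>");
        // WordCount is a POJO with fields word and counts (not shown in the original)
        DataStream<Tuple2<Boolean, WordCount>> wordCountDataStream = tabEnv.toRetractStream(resultTable, WordCount.class);
        wordCountDataStream.printToErr("toRetractStream>>>");
        env.execute();
    }
}
```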
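Why `toRetractStream` rather than `toAppendStream`: each new occurrence of a word updates a count that was already emitted, so the earlier row has to be withdrawn, which an append-only stream cannot express. The sketch below is a simplified, Flink-free model of those `(Boolean, row)` messages, where `true` adds a row and `false` retracts one; the class name `RetractSimulation` and the string message format are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, Flink-free model of retract messages for a grouped word count:
// "true" adds a result row, "false" retracts a row that was emitted earlier.
class RetractSimulation {
    static List<String> process(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        List<String> messages = new ArrayList<>();
        for (String word : words) {
            Long old = counts.get(word);
            if (old != null) {
                // the previously emitted count for this word is now stale: retract it
                messages.add("(false," + word + "," + old + ")");
            }
            long updated = (old == null) ? 1L : old + 1L;
            counts.put(word, updated);
            // emit the updated count as a new row
            messages.add("(true," + word + "," + updated + ")");
        }
        return messages;
    }

    public static void main(String[] args) {
        // prints (true,flink,1) (true,spark,1) (false,flink,1) (true,flink,2), one per line
        process(Arrays.asList("flink", "spark", "flink")).forEach(System.out::println);
    }
}
```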
Test:
Start a socket server with nc and type some strings:
[root@flink1 flink-1.11.2]# nc -lk 9922
Implementation with Flink SQL
Code:
StreamTableSqlApplication class:
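```java
import java.util.Arrays;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;

public class StreamTableSqlApplication {
    public static void main(String[] args) throws Exception {
        // Get the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
        // Get the Table environment
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
        // Connect to the data source
        DataStreamSource<String> lines = env.socketTextStream("10.10.20.15", 9922);
        // Split each line into words and flatten
        SingleOutputStreamOperator<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                Arrays.stream(line.split(" ")).forEach(out::collect);
            }
        });
        // Register the DataStream as table t_wordcount; its field (default name f0) is named "word"
        // (registerDataStream is deprecated in Flink 1.11 in favor of createTemporaryView)
        tabEnv.registerDataStream("t_wordcount", words, "word");
        // Group by word and aggregate with SQL
        Table resultTable = tabEnv.sqlQuery("select word, count(1) as counts from t_wordcount group by word");
        // WordCount is a POJO with fields word and counts (not shown in the original)
        DataStream<Tuple2<Boolean, WordCount>> wordCountDataStream = tabEnv.toRetractStream(resultTable, WordCount.class);
        wordCountDataStream.printToErr("toRetractStream>>>");
        env.execute();
    }
}
```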
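2. Flink SQL Tumbling Windows in Practice
Flink SQL Windows
Flink SQL supports two main kinds of windowed aggregation: group window aggregation and Over aggregation. This section covers group window aggregation. Group windows can be defined on either of two time attributes, Event Time or Processing Time, and each supports three window types: tumbling windows (TUMBLE), hopping (sliding) windows (HOP), and session windows (SESSION).
Case Description
Count how many times each user clicked a web page in the past minute: define a window that collects the data of the most recent minute and compute over the data in that window.
Test data: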
| Username | URL | Access Time |
| ------ | --------------------- | -------------------- |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:00:00 |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:00:10 |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:00:49 |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:01:05 |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:01:58 |
| 张三 | http://taobao.com/xxx | 2021-05-10 10:02:10 |
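Using Tumbling Windows
In the Table API, tumbling windows are defined with the Tumble class, which provides three methods:
over: defines the window length
on: the time attribute to group (by time interval) or sort (by row count) on
as: an alias for the window, which must be referenced in the subsequent groupBy
Implementation steps:
Initialize the stream execution environment
Use the blink planner in streaming mode
Create the user click event data
Write the source data to a temporary file and get its absolute path
Create a table that loads the user click event data
Run a SQL query on the table and retrieve the result as a new table
Convert the Table into a DataStream
Execute the job
TumbleUserClickApplication code, which expresses the window in SQL with TUMBLE: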
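```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TumbleUserClickApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the blink planner in streaming mode
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
        // Write the source data to a temporary file and get its absolute path
        // (FileUtil is a helper class that is not shown in the original)
        String contents =
                "张三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
        String path = FileUtil.createTempFile(contents);
        // Create the table; ts is the event-time attribute and the watermark lags 2 seconds behind
        String ddl = "CREATE TABLE user_clicks (\n" +
                "  username varchar,\n" +
                "  click_url varchar,\n" +
                "  ts TIMESTAMP(3),\n" +
                "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
                ") WITH (\n" +
                "  'connector.type' = 'filesystem',\n" +
                "  'connector.path' = '" + path + "',\n" +
                "  'format.type' = 'csv'\n" +
                ")";
        // sqlUpdate is deprecated in Flink 1.11; executeSql(ddl) can be used instead
        tabEnv.sqlUpdate(ddl);
        // Count each user's clicks in 1-minute tumbling windows
        String query = "SELECT\n" +
                "  TUMBLE_START(ts, INTERVAL '1' MINUTE),\n" +
                "  TUMBLE_END(ts, INTERVAL '1' MINUTE),\n" +
                "  username,\n" +
                "  COUNT(click_url)\n" +
                "FROM user_clicks\n" +
                "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username";
        Table result = tabEnv.sqlQuery(query);
        // Event-time window results are final once emitted, so an append stream is sufficient
        tabEnv.toAppendStream(result, Row.class).print();
        env.execute();
    }
}
```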
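`FileUtil.createTempFile` is called in all three window examples, but its source is not part of the article. A minimal stand-in, assuming it only needs to write the string to a new temporary file and return the absolute path, could look like the following; the file prefix and suffix and the explicit UTF-8 encoding (chosen because the sample rows contain Chinese user names) are assumptions.

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical stand-in for the FileUtil helper used in the examples (not shown in the original)
class FileUtil {
    // Writes the given content to a new temporary file and returns its absolute path
    static String createTempFile(String contents) throws IOException {
        File tempFile = File.createTempFile("user_clicks", ".csv");
        tempFile.deleteOnExit(); // remove the file when the JVM exits
        Files.write(tempFile.toPath(), contents.getBytes(StandardCharsets.UTF_8));
        return tempFile.getAbsolutePath();
    }
}
```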
The query uses 1-minute tumbling windows with a 2-second watermark delay.
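To see when each 1-minute window is emitted, the sketch below replays the sample clicks against a simplified watermark model (an assumption, not Flink's exact mechanics): the watermark is recomputed after every record as the largest timestamp seen minus 2 seconds, and a window `[start, end)` fires once the watermark reaches `end - 1 ms`. Flink itself generates watermarks periodically, so real trigger points can lag slightly. The class name `WatermarkSketch` is hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Simplified model (assumption): watermark = max event time seen - delay, updated per record;
// a tumbling window [start, end) fires once the watermark reaches end - 1 ms.
class WatermarkSketch {
    private static final DateTimeFormatter HMS = DateTimeFormatter.ofPattern("HH:mm:ss");

    static List<LocalDateTime> sampleClicks() {
        return Arrays.asList(
                LocalDateTime.parse("2021-05-10T10:00:00"), LocalDateTime.parse("2021-05-10T10:00:10"),
                LocalDateTime.parse("2021-05-10T10:00:49"), LocalDateTime.parse("2021-05-10T10:01:05"),
                LocalDateTime.parse("2021-05-10T10:01:58"), LocalDateTime.parse("2021-05-10T10:02:10"));
    }

    static List<String> firings(List<LocalDateTime> events, Duration size, Duration delay) {
        List<String> log = new ArrayList<>();
        TreeSet<LocalDateTime> pendingEnds = new TreeSet<>(); // ends of windows not fired yet
        LocalDateTime maxTs = null;
        for (LocalDateTime ts : events) {
            long sec = ts.toEpochSecond(ZoneOffset.UTC);
            long endSec = sec - Math.floorMod(sec, size.getSeconds()) + size.getSeconds();
            pendingEnds.add(LocalDateTime.ofEpochSecond(endSec, 0, ZoneOffset.UTC));
            if (maxTs == null || ts.isAfter(maxTs)) {
                maxTs = ts;
            }
            LocalDateTime watermark = maxTs.minus(delay);
            // fire every pending window whose last millisecond is covered by the watermark
            while (!pendingEnds.isEmpty()
                    && !pendingEnds.first().minusNanos(1_000_000).isAfter(watermark)) {
                log.add(ts.format(HMS) + " -> fires " + describe(pendingEnds.pollFirst(), size));
            }
        }
        // at the end of a bounded input, the remaining windows are flushed
        while (!pendingEnds.isEmpty()) {
            log.add("end of input -> fires " + describe(pendingEnds.pollFirst(), size));
        }
        return log;
    }

    private static String describe(LocalDateTime end, Duration size) {
        return "[" + end.minus(size).format(HMS) + ", " + end.format(HMS) + ")";
    }

    public static void main(String[] args) {
        firings(sampleClicks(), Duration.ofMinutes(1), Duration.ofSeconds(2)).forEach(System.out::println);
    }
}
```

With the sample data, the window [10:00, 10:01) is only emitted once the 10:01:05 click pushes the watermark to 10:01:03, and the last window is flushed when the file source reaches the end of its input.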
Output:
4> 2021-05-10T10:00,2021-05-10T10:01,张三,3
4> 2021-05-10T10:01,2021-05-10T10:02,张三,2
4> 2021-05-10T10:02,2021-05-10T10:03,张三,1
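The counts above can be cross-checked by hand: a tumbling window's start is the event timestamp rounded down to a multiple of the window size, so every click falls into exactly one window. A Flink-free sketch of that assignment (the class `TumbleCountSketch` is hypothetical, and it treats the timestamps as UTC for the rounding):

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Flink-free illustration: assign each click to its tumbling window and count clicks per window
class TumbleCountSketch {
    static List<LocalDateTime> sampleClicks() {
        return Arrays.asList(
                LocalDateTime.parse("2021-05-10T10:00:00"), LocalDateTime.parse("2021-05-10T10:00:10"),
                LocalDateTime.parse("2021-05-10T10:00:49"), LocalDateTime.parse("2021-05-10T10:01:05"),
                LocalDateTime.parse("2021-05-10T10:01:58"), LocalDateTime.parse("2021-05-10T10:02:10"));
    }

    // Returns window start -> click count, ordered by window start
    static Map<LocalDateTime, Long> countPerWindow(List<LocalDateTime> clicks, Duration size) {
        long sizeSec = size.getSeconds();
        Map<LocalDateTime, Long> counts = new TreeMap<>();
        for (LocalDateTime ts : clicks) {
            long sec = ts.toEpochSecond(ZoneOffset.UTC);
            // window start = timestamp rounded down to a multiple of the window size
            long startSec = sec - Math.floorMod(sec, sizeSec);
            counts.merge(LocalDateTime.ofEpochSecond(startSec, 0, ZoneOffset.UTC), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(1);
        countPerWindow(sampleClicks(), size).forEach((start, count) ->
                System.out.println(start + "," + start.plus(size) + "," + count));
    }
}
```

Its `main` prints one line per window (start, end, count), mirroring the first, second and last columns of the Flink output.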
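3. Flink SQL Hopping (Sliding) Windows in Practice
Implementation Steps
Initialize the stream execution environment
Use the blink planner in streaming mode
Create the user click event data
Write the source data to a temporary file and get its absolute path
Create a table that loads the user click event data
Run a SQL query on the table and retrieve the result as a new table
Convert the Table into a DataStream
Execute the job
Implementation Code
HopUserClickApplication code: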
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class HopUserClickApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the blink planner in streaming mode
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
        // Write the source data to a temporary file and get its absolute path
        // (FileUtil is a helper class that is not shown in the original)
        String contents =
                "张三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
        String path = FileUtil.createTempFile(contents);
        // Create the table; ts is the event-time attribute and the watermark lags 2 seconds behind
        String ddl = "CREATE TABLE user_clicks (\n" +
                "  username varchar,\n" +
                "  click_url varchar,\n" +
                "  ts TIMESTAMP(3),\n" +
                "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
                ") WITH (\n" +
                "  'connector.type' = 'filesystem',\n" +
                "  'connector.path' = '" + path + "',\n" +
                "  'format.type' = 'csv'\n" +
                ")";
        // sqlUpdate is deprecated in Flink 1.11; executeSql(ddl) can be used instead
        tabEnv.sqlUpdate(ddl);
        // HOP(time_attr, slide, size): every 30 seconds, count each user's clicks over the past 1 minute
        String query = "SELECT\n" +
                "  HOP_START(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),\n" +
                "  HOP_END(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),\n" +
                "  username,\n" +
                "  COUNT(click_url)\n" +
                "FROM user_clicks\n" +
                "GROUP BY HOP(ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username";
        Table result = tabEnv.sqlQuery(query);
        tabEnv.toAppendStream(result, Row.class).print();
        env.execute();
    }
}
```
Every 30 seconds, the query counts each user's clicks over the past 1 minute.
Output:
4> 2021-05-10T09:59:30,2021-05-10T10:00:30,张三,2
4> 2021-05-10T10:00,2021-05-10T10:01,张三,3
4> 2021-05-10T10:00:30,2021-05-10T10:01:30,张三,2
4> 2021-05-10T10:01,2021-05-10T10:02,张三,2
4> 2021-05-10T10:01:30,2021-05-10T10:02:30,张三,2
4> 2021-05-10T10:02,2021-05-10T10:03,张三,1
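With a 1-minute size and a 30-second slide, every click belongs to size / slide = 2 overlapping windows, which is why the six clicks yield six result rows whose counts add up to 12. The Flink-free sketch below reproduces that assignment; the class name `HopCountSketch` is hypothetical and the timestamps are treated as UTC for alignment.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Flink-free illustration: assign each click to every hopping window that contains it
class HopCountSketch {
    static List<LocalDateTime> sampleClicks() {
        return Arrays.asList(
                LocalDateTime.parse("2021-05-10T10:00:00"), LocalDateTime.parse("2021-05-10T10:00:10"),
                LocalDateTime.parse("2021-05-10T10:00:49"), LocalDateTime.parse("2021-05-10T10:01:05"),
                LocalDateTime.parse("2021-05-10T10:01:58"), LocalDateTime.parse("2021-05-10T10:02:10"));
    }

    // Returns window start -> click count for windows of the given slide and size
    static Map<LocalDateTime, Long> countPerWindow(List<LocalDateTime> clicks, Duration slide, Duration size) {
        long slideSec = slide.getSeconds();
        long sizeSec = size.getSeconds();
        Map<LocalDateTime, Long> counts = new TreeMap<>();
        for (LocalDateTime ts : clicks) {
            long sec = ts.toEpochSecond(ZoneOffset.UTC);
            // latest window start at or before the click
            long lastStart = sec - Math.floorMod(sec, slideSec);
            // step back one slide at a time while [start, start + size) still contains the click
            for (long start = lastStart; start > sec - sizeSec; start -= slideSec) {
                counts.merge(LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC), 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Duration size = Duration.ofMinutes(1);
        countPerWindow(sampleClicks(), Duration.ofSeconds(30), size).forEach((start, count) ->
                System.out.println(start + "," + start.plus(size) + "," + count));
    }
}
```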
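4. Flink SQL Session Windows in Practice
Implementation Steps
Initialize the stream execution environment
Use the blink planner in streaming mode
Create the user click event data
Write the source data to a temporary file and get its absolute path
Create a table that loads the user click event data
Run a SQL query on the table and retrieve the result as a new table
Convert the Table into a DataStream
Execute the job
Code Implementation:
SessionUserClickApplication code: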
```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SessionUserClickApplication {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use the blink planner in streaming mode
        EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, environmentSettings);
        // Write the source data to a temporary file and get its absolute path
        // (FileUtil is a helper class that is not shown in the original)
        String contents =
                "张三,http://taobao.com/xxx,2021-05-10 10:00:00\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:00:10\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:00:49\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:01:05\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:01:58\n" +
                "张三,http://taobao.com/xxx,2021-05-10 10:02:10\n";
        String path = FileUtil.createTempFile(contents);
        // Create the table; ts is the event-time attribute and the watermark lags 2 seconds behind
        String ddl = "CREATE TABLE user_clicks (\n" +
                "  username varchar,\n" +
                "  click_url varchar,\n" +
                "  ts TIMESTAMP(3),\n" +
                "  WATERMARK FOR ts AS ts - INTERVAL '2' SECOND\n" +
                ") WITH (\n" +
                "  'connector.type' = 'filesystem',\n" +
                "  'connector.path' = '" + path + "',\n" +
                "  'format.type' = 'csv'\n" +
                ")";
        // sqlUpdate is deprecated in Flink 1.11; executeSql(ddl) can be used instead
        tabEnv.sqlUpdate(ddl);
        // Session windows with a 30-second gap: a session closes after 30 seconds without clicks
        String query = "SELECT\n" +
                "  SESSION_START(ts, INTERVAL '30' SECOND),\n" +
                "  SESSION_END(ts, INTERVAL '30' SECOND),\n" +
                "  username,\n" +
                "  COUNT(click_url)\n" +
                "FROM user_clicks\n" +
                "GROUP BY SESSION(ts, INTERVAL '30' SECOND), username";
        Table result = tabEnv.sqlQuery(query);
        tabEnv.toAppendStream(result, Row.class).print();
        env.execute();
    }
}
```
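A session closes once the user has been inactive for 30 seconds; SESSION_END is the time of the session's last click plus the 30-second gap.
Output: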
4> 2021-05-10T10:00,2021-05-10T10:00:40,张三,2
4> 2021-05-10T10:00:49,2021-05-10T10:01:35,张三,2
4> 2021-05-10T10:01:58,2021-05-10T10:02:40,张三,2
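The three sessions follow from the gaps between clicks: 10:00:10 to 10:00:49 (39 s) and 10:01:05 to 10:01:58 (53 s) both exceed 30 seconds, so they split the six clicks into three sessions of two. The Flink-free sketch below (hypothetical class `SessionSketch`) sorts the clicks, starts a new session whenever the gap to the previous click is more than 30 seconds, and reports each session's end as last click + gap.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Flink-free illustration of gap-based session windows for a single user
class SessionSketch {
    private static final DateTimeFormatter HMS = DateTimeFormatter.ofPattern("HH:mm:ss");

    static List<LocalDateTime> sampleClicks() {
        return Arrays.asList(
                LocalDateTime.parse("2021-05-10T10:00:00"), LocalDateTime.parse("2021-05-10T10:00:10"),
                LocalDateTime.parse("2021-05-10T10:00:49"), LocalDateTime.parse("2021-05-10T10:01:05"),
                LocalDateTime.parse("2021-05-10T10:01:58"), LocalDateTime.parse("2021-05-10T10:02:10"));
    }

    // Returns "start,end,count" per session, where end = last click + gap
    static List<String> sessions(List<LocalDateTime> clicks, Duration gap) {
        List<LocalDateTime> sorted = new ArrayList<>(clicks);
        Collections.sort(sorted);
        List<String> result = new ArrayList<>();
        LocalDateTime start = null;
        LocalDateTime last = null;
        long count = 0;
        for (LocalDateTime ts : sorted) {
            if (last != null && ts.isAfter(last.plus(gap))) {
                // more than `gap` without a click: close the current session and open a new one
                result.add(start.format(HMS) + "," + last.plus(gap).format(HMS) + "," + count);
                start = ts;
                count = 0;
            } else if (start == null) {
                start = ts; // the first click opens the first session
            }
            count++;
            last = ts;
        }
        if (start != null) {
            result.add(start.format(HMS) + "," + last.plus(gap).format(HMS) + "," + count);
        }
        return result;
    }

    public static void main(String[] args) {
        sessions(sampleClicks(), Duration.ofSeconds(30)).forEach(System.out::println);
    }
}
```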
This article was written and shared by mirson. For further discussion, join QQ group 19310171 or visit www.softart.cn.
Source: https://segmentfault.com/a/1190000040170657