使用 Blink SQL+UDAF 实现差值聚合计算介绍了如何使用 Blink SQL+UDAF 实现实时流上的差值聚合计算, 后来在与 @付典就业务需求和具体实现方式进行探讨时, 付典提出通过 CEP 实现的思路和方法.
本文介绍通过 CEP 实现实时流上的差值聚合计算.
感谢 @付典在实现过程中的指导. 笔者水平有限, 若有纰漏, 请批评指出.
一, 客户需求
电网公司每天采集各个用户的电表数据(格式如下表), 其中 data_date 为电表数据上报时间, cons_id 为电表 id,r1 为电表度数, 其他字段与计算逻辑无关, 可忽略. 为了后续演示方便, 仅输入 cons_id=100000002 的数据.
no(string) | data_date(string) | cons_id(string) | org_no(string) | r1(double) |
---|---|---|---|---|
101 | 20190716 | 100000002 | 35401 | 13.76 |
101 | 20190717 | 100000002 | 35401 | 14.12 |
101 | 20190718 | 100000002 | 35401 | 16.59 |
101 | 20190719 | 100000002 | 35401 | 18.89 |
表 1: 输入数据
电网公司希望通过实时计算 (Blink) 对电表数据处理后, 每天得到每个电表最近两天 (当天和前一天) 的差值数据, 结果类似如下表:
cons_id(string) | data_date(string) | subDegreeR1(double) |
---|---|---|
100000002 | 20190717 | 0.36 |
100000002 | 20190718 | 2.47 |
100000002 | 20190719 | 2.3 |
表 2: 期望的输出数据
二, 需求分析
根据业务需求以及 CEP 跨事件模式匹配的特性, 定义两个 CEP 事件 e1 和 e2, 输出 e2.r1-e1.r1 即可得到差值.
三, CEP 开发及测试结果
参考复杂事件处理 (CEP) 语句, CEP 代码如下:
- CREATE TABLE input_dh_e_mp_read_curve (
- `no` VARCHAR,
- data_date VARCHAR,
- cons_id VARCHAR,
- org_no VARCHAR,
- r1 DOUBLE,
- ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss')
- ,WATERMARK wk FOR ts as withOffset(ts, 2000)
- ) WITH (
- type = 'datahub',
- endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',
- roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',
- project = 'jszc_datahub',
- topic = 'input_dh_e_mp_read_curve'
- );
- CREATE TABLE data_out(
- cons_id varchar
- ,data_date varchar
- ,subDegreeR1 DOUBLE
- )with(
- type = 'print'
- );
- insert into data_out
- select
- cons_id,
- data_date,
- subDegreeR1
- from input_dh_e_mp_read_curve
- MATCH_RECOGNIZE(
- PARTITION BY cons_id
- ORDER BY ts
- MEASURES
- e2.data_date as data_date,
- e2.r1 - e1.r1 as subDegreeR1
- ONE ROW PER MATCH
- AFTER MATCH SKIP TO NEXT ROW
- PATTERN(e1 e2)
- DEFINE
- e1 as TRUE,
- e2 as TRUE
- );
由于使用了 print connector, 从对应的 sink 的 taskmanager.out 日志中可以查看到输出如下:
- task-1> (+)100000002,20190717,0.35999999999999943
- task-1> (+)100000002,20190718,2.4700000000000006
对比期望输出(表 2),20190717 和 20190718 两个窗口的数据均正确, 表明业务逻辑正确, 但此输出与期望输出有少许差异:
(1)20190719 的数据没有输出, 这是因为我们设置了 watermark, 测试环境下 20190719 之后没有数据进来触发 20190719 对应的窗口的结束.
四, 其他说明
1, 对比使用 Blink SQL+UDAF 实现差值聚合计算(1), 我们可以看出使用 CEP 开发代码非常简洁, 所以在跨事件处理的情况下 CEP 还是非常的合适. 从另外一个方面讲, 同样的需求有不同的实现方式, 所以融会贯通 Blink SQL 中的各种语法, 利用更合适的语法来实现业务需求, 将可能大大提升工作效率和业务性能.
2, 在实现本案例时, 笔者发现使用 CEP 时有如下需要注意的地方:
(1)partiton by 里的字段(如本案的 cons_id), 默认会带到输出里, 若同时在 MEASURES 中定义, 则可能会报类似如下错误:
(2)define 及其内容必须定义, 否则前端页面提示类似如下错误:
来源: https://yq.aliyun.com/articles/757915