一, 背景
事情是从公司前段时间的需求说起, 大家知道宜信是一家金融科技公司, 我们的很多数据与标准互联网企业不同, 大致来说就是:
玩数据的人都知道数据是非常有价值的, 然后这些数据是保存在各个系统的数据库中, 如何让需要数据的使用方得到一致性, 实时的数据呢?
过去的通用做法有几种, 分别是:
DBA 开放各个系统的备库, 在业务低峰期(比如夜间), 使用方各自抽取所需数据. 由于抽取时间不同, 各个数据使用方数据不一致, 数据发生冲突, 而且重复抽取, 相信不少 DBA 很头疼这个事情.
公司统一的大数据平台, 通过 Sqoop 在业务低峰期到各个系统统一抽取数据, 并保存到 Hive 表中, 然后为其他数据使用方提供数据服务. 这种做法解决了一致性问题, 但时效性差, 基本是 T+1 的时效.
基于 trigger 的方式获取增量变更, 主要问题是业务方侵入性大, 而且 trigger 也带来性能损失.
这些方案都不算完美. 我们在了解和考虑了不同实现方式后, 最后借鉴了 linkedin 的思想, 认为要想同时解决数据一致性和实时性, 比较合理的方法应该是来自于 log.
(此图来自: https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/)
把增量的 Log 作为一切系统的基础. 后续的数据使用方, 通过订阅 kafka 来消费 log.
比如:
大数据的使用方可以将数据保存到 Hive 表或者 Parquet 文件给 Hive 或 Spark 查询;
提供搜索服务的使用方可以保存到 Elasticsearch 或 HBase 中;
提供缓存服务的使用方可以将日志缓存到 Redis 或 alluxio 中;
数据同步的使用方可以将数据保存到自己的数据库中;
由于 kafka 的日志是可以重复消费的, 并且缓存一段时间, 各个使用方可以通过消费 kafka 的日志来达到既能保持与数据库的一致性, 也能保证实时性;
为什么使用 log 和 kafka 作为基础, 而不使用 Sqoop 进行抽取呢? 因为:
为什么不使用 dual write(双写)呢?, 请参考 https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/
这里就不多做解释了.
二, 总体架构
于是我们提出了构建一个基于 log 的公司级的平台的想法.
下面解释一下 DWS 平台, DWS 平台是有 3 个子项目组成:
Dbus(数据总线): 负责实时将数据从源端实时抽出, 并转换为约定的自带 schema 的 JSON 格式数据(UMS 数据), 放入 kafka 中;
Wormhole(数据交换平台): 负责从 kafka 读出数据 将数据写入到目标中;
Swifts(实时计算平台): 负责从 kafka 中读出数据, 实时计算, 并将数据写回 kafka 中.
图中:
Log extractor 和 dbus 共同完成数据抽取和数据转换, 抽取包括全量和增量抽取.
Wormhole 可以将所有日志数据保存到 HDFS 中; 还可以将数据落地到所有支持 jdbc 的数据库, 落地到 HBash,Elasticsearch,Cassandra 等;
Swifts 支持以配置和 SQL 的方式实现对进行流式计算, 包括支持流式 join,look up,filter,Windows aggregation 等功能;
Dbus web 是 dbus 的配置管理端, rider 除了配置管理以外, 还包括对 Wormhole 和 Swifts 运行时管理, 数据质量校验等.
由于时间关系, 我今天主要介绍 DWS 中的 Dbus 和 Wormhole, 在需要的时候附带介绍一下 Swifts.
三, dbus 解决方案
3.1 日志解析
如前面所说, Dbus 主要解决的是将日志从源端实时的抽出. 这里我们以 MySQL 为例子, 简单说明如何实现.
我们知道, 虽然 MySQL InnoDB 有自己的 log,MySQL 主备同步是通过 binlog 来实现的. 如下图:
图片来自: https://github.com/alibaba/canal
而 binlog 有三种模式:
Row 模式: 日志中会记录成每一行数据被修改的形式, 然后在 slave 端再对相同的数据进行修改.
Statement 模式: 每一条会修改数据的 sql 都会记录到 master 的 bin-log 中. slave 在复制的时候 SQL 进程会解析成和原来 master 端执行过的相同的 SQL 来再次执行.
Mixed 模式: MySQL 会根据执行的每一条具体的 sql 语句来区分对待记录的日志形式, 也就是在 Statement 和 Row 之间选择一种.
他们各自的优缺点如下:
此处来自: http://www.jquerycn.cn/a_13625
由于 statement 模式的缺点, 在与我们的 DBA 沟通过程中了解到, 实际生产过程中都使用 row 模式进行复制. 这使得读取全量日志成为可能.
通常我们的 MySQL 布局是采用 2 个 master 主库(vip)+ 1 个 slave 从库 + 1 个 backup 容灾库 的解决方案, 由于容灾库通常是用于异地容灾, 实时性不高也不便于部署.
为了最小化对源端产生影响, 显然我们读取 binlog 日志应该从 slave 从库读取.
读取 binlog 的方案比较多, GitHub 上不少, 参考 https://github.com/search?utf8=✓&q=binlog. 最终我们选用了阿里的 canal 做位日志抽取方.
Canal 最早被用于阿里中美机房同步, canal 原理相对比较简单:
Canal 模拟 MySQL Slave 的交互协议, 伪装自己为 MySQL Slave, 向 MySQL Slave 发送 dump 协议
MySQL master 收到 dump 请求, 开始推送 binary log 给 Slave(也就是 canal)
Canal 解析 binary log 对象(原始为 byte 流)
图片来自: https://github.com/alibaba/canal
3.2 解决方案
Dbus 的 MySQL 版主要解决方案如下:
对于增量的 log, 通过订阅 Canal Server 的方式, 我们得到了 MySQL 的增量日志:
按照 Canal 的输出, 日志是 protobuf 格式, 开发增量 Storm 程序, 将数据实时转换为我们定义的 UMS 格式(JSON 格式, 稍后我会介绍), 并保存到 kafka 中;
增量 Storm 程序还负责捕获 schema 变化, 以控制版本号;
增量 Storm 的配置信息保存在 Zookeeper 中, 以满足高可用需求.
Kafka 既作为输出结果也作为处理过程中的缓冲器和消息解构区.
在考虑使用 Storm 作为解决方案的时候, 我们主要是认为 Storm 有以下优点:
技术相对成熟, 比较稳定, 与 kafka 搭配也算标准组合;
实时性比较高, 能够满足实时性需求;
满足高可用需求;
通过配置 Storm 并发度, 可以活动性能扩展的能力;
3.3 全量抽取
对于流水表, 有增量部分就够了, 但是许多表需要知道最初 (已存在) 的信息. 这时候我们需要 initial load(第一次加载).
对于 initial load(第一次加载), 同样开发了全量抽取 Storm 程序通过 jdbc 连接的方式, 从源端数据库的备库进行拉取. initial load 是拉全部数据, 所以我们推荐在业务低峰期进行. 好在只做一次, 不需要每天都做.
全量抽取, 我们借鉴了 Sqoop 的思想. 将全量抽取 Storm 分为了 2 个部分:
数据分片
实际抽取
数据分片需要考虑分片列, 按照配置和自动选择列将数据按照范围来分片, 并将分片信息保存到 kafka 中.
下面是具体的分片策略:
全量抽取的 Storm 程序是读取 kafka 的分片信息, 采用多个并发度并行连接数据库备库进行拉取. 因为抽取的时间可能很长. 抽取过程中将实时状态写到 Zookeeper 中, 便于心跳程序监控.
3.4 统一消息格式
无论是增量还是全量, 最终输出到 kafka 中的消息都是我们约定的一个统一消息格式, 称为 UMS(unified message schema)格式.
如下图所示:
消息中 schema 部分, 定义了 namespace 是由 类型 + 数据源名 + schema 名 + 表名 + 版本号 + 分库号 + 分表号 能够描述整个公司的所有表, 通过一个 namespace 就能唯一定位.
_ums_op_ 表明数据的类型是 I(insert),U(update),D(删除);
_ums_ts_ 发生增删改的事件的时间戳, 显然新的数据发生的时间戳更新;
_ums_id_ 消息的唯一 id, 保证消息是唯一的, 但这里我们保证了消息的先后顺序(稍后解释);
payload 是指具体的数据, 一个 JSON 包里面可以包含 1 条至多条数据, 提高数据的有效载荷.
UMS 中支持的数据类型, 参考了 Hive 类型并进行简化, 基本上包含了所有数据类型.
3.5 全量和增量的一致性
在整个数据传输中, 为了尽量的保证日志消息的顺序性, kafka 我们使用的是 1 个 partition 的方式. 在一般情况下, 基本上是顺序的和唯一的.
但是我们知道写 kafka 会失败, 有可能重写, Storm 也用重做机制, 因此, 我们并不严格保证 exactly once 和完全的顺序性, 但保证的是 at least once.
因此_ums_id_变得尤为重要.
对于全量抽取,_ums_id_是唯一的, 从 zk 中每个并发度分别取不同的 id 片区, 保证了唯一性和性能, 填写负数, 不会与增量数据冲突, 也保证他们是早于增量消息的.
对于增量抽取, 我们使用的是 MySQL 的日志文件号 + 日志偏移量作为唯一 id.Id 作为 64 位的 long 整数, 高 7 位用于日志文件号, 低 12 位作为日志偏移量.
例如: 000103000012345678. 103 是日志文件号, 12345678 是日志偏移量.
这样, 从日志层面保证了物理唯一性(即便重做也这个 id 号也不变), 同时也保证了顺序性(还能定位日志). 通过比较_ums_id_ 消费日志就能通过比较_ums_id_知道哪条消息更新.
其实_ums_ts_与_ums_id_意图是类似的, 只不过有时候_ums_ts_可能会重复, 即在 1 毫秒中发生了多个操作, 这样就得靠比较_ums_id_了.
3.6 心跳监控和预警
整个系统涉及到数据库的主备同步, Canal Server, 多个并发度 Storm 进程等各个环节.
因此对流程的监控和预警就尤为重要.
通过心跳模块, 例如每分钟 (可配置) 对每个被抽取的表插入一条心态数据并保存发送时间, 这个心跳表也被抽取, 跟随着整个流程下来, 与被同步表在实际上走相同的逻辑(因为多个并发的的 Storm 可能有不同的分支), 当收到心跳包的时候, 即便没有任何增删改的数据, 也能证明整条链路是通的.
Storm 程序和心跳程序将数据发送公共的统计 topic, 再由统计程序保存到 influxdb 中, 使用 grafana 进行展示, 就可以看到如下效果:
图中是某业务系统的实时监控信息. 上面是实时流量情况, 下面是实时延时情况. 可以看到, 实时性还是很不错的, 基本上 1~2 秒数据就已经到末端 kafka 中.
Granfana 提供的是一种实时监控能力.
如果出现延时, 则是通过 dbus 的心跳模块发送邮件报警或短信报警.
3.7 实时脱敏
考虑到数据安全性, 对于有脱敏需求的场景, Dbus 的全量 storm 和增量 storm 程序也完成了实时脱敏的功能. 脱敏方式有 3 种:
总结一下: 简单的说, Dbus 就是将各种源的数据, 实时的导出, 并以 UMS 的方式提供订阅, 支持实时脱敏, 实际监控和报警.
四, Wormhole 解决方案
说完 Dbus, 该说一下 Wormhole, 为什么两个项目不是一个, 而要通过 kafka 来对接呢?
其中很大一个原因就是解耦, kafka 具有天然的解耦能力, 程序直接可以通过 kafka 做异步的消息传递. Dbus 和 Wornhole 内部也使用了 kafka 做消息传递和解耦.
另外一个原因就是, UMS 是自描述的, 通过订阅 kafka, 任何有能力的使用方来直接消费 UMS 来使用.
虽然 UMS 的结果可以直接订阅, 但还需要开发的工作. Wormhole 解决的是: 提供一键式的配置, 将 kafka 中的数据落地到各种系统中, 让没有开发能力的数据使用方通过 wormhole 来实现使用数据.
如图所示, Wormhole 可以将 kafka 中的 UMS 落地到各种系统, 目前用的最多的 HDFS,JDBC 的数据库和 HBase.
在技术栈上, wormhole 选择使用 spark streaming 来进行.
在 Wormhole 中, 一条 flow 是指从一个 namaspace 从源端到目标端. 一个 spark streaming 服务于多条 flow.
选用 Spark 的理由是很充分的:
Spark 天然的支持各种异构存储系统;
虽然 Spark Stream 比 Storm 延时稍差, 但 Spark 有着更好的吞吐量和更好的计算性能;
Spark 在支持并行计算方面有更强的灵活性;
Spark 提供了一个技术栈内解决 Sparking Job,Spark Streaming,Spark SQL 的统一功能, 便于后期开发;
这里补充说一下 Swifts 的作用:
Swifts 的本质是读取 kafka 中的 UMS 数据, 进行实时计算, 将结果写入到 kafka 的另外一个 topic.
实时计算可以是很多种方式: 比如过滤 filter,projection(投影),lookup, 流式 join Windows aggregation, 可以完成各种具有业务价值的流式实时计算.
Wormhole 和 Swifts 对比如下:
4.1 落 HDFS
通过 Wormhole Wpark Streaming 程序消费 kafka 的 UMS, 首先 UMS log 可以被保存到 HDFS 上.
kafka 一般只保存若干天的信息, 不会保存全部信息, 而 HDFS 中可以保存所有的历史增删改的信息. 这就使得很多事情变为可能:
通过重放 HDFS 中的日志, 我们能够还原任意时间的历史快照.
可以做拉链表, 还原每一条记录的历史信息, 便于分析;
当程序出现错误是, 可以通过回灌(backfill), 重新消费消息, 重新形成新的快照.
可以说 HDFS 中的日志是很多的事情基础.
介于 Spark 原生对 parquet 支持的很好, Spark SQL 能够对 Parquet 提供很好的查询. UMS 落地到 HDFS 上是保存到 Parquet 文件中的. Parquet 的内容是所有 log 的增删改信息以及_ums_id_,_ums_ts_都存下来.
Wormhole spark streaming 根据 namespace 将数据分布存储到不同的目录中, 即不同的表和版本放在不同目录中.
由于每次写的 Parquet 都是小文件, 大家知道 HDFS 对于小文件性能并不好, 因此另外还有一个 job, 每天定时将这些的 Parquet 文件进行合并成大文件.
每个 Parquet 文件目录都带有文件数据的起始时间和结束时间. 这样在回灌数据时, 可以根据选取的时间范围来决定需要读取哪些 Parquet 文件, 不必读取全部数据.
4.2 插入或更新数据的幂等性
常常我们遇到的需求是, 将数据经过加工落地到数据库或 HBase 中. 那么这里涉及到的一个问题就是, 什么样的数据可以被更新到数据?
这里最重要的一个原则就是数据的幂等性.
无论是遇到增删改任何的数据, 我们面临的问题都是:
该更新哪一行;
更新的策略是什么.
对于第一个问题, 其实就需要定位数据要找一个唯一的键, 常见的有:
使用业务库的主键;
由业务方指定几个列做联合唯一索引;
对于第二个问题, 就涉及到_ums_id_了, 因为我们已经保证了_ums_id_大的值更新, 因此在找到对应数据行后, 根据这个原则来进行替换更新.
之所以要软删除和加入_is_active_列, 是为了这样一种情况:
如果已经插入的 umsid 比较大, 是删除的数据(表明这个数据已经删除了), 如果不是软删除, 此时插入一个 umsid 小的数据(旧数据), 就会真的插入进去.
这就导致旧数据被插入了. 不幂等了. 所以被删除的数据依然保留 (软删除) 是有价值的, 它能被用于保证数据的幂等性.
4.3 HBase 的保存
插入数据到 Hbase 中, 相当要简单一些. 不同的是 HBase 可以保留多个版本的数据 (当然也可以只保留一个版本) 默认是保留 3 个版本;
因此插入数据到 HBase, 需要解决的问题是:
选择合适的 rowkey:Rowkey 的设计是可以选的, 用户可以选择源表的主键, 也可以选择若干列做联合主键.
选择合适的 version: 使用_ums_id_+ 较大的偏移量(比如 100 亿) 作为 row 的 version.
Version 的选择很有意思, 利用_ums_id_的唯一性和自增性, 与 version 自身的比较关系一致: 即 version 较大等价于_ums_id_较大, 对应的版本较新.
从提高性能的角度, 我们可以将整个 Spark Streaming 的 Dataset 集合直接插入到 HBase, 不需要比较. 让 HBase 基于 version 自动替我们判断哪些数据可以保留, 哪些数据不需要保留.
Jdbc 的插入数据: 插入数据到数据库中, 保证幂等的原理虽然简单, 要想提高性能在实现上就变得复杂很多, 总不能一条一条的比较然后在插入或更新.
我们知道 Spark 的 RDD/dataset 都是以集合的方式来操作以提高性能, 同样的我们需要以集合操作的方式实现幂等性.
具体思路是:
首先根据集合中的主键到目标数据库中查询, 得到一个已有数据集合;
与 dataset 中的集合比较, 分出两类:
A: 不存在的数据, 即这部分数据 insert 就可以;
B: 存在的数据, 比较_ums_id_, 最终只将哪些_ums_id_更新较大 row 到目标数据库, 小的直接抛弃.
使用 Spark 的同学都知道, RDD/dataset 都是可以 partition 的, 可以使用多个 worker 并进行操作以提高效率.
在考虑并发情况下, 插入和更新都可能出现失败, 那么还有考虑失败后的策略.
比如: 因为别的 worker 已经插入, 那么因为唯一性约束插入失败, 那么需要改为更新, 还要比较_ums_id_看是否能够更新.
对于无法插入其他情况(比如目标系统有问题),Wormhole 还有重试机制. 插入到其他存储中的就不多介绍了, 总的原则是: 根据各自存储自身特性, 设计基于集合的, 并发的插入数据实现. 这些都是 Wormhole 为了性能而做的努力, 使用Wormhole 的用户不必关心 .
五, 运用案例
5.1 实时营销
说了那么多, DWS 有什么实际运用呢? 下面我来介绍某系统使用 DWS 实现了的实时营销.
如上图所示:
系统 A 的数据都保存到自己的数据库中, 我们知道, 宜信提供很多金融服务, 其中包括借款, 而借款过程中很重要的就是信用审核.
借款人需要提供证明具有信用价值的信息, 比如央行征信报告, 是具有最强信用数据的数据. 而银行流水, 网购流水也是具有较强的信用属性的数据.
借款人通过 Web 或手机 App 在系统 A 中填写信用信息时, 可能会某些原因无法继续, 虽然可能这个借款人是一个优质潜在客户, 但以前由于无法或很久才能知道这个信息, 所以实际上这样的客户是流失了.
应用了 DWS 以后, 借款人已经填写的信息已经记录到数据库中, 并通过 DWS 实时的进行抽取, 计算和落地到目标库中. 根据对客户的打分, 评价出优质客户. 然后立刻将这个客户的信息输出到客服系统中.
客服人员在很短的时间 (几分钟以内) 就通过打电话的方式联系上这个借款人(潜客), 进行客户关怀, 将这个潜客转换为真正的客户. 我们知道借款是有时效性的, 如果时间太久就没有价值了.
如果没有实时抽取 / 计算 / 落库的能力, 那么这一切都无法实现.
5.2 实时报表系统
另外一个实时报表的应用如下:
我们数据使用方的数据来自多个系统, 以前是通过 T+1 的方式获得报表信息, 然后指导第二天的运营, 这样时效性很差.
通过 DWS, 将数据从多个系统中实时抽取, 计算和落地, 并提供报表展示, 使得运营可以及时作出部署和调整, 快速应对.
六, 总结
DWS 技术上基于主流实时流式大数据技术框架, 高可用大吞吐强水平扩容, 低延迟高容错最终一致.
DWS 能力上支持异构多源多目标系统, 支持多数据格式 (结构化半结构化非结构化数据) 和实时技术能力.
DWS 将三个子项目合并作为一个平台推出, 使得我们具备了实时的能力, 驱动各种实时场景应用.
适合场景包括: 实时同步 / 实时计算 / 实时监控 / 实时报表 / 实时分析 / 实时洞察 / 实时管理 / 实时运营 / 实时决策
来源: https://www.cnblogs.com/yixinjishu/p/11200622.html