前言
如今大型的 IT 系统中, 都会使用分布式的方式, 同时会有非常多的中间件, 如 Redis, 消息队列, 大数据存储等, 但是实际核心的数据存储依然是存储在数据库, 作为使用最广泛的数据库, 如何将 MySQL 的数据与中间件的数据进行同步, 既能确保数据的一致性, 及时性, 也能做到代码无侵入的方式呢? 如果有这样的一个需求, 数据修改后, 需要及时的将 MySQL 中的数据更新到 Elasticsearch, 我们会怎么进行实现呢?
数据同步方案选择
针对上文的需求, 经过思考, 初步有如下的一些方案:
代码实现
针对代码中进行数据库的增删改操作时, 同时进行 Elasticsearch 的增删改操作.
mybatis 实现
通过 mybatis plugin 进行实现, 截取 sql 语句进行分析, 针对 insert,update,delete 的语句进行处理. 显然, 这些操作如果都是单条数据的操作, 是很容易处理的. 但是, 实际开发中, 总是会有一些批量的更新或者删除操作, 这时候, 就很难进行处理了.
Aop 实现
不管是通过哪种 Aop 方式, 根据制定的规则, 如规范方法名, 注解等进行切面处理, 但依然还是会出现无法处理批量操作数据的问题.
logstash
logstash 类似的同步组件提供的文件和数据同步的功能, 可以进行数据的同步, 只需要简单的配置就能将 MySQL 数据同步到 Elasticsearch, 但是 logstash 的原理是每秒进行一次增量数据查询, 将结果同步到 Elasticsearch, 实时性要求特别高的, 可能无法满足要求. 且此方案的性能不是很好, 造成资源的浪费.
实现方式 | 优缺点 |
---|---|
代码实现 | 技术难度低,侵入性强,实时性高 |
基于 mybatis | 有一定的技术难度,但是无法覆盖所有的场景 |
Aop 实现 | 技术难度低,半侵入性,需要规范代码,依然无法覆盖所有的场景 |
logstash | 技术难度低,无侵入性,无需开发,但会造成资源浪费。 |
那么是否有什么更好的方式进行处理吗? MySQL binlog 同步, 实时性强, 对于应用无任何侵入性, 且性能更好, 不会造成资源浪费, 那么就有了我今天的主角 --canal
canal
介绍
canal 是阿里巴巴的一个开源项目, 基于 java 实现, 整体已经在很多大型的互联网项目生产环境中使用, 包括阿里, 美团等都有广泛的应用, 是一个非常成熟的数据库同步方案, 基础的使用只需要进行简单的配置即可.
canal 是通过模拟成为 MySQL 的 slave 的方式, 监听 MySQL 的 binlog 日志来获取数据, binlog 设置为 row 模式以后, 不仅能获取到执行的每一个增删改的脚本, 同时还能获取到修改前和修改后的数据, 基于这个特性, canal 就能高性能的获取到 MySQL 数据数据的变更.
使用
canal 的介绍在官网有非常详细的说明, 如果想了解更多, 大家可以移步官网 (https://github.com/alibaba/canal) https://github.com/alibaba/canal/ 了解. 我这里补充下使用中不太容易理解部分.
canal 的部署主要分为 server 端和 client 端.
server 端部署好以后, 可以直接监听 MySQL binlog, 因为 server 端是把自己模拟成了 MySQL slave, 所以, 只能接受数据, 没有进行任何逻辑的处理, 具体的逻辑处理, 需要 client 端进行处理.
client 端一般是需要大家进行简单的开发. https://github.com/alibaba/canal/wiki/ClientAPI 有一个简单的示例, 很容易理解.
canal Adapter
为了便于大家的使用, 官方做了一个独立的组件 Adapter,Adapter 是可以将 canal server 端获取的数据转换成几个常用的中间件数据源, 现在支持 kafka,rocketmq,hbase,Elasticsearch, 针对这几个中间件的支持, 直接配置即可, 无需开发. 上文中, 如果需要将 MySQL 的数据同步到 Elasticsearch, 直接运行 canal Adapter, 修改相关的配置即可.
常见问题
无法接收到数据, 程序也没有报错?
一定要确保 MySQL 的 binlog 模式为 row 模式, canal 原理是解析 Binlog 文件, 并且直接中文件中获取数据的.
Adapter 使用无法同步数据?
按照官方文档, 检查配置项, 如 sql 的大小写, 字段的大小写可能都会有影响, 如果还无法搞定, 可以自己获取代码调试下, Adapter 的代码还是比较容易看懂的.
canal Adapter Elasticsearch 改造
因为有了 canal 和 canal Adapter 这个神器, 同步到 Elasticsearch,hbase 等问题都解决了, 但是自己的开发的过程中发现, Adapter 使用还是有些问题, 因为先使用的是 Elasticsearch 同步功能, 所以对 Elasticsearch 进行了一些改造:
Elasticsearch 初始化
一个全新的 Elasticsearch 无法使用, 因为没有创建 Elasticsearch index 和 mapping, 增加了对应的功能.
Elasticsearch 配置文件 mapping 节点增加两个参数:
- enablefieldmap: true
- fieldmap:
- id: "text"
- name: "text"
- c_time: "text"
enablefieldmap 是否需要自动生成 fieldmap, 默认为 false, 如果需要启动的时候就生成这设置为 true, 并且设置
fieldmap, 类似 Elasticsearch mapping 中每个字段的类型.
esconfig bug 处理
代码中获取 binlog 的日志处理时, 必须要获取数据库名, 但是当获取 binlog 为 type query 时, 是无法获取
数据库名的, 此处有 bug, 导致出现 "Outer adapter write failed" , 且未输出错误日志, 修复此 bug.
后续计划
增加 rabbit MQ 的支持
增加 Redis 的支持
源码
源码地址: https://github.com/itmifen/canal
微信号: itmifen
来源: https://www.cnblogs.com/joylee/p/10248106.html