整理了一些 Java 方面的架构, 面试资料(微服务, 集群, 分布式, 中间件等), 有需要的小伙伴可以关注公众号[程序员内点事] , 无套路自行领取
更多优选
一口气说出 9 种 分布式 ID 生成方式, 面试官有点懵了
3 万字总结, MySQL 优化之精髓 https://juejin.im/post/5e3eb616f265da570d734dcb
为了不复制粘贴, 我被逼着学会了 JAVA 爬虫
技术部突然宣布: JAVA 开发人员全部要会接口自动化测试框架
Redis 5 种数据结构及对应使用场景, 全会面试要加分的
写在前边
在家远程办公第三周, 快被手机上的消息搞的有些神经质了, 生怕错过一条有用的信息, 没办法形势如此, 公司摇摇欲坠大家也都如履薄冰, 毕竟这时候失业有点惨(穷怕了).
但就干活来说还是比较清闲的, 和在公司上班相比, 清闲下来很多碎片时间, 可以随意的做点自己喜欢的事情. 而且我发现, 人一但闲下来真的是好可怕, 潜在的才能会全面爆发, 我女朋友这个抖音深度患者, 一年不做一回饭的主, 一周内接连给我做了两顿黑暗料理, 烤馒头版 "蛋糕", 浆糊版 "凉皮", 然后我就与厕所结下来不解之缘...
不过, 作为一个程序员, 我对黑暗料理是不太感兴趣滴, 闲下来还是喜欢学习钻研一些新奇的技术, canal 就成了很好的研究对象, 一不小心就监控了公司 MySQL 的一举一动的
一, canal 是个啥?
canal 是阿里开发的一款基于数据库增量日志解析, 提供增量数据订阅与消费的框架, 整个框架纯 JAVA 开发, 目前仅支持 MySQL 和 MariaDB(和 MySQL 类似).
那什么是数据库增量日志?
MySQL 的日志种类是比较多的, 主要包含: 错误日志, 查询日志, 慢查询日志, 事务日志, 二进制日志. 而 MySQL 数据库所发生的数据变更 (DML(data manipulation language) 数据操纵语言, 也就是我们熟悉的增删改), 都会以二进制日志 (binary log) 形式存储.
二, canal 原理
在介绍 canal 原理之前, 我们先来回顾一下 MySQL 主从同步的原理, 这或许会让你更好的理解 canal 的工作机制.
1,MySQL 主从同步原理:
MySQL 主从同步也叫读写分离, 可以提升数据库的负载和容错能力, 实现数据库的高可用
先来分析一张 MySQL 主从同步原理图:
以上图片源自网络, 如有侵权联系删除
master 节点操作过程:
当 master 节点数据发生更改后(delete,update,insert, 还是创建函数, 存储过程等操作), 向 binary log 中写入记录日志, 这些记录又叫做二进制日志事件(binary log events).
show binlog events
这些事件会按照顺序写入 bin log 中. 当 slave 节点启动连接到 master 节点的时候, master 节点会为 slave 节点开启 binlog dump 线程(负责传输 binlog 数据).
一旦 master 节点的 bin log 发生变化时, bin log` `dump 线程会通知 slave 节点有可以传输的 binlog, 并将相应的 bin log 内容发送给 slave 节点.
slave 节点操作过程:
slave 节点上会创建两个线程: 一个 I/O 线程, 一个 SQL 线程. I/O 线程连接到 master 节点, master 节点上的 binlog dump 线程会将 binlog 的内容发送给该 I\O 线程.
该 I/O 线程接收到 binlog 内容后, 再将内容写入到本地的 relay log. 而 sql 线程读取到 I/O 线程写入的 ralay log, 将 relay log 中的内容写入 slave 数据库.
2,canal 原理
懂了上边 MySQL 的主从同步原理, canal 的工作机制就很好理解了. 其实 canal 是模拟了 MySQL 数据库中, slave 节点与 master 节点的交互协议, 伪装自己为 MySQL slave 节点, 向 MySQL master 节点发送 dump 协议, MySQL master 节点收到 dump 请求, 开始推送 binary log 给 slave 节点(也就是 canal).
以上图片源自网络, 如有侵权联系删除
光说不练假把式, 开干!
三, canal 实现 "监控"MySQL
在写代码前我们先对 MySQL 进行一下改造, 安装 MySQL 就不再细说了, 基本操作.
1, 查看一下 MySQL 是否开启了 binary log 功能
show binary logs
如果没有开启是图中的状态, 一般用户是没有这个命令权限的, 不过我有, 啧啧啧!
如果没有需要手动开启, 并且在 my.cnf 文件中配置 binlog-format 为 Row 模式
- log-bin=mysq-bin
- binlog-format=Row
log-bin 是 binlog 文件存放位置
binlog-format 设置 MySQL 复制 log-bin 的方式
MySQL 的三种复制方式:
基于 SQL 语句的复制(statement-based replication, SBR)
优点: 将修改数据的 sql 保存在 binlog, 不需要记录每一条 sql 和数据变化, binlog 体量会很小, IO 开销少, 性能好
缺点: 会导致 master-slave 中的数据不一致
基于行的复制(row-based replication, RBR)
优点: 不记录每条 sql 语句的上下文信息, 仅需记录哪条数据被修改了, 修改成什么样了
缺点: binlog 体积很大, 尤其是在 alter table 属性时, 会产生大量 binlog 数据
混合模式复制(mixed-based replication, MBR)
对应的, binlog 的格式也有三种: STATEMENT,ROW,MIXED.
2, 为 canal 创建一个有权限操作 MySQL 的用户
- CREATE USER canal IDENTIFIED BY 'canal';
- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
- -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
- FLUSH PRIVILEGES;
3, 安装 canal
下载地址: https://github.com/alibaba/canal/releases
下载后选择版本例如: canal.deployer-xxx.tar.gz
4, 配置 canal
修改 instance.properties 文件, 需要添加监听数据库和表的规则, canal 可以全量监听数据库, 也可以针对某个表进行监听, 比较灵活.
- VIM conf/example/instance.properties
- #################################################
- ## MySQL serverId
- canal.instance.MySQL.slaveId = 2020
- # position info 修改自己的数据库(canal 要监听的数据库 地址 )
- canal.instance.master.address = 127.0.0.1:3306
- canal.instance.master.journal.name =
- canal.instance.master.position =
- canal.instance.master.timestamp =
- #canal.instance.standby.address =
- #canal.instance.standby.journal.name =
- #canal.instance.standby.position =
- #canal.instance.standby.timestamp =
- # username/password 修改成自己 数据库信息的账号 (单独开一个 准备阶段创建的账号)
- canal.instance.dbUsername = canal
- canal.instance.dbPassword = canal
- canal.instance.defaultDatabaseName =
- canal.instance.connectionCharset = UTF-8
- # table regex 表的监听规则
- # canal.instance.filter.regex = blogs\.blog_info
- canal.instance.filter.regex = .\*\\\\..\*
- # table black regex
- canal.instance.filter.black.regex =
启动 canal
sh bin/startup.sh
看一下 server 日志, 确认一下 canal 是否正常启动
vi logs/canal/canal.log
显示 canal server is running now 即为成功
- 2020-01-08 15:25:33.361 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
- 2020-01-08 15:25:33.468 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111]
- 2020-01-08 15:25:34.061 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
5, 编写 Java 客户端代码, 实现 canal 监听
引入依赖包
- <dependency>
- <groupId>com.alibaba.otter</groupId>
- <artifactId>canal.client</artifactId>
- <version>1.1.0</version>
- </dependency>
这里只是简单实现
- public class MainApp {
- public static void main(String... args) throws Exception {
- /**
- * 创建与
- */
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
- 11111), "example", "","");
- int batchSize = 1000;
- int emptyCount = 0;
- try {
- connector.connect();
- /**
- * 监控数据库中所有表
- */
- connector.subscribe(".*\\..*");
- /**
- * 指定要监控的表, 库名. 表名
- */
- //connector.subscribe("xin-master.jk_order");
- connector.rollback();
- //120 次心跳过后未检测到, 跳出
- int totalEmptyCount = 120;
- while (emptyCount <totalEmptyCount) {
- Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
- long batchId = message.getId();
- int size = message.getEntries().size();
- if (batchId == -1 || size == 0) {
- emptyCount++;
- System.out.println("empty count :" + emptyCount);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- } else {
- emptyCount = 0;
- // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
- printEntry(message.getEntries());
- }
- /**
- * 提交确认
- */
- connector.ack(batchId);
- /**
- * 处理失败, 回滚数据
- */
- connector.rollback(batchId);
- }
- System.out.println("empty too many times, exit");
- } finally {
- connector.disconnect();
- /**
- * 手动开启事务回滚
- */
- //TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
- }
- }
- private static void printEntry(List<CanalEntry.Entry> entrys) {
- for (CanalEntry.Entry entry : entrys) {
- if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry
- .EntryType
- .TRANSACTIONEND) {
- continue;
- }
- CanalEntry.RowChange rowChage = null;
- try {
- rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
- e);
- }
- CanalEntry.EventType eventType = rowChage.getEventType();
- System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
- eventType));
- for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
- if (eventType == CanalEntry.EventType.DELETE) {
- printColumn(rowData.getBeforeColumnsList());
- } else if (eventType == CanalEntry.EventType.INSERT) {
- printColumn(rowData.getAfterColumnsList());
- } else {
- System.out.println("-------> before");
- printColumn(rowData.getBeforeColumnsList());
- System.out.println("-------> after");
- printColumn(rowData.getAfterColumnsList());
- }
- }
- }
- }
- private static void printColumn(List<CanalEntry.Column> columns) {
- for (CanalEntry.Column column : columns) {
- System.out.println(column.getName() + ":" + column.getValue() + "update=" + column.getUpdated());
- }
- }
- }
代码到这就编写完成了, 我们启动服务看下是什么效果, 由于并没有操作数据库, 所以监听的结果都是空的.
接下来我们在数据库执行一条 update 语句试试
update jk_orderset order_no = '1111' where id = 40
控制台检测到了数据库的修改, 并生成 binlog 日志文件
MySQL-bin.000009:3830
那么生成的 binlog 文件该怎么用, 如何解析成 SQl 语句呢?
- <!-- mysql binlog 解析 -->
- <dependency>
- <groupId>com.GitHub.shyiko</groupId>
- <artifactId>MySQL-binlog-connector-java</artifactId>
- <version>0.13.0</version>
- </dependency>
将刚才的 binlog 文件下载本地测试一下
- public static void main(String[] args) throws IOException {
- String filePath = "C:\\ProgramData\\MySQL\\MySQL Server 5.7\\Data\\mysql-bin.000009:3830";
- File binlogFile = new File(filePath);
- EventDeserializer eventDeserializer = new EventDeserializer();
- eventDeserializer.setChecksumType(ChecksumType.CRC32);
- BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
- try {
- for (Event event; (event = reader.readEvent()) != null; ) {
- System.out.println(event.toString());
- }
- } finally {
- reader.close();
- }
- }
查看一下执行结果, 发现数据库最近的一次操作是加了一个 idx_index 索引
- Event{
- header=EventHeaderV4{
- timestamp=1551325542000, eventType=ANONYMOUS_GTID, serverId=1, headerLength=19, dataLength=46, nextPosition=8455, flags=0
- }, data=null
- }
- Event{
- header=EventHeaderV4{
- timestamp=1551325542000, eventType=QUERY, serverId=1, headerLength=19, dataLength=190, nextPosition=8664, flags=0
- }, data=QueryEventData{
- threadId=25, executionTime=0, errorCode=0, database='xin-master', sql='ALTER TABLE `jk_order`
- DROP INDEX `idx_index` ,
- ADD INDEX `idx_index` (`user_id`, `service_id`, `real_price`) USING BTREE'
- }
- }
- Event{
- header=EventHeaderV4{
- timestamp=1551438586000, eventType=STOP, serverId=1, headerLength=19, dataLength=4, nextPosition=8687, flags=0
- }, data=null
- }
至此我们就已经实现了监控 MySQL,
四, canal 应用场景
canal 应用场景大致有以下:
解决 MySQL 主从同步延迟的问题
实现数据库实时备份
多级索引 (卖家和买家各自分库索引)
实现业务 cache 刷新
价格变化等重要业务消息
重点分析一下 canal 是如何解决 MySQL 主从同步延迟的问题
生产环境下 MySQL 的主从同步模式 (maser-slave) 很常见, 但对于跨机房部署的集群, 会出现同步延时的情况. 举个栗子:
一条订单状态是未付款, master 节点修改成已付款, 可由于某些原因出现延迟数据未能及时同步到 slave, 这时用户立即查看订单状态 (查询走 slave) 显示还是未付款, 哪个用户看到这种情况不得慌啊.
为什么会出现主从同步延迟呢?
当主库 master 的 TPS 并发较高时, master 节点并发产生的修改操作, 而 slave 节点的 sql 线程是单线程处理同步数据, 延时自然而言就产生了.
不过造成主从同步的原因不止这些, 由于主从服务器存在跨机器并且跨机房, 除了网络带宽原因之外, 网络的稳定性以及机器之间的同步, 都是主从同步应该考虑的主要原因.
总结
本文只是简单实现 canal 监听数据库的功能, 旨在给大家提供一种解决问题的思路, 还是反复絮叨的那句话, 解决问题的技术方法很对, 具体如何应用还需结合具体业务.
今天就说这么多, 如果本文对您有一点帮助, 希望能得到您一个点赞哦
您的认可才是我写作的动力!
整理了一些 Java 方面的架构, 面试资料(微服务, 集群, 分布式, 中间件等), 有需要的小伙伴可以关注公众号[程序员内点事] , 无套路自行领取 https://mp.weixin.qq.com/s/7Sb-INOrSfQ00sxETLmNog
来源: https://www.cnblogs.com/chengxy-nds/p/12334977.html