binlog 的寻找过程可能的场景如下:
instance 第一次启动
发生数据库主备切换
canal server HA 情况下的切换
所以这个过程是能够保证 binlog 不丢失的关键点.
本文从源码的角度来分析下启动过程中的 binlog 寻找过程.
一, 流程图
下图是根据源码画出的流程图, 需要结合源码分析来一起看.
二, 源码分析
入口在 AbstractEventParser 的 start()方法中, 这个 start 方法其实是 instance 的整个启动过程. 具体启动过程中都做了哪些事情, 请见另一篇文章的分析. 这块不再赘述. 我们主要看的地方是
- // 4. 获取最后的位置信息
- EntryPosition position = findStartPosition(erosaConnection);
这一行就是获取 binlog 的解析位置, 也是本文着重要分析的地方. 因为我们目前所配置的都是 MysqlEventParser, 所以我们分析的也是这个类中的相关代码.
- protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
- if (isGTIDMode()) {
- // GTID 模式下, CanalLogPositionManager 里取最后的 gtid, 没有则取 instanc 配置中的
- LogPosition logPosition = getLogPositionManager().getLatestIndexBy(destination);
- if (logPosition != null) {
- return logPosition.getPostion();
- }
- if (StringUtils.isNotEmpty(masterPosition.getGtid())) {
- return masterPosition;
- }
- }
- EntryPosition startPosition = findStartPositionInternal(connection);
- if (needTransactionPosition.get()) {
- logger.warn("prepare to find last position : {}", startPosition.toString());
- Long preTransactionStartPosition = findTransactionBeginPosition(connection, startPosition);
- if (!preTransactionStartPosition.equals(startPosition.getPosition())) {
- logger.warn("find new start Transaction Position , old : {} , new : {}",
- startPosition.getPosition(),
- preTransactionStartPosition);
- startPosition.setPosition(preTransactionStartPosition);
- }
- needTransactionPosition.compareAndSet(true, false);
- }
- return startPosition;
- }
2.1 GTID 模式
我们目前的数据库架构一般都是 M-S, 所以 binlog 的位点很可能不一致, 这就需要开启数据库 GTID 模式(通过在 instance.properties 中配置 canal.instance.gtidon=true 即可开启), 这是一个全局的事务 ID, 能够防止主从位点不一致的情况下, 找不到位点的问题. 目前这块是从 CanalLogPositionManager 中取最后的 GTID.default-instance.xml 中, 使用的 CanalLogPositionManager 是 FailbackLogPositionManager, 一个两级的位点管理器, XML 配置如下:
- <!-- 解析位点记录 -->
- <property name="logPositionManager">
- <bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
- <constructor-arg>
- <bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
- </constructor-arg>
- <constructor-arg>
- <bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
- <constructor-arg ref="metaManager"/>
- </bean>
- </constructor-arg>
- </bean>
- </property>
一级是放到本地缓存中, 第二级直接打了个 info 日志, 有点弱, 其实考虑的情况是性能, 另一个考虑可能是因为 DB 的主从切换, 并不会导致 instance 挂掉, 内存中还是存储了之前 DB 的一些解析位点信息. 其实都没有放到 zk 中, 不利于做 HA, 所以这块目前还不是很完善, 真正要使用 GTID 的话, 需要对 CanalLogPositionManager 进行修改. 目前已经提供了其他的一些实现, 包括定时刷新到 zk 中等等.
如果 CanalLogPositionManager 中没有存储的话, 也可以在 instance.properties 里面指定位点和 GTID 信息, 也能从 binlog 中获取.
2.2 非 GTID 模式
如果 canal 没有开启 GTID 模式, 那么我们就需要走一个 binlog 的寻找过程.
EntryPosition startPosition = findStartPositionInternal(connection);
这个方法是个冗长的方法, 里面的判断逻辑就是上面的流程图, 我们来梳理一下.
首先还是从 CanalLogPositionManager 中获取, 也就是基本上从内存中获取 LogPosition.
LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
2.2.1 内存中不存在 LogPosition
2.2.1.1
首先判断配置文件中的主库信息是否与当前的数据库连接 connection 的地址一致, 如果一致, 如果一致, 那么直接取 properties 文件中的 master 的位点信息.
2.2.1.2
如果主库不一致, 那么判断从库 standby 的 connection 地址, 如果是从库, 那么直接取从库的位点信息.
我们可以在 xml 配置中看到 properties 的一些信息.
- <!-- 解析起始位点 -->
- <property name="masterPosition">
- <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
- <property name="journalName" value="${canal.instance.master.journal.name}" />
- <property name="position" value="${canal.instance.master.position}" />
- <property name="timestamp" value="${canal.instance.master.timestamp}" />
- <property name="gtid" value="${canal.instance.master.gtid}" />
- </bean>
- </property>
- <property name="standbyPosition">
- <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
- <property name="journalName" value="${canal.instance.standby.journal.name}" />
- <property name="position" value="${canal.instance.standby.position}" />
- <property name="timestamp" value="${canal.instance.standby.timestamp}" />
- <property name="gtid" value="${canal.instance.standby.gtid}" />
- </bean>
- </property>
- 2.2.1.3
如果内存中没有, 配置文件中也没有, 那么系统默认从当前时间开始消费.
- entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
- protected EntryPosition findEndPositionWithMasterIdAndTimestamp(MysqlConnection connection) {
- MysqlConnection mysqlConnection = (MysqlConnection) connection;
- final EntryPosition endPosition = findEndPosition(mysqlConnection);// 获取当前最新的位点信息
- if (tableMetaTSDB != null) {
- long startTimestamp = System.currentTimeMillis();
- return findAsPerTimestampInSpecificLogFile(mysqlConnection,
- startTimestamp,
- endPosition,
- endPosition.getJournalName(),
- true);
- } else {
- return endPosition;
- }
- }
这里的 findEndPosition()方法, 其实就是执行了一个 Mysql 命令:
show master status
返回的内容中, 包含 binlog 文件信息和位点 position, 甚至包括 GTID 信息.
找到了最新的 binlog 位点信息后, 根据当前时间戳和 binlog 的时间戳等信息, 去服务器上面寻找 binlog. 其实逻辑基本上都在 findAsPerTimestampInSpecificLogFile()中, 这个方法是根据时间戳去寻找, 离时间戳最近 (小于时间戳) 的一个事务起始位置. 由于这块的代码比较长, 所以我们只做分析, 不做代码粘贴, 具体的代码在 MysqlEventParser 这个类中. 整个寻找的过程如下:
先看一下这个 seek 的过程, 见代码注释:
- /**
- * 加速主备切换时的查找速度, 做一些特殊优化, 比如只解析事务头或者尾
- */
- public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
- updateSettings();// 在 mysql 中执行一些 dump 之前的命令
- sendBinlogDump(binlogfilename, binlogPosition);// 指定位点和 binlog 文件, 发送 dump 命令, COM_BINLOG_DUMP
- DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
- fetcher.start(connector.getChannel());// 开始获取
- LogDecoder decoder = new LogDecoder();
- decoder.handle(LogEvent.ROTATE_EVENT);
- decoder.handle(LogEvent.FORMAT_DESCRIPTION_EVENT);
- decoder.handle(LogEvent.QUERY_EVENT);
- decoder.handle(LogEvent.XID_EVENT);
- LogContext context = new LogContext();
- while (fetcher.fetch()) {// 遍历获取
- LogEvent event = null;
- event = decoder.decode(fetcher, context);// 解析为 event
- if (event == null) {
- throw new CanalParseException("parse failed");
- }
- if (!func.sink(event)) {// 调用 SinkFunction.sink()过滤
- break;
- }
- }
- }
下面我们看下数据过滤这块:
起始位置为 4, 也就是跳过一个魔法值, 具体可以看 binlog 的结构说明
之后就是一个过滤的过程
首先把事件 event 解析一个 entry, 这个 entry 使用的是消息模型 EntryProtocol.proto
首先判断当前事件是否为事务开始或者结束的位置, 如果是, 判断事件的时间, 如果在我们需要的时间之后, 直接过滤这条 entry
如果当前 entry 的 binlog 文件名和最新的 binlog 文件名相同, 并且最新的位点小于 entry 的位点, 那么直接过滤
如果当前 entry 的类型表示的是事务开始或者事务结束, 那么直接取当前 entry 的位点信息, 利用当前 entry 构建位点信息, 也就是找到了我们需要的事务起点.
2.2.1.4
如果 binlog 文件名为空, 首先判断时间戳是否存在, 如果存在, 那么直接按照时间戳去取, 否则默认从当前最后一个位置进行消费.
- // 如果没有指定 binlogName, 尝试按照 timestamp 进行查找
- if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp()> 0L) {
- logger.warn("prepare to find start position {}:{}:{}",
- new Object[] { "","", entryPosition.getTimestamp() });
- return findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp());
- } else {
- logger.warn("prepare to find start position just show master status");
- return findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
- }
这块我们看下 findByStartTimestamp()这个方法, 也就是只根据时间来查找 binlog. 这块的逻辑是这样的:
首先获取最新和最老的 binlog 文件
从最新的 binlog 中, 根据时间去找, 调用的方法也是 findAsPerTimestampInSpecificLogFile()
如果已经从最新的到最老的 binlog 文件中找遍了, 没找到, 说明根本没有对应时间的 binlog
否则不断的遍历 binlog 文件, 因为 binlog 文件名的后缀都是连续的, 所以可以很快的寻找
2.2.1.5
binlog 文件名不为空, 首先判断是否有位点信息, 如果有的话, 直接根据当前内存中存储的位点和文件信息去 Mysql 获取.
否则, 根据当前内存中管理的时间戳去获取, 根据时间戳和 binlog 文件名去获取位点. 当然, 如果时间戳也不存在, 直接从 binlog 文件名的文件开头去获取 binlog.
2.2.2 内存中存在历史成功记录
2.2.2.1 内存中的位点信息对应的数据库 ip 和当前连接的 ip 一致
如果 dump 错误的次数超过了一定的阈值, 默认是 2 次, 也就是连续几次定位失败, 有几种情况:
binlog 位点被删除
vip 模式的 mysql, 发生了主备切换
这种需要进行一次判断, 判断内容:
- boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
- && logPosition.getPostion().getServerId() != null
- && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
判断几个, 第一个配置文件中的 standby 为空, 第二个内存中的 logPosition 存在数据库 ip, 第三个内存中的 logPosition 的数据库 ip 和当前数据库连接 connection 的数据库 ip 不一致.
满足这三个条件, 说明发生了 vip 的主备切换, 此时需要把 logPosition 中的时间戳向前推一个回退时间, 默认 60s, 然后根据新的时间戳去找 binlog 文件和位点信息.
- if (case2) {
- long timestamp = logPosition.getPostion().getTimestamp();
- long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
- logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] { "","",
- logPosition.getPostion().getTimestamp() });
- EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp);
- // 重新置为一下
- dumpErrorCount = 0;
- return findPosition;
- }
2.2.2.2 不一致的情况
说明发生了主从切换, 这种情况下, 直接把 logPosition 中的时间回退 60s, 然后根据回退后的时间去 binlog 中寻找, 然后返回.
来源: https://www.cnblogs.com/f-zhao/p/9079990.html