一般来说,我们对于数据库最主要的要求就是:数据不丢。不管是主从复制,还是使用类似otter+canal这样的数据库同步方案,我们最基本的需求是,在数据不丢失的前提下,尽可能的保证系统的高可用,也就是在某个节点挂掉,或者数据库发生主从切换等情况下,我们的数据同步系统依然能够发挥它的作用--数据同步。本文讨论的场景是数据库发生主从切换,本文将从源码的角度,来看看otter和canal是如何保证高可用和高可靠的。
通过阅读文档和源码,我们可以知道,对于一个canal server,基础的框架包括以下几个部分:MetaManager、EventParser、EventSink和EventStore。其中EventParser的作用就是发送dump命令,从mysql数据库获取binlog文件。发送dump命令,可以指定时间戳或者position,从指定的时间或者位置开始dump。我们来看看过程:
首先是CanalServer启动。otter默认使用的是内置版的canal server,所以我们主要看CanalServerWithEmbedded这个类。来看下他的启动过程:
- public void start(final String destination) {
- final CanalInstance canalInstance = canalInstances.get(destination);
- if (!canalInstance.isStart()) {
- try {
- MDC.put("destination", destination);
- canalInstance.start(); //启动实例
- logger.info("start CanalInstances[{}] successfully", destination);
- } finally {
- MDC.remove("destination");
- }
- }
- }
我们看下实例启动那一行,跟到AbstractCanalInstance类中
- public void start() {
- super.start();
- if (!metaManager.isStart()) {
- metaManager.start(); //源数据管理启动
- }
- if (!alarmHandler.isStart()) {
- alarmHandler.start(); //报警处理器启动
- }
- if (!eventStore.isStart()) {
- eventStore.start(); //数据存储器启动
- }
- if (!eventSink.isStart()) {
- eventSink.start(); //数据过滤器启动
- }
- if (!eventParser.isStart()) { //数据解析器启动
- beforeStartEventParser(eventParser);
- eventParser.start();
- afterStartEventParser(eventParser);
- }
- logger.info("start successful....");
- }
我们主要看下eventParser.start()方法里面的内容。我们主要关注的是EventParser使如何在主从切换的条件下,进行dump节点的确定的。我们跟踪到AbstractEventParser类中的start()方法,重点看下
- // 4. 获取最后的位置信息
- EntryPosition position = findStartPosition(erosaConnection);
这块有两个实现,但是canal目前使用的是MysqlEventParser,也就是基于Mysql的Binlog文件来进行数据同步。我们看下代码:
- protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
- EntryPosition startPosition = findStartPositionInternal(connection);
- if (needTransactionPosition.get()) {
- logger.warn("prepare to find last position : {}", startPosition.toString());
- Long preTransactionStartPosition = findTransactionBeginPosition(connection, startPosition);
- if (!preTransactionStartPosition.equals(startPosition.getPosition())) {
- logger.warn("find new start Transaction Position , old : {} , new : {}",
- startPosition.getPosition(),
- preTransactionStartPosition);
- startPosition.setPosition(preTransactionStartPosition);
- }
- needTransactionPosition.compareAndSet(true, false);
- }
- return startPosition;
- }
对于第一行findStartPositionInternal(connection),我们重点关注的情况是数据库连接地址发生变化,也就是进行了主从切换的情况。
- boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null) && logPosition.getPostion().getServerId() != null && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
- if (case2) {
- long timestamp = logPosition.getPostion().getTimestamp();
- long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
- logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] {
- "",
- "",
- logPosition.getPostion().getTimestamp()
- });
- EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp);
- // 重新置为一下
- dumpErrorCount = 0;
- return findPosition;
- }
我们分析下case2这个条件,其实就是表示的就是配置了主从切换,而且发生了serverId变化的情况,在这种情况下,首先需要获取到事件发生的时间戳,然后将这个事件发生的时间减去60s,也就是向前推一分钟之后,在新的binlog文件中根据新的时间戳来找到当时对应的事件。
这块根据时间戳来寻找事件的过程比较简单,首先根据binglog-index文件找到所有的binlog文件名,然后遍历binlog文件的头,找到binlog文件的写入时间,与新的时间戳进行对比,定位到binlog文件。定位到文件后,直接根据时间戳来进行遍历,找到新的时间戳之前发生的那个事务起始位置。
- /**
- * 根据给定的时间戳,在指定的binlog中找到最接近于该时间戳(必须是小于时间戳)的一个事务起始位置。
- * 针对最后一个binlog会给定endPosition,避免无尽的查询
- */
- private EntryPosition findAsPerTimestampInSpecificLogFile(MysqlConnection mysqlConnection, final Long startTimestamp, final EntryPosition endPosition, final String searchBinlogFile) {
- final LogPosition logPosition = new LogPosition();
- try {
- mysqlConnection.reconnect();
- // 开始遍历文件
- mysqlConnection.seek(searchBinlogFile, 4L, new SinkFunction < LogEvent > () {
- private LogPosition lastPosition;
- public boolean sink(LogEvent event) {
- EntryPosition entryPosition = null;
- try {
- CanalEntry.Entry entry = parseAndProfilingIfNecessary(event);
- if (entry == null) {
- return true;
- }
- String logfilename = entry.getHeader().getLogfileName();
- Long logfileoffset = entry.getHeader().getLogfileOffset();
- Long logposTimestamp = entry.getHeader().getExecuteTime();
- if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType()) || CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
- logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[] {
- logfilename,
- logfileoffset,
- logposTimestamp,
- startTimestamp
- });
- // 事务头和尾寻找第一条记录时间戳,如果最小的一条记录都不满足条件,可直接退出
- if (logposTimestamp >= startTimestamp) {
- return false;
- }
- }
- if (StringUtils.equals(endPosition.getJournalName(), logfilename) && endPosition.getPosition() <= (logfileoffset + event.getEventLen())) {
- return false;
- }
- // 记录一下上一个事务结束的位置,即下一个事务的position
- // position = current +
- // data.length,代表该事务的下一条offest,避免多余的事务重复
- if (CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
- entryPosition = new EntryPosition(logfilename, logfileoffset + event.getEventLen(), logposTimestamp);
- logger.debug("set {} to be pending start position before finding another proper one...", entryPosition);
- logPosition.setPostion(entryPosition);
- } else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
- // 当前事务开始位点
- entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp);
- logger.debug("set {} to be pending start position before finding another proper one...", entryPosition);
- logPosition.setPostion(entryPosition);
- }
- lastPosition = buildLastPosition(entry);
- } catch(Throwable e) {
- processSinkError(e, lastPosition, searchBinlogFile, 4L);
- }
- return running;
- }
- });
- } catch(IOException e) {
- logger.error("ERROR ## findAsPerTimestampInSpecificLogFile has an error", e);
- }
- if (logPosition.getPostion() != null) {
- return logPosition.getPostion();
- } else {
- return null;
- }
- }
这块的逻辑如下:
至此,我们已经找到了我们想要的binlog文件名和对应的事务开始position,我们继续下面的步骤即可。
这块内容的主要思想如下:
至此,我们基本上知道了canal是如何在发生数据库主从切换时保证高可用和高可靠的,我们可能还有疑惑:为什么要回退60s,来解析binlog,这样不会导致数据重复吗?还有一些自增的update语句(不具备幂等性),不会产生数据错误吗?要想回答这些问题,就需要我们了解Binlog的Row模式了。
Mysql Binlog的Row模式记录的,是数据库中每一行的数据变化,而不仅仅是sql语句。比如我们对数据库中的多行,使用一条sql语句进行了修改。在这种情况下,如果Binlog模式为Statement,只会记录一条sql语句。而Row模式下,会对每一行的数据变化进行记录,以及变化前后每个字段的值。这也就是为什么Row模式的binlog文件如此之大的原因。
对于一些不具备幂等性的sql语句,采用Row语句进行Binlog解析时,也是可以通过重复执行,来保证我们数据的最终一致性的。这也就解释了,为什么要回退60s来进行Binlog位点定位、解析的问题。考虑到Mysql主从的数据复制的延迟性(60s,一般来说的延迟没有这么久),我们可以在主节点挂掉的情况下,回退60s到从节点上继续进行binlog的解析。
当然,也需要考虑一些极端的情况,也就是主从复制确实超过了60s的延迟,在这种情况下,就需要otter登场了。基本思路是:反查数据库同步 (以数据库最新版本同步,解决交替性,比如设置一致性反查数据库延迟阀值为60秒,即当同步过程中发现数据延迟超过了60秒,就会基于PK反查一次数据库,拿到当前最新值进行同步,减少交替性的问题)。
来源: http://www.cnblogs.com/f-zhao/p/7681960.html