本文主要分析的部分是 instance 启动时, parser 的一个启动和工作过程. 主要关注的是 AbstractEventParser 的 start() 方法中的 parseThread.
一, 序列图
二, 源码分析
parseThread 中包含的内容比较清晰, 代码不是很长, 我们逐步分析下.
2.1 构造数据库连接
erosaConnection = buildErosaConnection();
这里构造的, 应该是一个 mysql 的链接, 包括的内容都是从配置文件中过来的一些信息, 包括 mysql 的地址, 账号密码等.
2.2 启动心跳线程
startHeartBeat(erosaConnection);
这里的心跳, 感觉是个假的心跳, 并没有用到 connection 相关的内容. 启动一个定时任务, 默认 3s 发送一个心跳的 binlog 给 sink 阶段, 表名 parser 还在工作. 在 sink 阶段, 会把心跳的 binlog 直接过滤, 不会走到 store 过程.
2.3 dump 之前准备工作
这一步的代码也不复杂.
preDump(erosaConnection);
我们看看 preDump 都能够做什么? 在 MysqlEventParser 中, 我们可以看到, 主要做了几件事:
针对 binlog 格式进行过滤, 也就是我们在配置文件中指定 binlog 的格式, 不过目前我们默认的都是 ROW 模式.
针对 binlog image 进行过滤, 目前默认是 FULL, 也就是 binlog 记录的是变更前后的数据, 如果配置为 minimal, 那么只记录变更后的值, 可以减少 binlog 的文件大小.
构造表结构源数据的缓存 TableMetaCache
2.4 获取最后的位置信息
这一步是比较核心的, 也是保证 binlog 不丢失的核心代码.
- EntryPosition position = findStartPosition(erosaConnection);
- final EntryPosition startPosition = position;
- if (startPosition == null) {
- throw new CanalParseException("can't find start position for " + destination);
- }
- if (!processTableMeta(startPosition)) {
- throw new CanalParseException("can't find init table meta for " + destination
- + "with position :" + startPosition);
- }
具体的 findStartPosition 是怎么实现的, 请查阅下一篇文章.
如果没有找到最后的位置信息, 那么直接抛出异常, 否则还要进行一次判断, 也就是 processTableMeta, 我们看下这个方法做了什么.
- protected boolean processTableMeta(EntryPosition position) {
- if (isGTIDMode()) {
- if (binlogParser instanceof LogEventConvert) {
- // 记录 gtid
- ((LogEventConvert) binlogParser).setGtidSet(MysqlGTIDSet.parse(position.getGtid()));
- }
- }
- if (tableMetaTSDB != null) {
- if (position.getTimestamp() == null || position.getTimestamp() <= 0) {
- throw new CanalParseException("use gtid and TableMeta TSDB should be config timestamp> 0");
- }
- return tableMetaTSDB.rollback(position);
- }
- return true;
- }
如果开启了 GTID 模式, 那么直接设置 GTID 集合. 如果 tableMetaTSDB 不为空, 那么直接根据位置信息回滚到对应的表结构. 这个 tableMetaTSDB 记录的是一个表结构的时序, 使用的是 Druid 的一个功能, 把所有 DDL 记录在数据库中, 一般来说, 每 24 小时生成一份快照插入到数据库中, 这样能解决 DDL 产生的表结构不一致的问题, 也就是增加了一个表结构的回溯功能.
这边的 rollback 主要做的事情为:
根据位置信息 position 从数据库去查询对应的信息, 包括 binlog 文件名, 位点等. 然后记录到内存中, 使用的 Druid 的 SchemaRepository.console 方法.
2.5 开始 dump 数据
在 dump 之前, 代码中构造了一个 sink 类, 也就是 SinkFunction. 里面定义了一个 sink 方法, 主要的内容是对哪些数据进行过滤.
- try {
- CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, false);
- if (!running) {
- return false;
- }
- if (entry != null) {
- exception = null; // 有正常数据流过, 清空 exception
- transactionBuffer.add(entry);
- // 记录一下对应的 positions
- this.lastPosition = buildLastPosition(entry);
- // 记录一下最后一次有数据的时间
- lastEntryTime = System.currentTimeMillis();
- }
- return running;
- } catch (TableIdNotFoundException e) {
- throw e;
- } catch (Throwable e) {
- if (e.getCause() instanceof TableIdNotFoundException) {
- throw (TableIdNotFoundException) e.getCause();
- }
- // 记录一下, 出错的位点信息
- processSinkError(e,
- this.lastPosition,
- startPosition.getJournalName(),
- startPosition.getPosition());
- throw new CanalParseException(e); // 继续抛出异常, 让上层统一感知
- }
首先判断 parser 是否在运行, 如果不运行, 那么就直接抛弃. 运行时, 判断 entry 是否为空, 不为空的情况下, 直接将 entry 加入到 transactionBuffer 中. 这里我们说下这个 transactionBuffer, 其实类似于 Disruptor 中的一个环形队列 (默认长度为 1024), 维护了几个指针, 包括 put,get,ack 三个指针, 里面存储了需要进行传递到下一阶段的数据.
加到环形队列之后, 记录一下当前的位置信息和时间. 如果这个过程出错了, 需要记录下出错的位置信息, 这里的 processSinkError 其实就是打印了一下错误日志, 然后抛出了一个 CanalException, 让上一层感知.
说了这么多, 还没到真正开始 dump 的地方. 下面开始吧.
- if (isGTIDMode()) {
- erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
- } else {
- if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
- erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
- } else {
- erosaConnection.dump(startPosition.getJournalName(),
- startPosition.getPosition(),
- sinkHandler);
- }
- }
在新版本中, 增加了 GTID 的模式, 所以这里的 dump 需要判断怎么 dump, 发送什么命令给 mysql 来获取什么样的 binlog.
2.5.1 GTID 模式
如果开启了 GTID 模式 (在 instance.properties 开启), 那么需要发送 COM_BINLOG_DUMP_GTID 命令, 然后开始接受 binlog 信息, 进行 binlog 处理.
- public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException {
- updateSettings();
- sendBinlogDumpGTID(gtidSet);
- DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
- fetcher.start(connector.getChannel());
- LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
- LogContext context = new LogContext();
- while (fetcher.fetch()) {
- LogEvent event = null;
- event = decoder.decode(fetcher, context);
- if (event == null) {
- throw new CanalParseException("parse failed");
- }
- if (!func.sink(event)) {
- break;
- }
- }
- }
调用 LogDecoder.decode 方法, 对二进制进行解析, 解析为我们需要的 LogEvent, 如果解析失败, 抛出异常. 否则进行 sink, 如果 sink 返回的 false, 那么直接跳过, 否则加入到 transactionBuffer 中.
2.5.2 非 GTID 模式
这块有个逻辑判断, 如果找到的最后的位置信息中包含了时间戳, 如果没有 binlog 文件名, 那么在 MysqlConnection 中直接报错, 也就是必须既要有时间戳, 又要有 binlog 文件名, 才能进行 dump 操作.
这里的 dump 分了两步, 第一步就是发送 COM_REGISTER_SLAVE 命令, 伪装自己是一个 slave, 然后发送 COM_BINLOG_DUMP 命令接收 binlog.
- public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
- updateSettings();
- sendRegisterSlave();
- sendBinlogDump(binlogfilename, binlogPosition);
- DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
- fetcher.start(connector.getChannel());
- LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
- LogContext context = new LogContext();
- while (fetcher.fetch()) {
- LogEvent event = null;
- event = decoder.decode(fetcher, context);
- if (event == null) {
- throw new CanalParseException("parse failed");
- }
- if (!func.sink(event)) {
- break;
- }
- if (event.getSemival() == 1) {
- sendSemiAck(context.getLogPosition().getFileName(), binlogPosition);
- }
- }
- }
这里有个 mysql 半同步的标识, semival. 如果 semival==1, 说明需要进行 ack, 发送 SEMI_SYNC_ACK 给 master(我们这边 more 都不开启).
2.5.3 异常处理
如果整个过程中发生了异常, 有以下几种处理方式:
没有找到表, 说明起始的 position 在一个事务中, 需要重新找到事务的开始点
其他异常, processDumpError, 如果是 IO 异常, 而且 message 中包含 errno = 1236 错误, 表示从 master 读取 binlog 发生致命错误, 处理方法如下: http://blog.sina.com.cn/s/blog_a1e9c7910102wv2v.html .
如果当前 parser 不在运行, 抛出异常; 如果在运行, 抛出异常之后, 发送一个告警信息.
异常处理完成后, 在 finally 中, 首先将当前线程置为 interrupt, 然后关闭 mysql 连接. 如果关闭连接过程中, 抛出异常, 需要进行处理.
整个异常处理后, 首先暂停 sink 过程, 然后重置缓冲队列 TransctionBuffer, 重置 binlogParser. 最后, 如果 parser 还在运行, 那么 sleep 一段时间后重试.
- } catch (TableIdNotFoundException e) {
- exception = e;
- // 特殊处理 TableIdNotFound 异常, 出现这样的异常, 一种可能就是起始的 position 是一个事务当中, 导致 tablemap
- // Event 时间没解析过
- needTransactionPosition.compareAndSet(false, true);
- logger.error(String.format("dump address %s has an error, retrying. caused by",
- runningInfo.getAddress().toString()), e);
- } catch (Throwable e) {
- processDumpError(e);
- exception = e;
- if (!running) {
- if (!(e instanceof java.nio.channels.ClosedByInterruptException || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) {
- throw new CanalParseException(String.format("dump address %s has an error, retrying.",
- runningInfo.getAddress().toString()), e);
- }
- } else {
- logger.error(String.format("dump address %s has an error, retrying. caused by",
- runningInfo.getAddress().toString()), e);
- sendAlarm(destination, ExceptionUtils.getFullStackTrace(e));
- }
- } finally {
- // 重新置为中断状态
- Thread.interrupted();
- // 关闭一下链接
- afterDump(erosaConnection);
- try {
- if (erosaConnection != null) {
- erosaConnection.disconnect();
- }
- } catch (IOException e1) {
- if (!running) {
- throw new CanalParseException(String.format("disconnect address %s has an error, retrying.",
- runningInfo.getAddress().toString()),
- e1);
- } else {
- logger.error("disconnect address {} has an error, retrying., caused by",
- runningInfo.getAddress().toString(),
- e1);
- }
- }
- }
- // 出异常了, 退出 sink 消费, 释放一下状态
- eventSink.interrupt();
- transactionBuffer.reset();// 重置一下缓冲队列, 重新记录数据
- binlogParser.reset();// 重新置位
- if (running) {
- // sleep 一段时间再进行重试
- try {
- Thread.sleep(10000 + RandomUtils.nextInt(10000));
- } catch (InterruptedException e) {
- }
- }
来源: https://www.cnblogs.com/f-zhao/p/9081666.html