《阿里巴巴mysql数据库binlog的增量订阅&消费组件》 https://github.com/alibaba/canal
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
名称:运河[kə'næl]
译意:水道/管道/沟渠
语言:纯java开发
定位:基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql
关键词:mysql binlog解析器/实时/队列和主题
基于日志增量订阅&消费支持的业务:
1.数据库镜像
2.数据库实时备份
3.多级索引 (卖家和买家各自分库索引)
4.search build
5.业务cache刷新
6.价格变化等重要业务消息
从上层来看,复制分成三步:
master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
slave将master的binary log events拷贝到它的中继日志(relay log);
slave重做中继日志中的事件,将改变反映它自己的数据。
CentOs7.3 搭建 MySQL 5.7.19 主从复制,以及复制实现细节分析
原理相对比较简单:
1.运河模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
2.mysql master收到dump请求,开始推送二进制日志给slave(也就是运河)
3.运河解析二进制对象(原始为字节流)
canal的原理是基于mysql binlog技术,所以这里一定需要开启mysql的binlog写入功能,建议配置binlog模式为row.
修改 etc/my.cnf
- $ cat /etc/my.cnf
- [mysqld]
- log-bin=mysql-bin #添加这一行就ok
- binlog-format=ROW #选择row模式
- server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复
MySQL 安装
《CentOs7.3 安装 MySQL 5.7.19 二进制版本》
直接下载 访问: https://github.com/alibaba/canal/releases ,会列出所有历史的发布版本包 下载方式,比如以1.0.24版本为例子:
- $ ca / opt $ wget https: //github.com/alibaba/canal/releases/download/canal-1.0.24/canal.deployer-1.0.24.tar.gz
or 自己编译
- $ git clone git@github.com:alibaba/canal.git
- $ cd canal;
- $ mvn clean install -Dmaven.test.skip -Denv=release
编译完成后,会在根目录下产生target/canal.deployer-$version.tar.gz
- $ mkdir /opt/canal
- $ tar zxvf canal.deployer-$version.tar.gz -C /opt/canal
应用参数:
- $ vi conf / example / instance.properties
- #################################################
- ## mysql serverId
- canal.instance.mysql.slaveId = 1234
- canal.instance.master.address 需要改成自己的数据库信息
- canal.instance.master.address = 192.168.252.124:3306
- canal.instance.master.journal.name =
- canal.instance.master.position =
- canal.instance.master.timestamp =
- #canal.instance.standby.address =
- #canal.instance.standby.journal.name =
- #canal.instance.standby.position =
- #canal.instance.standby.timestamp =
- username/password,需要改成自己的数据库信息
- canal.instance.dbUsername = canal
- canal.instance.dbPassword = canal
- canal.instance.defaultDatabaseName =
- canal.instance.connectionCharset = UTF-8
- $ sh bin / startup.sh
- $ less logs / canal / canal.log
- 2017-08-28 16:21:08.945 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
- 2017-08-28 16:21:09.102 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.252.125:11111]
- 2017-08-28 16:21:10.087 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
具体instance的日志:
- $ less logs / example / example.log
- 2017-08-28 16:21:09.449 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
- 2017-08-28 16:21:09.460 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
- 2017-08-28 16:21:09.555 [main] WARN org.springframework.beans.TypeConverterDelegate - PropertyEditor [com.sun.beans.editors.EnumEditor] found through deprecated global PropertyEditorManager fallback - consider using a more isolated form of registration, e.g. on the BeanWrapper/BeanFactory!
- 2017-08-28 16:21:09.730 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
- 2017-08-28 16:21:09.988 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
- 2017-08-28 16:21:10.061 [destination = example , address = /192.168.252.124:3306 , EventParser] WARN c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just last position
- {"identity":{"slaveId":-1,"sourceAddress":{"address":"192.168.252.124","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.000002","position":5225,"serverId":1,"timestamp":1503908357000}}
- $ sh bin / stop.sh
阿里巴巴 / canal /github 项目首页
把项目源码克隆下
- $ clone https://github.com/alibaba/canal.git
创建表
- DROP TABLE IF EXISTS `test`;
- CREATE TABLE `test` (
- `id` int(11) NOT NULL AUTO_INCREMENT,
- `name` varchar(255) DEFAULT NULL,
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;
导入开发工具:
,修改部署Canal 的服务器IP 即可
- \canal\example
启动测试类
- package com.alibaba.otter.canal.example;
- import java.net.InetSocketAddress;
- import org.apache.commons.lang.exception.ExceptionUtils;
- import com.alibaba.otter.canal.client.CanalConnector;
- import com.alibaba.otter.canal.client.CanalConnectors;
- import com.alibaba.otter.canal.common.utils.AddressUtils;
- /**
- * 单机模式的测试例子
- *
- * @author jianghang 2013-4-15 下午04:19:20
- * @version 1.0.4
- */
- public class SimpleCanalClientTest extends AbstractCanalClientTest {
- public SimpleCanalClientTest(String destination) {
- super(destination);
- }
- public static void main(String args[]) {
- // 根据ip,直接创建链接,无HA的功能
- String destination = "example";
- // String ip = AddressUtils.getHostIp();
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.252.125", 11111), destination, "", "");
- final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
- clientTest.setConnector(connector);
- clientTest.start();
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- try {
- logger.info("## stop the canal client");
- clientTest.stop();
- } catch(Throwable e) {
- logger.warn("##something goes wrong when stopping canal:\n{}", ExceptionUtils.getFullStackTrace(e));
- } finally {
- logger.info("## canal client is down.");
- }
- }
- });
- }
- }
在数据库。表里插入,测试订阅数据,等事物提价了,这边看是否订阅成功
响应
- * ****************************************************Batch Id: [5],
- count: [3],
- memsize: [160],
- Time: 2017 - 08 - 28 16 : 53 : 42 * Start: [mysql - bin.000002 : 6201 : 1503910428000(2017 - 08 - 28 16 : 53 : 48)] * End: [mysql - bin.000002 : 6383 : 1503910428000(2017 - 08 - 28 16 : 53 : 48)] * ***************************************************
- ================>binlog[mysql - bin.000002 : 6201],
- executeTime: 1503910428000,
- delay: -5311ms BEGIN---->Thread id: 9---------------->binlog[mysql - bin.000002 : 6329],
- name[penglei, test],
- eventType: INSERT,
- executeTime: 1503910428000,
- delay: -5311ms id: 8 type = int(11) update = true name: 测试订阅type = varchar(255) update = true----------------END---->transaction id: 106 === =============>binlog[mysql - bin.000002 : 6383],
- executeTime: 1503910428000,
- delay: -5311ms
来源: http://www.tuicool.com/articles/AB3AJzI