一, 环境介绍
canal 是阿里开源的中间件, 主要用于同步 mysql 数据库变更. 具体参见: https://github.com/alibaba/canal/releases
搭建环境:
vmware centos7 部署 mysql 和 canal
windows 开发 canal client, 自动捕获 mysql 数据库变更
二, Centos 安装 Mysql
1, 尝试用 yum 安装 mysql
wget http://repo.mysql.com/mysql57-community-release-el7-10.noarch.rpm
返回: 2018-07-13 16:04:42 (63.9 KB/s) - 'mysql57-community-release-el7-10.noarch.rpm' saved [25548/25548]
- sudo rpm -Uvh mysql57-community-release-el7-10.noarch.rpm
- sudo yum install -y mysql-community-server
如果执行顺利, 安装 mysql server 成功.
2. 改用阿里源安装
可是官方的 yum 源在国内访问效果不佳, 我下载 mysql server 的速度太慢了, 决定改用阿里源
- # 下载 wget
- yum install wget -y
- # 备份当前的 yum 源
- mv /etc/yum.repos.d /etc/yum.repos.d.backup4comex
- # 新建空的 yum 源设置目录
- mkdir /etc/yum.repos.d
- # 下载阿里云的 yum 源配置
- wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
- # 最后重建缓存
- yum clean all
- yum makecache
3. 安装 MariaDB
MariaDB 数据库管理系统是 MySQL 的一个分支, 主要由开源社区在维护, 采用 GPL 授权许可. 开发这个分支的原因之一是: 甲骨文公司收购了 MySQL 后, 有将 MySQL 闭源的潜在风险, 因此社区采用分支的方式来避开这个风险. MariaDB 的目的是完全兼容 MySQL, 包括 API 和命令行, 使之能轻松成为 MySQL 的代替品.
安装 mariadb, 大小 59 M.
[root@yl-web yl]# yum install mariadb-server mariadb
其它几条常用的 mariadb 命令:
- systemctl start mariadb #启动 MariaDB
- systemctl stop mariadb #停止 MariaDB
- systemctl restart mariadb #重启 MariaDB
- systemctl enable mariadb #设置开机启动
运行 systemctl start mariadb, 然后就可以正常使用 mysql 了
4. 设置数据库密码:
set password for 'root'@'localhost' =password('root');
5. 遇到的几个问题
从 windows 访问 centos mysql 失败
解决方案: 设置 mysql 允许远程连接
- mysql -u root;
- // 赋予任何主机访问数据的权限
- mysql>GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION;
- // 使修改生效
- mysql>FLUSH PRIVILEGES;
进行上述操作之后, 发现仍然连接失败, 返回错误
Can't connect to MySQL server on'10.168.12.43' (10060)
解决方案: 从 windows 连接 vmware 里面的 mysql 失败, 关闭 windows 防火墙后成功.
三, 部署 canal server
(参考: https://github.com/alibaba/canal/wiki/QuickStart )
1. 下载 canal server
https://github.com/alibaba/canal/releases
我下载的是 canal.exaple-1.0.24.gar.gz, 下载完成后解压缩:
- mkdir /tmp/canal
- tar zxvf canal.deployer-1.0.24.tar.gz -C /tmp/canal
2. 查看 binlog 相关数据库命令:
是否启用了日志
mysql>show variables like 'log_bin';
怎样知道当前的日志
mysql> show master status;
查看 mysql binlog 模式
show variables like 'binlog_format';
获取 binlog 文件列表
show binary logs;
查看当前正在写入的 binlog 文件
show master status\G
查看指定 binlog 文件的内容
show binlog events in 'mysql-bin.000002';
3. 开启 binlog
如果 log_bin 关闭, 需要在 etc 下面找到 my.cnf, 开启 binlog:
- server-id=1
- log-bin=/var/lib/mysql/mysql-bin
然后重启 mysql 服务.
4. 添加 canal mysql 数据库账号
- CREATE USER canal IDENTIFIED BY 'canal';
- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
- FLUSH PRIVILEGES;
5. 配置 canal 实例, 设置本地数据库信息
- vi conf/example/instance.properties
- ## mysql serverId
- canal.instance.mysql.slaveId = 1234
- # position info
- canal.instance.master.address = 10.168.12.43:3306
- canal.instance.master.journal.name =mysql-bin.000003
- canal.instance.master.position =
- canal.instance.master.timestamp =
- ......
- canal.instance.dbUsername = canal
- canal.instance.dbPassword = canal
- canal.instance.defaultDatabaseName =testcanal
- canal.instance.connectionCharset = UTF-8
- # table regex
- canal.instance.filter.regex = .*\\..*
6. 启动 canal
sh bin/startup.sh
7. 查看日志
- vi logs/canal/canal.log
- vi logs/example/example.log
四, canal lient demo
1. 引入 pom 依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.24</version>
</dependency>
2. 客户端代码
- public class ClientTest {
- public static void main(String args[]) {
- // 创建链接
- CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.168.12.43",
- 11111), "example", "","");
- int batchSize = 1000;
- int emptyCount = 0;
- try {
- connector.connect();
- connector.subscribe(".*\\..*");
- connector.rollback();
- int totalEmptyCount = 120;
- while (emptyCount <totalEmptyCount) {
- Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
- long batchId = message.getId();
- int size = message.getEntries().size();
- if (batchId == -1 || size == 0) {
- emptyCount++;
- System.out.println("empty count :" + emptyCount);
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- }
- } else {
- emptyCount = 0;
- printEntry(message.getEntries());
- }
- connector.ack(batchId); // 提交确认
- }
- System.out.println("empty too many times, exit");
- } finally {
- connector.disconnect();
- }
- }
- private static void printEntry(List<Entry> entrys) {
- for (Entry entry : entrys) {
- if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
- continue;
- }
- RowChange rowChage = null;
- try {
- rowChage = RowChange.parseFrom(entry.getStoreValue());
- } catch (Exception e) {
- throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
- e);
- }
- EventType eventType = rowChage.getEventType();
- System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
- entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
- entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
- eventType));
- for (RowData rowData : rowChage.getRowDatasList()) {
- if (eventType == EventType.DELETE) {
- printColumn(rowData.getBeforeColumnsList());
- } else if (eventType == EventType.INSERT) {
- printColumn(rowData.getAfterColumnsList());
- } else {
- System.out.println("-------> before");
- printColumn(rowData.getBeforeColumnsList());
- System.out.println("-------> after");
- printColumn(rowData.getAfterColumnsList());
- }
- }
- }
- }
- private static void printColumn(List<Column> columns) {
- for (Column column : columns) {
- System.out.println(column.getName() + ":" + column.getValue() + "update=" + column.getUpdated());
- }
- }
- }
3. 建立数据库连接, 进行 insert,delete 等数据库操作
五, 遇到的问题
1.canal 建立连接失败
解决方案: 用 telnet 命令测试建立连接仍然失败, 关闭 linux 防火墙.
systemctl stop firewalld.service
其他 centos7 防火墙相关命令:
- firewall-cmd --list-ports# 查看已经开放的端口:
- firewall-cmd --reload #重启 firewall
- systemctl stop firewalld.service #停止 firewall
- systemctl disable firewalld.service #禁止 firewall 开机启动
- firewall-cmd --state #查看默认防火墙状态 (关闭后显示 notrunning, 开启后显示 running)
来源: https://www.cnblogs.com/janes/p/9318576.html