背景
SQLServer 为实时更新数据同步提供了 CDC 机制, 类似于 MySQL 的 binlog, 将数据更新操作维护到一张 CDC 表中.
开启 cdc 的源表在插入 INSERT, 更新 UPDATE 和删除 DELETE 活动时会插入数据到日志表中. cdc 通过捕获进程将变更数据捕获到变更表中, 通过 cdc 提供的查询函数, 可以捕获这部分数据.
CDC 的使用条件
1.SQL server 2008 及以上的企业版, 开发版和评估版;
2. 需要开启代理服务 (作业).
3.CDC 需要业务库之外的额外的磁盘空间.
4.CDC 的表需要主键或者唯一主键.
图 1:Sqlserver CDC 原理
ADB4PG Sink 使用条件
需要提前使用建表语句, 在 ADB4PG 端建表, 系统不会自动创建 (如果有需要可以加这部分功能)
每张表需要有主键或唯一主键
当前支持的数据格式: INTEGER,BIGINT,SMALLINT,NUMERIC,DECIMAL,REAL,DOUBLEPERICISION,BOOLEAN,DATE,TIMESTAMP,VARCHAR
环境准备
SQLServer 环境准备
已有自建 SQLServer 或云上 RDS 实例 (示例使用云上 RDS SQLServer 实例)
已有 Windows 环境, 并安装 SSMS(SQL Server Management Studio), 部分命令需要在 SSMS 执行
SQLServer 环境建表
-- 创建源表
- create database connect
- GO
- use connect
- GO
- create table t1
- (
- a int NOT NULL PRIMARY KEY,
- b BIGINT,
- c SMALLINT,
- d REAL,
- e FLOAT,
- f DATETIME,
- g VARCHAR
- );
-- 开启 db 级的 cdc
sp_rds_cdc_enable_db
-- 验证数据库是否开启 cdc 成功
select * from sys.databases where is_cdc_enabled = 1
-- 对源表开启 cdc
exec sp_cdc_enable_table @source_schema='dbo', @source_name='t1', @role_name=null;
ADB4PG 端创建目标表
- CREATE DATABASE connect;
- create table t1
- (
- a int NOT NULL PRIMARY KEY,
- b BIGINT,
- c SMALLINT,
- d REAL,
- e FLOAT,
- f TIMESTAMP,
- g VARCHAR
- );
Kafka 环境准备
安装 Kafka Server
1. 下载 kafka 安装包, 并解压
SQL Server Source Connect 目前只支持 2.1.0 及以上版本的 Kafka Connect, 故需要安装高版本 kafka, 实例使用 kfakf-2.11-2.1.0. http://kafka.apache.org/downloads?spm=a2c4g.11186623.2.19.7dd34587dwy89h#2.1.0
2. 编辑 $KAFKA_HOME/config/server.properties
修改以下参数
- ...
- ## 为每台 broker 配置一个唯一的 id 号
- broker.id=0
- ...
- ## log 存储地址
- log.dirs=/home/gaia/kafka_2.11-2.1.0/logs
- ## kafka 集群使用的 zk 地址
- zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
- ...
3. 启动 kafka server
bin/kafka-server-start.sh config/server.properties
安装 Kafka Connect
1. 修改 kafka connect 配置文件
修改 $KAFKA_HOME/config/connect-distributed.properties
- ## kafka server 地址
- Bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
- ## 为 kafka connector 选定一个消费 group id
- group.id=
- ## 安装插件的地址, 每次 kafka connector 启动时会动态加载改路径下的 jar 包, 可以将每个插件单独放到一个子路径
- plugin.path=
安装需要的 kafka-connect 插件
1. 将插件 jar 包放在我们在前面已经配置过的配置的 plugin.path 路径下
- sqlserver-source-connector
- https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/?spm=a2c4g.11186623.2.18.7dd34587dwy89h
oss-sink-connector, 需要使用代码自行编译, 注意在 pom 修改依赖的 kafka 及 scala 版本号
https://github.com/aliyun/kafka-connect-oss
adb4pg-jdbc-sink-connector, 需要下载以下 jar 包及对应 ADB For PG 的 JDBC 驱动
https://yq.aliyun.com/attachment/download/?spm=a2c4e.11153940.0.0.70ed10daVH6ZQO&id=7282
2. 编辑配置文件
# CDC connector 的配置文件 sqlserver-cdc-source.JSON
▽
- {
- "name": "sqlserver-cdc-source",
- "config": {
- "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
- "tasks.max" : "1",
- "database.server.name" : "server1",
- "database.hostname" : "database hostname",
- "database.port" : "1433",
- "database.user" : "xxxx",
- "database.password" : "xxxxxx",
- "database.dbname" : "connect",
- "schemas.enable" : "false",
- "mode":"incrementing",
- "incrementing.column.name":"a",
- "database.history.kafka.bootstrap.servers" : "kafka-broker:9092",
- "database.history.kafka.topic": "server1.dbo.t1",
- "value.converter.schemas.enable":"false",
- "value.converter":"org.apache.kafka.connect.json.JsonConverter"
- }
- }
- # oss sink 的配置文件 oss-sink.JSON
- {
- "name":"oss-sink",
- "config": {
- "name":"oss-sink",
- "topics":"server1.dbo.testdata",
- "connector.class":"com.aliyun.oss.connect.kafka.OSSSinkConnector",
- "format.class":"com.aliyun.oss.connect.kafka.format.json.JsonFormat",
- "flush.size":"1",
- "tasks.max":"4",
- "storage.class":"com.aliyun.oss.connect.kafka.storage.OSSStorage",
- "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
- "timestamp.extractor":"Record",
- "oss.bucket":"traffic-csv",
- "partition.duration.ms":"10000",
- "path.format":"YYYY-MM-dd-HH",
- "locale":"US",
- "timezone":"Asia/Shanghai",
- "rotate.interval.ms":"30000"
- }
- }
有关 oss sinker 更详尽的配置, 见文档 https://github.com/aliyun/kafka-connect-oss
- ## adb4pg-jdbc-sink 配置文件
- {
- "name":"adb4pg-jdbc-sink",
- "config": {
- "name":"adb4pg-jdbc-sink",
- "topics":"server1.dbo.t1",
- "connector.class":"io.confluent.connect.jdbc.Adb4PgSinkConnector",
- "connection.url":"jdbc:postgresql://gp-8vb8xi62lohhh2777o.gpdb.zhangbei.rds.aliyuncs.com:3432/connect",
- "connection.user":"xxx",
- "connection.password":"xxxxxx",
- "col.names":"a,b,c,d,e,f,g",
- "col.types":"integer,bigint,smallint,real,doublepericision,timestamp,varchar",
- "pk.fields":"a",
- "target.tablename":"t1",
- "tasks.max":"1",
- "auto.create":"false",
- "table.name.format":"t1",
- "batch.size":"1"
- }
- }
由于 OSS sinker 使用了 hdfs 封装的 FileSystem, 需要将 OSS 相关的信息维护到 $KAFKA_HOME/config/core-site.xml 文件中
- <configuration>
- <property>
- <name>fs.oss.endpoint</name>
- <value>xxxxxxx</value>
- </property>
- <property>
- <name>fs.oss.accessKeyId</name>
- <value>xxxxxxx</value>
- </property>
- <property>
- <name>fs.oss.accessKeySecret</name>
- <value>xxxxxxx</value>
- </property>
- <property>
- <name>fs.oss.impl</name>
- <value>org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem</value>
- </property>
- <property>
- <name>fs.oss.buffer.dir</name>
- <value>/tmp/oss</value>
- </property>
- <property>
- <name>fs.oss.connection.secure.enabled</name>
- <value>false</value>
- </property>
- <property>
- <name>fs.oss.connection.maximum</name>
- <value>2048</value>
- </property>
- </configuration>
3. 启动已经配置好的 kafka-connector 插件
启动及删除 connect 任务命令
- ## 启动命令
- curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @sqlserver-cdc-source.JSON
- curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @adb4pg-jdbc-sink.JSON
- curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @oss-sink.JSON
- ## 删除命令
- curl -s -X DELETE http://localhost:8083/connectors/sqlserver-cdc-source
- curl -s -X DELETE http://localhost:8083/connectors/adb4pg-jdbc-sink
- curl -s -X DELETE http://localhost:8083/connectors/oss-sink
在 ADB For PG 获取更新数据
SQLServer 插入赠 / 更 / 删数据记录
insert into t1(a,b,c,d,e,f,g) values(1, 2, 3, 4, 5, convert(datetime,'24-12-19 10:34:09 PM',5), 'h');
在 kafka topic 获取更新结果
先确认是否生成了 kafka-connect 所需的 topic 信息
bin/kafka-topics.sh --zookeeper zk_address --list
如截图, connect-configs, connect-offsets, connect-status 为 kafka-connect 用来存储任务数据更新状态的 topic.schema-changes-inventory 是维护 sqlserver 表结构的 topic.
可以通过 kafka consloe-consumer 上获取到的 topic 信息, 以确认 cdc 数据正确被采集到 kafka topic
bin/kafka-console-consumer.sh --Bootstrap-server xx.xx.xx.xx:9092 --topic server1.dbo.t1
在 ADB For PG 上查询同步过来的数据
注意: 因为是不同数据库之间的同步, 时区设置的不同可能会导致同步结果产生时区偏移, 需要在两侧数据库做好设置.
在 OSS 查看更新的数据
来源: https://yq.aliyun.com/articles/740747