1. 背景
在我们的业务开发中, 往往会碰到下面这个场景:
业务更新数据写到数据库中
业务更新数据需要实时传递给下游依赖处理
所以传统的处理架构可能会这样:
但这个架构也存在着不少弊端: 我们需要在项目中维护很多发送消息的代码. 新增或者更新消息都会带来不少维护成本. 所以, 更好的处理方式应该是直接将数据库的数据接入到流式系统中, 如下图:
本文将演示如何在 E-MapReduce 上实现将 RDS binlog 实时同步到 Kafka 集群中.
2. 环境准备
实验中使用 VPC 网络环境, 以下实例创建时默认都是在 VPC 环境下.
2.1 准备一个测试 RDS 数据库
创建一个 RDS 实例, 版本选择 5.7. 这里不赘述如何创建 RDS, 详细流程请参考 RDS 文档. 创建完如图:
2.2 准备一个 Kafka 集群
创建一个 E-MapReduce Kafka 集群, 版本选择 EMR-3.11.0. 需要注意, 这里必须选择 EMR-3.11.0 以上版本, 否则不会默认安装启动 Kafka Connect 服务. 详细创建流程请参考 E-MapReduce 文档. 创建完如图:
注意: RDS 实例和 E-MapReduce Kafka 集群最好在同一个 VPC 中, 否则需要打通两个 VPC 之间的网络.
- 3. Kafka Connect
- 3.1 Connector
Kafka Connect 是一个用于 Kafka 和其他数据系统之间进行数据传输的工具, 它可以实现基于 Kafka 的数据管道, 打通上下游数据源. 我们需要做的就是在 Kafka Connect 服务上运行一个 Connector, 这个 Connector 是具体实现如何从 / 向数据源中读 / 写数据. Confluent 提供了很多 Connector 实现, 你可以在这里 https://www.confluent.io/product/connectors/ 下载. 不过今天我们使用 Debezium 提供的一个 MySQL Connector 插件, 下载地址 https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/0.7.5/debezium-connector-mysql-0.7.5-plugin.tar.gz .
下载这个插件, 并将解压出来的 jar 包全部拷贝到 kafka lib 目录下. 注意: 需要将这些 jar 包拷贝到 Kafka 集群所有机器上.
在 Kafka 集群的服务列表中重启 Kafka Connect 组件.
3.2 启动 Connector
在创建 connector 前, 我们需要做一番配置, 这里罗列一些 Debezium MySQL Connector 的主要配置项:
- database.hostname=x.x.x.x
- database.port=3306
- database.user=tom
- database.password=password
- database.server.id=123456
- database.server.name=fullfillment
- database.whitelist=inventory
- database.history.kafka.bootstrap.servers=y.y.y.y:9092
- database.history.kafka.topic=dbhistory.fullfillment
- include.schema.changes=true
登录到 Kafka 集群, 配置并创建一个 connector, 命令如下:
- curl -X POST -H "Content-Type: application/json"
- --data '{"name":"rds-binlog","config": {"connector.class":"io.debezium.connector.mysql.MySqlConnector","database.hostname":"x.x.x.x","database.port":"3306","database.user":"tom","database.password":"password","database.server.id":"123456","database.server.name":"fulfillment","database.history.kafka.bootstrap.servers":"y.y.y.y:9092","database.history.kafka.topic":"dbhistory.fullfillment","include.schema.changes":"true"}}'
- http://emr-worker-1:8083/connectors
这时, 我们可以看到一个创建好的 connector, 如图:
3.3 注意事项
server_id 是多少?: 你可以在 RDS 执行 "SELECT @@server_id;" 查到.
创建 connector 时可能会出现连接失败, 请确保 RDS 的白名单已经授权了 Kafka 集群机器访问.
4 测试
4.1 创建一张表
一会之后, Kafka 集群中会自动创建一个对应的 topic
插入几条数据
查看 binlog 数据
查看 fulfillment.mugen.students 这个 topic, 是否有刚刚新插入的数据
kafka-console-consumer.sh --zookeeper emr-header-1:2181/kafka-1.0.1
--topic fulfillment.mugen.students --from-beginning
结果如图所示:
5. 资料
confluent 官方文档 https://docs.confluent.io/
debezium 官网 http://debezium.io/
kafka 官方文档 http://kafka.apache.org/documentation.html
来源: https://yq.aliyun.com/articles/592039