一, 背景介绍
随着数据规模的不断扩大, 传统的 RDBMS 难以满足 OLAP 的需求, 本文将介绍如何将 Oracle 的数据实时同步到阿里云的大数据处理平台当中, 并利用大数据工具对数据进行分析.
OGG(Oracle GoldenGate)是一个基于日志的结构化数据备份工具, 一般用于 Oracle 数据库之间的主从备份以及 Oracle 数据库到其他数据库 (DB2, MySQL 等) 的同步. 下面是 Oracle 官方提供的一个 OGG 的整体架构图, 从图中可以看出 OGG 的部署分为源端和目标端两部分组成, 主要有 Manager,Extract,Pump,Collector,Replicat 这么一些组件.
Manager: 在源端和目标端都会有且只有一个 Manager 进程存在, 负责管理其他进程的启停和监控等;
Extract: 负责从源端数据库表或者事务日志中捕获数据, 有初始加载和增量同步两种模式可以配置, 初始加载模式是直接将源表数据同步到目标端, 而增量同步就是分析源端数据库的日志, 将变动的记录传到目标端, 本文介绍的是增量同步的模式;
Pump:Extract 从源端抽取的数据会先写到本地磁盘的 Trail 文件, Pump 进程会负责将 Trail 文件的数据投递到目标端;
Collector: 目标端负责接收来自源端的数据, 生成 Trail 文件
Replicat: 负责读取目标端的 Trail 文件, 转化为相应的 DDL 和 DML 语句作用到目标数据库, 实现数据同步.
本文介绍的 Oracle 数据同步是通过 OGG 的 Datahub 插件 https://github.com/aliyun/aliyun-odps-ogg-plugin 实现的, 该 Datahub 插件在架构图中处于 Replicat 的位置, 会分析 Trail 文件, 将数据的变化记录写入 Datahub 中, 可以使用流计算对 datahub 中的数据进行实时分析, 也可以将数据归档到 MaxCompute 中进行离线处理.
二, 安装步骤
0. 环境要求
源端已安装好 Oracle
源端已安装好 OGG(建议版本 Oracle GoldenGate V12.1.2.1)
目标端已安装好 OGG Adapters(建议版本 Oracle GoldenGate Application Adapters 12.1.2.1)
- java 7
- (下面将介绍 Oracle/OGG 相关安装和配置过程, Oracle 的安装将不做介绍, 另外需要注意的是: Oracle/OGG 相关参数配置以熟悉 Oracle/OGG 的运维人员配置为准, 本示例只是提供一个可运行的样本, Oracle 所使用的版本为 ORA11g)
1. 源端 OGG 安装
下载 OGG 安装包解压后有如下目录:
目前 oracle 一般采取 response 安装的方式, 在 response/oggcore.rsp 中配置安装依赖, 具体如下:
执行命令:
本示例中, 安装后 OGG 的目录在 / home/oracle/u01/ggate, 安装日志在 / home/oracle/u01/ggate/cfgtoollogs/oui 目录下, 当 silentInstall{时间}.log 文件里出现如下提示, 表明安装成功:
执行 / home/oracle/u01/ggate/ggsci 命令, 并在提示符下键入命令: CREATE SUBDIRS, 从而生成 ogg 需要的各种目录(dir 打头的那些).
至此, 源端 OGG 安装完成.
2. 源端 Oracle 配置
以 dba 分身进入 sqlplus:sqlplus / as sysdba
3. OGG 源端 mgr 配置
以下是通过 ggsci 对 ogg 进行配置
配置 mgr
edit params mgr
启动 mgr(运行日志在 ggate/dirrpt 中)
start mgr
查看 mgr 状态
info mgr
查看 mgr 配置
view params mgr
4. OGG 源端 extract 配置
以下是通过 ggsci 对 ogg 进行配置
配置 extract(名字可任取, extract 是组名)
edit params extract
增加 extract 进程(ext 后的名字要跟上面 extract 对应, 本例中 extract 是组名)
add ext extract,tranlog, begin now
删除某废弃进程 DP_TEST
delete ext DP_TEST
添加抽取进程, 每个队列文件大小为 200m
add exttrail ./dirdat/st,ext extract, megabytes 200
启动抽取进程(运行日志在 ggate/dirrpt 中)
start extract extract
至此, extract 配置完成, 数据库的一条变更可以在 ggate/dirdat 目录下的文件中看到
5. 生成 def 文件
源端 ggsci 起来后执行如下命令, 生成 defgen 文件, 并且拷贝到目标端 dirdef 下
edit params defgen
在 shell 中执行如下命令, 生成 ogg_test.def
./defgen paramfile ./dirprm/defgen.prm
6. 目标端 OGG 安装和配置
解压 adapter 包
将源端中 dirdef/ogg_test.def 文件拷贝到 adapter 的 dirdef 下
执行 ggsci 起来后执行如下命令, 创建必须目录
create subdirs
编辑 mgr 配置
edit params mgr
启动 mgr
start mgr
7. 源端 ogg pump 配置
启动 ggsci 后执行如下操作:
编辑 pump 配置
edit params
添加投递进程, 从某一个队列开始投
add ext pump,exttrailsource ./dirdat/st
备注: 投递进程, 每个队文件大小为 200m
add rmttrail ./dirdat/st,ext pump,megabytes 200
启动 pump
start pump
启动后, 结合上面 adapter 的配置, 可以在目标端的 dirdat 目录下看到过来的 trailfile
8. Datahub 插件安装和配置
依赖环境: jdk1.7.
配置好 JAVA_HOME, LD_LIBRARY_PATH, 可以将环境变量配置到~/.bash_profile 中, 例如
修改环境变量后, 请重启 adapter 的 mgr 进程
下载 datahub-ogg-plugin.tar.gz https://github.com/aliyun/aliyun-odps-ogg-plugin/releases/download/1.0.4/datahub-ogg-plugin-1.0.4.tar.gz 并解压:
修改 conf 路径下的 javaue.properties 文件, 将 {YOUR_HOME} 替换为解压后的路径
修改 conf 路径下的 log4j.properties 文件, 将 {YOUR_HOME} 替换为解压后的路径
修改 conf 路径下的 configure.xml 文件, 修改方式见文件中的注释
在 ggsci 下启动 datahub writer
edit params dhwriter
添加 dhwriter
add extract dhwriter, exttrailsource ./dirdat/st
启动 dhwriter
start dhwriter
三, 使用场景
这里会用一个简单的示例来说明数据的使用方法, 例如我们在 Oracle 数据库有一张商品订单表 orders(oid int, pid int, num int), 该表有三列, 分别为订单 ID, 商品 ID 和商品数量.
将这个表通过 OGG Datahub 进行增量数据同步之前, 我们需要先将源表已有的数据通过 DataX 同步到 MaxCompute 中. 增量同步的关键步骤如下:
(1)在 Datahub 上创建相应的 Topic,Topic 的 schema 为(string record_id, string optype, string readtime, bigint oid_before, bigint oid_after, bigint pid_before, bigint pid_after, bigint num_before, bigint num_after);
(2)OGG Datahub 的插件按照上述的安装流程部署配置, 其中列的 Mapping 配置如下:
其中 optype 和 readtime 字段是记录数据库的数据变更类型和时间, optype 有 "I", "D", "U" 三种取值, 分别对应为 "增","删","改" 三种数据变更操作.
(3)OGG Datahub 插件部署好成功运行后, 插件会源源不断的将源表的数据变更记录输送至 datahub 中, 例如我们在源订单表中新增一条记录(1,2,1),datahub 里收到的记录如下:
修改这条数据, 比如把 num 改为 20,datahub 则会收到的一条变更数据记录, 如下:
实时计算
在前一天的离线计算的基础数据上, 我们可以写一个 StreamCompute 流计算的分析程序, 很容易地对数据进行实时汇总, 例如实时统计当前总的订单数, 每种商品的销售量等. 处理思路就是对于每一条到来的变更数据, 可以拿到变化的数值, 实时更新统计变量即可.
离线处理
为了便于后续的离线分析, 我们也可以将 Datahub 里的数据归档到 MaxCompute 中, 在 MaxCompute 中创建相应 Schema 的表:
createtableorders_log(record_idstring, optypestring, readtimestring, oid_beforebigint, oid_afterbigint, pid_beforebigint, pid_afterbigint, num_beforebigint, num_afterbigint);
在 Datahub 上创建 MaxCompute 的数据归档 https://datahub.console.aliyun.com/intro/advancedguide/connector.html , 上述流入 Datahub 里的数据将自动同步到 MaxCompute 当中. 建议将同步到 MaxCompute 中的数据按照时间段进行划分, 比如每一天的增量数据都对应一个独立分区. 这样当天的数据同步完成后, 我们可以处理对应的分区, 拿到当天所有的数据变更, 而与和前一天的全量数据进行合并后, 即可得到当天的全量数据. 为了简单起见, 先不考虑分区表的情况, 以 2016-12-06 这天的增量数据为例, 假设前一天的全量数据在表 orders_base 里面, datahub 同步过来的增量数据在 orders_log 表中, 将 orders_base 与 orders_log 做合并操作, 可以得到 2016-12-06 这天的最终全量数据写入表 orders_result 中. 这个过程可以在 MaxCompute 上用如下这样一条 SQL 完成.
四, 常见问题
Q: 目标端报错 OGG-06551 Oracle GoldenGate Collector: Could not translate host name localhost into an Internet address.
A: 目标端机器 hostname 在 / etc/hosts 里面重新设置 localhost 对应的 ip
Q: 找不到 jvm 相关的 so 包
A: 将 jvm 的 so 路径添加到 LD_LIBRARY_PATH 后, 重启 mgr
例如: exportLD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server
Q: 有了 DDL 语句, 比如增加一列, 源端 ogg 没有问题, 但是 adapter 端的 ffwriter 和 jmswriter 进程退出, 且报错: 2015-06-11 14:01:10 ERROR OGG-01161 Bad column index (5) specified for table OGG_TEST.T_PERSON, max columns = 5.
A: 由于表结构改变, 需要重做 def 文件, 将重做的 def 文件放入 dirdef 后重启即可.
来源: http://www.jianshu.com/p/89e5bd258113