OGG(Oracle GoldenGate)是一个基于日志的结构化数据备份工具,一般用于 Oracle 数据库之间的主从备份以及 Oracle 数据库到其他数据库(DB2, MySQL 等)的同步。下面是 Oracle 官方提供的一个 OGG 的整体架构图,从图中可以看出 OGG 的部署分为源端和目标端两部分组成,主要有 Manager,Extract,Pump,Collector,Replicat 这么一些组件。
本文介绍的 Oracle 数据同步是通过 OGG 的实现的,该 Datahub 插件在架构图中处于 Replicat 的位置,会分析 Trail 文件,将数据的变化记录写入 Datahub 中,可以使用流计算对 datahub 中的数据进行实时分析,也可以将数据归档到 MaxCompute 中进行离线处理。
(下面将介绍 Oracle/OGG 相关安装和配置过程,Oracle 的安装将不做介绍,另外需要注意的是:Oracle/OGG 相关参数配置以熟悉 Oracle/OGG 的运维人员配置为准,本示例只是提供一个可运行的样本,Oracle 所使用的版本为 ORA11g)
下载 OGG 安装包解压后有如下目录:
- drwxr - xr - x install drwxrwxr - x response - rwxr - xr - x runInstaller drwxr - xr - x stage
目前 oracle 一般采取 response 安装的方式,在 response/oggcore.rsp 中配置安装依赖,具体如下:
- oracle.install.responseFileVersion = /oracle/install / rspfmt_ogginstall_response_schema_v12_1_2#需要目前与oracle版本对应INSTALL_OPTION = ORA11g#goldegate主目录SOFTWARE_LOCATION = /home/oracle / u01 / ggate#初始不启动manager START_MANAGER = false#manger端口MANAGER_PORT = 7839#对应oracle的主目录DATABASE_LOCATION = /home/oracle / u01 / app / oracle / product / 11.2.0 / dbhome_1#暂可不配置INVENTORY_LOCATION = #分组(目前暂时将oracle和ogg用同一个账号ogg_test,实际可以给ogg单独账号)UNIX_GROUP_NAME = oinstall
执行命令:
- runInstaller - silent - responseFile {
- YOUR_OGG_INSTALL_FILE_PATH
- }
- /response/oggcore.rsp
本示例中,安装后 OGG 的目录在 / home/oracle/u01/ggate,安装日志在 / home/oracle/u01/ggate/cfgtoollogs/oui 目录下,当 silentInstall{时间}.log 文件里出现如下提示,表明安装成功:
- The installation of Oracle GoldenGate Core was successful.
执行 / home/oracle/u01/ggate/ggsci 命令,并在提示符下键入命令:CREATE SUBDIRS,从而生成 ogg 需要的各种目录(dir 打头的那些)。
至此,源端 OGG 安装完成。
以 dba 分身进入 sqlplus:sqlplus / as sysdba
- #创建独立的表空间create tablespace ATMV datafile '/home/oracle/u01/app/oracle/oradata/uprr/ATMV.dbf'size 100m autoextend on next 50m maxsize unlimited;#创建ogg_test用户,密码也为ogg_test create user ogg_test identified by ogg_test
- default tablespace ATMV;#给ogg_test赋予充分的权限grant connect,
- resource,
- dba to ogg_test;#检查附加日志情况Select SUPPLEMENTAL_LOG_DATA_MIN,
- SUPPLEMENTAL_LOG_DATA_PK,
- SUPPLEMENTAL_LOG_DATA_UI,
- SUPPLEMENTAL_LOG_DATA_FK,
- SUPPLEMENTAL_LOG_DATA_ALL from v$database;#增加数据库附加日志及回退alter database add supplemental log data;
- alter database add supplemental log data(primary key, unique, foreign key) columns;#rollback alter database drop supplemental log data(primary key, unique, foreign key) columns;
- alter database drop supplemental log data;#全字段模式,注意:在该模式下的delete操作也只有主键值ALTER DATABASE ADD SUPPLEMENTAL LOG DATA(ALL) COLUMNS;#开启数据库强制日志模式alter database force logging;#执行marker_setup.sql脚本@marker_setup.sql#执行@ddl_setup.sql@ddl_setup.sql#执行role_setup.sql@role_setup.sql#给ogg用户赋权grant GGS_GGSUSER_ROLE to ogg_test;#执行@ddl_enable.sql,开启DDL trigger@ddl_enable.sql#执行优化脚本@ddl_pin ogg_test#安装sequence support@sequence.sql#alter table sys.seq$ add supplemental log data(primary key) columns;
以下是通过 ggsci 对 ogg 进行配置
配置 mgr
edit params mgr
- PORT 7839 DYNAMICPORTLIST 7840 - 7849 USERID ogg_test,
- PASSWORD ogg_test PURGEOLDEXTRACTS. / dirdat
- /*, USECHECKPOINTS, MINKEEPDAYS 7
- LAGREPORTHOURS 1
- LAGINFOMINUTES 30
- LAGCRITICALMINUTES 45
- PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
- PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
- */
启动 mgr(运行日志在 ggate/dirrpt 中)
start mgr
查看 mgr 状态
info mgr
查看 mgr 配置
view params mgr
以下是通过 ggsci 对 ogg 进行配置
配置 extract(名字可任取,extract 是组名)
edit params extract
- EXTRACT extract SETENV(NLS_LANG = "AMERICAN_AMERICA.AL32UTF8") DBOPTIONS ALLOWUNUSEDCOLUMN USERID ogg_test,
- PASSWORD ogg_test REPORTCOUNT EVERY 1 MINUTES,
- RATE NUMFILES 5000 DISCARDFILE. / dirrpt / ext_test.dsc,
- APPEND,
- MEGABYTES 100 DISCARDROLLOVER AT 2 : 00 WARNLONGTRANS 2h,
- CHECKINTERVAL 3m EXTTRAIL. / dirdat / st,
- MEGABYTES 200 DYNAMICRESOLUTION TRANLOGOPTIONS CONVERTUCS2CLOBS TRANLOGOPTIONS RAWDEVICEOFFSET 0 DDL & INCLUDE MAPPED OBJTYPE 'table' & INCLUDE MAPPED OBJTYPE 'index' & INCLUDE MAPPED OBJTYPE 'SEQUENCE' & EXCLUDE OPTYPE COMMENT DDLOPTIONS NOCROSSRENAME REPORT TABLE OGG_TEST. * ;
- SEQUENCE OGG_TEST. * ;
- GETUPDATEBEFORES
增加 extract 进程(ext 后的名字要跟上面
对应,本例中 extract 是组名) add ext extract,tranlog, begin now
- extract
删除某废弃进程 DP_TEST
delete ext DP_TEST
添加抽取进程,每个队列文件大小为 200m
add exttrail ./dirdat/st,ext extract, megabytes 200
启动抽取进程(运行日志在 ggate/dirrpt 中)
start extract extract
至此,extract 配置完成,数据库的一条变更可以在 ggate/dirdat 目录下的文件中看到
源端 ggsci 起来后执行如下命令,生成 defgen 文件, 并且拷贝到目标端 dirdef 下
edit params defgen
- DEFSFILE. / dirdef / ogg_test.def USERID ogg_test,
- PASSWORD ogg_test table OGG_TEST. * ;
在 shell 中执行如下命令,生成 ogg_test.def
./defgen paramfile ./dirprm/defgen.prm
解压 adapter 包
将源端中 dirdef/ogg_test.def 文件拷贝到 adapter 的 dirdef 下
执行 ggsci 起来后执行如下命令,创建必须目录
create subdirs
编辑 mgr 配置
edit params mgr
- PORT 7839 DYNAMICPORTLIST 7840 - 7849 PURGEOLDEXTRACTS. / dirdat
- /*, USECHECKPOINTS, MINKEEPDAYS 7
- LAGREPORTHOURS 1
- LAGINFOMINUTES 30
- LAGCRITICALMINUTES 45
- PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
- PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
- */
启动 mgr
start mgr
启动 ggsci 后执行如下操作:
编辑 pump 配置
edit params pump
- EXTRACT pump RMTHOST xx.xx.xx.xx,
- MGRPORT 7839,
- COMPRESS PASSTHRU NUMFILES 5000 RMTTRAIL. / dirdat / st DYNAMICRESOLUTION TABLE OGG_TEST. * ;
- SEQUENCE OGG_TEST. * ;
添加投递进程,从某一个队列开始投
add ext pump,exttrailsource ./dirdat/st
备注:投递进程,每个队文件大小为 200m
add rmttrail ./dirdat/st,ext pump,megabytes 200
启动 pump
start pump
启动后,结合上面 adapter 的配置,可以在目标端的 dirdat 目录下看到过来的 trailfile
依赖环境:jdk1.7。
配置好 JAVA_HOME, LD_LIBRARY_PATH,可以将环境变量配置到~/.bash_profile 中,例如
- export JAVA_HOME = /xxx/xxx / jrexx export LD_LIBRARY_PATH = $ {
- LD_LIBRARY_PATH
- }: $JAVA_HOME / lib / amd64: $JAVA_HOME / lib / amd64 / server
修改环境变量后,请重启 adapter 的 mgr 进程
下载并解压:
修改 conf 路径下的 javaue.properties 文件,将 {YOUR_HOME} 替换为解压后的路径
- gg.handlerlist = ggdatahub gg.handler.ggdatahub.type = com.aliyun.odps.ogg.handler.datahub.DatahubHandler gg.handler.ggdatahub.configureFileName = {
- YOUR_HOME
- }
- /datahub-ogg-plugin/conf / configure.xml goldengate.userexit.nochkpt = false goldengate.userexit.timestamp = utc gg.classpath = {
- YOUR_HOME
- }
- /datahub-ogg-plugin/lib
- /*
- gg.log.level=debug
- jvm.bootoptions=-Xmx512m -Dlog4j.configuration=file:{YOUR_HOME}/datahub-ogg-plugin/conf/log4j.properties -Djava.class.path=ggjava/ggjava.jar
- */
修改 conf 路径下的 log4j.properties 文件,将 {YOUR_HOME} 替换为解压后的路径
修改 conf 路径下的 configure.xml 文件,修改方式见文件中的注释
- <?xml version="1.0" encoding="UTF-8" ?>
- <configue>
- <defaultOracleConfigure>
- <!-- oracle sid, 必选-->
- <sid>
- 100
- </sid>
- <!-- oracle schema, 可以被mapping中的oracleSchema覆盖, 两者必须有一个非空-->
- <schema>
- ogg_test
- </schema>
- </defaultOracleConfigure>
- <defalutDatahubConfigure>
- <!-- datahub endpoint, 必填-->
- <endPoint>
- YOUR_DATAHUB_ENDPOINT
- </endPoint>
- <!-- datahub project, 可以被mapping中的datahubProject, 两者必须有一个非空-->
- <project>
- YOUR_DATAHUB_PROJECT
- </project>
- <!-- datahub accessId, 可以被mapping中的datahubAccessId覆盖, 两者必须有一个非空-->
- <accessId>
- YOUR_DATAHUB_ACCESS_ID
- </accessId>
- <!-- datahub accessKey, 可以被mapping中的datahubAccessKey覆盖, 两者必须有一个非空-->
- <accessKey>
- YOUR_DATAHUB_ACCESS_KEY
- </accessKey>
- <!-- 数据变更类型同步到datahub对应的字段,可以被columnMapping中的ctypeColumn覆盖 -->
- <ctypeColumn>
- optype
- </ctypeColumn>
- <!-- 数据变更时间同步到datahub对应的字段,可以被columnMapping中的ctimeColumn覆盖 -->
- <ctimeColumn>
- readtime
- </ctimeColumn>
- <!-- 数据变更序号同步到datahub对应的字段, 按数据变更先后递增, 不保证连续, 可以被columnMapping中的cidColumn覆盖
- -->
- <cidColumn>
- record_id
- </cidColumn>
- <!-- 额外增加的常量列,每条record该列值为指定值,格式为c1=xxx,c2=xxx,可以被columnMapping中的constColumnMap覆盖-->
- <constColumnMap>
- </constColumnMap>
- </defalutDatahubConfigure>
- <!-- 默认最严格,不落文件 直接退出 无限重试-->
- <!-- 运行每批上次的最多纪录数, 可选, 默认1000-->
- <batchSize>
- 1000
- </batchSize>
- <!-- 默认时间字段转换格式, 可选, 默认yyyy-MM-dd HH:mm:ss-->
- <defaultDateFormat>
- yyyy-MM-dd HH:mm:ss
- </defaultDateFormat>
- <!-- 脏数据是否继续, 可选, 默认false-->
- <dirtyDataContinue>
- true
- </dirtyDataContinue>
- <!-- 脏数据文件, 可选, 默认datahub_ogg_plugin.dirty-->
- <dirtyDataFile>
- datahub_ogg_plugin.dirty
- </dirtyDataFile>
- <!-- 脏数据文件最大size, 单位M, 可选, 默认500-->
- <dirtyDataFileMaxSize>
- 200
- </dirtyDataFileMaxSize>
- <!-- 重试次数, -1:无限重试 0:不重试 n:重试次数, 可选, 默认-1-->
- <retryTimes>
- 0
- </retryTimes>
- <!-- 重试间隔, 单位毫秒, 可选, 默认3000-->
- <retryInterval>
- 4000
- </retryInterval>
- <!-- 点位文件, 可选, 默认datahub_ogg_plugin.chk-->
- <checkPointFileName>
- datahub_ogg_plugin.chk
- </checkPointFileName>
- <mappings>
- <mapping>
- <!-- oracle schema, 见上描述-->
- <oracleSchema>
- </oracleSchema>
- <!-- oracle table, 必选-->
- <oracleTable>
- t_person
- </oracleTable>
- <!-- datahub project, 见上描述-->
- <datahubProject>
- </datahubProject>
- <!-- datahub AccessId, 见上描述-->
- <datahubAccessId>
- </datahubAccessId>
- <!-- datahub AccessKey, 见上描述-->
- <datahubAccessKey>
- </datahubAccessKey>
- <!-- datahub topic, 必选-->
- <datahubTopic>
- t_person
- </datahubTopic>
- <ctypeColumn>
- </ctypeColumn>
- <ctimeColumn>
- </ctimeColumn>
- <cidColumn>
- </cidColumn>
- <constColumnMap>
- </constColumnMap>
- <columnMapping>
- <!-- src:oracle字段名称, 必须; dest:datahub field, 必须; destOld:变更前数据落到datahub的field,
- 可选; isShardColumn: 是否作为shard的hashkey, 可选, 默认为false, 可以被shardId覆盖 isDateFormat:
- timestamp字段是否采用DateFormat格式转换, 默认true. 如果是false, 源端数据必须是long dateFormat:
- timestamp字段的转换格式, 不填就用默认值 -->
- <column src="id" dest="id" isShardColumn="true" isDateFormat="false" dateFormat="yyyy-MM-dd HH:mm:ss"
- />
- <column src="name" dest="name" isShardColumn="true" />
- <column src="age" dest="age" />
- <column src="address" dest="address" />
- <column src="comments" dest="comments" />
- <column src="sex" dest="sex" />
- <column src="temp" dest="temp" destOld="temp1" />
- </columnMapping>
- <!--指定shard id, 优先生效, 可选-->
- <shardId>
- 1
- </shardId>
- </mapping>
- </mappings>
- </configue>
在 ggsci 下启动 datahub writer
edit params dhwriter
- extract dhwriter getEnv(JAVA_HOME) getEnv(LD_LIBRARY_PATH) getEnv(PATH) CUSEREXIT. / libggjava_ue.so CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES,
- PARAMS "{YOUR_HOME}/datahub-ogg-plugin/conf/javaue.properties"sourcedefs. / dirdef / ogg_test.def table OGG_TEST. * ;
添加 dhwriter
add extract dhwriter, exttrailsource ./dirdat/st
启动 dhwriter
start dhwriter
这里会用一个简单的示例来说明数据的使用方法,例如我们在 Oracle 数据库有一张商品订单表 orders(oid int, pid int, num int),该表有三列,分别为订单 ID, 商品 ID 和商品数量。
将这个表通过 OGG Datahub 进行增量数据同步之前,我们需要先将源表已有的数据通过 DataX 同步到 MaxCompute 中。增量同步的关键步骤如下:
(1)在 Datahub 上创建相应的 Topic,Topic 的 schema 为 (string record_id, string optype, string readtime, bigint oid_before, bigint oid_after, bigint pid_before, bigint pid_after, bigint num_before, bigint num_after);
(2)OGG Datahub 的插件按照上述的安装流程部署配置,其中列的 Mapping 配置如下:
- <ctypeColumn>
- optype
- </ctypeColumn>
- <ctimeColumn>
- readtime
- </ctimeColumn>
- <cidColumn>
- record_id
- </cidColumn>
- <columnMapping>
- <column src="oid" dest="oid_after" destOld="oid_before" isShardColumn="true"
- />
- <column src="pid" dest="pid_after" destOld="pid_before" />
- <column src="num" dest="num_after" destOld="num_before" />
- </columnMapping>
其中 optype 和 readtime 字段是记录数据库的数据变更类型和时间,optype 有 "I", "D", "U" 三种取值,分别对应为 "增","删","改" 三种数据变更操作。
(3)OGG Datahub 插件部署好成功运行后,插件会源源不断的将源表的数据变更记录输送至 datahub 中,例如我们在源订单表中新增一条记录(1,2,1),datahub 里收到的记录如下:
- + --------+------------+------------+------------+------------+------------+------------+------------+------------+|record_id | optype | readtime | oid_before | oid_after | pid_before | pid_after | num_before | num_after | +-------+------------+------------+------------+------------+------------+------------+------------+------------+|14810373343020000 | I | 2016 - 12 - 06 15 : 15 : 28.000141 | NULL | 1 | NULL | 2 | NULL | 1 |
修改这条数据,比如把 num 改为 20,datahub 则会收到的一条变更数据记录,如下:
- + -------+------------+------------+------------+------------+------------+------------+------------+------------+|record_id | optype | readtime | oid_before | oid_after | pid_before | pid_after | num_before | num_after | +--------+------------+------------+------------+------------+------------+------------+------------+------------+|14810373343080000 | U | 2016 - 12 - 06 15 : 15 : 58.000253 | 1 | 1 | 2 | 2 | 1 | 20 |
在前一天的离线计算的基础数据上,我们可以写一个 StreamCompute 流计算的分析程序,很容易地对数据进行实时汇总,例如实时统计当前总的订单数,每种商品的销售量等。处理思路就是对于每一条到来的变更数据,可以拿到变化的数值,实时更新统计变量即可。
为了便于后续的离线分析,我们也可以将 Datahub 里的数据归档到 MaxCompute 中,在 MaxCompute 中创建相应 Schema 的表:
- create table orders_log(record_id string, optype string, readtime string, oid_before bigint, oid_after bigint, pid_before bigint, pid_after bigint, num_before bigint, num_after bigint);
在 Datahub 上创建,上述流入 Datahub 里的数据将自动同步到 MaxCompute 当中。建议将同步到 MaxCompute 中的数据按照时间段进行划分,比如每一天的增量数据都对应一个独立分区。这样当天的数据同步完成后,我们可以处理对应的分区,拿到当天所有的数据变更,而与和前一天的全量数据进行合并后,即可得到当天的全量数据。为了简单起见,先不考虑分区表的情况,以 2016-12-06 这天的增量数据为例,假设前一天的全量数据在表 orders_base 里面,datahub 同步过来的增量数据在 orders_log 表中,将 orders_base 与 orders_log 做合并操作,可以得到 2016-12-06 这天的最终全量数据写入表 orders_result 中。这个过程可以在 MaxCompute 上用如下这样一条 SQL 完成。
Q:目标端报错 OGG-06551 Oracle GoldenGate Collector: Could not translate host name localhost into an Internet address.
A:目标端机器 hostname 在 / etc/hosts 里面重新设置 localhost 对应的 ip
Q:找不到 jvm 相关的 so 包
A:将 jvm 的 so 路径添加到 LD_LIBRARY_PATH 后,重启 mgr
- 例如:export LD_LIBRARY_PATH = $ {
- LD_LIBRARY_PATH
- }: $JAVA_HOME / lib / amd64: $JAVA_HOME / lib / amd64 / server
Q:有了 DDL 语句,比如增加一列,源端 ogg 没有问题,但是 adapter 端的 ffwriter 和 jmswriter 进程退出,且报错: 2015-06-11 14:01:10 ERROR OGG-01161 Bad column index (5) specified for table OGG_TEST.T_PERSON, max columns = 5.
A:由于表结构改变,需要重做 def 文件,将重做的 def 文件放入 dirdef 后重启即可
来源: https://yq.aliyun.com/articles/66139