环境:
源端: Oracle12.2 ogg for Oracle 12.3
目标端: Kafka ogg for bigdata 12.3
将 Oracle 中的数据通过 OGG 同步到 Kafka
源端配置:
1 为要同步的表添加附加日志
- dblogin USERID ogg@orclpdb, PASSWORD ogg
- add trandata scott.tab1
- add trandata scott.tab2
2 添加抽取进程
- GGSCI>add extract EXT_KAF1,integrated tranlog, begin now
- GGSCI>add EXTTRAIL ./dirdat/k1, extract EXT_KAF1,MEGABYTES 200
编辑抽取进程参数:
- GGSCI> edit params EXT_KAF1
- extract EXT_KAF1
- userid c##ggadmin,PASSWORD ggadmin
- LOGALLSUPCOLS
- UPDATERECORDFORMAT COMPACT
- exttrail ./dirdat/k1,FORMAT RELEASE 12.3
- SOURCECATALOG orclpdb --(指定 pdb)
- table scott.tab1;
- table scott.tab2;
注册进程
- GGSCI> DBLOGIN USERID c##ggadmin,PASSWORD ggadmin
- GGSCI> register extract EXT_KAF1 database container (orclpdb)
3 添加投递进程:
- GGSCI>add extract PMP_KAF1, exttrailsource ./dirdat/k1
- GGSCI>add rmttrail ./dirdat/f1,EXTRACT PMP_KAF1,MEGABYTES 200
编辑投递进程参数:
- GGSCI>edit param PMP_KAF1
- EXTRACT PMP_KAF1
- USERID c##ggadmin,PASSWORD ggadmin
- PASSTHRU
- RMTHOST 10.1.1.247, MGRPORT 9178
- RMTTRAIL ./dirdat/f1,format release 12.3
- SOURCECATALOG orclpdb
- TABLE scott.tab1;
- table scott.tab2;
4 添加数据初始化进程 (Oracle initial load) 可以多个表分开初始化也可以一起初始化, 此处选择分开初始化
GGSCI> add extract ek_01, sourceistable
编辑参数:
- GGSCI> EDIT PARAMS ek_01
- EXTRACT ek_01
- USERID c##ggadmin,PASSWORD ggadmin
- RMTHOST 10.1.1.247, MGRPORT 9178
- RMTFILE ./dirdat/ka,maxfiles 999, megabytes 500,format release 12.3
- SOURCECATALOG orclpdb
- table scott.tab1;
- GGSCI> add extract ek_02, sourceistable
- EDIT PARAMS ek_02
- EXTRACT ek_02
- USERID c##ggadmin,PASSWORD ggadmin
- RMTHOST 10.1.1.247, MGRPORT 9178
- RMTFILE ./dirdat/kb,maxfiles 999, megabytes 500,format release 12.3
- SOURCECATALOG orclpdb
- table scott.tab2;
5 生成 def 文件:
- GGSCI> edit param defgen1
- USERID c##ggadmin,PASSWORD ggadmin
- defsfile /home/oracle/ogg/ggs12/dirdef/defgen1.def,format release 12.3
- SOURCECATALOG orclpdb
- table scott.tab1;
- table scott.tab2;
在 OGG_HOME 下执行如下命令生成 def 文件
defgen paramfile dirprm/defgen1.prm
将生成的 def 文件传到目标端 $OGG_HOME/dirdef 下
目标端配置:
1 将 $OGG_HOME/AdapterExamples/big-data/kafka 下的所有文件 copy 到 $OGG_HOME/dirprm 下
- cd $OGG_HOME/AdapterExamples/big-data/kafka
- cp * $OGG_HOME/dirprm
2 将 $ORACLE_HOME/AdapterExamples/trail 下的文件 tr000000000 copy 到 $OGG_HOME/dirdat 下
- cd $ORACLE_HOME/AdapterExamples/trail
- cp tr000000000 $OGG_HOME/dirdat
3 添加初始化进程:(可以多表一起初始化也可以分开初始化, 此处选择单独初始化)
- GGSCI> ADD replicat rp_01, specialrun
- GGSCI> EDIT PARAMS rp_01
- SPECIALRUN
- end runtime
- setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
- targetdb libfile libggjava.so set property=./dirprm/kafka1.props
- SOURCEDEFS ./dirdef/defgen1.def
- EXTFILE ./dirdat/ka
- reportcount every 1 minutes, rate
- grouptransops 10000
- MAP orclpdb.scott.tab1, TARGET scott.tab1;
- GGSCI> ADD replicat rp_02, specialrun
- GGSCI> EDIT PARAMS rp_02
- SPECIALRUN
- end runtime
- setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
- targetdb libfile libggjava.so set property=./dirprm/kafka2.props
- SOURCEDEFS ./dirdef/defgen1.def
- EXTFILE ./dirdat/kb
- reportcount every 1 minutes, rate
- grouptransops 10000
- MAP orclpdb.scott.tab2, TARGET scott.tab2;
4 添加恢复进程:
- GGSCI>add replicat r_kaf1,exttrail ./dirdat/f1
- GGSCI>edit params r_kaf1
- REPLICAT r_kaf1
- setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
- HANDLECOLLISIONS
- targetdb libfile libggjava.so set property=./dirprm/kafka1.props
- SOURCEDEFS ./dirdef/defgen1.def
- reportcount every 1 minutes, rate
- grouptransops 10000
- MAP orclpdb.scott.tab1, TARGET scott.tab1;
- GGSCI> add replicat r_kaf2,exttrail ./dirdat/f2
- GGSCI> edit params r_kaf2
- REPLICAT r_kaf2
- setenv(NLS_LANG="AMERICAN_AMERICA.ZHS16GBK")
- HANDLECOLLISIONS
- targetdb libfile libggjava.so set property=./dirprm/kafka2.props
- SOURCEDEFS ./dirdef/defgen1.def
- reportcount every 1 minutes, rate
- grouptransops 10000
- MAP orclpdb.scott.tab2, TARGET scott.tab2;
5 参数配置:
custom_kafka_producer.properties 文件内容如下:
bootstrap.servers=10.1.1.246:9200,10.1.1.247:9200 -- 只需要改动这一行就行, 指定 kafka 的地址和端口号
- acks=1
- reconnect.backoff.ms=1000
- value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
- key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
- batch.size=16384
- linger.ms=10000
kafka1.props 文件内容如下:
- gg.handlerlist = kafkahandler
- gg.handler.kafkahandler.type=kafka
- gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
- #The following resolves the topic name using the short table name
- gg.handler.kafkahandler.topicMappingTemplate= topic1
- #gg.handler.kafkahandler.format=avro_op
gg.handler.kafkahandler.format =json -- 这里做了改动, 指定格式为 json 格式
- gg.handler.kafkahandler.format.insertOpKey=I
- gg.handler.kafkahandler.format.updateOpKey=U
- gg.handler.kafkahandler.format.deleteOpKey=D
- gg.handler.kafkahandler.format.truncateOpKey=T
- gg.handler.kafkahandler.format.prettyPrint=false
- gg.handler.kafkahandler.format.jsonDelimiter=CDATA[]
gg.handler.kafkahandler.format.includePrimaryKeys=true -- 包含主键
gg.handler.kafkahandler.SchemaTopicName= topic1 -- 此处指定为要同步到的目标 topic 名字
- gg.handler.kafkahandler.BlockingSend =false
- gg.handler.kafkahandler.includeTokens=false
- gg.handler.kafkahandler.mode=op
- goldengate.userexit.timestamp=utc
- goldengate.userexit.writers=javawriter
- javawriter.stats.display=TRUE
- javawriter.stats.full=TRUE
- gg.log=log4j
- gg.log.level=INFO
- gg.report.time=30sec
- #Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/ -- 指定 classpath, 这里很重要, 必须有 kafka 安装文件的类库
- #Sample gg.classpath for HDP
- #gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/
- javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
启动进程进程恢复:
1 启动源端抓取进程
GGSCI> start EXT_KAF1
2 启动源端投递进程
GGSCI> start PMP_KAF1
3 启动源端初始化进程
GGSCI> start ek_01
4 启动目标端初始化进程
在 $OGG_HOME 下执行如下命令:
./replicat paramfile ./dirprm/rp_01.prm reportfile ./dirrpt/rp_01.rpt -p INITIALDATALOAD
5 启动目标端恢复进程
GGSCI> start R_KAF1
遇到的错误:
1ERROR OGG-15050 Error loading Java VM runtime library(2 no such file or directory)
原因: 找不到类库 (配置好环境变量之后, OGG 的 mgr 进程没有重启, 导致的)
解决: 重启 MGR 进程
2ERROR OG-15051 Java or JNI exception
原因: 没有使用 ogg12.3.1.1.1 自带的 kafka.props, 而是 copy 了 ogg12.2 的 kafka.props, 导致出现异常
解决: 使用 ogg12.3.1.1.1 自带的 kafka.props, 并指定相关的属性, 解决
来源: http://www.linuxidc.com/Linux/2018-03/151513.htm