基本架构
RDS -> SLS -> Spark Streaming -> Spark HDFS
上述链路主要包含 3 个过程:
如何把 RDS 的 binlog 收集到 SLS.
如何通过 Spark Streaming 将 SLS 中的日志读取出来, 进行分析.
如何把链路 2 中读取和处理过的日志, 保存到 Spark HDFS 中.
环境准备
安装一个 MySQL 类型的数据库 (使用 MySQL 协议, 例如 RDS,DRDS 等), 开启 log-bin 功能, 且配置 binlog 类型为 ROW 模式 (RDS 默认开启).
开通 SLS 服务.
操作步骤
检查 MySQL 数据库环境.
查看是否开启 log-bin 功能.
- MySQL> show variables like "log_bin";
- +---------------+-------+
- | Variable_name | Value |
- +---------------+-------+
- | log_bin | ON |
- +---------------+-------+
- 1 row in set (0.02 sec)
查看 binlog 类型
- MySQL> show variables like "binlog_format";
- +---------------+-------+
- | Variable_name | Value |
- +---------------+-------+
- | binlog_format | ROW |
- +---------------+-------+
- 1 row in set (0.03 sec)
添加用户权限.(也可以直接通过 RDS 控制台添加)
CREATE USER canal IDENTIFIED BY 'canal';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'canal'@'%';FLUSH PRIVILEGES;
为 SLS 服务添加对应的配置文件, 并检查数据是否正常采集.
在 SLS 控制台添加对应的 project 和 logstore, 例如: 创建一个名称为 canaltest 的 project, 然后创建一个名称为 canal 的 logstore.
对 SLS 进行配置: 在 /etc/ilogtail 目录下创建文件 user_local_config.JSON, 具体配置如下:
- {
- "metrics": {
- "##1.0##canaltest$plugin-local": {
- "aliuid": "****",
- "enable": true,
- "category": "canal",
- "defaultEndpoint": "*******",
- "project_name": "canaltest",
- "region": "cn-hangzhou",
- "version": 2
- "log_type": "plugin",
- "plugin": {
- "inputs": [
- {
- "type": "service_canal",
- "detail": {
- "Host": "*****",
- "Password": "****",
- "ServerID": ****,
- "User" : "***",
- "DataBases": [
- "yourdb"
- ],
- "IgnoreTables": [
- "\\S+_inner"
- ],
- "TextToString" : true
- }
- }
- ],
- "flushers": [
- {
- "type": "flusher_sls",
- "detail": {}
- }
- ]
- }
- }
- }
- }
其中 detail 中的 Host 和 Password 等信息为 MySQL 数据库信息, User 信息为之前授权过的用户名. aliUid,defaultEndpoint,project_name,category 请根据自己的实际情况填写对应的用户和 SLS 信息.
等待约 2 分钟, 通过 SLS 控制台查看日志数据是否上传成功, 具体如图所示.
如果日志数据没有采集成功, 请根据 SLS 的提示, 查看 SLS 的采集日志进行排查.
准备代码, 将代码编译成 jar 包, 然后上传到 OSS.
将 EMR 的示例代码通过 Git 复制下来, 然后进行修改, 具体命令为:
Git clone https://github.com/aliyun/aliyun-emapreduce-demo.git.
示例代码中已经有 LoghubSample 类, 该类主要用于从 SLS 采集数据并打印. 以下是修改后的代码, 供参考:
- package com.aliyun.emr.example
- import org.apache.spark.SparkConf
- import org.apache.spark.storage.StorageLevel
- import org.apache.spark.streaming.aliyun.logservice.LoghubUtils
- import org.apache.spark.streaming.{Milliseconds, StreamingContext}
- object LoghubSample {
- def main(args: Array[String]): Unit = {
- if (args.length <7) {
- System.err.println(
- """Usage: bin/spark-submit --class LoghubSample examples-1.0-SNAPSHOT-shaded.jar
- |
- |
- """.stripMargin)
- System.exit(1)
- }
- val loghubProject = args(0)
- val logStore = args(1)
- val loghubGroupName = args(2)
- val endpoint = args(3)
- val accessKeyId = args(4)
- val accessKeySecret = args(5)
- val batchInterval = Milliseconds(args(6).toInt * 1000)
- val conf = new SparkConf().setAppName("Mysql Sync")
- // conf.setMaster("local[4]");
- val ssc = new StreamingContext(conf, batchInterval)
- val loghubStream = LoghubUtils.createStream(
- ssc,
- loghubProject,
- logStore,
- loghubGroupName,
- endpoint,
- 1,
- accessKeyId,
- accessKeySecret,
- StorageLevel.MEMORY_AND_DISK)
- loghubStream.foreachRDD(rdd =>
- rdd.saveAsTextFile("/mysqlbinlog")
- )
- ssc.start()
- ssc.awaitTermination()
- }
- }
其中的主要改动是:
loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile("/mysqlbinlog") )
这样在 EMR 集群中运行时, 就会把 Spark Streaming 中流出来的数据, 保存到 EMR 的 HDFS 中.
说明
由于如果要在本地运行, 请在本地环境提前搭建 Hadoop 集群.
由于 EMR 的 Spark SDK 做了升级, 其示例代码比较旧, 不能直接在参数中传递 OSS 的 AccessKeyId,AccessKeySecret, 而是需要通过 SparkConf 进行设置, 如下所示.
- trait RunLocally {
- val conf = new SparkConf().setAppName(getAppName).setMaster("local[4]")
- conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
- conf.set("spark.hadoop.mapreduce.job.run-local", "true")
- conf.set("spark.hadoop.fs.oss.endpoint", "YourEndpoint")
- conf.set("spark.hadoop.fs.oss.accessKeyId", "YourId")
- conf.set("spark.hadoop.fs.oss.accessKeySecret", "YourSecret")
- conf.set("spark.hadoop.job.runlocal", "true")
- conf.set("spark.hadoop.fs.oss.impl", "com.aliyun.fs.oss.nat.NativeOssFileSystem")
- conf.set("spark.hadoop.fs.oss.buffer.dirs", "/mnt/disk1")
- val sc = new SparkContext(conf)
- def getAppName: String
- }
在本地调试时, 需要把 loghubStream.foreachRDD(rdd => rdd.saveAsObjectFile("/mysqlbinlog") ) 中的 /mysqlbinlog 修改成本地 HDFS 的地址.
代码编译.
在本地调试完成后, 我们可以通过如下命令进行打包编译:
clean install
上传 jar 包.
请先在 OSS 上建立 bucket 为 qiaozhou-EMR/jar 的目录, 然后通过 OSS 控制台或 OSS 的 SDK 将 /target/shaded 目录下的 examples-1.1-shaded.jar 上传到 OSS 的这个目录下. 上传后的 jar 包地址为 oss://qiaozhou-EMR/jar/examples-1.1-shaded.jar, 这个地址在后面会用上, 如下图所示:
搭建 EMR 集群, 创建任务并运行执行计划.
通过 EMR 控制台创建一个 EMR 集群, 大约需要 10 分钟左右, 请耐心等待.
创建一个类型为 Spark 的作业.
请根据您具体的配置将
SLS_endpoint $SLS_access_id $SLS_secret_key
替换成真实值. 请注意参数的顺序, 否则可能会报错.
-master yarn -deploy-mode client -driver-memory 4g -executor-memory 2g -executor-cores 2 -class com.aliyun.EMR.example.LoghubSample ossref://EMR-test/jar/examples-1.1-shaded.jar canaltest canal sparkstreaming $SLS_endpoint $SLS_access_id $SLS_secret_key 1
运行以上的命令
查询 Master 节点的 IP
通过 SSH 登录后, 执行以下命令:
fs -ls /
可以看到 mysqlbinlog 开头的目录, 再通过以下命令查看 mysqlbinlog 文件:
fs -ls /mysqlbinlog
还可以通过 hadoop fs -cat /mysqlbinlog/part-00000 命令查看文件内容.
错误排查.
如果没有看到正常的结果, 可以登陆节点, 查看对应的作业的错误情况.
来源: https://yq.aliyun.com/articles/693105