最近公司开始做大数据项目, 让我使用 sqoop(1.6.4 版本)导数据进行数据分析计算, 然而当我们将所有的工作流都放到 azkaban 上时整个流程跑完需要花费 13 分钟, 而其中导数据 (增量) 就占了 4 分钟左右, 老板给我提供了使用 spark 导数据的思路, 学习整理了一个多星期, 终于实现了 sqoop 的主要功能.
这里我使用的是 pyspark 完成的所有操作.
条件: hdfs 平台, pyspark,ubuntu 系统
运行: 我这里是在 /usr/bin 目录下 (或者指定在此目录下 ) 运行的 python 文件, 也可以使用系统自带的 pyspark
1 ./spark-submit --jars "/home/engyne/spark/ojdbc7.jar" --master local /home/engyne/spark/SparkDataBase.py
其中 --jars 是指定连接 oracle 的驱动, ojdbc7.jar 对应的是 oracle12 版本,--master local /... 指定的是运行的 python 文件
注意: 我的代码没有解决中文问题, 所以不管是注释还是代码中都不能出现中文, 记得删除!!!
1,pyspark 连接 oracle, 导数据到 hive(后面的代码需要在此篇代码基础上进行, 重复代码不再 copy 了)
- import sys
- from pyspark.sql import HiveContext
- from pyspark import SparkConf, SparkContext, SQLContext
- conf = SparkConf().setAppName('inc_dd_openings')
- sc = SparkContext(conf=conf)
- sqlContext = HiveContext(sc)
- #以下是为了在 console 中打印出表内容
- reload(sys)
- sys.setdefaultencoding("utf-8")
- get_df_url = "jdbc:oracle:thin:@//192.168.1.1:1521/ORCLPDB"
- get_df_driver = "oracle.jdbc.driver.OracleDriver"
- get_df_user = "xxx"
- get_df_password = "xxx"
- df = sqlContext.read.format("jdbc") \
- .option("url", get_df_url) \
- .option("driver", get_df_driver) \
- .option("dbtable", "tableName") \
- .option("user", get_df_user).option("password", get_df_password) \
- .load()
- #df.show() #可以查看到获取的表的内容, 默认显示 20 行
- sqlContext.sql("use databaseName") #databaseName 指定使用 hive 中的数据库
- #创建临时表
- df.registerTempTable("tempTable")
- #创建表并写入数据
- sqlContext.sql("create table %s as select * from tempTable")
2,pyspark 在 hive 中创建动态分区表
- # 修改一下 hive 的默认设置以支持动态分区
- sqlContext.sql("set hive.exec.dynamic.partition=true")
- sqlContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
- #设置 hive 支持创建分区文件的最大值
- sqlContext.sql("SET hive.exec.max.dynamic.partitions=100000")
- sqlContext.sql("SET hive.exec.max.dynamic.partitions.pernode=100000")
这里需要先手动创建分区表, 我使用 dataframe 的 dtypes 属性获取到表结构, 然后循环拼接表的每个字段在 hive 中所对应的类型
最后写入表数据的代码是:
1 sqlContext.sql("insert overwrite table STUDENT partition(AGE) SELECT ID,NAME,UPDATETIME,AGE FROM tempTable")
3, 实现增量导入数据
我这里使用了 MySql 数据库, 用来存储增量导入的信息, 创建表(job)
- DROP TABLE IF EXISTS `job`;
- CREATE TABLE `job` (
- `id` int(10) NOT NULL AUTO_INCREMENT,
`database_name` varchar(50) DEFAULT NULL, -- 数据库名称
`table_name` varchar(100) DEFAULT NULL, -- 需要增量导入的表名
`partition_column_name` varchar(100) DEFAULT NULL, -- 分区的字段名(这里只考虑对一个字段分区, 如果多个字段这里应该使用一对多表结构吧)
`partition_column_desc` varchar(50) DEFAULT NULL, -- 分区字段类型
`check_column` varchar(50) DEFAULT NULL, -- 根据 (table_name 中) 此字段进行增量导入校验(我这里例子使用的是 updatetime)
`last_value` varchar(255) DEFAULT NULL, -- 校验值
`status` int(1) NOT NULL, -- 是否使用(1 表示此 job 激活)
- PRIMARY KEY (`id`)
- ) INCREMENTAL=InnoDB AUTO_INCREMENT=81 DEFAULT CHARSET=utf8;
存储 STUDENT 表增量导入信息(这里是为了演示)
insert into `job`(`id`,`database_name`,`table_name`,`partition_column_name`,`partition_column_desc`,`check_column`,`last_value`,`status`)values (1,'test_datebase','STUDENT','AGE','string','UPDATETIME','2018-07-30',1)
python 连接 MySql 的方法我这里就直接怼代码了, 具体详解大家就看菜鸟教程 http://www.runoob.com/python/python-mysql.html
Ubuntu 需要安装 MySQLdb( sudo apt-get install python-mysqldb )
- import MySQLdb
- # insert update delete
- def conMysqlDB_exec(sqlStr):
- db = MySQLdb.connect("192.168.xxx.xxx", "xx", "xx", "xx", charset='utf8' )
- cursor = db.cursor()
- try:
- cursor.execute(sqlStr)
- db.commit()
- result = True
- except:
- print("---->MySqlError: execute error")
- result = False
- db.rollback()
- db.close
- return result
- # select
- def conMysqlDB_fetchall(sqlStr):
- db = MySQLdb.connect("192.168.xxx.xxx", "xx", "xx", "xx", charset='utf8' )
- cursor = db.cursor()
- results = []
- try:
- cursor.execute(sqlStr)
- results = cursor.fetchall()
- except:
- print("---->MySqlError: unable to fecth data")
- db.close
- return results
查询增量信息, 使用 spark 进行导入
- findJobSql = "SELECT * FROM job where status=1"
- result = conMysqlDB_fetchall(findJobSql)
- databaseName = val[1]
- tableName = val[2]
- partitionColumnName = val[3]
- partitionColumnDesc = val[4]
- checkColumn = val[5]
- lastValue = val[6]
- sqlContext.sql("use database")
- df = sqlContext.read.format("jdbc") \
- .option("url", "jdbc:oracle:thin:@//192.168.xxx.xxx:1521/ORCLPDB") \
- .option("driver", "oracle.jdbc.driver.OracleDriver") \
- .option("dbtable", "(select * from %s where to_char(%s,'yyyy-MM-dd')>'%s')" % (tableName, checkColumn, lastValue)) \ #这里是关键, 直接查询出新增的数据, 这样后面的速度才能提升, 否则要对整个表的 dataframe 进行操作, 慢死了, 千万不要相信 dataframe 的 filter,where 这些东西, 4 万多条数据要查 3 分钟!!!
- .option("user", "xxx").option("password", "xxx") \
- .load()
- def max(a, b):
- if a>b:
- return a
- else:
- return b
- try:
- #获取到新增字段的最大值!!!(这块也困了我好久)这里使用的是 python 的 reduce 函数, 调用的 max 方法
- nowLastValue = df.rdd.reduce(max)[checkColumn]
- df.registerTempTable("temp")# 写入内容
- saveSql = "insert into table student select * from temp"
- sqlContext.sql(saveSql)
- #更新 mysql 表, 使 lastValue 是表最新值
- updataJobSql = "UPDATE job SET last_value='%s'WHERE table_name='%s'" % (nowLastValue, tableName)
- if conMysqlDB_exec(updataJobSql):
- print("---->SUCCESS: incremental import success")
- except ValueError:
- print("---->INFO: No new data added!")
- except:
- print("---->ERROR: other error")
4, 解决导入数据换行符问题
有时候 oracle 中的数据中会存在换行符 ("\n") 然而 hive1.1.0 中数据换行默认识别的也是 \ n, 最坑的是还不能对它进行修改(目前我没有查出修改的方法, 大家要是有办法欢迎在评论区讨论)
那我只能对数据进行处理了, 以前使用 sqoop 的时候也有这个问题, 所幸 sqoop 有解决换行符的语句,,,, 巴拉巴拉,,, 扯远了
解决换行符需要 dataframe 的 map 方法, 然后使用 lambda 表达式进行 replace, 总结好就是下面的代码(第 3 行)
解释: 这是个 for 循环里面加 if else 判断, 整个需要用 [ ] 包起来, 没错这是个 list , 如果不包就报错, lambda x 获取到的是表中一行行的数据, for 循环对每一行进行遍历, 然后对一行中每个字段
进行判断, 是否是 unicode 或者 str 类型,(一般只有是这两个类型才存在换行符)如果是则进行 replace 处理, 否则不做处理.
转化好之后这是个 rdd 类型的数据, 需要转化为 dataframe 类型才能写入 hive
- #df 自带获取 schema 的方法, 不要学我去拼凑出来()
- schema = df.schema
- rdd = df.map(lambda x : [(x[i].replace("\n","").replace("\r","") if isinstance(x[i], unicode) or isinstance(x[i], str) else x[i]) for i in range(len(x))])
- df = sqlContext.createDataFrame(rdd, schema)
完成代码我已经上传到 github 上了 https://github.com/yangzijia/learnSpark ,
总结: 使用 spark 进行数据导入和增量导入与 sqoop 做对比, 80 张表, sqoop 4 分钟多, 使用此方法, 0.7 分钟,(同是没有新数据的前提下), 普通导表, 此方法 5 分钟, 80 张大表(外网 oracle),sqoop 的话我就不说了, 当时一张 200 万数据的表导了一晚上...
初次写这么多话, 内容不是很紧凑, 如果大家还有其他的问题, 欢迎在评论区留言提意见.
来源: https://www.cnblogs.com/yzj-blog/p/9393297.html