最近加入一个 Spark 项目, 作为临时的开发人员协助进行开发工作. 该项目中不存在测试的概念, 开发人员按需求进行编码工作后, 直接向生产系统部署, 再由需求的提出者在生产系统检验程序运行结果的正确性. 在这种原始的工作方式下, 产品经理和开发人员总是在生产系统验证自己的需求, 代码. 可以想见, 各种直接交给用户的错误导致了一系列的事故和不信任. 为了处理各类线上问题, 大家都疲于奔命. 当工作进行到后期, 每一个相关人都已经意气消沉, 常常对工作避之不及.
为了改善局面, 我尝试了重构部分代码, 将连篇的 SQL 分散到不同的方法里, 并对单个方法构建单元测试. 目的是, 在编码完成后, 首先在本地执行单元测试, 以实现:
部署到生产系统的代码中无 SQL 语法错误.
将已出现的 bug 写入测试用例, 避免反复出现相同的 bug.
提前发现一些错误, 减少影响到后续环节的问题.
通过自动化减少开发和处理程序问题的总时间花费.
通过流程和结果的改善, 减少开发人员的思维负担, 增加与其他相关人的互信.
本文将介绍我的 Spark 单元测试实践, 供大家参考, 批评. 本文中的 Spark API 是 PySpark, 测试框架为 pytest.
对于希望将本文当作单元测试教程使用的读者, 本文会假定读者已经准备好了开发和测试所需要的环境, 如果没有也没有关系, 文末的参考部分会包含一些配置环境相关的链接.
本文链接: https://www.cnblogs.com/hhelibeb/p/10534862.html
转载请注明
概念
定义
单元测试是一种测试方法, 它的对象是单个程序单元 / 组件, 目的是验证软件的每个组件都符合设计要求.
单元是软件中最小的可测试部分. 它通常包含一些输入和单一的输出.
本文中的单元就是 python 函数 (function).
单元测试通常是程序开发人员的工作.
原则
为了实现单元测试, 函数最好符合一个条件,
对于相同的输入, 函数总有相同的输出.
这要求函数内部不能存在 "副作用".
它的输出结果的确定不应该依赖输入参数外的任何内容, 例如, 不可以因为本地测试环境中没有相应的数据库就产生 "连接数据库异常" 导致无法返回结果.
它也不应该改变除了返回结果以外的任何内容, 例如, 不可以改变全局可变状态.
代码实践
下面是数据和程序部分.
数据
假设我们的服务对象是一家水果运销公司, 公司在不同城市设有仓库, 现有三张表, 其中 inventory 包含水果的总库存数量信息, inventory_ratio 包含水果在不同城市的应有比例,
目标是根据总库存数量和比例算出水果在各地的库存, 写入到第三张表 inventory_city 中. 三张表的列如下,
1. inventory. Columns: "item", "qty".
- inventory_ratio. Columns: "item", "city", "ratio".
- inventory_city. Columns: "item", "city", "qty".
第一版代码
用最直接的方式实现这一功能, 代码将是,
- from pyspark.sql import SparkSession
- if __name__ == "__main__":
- spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()
- result = spark.sql('''select t1.item, t2.city,
- case when t2.ratio is not null then t1.qty * t2.ratio
- else t1.qty
- end as qty
- from v_inventory as t1
- left join v_ratio as t2 on t1.item = t2.item ''')
- result.write.CSV(path="somepath/inventory_city", mode="overwrite")
这段代码可以实现计算各城市库存的需求, 但测试起来会不太容易. 特别是如果未来我们还要在这个程序中增加其他逻辑的话, 不同的逻辑混杂在一起, 测试和修改都会变得麻烦.
所以, 在下一步, 我们要将部分代码封装到一个函数中.
有副作用的函数
创建一个名为 get_inventory_city 的函数, 将代码包含在内,
- from pyspark.sql import SparkSession
- def get_inventory_city():
- spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()
- result = spark.sql('''select t1.item, t2.city,
- case when t2.ratio is not null then t1.qty * t2.ratio
- else t1.qty
- end as qty
- from v_inventory as t1
- left join v_ratio as t2 on t1.item = t2.item ''')
- result.write.CSV(path="somepath/inventory_city", mode="overwrite") if __name__ == "__main__": get_inventory_city()
显然, 这是一个不太易于测试的函数, 因为它,
没有输入输出参数, 不能直接根据给定数据检验运行结果.
包含对数据库的读 / 写, 这意味着它要依赖外部数据库.
包含对 spark session 的获取 / 创建, 这和计算库存的逻辑也毫无关系.
我们把这些函数中的多余的东西称为副作用. 副作用和函数的核心逻辑纠缠在一起, 使单元测试变得困难, 也不利于代码的模块化.
我们必须另外管理副作用, 只在函数内部保留纯逻辑.
无副作用的函数
按照上文中提到的原则, 重新设计函数, 可以得到,
- from pyspark.sql import SparkSession, DataFrame
- def get_inventory_city(spark: SparkSession, inventory: DataFrame, ratio: DataFrame):
- inventory.createOrReplaceTempView('v_inventory')
- ratio.createOrReplaceTempView('v_ratio')
- result = spark.sql('''select t1.item, t2.city,
- case when t2.ratio is not null then t1.qty * t2.ratio
- else t1.qty
- end as qty
- from v_inventory as t1
- left join v_ratio as t2 on t1.item = t2.item ''')
- return result
- if __name__ == "__main__":
- spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()
- inventory = spark.sql('''select * from inventory''')
- ratio = spark.sql('''select * from inventory_ratio''')
- result = get_inventory_city(spark, inventory, ratio)
- result.write.CSV(path="somepath/inventory_city", mode="overwrite")
修改后的函数 get_inventory_city 有 3 个输入参数和 1 个返回参数, 函数内部已经不再包含对 spark session 和数据库表的处理, 这意味着对于确定的输入值, 它总会输出确定的结果.
这比之前的设计更加理想, 因为函数只包含纯逻辑, 所以调用者使用它时不会再受到副作用的干扰, 这使得函数的可测试性和可组合性得到了提高.
测试代码
创建一个 test_data 目录, 将 CSV 格式的测试数据保存到里面. 测试数据的来源可以是手工模拟制作, 也可以是生产环境导出.
然后创建测试文件, 添加代码,
- from inventory import get_inventory_city
- from pyspark.sql import SparkSession
- spark = SparkSession.builder.appName('TestAPP').enableHiveSupport().getOrCreate()
- def test_get_inventory_city():
- #导入测试数据
- inventory = spark.read.format("csv").option("header", "true").load("./test_data/inventory.csv")
- ratio = spark.read.format("csv").option("header", "true").load("./test_data/inventory_ratio.csv")
- #执行函数
- result = get_inventory_city(spark, inventory, ratio)
- #验证拆分后的总数量等于拆分前的总数量
- result.createOrReplaceTempView('v_result')
- inventory.createOrReplaceTempView('v_inventory')
- qty_before_split = spark.sql('''select sum(qty) as qty from v_inventory''')
- qty_after_split = spark.sql('''select sum(qty) as qty from v_result''')
- assert qty_before_split.take(1)[0]['qty'] == qty_after_split.take(1)[0]['qty']
执行测试, 可以看到以下输出内容
- ============================= test session starts =============================
- platform win32 -- Python 3.6.8, pytest-4.3.1, py-1.8.0, pluggy-0.9.0
- rootdir: C:\Users\zhaozhe42\PycharmProjects\spark_unit\unit, inifile:collected 1 item
- test_get_inventory_city.py .2019-03-21 14:16:24 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
- [100%]
- ========================= 1 passed in 18.06 seconds ==========================
这样一个单元测试例子就完成了.
相比把程序放到服务器测试, 单元测试的运行速度更快, 开发者不用再担心测试会对用户造成影响, 也可以更早发现在编码期间犯下的错误. 它也可以成为自动化测试的基础.
待解决的问题
目前我已经可以在项目中构建初步的单元测试, 但依然面临着一些问题.
运行时间
上面这个简单的测试示例在我的联想 T470 笔记本上需要花费 18.06 秒执行完成, 而实际项目中的程序的复杂度要更高, 执行时间也更长. 执行时间过长一件糟糕的事情, 因为单元测试的执行花费越大, 就会越被开发者拒斥. 面对显示器等待单元测试执行完成的时间是难捱的. 虽然相比于把程序丢到生产系统中执行, 单元测试已经可以节约不少时间, 但还不够好.
接下来可能会尝试的解决办法: 提升电脑配置 / 改变测试数据的导入方式.
有效范围
在生产实践中构建纯函数是一件不太容易的事情, 它对开发者的设计和编码能力有相当的要求.
单元测试虽然能帮助发现一些问题和确定问题代码范围, 但它似乎并不能揭示错误的原因.
笔者水平有限, 目前写出的代码中仍有不少单元测试力所不能及的地方. 可能需要在实践中对它们进行改进, 或者引入其它测试手段作为补充.
参考
一些参考内容.
配置
Getting Started with PySpark on Windows http://deelesh.github.io/pyspark-windows.html
win10 下安装 pyspark
PyCharm 中的 pytest https://www.jetbrains.com/help/pycharm/pytest.html
pycharm 配置 spark 2.2.0 https://www.jianshu.com/p/22426c490066
阅读
函数响应式领域建模 https://book.douban.com/subject/27605361/
ABAP 单元测试最佳实践
来源: https://www.cnblogs.com/hhelibeb/p/10534862.html