本文对应脚本及数据已上传至我的 GitHub 仓库 https://github.com/CNFeffery/DataScienceStudyNotes
1 简介
在数据分析任务中, 从原始数据读入, 到最后分析结果出炉, 中间绝大部分时间都是在对数据进行一步又一步的加工规整, 以流水线 (pipeline) 的方式完成此过程更有利于梳理分析脉络, 也更有利于查错改正. pdpipe 作为专门针对 pandas 进行流水线化改造的模块, 为熟悉 pandas 的数据分析人员书写优雅易读的代码提供一种简洁的思路, 本文就将针对 pdpipe 的用法进行介绍.
2 pdpipe 常用功能介绍
pdpipe 的出现极大地对数据分析过程进行规范, 其主要拥有以下特性:
简洁的语法逻辑
在流水线工作过程中可输出规整的提示或错误警报信息
轻松串联不同数据操作以组成一条完整流水线
轻松处理多种类型数据
纯 Python 编写, 便于二次开发
通过 pip install pdpipe 安装完成, 接下来我们将在 jupyter lab 中以 TMDB 5000 Movie Dataset https://www.kaggle.com/tmdb/tmdb-movie-metadata 中的 tmdb_5000_movies.CSV 数据集 (图 1) 为例来介绍 pdpipe 的主要功能, 这是 Kaggle https://www.kaggle.com/ 上的公开数据集, 记录了一些电影的相关属性信息, 你也可以在数据科学学习手札系列文章的 GitHub 仓库对应本篇文章的路径下直接获取该数据集.
图 1 TMDB 5000 Movie Dataset 数据集
2.1 从一个简单的例子开始
首先在 jupyter lab 中读入 tmdb_5000_movies.CSV 数据集并查看其前 3 行(图 2):
- import pandas as pd
- import pdpipe
- # 读入 tmdb_5000_movies.CSV 数据集并查看前 3 行
- data = pd.read_csv('tmdb_5000_movies.csv');data.head(3)
图 2
可以看出, 数据集包含了数值, 日期, 文本以及 JSON 等多种类型的数据, 现在假设我们需要基于此数据完成以下流程:
1, 删除 original_title 列
2, 对 title 列进行小写化处理
3, 丢掉 vote_average 小于等于 7, 且 original_language 不为 en 的行
4, 求得 genres 对应电影类型的数量保存为新列 genres_num, 并删除原有的 genres 列
5, 丢掉 genres_num 小于等于 5 的行
上述操作直接使用 pandas 并不会花多少时间, 但是想要不创造任何中间临时结果一步到位产生所需的数据框子集, 并且保持代码的可读性不是一件太容易的事, 但是利用 pdpipe, 我们可以非常优雅地实现上述过程:
- # 以 pdp.PdPipeline 传入流程列表的方式创建 pipeline
- first_pipeline = pdp.PdPipeline([pdp.ColDrop("original_title"),
- pdp.ApplyByCols(columns=['title'], func=lambda x: x.lower()),
- pdp.RowDrop({'vote_average': lambda x: x <= 7, 'original_language': lambda x: x != 'en'}),
- pdp.ApplyByCols(columns=['genres'], func=lambda x: [item['name'] for item in eval(x)].__len__(), result_columns=['genres_num']),
- pdp.RowDrop({'genres_num': lambda x: x <= 5})])
- # 将创建的 pipeline 直接作用于 data 直接得到所需结果, 并打印流程信息
- first_pipeline(data, verbose=True).reset_index(drop=True)
得到的结果如图3所示:
图3
我们不仅保证了代码优雅简洁, 可读性强, 结果的一步到位, 还自动打印出整个流水线运作过程的状态说明! 令人兴奋的是 pdpipe 充分封装了 pandas 的核心功能尤其是 apply 相关操作, 使得常规或非常规的数据分析任务都可以利用 pdpipe 中的 API 结合自定义函数来优雅地完成, 小小领略到 pdpipe 的妙处之后, 下文我们来展开详细介绍.
2.2 pdpipe 中的重要子模块
pdpipe 中的 API 按照不同分工被划分到若干子模块, 下面将针对常用的几类 API 展开介绍.
2.2.1 basic_stages
basic_stages 中包含了对数据框中的行, 列进行丢弃 / 保留, 重命名以及重编码的若干类:
ColDrop:
这个类用于对指定单个或多个列进行丢弃, 其主要参数如下:
columns: 字符串或列表, 用于指定需要丢弃的列名
errors: 字符串, 传入'ignore'或'raise', 用于指定丢弃指定列时遇到错误采取的应对策略,'ignore'表示忽略异常,'raise'表示抛出错误打断流水线运作, 默认为'raise'
下面是举例演示(注意单个流水线部件可以直接传入源数据执行 apply 方法直接得到结果), 我们分别对单列和多列进行删除操作:
单列删除
- # 删除 budget 列
- pdp.ColDrop(columns='budget').apply(data).head(3)
删除后得到的结果如图 4:
图 4
多列删除
- # 删除 budget 之外的所有列
- del_col = data.columns.tolist()
- del_col.remove('budget')
- pdp.ColDrop(columns=del_col).apply(data).head(3)
得到的结果中只有 budget 列被保留, 如图 5:
图 5
ColRename:
这个类用于对指定列名进行重命名, 其主要参数如下:
rename_map: 字典, 传入旧列名 ->新列名键值对
下面是举例演示:
列重命名
- # 将 budget 重命名为 Budget
- pdp.ColRename(rename_map={
- 'budget': 'Budget'
- }).apply(data).head(3)
结果如图6:
图 6
ColReorder:
这个类用于修改列的顺序, 其主要参数如下:
positions: 字典, 传入列名 ->新的列下标键值对
下面是举例演示:
修改列位置
- # 将 budget 从第 0 列挪动为第 3 列
- pdp.ColReorder(positions={
- 'budget': 3
- }).apply(data).head(3)
结果如图 7:
图 7
DropNa:
这个类用于丢弃数据中空值元素, 其主要参数与 pandas 中的 dropna()保持一致, 核心参数如下:
axis:0 或 1,0 表示删除含有缺失值的行, 1 表示删除含有缺失值的列
下面是举例演示, 首先我们创造一个包含缺失值的数据框:
- import numpy as np
- # 创造含有缺失值的示例数据
- df = pd.DataFrame({'a': [1, 4, 1, 5],
- 'b': [4, None, np.nan, 7]})
- df
图 8
删除缺失值所在行
- # 删除含有缺失值的行
- pdp.DropNa(axis=0).apply(df)
结果如图 9:
图 9
删除缺失值所在列
- # 删除含有缺失值的列
- pdp.DropNa(axis=1).apply(df)
结果如图 10:
图 10
FreqDrop:
这个类用于删除在指定的一列数据中出现频次小于所给阈值对应的全部行, 主要参数如下:
threshold:int 型, 传入频次阈值, 低于这个阈值的行将会被删除
column:str 型, 传入 threshold 参数具体作用的列
下面是举例演示, 首先我们来查看电影数据集中 original_language 列对应的频次分布情况:
- # 查看 original_language 频次分布
- pd.value_counts(data['original_language'])
图 11
下面我们来过滤删除 original_language 列出现频次小于 10 的行:
- # 过滤 original_language 频次低于 10 的行, 再次查看过滤后的数据 original_language 频次分布
- pd.value_counts(pdp.FreqDrop(threshold=10, column='original_language').apply(data)['original_language'])
图 12
RowDrop:
这个类用于删除满足指定限制条件的行, 主要参数如下:
conditions:dict 型, 传入指定列 ->该列删除条件键值对
reduce:str 型, 用于决定多列组合条件下的删除策略,'any'相当于条件或, 即满足至少一个条件即可删除;'all'相当于条件且, 即满足全部条件才可删除;'xor'相当于条件异或, 即当恰恰满足一个条件时才会删除, 满足多个或 0 个都不进行删除. 默认为'any'
下面是举例演示, 我们以 budget 小于 100000000,genres 不包含 Action,release_date 缺失以及 vote_count 小于 1000 作为组合删除条件, 分别查看在三种不同删除策略下的最终得以保留的数据行数:
删除策略: any
- pdp.RowDrop(conditions={'budget': lambda x: x <= 100000000,
- 'genres': lambda x: 'Action' not in x,
- 'release_date': lambda x: x == np.nan,
- 'vote_count': lambda x: x <= 1000},
- reduce='any').apply(data).shape[0]
删除策略: all
- pdp.RowDrop(conditions={'budget': lambda x: x <= 100000000,
- 'genres': lambda x: 'Action' not in x,
- 'release_date': lambda x: x == np.nan,
- 'vote_count': lambda x: x <= 1000},
- reduce='all').apply(data).shape[0]
删除策略: xor
- pdp.RowDrop(conditions={'budget': lambda x: x <= 100000000,
- 'genres': lambda x: 'Action' not in x,
- 'release_date': lambda x: x == np.nan,
- 'vote_count': lambda x: x <= 1000},
- reduce='xor').apply(data).shape[0]
对应的结果如下:
图 13
2.2.2 col_generation
col_generation 中包含了从原数据中产生新列的若干功能:
AggByCols:
这个类用于将指定的函数作用到指定的列上以产生新结果(可以是新的列也可以是一个聚合值), 即这时函数真正传入的最小计算对象是列, 主要参数如下:
columns:str 或 list, 用于指定对哪些列进行计算
func: 传入需要计算的函数
drop:bool 型, 决定是否在计算完成后把旧列删除, 默认为 True, 即对应列的计算结果直接替换掉对应的旧列
suffix:str 型, 控制新列后缀名, 当 drop 参数设置为 False 时, 结果列的列名变为其对应列 + suffix 参数指定的后缀名; 当 drop 设置为 False 时, 此参数将不起作用(因为新列直接继承了对应旧列的名称)
result_columns:str 或 list, 与 columns 参数一一对应的结果列名称, 当你想要自定义结果新列名称时这个参数就变得非常有用, 默认为 None
func_desc:str 型, 可选参数, 为你的函数添加说明文字, 默认为 None
下面我们来举例演示帮助理解上述各个参数:
针对单个列进行计算
- pdp.AggByCols(columns='budget',
- func=np.log).apply(data).head(3)
对应的结果如图 14, 可以看到在只传入 columns 和 func 这两个参数, 其他参数均为默认值时, 对 budget 列做对数化处理后的新列直接覆盖了原有的 budget 列:
图 14
设置 drop 参数为 False, 并将 suffix 参数设置为'_log':
- # 设置 drop 参数为 False, 并将 suffix 参数设置为'_log'
- pdp.AggByCols(columns='budget',
- func=np.log,
- drop=False,
- suffix='_log').apply(data).head(3)
图 15
可以看到这时原有列得以保留, 新的列以旧列名 + 后缀名的方式被添加到旧列之后, 下面我们修改 result_columns 参数以自定义结果列名:
- # 设置 drop 参数为 False, 并将 suffix 参数设置为'_log'
- pdp.AggByCols(columns='budget',
- func=np.log,
- result_columns='budget(log)').apply(data).head(3)
图 16
针对多个列进行计算
- pdp.AggByCols(columns=['budget', 'revenue'],
- func=np.log,
- drop=False,
- suffix='_log').apply(data).head(3)
图 17
计算列的聚合值
- pdp.AggByCols(columns='budget',
- func=np.mean, # 这里传入的函数是聚合类型的
- drop=False,
- suffix='_mean').apply(data).loc[:, ['budget', 'budget_mean']]
这时为了保持整个数据框形状的完整, 计算得到的聚合值填充到新列的每一个位置上:
图 18
ApplyByCols:
这个类用于实现 pandas 中对列的 apply 操作, 不同于 AggByCols 中函数直接处理的是列, ApplyByCols 中函数直接处理的是对应列中的每个元素. 主要参数如下:
columns:str 或 list, 用于指定对哪些列进行 apply 操作
func: 传入需要计算的函数
drop:bool 型, 决定是否在计算完成后把旧列删除, 默认为 True, 即对应列的计算结果直接替换掉对应的旧列
colbl_sfx:str 型, 控制新列后缀名, 当 drop 参数设置为 False 时, 结果列的列名变为其对应列 + suffix 参数指定的后缀名; 当 drop 设置为 False 时, 此参数将不起作用(因为新列直接继承了对应旧列的名称)
result_columns:str 或 list, 与 columns 参数一一对应的结果列名称, 当你想要自定义结果新列名称时这个参数就变得非常有用, 默认为 None
func_desc:str 型, 可选参数, 为你的函数添加说明文字, 默认为 None
下面我们来举例演示帮助理解上述各个参数:
求 spoken_languages 涉及语言数量
下面的示例对每部电影中涉及的语言语种数量进行计算:
- pdp.ApplyByCols(columns='spoken_languages',
- func=lambda x: [item['name'] for item in eval(x)].__len__(),
- drop=False,
- result_columns='spoken_languages_num').apply(data)[['spoken_languages', 'spoken_languages_num']]
对应的结果:
图 19
ApplyToRows:
这个类用于实现 pandas 中对行的 apply 操作, 传入的计算函数直接处理每一行, 主要参数如下:
func: 传入需要计算的函数, 对每一行进行处理
colname:str 型, 用于定义结果列的名称(因为 ApplyToRows 作用的对象是一整行, 因此只能形成一列返回值), 默认为'new_col'
follow_column:str 型, 控制结果列插入到指定列名之后, 默认为 None, 即放到最后一列
func_desc:str 型, 可选参数, 为你的函数添加说明文字, 默认为 None
下面我们来举例演示帮助理解上述各个参数:
得到对应电影的盈利简报
- pdp.ApplyToRows(func=lambda row: f"{row['original_title']}: {round(((row['revenue'] / row['budget'] -1)*100), 2)}%" if row['budget'] != 0
- else f"{row['original_title']}: 因成本为 0 故不进行计算",
- colname='movie_desc',
- follow_column='budget',
- func_desc='输出对应电影的盈利百分比').apply(data).head(3)
对应的结果:
图 20
Bin:
这个类用于对连续型数据进行分箱, 主要参数如下:
bin_map: 字典型, 传入列名 ->分界点列表
drop:bool 型, 决定是否在计算完成后把旧列删除, 默认为 True, 即对应列的计算结果直接替换掉对应的旧列
下面我们以计算电影盈利率小于 0, 大于 0 小于 100% 以及大于 100% 作为三个分箱区间, 首先我们用到上文介绍过的 RowDrop 丢掉那些成本或利润为 0 的行, 再用 ApplyToRows 来计算盈利率, 最终使用 Bin 进行分箱:
为电影盈利率进行数据分箱
- pipeline = pdp.PdPipeline([pdp.RowDrop(conditions={'budget': lambda x: x == 0,
- 'revenue': lambda x: x == 0},
- reduce='any'),
- pdp.ApplyToRows(func=lambda row: row['revenue'] / row['budget'] - 1,
- colname='rate of return',
- follow_column='budget'),
- pdp.Bin(bin_map={'rate of return': [0, 1]}, drop=False)])
- pipeline(data).head(3)
对应的结果:
图 21
OneHotEncode:
这个类用于为类别型变量创建哑变量(即独热处理), 效果等价于 pandas 中的, 主要参数如下:
columns:str 或 list, 用于指定需要进行哑变量处理的列名, 默认为 None, 即对全部类别型变量进行哑变量处理
dummy_na:bool 型, 决定是否将缺失值也作为哑变量的一个类别进行输出, 默认为 False 即忽略缺失值
exclude_columns:list, 当 columns 参数设置为 None 时, 这个参数传入的列名列表中指定的列将不进行哑变量处理, 默认为 None, 即不对任何列进行排除
drop_first:bool 型或 str 型, 默认为 True, 这个参数是针对哑变量中类似这样的情况: 譬如有类别型变量性别 {男性, 女性}, 那么实际上只需要产生一列 0-1 型哑变量即可表示原始变量的信息, 即性别{男性, 女性}-> 男性{0,1},0 代表不为男性即女性, 1 相反, 而 drop_dirst 设置为 False 时, 原始变量有几个类别就对应几个哑变量被创造; 当设置为指定类别值时(譬如设置
drop_first = '男性'
), 这个值对应的类别将不进行哑变量生成
drop:bool 型, 控制是否在生成哑变量之后删除原始的类别型变量, 默认为 True 即删除
下面我们伪造包含哑变量的数据框:
- # 伪造的数据框
- df = pd.DataFrame({
- 'a': ['x', 'y', 'z'],
- 'b': ['i', 'j', 'q']
- })
- df
图 22
默认参数下执行 OneHotEncode:
pdp.OneHotEncode().apply(df)
图 23
设置 drop_first 为 False:
pdp.OneHotEncode(drop_first=False).apply(df)
图 23
2.2.3 text_stages
text_stages 中包含了对数据框中文本型变量进行处理的若干类, 下文只介绍其中我认为最有用的:
RegexReplace:
这个类用于对文本型列进行基于正则表达式的内容替换, 其主要参数如下:
columns:str 型或 list 型, 传入要进行替换的单个或多个列名
pattern:str, 传入匹配替换内容的正则表达式
replace:str, 传入替换后的新字符串
result_columns:str 或 list, 与 columns 参数一一对应的结果列名称, 当你想要自定义结果新列名称时这个参数就变得非常有用, 默认为 None, 即直接替换原始列
drop:bool 型, 用于决定是否删除替换前的原始列, 默认为 True, 即删除原始列
下面是举例演示:
替换 original_language 中的'en'或'cn'为'英文 / 中文'
- pdp.RegexReplace(columns='original_language',
- pattern='en|cn',
- replace='英文 / 中文').apply(data)['original_language'].unique()
结果如图 24:
图 24
2.3 组装 pipeline 的几种方式
上文中我们主要演示了单一 pipeline 部件工作时的细节, 接下来我们来了解 pdpipe 中组装 pipeline 的几种方式:
2.3.1 PdPipeline
这是我们在 2.1 中举例说明使用到的创建 pipeline 的方法, 直接传入由按顺序的 pipeline 组件组成的列表便可生成所需 pipeline, 而除了直接将其视为函数直接传入原始数据和一些辅助参数 (如 verbose 控制是否打印过程) 之外, 还可以用类似 scikit-learn 中的 fit_transform 方法:
- # 调用 pipeline 的 fit_transform 方法作用于 data 直接得到所需结果, 并打印流程信息
- first_pipeline.fit_transform(data, verbose=1)
图 25
2.3.2 make_pdpipeline
与 PdpPipeline 相似, make_pdpipeline 不可以传入 pipeline 组件形成的列表, 只能把每个 pipeline 组件当成位置参数按顺序传入:
- # 以 make_pdpipeline 将 pipeline 组件作为位置参数传入的方式创建 pipeline
- first_pipeline1 = pdp.make_pdpipeline(pdp.ColDrop("original_title"),
- pdp.ApplyByCols(columns=['title'], func=lambda x: x.lower()),
- pdp.RowDrop({'vote_average': lambda x: x <= 7, 'original_language': lambda x: x != 'en'}),
- pdp.ApplyByCols(columns=['genres'], func=lambda x: [item['name'] for item in eval(x)].__len__(), result_columns=['genres_num']),
- pdp.RowDrop({'genres_num': lambda x: x <= 5}))
- # 以 pdp.PdPipeline 传入流程列表的方式创建 pipeline
- first_pipeline2 = pdp.PdPipeline([pdp.ColDrop("original_title"),
- pdp.ApplyByCols(columns=['title'], func=lambda x: x.lower()),
- pdp.RowDrop({'vote_average': lambda x: x <= 7, 'original_language': lambda x: x != 'en'}),
- pdp.ApplyByCols(columns=['genres'], func=lambda x: [item['name'] for item in eval(x)].__len__(), result_columns=['genres_num']),
- pdp.RowDrop({'genres_num': lambda x: x <= 5})])
- # 比较两种不同方式创建的 pipeline 产生结果是否相同
- first_pipeline1.fit_transform(data) == first_pipeline2(data)
比较结果如图 26, 两种方式殊途同归:
图 26
以上就是本文全部内容, 如有笔误望指出!
参考资料:
https://pdpipe.github.io/pdpipe/doc/pdpipe/
来源: https://www.cnblogs.com/feffery/p/12179647.html