今天是 spark 专题的第七篇文章, 我们一起看看 spark 的数据分析和处理.
过滤去重
在机器学习和数据分析当中, 对于数据的了解和熟悉都是最基础的. 所谓巧妇难为无米之炊, 如果说把用数据构建一个模型或者是支撑一个复杂的上层业务比喻成做饭的话. 那么数据并不是 "米", 充其量最多只能算是未脱壳的稻. 要想把它做成好吃的料理, 必须要对原生的稻谷进行处理.
但是处理也并不能乱处理, 很多人做数据处理就是闷头一套三板斧. 去空值, 标准化还有 one-hot, 这一套流程非常熟悉. 以至于在做的时候都不会想, 做这些处理的意义是什么. 我们做数据处理也是有的放矢的, 针对不同的情况采取不同的策略. 所以说到这里, 你应该已经明白了, 首要任务还是需要先对数据有个基本的了解, 做到心中有数.
那么怎么做到心中有数呢? 我们先来看一个具体的例子, 假设现在我们有了这么一批数据:
- df = spark.createDataFrame([
- (1, 144.5, 5.9, 33, 'M'),
- (2, 167.2, 5.4, 45, 'M'),
- (3, 124.1, 5.2, 23, 'F'),
- (4, 144.5, 5.9, 33, 'M'),
- (5, 133.2, 5.7, 54, 'F'),
- (3, 124.1, 5.2, 23, 'F'),
- (5, 129.2, 5.3, 42, 'M'),
- ], ['id', 'weight', 'height', 'age', 'gender'])
这批数据粗略看起来没什么问题, 但实际上藏着好几个坑.
首先, id 为 3 的数据有两条, 不仅如此, 这两条数据的特征也完全一样. 其次, id 为 1 和 4 的数据特征也完全相同, 只是 id 不同. 除此之外, id 为 5 的数据也有两条, 但是它们的特征都不同. 显然这不是同一条数据, 应该是记录的时候出现的错误.
那么对于这样一份数据, 我们怎么发现它们当中的问题, 又怎么修正呢?
我们先从最简单开始, 先来找找完全一样的数据. 我们通过 count 方法可以求出整个数据集当中的条数, 通过 distinct().count() 可以获得去重之后的数据数量. 这两个结合一起使用, 就可以看出是否存在数据完全重复的情况.
可以看出来, 直接 count 是 7 条, 如果加上 distinct 的话是 6 条, 也就是说出现了数据的完全重复. 那么我们可以知道, 我们需要做一下去重, 去除掉完全重复的行, 要去除也非常简单, dataframe 当中自带了 dropDuplicates 方法, 我们直接调用即可:
很明显, 刚才两条完全一样 id 为 3 的数据少了一条, 被 drop 掉了.
接下来, 我们继续分析, 怎么判断是否存在 id 不同但是其他数据相同的情况呢?
其实也是一样使用 distinct.count, 只不过我们需要把 count distinct 运算的范畴去除掉 id. 我们可以通过 columns 获取 dataframe 当中的列名, 我们遍历一下列名, 过滤掉 id 即可.
这里我们依然还是套用的 distinct.count 只不过我们在使用之前通过 select 限制了使用范围, 只针对除了 id 之外的列进行去重的计算.
不仅 distinct 如此, dropDuplicate 同样可以限制作用的范围. 使用的方法也很简单, 我们通过 subset 这个变量来进行控制, 我们传入一个 list, 表示 duplicate 的范围.
可以很明显地看到, 我们的数据又减少了一条. 说明我们去除掉了 id 不同但是内容一样的情况, 最后还剩下 id 相同, 但是内容不同的情况. 这种情况一般是由于记录的时候发生了错误, 比如并发没有处理好, 导致两条不同的信息采用了同一个 id.
这个很简单, 因为我们已经经过了整体去重了, 所以正常是不应该存在 id 一样的条目的. 所以我们只需要判断 id 是否有重复就好了. 判断的方法也很简单, 我们 count 一下 id 的数量.
这里我们可以和之前一样通过 distinct.count 来判断, 这里我们介绍一种新的方法, 叫做 agg.agg 是 aggregate 的缩写, 直译过来是聚合的意思. 通过 agg 我们可以对一些列进行聚合计算, 比如说 sum,min,max 这些. 在这个问题当中, 我们要进行的聚合计算就是 count 和 count distinct, 这两个也有现成的函数, 我们导入就可以直接用了.
也就是说通过 agg 我们可以同时对不同的列进行聚合操作, 我们发现加上了 distinct 之后, 只剩下了 4 条, 说明存在两条不同的数据 id 一样的情况.
接下来我们要做的就是给这些数据生成新的 id, 从而保证每一条数据的 id 都是 unique 的. 这个也有专门的函数, 我们直接调用就好:
monotonically_increasing_id 这个方法会生成一个唯一并且递增的 id, 这样我们就生成了新的 id, 完成了整个数据的去重过滤.
空值处理
当我们完成了数据的过滤和清洗还没有结束, 我们还需要对空值进行处理. 因为实际的数据往往不是完美的, 可能会存在一些特征没有收集到数据的情况. 空值一般是不能直接进入模型的, 所以需要我们对空值进行处理.
我们再创建一批数据:
- df_miss = spark.createDataFrame([
- (1, 143.5, 5.6, 28, 'M', 100000),
- (2, 167.2, 5.4, 45, 'M', None),
- (3, None , 5.2, None, None, None),
- (4, 144.5, 5.9, 33, 'M', None),
- (5, 133.2, 5.7, 54, 'F', None),
- (6, 124.1, 5.2, None, 'F', None),
- (7, 129.2, 5.3, 42, 'M', 76000),
- ], ['id', 'weight', 'height', 'age', 'gender', 'income'])
这份数据和刚才的相比更加贴近我们真实的情况, 比如存在若干行数据大部分列为空, 存在一些列大部分行为空. 因为现实中的数据往往是分布不均匀的, 存在一些特征和样本比较稀疏. 比如有些标签或者是行为非常小众, 很多用户没有, 或者是有些用户行为非常稀疏, 只是偶尔使用过产品, 所以缺失了大部分特征.
所以我们可能会希望查看一下有哪些样本的缺失比较严重, 我们希望得到一个 id 和缺失特征数量映射的一个 pair 对. 这个操作通过 dataframe 原生的 API 比较难实现, 我们需要先把 dataframe 转成 rdd 然后再通过 MapReduce 进行:
image-20200525163206376
我们可以看到是 3 对应的缺失值最多, 所以我们可以单独看下这条数据:
我们可能还会向看下各列缺失值的情况, 究竟有多少比例缺失了. 由于我们需要对每一列进行聚合, 所以这里又用到了 agg 这个方法:
这段代码可能看起来稍稍有一点复杂, 因为用到了 * 这个操作. 因为当 agg 这个函数传入一个 list 之后, 可以对多列进行操作. 而在这里, 我们要对每一列进行统计. 由于列数很多, 我们手动列举显然是不现实的. 所以我们用循环实现,* 操作符的意思就是将循环展开. count('*') 等价于 SQL 语句当中的 count(1), 也就是计算总条数的意思.
从结果当中我们可以看出来, income 这个特征缺失得最严重, 足足有 71% 的数据是空缺的. 那么显然这个特征对我们的用处很小, 因为缺失太严重了, 也不存在填充的可能. 所以我们把这行去掉:
我们去掉了 income 之后发现还是存在一些行的缺失非常严重, 我们希望设置一个阈值, 将超过一定数量特征空缺的行过滤, 因为起到的效果也很小.
这个功能不用我们自己开发了, dataframe 当中原生的 API 就支持.
经过这样的处理之后, 剩下的缺失就比较少了. 这个时候我们就不希望再进行删除了, 因为只有个别数据空缺, 其他数据还是有效果的, 如果删除了会导致数据量不够. 所以我们通常的方式是对这些特征进行填充.
缺失值填充是一种非常常见的数据处理方式, 填充的方式有好几种. 比如可以填充均值, 也可以填充中位数或者是众数, 还可以另外训练一个模型来根据其他特征来预测. 总之手段还是挺多的, 我们这里就用最简单的方法, 也就是均值来填充. 看看 spark 当中使用均值填充是怎么操作的.
既然要填充, 那么显然需要先算出均值. 所以我们首先要算出每一个特征的均值. 这里性别是要排除的, 因为性别是类别特征, 不存在均值. 所以如果要填充性别的话, 就只能填充众数或者是用模型来预测了, 不能直接用均值.
均值的计算本身并不复杂, 和刚才的一系列操作差不多. 但是有一点需要注意, 我们这里得到了结果但是却不能直接作为参数传入. 因为 dataframe 中的 fillna 方法只支持传入一个整数, 浮点数, 字符串或者是 dict. 所以我们要把这份数据转化成 dict 才行. 这里的转化稍稍有些麻烦, 因为 dataframe 不能直接转化, 我们需要先转成 pandas 再调用 pandas 当中的 to_dict 方法.
我们有了 dict 类型的均值就可以用来填充了:
总结
在实际的工作或者是 kaggle 比赛当中, 涉及的数据处理和分析的流程远比文章当中介绍到的复杂. 但去重, 过滤, 填充是数据处理当中最基础也是最重要的部分. 甚至可以说无论应用场景如何变化, 解决问题的方法怎么更新, 这些都是不可缺失的部分.
如果喜欢本文, 可以的话, 请点个关注, 给我一点鼓励, 也方便获取更多文章.
本文使用 https://mdnice.com/ 排版
来源: https://www.cnblogs.com/techflow/p/13223974.html