在数据分析领域, 最热门的莫过于 Python 和 R 语言, 此前有一篇文章别老扯什么 Hadoop 了, 你的数据根本不够大指出: 只有在超过 5TB 数据量的规模下, Hadoop 才是一个合理的技术选择这次拿到近亿条日志数据, 千万级数据已经是关系型数据库的查询分析瓶颈, 之前使用过 Hadoop 对大量文本进行分类, 这次决定采用 Python 来处理数据:
硬件环境
CPU:3.5 GHz Intel Core i7
内存: 32 GB HDDR 3 1600 MHz
硬盘: 3 TB Fusion Drive
数据分析工具
- Python:2.7.6
- Pandas:0.15.0
- IPython notebook:2.0.0
源数据如下表所示:
Table | Size | Desc | |
---|---|---|---|
ServiceLogs | 98,706,832 rows x 14 columns | 8.77 GB | 交易日志数据,每个交易会话可以有多条交易 |
ServiceCodes | 286 rows × 8 columns | 20 KB | 交易分类的字典表 |
数据读取
启动 IPython notebook, 加载 pylab 环境:
ipython notebook --pylab=inline
Pandas 提供了 IO 工具可以将大文件分块读取, 测试了一下性能, 完整加载 9800 万条数据也只需要 263 秒左右, 还是相当不错了
- import pandas as pd
- reader = pd.read_csv('data/servicelogs', iterator=True)
- try:
- df = reader.get_chunk(100000000)
- except StopIteration:
- print "Iteration is stopped."
1 百万条 | 1 千万条 | 1 亿条 | |
---|---|---|---|
ServiceLogs | 1 s | 17 s | 263 s |
使用不同分块大小来读取再调用 pandas.concat 连接 DataFrame,chunkSize 设置在 1000 万条左右速度优化比较明显
- loop = True
- chunkSize = 100000
- chunks = []
- while loop:
- try:
- chunk = reader.get_chunk(chunkSize)
- chunks.append(chunk)
- except StopIteration:
- loop = False
- print "Iteration is stopped."
- df = pd.concat(chunks, ignore_index=True)
下面是统计数据, Read Time 是数据读取时间, Total Time 是读取和 Pandas 进行 concat 操作的时间, 根据数据总量来看, 对 5~50 个 DataFrame 对象进行合并, 性能表现比较好
Chunk Size | Read Time (s) | Total Time (s) | Performance |
---|---|---|---|
100,000 | 224.418173 | 261.358521 | |
200,000 | 232.076794 | 256.674154 | |
1,000,000 | 213.128481 | 234.934142 | √ √ |
2,000,000 | 208.410618 | 230.006299 | √ √ √ |
5,000,000 | 209.460829 | 230.939319 | √ √ √ |
10,000,000 | 207.082081 | 228.135672 | √ √ √ √ |
20,000,000 | 209.628596 | 230.775713 | √ √ √ |
50,000,000 | 222.910643 | 242.405967 | |
100,000,000 | 263.574246 | 263.574246 |
如果使用 Spark 提供的 Python Shell, 同样编写 Pandas 加载数据, 时间会短 25 秒左右, 看来 Spark 对 Python 的内存使用都有优化
数据清洗
Pandas 提供了 DataFrame.describe 方法查看数据摘要, 包括数据查看 (默认共输出首尾 60 行数据) 和行列统计由于源数据通常包含一些空值甚至空列, 会影响数据分析的时间和效率, 在预览了数据摘要后, 需要对这些无效数据进行处理
首先调用 DataFrame.isnull() 方法查看数据表中哪些为空值, 与它相反的方法是 DataFrame.notnull() ,Pandas 会将表中所有数据进行 null 计算, 以 True/False 作为结果进行填充, 如下图所示:
Pandas 的非空计算速度很快, 9800 万数据也只需要 28.7 秒得到初步信息之后, 可以对表中空列进行移除操作尝试了按列名依次计算获取非空列, 和 DataFrame.dropna() 两种方式, 时间分别为 367.0 秒和 345.3 秒, 但检查时发现 dropna() 之后所有的行都没有了, 查了 Pandas 手册, 原来不加参数的情况下, dropna() 会移除所有包含空值的行如果只想移除全部为空值的列, 需要加上 axis 和 how 两个参数:
df.dropna(axis=1, how='all')
共移除了 14 列中的 6 列, 时间也只消耗了 85.9 秒
接下来是处理剩余行中的空值, 经过测试, 在 DataFrame.replace() 中使用空字符串, 要比默认的空值 NaN 节省一些空间; 但对整个 CSV 文件来说, 空列只是多存了一个,, 所以移除的 9800 万 x 6 列也只省下了 200M 的空间进一步的数据清洗还是在移除无用数据和合并上
对数据列的丢弃, 除无效值和需求规定之外, 一些表自身的冗余列也需要在这个环节清理, 比如说表中的流水号是某两个字段拼接类型描述等, 通过对这些数据的丢弃, 新的数据文件大小为 4.73GB, 足足减少了 4.04G!
数据处理
使用 DataFrame.dtypes 可以查看每列的数据类型, Pandas 默认可以读出 int 和 float64, 其它的都处理为 object, 需要转换格式的一般为日期时间 DataFrame.astype() 方法可对整个 DataFrame 或某一列进行数据格式转换, 支持 Python 和 NumPy 的数据类型
df['Name'] = df['Name'].astype(np.datetime64)
对数据聚合, 我测试了 DataFrame.groupby 和 DataFrame.pivot_table 以及 pandas.merge ,groupby 9800 万行 x 3 列的时间为 99 秒, 连接表为 26 秒, 生成透视表的速度更快, 仅需 5 秒
- df.groupby(['NO','TIME','SVID']).count() # 分组
- fullData = pd.merge(df, trancodeData)[['NO','SVID','TIME','CLASS','TYPE']] # 连接
- actions = fullData.pivot_table('SVID', columns='TYPE', aggfunc='count') # 透视表
根据透视表生成的交易 / 查询比例饼图:
将日志时间加入透视表并输出每天的交易 / 查询比例图:
- total_actions = fullData.pivot_table('SVID', index='TIME', columns='TYPE', aggfunc='count')
- total_actions.plot(subplots=False, figsize=(18,6), kind='area')
除此之外, Pandas 提供的 DataFrame 查询统计功能速度表现也非常优秀, 7 秒以内就可以查询生成所有类型为交易的数据子表:
tranData = fullData[fullData['Type'] == 'Transaction']
该子表的大小为 [10250666 rows x 5 columns]在此已经完成了数据处理的一些基本场景实验结果足以说明, 在非 > 5TB 数据的情况下, Python 的表现已经能让擅长使用统计分析语言的数据分析师游刃有余
http://www.justinablog.com/archives/1357
来源: http://www.jqhtml.com/12353.html