前情提要:
工作原因需要处理一批约 30G 左右的 CSV 数据, 数据量级不需要使用 hadoop 的使用, 同时由于办公的本本内存较低的缘故, 需要解读取数据时内存不足的原因.
操作流程:
方法与方式: 首先是读取数据, 常见的 CSV 格式读取时一次性全部读取进来, 面对数据量较大 (本次 3 亿条实车数据) 时, 需要 分批并且有 选择性的读取后 提取有效信息 删除冗余信息并清理内存.
同时, 为了使处理数据时效率更高, 将整理好的数据实时读取进来以后, 保存成快速且可读的数据形式另行存储. 然后释放内存并读取下一批数据直到整个流程结束
下面是操作代码:
- #import pickle # pkl 存储与 hdf5 存储
- import pandas as pd
- # 释放内存
- import gc
- reader = pd.read_csv(r'E:\VEH_GBK_2019-01-01.csv', encoding='gbk',iterator=True,low_memory=False,usecols=[0,1,2,4])
- title_mc=['location','vid','上报时间','充电状态']
- loop = True
- chunkSize = 1000000
- ans_vid={}
- location_list=['上海','重庆','广东','北京']
- for i in location_list:
- ans_vid[i]=[]
- while loop:
- try:
- chunk = reader.get_chunk(chunkSize)
- chunk.columns=title_mc;
- chunk['充电状态']=chunk['充电状态'].astype(str)
- chunk['location']=chunk['location'].astype(str)
- for i in location_list:
- temp=chunk[chunk['location'].str.contains(i)]
- if temp[(temp['充电状态']=='1.0') | (temp['充电状态']=='4.0')].empty==False:
- ans_vid[i].append(temp[(temp['充电状态']=='1.0') | (temp['充电状态']=='4.0')])
- del temp
- gc.collect()
- del chunk
- gc.collect()
- except StopIteration:
- loop = False
- print ("Iteration is stopped.")
- for i in location_list:
- ans_vid[i]=pd.concat(ans_vid[i])
- location_list=['shanghai','chongqing','guangdong','beijing']
- for i in location_list:
- ans_vid[i].to_hdf(i+'_charging.h5',key=ans_vid[i],encoding='gbk')
- View Code
gc.collect()放在 del 参数的后面用以及时释放内存.
读取的核心代码是:
- reader = pd.read_csv(r'E:\VEH_GBK_2019-01-01.csv', encoding='gbk',iterator=True,low_memory=False,usecols=[0,1,2,4]) # usecols 是读取原数据的某几列 chunkSize 是分批读取的量级
- chunk = reader.get_chunk(chunkSize)
本次读取的存储格式采用的是 h5 格式即 hdf, 该种格式易于读取较大数据量级, 同时也有一些数据格式可以保存较大的数据量级: pkl ,npy 等
推荐 h5(保存 dataframe)与 pkl(保存字典格式), 其读取速度更快. 易于使用
来源: https://www.cnblogs.com/techs-wenzhe/p/10937903.html