Python: Processing Large CSV Files (>1GB), Converting to JSON, and Grouping a List of Dicts by Equal Values
0x00 Script Purpose
Process CSV files larger than 1GB and convert them to JSON format, then group the resulting list of dicts by value: all dicts sharing the same value for a given key are merged into one list. A few small problems encountered along the way are recorded at the end.
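As a minimal illustration of the grouping described above (the field name `site` and the sample rows are hypothetical):

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical input: a list of dicts, several sharing the same "site" value
rows = [
    {"site": "a.com", "id": 1},
    {"site": "b.com", "id": 2},
    {"site": "a.com", "id": 3},
]

# groupby only merges *consecutive* items, so sort by the key first
rows.sort(key=itemgetter("site"))
grouped = {value: list(items) for value, items in groupby(rows, key=itemgetter("site"))}
print(grouped)
# {'a.com': [{'site': 'a.com', 'id': 1}, {'site': 'a.com', 'id': 3}],
#  'b.com': [{'site': 'b.com', 'id': 2}]}
```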
0x01 Script Code
- Reading with `pandas` (somewhat slower)
```python
import os
from itertools import groupby
from operator import itemgetter

import pandas as pd

in_dir = r"c:\temp1\input"
out_dir = r"c:\temp1\output"

if not os.path.exists(out_dir):
    os.makedirs(out_dir)


def big_csv_to_json(path, chunksize=1000000):
    # Read the CSV in chunks so a >1GB file does not have to be parsed in one go
    df_chunk = pd.read_csv(path, chunksize=chunksize, iterator=True, encoding='gbk')
    chunk_list = [chunk for chunk in df_chunk]
    df_concat = pd.concat(chunk_list)
    head_list = list(df_concat.columns)
    # Turn every row into a dict keyed by the column headers
    return [dict(zip(head_list, row)) for row in df_concat.values]


def count_iterable(i):
    return sum(1 for _ in i)


def list_dict_group_by_key(data, key):
    # groupby only merges *consecutive* items, so sort by the key first
    data.sort(key=itemgetter(key))
    index = 0
    count = count_iterable(groupby(data, key=itemgetter(key)))
    for value, _items in groupby(data, key=itemgetter(key)):
        value = value.replace(":", "")  # colons are not allowed in file names
        items = list(_items)
        save_path = os.path.join(out_dir, f"{value}.csv")
        df = pd.json_normalize(items)
        if not os.path.exists(save_path):
            # First time this value is seen: create the file with a header row
            df.to_csv(save_path, index=False)
        else:
            # Otherwise append without repeating the header
            df.to_csv(save_path, mode='a', index=False, header=False)
        index += 1
        print('\r', f"{index}/{count}", end='', flush=True)


if __name__ == '__main__':
    for path in os.listdir(in_dir):
        print(f"handle {path}")
        data = big_csv_to_json(os.path.join(in_dir, path))
        list_dict_group_by_key(data, key='your group key')
        print(f"done handle {path}")
```
- Reading with a plain `with open` loop (noticeably faster; a CSV file is, after all, just plain text)
```python
import datetime
import os

in_dir = r"c:\temp1\input"
out_dir = r"c:\temp1\output"

if not os.path.exists(out_dir):
    os.makedirs(out_dir)

if __name__ == '__main__':
    for path in os.listdir(in_dir):
        start_dt = datetime.datetime.now()
        print(f"handle {path}")
        index = 0
        try:
            # The source files are GBK-encoded (see 0x02)
            with open(os.path.join(in_dir, path), 'r', encoding='gbk') as file:
                for line in file:
                    if index == 0:
                        # Skip the header row
                        index += 1
                        continue
                    try:
                        # Column 9 (index 8) holds the value we group by
                        site = line.split(",")[8]
                        # Append the raw line to <value>.csv, stripping colons from the name
                        with open(os.path.join(out_dir, f"{site.replace(':', '')}.csv"), 'a', encoding='gbk') as file2:
                            file2.write(line)
                    except Exception:
                        continue  # skip malformed lines
        except Exception:
            continue  # skip files that cannot be opened or read
        end_dt = datetime.datetime.now()
        time_sum = int((end_dt - start_dt).total_seconds())
        print(f"done handle {path} {time_sum}")
```
0x02 Problems Encountered
- An error when reading a CSV file with pandas

```
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd0 in position 0: invalid continuation byte
```

Cause: CSV files exported by Excel on a Chinese-locale Windows system are typically GBK-encoded, while pandas decodes input as UTF-8 by default, so files containing Chinese characters fail to load.
Fix: declare the CSV's encoding when reading: `data = pd.read_csv('data.csv', encoding='gbk')`
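If the input directory mixes UTF-8 and GBK files, one defensive pattern (my addition, not from the original post) is to try UTF-8 first and fall back:

```python
import pandas as pd

def read_csv_any_encoding(path):
    # Try UTF-8 first; fall back to GBK when decoding fails
    try:
        return pd.read_csv(path, encoding='utf-8')
    except UnicodeDecodeError:
        return pd.read_csv(path, encoding='gbk')
```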
- xlrd fails when reading an .xlsx file

```
xlrd.biffh.XLRDError: Excel xlsx file; not supported
```

This happens because the installed xlrd version is too new: xlrd 2.0 removed support for reading .xlsx files. Reinstalling an older version fixes it; open a terminal (Win+R, then cmd) and run `pip3 install xlrd==1.2.0`.
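A minimal check that the pinned version works (the file name is hypothetical):

```python
# Requires: pip3 install xlrd==1.2.0 (xlrd >= 2.0 dropped .xlsx support)
import xlrd

wb = xlrd.open_workbook('data.xlsx')  # hypothetical file
sheet = wb.sheet_by_index(0)
print(sheet.nrows, sheet.ncols)  # row and column counts of the first sheet
```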
- Under supervisor + gunicorn, print output does not appear in the log in real time, although running the script with python directly, or with gunicorn alone, logs fine.

The root cause is that Python's print buffers its output. Either pass flush=True to flush the buffer on each call, or use logging.warning (the recommended option: the root logger's default level is WARNING, so warning messages are always emitted, while e.g. logging.info would be filtered out). For Python 3:

```python
# Option 1: flush the buffer on every print
print(something, flush=True)

# Option 2 (recommended): log at WARNING level, the root logger's default threshold
import logging
logging.warning('Watch out!')
```
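A third option worth noting (my addition, not in the original post) is to disable Python's output buffering altogether, either by launching with python -u or by setting PYTHONUNBUFFERED in the supervisor program section; a sketch with hypothetical names and paths:

```ini
; /etc/supervisor/conf.d/myapp.conf (hypothetical)
[program:myapp]
command=gunicorn app:app
environment=PYTHONUNBUFFERED="1"   ; force unbuffered stdout/stderr
stdout_logfile=/var/log/myapp.log
```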