Python: processing large (>1GB) CSV files, converting them to JSON, and grouping a list of dicts by identical values


0x00 Script Purpose

Process CSV files larger than 1GB and convert them to JSON, then group the resulting list of dicts by identical values of a chosen key, i.e. merge all records sharing the same value into one list. A few small problems encountered along the way are recorded at the end.

0x01 Script Code

  1. Reading with pandas (somewhat slower)
import os
from itertools import groupby
from operator import itemgetter

import pandas as pd

in_dir = r"c:\temp1\input"
out_dir = r"c:\temp1\output"
if not os.path.exists(out_dir):
    os.makedirs(out_dir)


def big_csv_to_json(path, chunksize=1000000):
    # Parse the CSV in chunks to bound peak memory during reading,
    # then concatenate the chunks back into one DataFrame.
    # encoding='gbk' because the source files contain Chinese text (see 0x02).
    df_chunk = pd.read_csv(path, chunksize=chunksize, iterator=True, encoding='gbk')
    chunk_list = []
    for chunk in df_chunk:
        chunk_list.append(chunk)
    df_concat = pd.concat(chunk_list)

    # Turn each row into a dict keyed by the column headers.
    head_list = list(df_concat.columns)
    return [dict(zip(head_list, i)) for i in df_concat.values]


def count_iterable(i):
    # Exhaust the iterable and return how many elements it yielded.
    return sum(1 for e in i)


def list_dict_group_by_key(data, key):
    # itertools.groupby only groups *adjacent* records, so sort by the
    # grouping key first; otherwise one value can produce several groups.
    data.sort(key=itemgetter(key))
    index = 0
    count = count_iterable(groupby(data, key=itemgetter(key)))
    for value, _items in groupby(data, key=itemgetter(key)):
        value = value.replace(":", "")  # strip characters illegal in file names
        items = list(_items)
        save_path = os.path.join(out_dir, f"{value}.csv")
        df = pd.json_normalize(items)
        if not os.path.exists(save_path):
            # First time this value is seen: create the file with a header row.
            df.to_csv(save_path, index=False)
        else:
            # Otherwise append without repeating the header.
            df.to_csv(save_path, mode='a', index=False, header=False)
        index += 1
        print('\r', f"{index}/{count}", end='', flush=True)


if __name__ == '__main__':
    for path in os.listdir(in_dir):
        print(f"handle {path}")
        data = big_csv_to_json(os.path.join(in_dir, path))
        list_dict_group_by_key(data, key='your group key')
        print(f"done handle {path}")

  2. Reading with a plain with open loop (faster; a CSV file is just plain text, after all)
import datetime
import os

in_dir = r"c:\temp1\input"
out_dir = r"c:\temp1\output"
if not os.path.exists(out_dir):
    os.makedirs(out_dir)

if __name__ == '__main__':
    for path in os.listdir(in_dir):
        start_dt = datetime.datetime.now()
        print(f"handle {path}")
        index = 0
        try:
            # The source files are GBK-encoded (see problem 1 below).
            with open(os.path.join(in_dir, path), 'r', encoding='gbk') as file:
                for line in file:
                    # Skip the header row.
                    if index == 0:
                        index += 1
                        continue

                    try:
                        # Column 9 (index 8) holds the grouping key.
                        site = line.split(",")[8]
                        with open(os.path.join(out_dir, f"{site.replace(':', '')}.csv"), 'a') as file2:
                            file2.write(line)
                    except Exception:
                        # Skip malformed lines (too few columns, etc.).
                        continue
        except Exception:
            continue

        end_dt = datetime.datetime.now()
        time_sum = (end_dt - start_dt).total_seconds()
        print(f"done handle {path} in {time_sum:.0f}s")

0x02 Problem Notes

  1. An error when reading a CSV file with pandas
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xd0 in position 0: invalid continuation byte

Cause: Excel on a Chinese-locale Windows system typically saves CSV files with Chinese text in GBK, while pandas decodes input as 'utf-8' by default, so files containing Chinese characters fail to load.

Solution: declare the CSV's encoding as 'gbk' when reading: data = pd.read_csv('data.csv', encoding='gbk')
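
If the input mixes UTF-8 and GBK files, a small fallback loop avoids guessing per file. A minimal sketch; the helper name read_csv_any and the candidate list are my own, not from the original post:

import pandas as pd

def read_csv_any(path, encodings=('utf-8', 'gbk')):
    # Try each candidate encoding and return the first that decodes cleanly.
    for enc in encodings:
        try:
            return pd.read_csv(path, encoding=enc)
        except UnicodeDecodeError:
            continue
    raise ValueError(f"none of {encodings} could decode {path}")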

  2. An error when reading .xlsx files with Python's xlrd library
xlrd.biffh.XLRDError: Excel xlsx file; not supported

This is caused by the installed xlrd version being too new: xlrd 2.0 and later removed .xlsx support. The fix is simply to reinstall an older xlrd: press Win+R to open cmd and run pip3 install xlrd==1.2.0.
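
A quick sanity check that the downgrade worked; data.xlsx is a placeholder file name:

import xlrd

# xlrd 1.2.0 can still open .xlsx workbooks; 2.x raises XLRDError here.
book = xlrd.open_workbook('data.xlsx')
sheet = book.sheet_by_index(0)
print(sheet.nrows, sheet.ncols)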

  3. Under supervisor + gunicorn, print output is not flushed to the log in real time, although running with python directly or with gunicorn alone logs fine

The root cause is that Python's print output is buffered. Either pass flush=True to flush the buffer on every call, or use logging.warning instead (the latter is recommended; the logging module's default level is WARNING, so warnings always reach the log).

(Python 3)
# [Option 1] force a flush on every print
print(something, flush=True)

# [Option 2, recommended] logging.warning is shown by default
import logging
logging.warning('Watch out!')
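
WARNING is only the root logger's built-in threshold. If you also want logging.info to reach the supervisor/gunicorn log, lower the level once at startup; a minimal sketch:

import logging

# Lower the root logger's threshold so INFO and above are emitted too.
logging.basicConfig(level=logging.INFO)
logging.info('now visible without resorting to warning')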
