如何优化 Python 处理大型 CSV 文件时的内存使用

本文介绍多种高效处理 2gb–10gb 级 csv 文件的内存优化策略,涵盖 pandas 类型精简、分块读写、替代库选型及工程实践要点,助你在有限内存下稳定完成过滤、转换与导出任务。

处理大型 CSV 文件(2GB–10GB)时,直接使用 pd.read_csv() 加载全量数据极易触发 MemoryError——尤其在 16GB 内存以下的常规开发机或云服务器上。根本原因在于:pandas 默认将字符串列全部存储为 object 类型(实际是 Python 字符串对象指针数组),而数值列未显式指定精度,常默认为 int64/float64,造成严重内存冗余。以下是一套经过验证的、分层递进的优化方案:

✅ 一、类型优化:用 dtype 和 category 压缩内存

对重复值多的文本列(如状态码、地区、分类标签),强制转为 category 类型可节省 50%+ 内存;对数值列明确指定最小必要精度(如 int32、float32)。示例:

import pandas as pd

# 预先定义高效 dtype 映射(根据实际字段调整)
dtypes = {
    'status': 'category',
    'region': 'category',
    'user_id': 'uint32',      # 若最大值 < 4.3B
    'score': 'float32',
    'timestamp': 'int64'      # 或使用 pd.to_datetime(..., cache=True) 延迟解析
}

df = pd.read_csv('large_input.csv', dtype=dtypes)
print(df.info(memory_usage='deep'))  # 对比优化前后内存占用
? 提示:使用 df.memory_usage(deep=True).sum() 快速估算当前内存占用;df.select_dtypes('object').nunique() 可快速识别适合转 category 的高重复列。

✅ 二、分块处理(Chunking):流式读取 + 增量写入

当数据无法完全驻留内存时,必须放弃“全量加载→处理→保存”模式,改用分块流式处理。关键原则:每块独立完成过滤/转换,并立即追加写入输出文件(避免累积中间结果)

def process_large_csv_chunked(input_file, output_file, chunk_size=50000):
    first_chunk = True
    for chunk in pd.read_csv(input_file, chunksize=chunk_size, dtype=dtypes):
        # 在 chunk 上执行轻量级转换(避免 .copy() 或复杂 apply)
        filtered_chunk = chunk[chunk['value'] > 100].copy()
        filtered_chunk['new_col'] = filtered_chunk['old_col'].str.upper()

        # 首次写入含 header,后续追加不写 header
        filtered_chunk.to_csv(
            output_file,
            mode='w' if first_chunk else 'a',
            header=first_chunk,
            index=False
        )
        first_chunk = False
        # 可选:显式删除 chunk 引用,协助 GC
        del chunk, filtered_chunk

process_large_csv_chunked('large_input.csv', 'transformed_output.csv')

⚠️ 注意事项:

  • chunksize 并非越大越好:建议从 10k–100k 行起步测试,观察内存峰值(可用 memory_profiler 工具监控);
  • 避免跨 chunk 的聚合操作(如全局 groupby().sum()),此类需求应改用 dask 或数据库;
  • to_csv(..., mode='a') 在 Windows 下需确保文件未被其他进程锁定。

✅ 三、进阶替代方案(按场景选择)

场景 推荐工具 优势
纯 ETL 流水线(无复杂分析) csv 模块 + DictReader/DictWriter 零 pandas 依赖,内存恒定 ≈ 单行大小,适合简单过滤/映射
需要类 pandas API 且支持并行 dask.dataframe 自动分块+延迟计算,语法兼容 pandas,支持 read_csv(..., blocksize='128MB')
超大规模(>50GB)或需 SQL 能力 polars(Rust 实现) 内存效率显著优于 pandas,scan_csv() 支持惰性加载,.filter().with_columns().sink_csv() 一行链式处理

示例(Polars):

import polars as pl

# 惰性读取(不立即加载)
lf = pl.scan_csv('large_input.csv', dtypes={'status': pl.Categorical})
result = (
    lf.filter(pl.col('value') > 100)
      .with_columns(pl.col('name').str.to_uppercase().alias('NAME'))
      .sink_csv('transformed_output.csv')  # 直接写入磁盘,不返回 DataFrame
)

✅ 四、关键总结

  • 优先做 dtype 优化:这是成本最低、见效最快的内存压缩手段;
  • 分块是底线策略:只要单块能 fit 进内存,就能处理任意大小文件;
  • 警惕隐式拷贝:避免 df.iloc[:], df.assign(...), .copy() 等无意识复制;
  • 日志与监控不可少:在循环中加入 print(f"Processed {i * chunk_size} rows") 和内存快照(psutil.Process().memory_info().rss / 1024**2);
  • 最后提醒:若业务允许,将原始数据预处理为 Parquet 格式(列存+压缩),后续读取速度提升 3–10 倍,内存降低 50–80%,是长期最优解。

通过组合运用以上方法,即使在 8GB 内存机器上,也能稳健处理 10GB CSV 的清洗与转换任务。