600万行爆内存?Pandas分块、Dask并行与Polars选型实录
600万行爆内存Pandas分块、Dask并行与Polars选型实录接了一个这样需求清洗600多万条社交媒体帖子。源数据是API拉下来的JSON展平后两百多列看着挺整齐结果脚本一跑直接OOM。盯着报错日志发了一会儿呆。以前总想着“加机器”就能解决现在云账单压着预算没那么宽裕。这600万行数据展开大概30G普通Worker根本吃不下。我没急着换库先试了最笨的办法后面又折腾了Dask和Polars。把这几种方案的优缺点和坑记下来。正文先拿Pandas分块硬扛最开始想用Pandas处理混合字段觉得比PySpark对Schema的一致性要求低一些。数据里有不少“薛定谔的类型”字段比如reaction_count有的行是整数1250有的是字符串1250还有null。importpandasaspddefnormalize_mixed_columns(df,mixed_columns):cleaned_dfdf.copy()forcolumninmixed_columns:cleaned_df[column](cleaned_df[column].where(cleaned_df[column].isna(),cleaned_df[column].astype(str)))returncleaned_df代码简单但一跑就崩。Pandas处理大列时会在内存生成临时对象30G的数据直接撑爆RAM。只能上分块Chunking。把数据切成25万行一块处理完释放一块内存。运行时间长点但稳定不会中途罢工。importgcdefnormalize_mixed_columns_chunked(df,mixed_columns,chunk_size250000):cleaned_dfdf.copy()forcolumninmixed_columns:col_idxcleaned_df.columns.get_loc(column)forstartinrange(0,len(cleaned_df),chunk_size):endmin(startchunk_size,len(cleaned_df))# 截取当前块chunkcleaned_df.iloc[start:end,col_idx]maskchunk.notna()ifmask.any():chunkchunk.astype(object)chunk.loc[mask]chunk.loc[mask].astype(str).values cleaned_df.iloc[start:end,col_idx]chunk.values# 手动清理内存delchunkdelmask gc.collect()returncleaned_df这写法很繁琐而且只用了单核CPU。云端多核环境这么搞纯属浪费资源。Dask自动并行但容易踩类型坑单核太慢就上Dask。它能自动切分DataFrame在多核上并行执行理论上能省不少时间。但Dask有个隐形炸弹元数据推断。读CSV或JSON时Dask会采样一部分数据推断类型。如果前几行是整数它就标记为Integer。后续遇到字符串Dask就会报错ValueError或TypeError因为它认为类型不一致。解决办法粗暴点显式指定类型转换别依赖自动推断。importdask.dataframeasdd dfdd.read_parquet(social_posts.parquet,enginepyarrow)mixed_columns[hashtags,mentions,location,reaction_count]# 必须显式指定meta否则容易炸forcolumninmixed_columns:df[column]df[column].map(str,meta(column,str))df.to_parquet(social_posts_clean/,enginepyarrow)Dask解决了并行问题但处理动态混合类型列时代码复杂度反而高了。如果底层还是大量Python对象操作内存压力并没根本缓解。Polars真香定律同事推荐了Polars。基于Rust引擎号称比Pandas快几个数量级内存占用极低。Polars用Apache Arrow的列式存储格式支持Lazy Evaluation延迟执行。它会在真正读取数据前优化查询计划只加载必要的数据流。importpolarsaspl dfpl.read_parquet(social_posts.parquet)mixed_columns[hashtags,mentions,location,reaction_count]# 一行代码搞定类型转换底层Rust执行dfdf.with_columns([pl.col(col).cast(pl.String)forcolinmixed_columns])df.write_parquet(social_posts_clean.parquet)跑了一下内存峰值几乎没动速度快得离谱。不过Polars也不是银弹。API跟Pandas不一样常用的apply、groupby语法得重写。如果团队都只认Pandas引入Polars意味着要维护两套逻辑或者频繁做格式转换这也是成本。数据菌说没有最好的方案只有适合当下约束的方案。资源极度受限且数据模式动态变化Pandas分块虽然慢但可能是唯一的选择。集群有多核资源能接受显式定义SchemaDask是个平衡点。追求极致性能愿意学新APIPolars确实能带来质变。选型前先算笔账这笔成本是算在计算资源上还是人力维护上我踩过的坑有一次用Dask处理含空值的JSON没指定meta参数。前10%数据正常跑到第15%时报TypeError: casting string to int。排查半天发现某条脏数据混入了特殊字符导致Dask采样推断错误。后来强制指定所有混合列为String类型才解决。半结构化数据的类型推断永远不可靠显式声明保平安。落地清单检查当前ETL任务的内存峰值确认是否真的超出单机限制若用Pandas务必加入gc.collect()并设置合理的chunk_size如25w若用Dask读取非Parquet文件时显式传入dtype或meta避免推断错误评估团队对Polars的学习成本小规模实验后再决定是否全量迁移参考来源What Can We Do When Memory Becomes the New Bottleneck in Data Engineering?— Towards Data Science原文链接https://towardsdatascience.com/when-memory-becomes-the-new-bottleneck-in-data-engineering-what-can-we-do/Dask Documentation延伸阅读Polars Documentation延伸阅读Python Cookbook延伸阅读