074、Pandas 数据合并:merge、join、concat 的参数混用场景与内存管理
074、Pandas 数据合并merge、join、concat 的参数混用场景与内存管理上周帮同事排查一个线上报表生成脚本的OOM问题数据量大概300万行用了三个DataFrame做合并结果内存直接飙到32GB还报错。我一看代码好家伙concat、merge、join三个函数混着用参数还传得乱七八糟典型的“能用就行”写法。今天就把这些坑掰开揉碎了讲清楚。从一次真实的内存爆炸说起那个脚本的逻辑其实很简单从三个不同系统拉取用户订单数据、支付数据和物流数据需要按订单ID合并成一个宽表。同事的写法是这样的# 别这样写内存会炸df1pd.read_csv(orders.csv)df2pd.read_csv(payments.csv)df3pd.read_csv(logistics.csv)# 先concat再merge索引全乱了temppd.concat([df1,df2],axis1)resulttemp.merge(df3,onorder_id,howleft)问题出在哪concat默认是按索引对齐的而三个DataFrame的索引根本不一样concat之后生成了大量NaN行数据量膨胀了3倍。再merge的时候Pandas为了做笛卡尔积内存直接爆炸。merge最常用但最容易忽略的参数merge是SQL风格的合并核心参数就那几个但混用场景下容易出问题。on参数指定合并键。如果两个DataFrame的列名不同用left_on和right_on分别指定。这里有个坑——当两个DataFrame都有相同列名但不是合并键时merge会自动加后缀_x和_y但如果你后续还要做其他合并这些后缀会变成新的列名冲突源。# 这里踩过坑两个DataFrame都有amount列但含义不同df_orderspd.DataFrame({order_id:[1,2],amount:[100,200]})df_paymentspd.DataFrame({order_id:[1,2],amount:[90,180]})# 默认suffixes(_x, _y)mergeddf_orders.merge(df_payments,onorder_id)# 得到amount_x和amount_y但如果你后续还要merge其他表注意列名不要重复how参数left、right、inner、outer。很多人以为outer就是全连接但实际场景中如果两个DataFrame的合并键有大量不匹配outer会产生大量NaN行内存消耗翻倍。我一般先做inner再单独处理不匹配的行这样内存可控。indicator参数这个参数很多人不知道但调试时特别好用。它会加一列’_merge’告诉你每行来自哪个表。# 调试利器看哪些行没匹配上mergeddf_orders.merge(df_payments,onorder_id,howouter,indicatorTrue)# 筛选出只在左边或右边的行left_onlymerged[merged[_merge]left_only]right_onlymerged[merged[_merge]right_only]join索引合并的陷阱join本质上是基于索引的merge但很多人把它当成merge的简化版来用结果索引对不上就出问题。# 别这样写join默认用索引但你的索引可能不是order_iddf_orders.set_index(order_id,inplaceTrue)df_payments.set_index(order_id,inplaceTrue)resultdf_orders.join(df_payments,howleft)这里有个隐藏问题如果两个DataFrame的索引有重复值join会做笛卡尔积数据量暴增。更坑的是join不会报错你只会看到结果行数莫名其妙变多。参数混用场景有时候你需要在join里指定列名但join不支持on参数只能用merge。我见过有人这样写# 混用先reset_index再用join多此一举df_orders.reset_index(inplaceTrue)df_payments.reset_index(inplaceTrue)resultdf_orders.join(df_payments.set_index(order_id),onorder_id)这种写法能工作但性能很差因为set_index会复制数据。直接merge更清晰。concat不是简单的堆叠concat的axis参数决定了是按行堆叠还是按列拼接。但很多人忽略了join参数默认是outer意味着如果两个DataFrame的列名不完全一致会生成NaN。# 这里踩过坑两个DataFrame列名不同concat后多了很多NaN列df_apd.DataFrame({id:[1,2],name:[A,B]})df_bpd.DataFrame({id:[3,4],age:[20,30]})resultpd.concat([df_a,df_b],axis0)# 结果name列有NaNage列也有NaNkeys参数当你需要区分数据来源时keys可以生成MultiIndex。但注意MultiIndex在后续merge时会有问题因为merge不支持MultiIndex作为合并键。# 用keys标记来源但后续merge要小心resultpd.concat([df_a,df_b],keys[source1,source2])# 索引变成了((source1, 0), (source1, 1), ...)参数混用的典型场景与解决方案场景一先concat再merge这是最常见的错误。concat会改变索引结构导致后续merge的on参数失效。# 错误写法temppd.concat([df1,df2],axis1)resulttemp.merge(df3,onorder_id)# 索引乱了on可能找不到# 正确做法先merge再concat或者统一用mergeresultdf1.merge(df2,onorder_id).merge(df3,onorder_id)场景二join和merge混用join基于索引merge基于列混用容易导致逻辑混乱。# 混用先join再merge索引和列混在一起tempdf1.join(df2.set_index(order_id),onorder_id)resulttemp.merge(df3,onorder_id)# 统一用merge更清晰resultdf1.merge(df2,onorder_id).merge(df3,onorder_id)场景三concat后忘记重置索引concat默认保留原索引如果原索引有重复后续操作会出问题。# 别这样写索引重复会导致merge结果异常temppd.concat([df1,df2])resulttemp.merge(df3,onorder_id)# 索引重复merge可能报错# 重置索引temppd.concat([df1,df2],ignore_indexTrue)resulttemp.merge(df3,onorder_id)内存管理从源头控制回到开头的OOM问题内存管理的关键不是等数据加载完再优化而是在合并过程中控制数据量。1. 分块读取与合并不要一次性把所有数据读进内存。用chunksize分块读取每块单独合并最后再concat。# 分块处理内存可控chunks[]forchunkinpd.read_csv(orders.csv,chunksize100000):# 每块先做必要的过滤和合并chunkchunk.merge(payments_small,onorder_id,howleft)chunks.append(chunk)resultpd.concat(chunks,ignore_indexTrue)2. 提前过滤与聚合在合并之前先对每个DataFrame做过滤和聚合减少数据量。# 先过滤再合并df_ordersdf_orders[df_orders[status]completed]df_paymentsdf_payments.groupby(order_id).agg({amount:sum}).reset_index()resultdf_orders.merge(df_payments,onorder_id)3. 使用categorical类型如果合并键是字符串且重复率高转成category类型可以大幅减少内存。# 字符串转category内存减半df_orders[order_id]df_orders[order_id].astype(category)df_payments[order_id]df_payments[order_id].astype(category)resultdf_orders.merge(df_payments,onorder_id)4. 及时释放中间变量Python的垃圾回收不是实时的合并过程中产生的中间DataFrame会占用大量内存。# 手动释放内存tempdf1.merge(df2,onorder_id)deldf1,df2# 显式删除resulttemp.merge(df3,onorder_id)deltemp个人经验总结写了三年Pandas踩过的坑比写过的代码还多。关于数据合并我的经验是能用merge就别用joinmerge的参数更直观而且支持列名合并。join只有在明确需要索引合并时才用而且一定要确保索引没有重复。concat只用于简单的堆叠不要用它来做列拼接除非你非常清楚两个DataFrame的索引结构。列拼接用merge更安全。内存管理要前置不要等数据加载完再想优化。分块读取、提前过滤、类型转换这些操作在数据量大的时候能救命。调试时多用indicator参数它能帮你快速定位哪些行没匹配上比肉眼检查快得多。最后如果数据量超过1000万行建议直接上Dask或SparkPandas的内存模型决定了它不适合处理超大规模数据。别硬撑该换工具就换工具。