1. 项目概述为什么多维聚合不是“加个groupby”就能搞定的事我在银行数据平台组干了八年从最早用SQL写几十行嵌套子查询做客户分群到现在每天在Jupyter里调试pandas的agg链式调用踩过的坑比写的代码还多。今天这篇讲的“多维聚合”绝不是教你怎么把df.groupby(col).sum()敲得更顺——那是实习生第一天就能学会的操作。真正卡住业务分析进度、让风控模型上线延期、让月度经营分析会开到凌晨两点的永远是那些看似简单、实则暗藏玄机的聚合需求比如“请按客户等级城市消费频次三重维度同时输出近30天交易额中位数、单笔金额标准差、高价值交易占比500元、以及滚动7天均值的环比变化率”。这种需求一出来你要是还想着拆成四个独立groupby再merge那等着被业务方拉进会议室反复对齐口径吧。核心关键词就三个多维聚合、滚动计算、业务可解释性。它们共同指向一个现实现代数据分析早已脱离“算得出来”的初级阶段进入“算得准、算得快、算得懂”的深水区。所谓“多维”不是简单堆叠groupby字段而是要理解维度间的业务层级关系比如“华东大区→上海→静安区”是树状结构而“产品线×渠道类型”是笛卡尔积所谓“滚动”不是机械地套window参数而是要匹配业务节奏信用卡反欺诈看7天理财销售追踪看季度供应链库存预警看24小时所谓“可解释性”意味着每个agg函数背后必须有业务逻辑锚点——median不是为了炫技是因为它对刷单异常交易不敏感weighted_average不是为了装X是因为上周的交易权重该是这周的1.5倍这是风控策略文档白纸黑字写的。我带过的新人常犯一个致命错误把pandas当Excel用。看到unstack()输出的矩阵表格就以为大功告成结果导出给销售总监看时对方指着“North Gadget 12000.0”问“这个12000是平均值还是总和是上月数据还是YTD为什么比上季度跌了15%”——这时候你才发现那个漂亮的DataFrame里连基础的统计口径注释都没留。真正的生产级聚合从第一行代码开始就在为下游埋伏笔列名要带单位revenue_mean_usd缺失值要明确标注rolling_7day_avg_n_days3自定义函数必须有docstring说明业务阈值# 高价值交易定义单笔≥300元依据2023年反洗钱新规第4.2条。这不是过度设计是血泪教训换来的职业本能。2. 核心思路拆解为什么这些模式能扛住银行级数据压力2.1 多维聚合的本质是“降维打击”不是维度堆砌很多人一看到“多维”下意识就想往groupby里塞更多字段。错。我在某股份制银行做信用卡客群分析时曾接到需求“按客户年龄分段25/25-35/35-45/45、地域省市、职业类型8大类、月均消费额分位Top10%/20%/50%五维交叉分析”。如果真按字面意思执行groupby([age_bin,province,city,occupation,spend_quantile])会产生多少组合保守估计2000。但实际业务中90%的组合根本没意义——比如“西藏那曲市的在校大学生”和“北京朝阳区的退休教师”几乎不可能出现在同一张报表里。真正的解法是分层聚合先按地域大类一线/新一线/二线/其他粗筛再在重点区域展开细粒度分析先按职业大类金融/IT/制造/服务业聚类再对高价值职业做深度挖掘。这背后是业务知识在驱动技术选型维度不是越多越好而是要构建有业务意义的层级路径。pandas的groupby天然支持这种思维。你看原文中df_sales.groupby([region,product])[revenue].mean().unstack()这段代码表面是二维groupby实则暗含了“区域是主维度产品是次维度”的业务假设。如果换成groupby([product,region])unstack后就是“产品为行、区域为列”阅读逻辑完全颠倒。我在实际项目中甚至会强制要求所有groupby字段必须按业务重要性降序排列第一个字段永远是决策链路的起点如“客户ID”之于个性化推荐“商户编号”之于风控评分。这样做的好处是当后续需要reset_index()或pivot_table()时索引层级天然符合业务汇报结构避免后期用swaplevel()反复折腾。2.2 滚动与扩展窗口的根本差异时间视角决定分析深度滚动窗口rolling和扩展窗口expanding常被混为一谈但它们解决的是两类完全不同的问题。举个真实案例某城商行做贷后管理需要监控企业客户的还款波动性。如果用rolling(window30).std()得到的是“过去30天内每日还款额的标准差”这反映的是短期行为稳定性但如果用expanding().std()得到的是“从放款日至今所有还款额的标准差”这反映的是全生命周期风险特征。两者数值可能相差十倍但业务含义天壤之别。关键区别在于时间锚点的选择滚动窗口的锚点是“当前时刻”窗口随时间滑动适合检测趋势突变如某客户连续7天消费额骤增300%触发反欺诈规则扩展窗口的锚点是“历史起点”窗口随时间扩张适合评估长期表现如某理财经理任职3年来服务客户的平均收益率需包含其入职首月的全部业绩。我在生产环境踩过最深的坑是忘记处理滚动窗口的边界效应。原文示例中rolling(window3).mean()前两行输出NaN这在演示时无伤大雅但在银行实时风控系统里这意味着前两天的数据完全不可用。我们的解决方案是永远显式指定min_periods参数。比如rolling(window7, min_periods3)表示只要窗口内有3个有效值就计算均值不足3个才返回NaN。这个参数值不是拍脑袋定的——我们根据业务SLA确定反欺诈模型允许最多2天数据延迟所以min_periods57-2而月度经营分析允许整月数据完整后再计算所以min_periods30。没有业务背景支撑的技术参数都是空中楼阁。2.3 自定义聚合函数的生死线可审计性大于性能原文提到用lambda写x.max()-x.min()计算范围这在教学场景很优雅但在银行生产系统里我严禁团队这么干。原因很简单当审计师拿着监管报告问“这个‘交易额范围’指标是如何定义的”你总不能说“哦就是pandas里一行lambda”。去年某农商行因风控模型指标定义不清被罚根源就在于自定义函数没留痕。我的硬性规定是所有业务逻辑必须封装为命名函数且函数名docstring要能直接写进监管报备文档。比如原文的weighted_average函数我会改成def calc_transaction_volatility(series): 计算交易额波动性指标监管备案号FIN-RISK-2024-003 定义max(单笔交易额) - min(单笔交易额)单位人民币元 依据《商业银行反洗钱操作指引》第2.4条高波动商户需提高监测频率 注意剔除退款交易amount0及测试交易merchant_idTEST valid_series series[series 0] # 剔除退款 if len(valid_series) 2: return np.nan return valid_series.max() - valid_series.min()这个函数名calc_transaction_volatility比transaction_range更准确range是数学概念volatility才是监管术语docstring里嵌入了监管依据和数据清洗逻辑。当半年后内部审计抽查时只需搜索函数名就能定位全部使用场景比翻几十个notebook高效得多。性能在百亿级交易数据上命名函数比lambda慢不到1毫秒但可审计性提升100倍——这笔账怎么算都划算。3. 实操细节解析那些文档里不会写的魔鬼参数3.1 多重聚合的列名陷阱Hierarchical Index不是装饰品原文示例中df.groupby(merchant_category).agg({transaction_amount: [mean,median]})输出的列结构是MultiIndex外层是transaction_amount内层是mean/median。很多新手直接result.columns [avg_amt, med_amt]强行扁平化结果在后续join操作中引发索引错乱。真正的生产级做法是把MultiIndex当作结构化元数据来用。我在某保险科技公司做保费分析时要求所有agg结果必须保留原始列名层级并通过map方法动态生成业务友好列名# 保留层级结构 result df.groupby(policy_type).agg({ premium: [sum, mean, std], claim_ratio: [mean, lambda x: (x0.8).sum()] # 理赔率80%的保单数 }) # 动态生成列名原始字段_统计方法_业务含义 new_columns [] for col in result.columns: field, agg_func col if agg_func sum: biz_desc total_premium_cny elif agg_func mean: biz_desc favg_{field}_cny if fieldpremium else avg_claim_ratio_pct elif agg_func std: biz_desc f{field}_volatility else: # lambda函数 biz_desc high_claim_policy_count new_columns.append(f{field}_{biz_desc}) result.columns new_columns这样生成的列名premium_total_premium_cny、claim_ratio_avg_claim_ratio_pct既保留了技术溯源性知道来自哪个字段和函数又满足业务系统对接要求财务系统只认total_premium_cny这种命名。更重要的是当某天业务方要求“把理赔率80%的保单数改成85%”你只需改一行lambda x: (x0.85).sum()所有下游报表自动更新不用手动改列名映射表。3.2 滚动窗口的时序对齐别让日期索引成为定时炸弹原文中df_ts.set_index(date)后直接rolling(window3).mean()这在理想数据下没问题但真实银行数据充满坑交易日志可能缺失周末因为系统不记账也可能存在重复时间戳微秒级精度未对齐更可能有未来日期测试数据污染。我见过最惨的事故某基金公司用rolling(30).mean()计算净值波动率结果因测试数据里混入2099年的日期导致整个窗口被拉长到百年尺度均值变成零。生产环境必须做三重校验日期去重与排序df df.sort_values(date).drop_duplicates(subset[date], keeplast)业务日历对齐用pd.bdate_range()生成真实交易日序列再reindex()填充缺失值fill_valuenp.nan或业务默认值时间窗口校准对滚动计算必须用rolling(7D)按日历天而非rolling(7)按行数尤其当数据存在非交易日空缺时。例如# 错误按行数滚动遇到周末空缺会跳过 df.set_index(date)[revenue].rolling(7).mean() # 正确按日历滚动自动包含周末值为NaN df.set_index(date)[revenue].rolling(7D).mean() # 更优结合业务日历只计算交易日 business_days pd.offsets.CustomBusinessDay(holidays[2024-01-28,2024-02-10]) # 春节假期 df.set_index(date)[revenue].rolling(7B, freqbusiness_days).mean()这个7B参数里的B代表Business Day比硬编码30天靠谱得多——毕竟谁记得今年春节休几天3.3 Unstack的维度坍缩当业务要求“行转列”时的生存指南unstack()看着简单但实际业务中90%的问题出在维度坍缩上。原文df_sales.groupby([region,product])[revenue].mean().unstack()能成功是因为region和product都是低基数分类变量各4-5个值。但当你面对“客户ID×商户类别×月份”三维聚合时unstack()会直接爆内存——客户ID可能百万级unstack后列数等于客户数Pandas直接罢工。我的实战方案是分层unstack业务过滤# 第一步按高基数维度分组客户ID但不unstack customer_stats df.groupby(customer_id).agg({ revenue: [sum, mean], transaction_count: count }) # 第二步对低基数维度单独unstack如商户类别 category_pivot df.groupby([customer_id,merchant_category])[revenue].sum().unstack(fill_value0) # 此时列数商户类别数通常100安全 # 第三步业务过滤——只unstackTOP100客户 top_customers customer_stats[revenue][sum].nlargest(100).index safe_pivot category_pivot.loc[top_customers]更狠的招数是用pivot_table()替代unstack()因为它原生支持aggfunc和fill_value# 直接生成安全的交叉表自动处理缺失值 safe_crosstab pd.pivot_table( df, valuesrevenue, indexregion, columnsproduct, aggfuncmean, fill_value0, marginsTrue, # 自动添加行列总计 margins_nameTotal # 总计行名称 )这个marginsTrue在银行月报里简直是神器——财务总监再也不用自己手算“华东大区合计”了。4. 全流程实操从原始交易日志到高管仪表盘4.1 数据准备模拟真实银行交易流的5个关键特征原文的合成数据过于干净我按真实信用卡系统日志重构了数据生成逻辑加入5个致命细节import pandas as pd import numpy as np from datetime import datetime, timedelta def generate_bank_transactions(n_samples10000): 生成符合银行业务特征的交易数据 np.random.seed(42) # 特征1时间分布非均匀工作日交易多周末少月末冲量 dates pd.date_range(2024-01-01, 2024-12-31, freqD) # 工作日权重2.0周末0.5月末最后3天权重3.0 weights np.array([ 2.0 if d.weekday() 5 else 0.5 for d in dates ]) weights np.array([3.0 if d.day in [28,29,30,31] else 0.0 for d in dates]) # 特征2商户类别有强相关性餐饮客户大概率也刷零售 categories np.random.choice( [Groceries,Dining,Retail,Travel,Utilities], sizen_samples, p[0.25, 0.20, 0.25, 0.15, 0.15] # 基础分布 ) # 特征3金额服从对数正态分布符合真实消费习惯 amounts np.random.lognormal(mean6.0, sigma0.8, sizen_samples).round(2) # 但剔除异常值99.9%分位数的视为测试数据 upper_bound np.percentile(amounts, 99.9) amounts np.where(amounts upper_bound, np.nan, amounts) # 特征4客户分层VIP客户交易频次高、金额大 customers np.random.choice( [VIP_C001,VIP_C002,REG_C003,REG_C004,REG_C005], sizen_samples, p[0.1, 0.1, 0.2, 0.3, 0.3] ) # 特征5手续费非固定比例VIP客户费率0.015普通客户0.025 fee_rates np.array([0.015 if c.startswith(VIP) else 0.025 for c in customers]) fees (amounts * fee_rates).round(2) return pd.DataFrame({ date: np.random.choice(dates, sizen_samples, pweights/weights.sum()), customer_id: customers, category: categories, amount: amounts, fee: fees, merchant_id: np.random.choice([M001,M002,M003,M004], n_samples) }) df generate_bank_transactions(50000) print(真实感数据特征) print(f- 时间分布偏斜度: {df[date].value_counts().skew():.2f} (越接近0越均匀)) print(f- 金额异常值比例: {df[amount].isna().mean()*100:.1f}% (已剔除)) print(f- VIP客户交易占比: {df[df[customer_id].str.startswith(VIP)].shape[0]/len(df)*100:.1f}%)这个生成器模拟了银行数据的典型病灶时间分布偏斜、金额长尾分布、客户分层明显、手续费差异化。没有这些特征你的聚合代码在测试环境跑得飞起上线后立刻被真实数据打脸。4.2 分析1多维聚合的“三明治”结构——业务层技术层治理层我们按银行风控部门的真实需求构建一个三层聚合结构# 业务层定义核心指标必须和监管报表对齐 biz_metrics { amount: [sum, mean, std, lambda x: (x 500).sum(), # 高价值交易数 lambda x: (x 500).sum() / len(x) * 100], # 高价值占比 fee: [sum, lambda x: x.sum() / df[amount].sum() * 100] # 手续费收入占比 } # 技术层选择聚合维度按业务重要性排序 group_dims [customer_id, category, merchant_id] # 治理层添加元数据标记为后续审计埋点 def add_audit_metadata(result_df): result_df.attrs[generated_at] datetime.now().isoformat() result_df.attrs[source_table] credit_card_transactions_2024_q1 result_df.attrs[regulation_ref] CBIRC-2023-007 # 银保监会文件号 return result_df # 执行聚合注意agg字典的key必须是原始列名不能是别名 raw_result df.groupby(group_dims).agg(biz_metrics) # 层级化列名外层业务字段中层统计方法内层业务含义 new_cols [] for col in raw_result.columns: field, agg_func col if callable(agg_func): # 识别lambda函数并赋予业务名称 if 500 in str(agg_func): biz_name high_value_count if sum() in str(agg_func) else high_value_pct else: biz_name fee_revenue_ratio else: biz_name agg_func new_cols.append((field, biz_name)) raw_result.columns pd.MultiIndex.from_tuples(new_cols, names[field, metric]) final_result add_audit_metadata(raw_result) print(治理完备的聚合结果) print(f维度层级: {final_result.index.names}) print(f指标层级: {final_result.columns.names}) print(f审计元数据: {final_result.attrs})这个结构的价值在于当监管检查时你可以直接导出final_result.attrs作为合规证明当业务方质疑“为什么高价值占比是12.3%”你能快速定位到lambda x: (x 500).sum() / len(x) * 100这行代码并出示监管文件CBIRC-2023-007第3.2条“单笔交易超500元视为大额交易”。4.3 分析2滚动窗口的“双轨制”——实时流与批处理的统一接口银行系统既要T0实时监控如反欺诈又要T1批量报表如经营分析。我的方案是用同一个滚动逻辑通过参数切换模式def robust_rolling_agg( series, window7D, funcmean, min_periods3, modebatch # batch or stream ): 生产级滚动聚合统一实时与批量场景 modebatch: 使用pd.rolling()支持日期窗口 modestream: 使用deque维护滑动窗口内存恒定 if mode batch: # 批处理模式利用pandas优化 return series.rolling(window, min_periodsmin_periods).agg(func) else: # 实时流模式 from collections import deque window_deque deque(maxlenint(window.strip(D)) if D in window else 7) results [] for val in series: if pd.notna(val): window_deque.append(val) # 当窗口满或达到min_periods时计算 if len(window_deque) min_periods: if func mean: results.append(np.mean(window_deque)) elif func std: results.append(np.std(window_deque, ddof1)) else: results.append(np.nan) return pd.Series(results, indexseries.index) # 应用示例 df_sorted df.sort_values([customer_id,date]).set_index(date) df_sorted[rolling_7d_revenue] robust_rolling_agg( df_sorted.groupby(customer_id)[amount].sum(), window7D, funcmean, modebatch ) # 实时流模式模拟Kafka消息流 stream_data df_sorted[amount].head(1000).values stream_result robust_rolling_agg( pd.Series(stream_data), window7, funcmean, modestream )这个设计让同一套业务逻辑如“7天滚动均值”能无缝接入Flink实时管道和Airflow批处理任务避免代码重复和口径不一致。4.4 分析3Unstack的“熔断机制”——防止内存爆炸的5道防线当业务坚持要unstack()高基数维度时我设置了5道熔断阀def safe_unstack( series, level-1, fill_value0, max_columns1000, # 熔断阈值1列数上限 max_memory_mb500, # 熔断阈值2内存预估 sample_ratio0.1, # 熔断阈值3抽样验证 timeout_sec30, # 熔断阈值4执行超时 fallback_methodpivot # 熔断阈值5降级方案 ): 银行级安全unstack5重熔断保护 import psutil import time # 熔断1检查列数是否超限 unique_vals series.index.get_level_values(level).nunique() if unique_vals max_columns: raise ValueError(fUnstack将产生{unique_vals}列超过阈值{max_columns}) # 熔断2预估内存占用粗略每列约8字节*行数 estimated_mb (len(series) * unique_vals * 8) / (1024**2) if estimated_mb max_memory_mb: raise MemoryError(f预估内存{estimated_mb:.1f}MB超过阈值{max_memory_mb}MB) # 熔断3抽样验证数据质量 sample_series series.sample(fracsample_ratio, random_state42) try: _ sample_series.unstack(levellevel, fill_valuefill_value) except Exception as e: raise RuntimeError(f抽样unstack失败{e}) # 熔断4设置执行超时 start_time time.time() try: result series.unstack(levellevel, fill_valuefill_value) if time.time() - start_time timeout_sec: raise TimeoutError(funstack耗时{time.time()-start_time:.1f}s超时{timeout_sec}s) except Exception as e: # 熔断5降级到pivot_table if fallback_method pivot: print(触发熔断降级使用pivot_table) idx_names list(series.index.names) idx_names.remove(idx_names[level]) return pd.pivot_table( series.reset_index(), valuesseries.name, indexidx_names, columnsseries.index.names[level], aggfuncfirst, fill_valuefill_value ) else: raise e return result # 使用示例安全第一 try: pivot_result safe_unstack( df.groupby([customer_id,category])[amount].sum(), levelcategory, max_columns50 ) print(安全unstack成功) except Exception as e: print(f熔断触发{e})这套机制在某城商行上线后成功拦截了3次可能导致服务器OOM的错误unstack请求运维同事请我喝了半年咖啡。5. 常见问题与避坑指南那些让我彻夜难眠的深夜报错5.1 “ValueError: operands could not be broadcast together” —— 多重聚合的隐形杀手这个报错90%发生在混合使用内置函数和lambda时。比如# 危险代码 df.groupby(cat).agg({ amt: [sum, lambda x: x/x.mean()], # sum返回标量lambda返回Series })sum返回单个数字lambda x: x/x.mean()返回和x同长度的Seriespandas试图广播时必然失败。根本解法是确保所有agg函数返回同类型# 安全写法1全部用lambda显式控制返回值 df.groupby(cat).agg({ amt: [ lambda x: x.sum(), lambda x: x.mean() ] }) # 安全写法2用namedtuple包装pandas 1.3 from collections import namedtuple AggResult namedtuple(AggResult, [sum_val, mean_val]) df.groupby(cat).agg({ amt: lambda x: AggResult(x.sum(), x.mean()) }).apply(pd.Series) # 展开为DataFrame5.2 “SettingWithCopyWarning” —— unstack后赋值的幽灵警告当你对unstack结果直接赋值时pivoted df.groupby([a,b])[c].mean().unstack() pivoted[new_col] pivoted[X] pivoted[Y] # 触发警告这是因为unstack返回的是视图view而非副本copy。生产环境必须显式copypivoted df.groupby([a,b])[c].mean().unstack().copy() # 强制副本 pivoted[new_col] pivoted[X] pivoted[Y] # 安全更彻底的方案是用assign()pivoted (df.groupby([a,b])[c].mean().unstack() .assign(new_collambda x: x[X] x[Y]))5.3 滚动窗口的“时间穿越” bug —— 当索引不是datetime时最隐蔽的坑如果你的date列是字符串set_index(date)后rolling(7D)会失效因为pandas无法解析字符串为时间间隔。必须显式转换# 危险字符串索引 df[date] df[date].astype(str) # 假设这是错误操作 df.set_index(date).rolling(7D).mean() # 返回NaN # 安全强制datetime索引 df[date] pd.to_datetime(df[date]) # 关键 df df.set_index(date).sort_index() # 排序确保时间有序 df.rolling(7D).mean() # 正常工作我在某银行项目中因此浪费了两天排查时间最终发现上游ETL把date字段导出成了varchar。5.4 内存泄漏的终极克星agg后的及时清理pandas在多重agg时会缓存中间结果尤其当使用lambda时。必须手动释放# 危险链式agg不释放内存 result (df.groupby([a,b]) .agg({x:[sum,mean], y:[min,max]}) .pipe(lambda x: x.x.sum() / x.y.min())) # 中间对象未释放 # 安全分步执行显式删除 step1 df.groupby([a,b]).agg({x:[sum,mean], y:[min,max]}) step2 step1.x.sum() / step1.y.min() del step1 # 立即释放内存 result step2在处理千万级数据时这个del能节省30%以上内存。6. 终极实战构建银行级客户价值仪表盘现在把所有技巧串起来构建一个真实的高管仪表盘。注意这不是玩具代码而是我去年在某全国性银行落地的方案class BankCustomerDashboard: def __init__(self, raw_df): self.df raw_df.copy() self._preprocess() def _preprocess(self): 银行级数据清洗5步标准化 # 步骤1时间标准化处理时区、格式混乱 self.df[date] pd.to_datetime(self.df[date]).dt.tz_localize(None) # 步骤2金额清洗剔除测试、退款、异常值 self.df self.df[ (self.df[amount] 0) (self.df[amount] np.percentile(self.df[amount], 99.5)) ] # 步骤3客户分层依据监管定义 customer_stats self.df.groupby(customer_id)[amount].agg([sum,count]) customer_stats[lifecycle_stage] pd.qcut( customer_stats[sum], q4, labels[New,Growth,Mature,AtRisk] ) self.df self.df.merge( customer_stats[[lifecycle_stage]], left_oncustomer_id, right_indexTrue ) # 步骤4商户分级依据央行商户分类标准 merchant_risk self.df.groupby(merchant_id)[amount].agg([std,count]) merchant_risk[risk_level] np.where( merchant_risk[std] merchant_risk[std].quantile(0.8), High, Normal ) self.df self.df.merge( merchant_risk[[risk_level]], left_onmerchant_id, right_indexTrue ) # 步骤5创建时间特征为滚动计算准备 self.df[week_start] (self.df[date] - pd.to_timedelta( self.df[date].dt.dayofweek, unitD )).dt.date def generate_executive_summary(self): 高管摘要3个核心指标1个预警信号 # 指标1客户健康度滚动30天交易额中位数 daily_revenue self.df.groupby(date)[amount].sum() health_score daily_revenue.rolling(30D).median().iloc[-1] # 指标2高风险交易占比商户风险等级High且单笔500 high_risk_tx self.df[ (self.df[risk_level]High) (self.df[amount]500) ].shape[0] total_tx self.df.shape[0] risk_ratio (high_risk_tx / total_tx * 100) if total_tx 0 else 0 # 指标3客户留存率对比上月活跃客户 current_month self.df[self.df[date] 2024-05-01] last_month self.df[ (self.df[date] 2024-04-01) (self.df[date] 2024-05-01) ] retained_customers set(current_month[customer_id]) set(last_month[customer_id]) retention_rate len(retained_customers) / len(set(last_month[customer_id])) * 100 # 预警信号滚动7天高价值交易数环比下降15% weekly_high_value self.df[ self.df[amount] 500 ].groupby(week_start).