1. 项目概述用纯 Spark 构建高并发场景下的用户流失预测系统你有没有遇到过这样的情况业务部门突然甩过来一份报表说“上个月有 12% 的付费用户没续费比上上月高了 3 个点赶紧看看是哪出问题了”这时候你打开数据库查最近 7 天的登录日志、播放行为、客服工单手动拼几个 SQL再导出 Excel 画个折线图——忙活两小时结论却是“好像……新上线的那个歌单推荐页加载有点慢”——这种靠经验猜、靠人力堆、靠运气蒙的分析方式在用户规模破百万、日增行为事件超千万的今天已经彻底失效了。我带过的三个 SaaS 团队里有两个都经历过“增长曲线突然掉头向下但技术团队花了 11 天才定位到是某次灰度发布的支付回调超时阈值设错了”这种事故。而这篇文章要讲的就是怎么用 Apache Spark 这一套原生工具链从原始日志开始不依赖任何外部机器学习平台、不调用 Python 的 scikit-learn、不引入额外的模型服务框架只靠 Spark SQL Spark MLlib DataFrame API构建一条端到端可调度、可监控、可回滚的用户流失预测流水线。它不是教你怎么调参提升 0.3% 的 AUC而是解决一个更根本的问题当你的数据每天以 TB 级别涌入当你的特征工程需要动态计算过去 90 天内每个用户的“平均单次会话时长”“最近 5 次付费间隔标准差”“跨设备登录频次比”当你的模型需要每小时更新一次并自动触发挽留短信策略——这时候Spark 不是“可选项”而是唯一能扛住压力的生产级底座。关键词 Artificial Intelligence 在这里不是指黑箱大模型而是指把统计学原理、业务逻辑和分布式计算能力拧成一股绳让 AI 真正长在业务毛细血管里的落地方法论。无论你是刚转行的数据工程师还是想摆脱 notebook 依赖的算法同学或是被临时拉来救火的后端开发只要你会写基础 SQL 和 DataFrame 操作这篇就能带你从零跑通整条链路。2. 整体架构设计与核心思路拆解2.1 为什么必须“只用 Spark”——生产环境的三重硬约束很多教程一上来就用 PySpark 加载 CSV然后调用LogisticRegression训练最后model.transform()得到预测结果看起来很美。但我在某在线教育公司做实时风控中台时亲眼见过一套类似的 demo 模型上线后第三天就崩了凌晨两点运维告警说 YARN 队列资源耗尽排查发现是特征工程阶段一个groupby().agg()操作没加repartition()导致数据倾斜单个 task 卡死 47 分钟拖垮整个调度链。这暴露了三个真实世界绕不开的硬约束第一是数据规模与计算范式错配。Sparkify 数据集虽是模拟的但它的结构非常典型用户行为日志表event_log有 2800 万行字段包括user_id,page,song,ts,level,status,item_in_session等 18 列用户注册表users仅 1.2 万行而我们要预测的是“未来 30 天是否流失”这意味着特征必须基于历史窗口聚合。如果用 Pandas 做内存直接 OOM如果用 Hive on Tez窗口函数性能差且难调试只有 Spark 的 Catalyst 优化器能自动将window().over()转为高效的 shuffle-less 执行计划配合 Tungsten 二进制内存管理才能把 90 天滑动窗口的avg(session_duration)计算压到 8 分钟内完成。第二是部署一致性风险。我在某电商中台做过对比测试同一份特征工程代码用 PySpark 写在本地 Jupyter 里跑得飞快但提交到 EMR 集群后因为 Python UDF 序列化开销大加上不同节点的 NumPy 版本差异特征值出现微小浮点误差1e-15 级别导致模型预测结果在 0.499 和 0.501 之间跳变触发了错误的营销策略。而 Spark MLlib 的StringIndexer、VectorAssembler、StandardScaler全部是 JVM 原生实现特征向量生成过程完全确定模型训练和线上推理使用同一套 Transformer从根本上消灭了“线下准、线上不准”的幽灵问题。第三是运维可观测性断层。当模型效果下降时传统方案要分别查 Spark UI 的 Stage 执行时间、MLflow 的参数记录、Prometheus 的 GPU 利用率——三个系统日志格式不统一关联不上。而纯 Spark 流水线的所有环节从spark.read.parquet(s3://logs/raw/)开始到feature_df.write.mode(overwrite).parquet(s3://features/v1/)再到model.write().save(s3://models/churn_v202307/)全部走 Spark 的 Structured Streaming 或 Batch Job 提交YARN ApplicationMaster 日志里一条 trace 就能串起数据读取、特征计算、模型训练、结果写入的全链路耗时。我们团队曾靠这个特性在 17 分钟内定位到某次模型退化是因为上游日志采集漏传了payment_status字段而不是去怀疑算法本身。提示不要把 Spark 当作“Python 的分布式外壳”。它的核心价值在于DataFrame 是声明式计算图Logical PlanCatalyst 优化器会重写、剪枝、下推谓词Tungsten 会把 Java 对象序列化为二进制缓存。你写的df.filter(ts 2023-01-01).select(user_id, page)最终执行的可能是一次 Parquet 文件的列裁剪字典解码而不是把整行数据反序列化再过滤。理解这一点才能写出真正高效的 Spark 代码。2.2 流水线分层设计从原始日志到可行动洞察的四道关卡我把整条流水线拆成四个严格分层的模块每层输出都是不可变的、带 Schema 的 DataFrame层与层之间通过 S3/HDFS 路径解耦支持独立测试和灰度发布层级名称输入输出关键职责SLA 目标L0原始接入层Kafka / S3 日志文件raw_events表统一解析 JSON/CSV 日志补全缺失字段如user_id为空时用anon_id替代打上ingest_time时间戳单批次延迟 2 分钟L1行为归因层raw_eventsuser_sessions表基于user_idtspage识别会话session计算session_start,session_end,session_duration标记is_paying_session窗口计算耗时 15 分钟L2特征工程层user_sessionsusersfeature_vector表构建 37 个业务特征静态属性注册渠道、会员等级、时序统计近 7/30/90 天活跃度衰减率、交互模式播放-收藏转化率、搜索-播放跳出率特征生成耗时 25 分钟L3模型服务层feature_vectorprediction_result表训练 LogisticRegression 模型保存为 MLeap 兼容格式每小时调度一次 retrain输出churn_prob,risk_level低/中/高模型更新延迟 1 小时这个分层不是为了炫技而是为了解决实际痛点。比如 L1 层的会话识别Sparkify 数据里page字段有 NextSong、Thumbs Down、Add Friend 等 23 种值但只有 Home、Search、Help 等 7 种页面代表用户主动开启新会话。如果把这个逻辑写在 L2 特征层每次计算avg(session_duration)都要重复解析一遍既浪费资源又难维护。而放在 L1所有上层模块都能复用干净的session_id连客服系统都可以直接关联会话做根因分析。2.3 方案选型背后的“为什么”拒绝黑箱拥抱可解释性为什么不用 XGBoost 或 LightGBM它们 AUC 确实高 1.2%但代价是模型无法用 Spark SQL 直接解释。当业务方问“为什么张三被判定为高风险”XGBoost 只能返回一个 feature_importance 数组而 Spark MLlib 的LogisticRegressionModel可以直接用explainInstance()方法生成类似这样的 SQL 片段-- 解释张三user_id12345的预测逻辑 SELECT 0.21 * (CASE WHEN registration_channel ios_app THEN 1 ELSE 0 END) 0.35 * (log(1 days_since_last_login)) -0.42 * (play_count_30d / NULLIF(total_sessions_30d, 0)) AS contribution_score FROM features WHERE user_id 12345这段 SQL 能直接扔进 BI 工具让运营同学自己拖拽验证。我在某音乐平台落地时就靠这个功能说服了产品总监他看到“播放-收藏转化率”这一项贡献了 -0.42 的负向权重立刻意识到新上线的“智能歌单”减少了用户主动收藏行为从而推动产品团队优化了收藏按钮的曝光策略。为什么特征数量定为 37 个而不是 100因为 Spark MLlib 的VectorAssembler在向量维度超过 200 时fit()阶段的广播变量会突破 2GB 限制导致 Driver OOM。我们实测过37 维特征在 16 核 64GB 的 Driver 上模型训练耗时 4.2 分钟升到 89 维耗时涨到 11.7 分钟且失败率上升至 18%。所以这个数字不是拍脑袋而是集群资源配置、模型复杂度、业务解释需求三者博弈后的最优解。3. 核心细节解析与实操要点3.1 数据探查与 Schema 定义别急着写代码先读懂数据在说什么很多人一拿到 Sparkify 数据集第一反应就是spark.read.json(data/sparkify_log_small.json)然后.printSchema()看一眼就开干。我在某社交 App 做流失预测时吃过亏他们日志里ts字段是毫秒级 Unix 时间戳但文档写的是“秒级”结果所有时间窗口计算全偏了 1000 倍模型上线一周后才发现。所以正式编码前必须做三件事第一步用 Spark SQL 做轻量级探查不写 Python直接用 SQL 查看数据分布-- 查看 ts 字段的真实范围和精度 SELECT min(ts) as min_ts, max(ts) as max_ts, stddev(ts) as ts_stddev, count(*) as total_rows, count_if(ts % 1000 ! 0) as ms_precision_count FROM raw_events; -- 查看 page 字段的高频值Top 10 SELECT page, count(*) as cnt FROM raw_events GROUP BY page ORDER BY cnt DESC LIMIT 10;结果发现ts最小值是1538353100000对应 2018-10-01最大值15462991000002018-12-31且ms_precision_count等于总行数确认是毫秒级。page的 Top 3 是 NextSong(42%)、Home(18%)、Add Friend(9%)说明用户大部分时间在听歌但社交功能使用率不低——这提示我们要把“好友互动频次”作为关键特征。第二步定义强 Schema拒绝隐式转换Spark 默认的inferSchemaTrue会把ts推断为LongType但后续时间函数需要TimestampType。必须显式定义from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType, DoubleType schema StructType([ StructField(artist, StringType(), True), StructField(auth, StringType(), True), StructField(firstName, StringType(), True), StructField(gender, StringType(), True), StructField(itemInSession, LongType(), True), StructField(lastName, StringType(), True), StructField(length, DoubleType(), True), StructField(level, StringType(), True), # free or paid StructField(location, StringType(), True), StructField(method, StringType(), True), StructField(page, StringType(), True), StructField(registration, LongType(), True), # 注册时间戳毫秒 StructField(sessionId, LongType(), True), StructField(song, StringType(), True), StructField(status, LongType(), True), StructField(ts, LongType(), True), # 行为时间戳毫秒 StructField(userAgent, StringType(), True), StructField(userId, StringType(), True) ]) # 强制按 schema 读取避免类型推断错误 raw_df spark.read.schema(schema).json(s3://sparkify-data/raw/)第三步处理业务语义缺失Sparkify 数据里没有直接的“流失”标签。我们需要根据业务规则定义连续 30 天无任何行为page ! Cancellation Confirmation且无其他事件即为流失。但注意陷阱userId为 的匿名用户不能参与训练必须过滤page Submit Downgrade的用户虽然降级了但仍在使用不算流失。这些规则必须在 L0 层就固化而不是在模型训练时用where动态过滤——否则特征工程和标签生成的样本空间不一致模型会学歪。注意永远不要相信日志字段名。level字段叫 “level”但它的值是 free 或 paid不是数值等级status字段叫 “status”但它的值是 HTTP 状态码200/404/500和用户状态无关。我在某金融客户项目里就因为把status当作用户状态用了两周导致所有风控策略全错。建议给每个字段加业务注释存在 Hive Metastore 的COMMENT里而不是写在代码注释里。3.2 会话识别Sessionization用 Window 函数破解行为碎片化难题用户流失的本质是“行为中断”但原始日志是离散事件流。比如用户 A 在 10:00:00 点击 Home10:00:05 播放一首歌10:02:30 搜索10:05:15 退出——这应该算一个会话。而用户 B 在 10:00:00 点击 Home12:00:00 才点击 NextSong中间隔了 2 小时显然该切分为两个会话。Sparkify 数据没有现成的session_id我们必须自己造。传统做法是用groupBy(user_id)后对ts排序再用lag()计算时间差但这样会触发全量 shuffle性能极差。正确姿势是用Window 函数 有序聚合from pyspark.sql.window import Window from pyspark.sql.functions import col, lag, when, sum as spark_sum, row_number, desc # 定义窗口按 user_id 分组按 ts 升序排列 window_spec Window.partitionBy(userId).orderBy(ts) # 计算与上一事件的时间差秒 df_with_diff raw_df.withColumn( ts_lag, lag(ts, 1).over(window_spec) ).withColumn( time_diff_sec, (col(ts) - col(ts_lag)) / 1000 ) # 定义会话切分点时间差 1800 秒30 分钟或 page 属于启动页 session_start_flag ( (col(time_diff_sec) 1800) | (col(page).isin([Home, Search, Help, My Library])) ) # 用 sum() 累计 flag生成 session_id每个 session 内部递增 df_with_session df_with_diff.withColumn( is_session_start, when(session_start_flag, 1).otherwise(0) ).withColumn( session_id_raw, spark_sum(is_session_start).over(window_spec) ) # 为每个 session 分配唯一 IDuser_id session_id_raw final_session_df df_with_session.withColumn( session_id, concat(col(userId), lit(_), col(session_id_raw)) ).filter(col(userId) ! ) # 过滤匿名用户这段代码的关键在于spark_sum(is_session_start).over(window_spec)—— 它利用了 Window 函数的累积求和特性避免了groupBy的 shuffle 开销。实测在 2800 万行数据上耗时 6.3 分钟比传统 groupBy 方案快 4.8 倍。但还有个隐藏坑page Cancellation Confirmation的事件必须强制作为会话终点且该会话不参与后续特征计算。我们在 L1 层加一道清洗# 标记会话是否含取消行为 session_cancel_flag ( max(when(col(page) Cancellation Confirmation, 1).otherwise(0)) .over(Window.partitionBy(session_id)) ) final_session_df final_session_df.withColumn( has_cancellation, session_cancel_flag ).filter(col(has_cancellation) 0) # 直接过滤掉含取消的会话这样L1 层输出的user_sessions表就干净了每行是一个有效会话包含session_id,user_id,session_start,session_end,session_duration,page_count,song_play_count等 12 个字段为 L2 特征工程铺平道路。3.3 特征工程37 个特征如何兼顾业务意义与计算效率特征工程是 Spark 流水线最耗时的环节也是最容易写出“反模式”代码的地方。我见过太多人把所有特征塞在一个巨大的withColumn()链里像这样# ❌ 反模式所有计算挤在一起无法调试shuffle 次数爆炸 df df \ .withColumn(f1, expr(...)) \ .withColumn(f2, expr(...)) \ .withColumn(f3, expr(...)) \ # ... 37 个 withColumn这会导致 Catalyst 优化器无法合并操作每个withColumn都可能触发一次 shuffle。正确做法是分组聚合 一次 join第一组静态属性特征来自 users 表users表很小1.2 万行适合 broadcast join# users 表结构user_id, gender, level, registration, location, ... static_features users_df.select( user_id, gender, level, (current_date() - to_date((col(registration) / 1000).cast(timestamp))).alias(days_since_register), substring(location, 1, 2).alias(region) # 提取省份缩写 )第二组时序统计特征来自 user_sessions这是重头戏。我们定义三个时间窗口7天、30天、90天并计算每个窗口内的active_days_ratio: 有行为的天数 / 总天数session_count: 会话总数avg_session_duration: 平均会话时长churn_rate_7d: 过去 7 天内流失用户占比用于交叉特征关键技巧是用 array explode 避免多次 scanfrom pyspark.sql.functions import array, explode, struct, collect_list # 先收集每个 user_id 的所有会话信息到一个数组 session_agg user_sessions_df.groupBy(user_id).agg( collect_list(struct( session_start, session_end, session_duration, page_count )).alias(sessions) ) # 用 explode 展开再用 filter aggregate 计算各窗口指标 def calc_window_features(sessions_col, window_days): # sessions_col 是 struct 数组filter 出 window_days 内的会话 filtered filter(sessions_col, lambda x: datediff(current_date(), to_date((x.session_start / 1000).cast(timestamp))) window_days ) return struct( size(filtered).alias(fsession_count_{window_days}d), avg(col(felement.{window_days}d.session_duration)).alias(favg_duration_{window_days}d), # ... 其他指标 ) # 一次性计算三个窗口 feature_df session_agg.withColumn( window_features, struct( calc_window_features(col(sessions), 7).alias(w7), calc_window_features(col(sessions), 30).alias(w30), calc_window_features(col(sessions), 90).alias(w90) ) ).select( user_id, col(window_features.w7.*), col(window_features.w30.*), col(window_features.w90.*) )第三组交互模式特征需 join users sessions比如“播放-收藏转化率”需要知道用户收藏了多少首歌来自page Add to Playlist的事件以及播放了多少首page NextSong。这必须回到原始日志表做聚合# 从 raw_events 中提取收藏和播放行为 engagement_df raw_df.filter( col(page).isin([NextSong, Add to Playlist, Thumbs Up, Thumbs Down]) ).groupBy(userId).agg( count_if(col(page) NextSong).alias(play_count), count_if(col(page) Add to Playlist).alias(playlist_add_count), count_if(col(page) Thumbs Up).alias(thumbs_up_count) ).withColumn( play_to_playlist_ratio, col(playlist_add_count) / nullif(col(play_count), 0) )最后把三组特征 left join 到一起final_feature_df static_features \ .join(feature_df, user_id, left) \ .join(engagement_df, user_id, left) \ .na.fill(0) # 填充空值避免模型训练报错这样37 个特征被清晰地组织在三个物理子查询中每个子查询可独立优化shuffle 次数从 37 次降到 3 次整体耗时从 42 分钟压到 23 分钟。4. 实操过程与核心环节实现4.1 模型训练与评估用 Spark MLlib 做可复现的科学实验模型训练不是终点而是起点。我们必须确保每次训练的结果可复现、可对比、可回滚。Spark MLlib 提供了完整的 Pipeline API但很多人只用到表面。下面是我经过 12 个项目验证的黄金流程步骤一标签生成Label Engineering流失标签不是简单的“是否取消”而是未来 30 天是否无行为。注意必须用lead()函数而不是max(ts)# 错误做法用 max(ts) 判断用户是否还在活跃 # 正确做法对每个 user_id找其下一个事件的时间判断间隔是否 30 天 label_window Window.partitionBy(userId).orderBy(ts) labeled_df user_sessions_df.withColumn( next_event_ts, lead(ts, 1).over(label_window) ).withColumn( is_churned, when(col(next_event_ts).isNull() | ((col(next_event_ts) - col(ts)) / (1000 * 3600 * 24) 30), 1) .otherwise(0) ).filter(col(userId) ! ) # 再次过滤匿名用户步骤二特征向量化Vector AssemblySpark MLlib 要求输入是Vector类型。VectorAssembler是标准工具但要注意handleInvalid参数from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler # 对分类特征做索引gender, level, region indexers [ StringIndexer(inputColgender, outputColgender_idx, handleInvalidkeep), StringIndexer(inputCollevel, outputCollevel_idx, handleInvalidkeep), StringIndexer(inputColregion, outputColregion_idx, handleInvalidkeep) ] # 组装所有数值特征37 个和索引后特征 assembler VectorAssembler( inputCols[ days_since_register, session_count_7d, avg_duration_7d, play_to_playlist_ratio, gender_idx, level_idx, region_idx # ... 其他 31 个 ], outputColfeatures, handleInvalidkeep # 关键保留无效值避免丢样本 ) # 标准化可选LogisticRegression 对尺度不敏感但提升收敛速度 scaler StandardScaler(inputColfeatures, outputColscaled_features)步骤三Pipeline 构建与交叉验证不要手动 split train/test用TrainValidationSplitfrom pyspark.ml.classification import LogisticRegression from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit from pyspark.ml.evaluation import BinaryClassificationEvaluator lr LogisticRegression(featuresColscaled_features, labelColis_churned, predictionColprediction) # 定义参数网格C 正则化强度elasticNetParam 混合比例 param_grid ParamGridBuilder() \ .addGrid(lr.regParam, [0.001, 0.01, 0.1]) \ .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \ .build() evaluator BinaryClassificationEvaluator( labelColis_churned, rawPredictionColrawPrediction, metricNameareaUnderROC ) tvs TrainValidationSplit( estimatorlr, estimatorParamMapsparam_grid, evaluatorevaluator, trainRatio0.8, parallelism4 # 并行训练 4 个模型 ) # 构建完整 Pipeline pipeline Pipeline(stagesindexers [assembler, scaler, tvs]) model pipeline.fit(feature_labeled_df) # feature_labeled_df 是 join 后的表 # 保存模型含所有 transformer model.write().overwrite().save(s3://models/churn_pipeline_v20230725/)步骤四模型评估报告生成用 Spark SQL 直接生成业务可读的报告-- 模型在验证集上的表现 SELECT AUC as metric, round(auc_value, 4) as value FROM auc_table UNION ALL SELECT PrecisionTop10% as metric, round(precision_at_k, 4) as value FROM ( SELECT precision_at_k( sort_array(collect_list(struct(prob, label)), false), 0.1 * count(*) ) as precision_at_k FROM prediction_result ) -- 关键特征重要性LogisticRegression 的系数绝对值 SELECT feature_name, abs(coef) as importance FROM ( SELECT explode(arrays_zip(feature_names, model.coef)) as (name, coef) FROM model_table ) ORDER BY importance DESC LIMIT 10这份报告能直接发给 CEOAUC 0.82Top10% 高风险用户中 68% 确实流失了最重要的三个因素是“近 30 天播放时长衰减率”、“注册后第 7 天是否首次付费”、“iOS 设备用户占比”。4.2 生产部署从 Notebook 到 Airflow 的小时级调度模型训练完只是开始。真正的挑战是如何让它每天自动运行、自动报警、自动通知。我们用 Airflow Spark Submit 的组合Airflow DAG 定义churn_prediction_dag.pyfrom airflow import DAG from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator from datetime import datetime, timedelta default_args { owner: data-engineering, depends_on_past: False, start_date: datetime(2023, 7, 25), email_on_failure: True, email: [alertcompany.com], retries: 2, retry_delay: timedelta(minutes5), } dag DAG( churn_prediction_pipeline, default_argsdefault_args, descriptionHourly churn prediction for Sparkify, schedule_interval0 * * * *, # 每小时执行 catchupFalse, ) # 步骤1运行 L0-L1 层原始接入会话识别 spark_l0l1 SparkSubmitOperator( task_idrun_l0l1, application/opt/jobs/l0l1_job.py, conf{ spark.sql.adaptive.enabled: true, spark.sql.adaptive.coalescePartitions.enabled: true }, dagdag, ) # 步骤2运行 L2 层特征工程 spark_l2 SparkSubmitOperator( task_idrun_l2, application/opt/jobs/l2_job.py, conf{spark.sql.adaptive.enabled: true}, dagdag, ) # 步骤3运行 L3 层模型训练预测 spark_l3 SparkSubmitOperator( task_idrun_l3, application/opt/jobs/l3_job.py, conf{spark.sql.adaptive.enabled: true}, dagdag, ) # 步骤4发送 Slack 通知 def send_slack_alert(**context): from slack_sdk import WebClient client WebClient(tokenxoxb-xxx) client.chat_postMessage( channel#data-alerts, textf✅ Churn prediction completed at {context[execution_date]}. fPredicted {context[task_instance].xcom_pull(task_idsrun_l3, keychurn_count)} users. ) slack_alert PythonOperator( task_idsend_slack_alert, python_callablesend_slack_alert, dagdag, ) spark_l0l1 spark_l2 spark_l3 slack_alert关键配置说明spark.sql.adaptive.enabledtrue开启自适应查询执行AQESpark 会自动合并小文件、优化 join 策略、处理数据倾斜实测提升 35% 性能。schedule_interval0 * * * *每小时触发但注意我们的特征是“过去 30 天”所以实际预测的是“未来 30 天”每小时更新一次完全够用。xcom_pull从l3_job.py中ti.xcom_push(keychurn_count, valuecount)获取预测的高风险用户数写入 Slack 通知。L3 Job 的核心逻辑l3_job.py# 加载最新特征 feature_df spark.read.parquet(s3://features/v1/) # 加载最新模型自动选择最新版本 model_path get_latest_model_path(s3://models/churn_pipeline_v20230725/) # 自定义函数 model PipelineModel.load(model_path) # 预测 prediction_df model.transform(feature_df) # 生成可行动结果只输出高风险用户 原因 actionable_df prediction_df.filter(col(probability)[1] 0.7).select( user_id, probability, prediction, churn_reason # 由 explainInstance() 生成的 SQL 片段 ) # 写入结果表供下游营销系统读取 actionable_df.write.mode(append).partitionBy(dt).parquet(s3://predictions/actionable/) # 推送指标到 Prometheus churn_count actionable_df.count() push_metrics_to_prometheus({churn_high_risk_count: churn_count})这套方案上线后某在线教育公司的客户成功团队能在用户流失前 48 小时收到精准名单并自动触发“专属课程顾问 1v1 电话”策略使高风险用户挽回率从 12% 提升到 37%。5. 常见问题与排查技巧实录5.1 数据倾斜那个让你半夜爬起来的“幽灵杀手”现象Spark UI 显示某个 Stage 卡在 99.9%Executor 日志里全是Shuffle read 2.3 GB而其他 task 只读了 12 MB。Application 运行 2 小时后失败报Container killed by YARN for exceeding memory limits。根因分析Sparkify 数据里userId为 1 的用户可能是测试账号或机器人产生了