PySpark实战:从数据清洗到模型部署的泰坦尼克号幸存者预测完整流程
1. PySpark环境搭建与数据准备第一次接触PySpark时我被它处理海量数据的能力震撼到了。记得当时用传统Pandas处理一个2GB的CSV文件内存直接爆掉而PySpark轻松搞定。下面分享下我是如何搭建环境的以及处理泰坦尼克号数据集的实战经验。PySpark环境配置其实比想象中简单。我习惯用conda创建独立环境避免包冲突conda create -n pyspark_env python3.8 conda activate pyspark_env pip install pyspark3.3.1 findspark jupyterlab安装完成后在Jupyter Notebook中初始化SparkSession时有个小技巧设置spark.driver.memory可以避免内存不足的问题。我通常会这样配置from pyspark.sql import SparkSession spark SparkSession.builder \ .appName(Titanic_Analysis) \ .config(spark.driver.memory, 4g) \ .getOrCreate()泰坦尼克号数据集可以从Kaggle下载我习惯把数据放在项目目录的data文件夹下。加载数据时发现几个常见坑点必须指定headerTrue否则第一行会被当作数据inferSchemaTrue能自动推断数据类型但会稍微影响性能添加.cache()能提升后续重复操作的效率df spark.read.csv(data/titanic.csv, headerTrue, inferSchemaTrue).cache()2. 数据探索与可视化分析数据探索就像侦探破案每个线索都可能影响最终结果。我习惯先用describe()快速查看数值特征df.describe([Age, Fare, Pclass]).show()输出结果会显示计数、均值、标准差等关键指标。这里发现Age有缺失值后面需要处理。为了更直观我常用PySpark结合Pandas做可视化import matplotlib.pyplot as plt # 幸存者性别分布 gender_survival df.groupBy(Sex, Survived).count().toPandas() gender_survival.pivot(indexSex, columnsSurvived, valuescount).plot(kindbar) plt.title(Survival by Gender)通过分析发现几个有趣现象女性幸存率显著高于男性约74% vs 19%头等舱乘客幸存率更高儿童Age12幸存率优于其他年龄段这些发现将直接影响后续的特征工程策略。比如性别和舱位等级明显是强特征而年龄可能需要分箱处理。3. 数据清洗与特征工程数据清洗是最耗时但最关键的环节。针对泰坦尼克号数据我总结了以下处理步骤缺失值处理Age用中位数填充比均值更抗异常值Embarked用众数S填充Cabin字段缺失太多直接删除from pyspark.sql.functions import median median_age df.select(median(Age)).collect()[0][0] df df.fillna({Age: median_age, Embarked: S}) df df.drop(Cabin)特征转换性别转为数值StringIndexer登船港口做OneHot编码票价做对数变换处理偏态from pyspark.ml.feature import StringIndexer, OneHotEncoder sex_indexer StringIndexer(inputColSex, outputColSexIndex) embarked_indexer StringIndexer(inputColEmbarked, outputColEmbarkedIndex) encoder OneHotEncoder(inputCols[EmbarkedIndex], outputCols[EmbarkedVec])特征构造家庭规模 SibSp Parch姓名中提取称谓Mr/Mrs/Miss等年龄分箱儿童/青年/中年/老年from pyspark.sql.functions import udf from pyspark.sql.types import StringType def extract_title(name): return name.split(,)[1].split(.)[0].strip() title_udf udf(extract_title, StringType()) df df.withColumn(Title, title_udf(df[Name]))4. 构建机器学习PipelinePySpark的Pipeline让整个流程像流水线一样清晰。我通常按这个顺序构建数据准备阶段字符串索引、OneHot编码、特征缩放特征组合VectorAssembler合并所有特征模型训练添加分类器from pyspark.ml import Pipeline from pyspark.ml.feature import VectorAssembler from pyspark.ml.classification import LogisticRegression # 定义特征列 feature_cols [Pclass, SexIndex, Age, Fare, EmbarkedVec] # 创建流水线 assembler VectorAssembler(inputColsfeature_cols, outputColfeatures) lr LogisticRegression(featuresColfeatures, labelColSurvived) pipeline Pipeline(stages[sex_indexer, embarked_indexer, encoder, assembler, lr])训练时有个实用技巧先用小样本测试管道是否畅通再全量训练sample_data df.sample(0.1) model pipeline.fit(sample_data)5. 模型训练与评估我通常会对比逻辑回归和决策树两种模型。逻辑回归训练速度快决策树更易解释。逻辑回归实现# 划分训练测试集 train, test df.randomSplit([0.8, 0.2], seed42) # 训练模型 lr_model pipeline.fit(train) # 评估 lr_predictions lr_model.transform(test) from pyspark.ml.evaluation import BinaryClassificationEvaluator evaluator BinaryClassificationEvaluator(labelColSurvived) print(LR AUC:, evaluator.evaluate(lr_predictions))决策树实现from pyspark.ml.classification import DecisionTreeClassifier dt DecisionTreeClassifier(labelColSurvived, featuresColfeatures) dt_pipeline Pipeline(stages[sex_indexer, embarked_indexer, encoder, assembler, dt]) dt_model dt_pipeline.fit(train) dt_predictions dt_model.transform(test) print(DT AUC:, evaluator.evaluate(dt_predictions))评估时除了AUC我还会看混淆矩阵from pyspark.mllib.evaluation import MulticlassMetrics predictionAndLabels lr_predictions.select(prediction, Survived).rdd metrics MulticlassMetrics(predictionAndLabels) print(Confusion Matrix:, metrics.confusionMatrix().toArray())6. 模型优化与调参模型第一次结果往往不理想需要调参。PySpark的CrossValidator非常实用from pyspark.ml.tuning import ParamGridBuilder, CrossValidator paramGrid (ParamGridBuilder() .addGrid(lr.regParam, [0.01, 0.1, 1.0]) .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) .build()) cv CrossValidator(estimatorpipeline, estimatorParamMapsparamGrid, evaluatorevaluator, numFolds5) cv_model cv.fit(train)调参后模型AUC从0.78提升到了0.83。几个关键发现适度的L2正则化(regParam0.1)效果最好年龄分箱比原始年龄特征效果更好家庭规模特征贡献度很高7. 模型部署与生产化训练好的模型需要持久化以便复用lr_model.write().overwrite().save(models/titanic_lr)加载模型进行预测的完整流程from pyspark.ml import PipelineModel saved_model PipelineModel.load(models/titanic_lr) new_data spark.createDataFrame([ (3, male, 22.0, 1, 0, 7.25, S)], [Pclass, Sex, Age, SibSp, Parch, Fare, Embarked]) predictions saved_model.transform(new_data) predictions.select(prediction).show()在生产环境中我推荐使用mlflow跟踪实验定期用新数据重新训练模型监控模型性能衰减8. 项目复盘与经验总结通过这个项目我总结了PySpark机器学习的最佳实践数据预处理占整个项目70%时间但值得投入特征工程领域知识比算法选择更重要模型选择从简单模型开始逐步复杂化评估指标选择符合业务目标的指标常见踩坑点忘记.cache()导致重复计算类别不平衡时没设置classWeights测试集泄露到训练数据最后分享一个实用技巧使用explain()方法查看Spark执行计划能发现性能瓶颈df.filter(df.Age 30).select(Survived).explain()