python里与spark相关的语法介绍
在 Python 中使用 Spark主要通过PySpark库来实现。它是 Apache Spark 的 Python API语法风格与 Pandas 有相似之处但核心思想是分布式计算和惰性求值。以下是 PySpark 核心语法的系统性介绍基于目前主流的 Spark 3.x / 4.x 版本1. 入口SparkSession所有 PySpark 程序的起点替代了旧版的SparkContext。frompyspark.sqlimportSparkSession spark(SparkSession.builder.appName(MyApp).master(local[*])# 本地测试用生产环境由集群管理器指定.config(spark.some.config,value).getOrCreate())2. DataFrame 创建DataFrame 是 PySpark 最核心的数据结构等价于分布式表。方式代码示例从文件读取spark.read.parquet(/path)/.csv()/.json()/.table(db.tbl)从列表创建spark.createDataFrame([(1,a),(2,b)], [id,name])从 Pandas 转换spark.createDataFrame(pandas_df)SQL 查询创建spark.sql(SELECT * FROM table WHERE dt2026-06-30)3. 核心变换语法 (Transformations)⚠️关键概念惰性求值。以下操作不会立即执行只是记录逻辑计划直到遇到 Action 操作才真正触发计算。3.1 列操作 (pyspark.sql.functions)这是 PySpark 最常用的模块通常简写为Ffrompyspark.sqlimportfunctionsasF df.select(F.col(name),# 引用列F.lit(2026).alias(year),# 常量列F.concat_ws(-,F.col(y),F.col(m)),# 字符串拼接F.when(F.col(age)18,adult).otherwise(minor),# 条件判断F.date_add(F.current_date(),-7),# 日期函数F.explode(F.col(tags))# 数组展开)3.2 常用 DataFrame 方法# 过滤df.filter(F.col(amount)100)df.where(region cn AND dt 2026-06-30)# 也支持SQL表达式字符串# 聚合df.groupBy(region).agg(F.sum(amount).alias(total),F.countDistinct(user_id).alias(uv))# 关联df1.join(df2,onuser_id,howleft)df1.join(df2,on(df1.iddf2.uid)(df1.dtdf2.dt),howinner)# 窗口函数frompyspark.sql.windowimportWindow wWindow.partitionBy(dept).orderBy(F.desc(salary))df.withColumn(rank,F.row_number().over(w))# 其他df.dropDuplicates([user_id])# 去重df.na.fill(0,subset[amount])# 空值填充df.repartition(10,dt)# 重分区影响Shuffledf.cache()/df.persist()# 缓存到内存/磁盘4. 动作语法 (Actions)触发实际计算并返回结果到 Driver 端df.show(20,truncateFalse)# 打印前N行df.count()# 计数df.collect()# ⚠️ 拉取全部数据到Driver大表慎用df.toPandas()# ⚠️ 转为Pandas DataFrame同样慎用df.write.parquet(/output)# 写出文件df.createOrReplaceTempView(tmp)# 注册临时视图供SQL使用5. Spark SQL 语法如果不习惯 DataFrame API可以直接写 SQLdf.createOrReplaceTempView(sales)resultspark.sql( SELECT region, SUM(amount) as total FROM sales WHERE dt 2026-06-30 GROUP BY region HAVING total 10000 ORDER BY total DESC )6. UDF用户自定义函数当内置函数无法满足需求时使用但性能较差Python 与 JVM 间有序列化开销应优先使用内置函数或 Pandas UDF。# 普通 UDF逐行处理慢F.udf(string)defmask_phone(phone):returnphone[:3]****phone[7:]ifphoneelseNone# Pandas UDF / Vectorized UDF向量化快10-100倍✅ 推荐F.pandas_udf(double)defnormalize(s:pd.Series)-pd.Series:return(s-s.mean())/s.std()7. PySpark vs Pandas 语法速查对照操作PandasPySpark选列df[[a,b]]df.select(a,b)过滤df[df.age18]df.filter(F.col(age)18)新增列df[new] ...df.withColumn(new, ...)重命名df.rename(columns{})df.withColumnRenamed(old,new)分组聚合df.groupby().agg()df.groupBy().agg()排序df.sort_values()df.orderBy()/df.sort()采样df.sample(frac0.1)df.sample(fraction0.1) 最佳实践与避坑指南避免collect()/toPandas()除非确认数据量很小 几GB否则会导致 Driver OOM。调试时用show()或limit(100).toPandas()。优先用内置函数pyspark.sql.functions里的函数在 JVM 端执行比 Python UDF 快几个数量级。注意 Shufflejoin、groupBy、repartition、distinct都会触发 Shuffle是性能瓶颈。尽量用广播 JoinF.broadcast(small_df)减少 Shuffle。合理设置分区数分区太少导致并行度不足太多导致任务调度开销大。一般每个分区 128MB~256MB 为宜。类型安全PySpark 是强类型的字符串123和整数123不能直接比较/关联需用F.col().cast()显式转换。Spark Connect (Spark 4.x 新特性)如果你使用的是较新版本推荐使用 Spark Connect 客户端模式将 Driver 与 Cluster 解耦支持更好的 IDE 补全和远程开发体验。