一、引言Flink 作业从用户编写的 DataStream API 代码到最终在集群上物理执行需要经历四个关键阶段的图转换StreamGraph → JobGraph → ExecutionGraph → 物理执行图Physical Execution Graph每个阶段承担着不同的职责——从逻辑拓扑表达、算子链优化、并行化展开到最终的物理部署。理解这一完整链路不仅有助于排查性能瓶颈、合理设置并行度与资源更能在作业调优时做出有据可依的决策。二、全局视角四层图转换总览核心设计思想逐层从逻辑抽象到物理实体每一层只关注自己的职责实现关注点分离。StreamGraph 忠实记录用户意图保留完整逻辑信息JobGraph 通过算子链化完成第一层性能优化形成可提交单元ExecutionGraph 在 JobManager 端实现并行化展开支撑调度和容错物理执行图 在 TaskManager 端完成实际部署和数据交换三、第一阶段StreamGraph —— 逻辑拓扑的忠实表达当用户调用env.execute()时StreamExecutionEnvironment内部调用StreamGraphGenerator.generate()方法遍历已注册的Transformation列表将其翻译为 StreamGraph。// 简化的调用链路 StreamExecutionEnvironment.execute() → getStreamGraph() → StreamGraphGenerator.generate() → 遍历 transformations 列表 → 为每个 Transformation 创建对应的 StreamNode 和 StreamEdge核心数据结构概念说明StreamNode代表一个算子operator包含算子工厂、并行度、SlotSharingGroup 等属性StreamEdge连接上下游 StreamNode携带 StreamPartitioner如 HashPartitioner、RebalancePartitioner、ForwardPartitionerTransformation用户 API 调用的中间表示如 OneInputTransformation、TwoInputTransformationStreamGraph 的特点一对一映射每个用户算子调用对应一个 StreamNode不做任何优化合并保留完整语义包括分区策略、算子名称、UID、并行度等所有元信息仅在 Client 端生成StreamGraph 的构建完全在客户端完成该阶段能干预的内容项// 设置算子并行度 stream.map(...).setParallelism(4); // 设置算子名称影响 Web UI 展示和指标命名 stream.map(...).name(my-map-operator); // 设置 UIDCheckpoint/Savepoint 恢复的关键标识 stream.map(...).uid(map-001); // 设置 Slot 共享组 stream.map(...).slotSharingGroup(heavy-ops); // 禁用算子链 stream.map(...).disableChaining(); // 开始新的算子链 stream.map(...).startNewChain();最佳实践务必为每个有状态算子设置 uid()这是 Savepoint 恢复时算子状态映射的唯一标识。不设置时 Flink 会自动生成但拓扑变更后将无法恢复。合理命名算子name() 直接影响 Web UI 的可读性和 Metrics 的命名层级。并行度设置遵循数据量驱动原则Source 并行度通常对齐外部系统分区数如 Kafka partition 数下游根据计算复杂度适当放大。四、第二阶段JobGraph —— 算子链优化与提交单元JobGraph 是对 StreamGraph 的关键优化层其核心工作是将满足条件的相邻算子链化chain到同一个 Task 中执行以减少线程切换和网络序列化/反序列化的开销。StreamGraph 中: JobGraph 中: ┌────────┐ ┌────────┐ ┌────────┐ ┌──────────────────────┐ ┌────────┐ │ Source │───▶│ Map │───▶│ KeyBy/ │ │ Source → Map (Chain) │───▶│ KeyBy/ │ │ │ │ │ │ Window │ │ (一个 JobVertex) │ │ Window │ └────────┘ └────────┘ └────────┘ └──────────────────────┘ └────────┘ ▲ 算子链合并优化以下条件全部满足时上下游算子才会被链化┌─────────────────────────────────────────────────────────────┐ │ Operator Chaining 成立条件AND 关系 │ ├─────────────────────────────────────────────────────────────┤ │ 1. 下游算子的 ChainingStrategy ALWAYS │ │ 2. 上游算子的 ChainingStrategy ! NEVER即 HEAD 或 ALWAYS │ │ 3. 上下游算子的并行度相同 │ │ 4. 连接使用 ForwardPartitioner │ │ 5. 上下游在同一个 SlotSharingGroup │ │ 6. 下游算子的输入边数量为 1无多输入 │ │ 7. 未被用户显式禁用 (disableChaining / startNewChain) │ └─────────────────────────────────────────────────────────────┘核心数据结构概念说明JobVertex链化后的算子集合是调度的基本逻辑单元JobEdge连接 JobVertex关联 IntermediateDataSet标记数据分发模式POINTWISE / ALL_TO_ALLIntermediateDataSet表示 JobVertex 的输出数据集对应一个 ResultPartitionTypePIPELINED / BLOCKING该阶段能干预的内容项// 全局禁用算子链 env.disableOperatorChaining(); // 单算子禁用链化该算子不与任何上下游链化 stream.map(...).disableChaining(); // 从该算子开始新的链切断与上游的链接但可与下游链化 stream.map(...).startNewChain(); // 设置 SlotSharingGroup不同组的算子无法链化 stream.map(...).slotSharingGroup(isolated);最佳实践不要盲目禁用算子链链化带来的性能收益显著减少序列化和线程上下文切换。仅在需要独立监控某算子指标或隔离反压时才断链。利用断链定位反压当 Web UI 显示某个 Task 反压时通过 startNewChain() 将可疑算子拆分出来精确定位瓶颈。注意并行度一致性如果上下游并行度不同会自动使用 Rebalance/Hash 等分区器而非 Forward导致链化自动失效并引入 Shuffle。五、第三阶段ExecutionGraph —— 并行化与调度核心JobGraph 提交到 JobManager具体是 Dispatcher → JobMaster后由DefaultExecutionGraphBuilder将 JobGraph 展开为 ExecutionGraph这是JobManager 端的核心数据结构。并行化展开示例核心数据结构概念说明ExecutionJobVertex对应一个 JobVertex管理其所有并行子任务ExecutionVertex对应一个并行子任务subtask是调度和部署的最小单元ExecutionExecutionVertex 的一次执行尝试Failover 重启时创建新的 ExecutionIntermediateResult对应 IntermediateDataSet 的并行化表示IntermediateResultPartition每个 ExecutionVertex 的输出分区ExecutionEdge连接 ExecutionVertex 和 IntermediateResultPartition该阶段能干预的内容项# flink-conf.yaml # Failover 策略 jobmanager.execution.failover-strategy: region # 重启策略 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 3 restart-strategy.fixed-delay.delay: 10s # 调度模式通常无需手动设置由运行模式自动决定 # jobmanager.scheduler: default最佳实践使用 Region Failover相比 Restart AllRegion 级别的恢复范围更小、恢复更快是生产环境的推荐配置。合理设置最大并行度maxParallelism该参数决定了 KeyGroup 的数量一旦设定后无法修改会影响 Savepoint 兼容性。建议设置为并行度的 2 的幂次倍数如并行度 12 → maxParallelism 128。关注 Slot 分配ExecutionGraph 阶段决定了 Task 与 Slot 的分配关系。通过 slotSharingGroup 可以实现计算密集型和 IO 密集型算子的资源隔离。六、第四阶段物理执行图 —— 真正的运行时核心运行时组件组件职责ResultPartitionTask 的输出端按下游消费者数量划分 ResultSubpartitionInputGateTask 的输入端通过 InputChannel 从上游拉取数据NetworkBuffer基于 Netty 的内存 Buffer 池承载反压传播Credit-based Flow ControlFlink的反压机制下游向上游通告可用 Buffer 数Credit该阶段能干预的内容项# Network Buffer 配置 taskmanager.memory.network.fraction: 0.1 taskmanager.memory.network.min: 64mb taskmanager.memory.network.max: 1gb # 每个 ResultPartition 的 Buffer 数 taskmanager.network.memory.buffers-per-channel: 2 taskmanager.network.memory.floating-buffers-per-gate: 8 # 反压监控采样 web.backpressure.refresh-interval: 60000最佳实践Network Buffer 调优如果作业存在大量 Shuffle如 keyBy 后并行度很高需要适当增大 network.fraction。Buffer 不足会直接导致反压。观察反压指标物理执行层的反压可通过 Web UI 的 BackPressure 面板或 outputQueueLength / inPoolUsage指标观察。Slot 资源规划每个 Slot 是资源隔离的基本单位。建议 taskmanager.numberOfTaskSlots 设置为 CPU 核心数每个 Slot 分配约 1 核 CPU。七、各阶段关系与对比维度StreamGraphJobGraphExecutionGraph物理执行图生成位置ClientClientJobManagerTaskManager核心节点StreamNodeJobVertexExecutionVertexTask核心边StreamEdgeJobEdgeExecutionEdgeNetwork Channel并行度逻辑设定逻辑设定实际展开物理实例优化动作无算子链合并并行化 调度资源分配 网络部署可干预手段APIuid/name/并行度链化控制Failover/调度策略Buffer/Slot 配置生命周期execute() 时构建提交前构建作业运行期间维护Task 运行期间存在八、端到端示例以一个典型的 Kafka → 处理 → Kafka 作业为例StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)), Kafka Source) .uid(source-001) .map(new ParseFunction()) .uid(parse-001) .name(JSON Parse) .keyBy(Event::getUserId) .window(TumblingEventTimeWindows.of(Time.minutes(1))) .aggregate(new CountAggregate()) .uid(window-agg-001) .name(1min Window Aggregation) .setParallelism(8) .sinkTo(kafkaSink) .uid(sink-001) .name(Kafka Sink) .setParallelism(4); env.execute(User Event Aggregation Job);图转换过程StreamGraph (4 个 StreamNode): [Source(p4)] --Forward-- [Map(p4)] --Hash-- [Window(p8)] --Rebalance-- [Sink(p4)] JobGraph (3 个 JobVertexSource 和 Map 被链化): [Source→Map Chain(p4)] --Hash-- [Window(p8)] --Rebalance-- [Sink(p4)] ※ Source 和 Map 满足链化条件相同并行度(4)、Forward 分区、策略允许 ExecutionGraph (展开为 16 个 ExecutionVertex): [4 个 Source→Map EV] --Hash(ALL_TO_ALL)-- [8 个 Window EV] --Rebalance(ALL_TO_ALL)-- [4 个 Sink EV] 物理执行图: 4 个 Task(Source→Map) 8 个 Task(Window) 4 个 Task(Sink) 16 个 Task 最少需要 8 个 Slot取最大并行度前提是默认 SlotSharingGroup