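## 1. Introduction

Containerized deployment is now a mainstream practice in the industry. Running applications from Docker images makes them easier to manage and operate. Kubernetes (K8s) is the most popular container orchestration tool, and recent Flink versions support K8s as a deployment target. Compared with Standalone and YARN deployments, Flink on K8s offers the following advantages:

- **Resource isolation**: multi-tenant isolation through K8s Namespace and ResourceQuota
- **Elastic scaling**: automatic scale-out and scale-in in combination with K8s HPA/VPA
- **Cloud-native ecosystem**: seamless integration with K8s monitoring, logging, and service discovery
- **Simpler operations**: the Flink job lifecycle is managed through declarative APIs

## 2. Flink on K8s Architecture Overview

### 2.1 Native K8s Deployment Architecture

Flink has supported native Kubernetes deployment since version 1.10. The basic principle is similar to YARN. As shown in the figure above, the core components of a native K8s deployment interact as follows:

1. The Flink Client submits a resource request to the K8s ApiServer through the K8s Client.
2. The K8s ApiServer creates a ConfigMap that stores the job configuration.
3. The Flink Master Deployment starts the Dispatcher and the K8sResMngr (the K8s resource manager).
4. The JobMaster requests resources from the K8sResMngr.
5. The K8sResMngr creates TaskManager Pods through the K8s ApiServer.
6. Each TaskManager Pod registers with the JobMaster after it starts.
7. The Flink Dashboard (Web UI) is accessed through a Service (SVC).

### 2.2 Mapping Between Flink and K8s Components

| Flink component | K8s resource type | Description |
| --- | --- | --- |
| JobManager | Deployment + Service | Master node; the REST port is exposed through a Service |
| TaskManager | Pod | Worker node; created dynamically on request by the JobManager |
| Configuration | ConfigMap | Stores flink-conf.yaml and other configuration |
| Persistent storage | PersistentVolumeClaim | Checkpoint/Savepoint persistence |
| Service discovery | Service + DNS | Communication between internal components |

## 3. Deployment Modes in Detail

Flink defines three deployment modes: Session Mode, Per-Job Mode, and Application Mode. The native Kubernetes integration supports Session Mode and Application Mode. Per-Job Mode is available only on YARN and has been deprecated since Flink 1.15; it is described here for comparison.

### 3.1 Comparison of the Three Modes

| Dimension | Session Mode | Per-Job Mode | Application Mode |
| --- | --- | --- | --- |
| Cluster lifecycle | Long-running, shared by multiple jobs | Created and destroyed with the job | Created and destroyed with the job |
| Resource isolation | Weak (shared JobManager) | Strong (dedicated JobManager) | Strong (dedicated JobManager) |
| Where main() runs | Client | Client | JobManager |
| When resources are requested | At cluster startup | At job submission | At job submission |
| Typical use | Short jobs, test environments | Long-running jobs, production | Recommended for production |
| K8s support | ✅ Fully supported | Not supported on native K8s (YARN only) | ✅ Recommended |

### 3.2 Session Mode

Session Mode first starts a long-running Flink cluster and then submits multiple jobs to it.

Characteristics:

- Multiple jobs share the same JobManager
- Resources are allocated in advance, so jobs start quickly
- Resource isolation is poor; a failure in one job can affect the others

Start command:

```bash
# Start a Flink session cluster
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-flink-session \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.container.image=flink:1.17-scala_2.12
```

Submit a job:

```bash
./bin/flink run \
    -t kubernetes-session \
    -Dkubernetes.cluster-id=my-flink-session \
    -Dkubernetes.namespace=flink \
    -c com.example.MyJob \
    ./my-job.jar
```

### 3.3 Per-Job Mode

Per-Job Mode creates a dedicated Flink cluster for each job and destroys the cluster when the job finishes.

Characteristics:

- Each job has its own JobManager, so resource isolation is good
- Resources are requested when the job is submitted, so startup is slightly slower
- main() runs on the Client, which must download the dependencies locally

Flink's native Kubernetes integration has no Per-Job target (there is no `kubernetes-per-job`), so this mode cannot be used on K8s. For reference, the YARN form of the command is:

```bash
# Per-Job Mode exists only on YARN
./bin/flink run \
    -t yarn-per-job \
    -c com.example.MyJob \
    ./my-job.jar
```

On K8s, Application Mode provides the same one-cluster-per-job isolation.

### 3.4 Application Mode (recommended)

Application Mode was introduced in Flink 1.11 and is the recommended mode on K8s.

Characteristics:

- main() runs on the JobManager, which reduces the load on the Client
- Each job has its own JobManager, so resource isolation is good
- The job jar and its dependencies are built into the image, so nothing has to be uploaded at each submission

Start command:

```bash
# In practice, use a custom image that already contains the job jar (see 7.1)
./bin/flink run-application \
    -t kubernetes-application \
    -Dkubernetes.cluster-id=my-flink-app \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.container.image=flink:1.17-scala_2.12 \
    local:///opt/flink/usrlib/my-job.jar
```

Key configuration options:

```yaml
# K8s cluster identifier
kubernetes.cluster-id: my-flink-app
# Namespace
kubernetes.namespace: flink
# Container image
kubernetes.container.image: flink:1.17-scala_2.12
# Image pull policy: Always / Never / IfNotPresent
kubernetes.container.image.pull-policy: IfNotPresent
# JobManager resources
kubernetes.jobmanager.cpu: 1.0
jobmanager.memory.process.size: 1600m
# TaskManager resources
kubernetes.taskmanager.cpu: 2.0
taskmanager.memory.process.size: 1728m
# Number of slots per TaskManager
taskmanager.numberOfTaskSlots: 2
```

## 4. Flink Kubernetes Operator

### 4.1 What Is the Flink Kubernetes Operator

The Flink Kubernetes Operator is the official K8s Operator from the Flink community. It manages the Flink job lifecycle declaratively through a CRD (Custom Resource Definition).

Core capabilities:

- **Declarative deployment**: Flink jobs are defined in YAML, with no commands to run by hand
- **Job upgrades**: jobs can be upgraded by stopping with a Savepoint and restarting from it
- **Automatic failure recovery**: job status is monitored and jobs are restored from the latest Checkpoint
- **Elastic scaling**: parallelism can be adjusted manually or automatically

### 4.2 Installing the Flink Kubernetes Operator

Prerequisites:

- Kubernetes 1.9+
- Helm 3+
- cert-manager installed in the cluster

Installation steps:

```bash
# 1. Add the Helm repository
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.7.0/

# 2. Install the Operator into the flink namespace
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
    --namespace flink --create-namespace

# 3. Verify the installation
kubectl get pods -n flink
# The flink-kubernetes-operator Pod should be in the Running state
```

### 4.3 Deploying Jobs with the FlinkDeployment CRD

FlinkDeployment is the core CRD defined by the Operator. It describes the configuration of a Flink cluster or job.

Session cluster example:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: session-cluster
  namespace: flink
spec:
  image: flink:1.17-scala_2.12
  flinkVersion: v1_17
  mode: native
  # Service account created by the Operator Helm chart; adjust if yours differs
  serviceAccount: flink
  ingress:
    template: "/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 2
    replicas: 2
```

Application Mode job example:

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: wordcount-job
  namespace: flink
spec:
  image: flink:1.17-scala_2.12
  flinkVersion: v1_17
  mode: native
  # Service account created by the Operator Helm chart; adjust if yours differs
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 2
    replicas: 2
  job:
    jarURI: local:///opt/flink/examples/streaming/WordCount.jar
    parallelism: 2
    # Savepoint upgrades need a savepoint directory (state.savepoints.dir)
    upgradeMode: savepoint
    state: running
```

Submit the job:

```bash
kubectl apply -f flink-deployment.yaml
```

Check the job status:

```bash
kubectl get flinkdeployments -n flink
kubectl describe flinkdeployment wordcount-job -n flink
```

## 5. High Availability (HA) Configuration

### 5.1 HA Architecture on K8s

As shown in the figure above, the high-availability architecture of Flink on K8s consists of the following components:

- **Multiple JobManagers**: the leader is chosen through Leader Election
- **ZooKeeper or K8s ConfigMap**: stores leader information and metadata
- **Distributed storage (S3/HDFS)**: stores Checkpoints and Savepoints
- **TaskManagers**: connect to the leader JobManager

### 5.2 HA Based on K8s ConfigMap

Flink can use K8s ConfigMaps instead of ZooKeeper as the HA metadata store.

```yaml
# Enable Kubernetes HA
high-availability: kubernetes
kubernetes.namespace: flink
# Storage for JobManager metadata (required when HA is enabled)
high-availability.storageDir: s3://my-bucket/flink/ha
# Number of JobManager replicas
kubernetes.jobmanager.replicas: 3
# Leader election lease duration
high-availability.kubernetes.leader-election.lease-duration: 60 s
# Checkpoint storage path
state.checkpoints.dir: s3://my-bucket/flink/checkpoints
# Savepoint storage path
state.savepoints.dir: s3://my-bucket/flink/savepoints
```

### 5.3 HA Based on ZooKeeper

```yaml
# Enable ZooKeeper HA
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-0:2181,zk-1:2181,zk-2:2181
high-availability.zookeeper.path.root: /flink
# ZooKeeper client session timeout in milliseconds
high-availability.zookeeper.client.session-timeout: 60000
# Storage for JobManager metadata
high-availability.storageDir: s3://my-bucket/flink/ha
```

## 6. Persistent Storage Configuration

### 6.1 Persisting Checkpoints and Savepoints

Pods in K8s are ephemeral, so Checkpoints and Savepoints must be persisted to external storage. As shown in the figure above, the JobManager and TaskManagers write state snapshots to persistent storage such as NAS (network-attached storage).

Supported external storage:

- HDFS (Hadoop Distributed File System)
- S3/MinIO (object storage)
- OSS (Alibaba Cloud Object Storage Service)
- GCS (Google Cloud Storage)

### 6.2 S3 Storage Configuration Example

```yaml
# Checkpoint directory
state.checkpoints.dir: s3://my-bucket/flink/checkpoints
# Savepoint directory
state.savepoints.dir: s3://my-bucket/flink/savepoints
# S3 access configuration
s3.access-key: your-access-key
s3.secret-key: your-secret-key
s3.endpoint: http://minio-service:9000
# Path-style access is typically needed for MinIO
s3.path.style.access: true
```

### 6.3 Using a PersistentVolumeClaim

For the RocksDB state backend, a PVC can mount a high-performance local disk or network storage.

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flink-rocksdb-storage
  namespace: flink
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 100Gi
  storageClassName: fast-ssd
```

Reference it in the FlinkDeployment through the TaskManager pod template. Because the claim is ReadWriteOnce, it can be mounted from only one node at a time, so use a storage class and access mode that match the number of TaskManager Pods.

```yaml
spec:
  taskManager:
    podTemplate:
      spec:
        containers:
          # The Flink container must be named flink-main-container
          - name: flink-main-container
            volumeMounts:
              - name: rocksdb-storage
                mountPath: /data/rocksdb
        volumes:
          - name: rocksdb-storage
            persistentVolumeClaim:
              claimName: flink-rocksdb-storage
```

## 7. Production Best Practices

### 7.1 Building Images

The recommended approach is to build the job jar and its dependencies into a custom image.

```dockerfile
FROM flink:1.17-scala_2.12
# Copy the job jar
COPY target/my-flink-job-1.0.jar /opt/flink/usrlib/my-job.jar
# Copy the configuration
COPY conf/flink-conf.yaml /opt/flink/conf/
# Run as the flink user
USER flink
```

Build and push:

```bash
docker build -t my-registry/flink-job:1.0 .
docker push my-registry/flink-job:1.0
```

### 7.2 Resource Sizing Recommendations

| Component | CPU | Memory | Notes |
| --- | --- | --- | --- |
| JobManager | 1-2 cores | 2-4 GB | Control node; modest resource needs |
| TaskManager | 2-8 cores | 4-16 GB | Worker node; size according to the job |
| Slots | - | - | Recommended slot-to-CPU-core ratio of 1:1 or 1:2 |

### 7.3 Monitoring and Logging

As shown in the figure above, the monitoring and logging architecture for Flink on K8s in production covers:

- **Metrics**: Flink metrics collected with Prometheus + Grafana
- **Logging**: logs shipped to ELK/Loki with Vector or Fluentd
- **Alerting**: alert rules that watch job health

Prometheus integration is configured through the Flink configuration in the FlinkDeployment:

```yaml
spec:
  flinkConfiguration:
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: "9249"
```

### 7.4 Security Configuration

RBAC access control:

```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: flink
  name: flink-operator
rules:
  # "" is the core API group
  - apiGroups: [""]
    resources: ["pods", "services", "configmaps"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
```

Network policy:

```yaml
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: flink-network-policy
  namespace: flink
spec:
  podSelector:
    matchLabels:
      # Adjust to your Pods' labels; native deployments label Pods app=<cluster-id>
      app: flink
  policyTypes:
    - Ingress
    - Egress
  ingress:
    - from:
        # Requires the namespace to carry the label name=flink
        - namespaceSelector:
            matchLabels:
              name: flink
      ports:
        # JobManager RPC port; add the blob server (6124), REST (8081),
        # and TaskManager ports as needed
        - protocol: TCP
          port: 6123
  egress:
    # Allow all outbound traffic (DNS, K8s API, S3); listing Egress with
    # no rule would block it entirely
    - {}
```

## 8. Common Problems and Troubleshooting

### 8.1 Common Problems

| Problem | Cause | Solution |
| --- | --- | --- |
| Pod fails to start | Image pull failure | Check the image name and pull policy |
| OOMKilled | Insufficient memory | Adjust `taskmanager.memory.process.size` |
| Checkpoint failure | Insufficient storage permissions | Check S3/HDFS access permissions |
| HA failover failure | ConfigMap/ZooKeeper connection problem | Check network connectivity and configuration |
| Job backpressure | Downstream cannot keep up | Increase parallelism or optimize operators |

### 8.2 Debugging Commands

```bash
# List the Flink Pods (native deployments label Pods app=<cluster-id>)
kubectl get pods -n flink -l app=my-flink-app

# View the JobManager logs (the native JobManager Deployment is named after the cluster-id)
kubectl logs -n flink deployment/my-flink-app

# View the TaskManager logs
kubectl logs -n flink pod/my-flink-app-taskmanager-1-2

# Open a shell in a Pod for debugging
kubectl exec -it -n flink pod/my-flink-app-taskmanager-1-2 -- /bin/bash

# View the FlinkDeployment status
kubectl get flinkdeployments -n flink -o yaml
```

## 9. Summary

This article covered the ways to deploy Flink on Kubernetes:

- **Native K8s deployment**: deploy directly with `kubernetes-session.sh` and the `flink run` commands
- **Deployment modes**: choose between Session and Application Mode on K8s; Per-Job Mode is YARN-only
- **Flink Kubernetes Operator**: manage Flink jobs declaratively through the CRD; the recommended approach for production
- **High availability**: HA based on either K8s ConfigMaps or ZooKeeper
- **Persistent storage**: Checkpoints and Savepoints must be stored externally (S3, HDFS, and so on)
- **Production practices**: image building, resource sizing, monitoring and logging, and security controls

Flink on K8s is the mainstream direction for deploying stream processing in the cloud-native era, and knowing how to deploy and operate it is an essential skill for big data engineers.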
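
As a supplement to the sizing guidance in section 7.2, the sketch below estimates how many TaskManagers a job needs. It assumes default slot sharing, where a job needs as many slots as its highest operator parallelism. The function names are hypothetical helpers, not part of Flink, and the parallelism of 5 is an illustrative value; the slot count and memory size mirror the `taskmanager.numberOfTaskSlots: 2` and `1728m` settings used earlier.

```bash
#!/usr/bin/env bash
# Hypothetical sizing helpers (not part of Flink).

# TaskManagers needed so that total slots >= job parallelism
# (integer ceiling division).
required_task_managers() {
  local parallelism=$1 slots_per_tm=$2
  echo $(( (parallelism + slots_per_tm - 1) / slots_per_tm ))
}

# Total TaskManager memory in MiB for a given TaskManager count.
total_tm_memory_mib() {
  local tm_count=$1 mem_per_tm_mib=$2
  echo $(( tm_count * mem_per_tm_mib ))
}

parallelism=5         # illustrative job parallelism (assumption)
slots_per_tm=2        # taskmanager.numberOfTaskSlots
mem_per_tm_mib=1728   # taskmanager.memory.process.size

tm_count=$(required_task_managers "$parallelism" "$slots_per_tm")
echo "TaskManagers needed: $tm_count"
echo "Total TaskManager memory: $(total_tm_memory_mib "$tm_count" "$mem_per_tm_mib") MiB"
```

With these inputs the script reports 3 TaskManagers and 5184 MiB in total. The namespace ResourceQuota should leave room for at least that much, plus the JobManager.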