Flink 实时数仓开发实战:Catalog 快照,让 DDL 只写一次
前两篇文章我们解决了两件事怎么跑像 Hive 那样用 Flink SQL怎么管像后端那样 CI/CD。传统的 Flink SQL DDL 散落在每个 SQL 脚本里。一个实时数仓有几十张表每张表的CREATE TABLE出现在引用它的每个脚本中。改一个字段类型所有脚本全要改。这也是为什么需要一个Flink SQL 元数据中心。本文介绍的Catalog 快照解决的就是这个问题把 DDL 从 SQL 脚本中剥离序列化为一个自包含的 JSON 文件作业启动时自动恢复到内存 Catalog。实现DDL 写一次所有作业共用。本文将深入 Catalog 快照的设计思路和底层原理 ——为什么必须是快照、如何在 Flink 现有 API 基础上实现 DDL 语义的完全兼容。本文基于 flink-sql-bootstrap 搭建如果你不想直接用该项目文中也介绍了内部原理能够指引你如何自建。为什么是快照先看快照长什么样。这是一个最简版本——只定义两张表两个 UDF{version: 1,snapshotId: 20240622-155500-a1b2,catalogName: platform,databaseName: default,tables: [{database: default,name: ods_words,columns: [{ name: sentence, type: STRING, nullable: true }],options: {connector: datagen,rows-per-second: 1}},{database: default,name: dws_word_count,columns: [{ name: word, type: STRING, nullable: false },{ name: cnt, type: BIGINT, nullable: false }],options: {connector: print}}],views: [],udfs: [{name: my_reverse,className: examples.udf.MyReverseFunction,functionLanguage: JAVA,jarRef: example-udf-reverse.jar},{name: my_substring,className: examples.udf.MySubstringFunction,functionLanguage: JAVA,jarRef: example-udf-substring.jar}]}配合这份快照SQL 脚本里不再需要任何 DDLINSERT INTO dws_word_countSELECT my_reverse(my_substring(word, 0, 2)) AS word, COUNT(*) AS cntFROM ods_wordsCROSS JOIN UNNEST(SPLIT(sentence, )) AS t(word)GROUP BY my_reverse(my_substring(word, 0, 2));可以看到表和 UDF 全部来自 Catalog 快照SQL 脚本里没有任何 DDL只剩业务逻辑。那为什么不直接用 Flink 自带的 CatalogFlink 提供了一些内置的 Catalog但是它们都有一定的劣势GenericInMemoryCatalog是纯内存版本进程重启就消失HiveCatalog可以持久化到 Hive Metastore但需要额外部署 HMS 服务JDBCCatalog同理。这些 Catalog 启动时都要连到某个活的元数据中心拿到的永远是最新的DDL。Catalog 快照设计的初衷是启停幂等性。想象一种场景你的 Flink 作业已经跑了三周因为集群维护需要重启。但就在这三周里上游业务改了 ODS 表的字段类型。如果你重启时去一个活的元数据中心拉取最新 DDL拿到的是新 Schema——和 Checkpoint 里保存的状态对不上轻则启动失败重则静默恢复后产出错误数据。Catalog 快照解决的就是这个问题把部署时刻的 DDL 冻结为一个不可变的 JSON 文件。部署时什么样重启后还是什么样。这也是为什么这份 JSON 应该和 SQL 脚本一起进 Git——部署时刻的完整元数据状态被永远锁定。flink-sql-bootstrap 项目中快照的不变性是约定不是代码强制的仅在Catalog定义中定义了snapshotId。flink-sql-bootstrap 负责把 DDL 从 SQL 脚本中剥离你负责让这份 DDL 在部署周期内不变——URL 锁死版本号、和脚本一起进 Git 即可。比如说你搭建了一个 REST 服务那么资源定义则为https://catalog-server/snapshot/{snapshot-id}。快速开始接下来我们将基于 flink-sql-bootstrap 项目及内置的示例带你体验一下 Catalog 快照的乐趣。你需要从 GitHub Releases 下载 JAR确保${FLINK_HOME}/lib下有flink-sql-gateway-*.jar从${FLINK_HOME}/opt拷贝即可。一个完整的带 Catalog 快照的部署命令$FLINK_HOME/bin/flink run \--target local \flink-sql-bootstrap-${version}.jar \--script-file classpath:example-word-count-advanced.sql \--catalog-file classpath:example-catalog.json \--dependency classpath:example-udf-reverse.jar \--dependency classpath:example-udf-substring.jar其中example-catalog.json就是上文展示的快照example-word-count-advanced.sql的内容就是上文那个只含 DML 的脚本INSERT INTO dws_word_countSELECT my_reverse(my_substring(word, 0, 2)) AS word, COUNT(*) AS cntFROM ods_wordsCROSS JOIN UNNEST(SPLIT(sentence, )) AS t(word)GROUP BY my_reverse(my_substring(word, 0, 2));JAR 内已内置这些示例文件无需额外准备。--catalog-file和--script-file一样支持五种协议classpath:、file://、http(s)://、hdfs://、s3://。--dependency用于加载 UDF 的 JAR 包。执行后输出I[6a, 1]I[00, 1]I[a3, 1]I[8f, 1]UDFmy_reverse和my_substring的组合效果是取每个单词前两个字符再反转。整个过程中 SQL 脚本没有写任何CREATE TABLE或CREATE FUNCTION所有 DDL 来自 Catalog 快照。Flink SQL Bootstrap 是如何做到的在讲 flink-sql-bootstrap 怎么做之前先简单说下 Flink 的 Catalog 是什么。Flink SQL 解析SELECT * FROM orders时orders这张表从哪来答案是CatalogFlink 的元数据注册中心。它存储了当前 Session 中所有可用的表结构、视图定义和 UDF。你在 SQL Client 里CREATE TABLE本质上就是往当前 Catalog 里注册了一条元数据。Catalog 的核心 API 就几个比如createDatabase()、createTable()、createFunction()。flink-sql-bootstrap 做的事也很直接把 JSON 快照翻译成这些元信息然后调用createXXX()API 构建 Catalog。整体架构如下图所示。