Flink SQL联结与集合操作详解
一、前言在上一篇文章中我们学习了Flink SQL的聚合查询、窗口TVF和TopN等核心功能。本文将继续深入Flink SQL的查询能力探讨联结Join操作、集合操作以及查询优化等高级特性。联结查询是SQL中最常用也最复杂的操作之一在流处理场景中更是如此。Flink SQL针对流处理的特点提供了多种联结方式包括常规联结Regular Join、间隔联结Interval Join和维表联结Lookup Join等。理解这些联结方式的原理和适用场景是构建复杂流处理应用的关键。二、联结Join查询2.1 Flink SQL中的Join类型概览Flink SQL中的联结查询大体上可以分为两类SQL原生的联结查询方式和流处理中特有的联结查询。上图对比了Flink SQL中四种Join类型的特点Join类型联结条件多版本构建消费更新流重要考虑因素Regular Join无限制否是性能受probe side状态影响Lookup Join等值条件否是可能非常慢依赖外部系统可用性Temporal Join等值条件完整主键支持是事件时间必须正确定义Interval Join等值条件时间范围否否需要合理定义时间范围2.2 常规联结Regular Join常规联结Regular Join是SQL中原生定义的Join方式是最通用的一类联结操作。它的具体语法与标准SQL的联结完全相同通过关键字JOIN来联结两个表后面用关键字ON来指明联结条件。上图展示了Regular Join与Temporal Join在不同输入类型append/upsert下的输出行为差异。Regular Join包含以下几种Join类型说明输出行为Inner Join等值内联结只有两条流Join到才输出 [L, R]Left Join左外联结左流数据到达后无论有没有Join到右流都输出Right Join右外联结与Left Join逻辑相反Full Join全外联结左流或右流数据到达后无论有没有Join到都输出等值内联结INNER Equi-JOINSELECT*FROMwsINNERJOINws1ONws.idws1.id;目前仅支持等值联结条件。内联结会返回两表中符合联结条件的所有行的组合也就是所谓的笛卡尔积Cartesian product中满足条件的部分。等值外联结OUTER Equi-JOIN-- 左外联结SELECT*FROMwsLEFTJOINws1ONws.idws1.id;-- 右外联结SELECT*FROMwsRIGHTJOINws1ONws.idws1.id;-- 全外联结SELECT*FROMwsFULLOUTERJOINws1ONws.idws.id;Regular Join的注意事项实时Regular Join可以不是等值Join。等值Join和非等值Join区别在于等值Join数据shuffle策略是Hash会按照Join on中的等值条件作为id发往对应的下游非等值Join数据shuffle策略是Global所有数据发往一个并发流的上游是无限的数据Flink会将两条流的所有数据都存储在State中所以Flink任务的State会无限增大因此需要为State配置合适的TTL以防止State过大2.3 间隔联结Interval Join间隔联结Interval Join返回符合约束条件的两条流中数据的笛卡尔积。与常规联结不同间隔联结多了一个时间间隔的限制。上图展示了Interval Join的时间窗口机制对于Orders流中的每个事件在Shipments流中查找落在时间窗口内的匹配事件。语法要点两表的联结不需要用JOIN关键字直接在FROM后将要联结的两表列出来用逗号分隔联结条件用WHERE子句来定义用一个等值表达式描述时间间隔限制在WHERE子句中通过AND追加时间间隔定义方式-- 方式一精确匹配ltimertime-- 方式二范围匹配ltimertimeANDltimertimeINTERVAL10MINUTE-- 方式三BETWEEN语法ltimeBETWEENrtime-INTERVAL10SECONDANDrtimeINTERVAL5SECOND案例SELECT*FROMws,ws1WHEREws.idws1.idANDws.etBETWEENws1.et-INTERVAL2SECONDANDws1.etINTERVAL2SECOND;2.4 维表联结Lookup JoinLookup Join其实就是维表Join实时获取外部缓存的Join。Lookup的意思就是实时查找。上图展示了Lookup Join的工作原理左流Source的数据到达时去外部维表Paimon Table/MySQL等中查找匹配数据通过LRU缓存加速查询减少对外部系统的访问压力将关联后的结果输出到Sink语法表AJOIN维度表名FORSYSTEM_TIMEASOF表A.proc_timeAS别名ONxx.字段别名.字段案例MySQL维表Join-- 创建维表CREATETABLECustomers(idINT,name STRING,country STRING,zip STRING)WITH(connectorjdbc,urljdbc:mysql://hadoop102:3306/customerdb,table-namecustomers);-- 使用Lookup JoinSELECTo.order_id,o.total,c.country,c.zipFROMOrdersASoJOINCustomersFORSYSTEM_TIMEASOFo.proc_timeAScONo.customer_idc.id;Lookup Join的特点仅支持处理时间字段左流事实表每来一条数据都会去外部维表查找维度数据如果维表数据发生变化已经关联过的数据不会自动更新基于处理时间快照三、Order By 和 Limit3.1 Order ByFlink SQL支持Order By但在实时任务中一般用的非常少。实时任务中Order By子句中必须要有时间属性字段并且必须写在最前面且为升序。SELECT*FROMwsORDERBYet,idDESC;3.2 LimitSELECT*FROMwsLIMIT3;四、SQL Hints4.1 什么是SQL Hints在执行查询时可以在表名后面添加SQL Hints来临时修改表属性对当前job生效。上图展示了SQL Hints在查询优化中的作用通过Hints可以影响查询执行计划优化查询性能。4.2 Hints语法select*fromws1/* OPTIONS(rows-per-second10)*/;常用HintsHint说明OPTIONS(rows-per-second10)修改DataGen的生成速率LOOKUP(tablemy_table2, asynctrue)启用异步Lookup Join异步Lookup Join案例-- 同步Lookup默认SELECT/* LOOKUP(tablemy_table2, asyncfalse) */*FROMmy_table1ASt1JOINmy_table2FORSYSTEM_TIMEASOFt1.proctimeASt2ONt1.at2.c;-- 异步Lookup提升吞吐量SELECT/* LOOKUP(tablemy_table2, asynctrue) */*FROMmy_table1ASt1JOINmy_table2FORSYSTEM_TIMEASOFt1.proctimeASt2ONt1.at2.c;五、集合操作5.1 集合操作概述Flink SQL支持标准SQL中的集合操作包括UNION、UNION ALL、INTERSECT、INTERSECT ALL、EXCEPT和EXCEPT ALL。上图通过维恩图直观展示了四种集合操作的结果集关系UNION合并两个集合并去重UNION ALL合并两个集合不去重INTERSECT取两个集合的交集EXCEPT取左集合中不在右集合中的元素5.2 UNION 和 UNION ALL-- UNION合并并去重(SELECTidFROMws)UNION(SELECTidFROMws1);-- UNION ALL合并不去重(SELECTidFROMws)UNIONALL(SELECTidFROMws1);5.3 INTERSECT 和 INTERSECT ALL-- INTERSECT交集并去重(SELECTidFROMws)INTERSECT(SELECTidFROMws1);-- INTERSECT ALL交集不去重(SELECTidFROMws)INTERSECTALL(SELECTidFROMws1);5.4 EXCEPT 和 EXCEPT ALL-- EXCEPT差集并去重(SELECTidFROMws)EXCEPT(SELECTidFROMws1);-- EXCEPT ALL差集不去重(SELECTidFROMws)EXCEPTALL(SELECTidFROMws1);5.5 流式集合操作的特点上述SQL在流式任务中如果一条左流数据先来了没有从右流集合数据中找到对应的数据时会直接输出当右流对应数据后续来了之后会下发回撤流将之前的数据给撤回。这也是一个回撤流。5.6 IN 子查询In子查询的结果集只能有一列SELECTid,vcFROMwsWHEREidIN(SELECTidFROMws1);上述SQL的In子句和之前介绍到的Inner Join类似。并且In子查询也会涉及到大状态问题要注意设置State的TTL。六、总结本文详细讲解了Flink SQL中的联结与集合操作常规联结Regular Join包括Inner/Left/Right/Full Join语法与标准SQL一致但需要注意流式场景下状态无限增长的问题间隔联结Interval Join在等值联结的基础上增加时间间隔限制适合有时间范围关联需求的场景维表联结Lookup Join流与外部存储MySQL/Redis/HBase等的实时关联仅支持处理时间Order By/Limit实时任务中Order By必须包含时间属性字段且放在最前面SQL Hints临时修改表属性常用于优化Lookup Join同步/异步和DataGen参数集合操作UNION/UNION ALL、INTERSECT/INTERSECT ALL、EXCEPT/EXCEPT ALL流式场景下会产生回撤流In子查询结果集只能有一列底层类似于Inner Join需要注意大状态问题理解这些联结和集合操作的原理与适用场景是构建复杂流处理SQL应用的基础。下一篇文章我们将继续深入Flink SQL的Connector与Catalog实战。如果本文对你有帮助欢迎点赞 收藏 ⭐ 关注 你的支持是我持续创作的动力