TDengine 数据订阅 — Topic 与 Consumer 基础
分类6.数据订阅 TMQ |篇章01 Topic 与 Consumer适用版本TDengine v3.xv3.3.x / v3.4.xTDengine 内置的 TMQTime-series Message Queue让数据库既是存储也是消息总线。应用通过 Topic 订阅数据变化无需额外部署 Kafka 即可获得数据库 消息队列双重能力。核心概念速查表概念说明Topic订阅主题基于 SQL 查询定义Consumer消费者实例Consumer Group消费组同组共享分区Offset消费位点VGroup 内序号Subscribe订阅一个或多个 TopicCommit提交位点标记已消费Rebalance消费组成员变化时重新分配分区详细解析1. Topic 的三种类型① 列订阅最常用 CREATE TOPIC topic_meters AS SELECT ts, current, voltage FROM meters; - 订阅指定列的所有写入 - 可附加 WHERE 过滤 - 可订阅子集 ② 超级表订阅 CREATE TOPIC topic_all AS STABLE meters; - 订阅整张超级表所有列 - 包含所有子表的变化 ③ 数据库订阅 CREATE TOPIC topic_db AS DATABASE test; - 订阅数据库内所有表的变化 - 用于全库同步场景2. Consumer 模型Consumer 抽象 Consumer Group: etl_workers Consumer-1 ─┐ Consumer-2 ─┼─→ 共同消费 Topic Consumer-3 ─┘ 特点 - 同组消费者分摊 VGroup 分区 - 每个 VGroup 同时只能被组内一个 Consumer 消费 - 跨组互不影响广播模式 分区分配 Topic 在 N 个 VGroup 上有数据 Consumer 组有 M 个成员 分配每个 Consumer 拿 N/M 个 VGroup 如果 M N多余的 Consumer 空转 如果 M N每个 Consumer 处理多个 VGroup3. 订阅生命周期Consumer 完整生命周期 ① 配置 Consumer 参数 - group.id必须 - auto.offset.resetearliest/latest - enable.auto.commit - msg.with.table.name ② 创建 Consumer 实例 ③ Subscribe 一个或多个 Topic ④ Poll 循环 - 拉取消息 - 处理业务逻辑 - Commit Offset ⑤ Unsubscribe可选 ⑥ Close Consumer 关键参数 group.id // 消费组 ID auto.offset.reset // earliest / latest / none enable.auto.commit // true / false auto.commit.interval.ms msg.with.table.name // 消息中包含表名4. Offset 与位点管理Offset 概念 每个 VGroup 维护一个递增序号 Consumer 消费后 Commit Offset 下次启动从 Committed Offset 1 继续 位点策略 auto.offset.reset earliest → 首次订阅从最早位置开始 auto.offset.reset latest → 首次订阅从最新位置开始之前的数据跳过 auto.offset.reset none → 没有位点则报错 自动 vs 手动 Commit 自动定期自动 Commit简单但可能丢/重复 手动业务确认后 Commit精确控制5. Rebalance 机制Rebalance 触发场景 - 新 Consumer 加入组 - 现有 Consumer 离开崩溃/关闭 - Topic 分区变化新增 VGroup Rebalance 过程 ① 协调者检测变化 ② 通知所有 Consumer 暂停消费 ③ 重新分配分区 ④ 各 Consumer 获取新的 VGroup 列表 ⑤ 从 Committed Offset 继续消费 对应用的影响 - 短暂停顿毫秒~秒级 - 未 Commit 的消息可能被另一 Consumer 重新处理 - 建议设计为幂等消费6. 消息内容消费到的消息结构 Message { topic: topic_meters, vgroup_id: 3, offset: 12345, table_name: d001, // 若启用 msg.with.table.name columns: [ts, current, voltage], rows: [ [T1, 25.3, 220], [T2, 25.5, 221], ... ] } 消息特点 - 一次 Poll 可返回多行批量 - 同一消息内可能来自不同子表 - 已写入 WAL 的数据才可订阅 - 时间顺序按写入顺序非业务 ts7. 与 Kafka 的对比特性TDengine TMQKafka数据来源数据库写入直接产生应用显式 produce数据保留与数据库共用存储独立 Topic 文件过滤能力Topic SQL 内置过滤应用层过滤Schema强类型字节流部署数据库内置无额外组件独立集群适用时序数据 ETL/CDC通用消息8. 应用场景① 数据 ETL 订阅原始数据 → 清洗 → 写入下游 ② 实时计算 订阅指标数据 → 聚合 → 报警 ③ 数据同步 订阅整库 → 同步到异地灾备 ④ 数据归档 订阅冷数据 → 备份到对象存储 ⑤ 流计算上游 流计算引擎订阅 Topic 作为输入源代码示例Python 消费者fromtaos.tmqimportConsumer consumerConsumer({group.id:etl_group,auto.offset.reset:earliest,td.connect.user:root,td.connect.pass:taosdata,enable.auto.commit:false,})consumer.subscribe([topic_meters])try:whileTrue:msgconsumer.poll(timeout1.0)ifmsgisNone:continueforblockinmsg:forrowinblock:print(row)consumer.commit()finally:consumer.close()Java 消费者PropertiespropsnewProperties();props.setProperty(TMQConstants.BOOTSTRAP_SERVERS,127.0.0.1:6030);props.setProperty(TMQConstants.GROUP_ID,etl_group);props.setProperty(TMQConstants.ENABLE_AUTO_COMMIT,false);try(TaosConsumerMeterconsumernewTaosConsumer(props)){consumer.subscribe(Collections.singletonList(topic_meters));while(running){ConsumerRecordsMeterrecordsconsumer.poll(Duration.ofMillis(1000));for(ConsumerRecordMeterr:records){process(r.value());}consumer.commitSync();}}管理 Topic-- 创建CREATETOPIC topic_metersASSELECTts,currentFROMmeters;-- 查看SHOWTOPICS;-- 查看 Consumer Group 状态SELECT*FROMinformation_schema.ins_subscriptions;-- 删除DROPTOPIC topic_meters;性能考量消费性能因素影响单 Consumer 拉取批量大批量提升吞吐并发 Consumer 数与 VGroup 数匹配最优处理逻辑耗时直接限制吞吐Commit 频率频繁 Commit 影响性能配置建议场景配置低延迟实时poll 短间隔 频繁 commit高吞吐 ETL大批量 异步 commit严格一次手动 commit 业务幂等FAQQ1: 订阅历史数据吗可以。设置auto.offset.resetearliest从 WAL 中可保留范围的最早数据开始消费。但 WAL 有保留期限。Q2: 一个 Topic 能多少 Consumer 同时消费同组内最多 VGroup 数。再多的 Consumer 会空闲。跨组不同 group.id则无限制。Q3: 消费失败如何重试不 Commit Offset 即可。下次 Poll 仍能拿到该消息。注意业务必须幂等。Q4: Topic 删除后消费者会怎样Poll 返回错误。需要重新订阅其他 Topic 或关闭 Consumer。Q5: TMQ 与流计算关系流计算本质上是内置消费 计算 写入。可以用 Topic 作为流计算的输入实现外部应用可见的中间数据。参考系统构架篇01-《TDengine 整体架构全景》02-《集群拓扑深度解析》03-《MNode 内部机制深度解析》04-《RPC 通信层深度解析》05-《VNode 生命周期》06-《RAFT 共识协议》07-《端到端的消息流》数据模型01-《数据库创建与参数详解》02-《超级表/子表/普通表》03-《支持数据类型深度解析》04-《TDengine Tag 设计哲学与 Schema 变更机制》05-《TDengine 虚拟表实现原理》存储引擎01-《TDengine 存储引擎概览》02-《TDengine MemTable 深度解析》03-《TDengine WAL 预写日志机制》04-《TDengine 数据文件格式》05-《TDengine Commit 与 Flush 机制 》06-《TDengine Compaction 合并策略 》07-《TDengine 数据保留与 TTL》08-《TDengine 压缩编码机制》09-《TDengine Cache 与 Last 查询加速》10-《TDengine 逻辑计划生成》查询引擎01-《TDengine 查询引擎概览》02-《TDengine SQL 解析与词法分析》03-《TDengine 语义分析与 AST 重写》04-《TDengine 逻辑计划生成》05-《TDengine 物理计划生成》06-《TDengine 扫描算子》07-《TDengine 聚合算子》08-《TDengine 聚合算子》09-《TDengine 连接算子》10-《TDengine 排序、填充与投影》11-《TDengine 分布式查询执行》12-《TDengine EXPLAIN 与查询优化》数据写入01-《TDengine SQL INSERT》02-《TDengine 无模式写入》03-《TDengine STMT 写入》04-《TDengine 写入内部流程》05-《TDengine 数据更新删除》关于 TDengineTDengine 专为物联网IoT平台、工业大数据平台设计。其中TDengine TSDB 是一款高性能、分布式的时序数据库Time Series Database同时它还带有内建的缓存、流式计算、数据订阅等系统功能TDengine IDMP 是一款AI原生工业数据管理平台它通过树状层次结构建立数据目录对数据进行标准化、情景化并通过 AI 提供实时分析、可视化、事件管理与报警等功能。