从日志到 HBase:用 Flume、Kafka、Flink 串起一条实时数据链路
面向读者刚接触大数据组件的 Java 程序员。配套项目redants-101/hbase-springboot-demo https://github.com/redants-101/hbase-springboot-demo 本文目标在上一篇《低配服务器上 HBase 从部署到 Java 访问完整指南》的基础上把链路继续往前推进完成log - Flume - Kafka - Flink - HBase的完整实验闭环。阅读前先准备这些资料建议先把代码仓库和前置资料打开。本文不会重复展开 HBase、Kafka、Flume 的完整安装过程而是重点讲它们如何组成一条实时链路以及当前项目里的 Flink 代码如何承接这条链路。资料作用GitHub 项目redants-101/hbase-springboot-demo https://github.com/redants-101/hbase-springboot-demo本文对应的示例代码仓库上一篇文章低配服务器上 HBase 从部署到 Java 访问完整指南 https://zhuanlan.zhihu.com/p/2050858006309516584先理解 HBase 部署、远程访问和 Java 客户端Apache Flume 安装配置教程 https://github.com/redants-101/hbase-springboot-demo/blob/main/docs/Apache-Flume-安装配置教程.mdFlume 安装、systemd 管理、Exec Source Kafka Sink 配置Docker 安装 Kafka 单节点教程 https://github.com/redants-101/hbase-springboot-demo/blob/main/docs/Docker安装Kafka单节点教程.mdDocker 单节点 Kafka 部署、topic、生产消费测试Kafka → Flink → HBase 核心代码目录 https://github.com/redants-101/hbase-springboot-demo/tree/main/src/main/java/com/example/hbase/flink当前项目中 Kafka - Flink - HBase 的核心代码如果你还没有跑通过 HBase先读上一篇 HBase 文章。 如果你还没有部署 Kafka 和 Flume先按对应安装资料完成基础环境。 本文默认你已经至少知道HBase 能访问Kafka 能收到 Flume 写入的public-test消息。上一篇文章已经解决了一个问题Java 程序如何连接 HBase并完成建表、写入、查询、扫描和删除。但真实业务里数据通常不是由你手动调用接口写入 HBase 的。更常见的情况是应用不断产生日志 日志先被采集 再进入消息队列 然后由实时计算任务消费、清洗、转换 最后落到存储系统中供查询和分析这就是本文要补上的部分。下面先从整体链路开始再逐步拆到 Flume、Kafka、Flink、HBase 和当前项目代码。1. 这条链路要解决什么问题我们要实现的是/var/log/demo/app.log - Flume - Kafka topic: public-test - Flink - HBase table: flink_public_test你可以先用一句话记住每个组件的职责组件职责在本文中的角色日志文件原始数据来源/var/log/demo/app.logFlume采集日志监听日志文件将新行推到 KafkaKafka缓冲和解耦保存public-testtopic 中的日志消息Flink实时处理持续消费 Kafka并把消息写入 HBaseHBase存储和查询保存原始日志、Kafka 元信息和 JSON 字段Spring Boot 页面实验控制台启动、停止、观察这条链路如果不用 KafkaFlume 可以直接写 HBase。但这样会把采集系统和存储系统耦合得太紧。中间加 Kafka 后链路变成采集归采集 缓冲归缓冲 计算归计算 存储归存储这也是生产环境中常见的数据链路设计思路。2. 先把四个核心概念讲清楚2.1 Flume日志采集器Flume 的结构是Source - Channel - Sink在这条链路里Exec Source - Memory Channel - Kafka Sink配置重点是a1.sources.r1.type exec a1.sources.r1.command tail -F /var/log/demo/app.log a1.sinks.k1.type org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.kafka.topic public-test a1.sinks.k1.kafka.bootstrap.servers 140.143.201.112:9092 a1.sinks.k1.useFlumeEventFormat false其中最容易忽略的是a1.sinks.k1.useFlumeEventFormat false如果不关闭 Flume 事件格式Kafka 消费端可能看到不是你原始写入的那一行文本而是被 Flume 包装过的二进制格式。对本文这种日志一行就是一条 Kafka 消息的实验应该关闭它。2.2 Kafka消息缓冲层Kafka 可以先理解为一个高性能消息队列。本文使用bootstrap server: 140.143.201.112:9092 topic: public-test几个基本概念需要掌握概念含义BrokerKafka 服务节点Topic消息分类本文是public-testPartitionTopic 内部的分区Offset消息在某个 Partition 里的序号Consumer Group消费者组用来记录消费进度Flink 消费 Kafka 时最关键的是group.id。如果两个 Flink Job 使用同一个 group id 消费同一个 topic它们会分摊消息如果希望两个 Job 都完整消费同一份数据必须使用不同的 group id。2.3 Flink实时计算引擎Flink 在本文里做的事很简单从 Kafka 读一条日志 转成 Java 对象 解析 JSON 字段 写入 HBase但它的价值不只是搬运数据。Flink 真正适合做的是持续运行的流处理任务例如实时清洗 实时聚合 异常检测 字段补全 按窗口统计 写入多种下游存储当前项目为了降低实验门槛把 Flink Job 嵌入在 Spring Boot 进程里运行。页面点启动后Spring Boot 在本 JVM 内启动一个 Flink 任务。这适合学习验证不是生产形态。2.4 HBase面向大规模数据的宽表存储HBase 的数据结构可以先记成Table - RowKey - ColumnFamily - Qualifier - Value本文写入Table: flink_public_test ColumnFamily: info每条 Kafka 消息会写成 HBase 的一行。RowKey 规则是topic-partition-offset例如public-test-0-00000000000000000047这个 RowKey 的好处是天然幂等。同一条 Kafka 消息如果因为重启、失败恢复被再次写入它仍然写到同一行HBase 的 Put 会覆盖旧值而不是插入重复行。3. 当前项目中的代码结构项目里和这条链路直接相关的代码在src/main/java/com/example/hbase/flink src/main/resources/static/pipeline.html src/main/resources/application.yml核心文件如下文件作用KafkaToHBasePipelineProperties从application.yml读取 Kafka、HBase、Flink 配置KafkaToHBasePipelineController提供启动、停止、查询状态的 REST 接口KafkaToHBasePipelineService管理 Flink 作业生命周期和页面指标KafkaToHBaseFlinkJob构建 Flink 流图KafkaSource - HBaseSinkKafkaRecordToLogEventDeserializer把 Kafka 原始字节消息转成 Java 对象KafkaToHBaseSinkFunction把每条消息写入 HBaseKafkaLogEventKafka 消息在程序里的中间对象KafkaToHBasePipelineStatus页面状态接口返回的数据结构KafkaToHBasePreviewRow页面展示最近写入记录的数据结构pipeline.html页面控制台你可以按下面的调用链读代码pipeline.html - KafkaToHBasePipelineController - KafkaToHBasePipelineService - KafkaToHBaseFlinkJob - KafkaRecordToLogEventDeserializer - KafkaToHBaseSinkFunction - HBase这个顺序就是一次点击启动并写入数据的真实执行顺序。4. 当前项目如何配置这条链路核心配置在application.ymlpipeline: kafka-to-hbase: bootstrap-servers: ${PIPELINE_KAFKA_BOOTSTRAP_SERVERS:140.143.201.112:9092} topic: ${PIPELINE_KAFKA_TOPIC:public-test} group-id: ${PIPELINE_KAFKA_GROUP_ID:hbase-springboot-demo-flink} start-from-earliest: ${PIPELINE_KAFKA_START_FROM_EARLIEST:true} hbase-table: ${PIPELINE_HBASE_TABLE:flink_public_test} column-family: ${PIPELINE_HBASE_CF:info} parallelism: ${PIPELINE_FLINK_PARALLELISM:1} auto-create-table: ${PIPELINE_AUTO_CREATE_TABLE:true} preview-size: ${PIPELINE_PREVIEW_SIZE:20} checkpoint-interval-ms: ${PIPELINE_CHECKPOINT_INTERVAL:30000} min-pause-between-checkpoints-ms: ${PIPELINE_CHECKPOINT_MIN_PAUSE:15000} checkpoint-timeout-ms: ${PIPELINE_CHECKPOINT_TIMEOUT:600000}初学阶段重点看前六项bootstrap-servers topic group-id start-from-earliest hbase-table column-familystart-from-earliesttrue表示第一次消费时尽量从 topic 最早消息开始读。实验时很方便因为你可以看到历史测试消息。生产环境通常要谨慎设置避免上线后一次性回放大量历史数据。5. 数据是如何从 Kafka 写入 HBase 的Flink Job 中最核心的代码可以概括成env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka public-test) .addSink(new KafkaToHBaseSinkFunction(...));这表示KafkaSource 负责读取 Kafka KafkaToHBaseSinkFunction 负责写 HBaseKafka 中的一条消息进入 Flink 后先经过KafkaRecordToLogEventDeserializerbyte[] value - UTF-8 字符串 - KafkaLogEvent(topic, partition, offset, timestamp, value)然后进入 HBase Sink。Sink 做几件事生成 RowKey。写入原始消息到info:raw。写入 Kafka 元信息到info:kafka_topic、info:kafka_partition、info:kafka_offset、info:kafka_timestamp。尝试解析 JSON。如果 JSON 解析成功把第一层字段展开成 HBase 列。调用table.put(put)写入 HBase。如果写入的是普通文本Hello Flume, this is a test log!HBase 中主要会看到info:raw Hello Flume, this is a test log!如果写入的是 JSON{level:INFO,service:demo-app,message:json test from flume,userId:10001}HBase 中会看到info:raw 原始 JSON 字符串 info:level INFO info:service demo-app info:message json test from flume info:userId 10001这个设计适合入门学习既保留原始数据又能在 HBase 中按字段观察解析结果。6. 如何手动跑通完整链路先确认 Flume 和 Kafka 已经在云服务器上运行。Flume 监听/var/log/demo/app.logKafka topic 是public-test本地启动当前项目mvn spring-boot:run打开页面http://localhost:8080/pipeline.html页面运行效果如下点击启动。状态应变成RUNNING然后在云服务器写一条 JSON 日志echo {\level\:\INFO\,\service\:\demo-app\,\message\:\log to hbase\,\userId\:\10001\,\traceId\:\trace-$(date %s)\,\eventTime\:\$(date %Y-%m-%d %H:%M:%S)\} /var/log/demo/app.log页面应该看到已读取 1 已写入 1 最近写入预览出现 JSON 字段HBase Shell 可以验证scan flink_public_test, {LIMIT 10}如果页面已写入增加但 HBase 查不到优先确认你查的是同一个 HBase 集群和同一张表。7. 生产环境中 Flink 通常怎么使用当前项目的嵌入式 Flink 只适合学习。生产环境不应该让 Spring Boot 长期承载 Flink 作业。生产形态通常是Spring Boot 管理页面 - 提交 Flink Job - 查询 Job 状态 - 停止 Job Flink 集群 - 长期运行 Kafka - HBase 作业也就是说Spring Boot 负责管理 Flink 集群负责计算 Kafka 负责缓冲 HBase 负责存储Flink 集群一般有这些部署方式模式适合场景Standalone学习、单机实验、小规模内部任务YARN已经有 Hadoop/YARN 集群的公司Kubernetes云原生生产环境对这条学习链路建议先从 Standalone 单机集群开始。它比嵌入式 Flink 更接近真实使用方式也比 YARN、Kubernetes 简单。生产环境还需要考虑问题说明Checkpoint 存储不能只放本地临时目录应该放 HDFS、对象存储或可靠共享存储Savepoint用于升级、停止、恢复作业Kafka group id每个独立作业要规划好消费组避免互相抢分区HBase RowKey要根据查询场景设计避免热点幂等写入当前topic-partition-offsetRowKey 可以降低重复写入影响监控告警要关注 Flink 失败重启、Kafka lag、HBase 写入延迟资源隔离多个 Job 不能无限抢同一台机器的 CPU、内存和 slot如果后续把当前项目改成 Standalone Flink 集群建议做三件事抽出一个带main方法的独立 Flink Job。打一个 Flink 专用 fat jar不要直接提交 Spring Boot 可执行 jar。Spring Boot 页面改成调用 Flink REST API 提交、停止、查询 Job。多个 Job 的本质是同一个 Flink jar 多次提交 每次传不同参数例如不同 topic、不同 group id、不同 HBase 表。8. 常见问题排查8.1 Kafka 能看到消息但页面已读取为 0这说明Flume - Kafka 成功 Kafka - Flink 失败优先检查本地或 Flink 所在机器能否访问 140.143.201.112:9092 Kafka advertised.listeners 是否是公网 IP 页面是否已经点击启动 是否在启动后又写入了新日志Kafka 在云服务器本机能消费不代表本地 Spring Boot 或 Flink 集群也能消费。8.2 已读取增加但已写入不增加这说明Flink 已经读到 Kafka 写 HBase 失败优先看页面最近错误 Spring Boot 控制台异常 HBase 表是否存在 列族是否是 info HBase RegionServer 是否可访问8.3 JSON 字段没有展开当前项目只自动展开第一层 JSON 对象。下面这种可以展开{level:INFO,message:hello}多行 JSON 不适合这个链路因为 Flume 通常按行采集。嵌套对象会被当成字符串保存不会递归展开。8.4 第一次启动后突然写入很多数据通常是因为start-from-earliest: true它会从历史消息开始消费。只想消费新数据时改成$env:PIPELINE_KAFKA_START_FROM_EARLIESTfalse mvn spring-boot:run9. 本文和上一篇 HBase 文章的关系上一篇文章《低配服务器上 HBase 从部署到 Java 访问完整指南》解决的是Java 如何连接 HBase Java 如何操作 HBase 表和数据 HBase 远程访问容易踩哪些坑本文解决的是日志如何进入 Kafka Flink 如何消费 Kafka Flink 如何把日志写入 HBase 这条实时链路的代码结构如何理解两篇文章合起来就是先知道 HBase 怎么部署、怎么连、怎么写 再知道实时日志怎么经过 Flume、Kafka、Flink 写到 HBase完整学习路线建议先读 HBase 文章跑通 HBase 和 Java 访问。按 Kafka 安装教程启动单节点 Kafka。按 Flume 教程配置 Exec Source Kafka Sink。启动当前项目页面。从/var/log/demo/app.log写入 JSON 日志。看页面和 HBase Shell 中的写入结果。10. 延伸资料本文开头已经给出项目地址和前置资料。读完正文后建议按下面顺序回看看 GitHub 项目确认代码结构和本文描述一致。看上一篇 HBase 文章补齐 HBase 连接、端口和 Java 客户端知识。看 Kafka 安装资料重点确认advertised.listeners、public-testtopic 和生产消费测试。看 Flume 安装资料重点确认tail -F /var/log/demo/app.log和 Kafka Sink。回到当前项目重点读KafkaToHBaseFlinkJob和KafkaToHBaseSinkFunction。如果你是第一次接触这几个组件建议不要先追求生产级架构。先把下面这条链路跑通echo 写日志 - Flume 采集 - Kafka 能消费 - 页面启动 Flink - HBase scan 能看到数据跑通这条线后再去学习 Standalone Flink 集群、多个 Job、Checkpoint 存储、Savepoint、Kafka lag 监控和 HBase RowKey 设计。这样学习顺序更稳定也更容易定位问题。 写在最后从日志到 HBase 的这条实时链路是大数据入门的一个经典实验。把 Flume、Kafka、Flink 串起来跑通之后再去看生产架构会清晰很多。如果你在搭建过程中遇到问题或者有自己想尝试的链路组合欢迎评论区交流。我会尽量回复。觉得有用的话点个赞、关注一下「程序员之路」后面还会继续更新大数据实战系列。往期推荐低配服务器上 HBase 从部署到 Java 访问完整指南关注「程序员之路」一起搞懂 AI 和大数据实战。