Kafka Streams 与生态集成 —— 实时流处理实战
一、Kafka Streams 概述1.1 什么是 Kafka StreamsKafka Streams是 Apache Kafka 开源项目的组成部分是一个用于在 Kafka 上构建高可扩展、容错流处理应用的客户端类库。1.2 Kafka Streams 核心特点特性说明功能强大高扩展性、弹性、容错轻量级无需专门集群一个 Jar 包即可运行完全集成100% 兼容 Kafka易于集成现有应用实时性毫秒级延迟非微批处理窗口支持允许乱序数据、迟到数据1.3 为什么选择 Kafka Streams对比维度: Spark Streaming Storm Kafka Streams ───────────────────────────────────────────────────────────────────── 部署复杂度: 高 高 低嵌入应用 资源占用: 高预留内存 高 低不额外占用 与 Kafka 集成: 需专门模块 需 spout 原生支持 状态管理: 外部系统 外部系统 内置Kafka 持久化 延迟: 秒级微批 毫秒级 毫秒级 重新计算: 困难 困难 滚动部署自动重算选择 Kafka Streams 的 6 大理由类库而非框架开发者控制运行方式方便调试部署简单嵌入应用无额外集群要求Kafka 生态已有大部分流式系统已部署 Kafka接入成本低不占用系统资源无 supervisor/node manager 等额外进程数据持久化Kafka 本身持久化支持滚动部署和重新计算动态调整并行度利用 Consumer Rebalance 在线调整二、Kafka Streams 核心概念2.1 Topology拓扑组件作用类比Source从 Kafka Topic 读取数据数据源Processor处理/转换数据算子Sink将结果写入 Kafka Topic数据汇2.2 Stream 与 TableStream流: Table表: ┌─────────────────────────┐ ┌─────────────────────────┐ │ 时间 → 事件序列 │ │ Key → 最新 Value │ │ │ │ │ │ t1: K1-V1 │ │ K1 - V3 (最新) │ │ t2: K2-V2 │ ──▶ │ K2 - V4 (最新) │ │ t3: K1-V3 │ 聚合 │ K3 - V5 (最新) │ │ t4: K2-V4 │ │ │ │ t5: K3-V5 │ │ 有状态的视图 │ └─────────────────────────┘ └─────────────────────────┘ KStream: 每个事件独立处理插入 KTable: 相同 Key 更新Upsert三、Kafka Streams 数据清洗实战3.1 需求分析需求实时处理单词带有前缀的内容过滤前缀后输出。输入 Topic first: 输出 Topic second: ┌─────────────────┐ ┌─────────────────┐ │ helloworld │ │ world │ │ hatguigu │ ──清洗──▶ │ atguigu │ │ hahaha │ │ hahaha │ └─────────────────┘ └─────────────────┘3.2 完整代码实现① 主程序构建 Topologypackagecom.atguigu.kafka.stream;importorg.apache.kafka.streams.KafkaStreams;importorg.apache.kafka.streams.StreamsConfig;importorg.apache.kafka.streams.processor.Processor;importorg.apache.kafka.streams.processor.ProcessorSupplier;importorg.apache.kafka.streams.processor.TopologyBuilder;importjava.util.Properties;publicclassApplication{publicstaticvoidmain(String[]args){// 定义输入/输出 TopicStringfromfirst;Stringtosecond;// 设置参数PropertiessettingsnewProperties();settings.put(StreamsConfig.APPLICATION_ID_CONFIG,logFilter);settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop102:9092);StreamsConfigconfignewStreamsConfig(settings);// 构建拓扑TopologyBuilderbuildernewTopologyBuilder();builder.addSource(SOURCE,from)// 添加数据源.addProcessor(PROCESS,newProcessorSupplierbyte[],byte[](){OverridepublicProcessorbyte[],byte[]get(){// 返回自定义处理器returnnewLogProcessor();}},SOURCE)// 依赖 SOURCE.addSink(SINK,to,PROCESS);// 添加数据汇依赖 PROCESS// 创建并启动 Kafka StreamsKafkaStreamsstreamsnewKafkaStreams(builder,config);streams.start();}}② 业务处理器LogProcessorpackagecom.atguigu.kafka.stream;importorg.apache.kafka.streams.processor.Processor;importorg.apache.kafka.streams.processor.ProcessorContext;publicclassLogProcessorimplementsProcessorbyte[],byte[]{privateProcessorContextcontext;Overridepublicvoidinit(ProcessorContextcontext){this.contextcontext;}Overridepublicvoidprocess(byte[]key,byte[]value){StringinputnewString(value);// 如果包含 则只保留该标记后面的内容if(input.contains()){inputinput.split()[1].trim();}// 输出到下一个 Topiccontext.forward(logProcessor.getBytes(),input.getBytes());}Overridepublicvoidpunctuate(longtimestamp){// 周期性操作如定时刷新状态}Overridepublicvoidclose(){// 资源清理}}3.3 测试验证Step 1启动 Kafka Streams 程序Step 2在 hadoop104 上启动生产者bin/kafka-console-producer.sh\--broker-list hadoop102:9092\--topicfirsthelloworldhatguiguhahahaStep 3在 hadoop103 上启动消费者bin/kafka-console-consumer.sh\--zookeeperhadoop102:2181\--from-beginning\--topicsecond输出结果world atguigu hahaha四、Kafka 与 Flume 集成4.1 定位对比维度FlumeKafka开发公司ClouderaLinkedIn适用场景多个生产者下游消费者少下游消费者众多数据安全一般高支持 ReplicationHadoop 生态原生对接需连接器吞吐量中等极高4.2 经典数据采集架构4.3 Flume 配置写入 Kafka配置文件flume-kafka.conf# Agent 定义 a1.sources r1 a1.sinks k1 a1.channels c1 # Source 配置 a1.sources.r1.type exec a1.sources.r1.command tail -F -c 0 /opt/module/datas/flume.log a1.sources.r1.shell /bin/bash -c # Sink 配置Kafka a1.sinks.k1.type org.apache.flume.sink.kafka.KafkaSink # Kafka 集群地址 a1.sinks.k1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092,hadoop104:9092 # 目标 Topic a1.sinks.k1.kafka.topic first # 批次大小条 a1.sinks.k1.kafka.flumeBatchSize 20 # ACK 级别 a1.sinks.k1.kafka.producer.acks 1 # 发送延迟 a1.sinks.k1.kafka.producer.linger.ms 1 # Channel 配置 a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # 绑定 a1.sources.r1.channels c1 a1.sinks.k1.channel c14.4 启动与验证Step 1启动 Kafka 消费者IDEA 或命令行bin/kafka-console-consumer.sh\--zookeeperhadoop102:2181\--from-beginning\--topicfirstStep 2启动 Flume Agentcd/opt/module/flume bin/flume-ng agent\-cconf/\-na1\-fjobs/flume-kafka.confStep 3向日志文件追加数据echohello kafka from flume/opt/module/datas/flume.logStep 4观察 Kafka 消费者输出hello kafka from flume五、Kafka 配置参数大全5.1 Broker 核心配置参数默认值描述broker.id-Broker 唯一标识log.dirs/tmp/kafka-logs数据存储目录num.partitions1默认分区数default.replication.factor1默认副本数log.retention.hours168日志保留时间小时log.segment.bytes1073741824Segment 文件大小num.network.threads3网络处理线程数num.io.threads8IO 处理线程数socket.send.buffer.bytes102400发送缓冲区socket.receive.buffer.bytes102400接收缓冲区zookeeper.connect-ZK 连接地址delete.topic.enablefalse允许删除 Topicauto.create.topics.enabletrue自动创建 Topicmin.insync.replicas1最小 ISR 副本数unclean.leader.election.enabletrue允许非 ISR 选举 Leader5.2 Producer 核心配置参数默认值描述bootstrap.servers-Kafka 集群地址acks1ACK 级别0/1/allretries0发送失败重试次数batch.size16384批次大小字节linger.ms0发送延迟毫秒buffer.memory33554432缓冲区大小compression.typenone压缩类型key.serializer-Key 序列化器value.serializer-Value 序列化器5.3 Consumer 核心配置参数默认值描述bootstrap.servers-Kafka 集群地址group.id-消费者组 IDenable.auto.committrue自动提交 Offsetauto.commit.interval.ms5000自动提交间隔auto.offset.resetlatest无 Offset 时起始位置key.deserializer-Key 反序列化器value.deserializer-Value 反序列化器max.poll.records500单次最大拉取数session.timeout.ms10000会话超时时间如果本专栏对你有帮助欢迎点赞 收藏 ⭐ 关注 你的支持是我持续创作的动力