Kafka实战故障排查指南:从消费延迟到消息丢失的12个生死关卡
1. 这不是又一本“Kafka入门书”——它是一份能让你在真实业务里跑通第一条消息流的实操手记Apache Kafka for Beginners这个标题听起来像极了你刷技术博客时划过的第37个“零基础速成”链接。但我要先说清楚——如果你正坐在工位上刚被产品甩来一句“用户行为埋点要实时同步到BI看板”或者运维同事发来截图“消费延迟涨到2小时了查下是不是Kafka崩了”又或者你刚在简历里加了“熟悉Kafka”结果面试官问“Consumer Group重平衡触发条件是什么”你脑中只浮现出官网那张抽象的分区图……那么这篇不是讲概念的“指南”而是一份我用三年、踩过二十多个坑、亲手在电商大促、IoT设备告警、日志聚合三条线上反复验证过的Kafka新手生存包。它不教你“什么是Broker”而是告诉你为什么你本地起的单节点Kafka永远测不出生产问题不罗列“Producer有哪几种发送模式”而是拆解当你用send()发一条订单消息却收不到ack时到底该盯哪个指标、改哪行配置、重启哪个服务不空谈“Exactly-Once语义”而是带着你用真实Python脚本在三台虚拟机上手动制造网络分区亲眼看到事务ID如何卡死、如何恢复。核心关键词——Apache Kafka、消息队列、实时数据流、Consumer Group、Partition、Offset——它们不是术语表里的名词而是你明天就要在监控面板里盯着的红色数字、在日志里grep的ERROR堆栈、在JVM参数里调的-Xmx值。适合谁刚转岗做后端/数据开发的工程师、想补全实时链路知识盲区的DBA、被临时拉去支援数据平台建设的测试同学甚至是你——那个在凌晨两点对着Kafka Manager界面发呆、怀疑自己是不是选错了职业方向的人。别怕Kafka没那么玄它就是一套设计精巧、但对环境极其敏感的“消息高速公路系统”。而你要学的不是背诵路标含义而是学会修路、查堵点、调度车流。2. 内容整体设计与思路拆解为什么放弃“理论先行”选择“故障驱动式学习”2.1 拒绝教科书式路径从“为什么需要Kafka”直接跳到“为什么你的Kafka不工作”市面上90%的Kafka入门资料开篇必是“传统MQ vs Kafka”的对比表格接着是“高吞吐、低延迟、可扩展”三大特性宣言最后配一张Broker-Topic-Partition的静态架构图。这就像教人开车先花两小时讲内燃机原理、热力学循环、变速箱齿轮比再让你坐进驾驶座。结果呢学员知道“活塞下行产生负压”但挂不上一档。我的设计反其道而行之以一个真实、微小、可复现的故障为起点倒推所有必须掌握的知识点。比如第一课不是“什么是Topic”而是“为什么你用kafka-console-producer.sh发的消息kafka-console-consumer.sh死活收不到”——这个问题会立刻逼你理解ZooKeeper或KRaft的元数据作用、Topic自动创建开关auto.create.topics.enable、Consumer Group的初始Offset策略earliest/latest、甚至客户端版本与服务端的兼容性。每一个知识点都锚定在一个你马上会遇到的、带具体错误码如UNKNOWN_TOPIC_OR_PARTITION、NOT_LEADER_FOR_PARTITION的现场。这种“故障驱动”设计源于我带新人时的真实教训当学员在本地Docker里跑通“Hello World”后信心爆棚地去公司集群执行同样命令得到的却是“Connection refused”。那一刻教再多“Kafka是分布式日志系统”都没用他需要的是立刻能查的端口、能改的配置、能ping通的IP。2.2 架构分层剥离“Kafka”这个黑盒看清三层物理实体与两层逻辑契约很多初学者的困惑源于把Kafka当成一个不可拆解的整体。实际上它由三个物理层和两个逻辑层构成每一层都对应着不同的运维责任和调试入口物理层1网络与OS层——这是所有问题的底层土壤。Kafka极度依赖低延迟网络跨机房延迟50ms就会引发频繁Rebalance对磁盘I/O顺序写性能和内存Page Cache大小直接影响吞吐有硬性要求。我见过最典型的案例某客户在云主机上部署Kafka磁盘类型选了普通SSD而非高性能云盘结果在10万TPS写入时iostat -x显示await平均IO等待时间飙升至200msBroker日志里全是Failed to write records to topic。此时任何“调整acks1”或“增大batch.size”的优化都是隔靴搔痒。物理层的问题必须用ping、netstat、iostat、free -h这些OS命令定位而不是翻Kafka文档。物理层2JVM与Broker进程层——Kafka Broker本质是一个Java进程。它的GC表现尤其是老年代Full GC频率、堆外内存Direct Memory使用、线程数num.network.threads、num.io.threads配置直接决定服务稳定性。曾有个集群在流量高峰时出现“假死”Producer发消息超时但Broker进程仍在运行。jstat -gc pid显示Metaspace持续增长最终OOM。根源是自定义Serde类加载器未释放导致Metaspace泄漏。这不是Kafka配置问题而是Java应用的通用陷阱。物理层3存储层Log Segment——Kafka的数据不存于数据库而是写入本地文件系统的.log和.index文件。每个Partition对应一个目录里面是按大小默认1GB或时间默认7天滚动的Segment。理解Segment结构是排查“磁盘爆满”、“Offset查询慢”的关键。比如log.retention.bytes10737418241GB只限制单个Topic总大小但若一个Topic有100个Partition实际占用就是100GB。而log.segment.bytes1073741824才控制单个Segment大小。这两个参数常被混淆。逻辑层1Topic-Partition-Replica契约——这是Kafka的“数据契约”。Topic是逻辑容器Partition是并行单元Replica是容灾副本。关键在于Partition数量决定了最大并发消费者数Consumer Group内和最大吞吐上限Replica数量尤其是ISR列表长度决定了可用性与一致性权衡。新手常犯的错是“为防止单点故障把replication.factor设为3”却忽略了Leader选举成本和网络开销。在小集群3 Broker上3副本是合理选择但在10 Broker集群中3副本可能造成大量跨机架网络复制反而降低吞吐。逻辑层2Producer-Consumer-Group契约——这是Kafka的“交互契约”。Producer负责消息写入含序列化、分区路由、重试机制Consumer负责消息读取含Offset提交、Rebalance协调。这个契约的核心矛盾在于Producer追求“发出去就完事”异步、高吞吐Consumer追求“确保不丢不重”精确一次、稳定消费。而Kafka通过acks、enable.idempotence、isolation.level等参数在两者间划出一条脆弱的平衡线。比如acksall保证所有ISR副本写入才返回成功但若ISR只剩1个Leader则退化为acks1此时网络抖动会导致大量超时。理解这个契约才能明白为什么“配置调优”不是填参数而是根据业务SLA如“订单消息允许1分钟延迟但绝不允许丢失”做取舍。2.3 工具链选择为什么只推荐kafka-topics.sh、kafka-consumer-groups.sh和kafka-run-class.sh新手常陷入工具迷思该用Kafka ManagerConfluent Control Center还是PrometheusGrafana我的答案很直接前三个月只用Kafka自带的Shell脚本。原因有三零依赖直连内核这些脚本本质是调用Kafka AdminClient API绕过了所有UI层的抽象和缓存。当你执行kafka-topics.sh --describe --topic test它直接向Broker发起DescribeTopicsRequest返回的TopicDescription对象包含Partition状态、Leader ID、ISR列表、Replica分布等原始信息。而Kafka Manager的“健康状态”图标可能是基于5分钟前的缓存数据当你看到绿色对勾时实际ISR可能已降为1。错误信息更精准kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-group --describe返回的CURRENT-OFFSET、LOG-END-OFFSET、LAG三列是诊断消费延迟的黄金三角。如果LAG持续增长说明Consumer处理不过来如果CURRENT-OFFSET为-1说明Group尚未提交Offset可能刚启动或配置了enable.auto.commitfalse且未手动commit如果LOG-END-OFFSET远小于预期说明Producer根本没发成功。这些细节在UI里往往被包装成模糊的“延迟高”警告。强制你理解协议用kafka-run-class.sh kafka.tools.DumpLogSegments解析.log文件你会看到每条消息的Magic Byte、CRC校验码、Attributes压缩类型、Timestamp、Key Length、Value Length等二进制字段。这比任何架构图都更能让你理解“Kafka消息格式”不是概念而是字节流。我坚持让所有新人在第一天就手动解析一条消息目的就是打破“消息是黑盒”的幻觉。提示所有Shell脚本位于$KAFKA_HOME/bin/目录。务必确认KAFKA_HOME环境变量指向正确的安装路径如/opt/kafka_2.13-3.6.0且JAVA_HOME已正确设置。常见错误NoClassDefFoundError90%源于此。3. 核心细节解析与实操要点从“能跑”到“跑稳”的12个生死关卡3.1 关卡1Broker启动失败——不是配置错是端口/磁盘/权限三重门新手第一次启动Kafka Broker最常遇到的不是配置语法错误而是操作系统层面的“三重门”端口门listenersPLAINTEXT://:9092中的9092端口必须未被占用。netstat -tuln | grep :9092是必查命令。但更隐蔽的是advertised.listeners——它告诉Producer/Consumer“你们该连哪个地址”。若你在Docker里运行advertised.listenersPLAINTEXT://localhost:9092会让外部Producer连到宿主机的localhost即127.0.0.1而非容器内网。正确做法是设为宿主机IP如PLAINTEXT://192.168.1.100:9092并确保防火墙放行该端口。记住listeners是Broker监听的地址advertised.listeners是对外宣告的地址二者在单机开发时可相同生产环境必须分离。磁盘门log.dirs/tmp/kafka-logs是默认配置但/tmp在多数Linux发行版中是tmpfs内存文件系统容量有限。当消息量稍大df -h /tmp会显示100%占用Broker日志里出现IOException: No space left on device。解决方案是创建专用目录mkdir -p /data/kafka-logs chown kafka:kafka /data/kafka-logs并在server.properties中改为log.dirs/data/kafka-logs。切勿将log.dirs指向根目录/或/home这会导致系统级风险。权限门Kafka进程需对log.dirs目录有读写权限。若用root启动后切换到kafka用户但目录属主仍是root则Broker启动时会报java.nio.file.AccessDeniedException。ls -ld /data/kafka-logs检查属主chown -R kafka:kafka /data/kafka-logs递归修正。一个经验所有Kafka相关目录config、logs、data的属主必须统一为运行Kafka的用户。3.2 关卡2Topic创建失败——auto.create.topics.enable不是万能钥匙kafka-topics.sh --create --topic test --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092执行后若返回Error while executing topic command : Topic test is marked for deletion.说明该Topic曾被删除但未彻底清理ZooKeeper/KRaft中残留元数据。此时auto.create.topics.enabletrue默认也无效。解决方法只有两个一是等待delete.topic.enabletrue默认false并执行kafka-topics.sh --delete后Kafka自动清理二是手动清除元数据ZooKeeper环境下用zkCli.sh删除/brokers/topics/test节点KRaft环境下需停服修改meta.properties。更稳妥的做法是生产环境永远关闭auto.create.topics.enable所有Topic由运维统一审批、脚本创建避免命名混乱和资源滥用。3.3 关卡3Producer发消息超时——不是网络问题是linger.ms和batch.size的博弈kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test输入消息后光标卡住不动大概率是linger.ms0默认导致每条消息都立即发送但Broker响应慢。此时应启用批处理--producer-property linger.ms100 --producer-property batch.size16384。linger.ms是等待更多消息凑满批次的时间毫秒batch.size是批次大小字节。二者关系是当任一条件满足时间到或大小到批次就发送。实测在千兆内网linger.ms10batch.size32768可将吞吐从5k msg/s提升至80k msg/s。但linger.ms不能设过大如5000否则小流量场景下消息延迟剧增。黄金法则根据业务P95延迟要求反推linger.ms。若要求消息端到端延迟100ms则linger.ms必须50ms。3.4 关卡4Consumer收不到消息——offset.reset.policy的隐形杀手kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group --from-beginning执行后无任何输出检查offsets.topic.replication.factor默认1。若Broker数2该Topic无法创建Consumer Group的Offset无法持久化导致每次重启都从头消费--from-beginning生效但若Group已存在且之前提交过Offset则按auto.offset.reset策略执行。auto.offset.resetearliest从最早开始和latest从最新开始是常用值但新手常忽略none——它表示“若无提交的Offset直接抛异常”。生产环境强烈建议设为none并在Consumer启动时显式调用seekToBeginning()或seekToEnd()避免因配置漂移导致数据丢失或重复。3.5 关卡5Consumer Group持续Rebalance——心跳超时与session.timeout.ms的真相Consumer日志里频繁出现Revoking previously assigned partitions、Adding newly assigned partitions这是Rebalance风暴。根源通常是session.timeout.ms默认10000ms与heartbeat.interval.ms默认3000ms不匹配。规则是heartbeat.interval.ms必须 session.timeout.ms/ 3。若session.timeout.ms10000则heartbeat.interval.ms必须≤3333ms。但更深层原因是Consumer处理消息太慢导致心跳线程无法及时发送心跳。诊断步骤1.kafka-consumer-groups.sh --describe查看MEMBERS列确认Consumer实例数是否正常2. 查看Consumer JVM GC日志若Full GC频繁说明处理逻辑阻塞3. 降低max.poll.records默认500至100减少单次poll处理量。3.6 关卡6磁盘空间告急——log.retention.hours不是唯一开关df -h显示/data/kafka-logs使用率95%但kafka-topics.sh --describe显示各Topic的LogEndOffset增长缓慢。问题出在log.retention.check.interval.ms默认300000ms5分钟。Kafka不是实时扫描磁盘而是每隔5分钟检查一次是否需要删除旧Segment。若log.retention.hours1687天但最近7天消息量极大旧Segment未被及时清理。紧急方案手动触发日志清理kafka-log-dirs.sh --bootstrap-server localhost:9092 --describe --log-dirs /data/kafka-logs查看各Partition的SIZE再用kafka-delete-records.sh精确删除指定Offset前的数据。长期方案监控kafka_log_log_size_bytes指标当log.retention.bytes按字节清理与log.retention.hours按时间清理双轨并行避免单一维度失效。3.7 关卡7消息乱序——分区键Key与自定义Partitioner的陷阱Producer发送消息时若未指定Key即KeynullKafka默认使用DefaultPartitioner它对null Key采用粘性分区Sticky Partitioning将消息批量发送到同一个Partition直到该Partition的批次满或超时再换下一个。这会导致“同一业务实体如用户ID的消息分散在不同Partition”Consumer端无法保证顺序。正确做法为消息指定有意义的Key如订单ID、用户IDKafka会按hash(Key) % numPartitions路由确保同一Key的所有消息进入同一Partition从而在Partition内严格有序。若需更复杂路由如按地域分片则实现org.apache.kafka.clients.producer.Partitioner接口重写partition()方法。3.8 关卡8消息丢失——acks1的甜蜜陷阱acks1默认表示Leader写入成功即返回但若Leader在同步给Follower前宕机且新Leader选举产生未同步的消息将永久丢失。生产环境必须设为acksall或acks-1。但这还不够需配合min.insync.replicas2默认1。min.insync.replicas定义ISRIn-Sync Replicas最小数量。当replication.factor3且min.insync.replicas2时只要2个副本同步成功Producer就收到ack。若ISR只剩1个LeaderProducer会收到NotEnoughReplicasException拒绝写入避免数据单点风险。这是Kafka“至少一次”At-Least-Once语义的基石也是与“精确一次”Exactly-Once的分水岭。3.9 关卡9消息重复——enable.idempotence与transactional.id的协同enable.idempotencetrue默认false开启幂等性Producer它通过producer.id和sequence.number确保单个Producer Session内不重复。但若Producer进程崩溃重启producer.id重置幂等性失效。此时需事务enable.idempotencetruetransactional.idmy-transaction。transactional.id绑定Producer到一个事务IDKafka Broker会持久化该ID的状态即使Producer重启也能续接之前的事务。注意事务ID必须全局唯一且不能在多个Producer实例间共享。一个典型错误是微服务集群中所有实例使用相同transactional.id导致事务冲突。3.10 关卡10监控盲区——为什么JMX指标比日志更早预警Kafka提供丰富的JMX指标如kafka.server:typeBrokerTopicMetrics,nameMessagesInPerSec每秒入站消息数、kafka.server:typeReplicaManager,nameUnderReplicatedPartitions未完全同步的Partition数。当UnderReplicatedPartitions 0说明部分Follower落后Leader是Rebalance或数据丢失的前兆。而日志里可能还只是INFO级别的Fetching leader epoch from ...。必须配置JMX Exporter如Prometheus JMX Exporter将JMX指标暴露为HTTP端点再接入Grafana。一个健康集群的底线是UnderReplicatedPartitions0RequestHandlerAvgIdlePercent 0.7请求处理器空闲率。3.11 关卡11SSL加密连接——不是配置一堆密钥而是理解信任链security.protocolSSL启用加密但新手常卡在ssl.truststore.location和ssl.keystore.location。关键点Truststore存放CA证书用于验证Broker身份Keystore存放Producer/Consumer自己的证书和私钥用于向Broker证明自己。生成流程1. 创建CAkeytool -genkeypair -alias ca -keyalg RSA -keystore ca.keystore.jks2. 导出CA证书keytool -export -alias ca -file ca.crt -keystore ca.keystore.jks3. 为Broker生成密钥对并签名keytool -genkeypair -alias broker -keyalg RSA -keystore broker.keystore.jks然后keytool -certreq -alias broker -file broker.csr -keystore broker.keystore.jks最后keytool -gencert -alias ca -infile broker.csr -outfile broker.crt -keystore ca.keystore.jks。整个过程的核心是所有Broker和Client的Truststore必须导入同一个CA证书ca.crt。3.12 关卡12KRaft模式迁移——告别ZooKeeper的阵痛与收益Kafka 3.3支持KRaftKafka Raft Metadata Mode用内置Raft协议替代ZooKeeper管理元数据。迁移步骤1. 停止所有Broker2. 修改server.propertiesprocess.rolesbroker,controllernode.id1controller.quorum.voters1host1:9093,2host2:9093,3host3:90933. 初始化元数据kafka-storage.sh format -t cluster-id -c server.properties4. 启动Broker。收益架构简化少一个组件、启动更快无需ZK连接、Controller选举更高效。但风险KRaft在3.3-3.5版本仍有稳定性问题生产环境建议3.6。迁移后kafka-topics.sh等脚本仍可用但ZooKeeper相关命令如zkCli.sh彻底废弃。4. 实操过程与核心环节实现手把手搭建一个抗压10万TPS的电商订单流4.1 环境准备三台虚拟机从零构建最小高可用集群我们用三台CentOS 7虚拟机16GB RAM4核CPU200GB SSDIP分别为192.168.1.101、192.168.1.102、192.168.1.103构建一个真正可用的Kafka集群。绝不使用Docker单节点或Mac本地伪集群因为它们无法模拟网络分区、磁盘IO瓶颈等生产级问题。步骤1基础环境配置# 所有节点执行 sudo yum install -y java-11-openjdk-devel wget tar gzip sudo useradd kafka -m -U -d /opt/kafka -s /bin/bash sudo mkdir -p /data/kafka-logs /data/kafka-data sudo chown -R kafka:kafka /data/kafka-logs /data/kafka-data /opt/kafka步骤2下载并解压Kafka以3.6.0为例# 在192.168.1.101上执行 sudo -u kafka wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz sudo -u kafka tar -xzf kafka_2.13-3.6.0.tgz -C /opt/kafka --strip-components1 sudo -u kafka ln -sf /opt/kafka /opt/kafka/current将/opt/kafka/current目录用rsync同步到其他两台节点。步骤3配置Broker以192.168.1.101为例其他节点仅修改node.id和listeners# /opt/kafka/current/config/server.properties broker.id1 process.rolesbroker,controller node.id1 controller.quorum.voters1192.168.1.101:9093,2192.168.1.102:9093,3192.168.1.103:9093 listenersPLAINTEXT://192.168.1.101:9092,CONTROLLER://192.168.1.101:9093 inter.broker.listener.namePLAINTEXT advertised.listenersPLAINTEXT://192.168.1.101:9092 listener.security.protocol.mapPLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT num.network.threads3 num.io.threads8 socket.send.buffer.bytes102400 socket.receive.buffer.bytes102400 socket.request.max.bytes104857600 log.dirs/data/kafka-logs num.partitions1 num.recovery.threads.per.data.dir1 offsets.topic.replication.factor3 transaction.state.log.replication.factor3 transaction.state.log.min.isr2 log.retention.hours168 log.segment.bytes1073741824 log.retention.check.interval.ms300000 zookeeper.connect zookeeper.connection.timeout.ms group.initial.rebalance.delay.ms0关键点解析process.rolesbroker,controllerKRaft模式下Broker同时承担Controller角色。controller.quorum.voters定义Controller Raft集群的投票节点必须包含所有Controller节点。listeners定义两个监听器PLAINTEXT供Client连接CONTROLLER供Controller间通信。advertised.listeners必须设为客户端可访问的IP此处即本机IP。offsets.topic.replication.factor3确保Consumer Group Offset高可用。transaction.state.log.replication.factor3确保事务状态高可用。transaction.state.log.min.isr2事务日志ISR最小为2与min.insync.replicas同理。步骤4初始化并启动# 所有节点执行注意替换cluster-id为唯一字符串如ecommerce-cluster-2024 sudo -u kafka /opt/kafka/current/bin/kafka-storage.sh format -t cluster-id -c /opt/kafka/current/config/server.properties sudo -u kafka /opt/kafka/current/bin/kafka-server-start.sh -daemon /opt/kafka/current/config/server.properties验证ps aux | grep kafka确认进程运行sudo -u kafka /opt/kafka/current/bin/kafka-broker-api-versions.sh --bootstrap-server 192.168.1.101:9092返回API版本列表即成功。4.2 Topic建模为电商订单流设计分区与副本策略电商订单流核心Topicorders原始订单、order-events订单状态变更事件、user-actions用户行为埋点。建模原则按业务实体Entity分区按可用性要求设副本。ordersTopic存储原始订单JSONSchema固定。partitions123 Broker × 4预留扩展replication-factor3。分区键Key为order_id确保同一订单所有消息创建、支付、发货进入同一PartitionConsumer可按Order ID聚合处理。order-eventsTopic存储订单状态变更created/paid/shipped/cancelled需强顺序。partitions24更高并发replication-factor3。Key为order_id与orders一致便于Join。user-actionsTopic用户点击、浏览、搜索等行为数据量最大容忍一定乱序。partitions48极致并发replication-factor2成本考量可用性要求略低。创建命令# 在192.168.1.101上执行 sudo -u kafka /opt/kafka/current/bin/kafka-topics.sh --create \ --topic orders \ --partitions 12 \ --replication-factor 3 \ --config retention.ms2592000000 \ --bootstrap-server 192.168.1.101:9092 # 验证 sudo -u kafka /opt/kafka/current/bin/kafka-topics.sh --describe --topic orders --bootstrap-server 192.168.1.101:9092--describe输出中Replicas列应显示每个Partition的3个副本分布在不同Broker如1,2,3Isr列应为1,2,3全部同步Leader列应均匀分布如Partition 0 Leader1, Partition 1 Leader2...证明集群健康。4.3 Producer压测用Python脚本模拟10万TPS订单写入我们用confluent-kafka-python库编写Producer目标稳定写入10万条订单消息/秒。关键不是“发得快”而是“发得稳”。Python Producer脚本producer_orders.pyfrom confluent_kafka import Producer import json import time import random import threading # Kafka配置 conf { bootstrap.servers: 192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092, client.id: order-producer, acks: all, # 强一致性 retries: 10, enable.idempotence: True, # 幂等性 transactional.id: order-transaction-1, # 事务ID linger.ms: 5, # 批处理延迟 batch.size: 65536, # 批次大小64KB compression.type: lz4, # 压缩提升吞吐 max.in.flight.requests.per.connection: 5, # 控制并发请求数 } producer Producer(conf) # 订单模板 def generate_order(order_id): return { order_id: order_id, user_id: fuser_{random.randint(1, 10000)}, items: [{sku: fsku_{i}, qty: random.randint(1, 5)} for i in range(random.randint(1, 10))], total_amount: round(random.uniform(10.0, 500.0), 2), timestamp: int(time.time() * 1000) } # 发送回调 def delivery_report(err, msg): if err is not None: print(fMessage delivery failed: {err}) else: pass # 成功不打印避免IO瓶颈 # 主发送循环 def send_orders(): start_time time.time() sent_count 0 while sent_count 1000000: # 发送100万条 order_id fORD-{int(time.time()*1000000)}-{sent_count} order_json json.dumps(generate_order(order_id)).encode(utf-8) # Key设为order_id确保路由到同一Partition producer.produce(orders, keyorder_id.encode(utf-8), valueorder_json, callbackdelivery_report) sent_count 1 if sent_count % 10000 0: elapsed time.time() - start_time tps sent_count / elapsed print(fSent {sent_count} orders, TPS: {tps:.0f}) producer.flush() # 确保所有消息发送完成 # 启动 if __name__ __main__: send_orders()执行与监控# 安装依赖 pip install confluent-kafka # 运行在192.168.1.101上 python producer_orders.py关键观察点kafka-topics.sh --describe --topic ordersLogEndOffset应随时间线性增长。kafka-consumer-groups.sh --bootstrap-server 192.168.1.101:9092 --group test-consumer --describeLAG应接近0证明Consumer能跟上。top命令Broker进程CPU应70%iostat -x 1显示await10ms。若TPS上不去优先调大batch.size和linger.ms而非增加线程数。4.4 Consumer消费用Go实现高吞吐订单状态处理器Python Producer写