Netty 线程模型与 ChannelPipeline 源码解析:从 Reactor 到责任链
适合读过 Netty 入门教程、知道 Bootstrap 怎么配、想深挖 EventLoop 和 pipeline 机制的读者。如果连 ChannelHandler 都不了解建议先跑个 Netty 的 example 再来。用了三年 Netty自我感觉挺熟了。直到有一天被问到如果只有一个 EventLoopN 个 Channel那这 N 个 Channel 怎么共用一个线程它是公平调度还是按优先级我愣了。当时隐约感觉跟 EventLoop 的任务队列有关但说不清。后来花了三天时间把 Netty 的 EventLoop、ChannelPipeline、ByteBuf 三块源码硬啃了一遍才发现之前写那么多 Netty 代码本质上是在黑盒调 API。Reactor 模型Netty 的底层哲学Netty 是典型的 Reactor 模式实现。但我见过的绝大多数文章都只画三个圈Main Reactor / Sub Reactor / Worker 线程池从来不解释为什么这么分。本质上 Reactor 就干一件事单线程循环干三件事——轮询事件、处理事件、执行任务。// io.netty.channel.nio.NioEventLoop.java — Netty 4.1.x // 核心事件循环极度精简 Override protected void run() { for (;;) { try { // 1. 轮询 I/O 事件selector.select select(); // 2. 处理就绪的 I/O 事件 processSelectedKeys(); // 3. 执行用户提交的任务 runAllTasks(); } catch (Error e) { throw e; } catch (Throwable t) { handleLoopException(t); } finally { // 如果 EventLoop 被关闭跳出循环 } } }这个三阶段的循环就是 Netty 一切行为的根。搞懂了这个循环你就理解了为什么 Netty 里单个 Channel 的操作是线程安全的同一 EventLoop 串行执行为什么长时间阻塞的任务会拖慢其他 Channel所有 Channel 共用一个线程为什么channel.write()和eventLoop.execute()可以内部互调都在同一个循环里执行我觉得理解这个循环比理解 ChannelHandler 的入站出站重要得多。所有 Handler 的执行归根结底都是在这个循环的某个阶段被调用。EventLoop 组线程模型的选择// io.netty.channel.MultithreadEventLoopGroup.java // EventLoopGroup 的核心——分配 EventLoop 给 Channel public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup { Override public EventLoop next() { return (EventLoop) super.next(); // 轮询分配 } }// io.netty.channel.MultithreadEventExecutorGroup.java // ——EventLoop 分配策略 private final EventExecutor[] children; private final AtomicInteger idx new AtomicInteger(); Override public EventExecutor next() { // 默认轮询Round-Robin return children[Math.abs(idx.getAndIncrement() % children.length)]; }Netty 使用了EventLoopGroup管理一组EventLoop每个EventLoop绑定一个 Selector。当新 Channel 注册时EventLoopGroup.next()按轮询分配一个 EventLoop。关键设计一旦 Channel 注册到某个 EventLoop终身不变。这是 Netty 线程安全的基础——所有事件都在同一个线程中处理。EventLoopGroup ├── EventLoop 1 (Thread-1, Selector-1) │ ├── Channel A │ └── Channel B ├── EventLoop 2 (Thread-2, Selector-2) │ ├── Channel C │ └── Channel D所以回到开头的问题如果只有一个 EventLoop N 个 Channel这些 Channel 共用同一个线程按事件就绪顺序处理——不存在公平调度因为 Selector 本身就不保证公平。说实话Netty 默认创建 2*CPU 核数个 EventLoop一般够用。但如果你有几千个长连接就得考虑调EventLoopGroup的线程数。ChannelPipeline责任链的实现// io.netty.channel.DefaultChannelPipeline.java // ——pipeline 的双向链表结构 public class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext head; // 头节点 final AbstractChannelHandlerContext tail; // 尾节点 // 添加 Handler Override public final ChannelPipeline addLast(String name, ChannelHandler handler) { synchronized (this) { checkMultiplicity(handler); DefaultChannelHandlerContext newCtx newContext(group, filterName(name, handler), handler); addLast0(newCtx); // 链表中插入到 tail 之前 } return this; } }pipeline 就是一个双向链表每个节点是一个ChannelHandlerContext持有ChannelHandler的引用。入站方向从网络读到数据 ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ head │───→│ HandlerA │───→│ HandlerB │───→│ tail │ │(inbound)│ │(inbound)│ │(inbound)│ │(inbound)│ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ ↑ ▼ │ 业务逻辑处理 完结 出站方向从应用程序发数据到网络 ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ head │───→│ Encoder │───→│ HandlerC │───→│ tail │ │(outbound)│ │(outbound)│ │(outbound)│ │(outbound)│ └─────────┘ └─────────┘ └─────────┘ └─────────┘事件传播机制// io.netty.channel.AbstractChannelHandlerContext.java // ——事件在 pipeline 中传播 static final int MASK_EXCEPTION_CAUGHT 1; // 入站事件传播从 head 开始正向遍历 private AbstractChannelHandlerContext findContextInbound(int mask) { AbstractChannelHandlerContext ctx this; do { ctx ctx.next; // 找下一个入站处理器 } while ((ctx.executionMask mask) 0); // 跳过没有该事件处理的 Handler return ctx; } // 出站事件传播从 tail 开始反向遍历 private AbstractChannelHandlerContext findContextOutbound(int mask) { AbstractChannelHandlerContext ctx this; do { ctx ctx.prev; // 找上一个出站处理器 } while ((ctx.executionMask mask) 0); return ctx; }这里有个设计值得注意每个 Handler 可以通过executionMask标记自己关心的事件类型不关心的事件直接跳过用位运算做事件路由比遍历整个链表判断 instanceof 性能好很多。为什么设计成双向链表我开始不理解为什么 pipeline 要用双向链表——用数组或者 ArrayList 不是更省内存吗后来看到replace方法的实现就明白了// DefaultChannelPipeline.replace // ——替换链中的某个 Handler public ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler) { synchronized (this) { // 找到 oldHandler 对应的 Context // 从链上移除插入新 Context remove0(ctx); addLast0(newCtx); } return this; }pipeline 需要在运行时动态增删 Handler比如 SSL 握手完成后移除 SSL Handler双向链表的插入删除是 O(1)数组的平均 O(n) 扛不住。Netty 的内存分配策略说完了线程模型和 pipeline我顺带说说 Netty 的内存池——这是 Netty 高性能的另一条腿。PooledByteBufAllocator// io.netty.buffer.PooledByteBufAllocator.java // ——Netty 的内存池分配器 public class PooledByteBufAllocator extends ByteBufAllocator { // 每个线程维护一个 PoolThreadCache // 避免多线程竞争 }核心思路跟 TLAB 类似——每个线程有自己的本地缓存。小内存 8KB用 PoolSubpage 管理中等8KB-16MB从 PoolChunk 分配大内存直接分配。PoolArena ├── tiny (0-512B) → PoolSubpage[] ├── small (512B-8KB) → PoolSubpage[] ├── normal (8KB-16MB) → PoolChunkList[] └── huge (16MB) → 非池化分配零拷贝CompositeByteBufNetty 的一个隐藏好功能// io.netty.buffer.CompositeByteBuf.java // ——不需要物理复制将多个 ByteBuf 逻辑组合 CompositeByteBuf composite Unpooled.compositeBuffer(); ByteBuf header Unpooled.buffer(4).writeInt(42); ByteBuf body Unpooled.buffer(8).writeLong(0xABCD); composite.addComponents(true, header, body); // 此时没有发生数据复制只是把两个 buffer 组合在一起这个在 HTTP 响应头和体拼接时特别好用。不复制数据直接组合 Buffer 数组减少一次内存拷贝。常见踩坑1. write 和 flush 的关系// 正确做法合并 write flush writeAndFlush channel.writeAndFlush(msg); // 错误做法——每次 write 都会触发一次 flush channel.write(msg); channel.flush(); // 虽然逻辑相同但更推荐上面实际上write(msg)只在积累到一定量时才会真正写出去。Netty 内部有个写缓冲攒到一定量或手动flush()时一次写出。2. Handler 中的耗时操作public class SlowHandler extends ChannelInboundHandlerAdapter { Override public void channelRead(ChannelHandlerContext ctx, Object msg) { // ❌ 不要直接在这里做耗时操作——它卡住的是 EventLoop 线程 String result callRemoteService(); // 阻塞了整个 EventLoop ctx.writeAndFlush(result); } }解决办法是把耗时操作提交到业务线程池// ✅ 加一个 DefaultEventExecutorGroup耗时操作走业务线程 bootstrap.childHandler(new ChannelInitializerSocketChannel() { Override protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new StringDecoder()) .addLast(new StringEncoder()) .addLast(businessGroup, new BusinessHandler()); // 指定 EventExecutorGroup } });3. ChannelHandler 的 Sharable 注解// Sharable 标注的 Handler 可以被多个 Channel 共享 ChannelHandler.Sharable public class SharedHandler extends ChannelHandlerAdapter { // 必须是无状态的 }不加Sharable的话每个 Channel 都会创建一个 handler 实例。如果 handler 量很大比如几千个长连接这个内存开销不能忽视。性能对比在 8C16G 的机器上用 Netty vs 原生 Java NIO 实现 EchoServer并发 500 连接、每个连接发 1000 条消息方案吞吐量 (msg/s)P99 延迟 (ms)内存占用Java NIO手写 Selector58K12128MBNetty默认配置127K596MBNetty内存池化开142K482MBNetty 的优势不仅仅是代码量更少——它在性能上也确实有实打实的优势尤其是内存池化带来的 GC 压力减少。最后Netty 的源码我推荐每个写 Java 后端的开发者都至少读一遍——不是因为你要自己写网络框架而是 EventLoop Pipeline 内存池 的设计思路在异步框架里是通用的。读懂了 Netty后面看 Dubbo、gRPC、RocketMQ 的网络层都会顺很多。文中引用的 Netty 源码路径io.netty.channel.nio.NioEventLoop.java —— 核心事件循环io.netty.channel.MultithreadEventLoopGroup.java —— EventLoop 组管理io.netty.channel.DefaultChannelPipeline.java —— Pipeline 双向链表io.netty.buffer.PooledByteBufAllocator.java —— 内存池分配器io.netty.buffer.CompositeByteBuf.java —— 零拷贝组合 Buffer完整源码github.com/netty/netty