rabbitmq+websocket实时通知
一、业务背景在网约车场景中乘客下单后会进入「等待司机接单」页面。如果只靠 HTTP 轮询每隔几秒查一次订单状态会有这些问题延迟高轮询间隔越大用户感知越慢浪费资源大量无效请求服务器压力大体验差页面需要不断刷新或定时请求更好的方案是服务端主动推送。乘客打开等待页时建立 WebSocket 长连接司机抢单成功后后端立刻把通知推送到乘客浏览器。但这里有个架构问题抢单逻辑在 rpc-order 服务WebSocket 连接在 api-gateway 服务两个服务不在同一进程不能直接调用hub.Push()。中间就需要一个消息队列做解耦——本项目用的是 RabbitMQ。二、整体架构完整链路如下司机抢单成功↓Redis Stream异步落库↓rpc-order 更新 MySQL↓发布 RabbitMQ 消息OrderGrabbedEvent↓api-gateway 消费消息↓WebSocket Hub 按 userId 精准推送↓乘客浏览器收到「司机已接单」通知三个角色分工rpc-order抢单、落库、发布 MQ 事件RabbitMQ跨服务传递「接单通知」api-gateway维护 WebSocket 连接消费 MQ 并推送三、RabbitMQDirect 路由模式3.1 为什么选 Direct 路由模式RabbitMQ 有多种交换机类型fanout、topic、direct 等。本项目用的是 Direct 路由模式生产者发消息时带上 routing key交换机按 key 精确匹配把消息路由到对应队列适合「一种事件类型 → 一个消费队列」的场景3.2 拓扑结构生产者(rpc-order)──Publish──► Exchange: order.notify.exchange│ routing key: order.grabbed▼Queue: order.notify.queue│消费者(api-gateway) ◄──Consume──┘对应常量交换机order.notify.exchange路由键order.grabbed队列order.notify.queue3.3 事件结构消息体是一个 JSON 对象主要字段event固定为driver_accepted前端用来判断消息类型user_id乘客 IDGateway 用它查找 WebSocket 连接order_no订单号driver_name司机姓名car_number车牌号car_type车型rating司机评分前端拿到这些字段可以直接渲染不需要再调接口。3.4 发布时机发布放在 MySQL 落库成功之后保证「通知出去时数据库已经是已接单状态」。调用链GrabOrder(Lua 抢单)→ Redis Stream→ OrderGrabbedConsumer 消费→ updateOrderGrabbed(MySQL)→ publishOrderGrabbedNotify()→ rmq.PublishOrderGrabbed()设计要点从 MySQL 查订单和司机信息不依赖 Redis 缓存抢单后缓存已被删除发布失败只打日志不影响 Stream 消费 ACK通知与落库解耦3.5 Confirm 模式发送端开启了 Publisher Confirm消息发出后等待 Broker 的 ACK确保 RabbitMQ 确实收到了避免「以为发出去了实际丢了」四、WebSocketHub 连接管理4.1 为什么用 Hub 模式WebSocket 连接散落在各个 HTTP 请求里需要一个中心来统一管理。本项目用的是经典的 Hub 模式数据结构map[userId] → WebSocket 连接线程安全RWMutex保护 map单连接写操作用Mutex保护职责注册连接、移除连接、按 userId 推送消息4.2 连接建立流程路由GET /ws/order?tokenxxx流程从 URL 参数或 Header 取 JWT token解析 token 得到userIdHTTP Upgrade 为 WebSockethub.Register(userId, conn)注册连接阻塞ReadMessage保持连接断开时hub.Unregister为什么 token 放 query 参数浏览器 WebSocket 无法自定义 Header所以 token 只能放 URL 里。4.3 心跳保活服务端每 30 秒发 Ping 帧60 秒无响应则断开连接防止「僵尸连接」占用 Hub 内存4.4 重复连接处理同一userId重复连接时关闭旧连接只保留最新一条。避免多端登录导致重复推送。4.5 Unregister 的坑移除连接时要校验 conn 指针是否匹配避免误删新连接旧连接断开 → 触发 Unregister但此时用户可能已经建立了新连接如果不校验指针会把新连接也删掉五、MQ 消费者 → WebSocket 推送api-gateway 启动时在 HTTP 服务之前启动 RabbitMQ 消费者连接 RabbitMQ绑定order.notify.queue收到消息后解析OrderGrabbedEvent调用hub.Push(userId, body)推送给在线乘客手动 Ack 确认消费推送逻辑乘客在线 → 写入 WebSocket → 返回 true乘客离线 → 返回 falseMVP 方案直接 Ack不做离线补偿当前是 单实例 Gateway MVP 方案Hub 存在内存里多实例部署需要额外方案如 Redis Pub/Sub 广播到各实例 Hub。六、完整时序乘客 api-gateway RabbitMQ rpc-order│ │ │ ││── GET /ws/order ────────►│ │ ││◄── WebSocket 建立 ───────│ │ ││ │ hub.Register(userId) │ ││ │ │ ││ │ │ │◄── 司机抢单│ │ │ │── MySQL 落库│ │ │◄── Publish ───────││ │◄── Consume ──────────│ ││ │ hub.Push(userId) │ ││◄── JSON 推送 ────────────│ │ ││ {event: driver_accepted}│ │ │七、关键设计决策7.1 为什么不用 HTTP 轮询WebSocket 是全双工长连接服务端可以主动推。对于「等司机接单」这种实时场景比轮询更合适。7.2 为什么 rpc-order 不直接推 WebSocket微服务职责分离order 服务管订单gateway 管连接order 服务不知道乘客连在哪台 gateway 上MQ 解耦order 只管发事件gateway 只管推连接7.3 离线不补偿乘客没开等待页或已离开MQ 消息仍然 Ack。后续可以扩展离线消息存 Redis / DB乘客下次登录时拉取或配合 App Push 通知7.4 手动 Ack vs 自动 Ack消费者用手动 Ack业务处理成功才确认失败可以 Nack 重新入队JSON 格式错误等不可恢复的错误直接 Ack 丢弃避免无限重试八、今天学到的核心知识点RabbitMQ Direct 路由模式Exchange Routing Key Queue 三件套Publisher Confirm发送端确认 Broker 收到消息WebSocket Hub 模式集中管理连接按 userId 精准推送跨服务实时通知MQ 做桥梁解耦生产者和推送端JWT WebSockettoken 放 query 参数因为浏览器 WS 不能自定义 Header心跳保活Ping/Pong ReadDeadline防止僵尸连接并发安全Hub 用 RWMutex单连接写用 Mutex九、后续可优化方向Gateway 多实例Hub 改 Redis Pub/Sub 或专用推送服务离线消息补偿MQ 消费时落库乘客上线后补推RabbitMQ 连接池当前每次发布建独立连接生产环境应复用生产环境 CheckOrigin当前开发环境允许所有跨域上线需校验 Origin十、总结这套方案的本质是用 RabbitMQ 打通微服务之间的实时通知链路用 WebSocket 打通服务端到浏览器的最后一跳。乘客体验上从「每隔几秒刷新看看有没有司机接单」变成「司机一点接单页面立刻弹出通知」——这就是 RabbitMQ WebSocket 在这个项目里的价值。