整体架构客户端 ↔ 网关 ↔ 长连接服务 (SSE/WebSocket) ↔ 模型调度服务 ↔ LLM 推理服务两种主流长链接方案SSE单向流式AI 对话首选、WebSocket双向交互多轮实时控制下面给出可直接落地的原型代码 架构设计。一、整体架构分层1. 分层职责接入层Nginx / 网关限流、鉴权、协议转发长连接服务层核心原型SSE简单、HTTP 标准不用握手浏览器原生支持适合问答流式输出WebSocket双向收发支持中途中断、参数热更新、文件实时上传任务调度层会话管理会话 ID、用户上下文、token 计数、超时回收任务队列削峰防止 LLM 推理打满心跳保活、断线重连、会话缓存LLM 推理层封装大模型 APIOpenAI/DeepSeek/Qwen/ 本地 vLLM批量推理、流式 token 返回、prompt 组装存储层Redis在线会话、心跳、临时上下文MySQL历史对话、用户额度、模型配置2. 两种协议选型对比维度SSE (Server-Sent Events)WebSocket通信方向服务端单向推送客户端 - 服务端双向底层HTTP1.1无需升级协议HTTP 升级为 WS 协议适用场景AI 问答流式输出只读推送实时工具调用、中途停止、上传图片、多指令交互断线重连原生支持 last-event-id需自行实现重连逻辑后端实现Spring SseEmitterSpring WebSocketHandlerAI 大模型场景推荐优先 SSE 做对话流式复杂交互再上 WebSocket二、原型 1SSE 长链接流式模型服务SpringBoot 最简可运行原型1. 核心依赖 pom.xmldependencies dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-web/artifactId /dependency !-- Redis 会话管理 -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-data-redis/artifactId /dependency !-- lombok -- dependency groupIdorg.projectlombok/groupId artifactIdlombok/artifactId optionaltrue/optional /dependency /dependencies2. 会话上下文实体Data public class ChatSession { // 唯一会话ID private String sessionId; // 用户ID private Long userId; // 对话历史上下文 private ListChatMessage messages; // 模型参数 temperature topP private ModelConfig modelConfig; // SSE发射器持有长连接 private SseEmitter emitter; // 创建时间、最后心跳时间 private Long createTime; private Long lastHeartbeat; // 是否结束会话 private boolean finished; } Data public class ChatMessage { private String role; // user / assistant / system private String content; } Data public class ModelConfig { private String modelName; private Float temperature; private Float topP; private Integer maxTokens; }3. SSE 长连接控制器核心入口RestController RequestMapping(/api/chat/sse) Slf4j public class ChatSseController { Autowired private ChatSessionManager sessionManager; Autowired private LlmStreamService llmStreamService; // 建立SSE长连接开始流式对话 GetMapping(value /connect, produces MediaType.TEXT_EVENT_STREAM_VALUE) public SseEmitter connect( RequestParam String sessionId, RequestParam Long userId, RequestParam String prompt ) { // 设置超时时间 300s 长连接 SseEmitter emitter new SseEmitter(300_000L); emitter.completeOnTimeout(true); // 初始化会话 ChatSession session sessionManager.createSession(sessionId, userId, emitter); // 连接异常/关闭回调释放资源、关闭会话 emitter.onCompletion(() - { log.info(会话{} 连接正常关闭, sessionId); sessionManager.closeSession(sessionId); }); emitter.onError((e) - { log.error(会话{} 连接异常, sessionId, e); sessionManager.closeSession(sessionId); }); emitter.onTimeout(() - { log.warn(会话{} 连接超时, sessionId); sessionManager.closeSession(sessionId); }); // 异步调用大模型流式输出 CompletableFuture.runAsync(() - { try { llmStreamService.streamChat(session, prompt); } catch (Exception e) { sendMsg(emitter, error, 模型推理异常 e.getMessage()); emitter.complete(); } }); return emitter; } // 心跳接口前端定时调用保活 PostMapping(/heartbeat) public R? heartbeat(RequestParam String sessionId) { sessionManager.refreshHeartbeat(sessionId); return R.ok(); } // 主动终止对话 PostMapping(/stop) public R? stopChat(RequestParam String sessionId) { sessionManager.stopSession(sessionId); return R.ok(对话已终止); } // 封装SSE消息发送 private void sendMsg(SseEmitter emitter, String event, String data) throws IOException { SseEmitter.SseEventBuilder builder SseEmitter.event() .name(event) .data(data); emitter.send(builder); } }4. 会话管理器Redis 内存双缓存管理长连接Service Slf4j public class ChatSessionManager { // 内存存储在线长连接持有SseEmitter对象无法序列化进Redis private final ConcurrentHashMapString, ChatSession sessionMap new ConcurrentHashMap(); Autowired private StringRedisTemplate redisTemplate; private static final String SESSION_REDIS_KEY chat:session:; // 会话超时阈值 120s 无心跳自动清理 private static final long HEARTBEAT_TIMEOUT 120_000; // 创建会话 public ChatSession createSession(String sessionId, Long userId, SseEmitter emitter) { ChatSession session new ChatSession(); session.setSessionId(sessionId); session.setUserId(userId); session.setEmitter(emitter); session.setCreateTime(System.currentTimeMillis()); session.setLastHeartbeat(System.currentTimeMillis()); session.setFinished(false); session.setMessages(new ArrayList()); sessionMap.put(sessionId, session); // Redis持久化基础会话信息 redisTemplate.opsForValue().set(SESSION_REDIS_KEY sessionId, JSON.toJSONString(session), 10, TimeUnit.MINUTES); return session; } // 更新心跳 public void refreshHeartbeat(String sessionId) { ChatSession session sessionMap.get(sessionId); if (session ! null) { session.setLastHeartbeat(System.currentTimeMillis()); } } // 停止对话模型停止输出保留连接可续问 public void stopSession(String sessionId) throws IOException { ChatSession session sessionMap.get(sessionId); if (session ! null) { session.setFinished(true); sendMsg(session.getEmitter(), stop, 用户主动终止对话); } } // 彻底关闭连接释放资源 public void closeSession(String sessionId) { ChatSession session sessionMap.remove(sessionId); if (session ! null) { redisTemplate.delete(SESSION_REDIS_KEY sessionId); try { session.getEmitter().complete(); } catch (Exception e) { log.error(关闭会话发射器失败, e); } } } // 定时任务清理超时无心跳会话 Scheduled(fixedRate 30_000) public void clearTimeoutSession() { long now System.currentTimeMillis(); ListString timeoutIds sessionMap.entrySet().stream() .filter(entry - now - entry.getValue().getLastHeartbeat() HEARTBEAT_TIMEOUT) .map(Map.Entry::getKey) .collect(Collectors.toList()); timeoutIds.forEach(this::closeSession); } private void sendMsg(SseEmitter emitter, String event, String data) throws IOException { SseEmitter.event().name(event).data(data).build(); emitter.send(SseEmitter.event().name(event).data(data)); } }5. LLM 流式推理服务核心逐 token 推送Service public class LlmStreamService { // 对接本地vLLM/OpenAI兼容接口 Autowired private RestTemplate restTemplate; private static final String LLM_STREAM_URL http://127.0.0.1:8000/v1/chat/completions; public void streamChat(ChatSession session, String userPrompt) throws IOException { // 追加用户提问到上下文 ChatMessage userMsg new ChatMessage(); userMsg.setRole(user); userMsg.setContent(userPrompt); session.getMessages().add(userMsg); // 组装请求体 MapString, Object req new HashMap(); req.put(model, session.getModelConfig().getModelName()); req.put(temperature, session.getModelConfig().getTemperature()); req.put(stream, true); // 开启模型流式返回 req.put(messages, session.getMessages()); HttpHeaders headers new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); HttpEntityObject entity new HttpEntity(req, headers); // 发起流式POST逐行读取模型返回token StreamingHttpResponse response restTemplate.exchange( LLM_STREAM_URL, HttpMethod.POST, entity, StreamingHttpResponse.class ).getBody(); StringBuilder fullAnswer new StringBuilder(); // 逐块读取模型输出 response.getBody().subscribe(dataBuffer - { String chunk dataBuffer.toString(StandardCharsets.UTF_8); // 解析SSE格式data: {} ListString lines Arrays.asList(chunk.split(\n)); for (String line : lines) { if (line.startsWith(data: ) !line.contains([DONE])) { String jsonStr line.replace(data: , ); StreamResp resp JSON.parseObject(jsonStr, StreamResp.class); String token resp.getChoices().get(0).getDelta().getContent(); if (token ! null) { fullAnswer.append(token); // 向前端推送单个token sendSse(session.getEmitter(), token, token); } } // 模型输出结束标识 if (line.contains([DONE])) { sendSse(session.getEmitter(), end, ); // 保存完整回答到会话上下文 ChatMessage assistantMsg new ChatMessage(); assistantMsg.setRole(assistant); assistantMsg.setContent(fullAnswer.toString()); session.getMessages().add(assistantMsg); session.setFinished(true); session.getEmitter().complete(); } } }); } private void sendSse(SseEmitter emitter, String event, String data) throws IOException { SseEmitter.SseEventBuilder eventBuilder SseEmitter.event() .name(event) .data(data); emitter.send(eventBuilder); } // 模型流式返回DTO Data public static class StreamResp { private ListChoice choices; } Data public static class Choice { private Delta delta; } Data public static class Delta { private String content; } }6. 前端简易 JS 消费 SSE 示例function connectSSE(sessionId, prompt) { const source new EventSource(/api/chat/sse/connect?sessionId${sessionId}userId10001prompt${encodeURIComponent(prompt)}); // 接收模型单个文字token source.addEventListener(token, (e) { document.getElementById(answer).innerText e.data; }); // 对话结束 source.addEventListener(end, () { source.close(); console.log(模型输出完成); }); // 报错重连 source.addEventListener(error, () { source.close(); setTimeout(() connectSSE(sessionId, prompt), 2000); }); // 30s一次心跳保活 setInterval(() { fetch(/api/chat/sse/heartbeat?sessionId${sessionId}) }, 30000); }三、原型 2WebSocket 双向长链接模型服务支持双向交互适用于中途修改参数、停止生成、图片输入、工具调用实时返回核心 Websocket 处理器Component public class ChatWebSocketHandler extends TextWebSocketHandler { private final ConcurrentHashMapString, WebSocketSession clientMap new ConcurrentHashMap(); Autowired private LlmStreamService llmStreamService; // 建立连接 Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String sessionId session.getUri().getQuery().split(sessionId)[1]; clientMap.put(sessionId, session); session.sendMessage(new TextMessage(JSON.toJSONString(Map.of(event, connect, msg, 连接成功)))); } // 接收前端消息 Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { String payload message.getPayload(); ChatReq req JSON.parseObject(payload, ChatReq.class); String sessionId req.getSessionId(); switch (req.getCmd()) { case chat: // 发起模型流式对话 CompletableFuture.runAsync(() - llmStreamService.streamWsChat(session, req.getPrompt())); break; case stop: // 终止生成 session.sendMessage(new TextMessage(JSON.toJSONString(Map.of(event, stop, msg, 已停止)))); break; } } // 连接关闭 Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { String sessionId getSessionId(session); clientMap.remove(sessionId); } // 向前端推送token public void sendWsMsg(WebSocketSession session, String event, String data) throws IOException { MapString, String msg Map.of(event, event, data, data); session.sendMessage(new TextMessage(JSON.toJSONString(msg))); } }四、生产级原型配套能力必加1. 限流与并发控制单用户最大同时在线会话数限制 5 个全局模型推理并发队列使用 ThreadPoolTaskExecutor 隔离长连接线程池Nginx 限流limit_conn 限制单 IP 长连接数量2. 会话持久化 上下文截断Redis 存储会话历史设置过期时间上下文超长时自动截断最早的历史消息控制输入 token 避免 OOM3. 安全鉴权连接建立时携带 token/JWT 校验用户权限会话绑定 userId禁止跨用户访问 sessionId敏感输入过滤、内容安全审核4. 监控告警在线长连接数量监控Prometheus 指标模型推理耗时、token 输出速率统计连接断开率、超时会话告警5. 断线重连机制SSE 依靠前端 EventSource 原生重连 last-event-id 续推未完成内容WebSocket 前端手动监听 error定时重试建立连接复用同一个 sessionId 恢复上下文五、部署拓扑原型plaintext客户端浏览器/App ↓ Nginx网关限流、SSL、协议转发 ↓ SpringBoot长连接服务集群SSE/WebSocket ↓ Redis集群会话缓存、心跳、分布式锁 ↓ 模型任务调度队列ThreadPool/RabbitMQ ↓ LLM推理服务集群vLLM/TGI/第三方模型API ↓ MySQL历史对话、用户配置、额度六、选型落地建议纯文字对话、后台管理、AI 问答页面使用 SSE开发成本最低兼容性好需要实时双向交互、上传图片、中途控制生成、多模态使用 WebSocket分布式集群部署SSE/WebSocket 存在会话粘性Nginx 开启 ip_hash 或使用 Redis 存储会话信息实现跨节点会话恢复