WebSocket实现实时通知
引入依赖xml!-- WebSocket支持 -- dependency groupIdorg.springframework.boot/groupId artifactIdspring-boot-starter-websocket/artifactId /dependency添加Serverjavaimport com.ruoyi.common.websocket.WebSocketUsers; import jakarta.websocket.*; import jakarta.websocket.server.PathParam; import jakarta.websocket.server.ServerEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; /** * WebSocket服务端 * 路径格式/websocket/{userId} 用于区分不同用户 */ Component ServerEndpoint(/websocket/{userId}) public class WebSocketServer { private static final Logger log LoggerFactory.getLogger(WebSocketServer.class); /** * 当前连接数在线人数 */ private static AtomicInteger onlineCount new AtomicInteger(0); /** * 当前会话 */ private Session session; /** * 用户ID */ private Long userId; /** * 连接建立成功调用的方法 */ OnOpen public void onOpen(Session session, PathParam(userId) Long userId) { this.session session; this.userId userId; // 使用会话管理类添加用户 WebSocketUsers.addUser(userId, session); onlineCount.incrementAndGet(); log.info(用户 {} 连接WebSocket当前在线人数{}, userId, onlineCount.get()); // 发送连接成功消息 try { String message {\type\:\connected\,\message\:\WebSocket连接成功\,\userId\: userId }; session.getBasicRemote().sendText(message); } catch (IOException e) { log.error(发送欢迎消息失败, e); } } /** * 连接关闭调用的方法 */ OnClose public void onClose() { // 使用会话管理类移除用户 WebSocketUsers.removeUser(this.userId); onlineCount.decrementAndGet(); log.info(用户 {} 断开连接当前在线人数{}, userId, onlineCount.get()); } /** * 收到客户端消息后调用的方法 */ OnMessage public void onMessage(String message, Session session) { log.info(收到用户 {} 的消息: {}, userId, message); // 处理心跳消息 try { if (message.contains(ping)) { String reply {\type\:\pong\,\timestamp\: System.currentTimeMillis() }; session.getBasicRemote().sendText(reply); } else { // 简单回复客户端 String reply {\type\:\reply\,\message\:\服务器收到消息\,\timestamp\: System.currentTimeMillis() }; session.getBasicRemote().sendText(reply); } } catch (IOException e) { log.error(回复消息失败, e); } } /** * 发生错误时调用 */ OnError public void onError(Session session, Throwable error) { log.error(WebSocket连接发生错误用户ID{}, this.userId, error); // 发生错误时移除用户会话 if (this.userId ! null) { WebSocketUsers.removeUser(this.userId); } } /** * 获取当前在线人数 */ public static int getOnlineCount() { return onlineCount.get(); } /** * 发送消息到指定用户你业务里调用的就是这个方法 * * param message 要推送的消息内容 * param userId 指定用户ID */ public void sendInfo(String message, Long userId) { log.info(发送消息到用户{}消息内容{}, userId, message); // 判断用户是否在线 Session session WebSocketUsers.getSession(userId); if (userId ! null session ! null) { try { // 异步发送消息给前端 session.getBasicRemote().sendText(message); } catch (Exception e) { log.error(发送消息到用户【{}】异常{}, userId, e.getMessage()); } } else { log.warn(用户【{}】不在线消息暂存, userId); } } }WebSocket用户会话管理javaimport jakarta.websocket.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; /** * WebSocket用户会话管理 * 存储所有在线用户的会话信息 */ public class WebSocketUsers { private static final Logger log LoggerFactory.getLogger(WebSocketUsers.class); /** * 用户会话存储 * key: userId (用户ID) * value: Session (WebSocket会话) */ private static ConcurrentHashMapLong, Session USERS new ConcurrentHashMap(); /** * 添加用户会话 */ public static void addUser(Long userId, Session session) { USERS.put(userId, session); log.info(用户 {} 加入WebSocket连接当前在线人数{}, userId, USERS.size()); } /** * 移除用户会话 */ public static void removeUser(Long userId) { Session session USERS.remove(userId); if (session ! null) { try { if (session.isOpen()) { session.close(); } } catch (IOException e) { log.error(关闭会话失败, e); } } log.info(用户 {} 断开WebSocket连接当前在线人数{}, userId, USERS.size()); } /** * 根据会话对象移除用户 */ public static boolean removeUserBySession(Session session) { Long userId null; for (Long key : USERS.keySet()) { if (USERS.get(key).equals(session)) { userId key; break; } } if (userId ! null) { removeUser(userId); return true; } return false; } /** * 获取用户会话 */ public static Session getSession(Long userId) { return USERS.get(userId); } /** * 获取在线用户数 */ public static int getOnlineCount() { return USERS.size(); } /** * 判断用户是否在线 */ public static boolean isOnline(Long userId) { return USERS.containsKey(userId) USERS.get(userId).isOpen(); } /** * 发送消息给指定用户 */ public static void sendMessage(Long userId, String message) { Session session USERS.get(userId); if (session ! null session.isOpen()) { try { session.getBasicRemote().sendText(message); log.debug(发送消息给用户 {}: {}, userId, message); } catch (IOException e) { log.error(发送消息给用户 {} 失败, userId, e); // 发送失败移除该用户会话 removeUser(userId); } } } /** * 发送消息给所有在线用户 */ public static void sendMessageToAll(String message) { for (Long userId : USERS.keySet()) { sendMessage(userId, message); } } }前端javascriptcreated() { this.initWebSocket() }, beforeDestroy() { this.closeWebSocket() }, methods: { // 初始化WebSocket initWebSocket() { if (typeof WebSocket undefined) { this.$message.warning(您的浏览器不支持WebSocket,无法实时获取通知数量) return } const userId this.$store.state.user.id // 若依原生ws地址 const wsUrl ws://${process.env.VUE_APP_BASE_URL}/websocket/${userId} this.ws new WebSocket(wsUrl) // 收到消息 → 立即刷新通知列表 this.ws.onmessage (evt) { console.log(收到实时通知, evt.data) try {