开发者

Spring Boot集成WebSocket项目实战的示例代码

目录
  • 一、项目配置与依赖
    • 1. 添加依赖
  • 2. WebSocket配置类
    • 二、WebSocket服务端实现
      • 1. 消息实体类
      • 2. WebSocket服务核心类
      • 3. WebSocket控制器(可选)
    • 三、前端实现
      • 1. html页面
    • 四、功能测试
      • 五、高级功能扩展
        • 1. 消息持久化
        • 2. 断线重连机制(前端)
        • 3. 使用STOMP协议(可选)
      • 六、优化

        WebSocket是一种在单个TCP连接上进行全双工通信的协议,它使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。下面我将详细介绍如何在Spring Boot项目中集成WebSocket,并提供完整的实战代码。

        一、项目配置与依赖

        1. 添加依赖

        首先在pom.XML中添加必要依赖:

        <dependencies>
            <!-- WebSocket支持 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-websocket</artifactId>
            </dependency>
            
            <!-- Web支持 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            
            <!-- Lombok简化代码 -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            
            <!-- jsON处理 -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
        </dependencies>
        

        2. WebSocket配置类

        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;
        import org.springframework.web.socket.server.standard.ServerEndpointExporter;
        
        @Configuration
        public class WebSocketConfig {
            
            @Bean
            public ServerEndpointExporter serverEndpointExporter() {
                return new ServerEndpointExporter();
            }
        }
        

        二、WebSocket服务端实现

        1. 消息实体类

        import lombok.Data;
        
        @Data
        public class WebSocketMessage {
            private String type;       // 消息类型:heartbeat/unicast/broadcast
            private String from;      // 发送者ID
            private String to;        // 接收者ID(单播时使用)
            private String content;    // 消息内容
            private Long timestamp;   // 时间戳
        }
        

        2. WebSocket服务核心类

        import lombok.extern.slf4j.Slf4j;
        import org.springframework.stereotype.Component;
        import Javax.websocket.*;
        import javax.websocket.server.PathParam;
        import javax.websocket.server.ServerEndpoint;
        import java.io.IOException;
        import java.util.Map;
        import java.util.concurrent.ConcurrentHashMap;
        import java.util.concurrent.atomic.AtomicInteger;
        
        @Slf4j
        @Component
        @ServerEndpoint("/ws/{userId}")
        public class WebSocketServer {
            
            // 在线连接数
            private static final AtomicInteger onlineCount = new AtomicInteger(0);
            
            // 存放每个客户端对应的WebSocketServer对象
            private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
            
            // 与某个客户端的连接会话
            private Session session;
            
            // 接收的用户ID
            private String userId;
            
            // 最后心跳时间
            private long lastHeartbeatTime = System.currentTimeMillis();
            
            /**
             * 连接建立成功调用的方法
             */
            @OnOpen
            public void onOpen(Session session, @PathParam("userId") String userId) {
                this.session = session;
                this.userId = userId;
                
                if (webSocketMap.containsKey(userId)) {
                    webSocketMap.remove(userId);
                    webSocketMap.put(userId, this);
                } else {
                    webSocketMap.put(userId, this);
                    addOnlineCount();
                }
                
                log.info("用户连接:{},当前在线人数:{}", userId, getOnlineCount());
                
                // 启动心跳检测线程
                new HeartbeatThread().start();
            }
        
            /**
             * 连接关闭调用的方法
             */
            @OnClose
            public void onClose() {
                if (webSocketMap.containsKey(userId)) {
                    webSocketMap.remove(userId);
                    subOnlineCount();
                }
                log.info("用户退出:{},当前在线人数:{}", userId, getOnlineCount());
            }
        
            /**
             * 收到客户端消息后调用的方法
             */
            @OnMessage
            public void onMessage(String message, Session session) {
                log.info("收到用户{}的消息:{}", userId, message);
                
                WebSocketMessage msg = JSON.parseobject(message, WebSocketMessage.class);
                
                // 心跳检测
                if ("heartbeat".equals(msg.getType())) {
                    this.lastHeartbeatTime = System.currentTimeMillis();
                    return;
                }
                
                // 处理业务消息
                switch (msg.getType()) {
                    case "unicast":
                        sendToUser(msg.getTo(), message);
                        break;
                    case "broadcast":
                        broadcast(message);
                        break;
                    default:
                        log.warn("未知的消息类型:{}", msg.getType());
                }
            }
        
            /**
             * 发生错误时调用
             */
            @OnError
            public void onError(Session session, Throwable error) {
                log.error("用户{}的连接发生错误:{}", userId, error.getMessage());
                error.printStackTrace();
            }
        
            /**
             * 单播消息
             */
            public static void sendToUser(String toUserId, String message) {
                if (webSocketMap.containsKey(toUserId)) {
                    webSocketMap.get(toUserId).sendMessage(message);
                } else {
                    log.warn("用户{}不在线,消息发送失败", toUserId);
                }
            }
        
            /**
             * 广播消息
             */
            public static void broadcast(String message) {
                webSocketMap.forEach((userId, server) -> {
                    if (server.session.isOpen()) {
                        server.sendMessage(message);
                    }
                });
            }
        
            /**
             * 服务器主动推送
             */
            public void sendMessage(String message) {
                try {
                    this.session.getBasicRemote().sendText(message);
                } catch (IOException e) python{
                    log.error("发送消息失败:{}", e.getMessage());
                }
            }
        
            public static synchronized int getOnlineCount() {
                return onlineCount.get();
            }
        
            public static synchronized void addOnlineCount() {
                onlineCount.incrementAndGet();
            }
        
            public static synchronized void subOnlineCount() {
                onlineCount.decrementAndGet();
            }
            
            /**
             * 心跳检测线程
             */
            private class HeartbeatThread extends Thread {
                @Override
                public void run() {
                    while (session.isOpen()) {
                        try {
                            // 每30秒检测一次
                            Thread.sleep(30000);
                            
                            // 超过60秒未收到心跳,关闭连接
                            if (System.currentTimeMillis() - lastHeartbeatTime > 60000) {
                                session.close();
                                break;
                            }
                        } catch (Exception e) {
                            log.error("心跳检测异常:{}", e.getMessage());
                            break;
                        }
                    }
                }
            }
        }
        

        3. WebSocket控制器(可选)

        import org.springframework.web.bind.annotation.*;
        
        @RestController
        @RequestMapping("/api/ws")
        public class WebSocketController {
            
            @PostMapping("/sendToUser")
            public Str编程客栈ing sendToUser(@RequestParam String toUserId, @RequestParam String message) {
                WebSocketServer.sendToUser(toUserId, message);
                return "消息已发送";
            }
            
            @PostMapping("/broadcast")
            public String broadcast(@RequestParam String message) {
                WebSocketServer.broadcast(message);
                return "广播已发送";
            }
        }
        

        三、前端实现

        1. HTML页面

        <!DOCTYPE html>
        <html>
        <head>
            <meta charset="UTF-8">
            <title>WebSocket测试</title>
            <script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script>
        </head>
        <body>
            <div>
                <h2>WebSocket测试</h2>
                <div>
                    <label>用户ID:</label>
                    <input type="text" id="userId" value="user1">
                    <button onclick="connect()">连接</button>
                    <button onclick="disconnect()" disabled id="disconnectBtn">断开</button>
                </div>
                <div>
                    <label>目标用户:</label>
                    <input type="text" id="toUserId" value="user2">
                    <label>消息内容:</label>
                    <input type="text" id="message" value="Hello WebSocket">
                    <button onclick="sendUnicast()">单播</button>
                    <button onclick="sendBroadcast()">广播</button>
                </div>
                <div>
                    <h3>消息日志:</h3>
                    <div id="messageLog"></div>
                </div>
            </div>
        
            <script>
                var websocket = null;
                var heartbeatInterval = null;
                
                // 连接WebSocket
                function connect() {
                    var userId = $('#userId').val();
                    if (!userId) {
                        alert('请输入用户ID');
                        return;
                    }
                    
                    if ('WebSocket' in window) {
                        websocket = new WebSocket('ws://' + window.location.host + '/ws/' + userId);
                    } else {
                        alert('当前浏览器不支持WebSocket');
                        return;
                    }
                    
                    websocket.onopen = function() {
                        logMessage('连接已建立');
                        $('#disconnectBtn').prop('disabled', false);
                        
                        // 启动心跳检测
                        startHeartbeat();
                    };
                    
                    websocket.onmessage = function(event) {
                        logMessage('收到消息: ' + event.data);
                    };
                    
                    websocket.onclose = function() {
                        logMessage('连接已关闭');
                        $('#disconnectBtn').prop('disabled', true);
                        
                        // 停止心跳检测
                        stopHeartbeat();
                    };
                    
                    websocket.onerror = function(error) {
                        logMessage('发生错误: ' + error.data);
                    };
                }
                
                // 断开连接
                function disconnect() {
                    if (websocket != nuandroidll) {
                        websocket.close();
                    }
                }
                
                // 发送单播消息
                function sendUnicast() {
                    if (websocket == null || websocket.readyState != WebSocket.OPEN) {
                 javascript       alert('请先建立连接');
                        return;
                    }
                    
                    var message = {
                        type: 'unicast',
                        from: $('#userId').val(),
                        to: $('#toUserId').val(),
                        content: $('#message').val(),
                        timestamp: new Date().getTime()
                    };
                    
                    websocket.send(JSON.stringify(message));
                    logMessage('已发送单播消息: ' + JSON.stringify(message));
                }
                
                // 发送广播消息
                function sendBroadcast() {
                    if (websocket == null || websocket.readyState != WebSocket.OPEN) {
                        alert('请先建立连接');
                        return;
                    }
                    
                    var message = {
                        type: 'broadcast',
                        from: $('#userId').val(),
                        content: $('#message').val(),
                        timestamp: new Date().getTime()
                    };
                    
                    websocket.send(JSON.stringify(message));
                    logMessage('已发送广播消息: ' + JSON.stringify(message));
                }
                
                // 启动心跳检测
                function startHeartbeat() {
                    // 每20秒发送一次心跳
                    heartbeatInterval = setInterval(function() {
                        if (websocket.readyState == WebSocket.OPEN) {
                            var heartbeat = {
                                type: 'heartbeat',
                                from: $('#userId').val(),
                                timestamp: new Date().getTime()
                            };
                            websocket.send(JSON.stringify(heartbeat));
                            logMessage('已发送心跳');
                        }
                    }, 20000);
                }
                
                // 停止心跳检测
                function stopHeartbeat() {
                    if (heartbeatInterval != null) {
                        clearInterval(heartbeatInterval);
                        heartbeatInterval = null;
                    }
                }
                
                // 记录日志
                function logMessage(message) {
                    var logDiv = $('#messageLog');
                    logDiv.append('<p>' + new Date().toLocaleString() + ' - ' + message + '</p>');
                    logDiv.scrollTop(logDiv[0].scrollHeight);
                }
            </script>
        </body>
        </html>
        

        四、功能测试

        1. 连接测试​:

          • 启动Spring Boot应用
          • 打开两个浏览器窗口,php分别输入不同用户ID连接
          • 观察控制台日志,确认连接建立
        2. 单播测试​:

          • 在窗口A输入目标用户ID(窗口B的用户ID)和消息内容
          • 点击"单播"按钮,确认窗口B收到消息
        3. 广播测试​:

          • 在任意窗口点击"广播"按钮
          • 确认所有连接的窗口都收到消息
        4. 心跳检测测试​:

          • 观察控制台日志,确认每20秒发送一次心跳
          • 断开网络连接60秒以上,确认自动断开WebSocket连接

        五、高级功能扩展

        1. 消息持久化

        // 在WebSocketServer类中添加
        private void saveMessage(WebSocketMessage message) {
            // 实现消息存储逻辑,可存入数据库或Redis
            log.info("存储消息: {}", JSON.toJSONString(message));
        }
        

        2. 断线重连机制(前端)

        // 修改前端connect函数
        var reconnectAttempts = 0;
        var maxReconnectAttempts = 5;
        var reconnectInterval = 5000; // 5秒
        
        function connect() {
            // ...原有代码...
            
            websocket.onclose = function() {
                logMessage('连接已关闭');
                $('#disconnectBtn').prop('disabled', true);
                stopHeartbeat();
                
                // 断线重连逻辑
                if (reconnectAttempts < maxReconnectAttempts) {
                    reconnectAttempts++;
                    logMessage('尝试重新连接(' + reconnectAttempts + '/' + maxReconnectAttempts + ')');
                    setTimeout(connect, reconnectInterval);
                }
            };
            
            // 连接成功后重置重试计数
            websocket.onopen = function() {
                reconnectAttempts = 0;
                // ...原有代码...
            };
        }
        

        3. 使用STOMP协议(可选)

        如果需要更复杂的消息路由和订阅/发布模式,可以考虑使用STOMP协议:

        @Configuration
        @EnableWebSocketMessageBroker
        public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
            
            @Override
            public void configureMessageBroker(MessageBrokerRegistry config) {
                config.enableSimpleBroker("/topic", "/queue");
                config.setApplicationDestinationPrefixes("/app");
                config.setUserDestinationPrefix("/user");
            }
        
            @Override
            public void registerStompEndpoints(StompEndpointRegistry registry) {
                registry.addEndpoint("/ws-stomp")
                        .setAllowedOriginPatterns("*")
                        .withSockJS();
            }
        }
        

        六、优化

        实际项目中,还可以根据需求进一步优化:

        • 添加JWT认证机制
        • 实现消息历史记录查询
        • 增加用户在线状态管理

        对于高并发场景,可以考虑以下优化:

        • 使用session.getAsyncRemote().sendText()替代同步发送
        • 增加消息缓冲区
        • 使用Redis等中间件实现分布式WebSocket

        到此这篇关于Spring Boot集成WebSocket项目实战的示例代码的文章就介绍到这了,更多相关Spring Boot集成WebSocket内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

        0

        上一篇:

        下一篇:

        精彩评论

        暂无评论...
        验证码 换一张
        取 消

        最新开发

        开发排行榜