开发者

SpringBoot实现SSE(Server-Sent Events)的完整指南

目录
  • 引言
  • 基础实现
    • 1. 添加依赖
    • 2. 创建 SSE 控制器
    • 3. 前端监听 SSE
  • 添加鉴权支持
    • 1. 基于 Token 的鉴权
    • 2. 前端发送鉴权信息
  • 高级功能实现
    • 1. 广播事件给多个客户端
    • 2. 在控制器中使用广播服务
    • 3. 发送 jsON 数据
    • 4. 重连机制
  • 生产环境最佳实践
    • 1. 配置超时和心跳
    • 2. 异常处理
    • 3. CORS 配置
    • 4. 性能优化
  • 完整示例:实时股票报价
    • 后端控制器
    • 前端实现
  • 部署注意事项
    • 1. 负载均衡配置
    • 2. Spring Boot 配置
    • 3. 监控端点
  • 最佳实践总结

    引言

    在 Spring Boot 中实现 SSE (Server-Sent Events) 非常简单,SSE 是一种服务器向客户端推送事件的技术。以下是完整的实现步骤:

    基础实现

    1. 添加依赖

    确保 pom.XML 中包含 Spring Web 依赖:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    

    2. 创建 SSE 控制器

    import org.springframework.http.MediaType;
    import org.springframework.wephpb.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
    
    import Java.io.IOException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    @RestController
    public class SseController {
    
        // 用于异步发送事件的线程池
        private final ExecutorService executor = Executors.newCachedThreadPool();
    
        @GetMapping(path = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public SseEmitter handleSse() {
            SseEmitter emitter = new SseEmitter(60_000L); // 设置超时时间(毫秒)
            
            // 在单独的线程中发送事件
            executor.execute(() -> {
                try {
                    for (int i = 0; i < 10; i++) {
                        // 构建事件
                        SseEmitter.SseEventBuilder event = SseEmitter.event()
                                .id(String.valueOf(i)) // 事件ID
                                .name("message")      // 事件名称
                                .data("Event #" + i);  // 事件数据
                        
                        // 发送事件
                        emitter.send(event);
                        
                        // 模拟延迟
                        Thread.sleep(1000);
                 php   }
                    
                    // 完成发送
                    emitter.complete();
                } catch (IOException | InterruptedException e) {
                    // 发生错误时关闭连接
                    emitter.completeWithError(e);
                }
            });
            
            // 处理完成和超时事件
            emitter.onCompletion(() -> System.out.println("SSE completed"));
            emitter.onTimeout(() -> {
                System.out.println("SSE timeout");
                emitter.complete();
            });
      编程客栈      
            return emitter;
        }
    }
    

    3. 前端监听 SSE

    <!DOCTYPE html>
    <html>
    <head>
        <title>SSE Demo</title>
    </head>
    <body>
        <div id="events"></div>
    
        <script>
            const eventSource = new EventSource('/sse');
            
            // 监听消息事件
            eventSource.onmessage = function(event) {
                const data = event.data;
                const element = document.createElement('p');
                element.textContent = 'Received: ' + data;
                document.getElementById('events').appendChild(element);
            };
            
            // 监听自定义事件
            eventSource.addEventListener('message', function(event) {
                console.log('Custom event:', event.data);
            });
            
            // 错误处理
          编程  eventSource.onerror = function(error) {
                console.error('EventSource error:', error);
                eventSource.close();
            };
        </script>
    </body>
    </html>
    

    添加鉴权支持

    1. 基于 Token 的鉴权

    @GetMapping(path = "/secure-sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter handleSecureSse(@RequestHeader("Authorization") String token) {
        // 验证Token
        if (!isValidToken(token)) {
            throw new ResponseStatusException(HttpStatus.UNAUTHORIZED, "Invalid token");
        }
        
        SseEmitter emitter = new SseEmitter();
        
        // 获取用户信息
        User user = getUserFromToken(token);
        
        executor.execute(() -> {
            try {
                // 发送个性化事件
                emitter.send(SseEmijstter.event()
                    .data("Welcome, " + user.getName())
                    .name("greeting"));
                    
                // 继续发送其他事件...
            } catch (IOException e) {
                emitter.completeWithError(e);
            }
        });
        
        return emitter;
    }
    
    private boolean isValidToken(String token) {
        // 实现Token验证逻辑
        return token != null && token.startsWith("Bearer ");
    }
    
    private User getUserFromToken(String token) {
        // 从Token中提取用户信息
        return new User("John Doe"); // 示例
    }
    

    2. 前端发送鉴权信息

    const token = "Bearer your_jwt_token_here";
    const eventSource = new EventSource('/secure-sse', {
        headers: {
            Authorization: token
        }
    });
    

    高级功能实现

    1. 广播事件给多个客户端

    import org.springframework.stereotype.Service;
    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    @Service
    public class SseService {
        private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
        
        public SseEmitter subscribe() {
            SseEmitter emitter = new SseEmitter(60_000L);
            emitters.add(emitter);
            
            emitter.onCompletion(() -> emitters.remove(emitter));
            emitter.onTimeout(() -> emitters.remove(emitter));
            
            return emitter;
        }
        
        public void broadcast(String eventName, Object data) {
            for (SseEmitter emitter : emitters) {
                try {
                    emitter.send(SseEmitter.event()
                        .name(eventName)
                        .data(data));
                } catch (IOException e) {
                    emitter.completeWithError(e);
                }
            }
        }
    }
    

    2. 在控制器中使用广播服务

    @RestController
    public class SseController {
        
        private final SseService sseService;
        
        public SseController(SseService sseService) {
            this.sseService = sseService;
        }
        
        @GetMapping("/subscribe")
        public SseEmitter subscribe() {
            return sseService.subscribe();
        }
        
        @PostMapping("/broadcast")
        public ResponseEntity<String> broadcastMessage(@RequestBody String message) {
            sseService.broadcast("message", message);
            return ResponseEntity.ok("Message broadcasted");
        }
    }
    

    3. 发送 JSON 数据

    emitter.send(SseEmitter.event()
        .name("userUpdate")
        .data(new User("Alice", "alice@example.com"), MediaType.APPLICATION_JSON));
    

    4. 重连机制

    let eventSource;
    
    function connectSSE() {
        eventSource = new EventSource('/sse');
        
        eventSource.onmessage = event => {
            console.log('Received:', event.data);
        };
        
        eventSource.onerror = () => {
            console.log('Connection lost. Reconnecting...');
            eventSource.close();
            setTimeout(connectSSE, 3000); // 3秒后重连
        };
    }
    
    connectSSE(); // 初始连接
    

    生产环境最佳实践

    1. 配置超时和心跳

    @Bean
    public SseEmitter createSseEmitter() {
        SseEmitter emitter = new SseEmitter(120_000L); // 2分钟超时
        
        // 心跳机制
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            try {
                emitter.send(SseEmitter.event()
                    .name("heartbeat")
                    .data("ping"));
            } catch (IOException e) {
                scheduler.shutdown();
            }
        }, 0, 30, TimeUnit.SECONDS); // 每30秒发送心跳
        
        return emitter;
    }
    

    2. 异常处理

    @RestControllerAdvice
    public class SseExceptionHandler {
    
        @ExceptionHandler(SseException.class)
        public ResponseEntity<String> handleSseException(SseException ex) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body("SSE Error: " + ex.getMessage());
        }
    }
    

    3. CORS 配置

    @Configuration
    public class WebConfig implements WebMvcConfigurer {
    
        @Override
        public void addCorsMappings(CorsRegistry registry) {
            registry.addMapping("/sse/**")
                .allowedOrigins("https://your-frontend.com")
                .allowedMethods("GET")
                .allowCredentials(true);
        }
    }
    

    4. 性能优化

    @Configuration
    public class AsyncConfig implements AsyncConfigurer {
    
        @Override
        public Executor getAsyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(10);
            executor.setMaxPoolSize(50);
            executor.setQueueCapacity(100);
            executor.setThreadNamePrefix("SSE-Executor-");
            executor.initialize();
            return executor;
        }
    }
    

    完整示例:实时股票报价

    后端控制器

    @RestController
    public class StockController {
    
        private final SseService sseService;
        private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
        public StockController(SseService sseService) {
            this.sseService = sseService;
            startStockUpdates();
        }
    
        @GetMapping("/stocks")
        public SseEmitter getStockUpdates() {
            return sseService.subscribe();
        }
    
        private void startStockUpdates() {
            scheduler.scheduleAtFixedRate(() -> {
                Map<String, Double> stocks = Map.of(
                    "AAPL", 150 + Math.random() * 10,
                    "MSFT", 250 + Math.random() * 15,
                    "GOOGL", 2800 + Math.random() * 50
                );
                sseService.broadcast("stockUpdate", stocks);
            }, 0, 2, TimeUnit.SECONDS);
        }
    }
    

    前端实现

    <div id="stock-prices"></div>
    
    <script>
    const eventSource = new EventSource('/stocks');
    
    eventSource.addEventListener('stockUpdate', event => {
        const stocks = JSON.parse(event.data);
        let html = '<h2>Stock Prices</h2><ul>';
        
        for (const [symbol, price] of Object.entries(stocks)) {
            html += `<li>${symbol}: $${price.toFixed(2)}</li>`;
        }
        
        html += '</ul>';
        document.getElementById('stock-prices').innerHTML = html;
    });
    </script>
    

    部署注意事项

    1. 负载均衡配置

    # Nginx 配置
    location /sse {
        proxy_pass http://backend;
        proxy_http_version 1.1;
        proxy_set_header Connection '';
        proxy_buffering off;
    }
    

    2. Spring Boot 配置

    # application.properties
    server.servlet.context-path=/api
    spring.mvc.async.request-timeout=120000 # 2分钟超时
    

    3. 监控端点

    @Endpoint(id = "sse")
    public class SseEndpoint {
    
        private final SseService sseService;
    
        public SseEndpoint(SseService sseService) {
            this.sseService = sseService;
        }
    
        @ReadOperation
        public Map<String, Object> sseMetrics() {
            return Map.of(
                "activeConnections", sseService.getActiveConnections(),
                "lastBroadcast", sseService.getLastBroadcastTime()
            );
        }
    }
    

    最佳实践总结

    1. 使用专用服务类:封装 SSE 逻辑,提高代码复用性
    2. 实现心跳机制:防止连接超时断开
    3. 添加鉴权支持:保护敏感数据
    4. 优雅处理错误:实现异常处理和重连机制
    5. 监控连接状态:使用 Actuator 端点监控 SSE 连接
    6. 优化线程池:合理配置异步处理线程
    7. 前端重连逻辑:自动恢复断开连接

    通过以上实现,您可以在 Spring Boot 应用中轻松创建 SSE 端点,实现服务器向客户端的实时事件推送,同时满足鉴权需求。

    以上就是SpringBoot实现SSE(Server-Sent Events)完整指南的详细内容,更多关于SpringBoot实现SSE(Server-Sent Events)的资料请关注编程客栈(www.devze.com)其它相关文章!

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜