开发者

Spring AI框架结合Sentinel实现限流功能

目录
  • 场景描述
  • 项目结构和依赖
    • 1. Maven依赖配置
    • 2. 应用配置
    • 3. Sentinel配置类
    • 4. 自定义限流异常处理
    • 5. AI服务层
    • 6. 控制器层
    • 7. 启动类
    • 8. 测试用例
  • 使用示例
    • 1. 启动应用
    • 2. 测试接口
  • 关键特性说明

    场景描述

    构建一个AI智能客服系统,需要对AI接口调用进行限流保护,防止恶意请求或突发流量导致系统崩溃。

    项目结构和依赖

    1. Maven依赖配置

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
             http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>3.2.0</version>
            <relativePath/>
        </parent>

        <groupId>com.example</groupId>
        <artifactId>ai-customer-service</artifactId>
        <version>1.0.0</version>

        <dependencies>
            <!-- Spring Boot web starter -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>

            <!-- Spring AI (OpenAI starter) -->
            <!-- NOTE(review): Spring AI 0.8.x may not be resolvable from Maven Central;
                 if resolution fails, add the Spring milestone repository - confirm. -->
            <dependency>
                <groupId>org.springframework.ai</groupId>
                <artifactId>spring-ai-openai-spring-boot-starter</artifactId>
                <version>0.8.1</version>
            </dependency>

            <!-- Sentinel core library -->
            <dependency>
                <groupId>com.alibaba.csp</groupId>
                <artifactId>sentinel-core</artifactId>
                <version>1.8.6</version>
            </dependency>

            <!-- Sentinel Spring Cloud Alibaba starter -->
            <dependency>
                <groupId>com.alibaba.cloud</groupId>
                <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
                <version>2022.0.0.0</version>
            </dependency>

            <!-- Sentinel transport used to communicate with the Sentinel Dashboard -->
            <dependency>
                <groupId>com.alibaba.csp</groupId>
                <artifactId>sentinel-transport-simple-http</artifactId>
                <version>1.8.6</version>
            </dependency>

            <!-- Redis for rate limiting -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>

            <!-- JSON processing -->
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
            </dependency>

            <!-- Lombok: supplies the @Slf4j annotation used by the handler, service and
                 controller classes; version is managed by the Spring Boot parent -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>

            <!-- JUnit 5 and Spring Boot test support required by RateLimitTest -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    </project>
    

    2. 应用配置

    # application.yml
    server:
      port: 8080

    spring:
      application:
        name: ai-customer-service

      # Spring AI OpenAI settings
      ai:
        openai:
          # Read from the OPENAI_API_KEY environment variable; the fallback is only a placeholder
          api-key: ${OPENAI_API_KEY:your-api-key-here}
          base-url: https://api.openai.com
          chat:
            options:
              model: gpt-3.5-turbo
              temperature: 0.7
              max-tokens: 1000

      # Redis settings. Spring Boot 3.x binds these under spring.data.redis.*;
      # the former spring.redis.* prefix is no longer read.
      data:
        redis:
          host: localhost
          port: 6379
          database: 0
          timeout: 2000ms
          # NOTE(review): jedis.pool.* only takes effect when the Jedis client is on the
          # classpath; the default starter ships Lettuce - confirm which client is intended.
          jedis:
            pool:
              max-active: 8
              max-idle: 8
              min-idle: 0

    # Actuator endpoint exposure.
    # NOTE(review): "*" exposes every endpoint; restrict this outside of local development.
    management:
      endpoints:
        web:
          exposure:
            include: "*"

    logging:
      level:
        com.alibaba.csp.sentinel: DEBUG
        com.example: DEBUG
    

    3. Sentinel配置类

    package com.example.config;
    
    import com.alibaba.csp.sentinel.annotation.aspectj.SentinelResourceAspect;
    import com.alibaba.csp.sentinel.slots.block.RuleConstant;
    import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
    import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
    import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRule;
    import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowRuleManager;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import jakarta.annotation.PostConstruct;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Sentinel wiring for the application: registers the aspect that processes
     * {@code @SentinelResource} annotations and loads the hard-coded flow rules
     * once the bean has been constructed.
     */
    @Configuration
    public class SentinelConfig {

        /** Exposes the aspect that intercepts {@code @SentinelResource}-annotated methods. */
        @Bean
        public SentinelResourceAspect sentinelResourceAspect() {
            return new SentinelResourceAspect();
        }

        /** Loads the plain flow rules first, then the hot-parameter rules. */
        @PostConstruct
        public void initFlowRules() {
            initBasicFlowRules();
            initParamFlowRules();
        }

        /**
         * Builds and loads one QPS flow rule per protected resource.
         */
        private void initBasicFlowRules() {
            // Chat endpoint: 10 requests per second, rate-limiter control behavior
            // (Sentinel documents this behavior as uniform-rate queueing).
            FlowRule chat = qpsRule("ai-chat", 10, RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);

            // Q&A endpoint: 20 requests per second, reached through a 30 second warm-up.
            FlowRule qa = qpsRule("ai-qa", 20, RuleConstant.CONTROL_BEHAVIOR_WARM_UP);
            qa.setWarmUpPeriodSec(30);

            // Knowledge search: 15 requests per second, queued for at most 500 ms.
            FlowRule knowledge =
                    qpsRule("knowledge-search", 15, RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
            knowledge.setMaxQueueingTimeMs(500);

            List<FlowRule> flowRules = new ArrayList<>();
            flowRules.add(chat);
            flowRules.add(qa);
            flowRules.add(knowledge);
            FlowRuleManager.loadRules(flowRules);
        }

        /**
         * Builds and loads the hot-parameter rule that limits each user individually.
         */
        private void initParamFlowRules() {
            // Keyed on argument index 0 of the "ai-chat" resource (the user id):
            // at most 3 requests per second for any single value.
            ParamFlowRule perUser = new ParamFlowRule();
            perUser.setResource("ai-chat");
            perUser.setParamIdx(0);
            perUser.setGrade(RuleConstant.FLOW_GRADE_QPS);
            perUser.setCount(3);
            perUser.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);

            List<ParamFlowRule> paramRules = new ArrayList<>();
            paramRules.add(perUser);
            ParamFlowRuleManager.loadRules(paramRules);
        }

        /**
         * Creates a QPS-graded flow rule.
         *
         * @param resource        Sentinel resource name the rule applies to
         * @param count           permitted requests per second
         * @param controlBehavior one of the {@code RuleConstant.CONTROL_BEHAVIOR_*} values
         * @return the configured rule, not yet loaded
         */
        private static FlowRule qpsRule(String resource, double count, int controlBehavior) {
            FlowRule rule = new FlowRule();
            rule.setResource(resource);
            rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
            rule.setCount(count);
            rule.setControlBehavior(controlBehavior);
            return rule;
        }
    }
    

    4. 自定义限流异常处理

    package com.example.handler;
    
    import com.alibaba.csp.sentinel.slots.block.BlockException;
    import com.alibaba.csp.sentinel.slots.block.flow.FlowException;
    import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowException;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    /**
     * Static fallback methods referenced by the {@code blockHandler} /
     * {@code blockHandlerClass} attributes of {@code @SentinelResource}.
     * Each method takes the protected method's parameters followed by the
     * {@link BlockException} and returns the message shown to the caller.
     */
    @Slf4j
    @Component
    public class SentinelBlockHandler {

        /**
         * Fallback for the "ai-chat" resource.
         *
         * @param userId  id of the user whose request was blocked
         * @param message the chat message that was not processed
         * @param ex      the block exception; its concrete type selects the reply
         * @return a reply that distinguishes flow limiting from per-parameter limiting
         */
        public static String handleAiChatBlock(String userId, String message, BlockException ex) {
            String exceptionType = ex.getClass().getSimpleName();
            log.warn("AI聊天接口被限流,用户ID: {}, 异常类型: {}", userId, exceptionType);

            final String reply;
            if (ex instanceof FlowException) {
                reply = "系统繁忙,请稍后再试。我们正在为您排队处理...";
            } else if (ex instanceof ParamFlowException) {
                reply = "您的请求过于频繁,请稍后再试。";
            } else {
                reply = "系统暂时无法处理您的请求,请稍后重试。";
            }
            return reply;
        }

        /**
         * Fallback for the "ai-qa" resource.
         *
         * @param question the question that was not processed
         * @param ex       the block exception, logged by simple class name
         * @return a fixed "busy, warming up" reply
         */
        public static String handleAiQaBlock(String question, BlockException ex) {
            String exceptionType = ex.getClass().getSimpleName();
            log.warn("AI问答接口被限流,问题: {}, 异常类型: {}", question, exceptionType);
            return "当前咨询人数较多,系统正在预热中,请稍后再试。";
        }

        /**
         * Fallback for the "knowledge-search" resource.
         *
         * @param keyword the search keyword that was not processed
         * @param ex      the block exception (not inspected)
         * @return a fixed "search busy" reply
         */
        public static String handleKnowledgeSearchBlock(String keyword, BlockException ex) {
            log.warn("知识库搜索被限流,关键词: {}", keyword);
            return "知识库查询繁忙,请稍后再试或联系人工客服。";
        }
    }
    

    5. AI服务层

    package com.example.service;
    
    import com.alibaba.csp.sentinel.annotation.SentinelResource;
    import com.alibaba.csp.sentinel.slots.block.BlockException;
    import com.example.handler.SentinelBlockHandler;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.ai.chat.ChatClient;
    import org.springframework.ai.chat.ChatResponse;
    import org.springframework.ai.chat.messages.UserMessage;
    import org.springframework.ai.chat.prompt.Prompt;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import java.time.LocalDateTime;
    import java.util.concurrent.CompletableFuture;
    
    @Service
    @Slf4j
    public class AiCustomerService {
    
        @Autowired
        private ChatClient chatClient;
    
        /**
         * AI聊天服务 - 使用热点参数限流
         */
        @SentinelResource(
            value = "ai-chat",
            blockHandler = "handleAiChatBlock",
            blockHandlerClass = SentinelBlockHandler.class
        )
        public String chatWithAi(String userId, String message) {
            log.info("用户 {} 发起聊天请求: {}", userId, message);
            
            try {
                // 构建聊天上下文
                String systemPrompt = "你是一个专业的客服助手,请礼貌、准确地回答用户问题。";
                String fullPrompt = systemPrompt + "\n用户问题: " + message;
                
                Prompt prompt = new Prompt(new UserMessage(fullPrompt));
                ChatResponse response = chatClient.call(prompt);
                
                String aiResponse = response.getResult().getOutput().getContent();
                log.info("AI回复用户 {}: {}", userId, aiResponse);
                
                return aiResponse;
            } catch (Exception e) {
                log.error("AI聊天服务异常,用户ID: {}", userId, e);
                return "抱歉,AI服务暂时不可用,请联系人工客服。";
            }
        }
    
        /**
         * AI问答服务 - 使用预热限流
         */
        @SentinelResource(
            value = "ai-qa",
            blockHandler = "handleAiQaBlock",
            blockHandlerClass = SentinelBlockHandler.class
        )
        public String answerQuestion(String question) {
            log.info("收到问答请求: {}", question);
            
            try {
                String prophpmpt = String.format(
                    "作为客服专家,请简洁准确地回答以下问题:%s\n" +
                    "要求:1. 回答要专业且易懂 2. 控制在200字以内 3. 如果不确定请说明", 
                    question
                );
                
                ChatResponse response = chatClient.call(new Prompt(prompt));
                return response.getResult().getOutput().getContent();
            } catch (Exception e) {
                log.error("AI问答服务异常", e);
                return "抱歉,暂时无法回答您的问题,请稍后再试。";
            }
        }
    
        /**
         * 知识库搜索 - 使用排队等待限流
         */
        @SentinelResource(
            value = "knowledge-search",
            blockHandler = "handleKnowledgeSearchBlock",
            blockHandlerClass = SentinelBlockHandler.class
        )
        public String searchKnowledge(String keyword) {
            log.info("知识库搜索: {}", keyword);
            
            try {
                // 模拟知识库搜索
                Thread.sleep(100); // 模拟搜索耗时
                
                String searchPrompt = String.format(
                    "基于关键词 '%s' 搜索相关知识,提供简洁的信息摘要。", 
                    keyword
                );
                
                ChatResponse response = chatClient.call(new Prompt(searchPrompt));
                return response.getResult().getOutput().getContent();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "搜索被中断,请重试。";
            } catch (Exception e) {
                log.error("知识库搜索异常", e);
                return "知识库搜索失败,请联系技术支持。";
            }
        }
    
        /**
         * 异步AI处理 - 用于处理复杂请求
         */
        public CompletableFuture<String> processComplexRequest(String userId, String request) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return chatWithAi(userId, request);
                } catch (BlockException e) {
                    return "请求过于频繁,已加入处理队列,请稍后查看结果。";
                }
            });
        }
    }
    

    6. 控制器层

    package com.example.controller;
    
    import com.example.service.AiCustomerService;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.*;
    
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    
    @RestController
    @RequestMapping("/api/ai")
    @Slf4j
    public class AiCustomerController {
    
        @Autowired
        private AiCustomerService aiCustomerService;
    
        /**
         * AI聊天接口
         */
        @PostMapping("/chat")
        public ResponseEntity<Map<String, Object>> chat(
                @RequestParam String userId,
                @RequestBody Map<String, String> request) {
            
            String message = request.get("message");
            log.info("接收到聊天请求 - 用户: {}, 消息: {}", ujswww.devze.comserId, message);
            
            Map<String, Object> response = new HashMap<>();
            
            try {
                String aiResponse = aiCustomerService.chatWithAi(userId, message);
                response.put("success", true);
                response.put("data", aiResponse);
                response.put("timestamp", System.currentTimeMillis());
                return ResponseEntity.ok(response);
            } catch (Exception e) {
                log.error("聊天接口异常", e);
                response.put("success", false);
                response.put("error", "服务暂时不可用");
                return ResponseEntity.internalServerError().body(response);
            }
        }
    
        /**
         * AI问答接口
         */
        @PostMapping("/qa")
        public ResponseEntity<Map<String, Object>> qa(@RequestBody Map<String, String> request) {
            String question = request.get("question")android;
            log.info("接收到问答请求: {}", question);
            
            Map<String, Object> response = new HashMap<>();
            
            try {
                String answer = aiCustomerService.answerQuestion(question);
                response.put("success", true);
                response.put("question", question);
                response.put("answer", answer);
                response.put("timestamp", System.currentTimeMillis());
                return ResponseEntity.ok(response);
            } catch (Exception e) {
                log.error("问答接口异常", e);
                response.put("success", false);
                response.put("error", "问答服务暂时不可用");
                return ResponseEntity.internalServerError().body(response);
            }
        }
    
        /**
         * 知识库搜索接口
         */
        @GetMapping("/knowledge/search")
        public ResponseEntity<Map<String, Object>> searchKnowledge(@RequestParam String keyword) {
            log.info("接收到知识库搜索请求: {}", keyword);
            
            Map<String, Object> response = new HashMap<>();
            
            try {
                String result = aiCustomerService.searchKnowledge(keyword);
                response.put("success", true);
                response.put("keyword", keyword);
                response.put("result", result);
                response.put("timestamp", System.currentTimeMillis());
                return ResponseEntity.ok(response);
            } catch (Exception e) {
                log.error("知识库搜索异常", e);
                response.put("success", false);
                response.put("error", "知识库搜索服务暂时不可用");
                return ResponseEntity.internalServerError().body(response);
            }
        }
    
        /**
         * 异步处理复杂请求
         */
        @PostMapping("/chat/async")
        public ResponseEntity<Map<String, Object>> chatAsync(
                @RequestParam String userId,
                @RequestBody Map<String, String> request) {
            
            String message = request.get("message");
            Map<String, Object> response = new HashMap<>();
            
            CompletableFuture<String> future = aiCustomerService.processComplexRequest(userId, message);
            
            response.put("success", true);
            response.put("message", "请求已提交,正在处理中...");
            response.put("userId", userId);
            response.put("taskId", System.currentTimeMillis());
            
            // 实际应用中可以返回任务ID,客户端轮询结果
            return ResponseEntity.ok(response);
        }
    }
    

    7. 启动类

    package com.example;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableAsync;
    
    /**
     * Application entry point. Supplies default Sentinel client settings as JVM
     * system properties before the Spring context starts.
     */
    @SpringBootApplication
    @EnableAsync
    public class AiCustomerServiceApplication {

        /**
         * Default Sentinel Dashboard address. It must differ from this application's
         * own HTTP port (8080), so the dashboard is expected on another port,
         * e.g. started with {@code -Dserver.port=8858}.
         */
        private static final String DEFAULT_DASHBOARD_SERVER = "localhost:8858";

        /**
         * Starts the application.
         *
         * @param args command-line arguments passed through to Spring Boot
         */
        public static void main(String[] args) {
            // Only fill in defaults: values given with -D on the command line win.
            setIfAbsent("csp.sentinel.dashboard.server", DEFAULT_DASHBOARD_SERVER);
            setIfAbsent("project.name", "ai-customer-service");

            SpringApplication.run(AiCustomerServiceApplication.class, args);
        }

        /** Sets a system property unless it already has a value. */
        private static void setIfAbsent(String key, String value) {
            if (System.getProperty(key) == null) {
                System.setProperty(key, value);
            }
        }
    }
    

    8. 测试用例

    package com.example.test;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Concurrency smoke test: pushes 50 simulated requests through a 20-thread pool
     * and waits for all of them to finish. The tasks only print and sleep; they do
     * not call any rate-limited service.
     */
    @SpringJUnitConfig
    @SpringBootTest
    public class RateLimitTest {

        /**
         * Submits the simulated requests and blocks until every one has counted down.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        @Test
        public void testConcurrentRequests() throws InterruptedException {
            final int totalRequests = 50;
            ExecutorService pool = Executors.newFixedThreadPool(20);
            CountDownLatch pending = new CountDownLatch(totalRequests);

            int next = 0;
            while (next < totalRequests) {
                final int requestId = next++;
                pool.submit(() -> simulateRequest(requestId, pending));
            }

            pending.await();
            pool.shutdown();
        }

        /**
         * Simulates one request: logs start, sleeps 100 ms, logs completion, and
         * always counts the latch down, even when interrupted.
         */
        private static void simulateRequest(int requestId, CountDownLatch pending) {
            try {
                System.out.println("请求 " + requestId + " 开始");
                Thread.sleep(100);
                System.out.println("请求 " + requestId + " 完成");
            } catch (InterruptedException e) {
                // Preserve the interrupt status for the pool thread.
                Thread.currentThread().interrupt();
            } finally {
                pending.countDown();
            }
        }
    }
    

    使用示例

    1. 启动应用

    mvn spring-boot:run
    

    2. 测试接口

    聊天接口测试:

    curl -X POST "http://localhost:8080/api/ai/chat?userId=user123" \
         -H "Content-Type: application/json" \
         -d '{"message": "你好,我想了解产品信息"}'
    

    问答接口测试:

    curl -X POST "http://localhost:8080/api/ai/qa" \
         -H "Content-Type: application/json" \
         -d '{"question": "如何退换货?"}'
    

    知识库搜索测试:

    curl "http://localhost:8080/api/ai/knowledge/search?keyword=退款政策"
    

    关键特性说明

    • 多种流控效果: 演示了匀速排队(RATE_LIMITER,基于漏桶算法)、预热(WARM_UP,基于令牌桶算法)以及带最大排队时长的排队等待等不同流控效果的使用场景
    • 热点参数限流: 基于用户ID进行个性化限流
    • 优雅降级: 提供友好的限流提示而非直接拒绝
    • 异步处理: 对于被限流的复杂请求提供异步处理选项
    • 监控友好: 集成Sentinel Dashboard进行实时监控

    这个案例展示了如何在实际的AI应用中合理使用Sentinel的各种限流算法,既保护了系统稳定性,也提供了良好的用户体验。

    到此这篇关于Spring AI框架结合Sentinel实现限流功能的文章就介绍到这了,更多相关Spring AI Sentinel限流内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜