从零开始设计基于SpringBoot的Serverless(本地函数计算)引擎
目录
- 前言
- 设计思路
- 核心特性
- 架构设计
- 核心实现
- 项目结构
- 函数接口定义
- 执行上下文
- 执行结果
- 函数指标统计
- 隔离类加载器
- 函数执行器
- 函数管理器
- 执行引擎
- HTTP触发器
- 定时触发器
- Serverless控制器
- 主启动类
- 配置文件
- 示例函数
- Hello World函数
- 用户服务函数
- 功能测试
- Maven配置
- 总结
- 核心特性
- 技术亮点
前言
最近突然冒出一个想法:能不能用SpringBoot自己实现一个类似AWS Lambda或阿里云函数计算的执行引擎?
说干就干,于是从零开始设计了一套基于SpringBoot的Serverless执行框架。
这套框架支持函数动态加载、按需执行、资源隔离,甚至还实现了简单的冷启动优化。
今天分享给大家,看看如何用SpringBoot的强大能力,打造一个属于自己的Serverless引擎。
设计思路
核心特性
我们要实现的Serverless引擎包含以下特性:
动态函数加载:支持运行时加载新的函数代码
函数隔离执行:每个函数在独立的上下文中运行
生命周期管理:自动管理函数的创建、执行和销毁
资源限制:控制函数的执行时间
函数调用:支持HTTP、定时器等多种触发方式
监控统计:记录函数执行次数、耗时、成功率等指标
架构设计
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  Function API   │    │  Event Trigger  │    │  Management UI  │
└─────────┬───────┘    └─────────┬───────┘    └─────────┬───────┘
          │                      │                      │
          └──────────────────────┼──────────────────────┘
                                 │
                    ┌────────────┴──────────────┐
                    │     Serverless Engine     │
                    └────────────┬──────────────┘
                                 │
        ┌────────────────────────┼──────────────────────┐
        │                        │                      │
┌───────▼───────┐    ┌───────────▼─────────┐    ┌───────▼───────┐
│ Function Pool │    │  Execution Manager  │    │ Resource Pool │
└───────────────┘    └─────────────────────┘    └───────────────┘
核心实现
项目结构
src/
├── main/
│   ├── java/
│   │   └── com/
│   │       └── example/
│   │           ├── ServerlessEngine.java
│   │           ├── core/
│   │           │   ├── FunctionManager.java
│   │           │   ├── ExecutionEngine.java
│   │           │   ├── ResourceManager.java
│   │           │   └── EventDispatcher.java
│   │           ├── model/
│   │           │   ├── ServerlessFunction.java
│   │           │   ├── ExecutionContext.java
│   │           │   ├── ExecutionResult.java
│   │           │   └── FunctionMetrics.java
│   │           ├── executor/
│   │           │   ├── FunctionExecutor.java
│   │           │   └── IsolatedClassLoader.java
│   │           ├── trigger/
│   │           │   ├── HttpTrigger.java
│   │           │   ├── TimerTrigger.java
│   │           │   └── EventTrigger.java
│   │           └── api/
│   │               └── ServerlessController.java
│   └── resources/
│       ├── application.yml
│       └── functions/
│           ├── demo-function.jar
│           └── user-function.jar
函数接口定义
package com.example.model;

import java.util.Map;

/**
 * Contract for a serverless function.
 * Every user-supplied function must implement this interface.
 */
@FunctionalInterface
public interface ServerlessFunction {

    /**
     * Entry point invoked for each function execution.
     *
     * @param input   input parameters
     * @param context execution context
     * @return the execution result
     * @throws Exception if the function fails
     */
    Object handle(Map<String, Object> input, ExecutionContext context) throws Exception;
}
执行上下文
package com.example.model;

import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Per-invocation execution context handed to a function.
 *
 * <p>Carries the request id, the function identity, the start time and the
 * timeout budget, plus two mutable maps (environment and attributes) that are
 * backed by {@link ConcurrentHashMap}.
 */
public class ExecutionContext {

    /** Timeout applied when the caller does not set one (30 seconds). */
    private static final long DEFAULT_TIMEOUT_MS = 30_000L;

    private final String requestId;
    private final String functionName;
    private String functionVersion;
    private final LocalDateTime startTime;
    /** Wall-clock start in epoch millis; used for elapsed-time arithmetic. */
    private final long startEpochMs;
    private long timeoutMs;
    private final Map<String, Object> environment;
    private final Map<String, Object> attributes;

    /**
     * Creates a context that starts "now" with version {@code "1.0"} and the default timeout.
     *
     * @param requestId    identifier of this invocation
     * @param functionName name of the function being invoked
     */
    public ExecutionContext(String requestId, String functionName) {
        this.requestId = requestId;
        this.functionName = functionName;
        this.functionVersion = "1.0";
        this.startTime = LocalDateTime.now();
        this.startEpochMs = System.currentTimeMillis();
        this.timeoutMs = DEFAULT_TIMEOUT_MS;
        this.environment = new ConcurrentHashMap<>();
        this.attributes = new ConcurrentHashMap<>();
    }

    /**
     * Returns the time left before the timeout elapses.
     *
     * @return remaining milliseconds, never negative
     */
    public long getRemainingTimeMs() {
        // Epoch millis captured at construction avoids converting LocalDateTime
        // through java.sql.Timestamp and the default time zone.
        long elapsed = System.currentTimeMillis() - startEpochMs;
        return Math.max(0, timeoutMs - elapsed);
    }

    public String getRequestId() {
        return requestId;
    }

    public String getFunctionName() {
        return functionName;
    }

    public String getFunctionVersion() {
        return functionVersion;
    }

    public void setFunctionVersion(String functionVersion) {
        this.functionVersion = functionVersion;
    }

    public LocalDateTime getStartTime() {
        return startTime;
    }

    public long getTimeoutMs() {
        return timeoutMs;
    }

    public void setTimeoutMs(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Returns the live, mutable environment map. */
    public Map<String, Object> getEnvironment() {
        return environment;
    }

    /** Returns the live, mutable attribute map. */
    public Map<String, Object> getAttributes() {
        return attributes;
    }

    @Override
    public String toString() {
        return "ExecutionContext{"
                + "requestId='" + requestId + '\''
                + ", functionName='" + functionName + '\''
                + ", functionVersion='" + functionVersion + '\''
                + ", startTime=" + startTime
                + ", timeoutMs=" + timeoutMs
                + '}';
    }
}
执行结果
package com.example.model;

import java.time.Duration;
import java.time.LocalDateTime;

/**
 * Outcome of a single function invocation.
 *
 * <p>An instance is created when the invocation starts and is completed exactly
 * once through {@link #markSuccess(Object)} or {@link #markFailure(String, String)},
 * which also stamp the end time and the elapsed duration.
 */
public class ExecutionResult {

    private final String requestId;
    private final String functionName;
    private boolean success;
    private Object result;
    private String errorMessage;
    private String errorType;
    private final LocalDateTime startTime;
    private LocalDateTime endTime;
    private long executionTime;

    /**
     * Creates a pending result whose start time is "now".
     *
     * @param requestId    identifier of the invocation
     * @param functionName name of the invoked function
     */
    public ExecutionResult(String requestId, String functionName) {
        this.requestId = requestId;
        this.functionName = functionName;
        this.startTime = LocalDateTime.now();
    }

    /**
     * Marks the invocation as successful.
     *
     * @param result value returned by the function (may be {@code null})
     */
    public void markSuccess(Object result) {
        this.success = true;
        this.result = result;
        finish();
    }

    /**
     * Marks the invocation as failed.
     *
     * @param errorType    short error classifier, e.g. an exception simple name
     * @param errorMessage human-readable description
     */
    public void markFailure(String errorType, String errorMessage) {
        this.success = false;
        this.errorType = errorType;
        this.errorMessage = errorMessage;
        finish();
    }

    /** Stamps the end time and derives the elapsed milliseconds. */
    private void finish() {
        this.endTime = LocalDateTime.now();
        this.executionTime = calculateExecutionTime();
    }

    /** Returns end minus start in milliseconds, or 0 if either bound is missing. */
    private long calculateExecutionTime() {
        if (startTime == null || endTime == null) {
            return 0;
        }
        // Duration works directly on LocalDateTime; no java.sql.Timestamp round trip.
        return Duration.between(startTime, endTime).toMillis();
    }

    public String getRequestId() {
        return requestId;
    }

    public String getFunctionName() {
        return functionName;
    }

    public boolean isSuccess() {
        return success;
    }

    public Object getResult() {
        return result;
    }

    public String getErrorMessage() {
        return errorMessage;
    }

    public String getErrorType() {
        return errorType;
    }

    public LocalDateTime getStartTime() {
        return startTime;
    }

    public LocalDateTime getEndTime() {
        return endTime;
    }

    /** Returns the elapsed time in milliseconds (0 until the result is completed). */
    public long getExecutionTime() {
        return executionTime;
    }

    @Override
    public String toString() {
        return "ExecutionResult{"
                + "requestId='" + requestId + '\''
                + ", functionName='" + functionName + '\''
                + ", success=" + success
                + ", executionTime=" + executionTime
                + '}';
    }
}
函数指标统计
package com.example.model;

import java.time.LocalDateTime;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Running execution statistics for one function.
 *
 * <p>All counters are atomics, so {@link #recordInvocation(ExecutionResult)} may
 * be called from several threads. Individual getters are each consistent, but a
 * set of getters read together is not an atomic snapshot.
 */
public class FunctionMetrics {

    private final String functionName;
    private final AtomicLong invocationCount = new AtomicLong(0);
    private final AtomicLong successCount = new AtomicLong(0);
    private final AtomicLong errorCount = new AtomicLong(0);
    private final AtomicLong totalExecutionTime = new AtomicLong(0);
    /** Starts at {@code Long.MAX_VALUE} as the "no sample yet" sentinel. */
    private final AtomicLong minExecutionTime = new AtomicLong(Long.MAX_VALUE);
    private final AtomicLong maxExecutionTime = new AtomicLong(0);
    private final AtomicReference<LocalDateTime> lastInvocation = new AtomicReference<>();
    private final AtomicReference<LocalDateTime> createTime =
            new AtomicReference<>(LocalDateTime.now());

    /**
     * @param functionName name of the function these metrics belong to
     */
    public FunctionMetrics(String functionName) {
        this.functionName = functionName;
    }

    /**
     * Folds one finished invocation into the statistics.
     *
     * @param result completed invocation result
     */
    public void recordInvocation(ExecutionResult result) {
        invocationCount.incrementAndGet();
        lastInvocation.set(LocalDateTime.now());

        if (result.isSuccess()) {
            successCount.incrementAndGet();
        } else {
            errorCount.incrementAndGet();
        }

        long executionTime = result.getExecutionTime();
        totalExecutionTime.addAndGet(executionTime);
        minExecutionTime.accumulateAndGet(executionTime, Math::min);
        maxExecutionTime.accumulateAndGet(executionTime, Math::max);
    }

    /** Returns the mean execution time in milliseconds, or 0 with no invocations. */
    public double getAvgExecutionTime() {
        long count = invocationCount.get();
        return count == 0 ? 0.0 : (double) totalExecutionTime.get() / count;
    }

    /** Returns the success percentage (0-100), or 0 with no invocations. */
    public double getSuccessRate() {
        long total = invocationCount.get();
        return total == 0 ? 0.0 : (double) successCount.get() / total * 100;
    }

    /**
     * Returns the error percentage (0-100), or 0 with no invocations.
     *
     * <p>Computed from the error counter rather than {@code 100 - successRate},
     * which would report 100% for a function that has never been called.
     */
    public double getErrorRate() {
        long total = invocationCount.get();
        return total == 0 ? 0.0 : (double) errorCount.get() / total * 100;
    }

    public String getFunctionName() {
        return functionName;
    }

    public long getInvocationCount() {
        return invocationCount.get();
    }

    public long getSuccessCount() {
        return successCount.get();
    }

    public long getErrorCount() {
        return errorCount.get();
    }

    public long getTotalExecutionTime() {
        return totalExecutionTime.get();
    }

    /** Returns the shortest recorded execution time, or 0 when nothing was recorded yet. */
    public long getMinExecutionTime() {
        long min = minExecutionTime.get();
        return min == Long.MAX_VALUE ? 0 : min;
    }

    public long getMaxExecutionTime() {
        return maxExecutionTime.get();
    }

    /** Returns the time of the most recent invocation, or {@code null} if never invoked. */
    public LocalDateTime getLastInvocation() {
        return lastInvocation.get();
    }

    public LocalDateTime getCreateTime() {
        return createTime.get();
    }

    @Override
    public String toString() {
        return "FunctionMetrics{"
                + "functionName='" + functionName + '\''
                + ", invocationCount=" + invocationCount.get()
                + ", successCount=" + successCount.get()
                + ", errorCount=" + errorCount.get()
                + ", avgExecutionTime=" + String.format("%.2f", getAvgExecutionTime())
                + ", successRate=" + String.format("%.2f", getSuccessRate()) + "%"
                + '}';
    }
}
隔离类加载器
package com.example.executor;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Child-first class loader that gives each function its own class namespace.
 *
 * <p>Classes found in the function's own URLs are defined by this loader.
 * Platform classes, framework classes and the engine's function API are always
 * delegated to the parent so that both sides share the same {@code Class} objects.
 */
public class IsolatedClassLoader extends URLClassLoader {

    /**
     * Package prefixes that are always resolved parent-first.
     * {@code com.example.model.} holds the function API (ServerlessFunction,
     * ExecutionContext): if a function jar bundled its own copy, the executor's
     * {@code instanceof ServerlessFunction} check would fail.
     */
    private static final String[] PARENT_FIRST_PREFIXES = {
        "java.", "javax.", "sun.", "com.sun.",
        "org.springframework.", "org.apache.", "com.fasterxml.",
        "com.example.model."
    };

    private final String functionName;
    /** Classes defined by this loader, keyed by binary name. */
    private final Map<String, Class<?>> loadedClasses = new ConcurrentHashMap<>();
    private final ClassLoader parentClassLoader;

    /**
     * @param functionName name of the function this loader serves
     * @param urls         locations (typically the function jar) searched child-first
     * @param parent       loader used for delegated and fallback lookups
     */
    public IsolatedClassLoader(String functionName, URL[] urls, ClassLoader parent) {
        super(urls, parent);
        this.functionName = functionName;
        this.parentClassLoader = parent;
    }

    /**
     * Loads a class child-first, except for the parent-first prefixes.
     *
     * @throws ClassNotFoundException if neither this loader nor the parent can find the class
     */
    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> clazz = loadedClasses.get(name);
            if (clazz == null) {
                if (isParentFirst(name)) {
                    return super.loadClass(name, resolve);
                }
                try {
                    // URLClassLoader.findClass searches ONLY this loader's URLs.
                    // Reading bytes through getResourceAsStream would delegate to
                    // the parent first and redefine parent classes in this loader.
                    clazz = findClass(name);
                } catch (ClassNotFoundException notInFunctionJar) {
                    return super.loadClass(name, resolve);
                }
                loadedClasses.put(name, clazz);
            }
            if (resolve) {
                resolveClass(clazz);
            }
            return clazz;
        }
    }

    /** Returns whether the class name falls under a parent-first prefix. */
    private static boolean isParentFirst(String name) {
        for (String prefix : PARENT_FIRST_PREFIXES) {
            if (name.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    public String getFunctionName() {
        return functionName;
    }

    /** Returns the number of classes defined by this loader (delegated classes are not counted). */
    public int getLoadedClassCount() {
        return loadedClasses.size();
    }

    /** Drops the class cache and releases the underlying jar handles. */
    @Override
    public void close() throws IOException {
        loadedClasses.clear();
        super.close();
    }
}
函数执行器
package com.example.executor;

import com.example.model.ExecutionContext;
import com.example.model.ExecutionResult;
import com.example.model.ServerlessFunction;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Runs functions on a worker thread pool, using a class loader obtained from
 * the {@code ClassLoaderPool}, and enforces the context's timeout.
 */
@Component
@Slf4j
public class FunctionExecutor {

    @Autowired
    private ClassLoaderPool classLoaderPool;

    /** Sequence for worker thread names; a timestamp is not unique per thread. */
    private final AtomicInteger threadSequence = new AtomicInteger();

    private final ExecutorService executorService;

    public FunctionExecutor() {
        // Cached pool of daemon threads dedicated to function execution.
        this.executorService = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r);
            t.setName("function-executor-" + threadSequence.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
    }

    /**
     * Executes a function and waits for it up to {@code context.getTimeoutMs()}.
     *
     * @param functionName logical function name
     * @param jarPath      path of the jar containing the function class
     * @param className    fully qualified name of the function class
     * @param input        input parameters passed to the function
     * @param context      execution context (request id, timeout)
     * @return a completed result; failures are reported in the result, not thrown
     */
    public ExecutionResult execute(String functionName, String jarPath, String className,
                                   Map<String, Object> input, ExecutionContext context) {
        ExecutionResult result = new ExecutionResult(context.getRequestId(), functionName);

        Future<Object> future = executorService.submit(
                () -> invokeFunction(functionName, jarPath, className, input, context));

        try {
            Object functionResult = future.get(context.getTimeoutMs(), TimeUnit.MILLISECONDS);
            result.markSuccess(functionResult);
        } catch (TimeoutException e) {
            // Interrupt the worker; a function that ignores interruption keeps running.
            future.cancel(true);
            result.markFailure("TIMEOUT", "Function execution timeout");
        } catch (ExecutionException e) {
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            log.error(cause.getMessage(), cause);
            result.markFailure(cause.getClass().getSimpleName(), cause.getMessage());
        } catch (InterruptedException e) {
            // The caller was interrupted while waiting: stop the worker and
            // restore the interrupt flag instead of swallowing it.
            future.cancel(true);
            Thread.currentThread().interrupt();
            result.markFailure(e.getClass().getSimpleName(), e.getMessage());
        } catch (Exception e) {
            result.markFailure(e.getClass().getSimpleName(), e.getMessage());
        }

        return result;
    }

    /** Loads the function class, instantiates it and calls its handler. */
    private Object invokeFunction(String functionName, String jarPath, String className,
                                  Map<String, Object> input, ExecutionContext context)
            throws Exception {
        // Class loaders come from the pool rather than being created per call.
        IsolatedClassLoader classLoader =
                classLoaderPool.getClassLoader(functionName, jarPath, className);

        Class<?> functionClass = classLoader.loadClass(className);
        Object functionInstance = functionClass.getDeclaredConstructor().newInstance();

        if (!(functionInstance instanceof ServerlessFunction)) {
            throw new IllegalArgumentException(
                    "Function class must implement ServerlessFunction interface");
        }
        return ((ServerlessFunction) functionInstance).handle(input, context);
    }

    /**
     * Shuts the worker pool down, waiting up to 30 seconds for running functions
     * before forcing termination.
     */
    public void shutdown() {
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
函数管理器
package com.example.core;

import com.example.model.FunctionMetrics;
import org.springframework.stereotype.Component;

import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Registry of function definitions and their metrics.
 * Handles registration, lookup, update and removal.
 */
@Component
public class FunctionManager {

    /** Registered functions, keyed by function name. */
    private final Map<String, FunctionDefinition> functions = new ConcurrentHashMap<>();

    /** Metrics per function, keyed by function name. */
    private final Map<String, FunctionMetrics> metrics = new ConcurrentHashMap<>();

    /**
     * Describes where a function's code lives and how it should be run.
     */
    public static class FunctionDefinition {
        private String name;
        private String description;
        private String jarPath;
        private String className;
        private long timeoutMs;
        private Map<String, Object> environment;
        private Date createTime;
        private Date updateTime;

        /**
         * Creates a definition with a 30 second timeout and an empty environment.
         */
        public FunctionDefinition(String name, String jarPath, String className) {
            this.name = name;
            this.jarPath = jarPath;
            this.className = className;
            this.timeoutMs = 30000;
            this.environment = new HashMap<>();
            this.createTime = new Date();
            this.updateTime = new Date();
        }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        public String getDescription() { return description; }
        public void setDescription(String description) { this.description = description; }

        public String getJarPath() { return jarPath; }
        public void setJarPath(String jarPath) { this.jarPath = jarPath; }

        public String getClassName() { return className; }
        public void setClassName(String className) { this.className = className; }

        public long getTimeoutMs() { return timeoutMs; }
        public void setTimeoutMs(long timeoutMs) { this.timeoutMs = timeoutMs; }

        public Map<String, Object> getEnvironment() { return environment; }
        public void setEnvironment(Map<String, Object> environment) { this.environment = environment; }

        public Date getCreateTime() { return createTime; }

        public Date getUpdateTime() { return updateTime; }
        public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; }
    }

    /**
     * Registers a function with default settings, replacing any previous
     * definition and resetting its metrics.
     *
     * @throws IllegalArgumentException if an argument is blank or the jar does not exist
     */
    public void registerFunction(String name, String jarPath, String className) {
        register(new FunctionDefinition(
                requireText(name, "name"),
                requireText(jarPath, "jarPath"),
                requireText(className, "className")));
    }

    /**
     * Registers a function with an explicit timeout and environment.
     *
     * @param timeoutMs   execution timeout in milliseconds
     * @param environment environment entries, copied defensively; may be {@code null}
     * @throws IllegalArgumentException if an argument is blank or the jar does not exist
     */
    public void registerFunction(String name, String jarPath, String className,
                                 long timeoutMs, Map<String, Object> environment) {
        FunctionDefinition definition = new FunctionDefinition(
                requireText(name, "name"),
                requireText(jarPath, "jarPath"),
                requireText(className, "className"));
        // Configure BEFORE publishing, so no caller can observe the definition
        // with the default timeout and an empty environment.
        definition.setTimeoutMs(timeoutMs);
        if (environment != null) {
            definition.setEnvironment(new HashMap<>(environment));
        }
        register(definition);
    }

    /** Validates the jar and publishes the definition together with fresh metrics. */
    private void register(FunctionDefinition definition) {
        requireJarExists(definition.getJarPath());
        String name = definition.getName();
        functions.put(name, definition);
        metrics.put(name, new FunctionMetrics(name));
        System.out.println("Function registered: " + name + " -> " + definition.getClassName());
    }

    /** Rejects null or blank arguments with an IllegalArgumentException. */
    private static String requireText(String value, String label) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(label + " must not be blank");
        }
        return value;
    }

    /** Fails when the given jar path does not point at an existing file. */
    private static void requireJarExists(String jarPath) {
        if (!new File(jarPath).exists()) {
            throw new IllegalArgumentException("JAR file not found: " + jarPath);
        }
    }

    /** Returns the definition for the name, or {@code null} if not registered. */
    public FunctionDefinition getFunction(String name) {
        return functions.get(name);
    }

    public boolean functionExists(String name) {
        return functions.containsKey(name);
    }

    /** Returns a snapshot copy of all registered function names. */
    public Set<String> getAllFunctionNames() {
        return new HashSet<>(functions.keySet());
    }

    /** Returns a snapshot copy of all registered definitions. */
    public Collection<FunctionDefinition> getAllFunctions() {
        return new ArrayList<>(functions.values());
    }

    /**
     * Points an existing function at a new jar and class.
     *
     * @throws IllegalArgumentException if the function is unknown, an argument is
     *                                  blank, or the new jar does not exist
     */
    public void updateFunction(String name, String jarPath, String className) {
        // A single get() avoids the exists-then-get race with removeFunction.
        FunctionDefinition definition = functions.get(name);
        if (definition == null) {
            throw new IllegalArgumentException("Function not found: " + name);
        }
        // Same jar check as registration.
        requireJarExists(requireText(jarPath, "jarPath"));

        definition.setJarPath(jarPath);
        definition.setClassName(requireText(className, "className"));
        definition.setUpdateTime(new Date());

        System.out.println("Function updated: " + name);
    }

    /** Removes the function and its metrics; does nothing if it is not registered. */
    public void removeFunction(String name) {
        if (functions.remove(name) != null) {
            metrics.remove(name);
            System.out.println("Function removed: " + name);
        }
    }

    /** Returns the metrics for the name, or {@code null} if not registered. */
    public FunctionMetrics getFunctionMetrics(String name) {
        return metrics.get(name);
    }

    /** Returns a snapshot copy of all metrics. */
    public Collection<FunctionMetrics> getAllMetrics() {
        return new ArrayList<>(metrics.values());
    }

    /** Removes every function and all metrics. */
    public void clear() {
        functions.clear();
        metrics.clear();
    }

    public int getFunctionCount() {
        return functions.size();
    }
}
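执行引擎(注:原文此处贴出的代码清单与下文“主启动类”一节完全相同,均为 ServerlessEngine 启动类;被 HttpTrigger、TimerTrigger 调用的 ExecutionEngine.invoke(functionName, input) 的实现在文中并未给出)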
package com.example;

import cn.hutool.core.io.FileUtil;
import com.example.core.FunctionManager;
import com.example.trigger.TimerTrigger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

import java.util.HashMap;
import java.util.Map;

/**
 * Application entry point of the serverless engine.
 * After startup it registers the demo functions and demo timer tasks.
 */
@SpringBootApplication
@EnableScheduling
public class ServerlessEngine implements CommandLineRunner {

    @Autowired
    private FunctionManager functionManager;

    @Autowired
    private TimerTrigger timerTrigger;

    public static void main(String[] args) {
        // Only the function directory is needed; the previous code created it
        // as a side effect of writing a throw-away "function.txt" file.
        FileUtil.mkdir("functions");
        SpringApplication.run(ServerlessEngine.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        System.out.println("=== Serverless Engine Started ===");

        registerDemoFunctions();
        registerDemoTimerTasks();

        System.out.println("=== Demo Functions and Tasks Registered ===");
        System.out.println("API available at: http://localhost:8080/serverless");
    }

    /** Registers the two demo functions; a missing demo jar is reported, not fatal. */
    private void registerDemoFunctions() {
        registerQuietly("hello-world", () -> functionManager.registerFunction(
                "hello-world",
                "functions/demo-function.jar",
                "com.example.functions.HelloWorldFunction"));

        Map<String, Object> userEnv = new HashMap<>();
        userEnv.put("DB_URL", "jdbc:h2:mem:testdb");
        userEnv.put("MAX_USERS", "1000");

        registerQuietly("user-service", () -> functionManager.registerFunction(
                "user-service",
                "functions/user-function.jar",
                "com.example.functions.UserServiceFunction",
                60000, // 60 second timeout
                userEnv));
    }

    /**
     * Runs a demo registration and logs instead of propagating when it is
     * rejected, so that an absent demo jar does not abort application startup.
     */
    private void registerQuietly(String functionName, Runnable registration) {
        try {
            registration.run();
        } catch (IllegalArgumentException e) {
            System.err.println("Demo function '" + functionName + "' skipped: " + e.getMessage());
        }
    }

    /** Registers the demo timer tasks. */
    private void registerDemoTimerTasks() {
        timerTrigger.registerTimerTask(
                "cleanup-task",
                "user-service",
                "0 0 2 * * ?" // every day at 02:00
        );

        timerTrigger.registerTimerTask(
                "health-check",
                "hello-world",
                "0/10 * * * * ?" // every 10 seconds
        );
    }
}
HTTP触发器
package com.example.trigger;

import com.example.core.ExecutionEngine;
import com.example.model.ExecutionResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;

/**
 * HTTP trigger: turns an incoming servlet request into a function input map
 * and invokes the function through the execution engine.
 */
@Component
public class HttpTrigger {

    @Autowired
    private ExecutionEngine executionEngine;

    /**
     * Builds the input map ({@code "http"} plus optional {@code "body"}) and invokes the function.
     *
     * @param functionName name of the function to invoke
     * @param request      incoming servlet request
     * @param body         parsed request body, or {@code null} when there is none
     * @return the result returned by the execution engine
     */
    public ExecutionResult handleRequest(String functionName, HttpServletRequest request,
                                         Map<String, Object> body) {
        Map<String, Object> http = new HashMap<>();
        http.put("method", request.getMethod());
        http.put("path", request.getRequestURI());
        http.put("queryString", request.getQueryString());
        http.put("remoteAddr", request.getRemoteAddr());
        http.put("userAgent", request.getHeader("User-Agent"));
        http.put("headers", copyHeaders(request));
        http.put("queryParams", flattenParameters(request.getParameterMap()));

        Map<String, Object> payload = new HashMap<>();
        payload.put("http", http);
        if (body != null) {
            payload.put("body", body);
        }

        return executionEngine.invoke(functionName, payload);
    }

    /** Copies each header name with the value {@code request.getHeader(name)} returns. */
    private static Map<String, String> copyHeaders(HttpServletRequest request) {
        Map<String, String> copy = new HashMap<>();
        Enumeration<String> names = request.getHeaderNames();
        if (names == null) {
            return copy;
        }
        while (names.hasMoreElements()) {
            String name = names.nextElement();
            copy.put(name, request.getHeader(name));
        }
        return copy;
    }

    /** Unwraps single-valued parameters to a plain String; others stay a String array. */
    private static Map<String, Object> flattenParameters(Map<String, String[]> raw) {
        Map<String, Object> flat = new HashMap<>();
        for (Map.Entry<String, String[]> entry : raw.entrySet()) {
            String[] values = entry.getValue();
            Object value = values.length == 1 ? values[0] : values;
            flat.put(entry.getKey(), value);
        }
        return flat;
    }

    /** Convenience overload for GET requests, which carry no body. */
    public ExecutionResult handleGetRequest(String functionName, HttpServletRequest request) {
        return handleRequest(functionName, request, null);
    }

    /** Convenience overload for POST requests. */
    public ExecutionResult handlePostRequest(String functionName, HttpServletRequest request,
                                             Map<String, Object> body) {
        return handleRequest(functionName, request, body);
    }
}
定时触发器
package com.example.trigger;

import com.example.core.ExecutionEngine;
import com.example.model.ExecutionResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Timer trigger: keeps a registry of timer tasks and invokes their functions
 * from a periodic sweep.
 *
 * <p>NOTE: the cron expression is stored and reported but NOT evaluated. The
 * sweep runs once a minute and executes every enabled task whose last run is
 * more than five minutes old (or that has never run).
 */
@Component
public class TimerTrigger {

    @Autowired
    private ExecutionEngine executionEngine;

    /** Registered timer tasks, keyed by task name. */
    private final Map<String, TimerTask> timerTasks = new ConcurrentHashMap<>();

    /**
     * A registered timer task. Mutable state is volatile/atomic because tasks
     * are touched by both the scheduler thread and callers of
     * {@link TimerTrigger#executeTimerTask(String)}.
     */
    public static class TimerTask {
        private final String name;
        private final String functionName;
        private final String cronExpression;
        private volatile boolean enabled;
        private volatile LocalDateTime lastExecution;
        private volatile LocalDateTime nextExecution;
        private final AtomicLong executionCount = new AtomicLong();

        public TimerTask(String name, String functionName, String cronExpression) {
            this.name = name;
            this.functionName = functionName;
            this.cronExpression = cronExpression;
            this.enabled = true;
        }

        public String getName() { return name; }
        public String getFunctionName() { return functionName; }
        public String getCronExpression() { return cronExpression; }

        public boolean isEnabled() { return enabled; }
        public void setEnabled(boolean enabled) { this.enabled = enabled; }

        public LocalDateTime getLastExecution() { return lastExecution; }
        public void setLastExecution(LocalDateTime lastExecution) { this.lastExecution = lastExecution; }

        public LocalDateTime getNextExecution() { return nextExecution; }
        public void setNextExecution(LocalDateTime nextExecution) { this.nextExecution = nextExecution; }

        public long getExecutionCount() { return executionCount.get(); }
        public void incrementExecutionCount() { executionCount.incrementAndGet(); }
    }

    /**
     * Registers a timer task, replacing any task with the same name.
     */
    public void registerTimerTask(String taskName, String functionName, String cronExpression) {
        TimerTask task = new TimerTask(taskName, functionName, cronExpression);
        timerTasks.put(taskName, task);

        System.out.println("Timer task registered: " + taskName + " -> " + functionName
                + " (" + cronExpression + ")");
    }

    /** Removes a timer task; does nothing if it is not registered. */
    public void removeTimerTask(String taskName) {
        if (timerTasks.remove(taskName) != null) {
            System.out.println("Timer task removed: " + taskName);
        }
    }

    /** Enables or disables a timer task; does nothing if it is not registered. */
    public void setTimerTaskEnabled(String taskName, boolean enabled) {
        TimerTask task = timerTasks.get(taskName);
        if (task != null) {
            task.setEnabled(enabled);
            System.out.println("Timer task " + taskName + " " + (enabled ? "enabled" : "disabled"));
        }
    }

    /** Returns a snapshot copy of the task registry. */
    public Map<String, TimerTask> getAllTimerTasks() {
        return new HashMap<>(timerTasks);
    }

    /**
     * Executes a timer task immediately, regardless of its enabled flag.
     *
     * @throws IllegalArgumentException if no task with that name is registered
     */
    public ExecutionResult executeTimerTask(String taskName) {
        TimerTask task = timerTasks.get(taskName);
        if (task == null) {
            throw new IllegalArgumentException("Timer task not found: " + taskName);
        }
        return executeTask(task);
    }

    /**
     * Periodic sweep, run once per minute.
     * Simplified scheduling: cron expressions are not parsed; a task is due when
     * it has never run or last ran more than five minutes ago.
     */
    @Scheduled(fixedRate = 60000)
    public void checkAndExecuteTimerTasks() {
        LocalDateTime threshold = LocalDateTime.now().minusMinutes(5);

        for (TimerTask task : timerTasks.values()) {
            if (!task.isEnabled()) {
                continue;
            }
            LocalDateTime last = task.getLastExecution();
            if (last != null && !last.isBefore(threshold)) {
                continue;
            }
            try {
                executeTask(task);
            } catch (RuntimeException e) {
                // One failing task must not prevent the remaining tasks from running.
                System.err.println("Timer task failed: " + task.getName() + " -> " + e);
            }
        }
    }

    /** Builds the {@code "timer"} input, invokes the function and updates the task's bookkeeping. */
    private ExecutionResult executeTask(TimerTask task) {
        Map<String, Object> timerInfo = new HashMap<>();
        timerInfo.put("taskName", task.getName());
        timerInfo.put("cronExpression", task.getCronExpression());
        timerInfo.put("executionTime", LocalDateTime.now().toString());
        timerInfo.put("executionCount", task.getExecutionCount());

        Map<String, Object> input = new HashMap<>();
        input.put("timer", timerInfo);

        ExecutionResult result = executionEngine.invoke(task.getFunctionName(), input);

        task.setLastExecution(LocalDateTime.now());
        task.incrementExecutionCount();

        System.out.println("Timer task executed: " + task.getName() + " -> "
                + task.getFunctionName() + ", success: " + result.isSuccess());

        return result;
    }
}
Serverless控制器
package com.example.api;

import com.example.core.ExecutionEngine;
import com.example.core.FunctionManager;
import com.example.model.ExecutionResult;
import com.example.model.FunctionMetrics;
import com.example.trigger.HttpTrigger;
import com.example.trigger.TimerTrigger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import javax.servlet.http.HttpServletRequest;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * REST API of the serverless engine: function invocation and management,
 * metrics, timer tasks and system status.
 */
@RestController
@RequestMapping("/serverless")
public class ServerlessController {

    private static final long DEFAULT_TIMEOUT_MS = 30000L;

    @Autowired
    private FunctionManager functionManager;

    @Autowired
    private ExecutionEngine executionEngine;

    @Autowired
    private HttpTrigger httpTrigger;

    @Autowired
    private TimerTrigger timerTrigger;

    /** Invokes a function via POST; the optional JSON body becomes the function's {@code body} input. */
    @PostMapping("/functions/{functionName}/invoke")
    public ResponseEntity<Map<String, Object>> invokeFunction(
            @PathVariable String functionName,
            @RequestBody(required = false) Map<String, Object> input,
            HttpServletRequest request) {
        ExecutionResult result = httpTrigger.handlePostRequest(functionName, request, input);
        return ResponseEntity.ok(toResponse(result, true));
    }

    /** Invokes a function via GET. */
    @GetMapping("/functions/{functionName}/invoke")
    public ResponseEntity<Map<String, Object>> invokeFunctionGet(
            @PathVariable String functionName,
            HttpServletRequest request) {
        ExecutionResult result = httpTrigger.handleGetRequest(functionName, request);
        return ResponseEntity.ok(toResponse(result, true));
    }

    /**
     * Registers a function from a JSON config with {@code jarPath}, {@code className}
     * and optional {@code timeoutMs} / {@code environment}.
     *
     * <p>A {@code maxMemory} entry is accepted but ignored: FunctionManager has no
     * memory-limit parameter.
     *
     * <p>NOTE(review): this endpoint makes the engine load and run code from a
     * caller-supplied jar path. It must not be reachable by untrusted clients
     * without authentication and path restrictions.
     *
     * @return 200 on success, 400 when the config is invalid or the jar is missing
     */
    @PostMapping("/functions/{functionName}")
    public ResponseEntity<Map<String, String>> registerFunction(
            @PathVariable String functionName,
            @RequestBody Map<String, Object> config) {
        Object jarPath = config.get("jarPath");
        Object className = config.get("className");
        if (!isText(jarPath) || !isText(className)) {
            return badRequest("jarPath and className are required strings");
        }

        Object timeoutValue = config.get("timeoutMs");
        if (timeoutValue != null && !(timeoutValue instanceof Number)) {
            return badRequest("timeoutMs must be a number");
        }
        long timeoutMs = timeoutValue == null
                ? DEFAULT_TIMEOUT_MS
                : ((Number) timeoutValue).longValue();

        Object environmentValue = config.get("environment");
        if (environmentValue != null && !(environmentValue instanceof Map)) {
            return badRequest("environment must be an object");
        }
        @SuppressWarnings("unchecked") // JSON objects deserialize to Map<String, Object>
        Map<String, Object> environment = (Map<String, Object>) environmentValue;

        try {
            functionManager.registerFunction(
                    functionName, (String) jarPath, (String) className, timeoutMs, environment);
        } catch (IllegalArgumentException e) {
            // Thrown by FunctionManager, e.g. when the jar file does not exist.
            return badRequest(e.getMessage());
        }

        Map<String, String> response = new HashMap<>();
        response.put("message", "Function registered successfully");
        response.put("functionName", functionName);
        return ResponseEntity.ok(response);
    }

    /** Lists all registered functions. */
    @GetMapping("/functions")
    public ResponseEntity<Map<String, Object>> getAllFunctions() {
        Collection<FunctionManager.FunctionDefinition> functions = functionManager.getAllFunctions();
        Map<String, Object> response = new HashMap<>();
        response.put("functions", functions);
        response.put("count", functions.size());
        return ResponseEntity.ok(response);
    }

    /** Returns one function definition, or 404 if it is not registered. */
    @GetMapping("/functions/{functionName}")
    public ResponseEntity<FunctionManager.FunctionDefinition> getFunctionDetail(
            @PathVariable String functionName) {
        FunctionManager.FunctionDefinition function = functionManager.getFunction(functionName);
        if (function == null) {
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok(function);
    }

    /** Deletes a function; responds 200 whether or not it existed. */
    @DeleteMapping("/functions/{functionName}")
    public ResponseEntity<Map<String, String>> deleteFunction(@PathVariable String functionName) {
        functionManager.removeFunction(functionName);
        Map<String, String> response = new HashMap<>();
        response.put("message", "Function deleted successfully");
        response.put("functionName", functionName);
        return ResponseEntity.ok(response);
    }

    /** Returns the metrics of one function, or 404 if it is not registered. */
    @GetMapping("/functions/{functionName}/metrics")
    public ResponseEntity<FunctionMetrics> getFunctionMetrics(@PathVariable String functionName) {
        FunctionMetrics metrics = functionManager.getFunctionMetrics(functionName);
        if (metrics == null) {
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok(metrics);
    }

    /** Returns the metrics of all functions. */
    @GetMapping("/metrics")
    public ResponseEntity<Map<String, Object>> getAllMetrics() {
        Collection<FunctionMetrics> metrics = functionManager.getAllMetrics();
        Map<String, Object> response = new HashMap<>();
        response.put("metrics", metrics);
        response.put("count", metrics.size());
        return ResponseEntity.ok(response);
    }

    /** Registers a timer task from a JSON config with {@code functionName} and {@code cronExpression}. */
    @PostMapping("/timer-tasks/{taskName}")
    public ResponseEntity<Map<String, String>> registerTimerTask(
            @PathVariable String taskName,
            @RequestBody Map<String, String> config) {
        String functionName = config.get("functionName");
        String cronExpression = config.get("cronExpression");
        if (!isText(functionName) || !isText(cronExpression)) {
            return badRequest("functionName and cronExpression are required");
        }

        timerTrigger.registerTimerTask(taskName, functionName, cronExpression);

        Map<String, String> response = new HashMap<>();
        response.put("message", "Timer task registered successfully");
        response.put("taskName", taskName);
        return ResponseEntity.ok(response);
    }

    /** Lists all timer tasks. */
    @GetMapping("/timer-tasks")
    public ResponseEntity<Map<String, Object>> getAllTimerTasks() {
        Map<String, TimerTrigger.TimerTask> tasks = timerTrigger.getAllTimerTasks();
        Map<String, Object> response = new HashMap<>();
        response.put("tasks", tasks);
        response.put("count", tasks.size());
        return ResponseEntity.ok(response);
    }

    /** Runs a timer task immediately; responds 404 if the task is not registered. */
    @PostMapping("/timer-tasks/{taskName}/execute")
    public ResponseEntity<Map<String, Object>> executeTimerTask(@PathVariable String taskName) {
        // Check up front so an IllegalArgumentException raised by the function
        // invocation itself is not mistaken for "task not found".
        if (!timerTrigger.getAllTimerTasks().containsKey(taskName)) {
            return ResponseEntity.notFound().build();
        }
        ExecutionResult result;
        try {
            result = timerTrigger.executeTimerTask(taskName);
        } catch (IllegalArgumentException e) {
            // Task was removed between the check and the call.
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok(toResponse(result, false));
    }

    /** Reports JVM memory figures, processor count and engine-level counters. */
    @GetMapping("/status")
    public ResponseEntity<Map<String, Object>> getSystemStatus() {
        Map<String, Object> status = new HashMap<>();

        Runtime runtime = Runtime.getRuntime();
        status.put("totalMemory", runtime.totalMemory());
        status.put("freeMemory", runtime.freeMemory());
        status.put("usedMemory", runtime.totalMemory() - runtime.freeMemory());
        status.put("maxMemory", runtime.maxMemory());
        status.put("availableProcessors", runtime.availableProcessors());

        status.put("functionCount", functionManager.getFunctionCount());
        status.put("timerTaskCount", timerTrigger.getAllTimerTasks().size());

        long totalInvocations = functionManager.getAllMetrics().stream()
                .mapToLong(FunctionMetrics::getInvocationCount)
                .sum();
        status.put("totalInvocations", totalInvocations);

        return ResponseEntity.ok(status);
    }

    /**
     * Converts an execution result into the JSON response map shared by the
     * invocation endpoints.
     *
     * @param includeFunctionName whether to add the {@code functionName} entry
     */
    private static Map<String, Object> toResponse(ExecutionResult result, boolean includeFunctionName) {
        Map<String, Object> response = new HashMap<>();
        response.put("requestId", result.getRequestId());
        if (includeFunctionName) {
            response.put("functionName", result.getFunctionName());
        }
        response.put("success", result.isSuccess());
        response.put("executionTime", result.getExecutionTime());
        if (result.isSuccess()) {
            response.put("result", result.getResult());
        } else {
            response.put("errorType", result.getErrorType());
            response.put("errorMessage", result.getErrorMessage());
        }
        return response;
    }

    /** Returns whether the value is a non-blank String. */
    private static boolean isText(Object value) {
        return value instanceof String && !((String) value).trim().isEmpty();
    }

    /** Builds a 400 response carrying the given error message. */
    private static ResponseEntity<Map<String, String>> badRequest(String message) {
        Map<String, String> error = new HashMap<>();
        error.put("error", message);
        return ResponseEntity.badRequest().body(error);
    }
}
主启动类
package com.example;

import com.example.core.FunctionManager;
import com.example.trigger.TimerTrigger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

/**
 * Entry point of the serverless engine.
 *
 * <p>Boots the Spring context and, once it is up, registers the demo functions
 * and demo timer tasks through {@link FunctionManager} and {@link TimerTrigger}.
 */
@SpringBootApplication
@EnableScheduling
public class ServerlessEngine implements CommandLineRunner {

    /** Directory (relative to the working directory) that holds the demo function jars. */
    private static final String FUNCTION_DIR = "functions";

    @Autowired
    private FunctionManager functionManager;

    @Autowired
    private TimerTrigger timerTrigger;

    /**
     * Makes sure the function directory exists, then starts the Spring application.
     *
     * @param args command-line arguments, forwarded to Spring Boot
     */
    public static void main(String[] args) {
        ensureFunctionDirectory();
        SpringApplication.run(ServerlessEngine.class, args);
    }

    /**
     * Creates the function directory if it is missing.
     *
     * <p>Replaces an earlier debug statement that wrote a throw-away file into the
     * directory via a third-party utility; only the "directory exists" side effect
     * is kept, and it now relies on the standard library alone.
     *
     * @throws UncheckedIOException if the directory cannot be created
     */
    private static void ensureFunctionDirectory() {
        Path functionDir = Paths.get(FUNCTION_DIR);
        try {
            Files.createDirectories(functionDir);
        } catch (IOException e) {
            throw new UncheckedIOException(
                    "Cannot create function directory: " + functionDir.toAbsolutePath(), e);
        }
    }

    /**
     * Runs after the context has started: registers the demo functions first,
     * then the demo timer tasks that refer to them by name.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        System.out.println("=== Serverless Engine Started ===");

        // Register the sample functions
        registerDemoFunctions();

        // Register the sample timer tasks
        registerDemoTimerTasks();

        System.out.println("=== Demo Functions and Tasks Registered ===");
        System.out.println("API available at: http://localhost:8080/serverless");
    }

    /**
     * Registers the two demo functions.
     */
    private void registerDemoFunctions() {
        // Hello World function: name, jar path, handler class
        functionManager.registerFunction(
                "hello-world",
                "functions/demo-function.jar",
                "com.example.functions.HelloWorldFunction"
        );

        // User service function, registered with an explicit timeout and environment map
        Map<String, Object> userEnv = new HashMap<>();
        userEnv.put("DB_URL", "jdbc:h2:mem:testdb");
        userEnv.put("MAX_USERS", "1000");

        functionManager.registerFunction(
                "user-service",
                "functions/user-function.jar",
                "com.example.functions.UserServiceFunction",
                60000, // 60-second timeout
                userEnv
        );
    }

    /**
     * Registers the two demo timer tasks.
     */
    private void registerDemoTimerTasks() {
        // Cleanup task
        timerTrigger.registerTimerTask(
                "cleanup-task",
                "user-service",
                "0 0 2 * * ?" // every day at 02:00
        );

        // Health-check task
        timerTrigger.registerTimerTask(
                "health-check",
                "hello-world",
                "0/10 * * * * ?" // every 10 seconds
        );
    }
}
配置文件
# application.yml
server:
  port: 8080

spring:
  application:
    name: serverless-engine
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

# Serverless engine settings
# NOTE(review): the nesting below was reconstructed from a collapsed one-line
# listing; confirm that `executor` is a sibling of `function` under `serverless`.
serverless:
  function:
    # Directory where function artifacts are stored
    function-dir: ./functions/
    # Default timeout (milliseconds)
    default-timeout: 30000
    # Maximum number of concurrent executions
    max-concurrent-executions: 100
  executor:
    # Core thread count
    core-pool-size: 10
    # Maximum thread count
    max-pool-size: 50
    # Thread keep-alive time (seconds)
    keep-alive-time: 60
    # Queue capacity
    queue-capacity: 1000

logging:
  level:
    com.example: DEBUG
  pattern:
    console: "%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"

management:
  endpoints:
    web:
      exposure:
        # NOTE(review): `env` publishes the application's environment properties
        # over HTTP; confirm this is intended beyond a local demo.
        include: health,info,metrics,env
  endpoint:
    health:
      show-details: always
示例函数
Hello World函数
package com.example.functions;

import com.example.model.ExecutionContext;
import com.example.model.ServerlessFunction;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

/**
 * Sample "Hello World" function.
 *
 * <p>Answers every invocation with a greeting, the current local time, the
 * request id and function name taken from the context, and the input it was
 * given, after a short artificial delay.
 */
public class HelloWorldFunction implements ServerlessFunction {

    /**
     * Builds the greeting payload, pauses briefly, and returns the payload.
     *
     * @param input   invocation input; echoed back under the {@code "input"} key
     * @param context execution context supplying the request id and function name
     * @return a map holding the greeting and the invocation details
     * @throws Exception if the simulated delay is interrupted
     */
    @Override
    public Object handle(Map<String, Object> input, ExecutionContext context) throws Exception {
        final Map<String, Object> payload = describeInvocation(input, context);
        pauseBriefly();
        return payload;
    }

    /** Collects the greeting and the invocation details into a fresh map. */
    private static Map<String, Object> describeInvocation(Map<String, Object> input,
                                                          ExecutionContext context) {
        final Map<String, Object> payload = new HashMap<>();
        payload.put("message", "Hello from Serverless Engine!");
        payload.put("timestamp", String.valueOf(LocalDateTime.now()));
        payload.put("requestId", context.getRequestId());
        payload.put("functionName", context.getFunctionName());
        payload.put("input", input);
        return payload;
    }

    /** Stands in for real processing time. */
    private static void pauseBriefly() throws InterruptedException {
        Thread.sleep(100);
    }
}
用户服务函数
package com.example.functions;

import com.example.model.ExecutionContext;
import com.example.model.ServerlessFunction;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Sample user-service function backed by an in-memory user store.
 *
 * <p>The action is read from the {@code "action"} entry of the request body
 * (falling back to the top-level input, then to {@code "list"}). Supported
 * actions are {@code list}, {@code get}, {@code create} and {@code delete}.
 * Invalid requests are reported through an {@code "error"} entry in the result
 * instead of an exception, matching how "User not found" is reported.
 */
public class UserServiceFunction implements ServerlessFunction {

    // Simulated user store, shared by all invocations of this class
    private static final Map<Long, Map<String, Object>> users = new ConcurrentHashMap<>();
    private static final AtomicLong idGenerator = new AtomicLong(1);

    static {
        // Seed some test data; ids come from the generator, so the next free id is 3
        seedUser("John Doe", "john@example.com");
        seedUser("Jane Smith", "jane@example.com");
    }

    /** Adds one seed user under the next generated id. */
    private static void seedUser(String name, String email) {
        Long id = idGenerator.getAndIncrement();
        Map<String, Object> user = new HashMap<>();
        user.put("id", id);
        user.put("name", name);
        user.put("email", email);
        users.put(id, user);
    }

    /**
     * Dispatches on the requested action and returns a result map.
     *
     * @param input   invocation input; may be {@code null} or lack a {@code "body"} map
     * @param context execution context (not used by this function)
     * @return a map that always contains {@code "action"} and {@code "timestamp"},
     *         plus either the action's payload or an {@code "error"} message
     */
    @Override
    public Object handle(Map<String, Object> input, ExecutionContext context) throws Exception {
        Map<String, Object> safeInput = input;
        if (safeInput == null) {
            safeInput = Collections.emptyMap();
        }
        // A missing or non-map body is treated as empty instead of failing with an NPE
        Map<?, ?> body = asMap(safeInput.get("body"));

        Object rawAction = firstNonNull(body.get("action"), safeInput.get("action"));
        String action = rawAction != null ? rawAction.toString() : "list";

        Map<String, Object> result = new HashMap<>();

        // Locale.ROOT keeps the match independent of the JVM's default locale
        switch (action.toLowerCase(Locale.ROOT)) {
            case "list": {
                // Snapshot, so the caller never holds a live view of the shared store
                List<Map<String, Object>> snapshot = new ArrayList<>(users.values());
                result.put("users", snapshot);
                result.put("count", snapshot.size());
                break;
            }
            case "get": {
                Long userId = resolveUserId(safeInput, body, result);
                if (userId != null) {
                    Map<String, Object> user = users.get(userId);
                    if (user != null) {
                        result.put("user", user);
                    } else {
                        result.put("error", "User not found");
                    }
                }
                break;
            }
            case "create": {
                Object rawUser = body.get("user");
                if (!(rawUser instanceof Map)) {
                    result.put("error", "Missing 'user' object in request body");
                    break;
                }
                // Copy the payload: the caller's map may be immutable and must not be modified
                Map<String, Object> newUser = new HashMap<>();
                for (Map.Entry<?, ?> entry : ((Map<?, ?>) rawUser).entrySet()) {
                    newUser.put(String.valueOf(entry.getKey()), entry.getValue());
                }
                Long newId = idGenerator.getAndIncrement();
                newUser.put("id", newId);
                users.put(newId, newUser);
                result.put("user", newUser);
                result.put("message", "User created successfully");
                break;
            }
            case "delete": {
                Long deleteId = resolveUserId(safeInput, body, result);
                if (deleteId != null) {
                    Map<String, Object> deletedUser = users.remove(deleteId);
                    if (deletedUser != null) {
                        result.put("message", "User deleted successfully");
                    } else {
                        result.put("error", "User not found");
                    }
                }
                break;
            }
            default:
                result.put("error", "Unknown action: " + action);
        }

        result.put("action", action);
        result.put("timestamp", System.currentTimeMillis());
        return result;
    }

    /**
     * Reads {@code "userId"} from the top-level input (falling back to the body)
     * and parses it as a {@code Long}.
     *
     * @return the parsed id, or {@code null} after recording an {@code "error"}
     *         entry in {@code result} when the id is missing or not a number
     */
    private static Long resolveUserId(Map<String, Object> input, Map<?, ?> body,
                                      Map<String, Object> result) {
        Object rawId = firstNonNull(input.get("userId"), body.get("userId"));
        if (rawId == null) {
            result.put("error", "userId is required");
            return null;
        }
        try {
            return Long.valueOf(rawId.toString().trim());
        } catch (NumberFormatException e) {
            result.put("error", "Invalid userId: " + rawId);
            return null;
        }
    }

    /** Returns {@code value} as a map, or an empty map when it is {@code null} or not a map. */
    private static Map<?, ?> asMap(Object value) {
        if (value instanceof Map) {
            return (Map<?, ?>) value;
        }
        return Collections.emptyMap();
    }

    /** Returns {@code first} unless it is {@code null}, in which case {@code second}. */
    private static Object firstNonNull(Object first, Object second) {
        return first != null ? first : second;
    }
}
功能测试
#!/bin/bash
# test-serverless-engine.sh
#
# Smoke test for the serverless engine's HTTP API. Each step issues one request
# with curl and pretty-prints the JSON response with jq.
#
# The target can be overridden from the environment, e.g.
#   BASE_URL=http://host:9090/serverless ./test-serverless-engine.sh

BASE_URL="${BASE_URL:-http://localhost:8080/serverless}"

echo "=== Testing Serverless Engine ==="

# 1. Get system status
echo "1. Getting system status..."
curl -s "${BASE_URL}/status" | jq '.'
echo

# 2. List all functions
echo "2. Getting all functions..."
curl -s "${BASE_URL}/functions" | jq '.'
echo

# 3. Invoke the Hello World function
echo "3. Invoking hello-world function..."
curl -s -X POST "${BASE_URL}/functions/hello-world/invoke" \
  -H "Content-Type: application/json" \
  -d '{"name": "Serverless Test"}' | jq '.'
echo

# 4. Invoke the user-service function - list users
echo "4. Invoking user-service function - list users..."
curl -s -X POST "${BASE_URL}/functions/user-service/invoke" \
  -H "Content-Type: application/json" \
  -d '{"action": "list"}' | jq '.'
echo

# 5. Invoke the user-service function - create a user
echo "5. Invoking user-service function - create user..."
curl -s -X POST "${BASE_URL}/functions/user-service/invoke" \
  -H "Content-Type: application/json" \
  -d '{
    "action": "create",
    "user": {
      "name": "Bob Wilson",
      "email": "bob@example.com"
    }
  }' | jq '.'
echo

# 6. Get function metrics
echo "6. Getting function metrics..."
curl -s "${BASE_URL}/metrics" | jq '.'
echo

# 7. List timer tasks
echo "7. Getting timer tasks..."
curl -s "${BASE_URL}/timer-tasks" | jq '.'
echo

echo "=== Test Completed ==="
Maven配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>serverless-engine</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>SpringBoot Serverless Engine</name>
    <description>A serverless execution engine built with SpringBoot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
        <relativePath/>
    </parent>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Boot Starter Actuator -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <!-- Jackson for JSON processing -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Repackages the jar so it can be started with `java -jar`;
                 the plugin version is managed by the Spring Boot parent POM. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
总结
通过SpringBoot,我们成功实现了一个功能完整的Serverless执行引擎。这个引擎具备了以下核心能力:
核心特性
函数隔离:每个函数在独立的类加载器中运行
生命周期管理:自动管理函数的创建、执行和销毁
多种触发方式:支持HTTP和定时器触发
监控统计:完整的执行指标和性能统计
RESTful API:完整的管理和调用接口
技术亮点
动态类加载:使用自定义ClassLoader实现函数隔离
异步执行:基于线程池的并发执行机制
资源控制:支持超时和内存限制
指标收集:实时统计函数执行情况
这套自研的Serverless引擎展示了SpringBoot强大的扩展能力,不仅能快速构建业务应用,还能打造底层基础设施。
以上就是从零开始设计基于SpringBoot的Serverless(本地函数计算)引擎的详细内容,更多关于SpringBoot Serverless引擎的资料请关注编程客栈(www.devze.com)其它相关文章!
精彩评论