开发者

SpringBoot+RustFS 实现文件切片极速上传的实例代码

目录
  • 一、为什么选择 RustFS + SpringBoot?
  • 二、环境准备与部署
    • 2.1 安装 RustFS
    • 2.2 SpringBoot 项目配置
  • 三、核心代码实现
    • 3.1 配置 RustFS 客户端
    • 3.2 文件切片上传服务
    • 3.3 控制器实现
  • 四、前端实现关键代码
    • 4.1 文件切片处理
  • 五、高级功能与优化
    • 5.1 断点续传实现
    • 5.2 分片验证与安全
  • 六、部署与性能优化
    • 6.1 系统级优化建议
    • 6.2 监控与告警
  • 七、总结

    本文将手把手教你如何通过 SpringBoot 和 RustFS 构建高性能文件切片上传系统,解决大文件传输的痛点,实现秒传、断点续传和分片上传等高级功能。

    一、为什么选择 RustFS + SpringBoot?

    在传统文件上传方案中,大文件传输面临诸多挑战:网络传输不稳定、服务器内存溢出、上传失败需重新传输等。而 ​RustFS​ 作为一款基于 Rust 语言开发的高性能分布式对象存储系统,具有以下突出优势:

    • 高性能​:充分利用 Rust 的内存安全和高并发特性,响应速度极快
    • 分布式架构​:可扩展且具备容错能力,适用于海量数据存储
    • AWS S3 兼容性​:支持标准 S3 API,可与现有生态无缝集成
    • 可视化管理​:内置功能丰富的 Web 控制台,管理更方便
    • 开源友好​:采用 Apache 2.0 协议,鼓励社区贡献

    结合 SpringBoot 的快速开发特性,我们可以轻松构建企业级文件上传解决方案。

    二、环境准备与部署

    2.1 安装 RustFS

    使用 docker 快速部署 RustFS:

    # docker-compose.yml
    version: '3.8'
    services:
      rustfs:
        image: registry.cn-shanghai.aliyuncs.com/study-03/rustfs:latest
        container_name: rustfs
        ports:
          - "9000:9000"  # 管理控制台
          - "9090:9090"  # API服务端口
        volumes:
          - ./data:/data
        environment:
          - RUSTFS_ROOT_USER=admin
          - RUSTFS_ROOT_PASSWORD=admin123
        restart: unless-stopped

    运行 docker-compose up -d即可启动服务。访问 http://localhost:9000使用 admin/admin123 登录管理控制台。

    2.2 SpringBoowww.devze.comt 项目配置

    pom.XML中添加必要依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>io.minio</groupId>
            <artifactId>minio</artifactId>
            <version>8.5.2</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.11.0</version>
        </dependency>
    </dependencies>

    配置 application.yml:

    rustfs:
      endpoint: http://localhost:9090
      Access-key: admin
      secret-key: admin123
      bucket-name: mybucket

    三、核心代码实现

    3.1 配置 RustFS 客户端

    @Configuration
    @ConfigurationProperties(prefix = "rustfs")
    public class RustFSConfig {
        private String endpoint;
        private String accessKey;
        private String secretKey;
        private String bucketName;
    
        @Bean
        public MinioClient rustFSClient() {
            return MinioClient.builder()
                    .endpoint(endpoint)
                    .credentials(accessKey, secretKey)
                    .build();
        }
    }

    3.2 文件切片上传服务

    @Service
    @Slf4j
    public class FileUploadService {
        @Autowired
        private MinioClient rustFSClient;
        
        @Value("${rustfs.bucket-name}")
        private String bucketName;
        
        /**
         * 初始化分片上传
         */
        public String initUpload(String fileName, String fileMd5) {
            String uploadId = UUID.randomUUID().toString();
            // 检查文件是否已存在(秒传实现)
            if (checkFileExists(fileMd5)) {
                throw new RuntimeException("文件已存在,可直接秒传");
            }
            // 存储上传记录到Redis或数据库
            redisTemplate.opsForValue().set("upload:" + fileMd5, uploadId);
            return uploadId;
        }
        
        /**
         * 上传文件分片
         */
        public void uploadChunk(MultipartFile chunk, String fileMd5, 
                               int chunkIndex, int totalChunks) {
            try {
                // 生成分片唯一名称
                String chunkName = fileMd5 + "_chunk_" + chunkIndex;
                
                // 上传分片到RustFS
                rustFSClient.putObject(
                    PutObjectArgs.builder()
                        .bucket(bucketName)
                        .object(chunkName)
                        .stream(chunk.getInputStream(), chunk.getSize(), -1)
                        .build()
                );
                
                // 记录已上传分片
                redisTemplate.opsForSet().add("chunks:" + fileMd5, chunkIndex);
                
            } catch (Exception e) {
                log.error("分片上传失败", e);
                throw new RuntimeException("分片上传失败");
            }
        }
        
        /**
         * 合并文件分片
         */
        public void mergeChunks(String fileMd5, String fileName, int totalChunks) {
            try {
                // 创建临时文件
                Path tempFile = Files.createTempFile("merge_", ".tmp");
                
                try (FileOutputStream fos = new FileOutputStream(tempFile.toFile())) {
                    // 按顺序下载并合并所有分片
                    for (int i = 0; i < totalChunks; i++) {
                        String chunkName = fileMd5 + "_chunk_" + i;
                        
                        try (InputStream is = rustFSClient.getObject(
                            GetObjectArgs.builder()
                                .bucket(bucketName)
                                .object(chunkName)
                                .build()
                        )) {
                            IOUtils.copy(is, fos);
                        }
                        
                        // 删除已合并的分片
                        rustFSClient.removeObject(
                            RemoveObjectArgs.builder()
                                .bucket(bucketName)
                                .object(chunkName)
                                .build()
                        );
                    }
                }
                
                // 上传最终文件
                rustFSClient.uploadObject(
                    UploadObjectArgs.builder()
                        .bucket(bucketName)
                        .object(fileName)
                        .filename(tempFile.toString())
                        .build()
                );
                
                // 清理临时文件
                Files.deleteIfExists(tempFile);
                
                // 更新文件记录
                saveFileRecord(fileMd5, fileName);
                
            } catch (Exception e) {
                log.error("分片合并失败", e);
                throw new RuntimeException("分片合并失败");
            }
        }
        
        /**
         * 检查文件是否存在(秒传功能)
         */
        private boolean checkFileExists(String fileMd5) {
            // 查询数据库或Redis检查文件是否已存在
            return redisTemplate.hasKey("file:" + fileMd5);
        }
    }

    3.3 控制器实现

    @RestController
    @RequestMappi编程ng("/api/upload")
    @Slf4j
    public class FileUploadController {
        @Autowired
        private FileUploadService fileUploadService;
        
        /**
         * 初始化上传
         */
        @PostMapping("/init")
        public ResponseEntity<?> initUpload(@RequestParam String fileName,
                                          @RequestParam String fileMd5) {
            try {
                String uploadId = fileUploadService.initUpload(fileName, fileMd5);
                return ResponseEntity.ok(Map.of("uploadId", uploadId));
            } catch (RuntimeException e) {
                return ResponseEntity.ok(Map.of("exists", true)); // 文www.devze.com件已存在
            }
        }
        
        /**
         * 上传分片
         */
        @PostMapping("/chunk")
        public ResponseEntity<String> uploadChunk(@RequestParam MultipartFile chunk,
                                                 @RequestParam String fileMd5,
                                                 @RequestParam int chunkIndex,
                                                 @RequestParam int totalChunks) {
            fileUploadService.uploadChunk(chunk, fileMd5, chunkIndex, totalChunks);
            return ResponseEntity.ok("分片上传成功");
        }
        
        /**
         * 合并分片
         */
        @PostMapping("/merge")
        public ResponseEntity<String> mergeChunks(@RequestParam String fileMd5,
                                                 @RequestParam String fileName,
                                                 @RequestParam int totalChunks) {
            fileUploadService.mergeChunks(fileMd5, fileName, totalChunks);
            return ResponseEntity.ok("文件合并成功编程客栈");
        }
        
        /**
         * 获取已上传分片列表(断点续传)
         */
        @GetMapping("/chunks/{fileMd5}")
        public ResponseEntity<List<Integer>> getUploadedChunks(@PathVariable String fileMd5) {
            Set<Object> uploaded = redisTemplate.opsForSet().members("chunks:" + fileMd5);
            List<Integer> chunks = uploaded.stream()
                    .map(obj -> Integer.parseInt(obj.toString()))
                    .collect(Collectors.toList());
            return ResponseEntity.ok(chunks);
        }
    }

    四、前端实现关键代码

    4.1 文件切片处理

    class FileUploader {
        constructor() {
            this.chunkSize = 5 * 1024 * 1024; // 5MB分片大小
            this.concurrentLimit = 3; // 并发上传数
        }
        
        // 计算文件MD5(秒传功能)
        async calculateFileMD5(file) {
            return new Promise((resolve) => {
                const reader = new FileReader();
                const spark = new SparkMD5.ArrayBuffer();
                
                reader.onload = e => {
                    spark.append(e.target.result);
                    resolve(spark.end());
                };
                
                reader.readAsArrayBuffer(file);
            });
        }
        
        // 切片上传
        async uploadFile(file) {
            // 计算文件MD5
            const fileMd5 = await this.calculateFileMD5(file);
            
            // 初始化上传
            const initResponse = await fetch('/api/upload/init', {
                method: 'POST',
                body: jsON.stringify({
                    fileName: file.name,
                    fileMd5: fileMd5
                }),
                headers: {
                    'Content-Type': 'application/json'
                }
            });
            
            const initResult = await initResponse.json();
            
            // 如果文件已存在,直接返回
            if (initResult.exists) {
                alert('文件已存在,秒传成功!');
                return;
            }
            
            // 获取已上传分片(断点续传)
            const uploadedChunks = await this.getUploadedChunks(fileMd5);
            
            // 计算分片信息
            const totalChunks = Math.ceil(file.size / this.chunkSize);
            const uploadPromises = [];
            
            for (let i = 0; i < totalChunks; i++) {
                // 跳过已上传的分片
                if (uploadedChunks.includes(i)) {
                    continue;
                }
                
                const start = i * this.chunkSize;
                const end = Math.min(file.size, start + this.chunkSize);
                const chunk = file.slice(start, end);
                
                // 控制并发数
                if (uploadPromises.length >= this.concurrentLimit) {
                    await Promise.race(uploadPromises);
                }
                
                const uploadPromise = this.uploadChunk(chunk, fileMd5, i, totalChunks)
                    .finally(() => {
                        const index = uploadPromises.indexOf(uploadPromise);
                        if (index > -1) {
                            uploadPromises.splice(index, 1);
                        }
                    });
                
                uploadPromises.push(uploadPromise);
          yAfVZ  }
            
            // 等待所有分片上传完成
            await Promise.all(uploadPromises);
            
            // 合并分片
            await this.mergeChunks(fileMd5, file.name, totalChunks);
        }
        
        // 上传单个分片
        async uploadChunk(chunk, fileMd5, chunkIndex, totalChunks) {
            const formData = new FormData();
            formData.append('chunk', chunk);
            formData.append('fileMd5', fileMd5);
            formData.append('chunkIndex', chunkIndex);
            formData.append('totalChunks', totalChunks);
            
            const response = await fetch('/api/upload/chunk', {
                method: 'POST',
                body: formData
            });
            
            if (!response.ok) {
                throw new Error(`分片上传失败: ${response.statusText}`);
            }
        }
    }

    五、高级功能与优化

    5.1 断点续传实现

    通过记录已上传的分片信息,实现上传中断后从中断处继续上传:

    @Service
    public class UploadProgressService {
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
        
        /**
         * 获取已上传分片列表
         */
        public List<Integer> getUploadedChunks(String fileMd5) {
            Set<Object> uploaded = redisTemplate.opsForSet().members("chunks:" + fileMd5);
            return uploaded.stream()
                    .map(obj -> Integer.parseInt(obj.toString()))
                    .collect(Collectors.toList());
        }
        
        /**
         * 清理上传记录
         */
        public void clearUploadRecord(String fileMd5) {
            redisTemplate.delete("chunks:" + fileMd5);
            redisTemplate.delete("upload:" + fileMd5);
        }
    }

    5.2 分片验证与安全

    确保分片传输的完整性和安全性:

    /**
     * 分片验证服务
     */
    @Service
    public class ChunkValidationService {
        
        /**
         * 验证分片哈希
         */
        public boolean validateChunkHash(MultipartFile chunk, String expectedHash) {
            try {
                String actualHash = HMACUtils.hmacSha256Hex("secret-key", 
                    chunk.getBytes());
                return actualHash.equals(expectedHash);
            } catch (IOException e) {
                return false;
            }
        }
        
        /**
         * 验证分片顺序
         */
        public boolean validateChunkOrder(String fileMd5, int chunkIndex) {
            // 获取已上传分片
            Set<Object> uploaded = redisTemplate.opsForSet().members("chunks:" + fileMd5);
            List<Integer> chunks = uploaded.stream()
                    .map(obj -> Integer.parseInt(obj.toString()))
                    .sorted()
                    .collect(Collectors.toList());
            
            // 检查分片是否按顺序上传
            return chunks.isEmpty() || chunkIndex == chunks.size();
        }
    }

    六、部署与性能优化

    6.1 系统级优化建议

    分片大小选择

    • 内网环境:10MB-20MB
    • 移动网络:1MB-5MB
    • 广域网:500KB-1MB

    并发控制

    # 应用配置
    spring:
      servlet:
        multipart:
          max-file-size: 10MB
          max-request-size: 100MB

    定时清理策略

    @Scheduled(fixedRate = 24 * 60 * 60 * 1000) // 每日清理
    public void cleanTempFiles() {
        // 删除超过24小时的临时分片
        redisTemplate.keys("chunks:*").forEach(key -> {
            if (redisTemplate.getExpire(key) < 0) {
                redisTemplate.delete(key);
            }
        });
    }

    6.2 监控与告警

    集成 Prometheus 监控上传性能:

    # 监控指标配置
    management:
      endpoints:
        web:
          exposure:
            include: health,metrics,prometheus
      metrics:
        tags:
          application: ${spring.application.name}

    七、总结

    通过 SpringBoot 和 RustFS 的组合,我们实现了一个高性能的文件切片上传系统,具备以下优势:

    • 高性能​:利用 RustFS 的高并发特性和分片并行上传,大幅提升传输速度
    • 可靠性​:断点续传机制确保上传中断后从中断处继续,避免重复劳动
    • 智能优化​:秒传功能避免重复文件上传,节省带宽和存储空间
    • 易于扩展​:分布式架构支持水平扩展,适应不同规模的应用场景

    这种方案特别适用于:

    • 视频平台的大文件上传
    • 企业级文档管理系统
    • 云存储和备份服务
    • AI 训练数据集上传

    到此这篇关于SpringBoot+RustFS 实现文件切片极速上传的实例代码的文章就介绍到这了,更多相关SpringBoot RustFS 文件切片上传内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜