Spring Boot 集成 Spring Batch 批处理组件
目录
- 1. Spring Batch 简介
- 2.批处理工具架构和示例
- 2.1 批处理任务持久化控制
- 2.2 实现一个读取器
- 2.3 定义批处理JOB
- 2.4 数据实体
- 2.5 接口类
- 2.6 H2 配置
- 2.7 H2 数据库脚本
- 3.测试
1.Spring Batch 简介
Spring Batch 是 Spring 生态系统中的企业级批处理框架,专门设计用于处理大规模数据作业。它提供了批处理应用所需的核心功能,解决了传统批处理应用开发中的重复性问题,使开发人员能够专注于业务逻辑而非基础设施。
核心价值与定位
问题解决:自动化处理周期性的、数据密集型的任务(如报表生成、数据迁移、对账结算)
典型场景:每月财务报表生成、银行日终批量交易处理、电商平台每日用户行为分析、百万级数据迁移(如从旧系统迁移到新系统)
2.批处理工具架构和示例
项 | 接口 |
---|---|
读 | ItemReader |
处理 | ItemProcessor |
写 | ItemWriter |
项目结构
依赖包
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SpringBatcher</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-boot.version>3.5.3</spring-boot.version>
    </properties>

    <!-- Import the Spring Boot BOM so the starters below get managed versions. -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- artifactIds are case-sensitive: it is "data-redis", not "data-Redis".
             NOTE(review): nothing in this article uses Redis; drop it if the project does not need it. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.38</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>4.0.3</version>
        </dependency>
        <dependency>
            <groupId>com.h2database</groupId>
            <artifactId>h2</artifactId>
            <!-- Usually only needed at runtime: the JDBC driver is loaded by class name. -->
            <scope>runtime</scope>
        </dependency>
    </dependencies>
</project>
启动类
package org.example;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point of the batch demo.
 *
 * <p>NOTE(review): in Spring Boot 3, declaring {@code @EnableBatchProcessing} makes Boot's own
 * batch auto-configuration back off. That means {@code spring.batch.*} properties such as
 * {@code spring.batch.jdbc.initialize-schema} are not applied, and jobs are not launched
 * automatically at start-up. Confirm this is intended; it would explain why the schema script
 * is provided separately.
 */
@SpringBootApplication
@EnableBatchProcessing
public class BatchApp {

    /**
     * Boots the application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        new SpringApplication(BatchApp.class).run(args);
    }
}
2.1 批处理任务持久化控制
示例代码使用 H2 数据库存储 Spring Batch 的元数据(作业实例、作业/步骤执行记录等,见 2.7 节的建表脚本)
package org.example.config;

import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

// Package names are case-sensitive: this must be "javax", not "Javax", or the file does not compile.
import javax.sql.DataSource;

/**
 * Builds the {@link JobRepository} that persists Spring Batch metadata (job instances, job/step
 * executions and their contexts) into the application's {@link DataSource}.
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-20 PM 4:21
 */
@Configuration
public class BatchJobConfig {

    /**
     * Creates a JDBC-backed job repository on top of the given data source.
     *
     * @param dataSource the database holding the BATCH_* tables (H2 in this example)
     * @return the initialized job repository
     * @throws Exception if the factory bean fails to initialize or build the repository
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource) throws Exception {
        JobRepositoryFactoryBean bean = new JobRepositoryFactoryBean();
        bean.setDataSource(dataSource);
        // Set explicitly so the SQL dialect is not auto-detected; must match the actual database.
        bean.setDatabaseType("H2");
        // NOTE(review): this is a separate DataSourceTransactionManager instance over the same
        // DataSource, not the PlatformTransactionManager bean injected into the step.
        bean.setTransactionManager(new DataSourceTransactionManager(dataSource));
        // Not registered as a bean itself, so initialize it by hand before asking for the product.
        bean.afterPropertiesSet();
        return bean.getObject();
    }
}
2.2 实现一个读取器
以 Excel 文件读取为例
package org.example.job.common; import com.alibaba.excel.EasyExcel; import org.springframework.batch.item.ItemReader; import org.springframework.beans.factory.InitializingBean; import java.io.File; import java.util.List; /** * @Author zhx && moon * @Since 21 * @Date 2025-06-24 PM 2:16 */ public class EasyExcelItemReader<T> implements ItemReader<T>, InitializingBean { private final Class<T> clazz; private final String filePath; private List<T> cacheList; private int index = 0; public EasyExcelItemReader(Class<T> clazz, String filePath) { this.clazz = clazz; this.filePath = filePath; } @Override public void afterPropertiesSet() { try { // 一次性读取Excel所有数据(适用于中小文件) cacheList = EasyExcel.read(new File(filePath)) .head(clazz) .sheet() .headRowNumber(1) // 跳过标题行 .doReadSync(); } catch (Exception e) { throw new RuntimeException("read excel failed ", e); } } @Override public T read() { if (index < cacheList.size()) { return cacheList.get(index++); } // 重置读取的位置 index = 0; return null; } }
2.3 定义批处理JOB
package org.example.job; import org.example.entity.User; import org.example.job.common.EasyExcelItemReader; import org.springframework.batch.core.*; import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.launch.support.RunIdIncrementer; import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.ItemWriter; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.springframework.transaction.PlatformTransactionManager; /** * @Author zhx && moon * @Since 21 * @Date 2025-06-24 PM 2:08 */ @Component public class SVCJob { /** * Excel 读取 * @return */ @Bean("easyExcelItemReader") public EasyExcelItemReader<User> easyExcelItemReader() { return new EasyExcelItemReader<>(User.class, "C:\\Users\\Administrator\\Desktop\\Test.xlsx"); } /** * 数据处理器 对读取的数据进行加工 * @return */ @Bean("getNameProcessors") public ItemProcessor<User, String> getNameProcessors() { return item -> { return item.getName(); }; } /** * 配置写入器(保持不变) * @return */ @Bean("nameWriter") public ItemWriter<String> nameWriter() { return items -> { for (String item : items) { System.out.println("User Name: " + item); } }; } /** * 配置批处理步骤(使用新版API) * @param jobRepository * @param transactionManager * @param reader * @param processor * @param writer * @return */ @Bean("easyExcelStep") public Step easyExcelStep(JobRepository jobRepository, PlatformTransactionManager transactionManager, @Qualifier("easyExcelItemReader") EasyExcelItemReader<User> reader, @Qualifier("getNameProcessors") ItemProcessor<User, String> processor, @Qualifier("nameWriter") ItemWriter<String> writer) { return new StepBuilder("easyExcelStep", jobRepository) .<User, String>chunk(100, transactionManager) .reader(reader) 
.processor(processor) .writer(writer) .faultTolerant() .skipLimit(1) .skip(IllegalArgumentException.class) .listener(new StepExecutionListener() { @Override public void beforeStep(StepExecution stepExecution) { System.out.println("start to processor data ..."); } }) .build(); } /** www.devze.com * 配置批处理作业 * @param jobRepository * @param importStep * @return */ @Bean("easyExcelImportJobs") public Job customerImportJob(JobRepository jobRepository, @Qualifier("easyExcelStep") Step importStep) { return new JobBuilder("easyExcelImportJobs", jobRepository) .incrementer(new RunIdIncrementer()) .start(importStep) .listener(new JobExecutionListener() { @Override public void afterJob(JobExecution jobExecution) { System.out.println("Job Finished!State: " + jobExecution.getStatus()); } }) .build(); } }
2.4 数据实体
package org.example.entity;

import com.alibaba.excel.annotation.ExcelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * One data row of the imported Excel sheet.
 *
 * <p>Each field is bound to a sheet column by its header text via {@code @ExcelProperty}.
 * The Lombok annotations generate the following:
 * <ul>
 *   <li>getters, setters, equals/hashCode and toString ({@code @Data});</li>
 *   <li>a no-arg constructor (presumably needed by EasyExcel to instantiate rows; confirm);</li>
 *   <li>an all-args constructor in field order (name, employeeId, age).</li>
 * </ul>
 *
 * @Author zhx && moon
 * @Since 21
 * @Date 2025-06-24 PM 2:20
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
    /** Column with header "姓名" (name). */
    @ExcelProperty("姓名")
    private String name;

    /** Column with header "编号" (employee number), kept as text. */
    @ExcelProperty("编号")
    private String employeeId;

    /** Column with header "年龄" (age); boxed, so it may be null. */
    @ExcelProperty("年龄")
    private Integer age;
}
2.5 接口类
package org.example.controller; import jakarta.annotation.Resource; import org.springframework.batch.core.Job; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.launch.JobLauncher; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Author zhx && moon * @Since 21 * @Date 2025-06-23 PM 4:40 */ @RestController @RequestMapping("/job") public class JobManage { @Autowired private JobLauncher jobLauncher; @Resource(name = "easyExcelImportJobs") Job job; @GetMapping("/start") public void start(){ try { JobParameters params = new JobParametersBuilder() .addLong("uniqueId", System.nanoTime()) .toJobParameters(); jobLauncher.run(job, params); } catch (Exception e) { throw new RuntimeException(e); } } }
2.6 H2 配置
spring:
  datasource:
    # File-based H2 database, so batch metadata survives restarts.
    # NOTE(review): absolute, machine-specific Windows path; adjust per environment.
    url: jdbc:h2:file:Z:/IdeaProjects/SpringBatcher/SpringBatcher/springbatchdb
    # Alternatives:
    # url: jdbc:h2:tcp://localhost/mem:springbatchdb;DB_CLOSE_DELAY=-1
    # url: jdbc:h2:mem:springbatchdb
    driver-class-name: org.h2.Driver
    username: sa
    password: sa
  h2:
    console:
      enabled: true
      path: /h2/db-console
      settings:
        # WARNING: lets hosts other than localhost reach the H2 web console.
        # Only enable this on a trusted network.
        web-allow-others: true
  batch:
    jdbc:
      # NOTE(review): this is applied by Spring Boot's batch auto-configuration. If the
      # application class keeps @EnableBatchProcessing, that auto-configuration backs off in
      # Boot 3 and the BATCH_* tables must be created by other means; confirm.
      initialize-schema: always
2.7 H2 数据库脚本
-- Autogenerated: do not edit this file
-- Spring Batch metadata schema for H2 (job instances, executions, parameters and contexts).

CREATE TABLE BATCH_JOB_INSTANCE (
    JOB_INSTANCE_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    VERSION BIGINT,
    JOB_NAME VARCHAR(100) NOT NULL,
    JOB_KEY VARCHAR(32) NOT NULL,
    constraint JOB_INST_UN unique (JOB_NAME, JOB_KEY)
);

CREATE TABLE BATCH_JOB_EXECUTION (
    JOB_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    VERSION BIGINT,
    JOB_INSTANCE_ID BIGINT NOT NULL,
    CREATE_TIME TIMESTAMP(9) NOT NULL,
    START_TIME TIMESTAMP(9) DEFAULT NULL,
    END_TIME TIMESTAMP(9) DEFAULT NULL,
    STATUS VARCHAR(10),
    EXIT_CODE VARCHAR(2500),
    EXIT_MESSAGE VARCHAR(2500),
    LAST_UPDATED TIMESTAMP(9),
    constraint JOB_INST_EXEC_FK foreign key (JOB_INSTANCE_ID)
        references BATCH_JOB_INSTANCE(JOB_INSTANCE_ID)
);

CREATE TABLE BATCH_JOB_EXECUTION_PARAMS (
    JOB_EXECUTION_ID BIGINT NOT NULL,
    PARAMETER_NAME VARCHAR(100) NOT NULL,
    PARAMETER_TYPE VARCHAR(100) NOT NULL,
    PARAMETER_VALUE VARCHAR(2500),
    IDENTIFYING CHAR(1) NOT NULL,
    constraint JOB_EXEC_PARAMS_FK foreign key (JOB_EXECUTION_ID)
        references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
);

CREATE TABLE BATCH_STEP_EXECUTION (
    STEP_EXECUTION_ID BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    VERSION BIGINT NOT NULL,
    STEP_NAME VARCHAR(100) NOT NULL,
    JOB_EXECUTION_ID BIGINT NOT NULL,
    CREATE_TIME TIMESTAMP(9) NOT NULL,
    START_TIME TIMESTAMP(9) DEFAULT NULL,
    END_TIME TIMESTAMP(9) DEFAULT NULL,
    STATUS VARCHAR(10),
    COMMIT_COUNT BIGINT,
    READ_COUNT BIGINT,
    FILTER_COUNT BIGINT,
    WRITE_COUNT BIGINT,
    READ_SKIP_COUNT BIGINT,
    WRITE_SKIP_COUNT BIGINT,
    PROCESS_SKIP_COUNT BIGINT,
    ROLLBACK_COUNT BIGINT,
    EXIT_CODE VARCHAR(2500),
    EXIT_MESSAGE VARCHAR(2500),
    LAST_UPDATED TIMESTAMP(9),
    constraint JOB_EXEC_STEP_FK foreign key (JOB_EXECUTION_ID)
        references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
);

CREATE TABLE BATCH_STEP_EXECUTION_CONTEXT (
    STEP_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT LONGVARCHAR,
    constraint STEP_EXEC_CTX_FK foreign key (STEP_EXECUTION_ID)
        references BATCH_STEP_EXECUTION(STEP_EXECUTION_ID)
);

CREATE TABLE BATCH_JOB_EXECUTION_CONTEXT (
    JOB_EXECUTION_ID BIGINT NOT NULL PRIMARY KEY,
    SHORT_CONTEXT VARCHAR(2500) NOT NULL,
    SERIALIZED_CONTEXT LONGVARCHAR,
    constraint JOB_EXEC_CTX_FK foreign key (JOB_EXECUTION_ID)
        references BATCH_JOB_EXECUTION(JOB_EXECUTION_ID)
);

CREATE SEQUENCE BATCH_STEP_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_EXECUTION_SEQ;
CREATE SEQUENCE BATCH_JOB_SEQ;
3.测试
启动服务
测试 H2 连接
测试数据
触发 JOB
JOB 执行记录
到此,这篇关于 Spring Boot 集成 Spring Batch 批处理组件的文章就介绍完了。
精彩评论