开发者

SpringBoot分段处理List集合多线程批量插入数据方式

目录
  • 项目场景
  • 解决方案
    • 1.实体类
    • 2.Mapper
    • 3.spring容器注入线程池bejsan对象
    • 4.创建异步线程业务类
    • 5.拆分list调用异步的业务方法
    • 6.Controller测试
  • 总结

    项目场景

    大数据量的List集合,需要把List集合中的数据批量插入数据库中。

    解决方案

    拆分list集合后,然后使用多线程批量插入数据库

    1.实体类

    package com.test.entity;
    
    import lombok.Data;
    
    @Data
    public class TestEntity {
    	
    	private String id;
    	private String name;
    }
    

    2.Mapper

    如果数据量不大,用foreach标签就足够了。如果数据量很大,建议使用BATch模式。

    package com.test.mapper;
    
    import Java.util.List;
    
    import org.apache.ibatis.annotations.Insert;
    import org.apache.ibatis.annotations.Param;
    
    import com.test.entity.TestEntity;
    
    public interface TestMapper {
    	
    	/**
    	  * 1.用于使用batch模式,ExecutorType.BATCH开启批处理模式
    	  * 数据量很大,推荐这种方式
    	  */
    	@Insert("insert into test(id, name) "
    			   + " values"
    			   + " (#{id,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR})")
    	void testInsert(TestEntity testEntity);
    	
    	/**
    	  * 2.使用foreach标签,批量保存
    	  * 数据量少可以使用这种方式
    	  */
    	@Insert("insert into test(id, name) "
    			   + " values"
    			   + " <foreach collection='list' item='item' index='index' separator=','>"
    			   + " (#{item.id,jdbcType=VARCHAR}, #{item.name,jdbcType=VARCHAR})"
    			   + " </foreach>")
    	void testBatchInsert(@Param("list") List<TestEntity> list);
    }
    

    3.spring容器注入线程池bean对象

    package com.test.config;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    @Configuration
    @EnableAsync
    public class ExecutorConfig {
        /**
         * 异步任务自定义线程池
         */
        @Bean(name = "asyncServiceExecutor")
        public Executor asyncServiceExecutor() {
        	ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //配置核心线程数
            executor.setCorePoolSize(50);
            //配置最大线程数
            executor.setMaxPoolSize(500);
            //配置队列大小
            executor.setQueueCapacity(300);
            //配置线程池中的线程的名称前缀
            executor.setThreadNamePrefix("testExecutor-");
            // rejection-policy:当pool已经达到max size的时候,如何处理新任务
            // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            //调用shutdown()方法时等待所有的任务完成后再关闭
            executor.setWaitForTasksToCompleteOnShutdown(true);
            //等待所有任务完成后的最大等待时间
    		executor.setAwaitTermyIeLdPcginationSeconds(60);
            return executor;
        }
    }
    

    4.创建异步线程业务类

    package com.test.service;
    
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.ibatis.session.ExecutorType;
    import org.apache.ibatis.session.SqlSession;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    impandroidort org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    
    import com.test.entity.TestEntity;
    import com.test.mapper.TestMapper;
    
    @Service
    public class AsyncService {
    	@Autowired
    	private SqlSessionFactory sqlSessionFactory;
    	
    	@Async("asyncServiceExecutor")
        public void executeAsync(List<String> logOutputResults, CountDownLatch countDownLatch) {
    		//获取session,打开批处理,因为是多线程,所以每个线程都要开启一个事务
            SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
    		
            try{
            	
            	TestMapper mapper = session.getMapper(TestMapper.class);
            	
                //异步线程要做的事情
            	for (int i = 0; i < logOutputResults.size(); i++) {
        			System.out.println(Thread.currentThread().getName() + "线程:" + logOutputResults.get(i));
        			
        			TestEntity test = new TestEntity();
        			//test.set()
        			//.............
        			//批量保存
        			mapper.testInsert(test);
        			//每1000条提交一次防止内存溢出
        			if(i%1000==0){
        				session.flushStatements();
        			}
    			}
            	//提交剩下未处理的事务
        		session.flushStatements();
            }finally {
                countDownLatch.countDown();// 很关键, 无论上面程序是否异常必须执行countDown,否则await无法释放
    			if(session != null){
    				session.close();
    			}
            }
        }
    }
    

    5.拆分list调用异步的业务方法

    package com.test.service;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import javax.annotation.Resource;
    
    import org.springframework.stereotype.Service;
    
    
    @Service
    public class TestService {
    
    	@Resource
    	private AsyncService asyncService;
    	
    	public int testMultiThread() {
            List<String> logOutputResults = getTestData();
            //按线程数拆分后的list
            List<List<String>> lists = splitList(logOutputResults);
            CountDownLatch countDownLatch = new CountDownLatch(lists.size());
            for (List<String> listSub:lists) {
                asyncService.executeAsync(listSub, countDownLatch);
            }
            try {
                countDownLatc编程客栈h.await(); //保证之前的所有的线程都执行完成,才会走下面的;
                // 这样就可以在下面拿到所有线程执行完的集合结果
            } catch (Exception e) {
                e.printStackTrace();
            }
            return logOutputResults.size();
        }
    	
    	public List<String> getTestData() {
    		List<String> logOutputResults = new ArrayList<String>();
            for (int i = 0; i < 3000; i++) {
            	logOutputResults.add("测试数据"+i);
    		}
            return logOutputResults;
        }
    	
    	public List<List<String>> splitList(List<String> logOutputResults) {
    		List<List<String>> results = new ArrayList<List<String>>();
    		
    		/*动态线程数方式*/
    		// 每500条数据开启一条线程
    		int threadSize = 500;
    		// 总数据条数
    		int dataSize = logOutputResults.size();
    		// 线程数,动态生成
    		int threadNum = dataSize / threadSize + 1;
    	 
    	    /*固定线程数方式
    		    // 线程数
    		    int threadNum = 6;
    		    // 总数据条数
    		    int dataSize = logOutputResults.size();
    		    // 每一条线程处理多少条数据
    		    int threadSize = dataSize / (threadNum - 1);
    	    */
    	 
    		// 定义标记,过滤threadNum为整数
    		boolean special = dataSize % threadSize == 0;
    	 
    		List<String> cutList = null;
    	 
    		// 确定每条线程的数据
    		for (int i = 0; i < threadNum; i++) {
    			if (i == threadNum - 1) {
    				if (special) {
    					break;
    				}
    				cutList = logOutputResults.subList(threadSize * i, dataSize);
    			} else {
    				cutList = logOutputResults.subList(threadSize * i, threadSize * (i + 1));
    			}
    			
    			results.add(cutList);
    		}
    		
            return results;
        }
    }

    6.Controller测试

    @RestController
    public class TestController {
    	
    	@Resource
    	private TestService testService;
    	
    
    	@RequestMapping(value = "/log", method = RequestMethod.GET)
    	@ApiOperation(value = "测试")
    	public String test() {
    		testService.testMultiThread();
    		return "success";
    	}
    }

    总结

    注意这里执行插入的数据是无序的。

    以上为个人php经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜