开发者

Java实现Excel通用异步导出框架方式

目录
  • 1、审题
    • 分析1
    • 分析2
    • 分析3
  • 2、创建任务表,记录任务的状态,进度,以及任务的类型
    • 3、有了任务表后,就可以开始考虑怎么执行任务了
      • 总结

        请注意!请注意!请注意!(代码中很多实体类需要自己创建哈 比如前端提交的参数 比如数据库实体类,当前文章是面向一定开发经验的选手,CV是没用的)

        1、审题

        1.小于5W直接导出,大于5W则需要创建任务

        2.异步导出

        3.框架

        分析1

        小于5W直接导出,所以我们框架需要从使用者手里知道 本次导出的数据总量是多少

        分析2

        既然是异步导出,所以我们不能让主线程去执行导出操作,所以主线程只管将导出任务的相关信息交给其他线程即可

        分析3

        既然是框架,那么我们不能关心具体的导出实现逻辑,例如怎么获取表的数据,怎么查询总量,但是我们需要关心一些通用的逻辑,例如创建文件,使用导出组件进行导出,等等

        好了,现在开始实现。

        2、创建任务表,记录任务的状态,进度,以及任务的类型

        -- public.export_task definition
        
        -- Drop table
        
        -- DROP TABLE public.export_task;
        
        CREATE TABLE public.export_task (
        	id text NOT NULL,
        	export_key text NOT NULL,  -- 任务类型
        	params text NOT NULL, -- 任务所需要的参数
        	status int2 NOT NULL, -- 任务状态
        	progress text NOT NULL, -- 任务进度
        	create_user text NOT NULL, -- 创建人
        	gmt_create text NOT NULL, -- 创建时间
        	single_export_num int4 NOT NULL DEFAULT 50000, -- 单次导出条数
        	file_name text NOT NULL, -- 导出的文件名称
        	file_path text NULL, -- 导出的文件路径
        	complete_time text NULL, -- 导出完成时间
        	CONSTRAINT export_task_pkey PRIMARY KEY (id) -- 主键
        );
        
        -- 索引
        CREATE INDEX export_task_export_key_index ON public.export_task (export_key text_ops);

        3、有了任务表后,就可以开始考虑怎么执行任务了

        我的思路是:新增任务 -> 必要校验完成 -> 通过任务类型(也就是上表中的export_key)找到需要处理的处理类 -> 调用获取总量的方法,判断总量是否大于5W

        • 总量大于5W -> 将当前任务插入数据库 -> 然后将当前任务丢给队列,等待被消费处理
        • 总量小于5W -> 调用处理类的导出方法直接进行导出

        上代码:controller类,新增任务(公司代码里面有记录日志的逻辑你们不能复用)

         @POST
        @Path("/task-manage")
        @Produces({ MediaType.APPLICATION_jsON })
        @ApiOperation(value = "新建导出任务", notes = "新建导出任务", httpMethod = "POST", tags = {"新建导出任务"})
        public void taskCreate(TaskInfoDTO condition, @Context HttpServletResponse response) throws Exception {
        ServiceBaseInfoBean infoBean = new ServiceBaseInfoBean(UserThreadLocal.getuserName(),
        UserThreadLocal.getRemoteHost(),UserThreadLocal.getLanguageOption());
        
        ExportTaskMgr.Operation operation = ExportTaskMgr.Operation.DIRECT_EXPORT;
        ExportTaskMgr.initOperlogBean(infoBean, operation);
        String detailZh = String.format(operation.getDetailZh(), condition.getFileName());
        String detailEn = String.format(operation.getDetailEn(), condition.getFileName());
        try{
        condition.checkFileName();
        Long allDataTotal = taskCommandApplicationService.exportOrCreateTask(condition, response);
        if(allDataTotal > GlobalConstants.EXPORT_DEFAULT_COUNT){
        operation = ExportTaskMgr.Operation.CREATE_EXPORT_TASK;
        ExportTaskMgr.initOperlogBean(infoBean, operation);
        }
        detailZh = String.format(operation.getDetailZh(), condition.getFileName());
        detailEn = String.format(operation.getDetailEn(), condition.getFileName());
        ExportTaskMgr.refreshSuccessDetail(infoBean,operation, detailZh,detailEn);
        }
        catch (Exception e){
        ResponseUtils.resetContentType(response);
        ExportTaskMgr.refreshFailDetail(infoBean,operation, detailZh,detailEn,e);
        throw e;
        }
        finally {
        // 发送操作记录日志
        String operlogMsg = JSON.toJSONString(infoBean.getOperlogBean());
        msgSenderService.sendMsgAsync(OperlogBean.IMOP_LOG_MANAGE_TOPIC, operlogMsg);
        }
        }

        service: 通过当前任务类型 调用工厂找到处理类 然后... 看注释把

         @Override
        public Long exportOrCreateTask(TaskInfoDTO condition, HttpServletResponse response) throws Exception {
        BiFunction<ExportTaskPO, Map<String, Future<?>>, ExportHandler> handlerFuc = ExportHandlerFactory
        .getHandlerByExportKey(condition.getExportKey());
        if(null == handlerFuc){
        throw new MonitorException(-1, I18nConstants.UNKNOWN_EXPORT_TYPE);
        }
        
        // 新建任务相关信息 暂不入库
        ExportTaskPO taskPO = new ExportTaskPO();
        taskPO.setId(UUID.randomUUID().toString());
        taskPO.setProgress(GlobalConstants.DEFAULT_VALUE);
        taskPO.setStatus(ExportStatusEnums.INIT.getStatus());
        taskPO.setCreateUser(UserThreadLocal.getuserName());
        taskPO.setFileName(condition.getFileName());
        taskPO.setExportKey(condition.getExportKey());
        taskPO.setParams(condition.getParams());
        taskPO.setGmtCreate(dateTimeService.getCurrentTime());
        taskPO.setSingleExportNum(GlobalConstants.EXPORT_DEFAULT_COUNT);
        taskPO.setFilePath(GlobalConstants.DEFAULT_VALUE);
        taskPO.setCompleteTime(GlobalConstants.DEFAULT_VALUE);
        
        // 构建handler
        ExportHandler handler = handlerFuc.apply(taskPO, new HashMap<>());
        
        // 获取total 如果大于单次导出条数则创建任务 然后后台异步导出
        Long total = handler.getTotal();
        if(total > taskPO.getSingleExportNum()){
        exportTaskRepository.save(taskPO);
        ExportHandlerFactory.addTaskToQue(new ArrayList<>(Collections.singletonList(taskPO)));
        }
        // 满足单次导出 则直接导出
        else {
        handler.directExport(response);
        }
        
        return total;
        }

        工厂类的具体实现:

        import lombok.extern.slf4j.Slf4j;
        import org.apache.commons.collections4.CollectionUtils;
        import org.springframework.stereotype.Component;
        
        import Javax.annotation.PostConstruct;
        import java.util.*;
        import java.util.concurrent.*;
        import java.util.function.BiFunction;
        
        @Slf4j
        @Component
        public class ExportHandlerFactory {
        private static final ExportTaskRepository EXPORT_TASK_REPOSITORY = SpringUtil.getBean(ExportTaskRepository.class);
        /**
         * 线程池  3个线程消息导出任务
         */
        private static final ExecutorService THREAD_POOL = Executors.newFixjsedThreadPool(3);
        
        /**
         * 记录执行中的线程
         */
        private static final Map<String, Future<?>> RUNNING_THREAD = new ConcurrentHashMap<>();
        
        /**
         * 任务队列 轮训消费此队列
         */
        private static final Queue<ExportTaskPO> TASK_QUE = new ConcurrentLinkedQueue<>();
        
        /**
         * 定义处理器 例如DEMO类型导出任务由DEMO_HANDLER进行处理
         */
        private static final Map<String, BiFunction<ExportTaskPO,Map<String, Future<?>>,ExportHandler>> HANDLER_MAP = new ConcurrentHashMap<>();
        static {
        HANDLER_MAP.put(ExportTypeEnums.DEMO_EXPORT.getExportKey(), DemoExportHandler::new);
        }
        
        @PostConstruct
        @SuppressWarnings("InfiniteLoopStatement")
        public static void init() {
        Runnable runnable = () -> {
        while (true) {
        try {
        // 每隔50ms拉取一次导出任务 如果已经消费完 则不再执行
        TimeUnit.MILLISECONDS.sleep(50L);
        ExportTaskPO taskInfo = TASK_QUE.poll();
        if(taskInfo == null) {
        continue;
        }
        
        // 根据导出类型key 找到handler 开始处理
        BiFunction<ExportTaskPO,Map<String, Future<?>>,ExportHandler> biFunction =
        HANDLER_MAP.get(taskInfo.getExportKey());
        log.info("begin consume taskInfo={},biFunction={}",taskInfo,biFunction);
        
        if(biFunction != null) {
        // 设置任务信息 以及记录运行状态的map 得到处理者
        ExportHandler exportHandler = biFunction.apply(taskInfo, RUNNING_THREAD);
        log.info("begin consume exportHandler={}",exportHandler);
        // 执行任务
        Future<Boolean> exceuteFuture = THREAD_POOL.submit(exportHandler);
        // 将正在执行的任务放入map中
        RUNNING_THREAD.put(taskInfo.getId(),exceuteFuture);
        }
        }
        catch (Exception e) {
        log.error("consume export task error.",e);
        }
        }
        };
        
        Executors.newFixedThreadPool(1).execute(runnable);
        }
        
        public static void stopTaskByTaskId(String taskId){
        // 从运行中的map 拿到执行中的线程
        Future<?> future = RUNNING_THREAD.get(taskId);
        
        // 如果存在  直接停止运行
        if(future != null){
        try{
        future.cancel(Boolean.TRUE);
        }
        catch (Exception e){
        log.debug("destroy thread error",e);
        }
        finally {
        RUNNING_THREAD.remove(taskId);
        }
        }
        }
        
        public static BiFunction<ExportTaskPO,Map<String, Future<?>>,ExportHandler> getHandlerByExportKey(String ExportKey){
        // 根据导出类型key 找到handler 开始处理
        return HANDLER_MAP.get(ExportKey);
        }
        
        
        public static void addTaskToQue(List<ExportTaskPO> needExecuteTask){
        if(CollectionUtils.isNotEmpty(needExecuteTask)){
        TASK_QUE.addAll(needExecuteTask);
        }
        }
        
        public static void initTaskWhenAppRun(){
        try{
        List<ExportTaskPO> needExecuteTask = EXPORT_TASK_REPOSITORY
        .lambdaQuery()
        .in(ExportTaskPO::getStatus, Arrays.asList(ExportStatusEnums.INIT.getStatus(), ExportStatusEnums.RUNNING.getStatus()))
        .orderByAsc(ExportTaskPO::getGmtCreate)
        .list();
        log.info("initAllWaitExecuteTask={}",needExecuteTask);
        addTaskToQue(needExecuteTask);
        }
        catch (Exception e){
        log.error("initAllWaitExecuteTask error",e);
        }
        }
        }
        

        工厂类定了轮训消费,定义了哪个类型的任务由哪个处理器处理,定义了执行中的任务以便删除任务时立刻任务执行,等等

        工厂类相当于总管,但是具体的实现是由ExportHandler这个类来完成处理,那么这个类又做了什么些事情呢~~~

        import javax.servlet.http.HttpServletResponse;
        import java.io.BufferedOutputStream;
        import java.io.File;
        import java.math.BigDecimal;
        import java.math.RoundingMode;
        import java.net.URLEncoder;
        import java.nio.charset.StandardCharsets;
        import java.nio.file.Files;
        import java.util.*;
        import java.util.concurrent.Callable;
        import java.util.concurrent.Future;
        import java.util.zip.ZipEntry;
        import java.util.zip.ZipOutputStream;
        
        @Slf4j
        public abstract class ExportHandler implements Callable<Boolean> {
        public final ExportTaskPO taskInfo;
        private final Map<String, Future<?>> runMap;
        private static final ExportTaskRepository EXPORT_TASK_REPOSITORY = SpringUtil.getBean(ExportTaskRepository.class);
        private static final FileConfig FILE_CONFIG = SpringUtil.getBean(FileConfig.class);
        private static final DateTimeService DATE_TIME_SERVICE = SpringUtil.getBean(DateTimeService.class);
        
        public ExportHandler(ExportTaskPO taskInfo,Map<String,Future<?>> runMap){
        this.taskInfo = taskInfo;
        this.runMap = runMap;
        }
        
        /**
         * 数据小于5w条时直接导出
         * @param response 导出response
         * @throws Exception 异常时直接抛出
         */
        public void directExport(HttpServletResponse response) throws Exception {
        FileUtils.checkFileName(taskInfo.getFileName());
        
        // 设置响应类型以及文件名
        response.reset();
        response.setContentType(GlobalConstants.CONTENT_TYPE_XLSX);
        String encodeName = URLEncoder.encode(taskInfo.getFileName(),GlobalConstants.ENCODER_UTF8);
        response.setHeader(GlobalConstants.CONTENT_DISPOSITION_KEY, GlobalConstants.CONTENT_DISPOSITION_FILE_PREFIX.concat(encodeName));
        
        // 写入数据
        EasyExcel
        .write(response.getOutputStream())
        .excelType(ExcelTypeEnum.XLSX)
        .charset(StandardCharsets.UTF_8)
        .sheet(FileNameUtil.mainName(taskInfo.getFileName()))
        .head(getHeader())
        .doWrite(getData(null,null));
        }
        
        /**
         * 默认的数据大于5W时的异步导出处理逻辑 默认单sheet页 简单实现更新任务进度
         * 如果有复杂的多sheet页导出 请在你的handler里面重写此方法
         */
        public void asyncExport(){
        ZipOutputStream zipOut = null;
        File file = null;
        // 执行导出逻辑
        try {
        // 任务执行中
        log.info("begin export,taskId:{}",taskInfo.getId());
        updateTaskInfo(BigDecimal.ZERO + GlobalConstants.PER_CENT,ExportStatusEnums.RUNNING.getStatus(),null);
        
        // 检查文件名称 获取没有后缀的文件名
        FileUtils.checkFileName(taskInfo.getFileName());
        String fileMainName = FileNameUtil.mainName(taskInfo.getFileName());
        
        // 先在服务器生成文件,得到输出流
        String filePath = FileUtils.getDateFileName(fileMainName, GlobalConstants.FILE_TYPE_ZIP);
        file = FileUtils.getAbsoluteFile(FILE_CONFIG.getExportPath(), filePath);
        zipOut = new ZipOutputStream(new BufferedOutputStream(Files.newOutputStream(file.toPath())));
        
        // 根据总量数据以及分页每条的数据  得到要导出多少次并且遍历 开始导入数据
        long exportCount = calculateExportCount(getTotal(),taskInfo.getSingleExportNum());
        
        // 遍历导出的次数 每次生成一个excel文件
        dealExport(exportCount,zipOut);
        
        // 导出完成后更新文件路径,完成时间以及状态
        taskInfo.setFilePath(filePath);
        updateTaskInfo(null,ExportStatusEnums.SUCCESS.getStatus(),DATE_TIME_SERVICE.getCurrentTime());
        log.info("end export,taskId:{}",taskInfo.getId());
        }
        // 异常打印异常信息 更新任务进度为-- 状态为失败  如果生成了文件 还需要把文件删除
        catch (Exception e) {
        log.error("export error,taskId:{}",taskInfo.getId(),e);
        updateTaskInfo(GlobalConstants.DEFAULT_VALUE,ExportStatusEnums.FAILED.getStatus(),DATE_TIME_SERVICE.getCurrentTime());
        
        if(file != null){
        boolean delete = file.delete();
        log.info("export error,taskId:{},file delete:{}",taskInfo.getId(),delete);
        }
        }
        // 最终从执行中的map里移除当前任务
        finally {
        if(zipOut != null){
        FileUtils.safeClose(zipOut);
        }
        }
        }
        
        private void dealExport(long exportCount,ZipO编程客栈utputStream zipOut) throws Excephttp://www.devze.comtion{
        // 遍历导出的次数 每次生成一个excel文件
        for (long i = 1; i <= exportCount; i++) {
        // 新建zip中的其中一个文件
        String eachName = String.format("%02d", i) + ExcelTypeEnum.XLSX.getValue();
        zipOut.putNextEntry(new ZipEntry(new String(eachName.getBytes(StandardCharsets.UTF_8))));
        
        // 新建好了后往zip写入数据
        List<List<String>> singleData = getData(i, taskInfo.getSingleExportNum());
        EasyExcel
        .write(zipOut)
        .autoCloseStream(Boolean.FALSE)
        .excelType(ExcelTypeEnum.XLSX)
        .charset(StandardCharsets.UTF_8)
        .sheet()
        .head(getHeader())
        .doWrite(singleData);
        
        // 关闭entry 代表zip当前的其中一个文件已经写入结束
        zipOut.closeEntry();
        
        // 更新进度
        updateTaskInfo(calculateProgress(i,exportCount),ExportStatusEnums.RUNNING.getStatus(),null);
        singleData.clear();
        }
        }
        
        private void updateTaskInfo(String progress,Integer status,String completeTime){
        Optional.ofNullable(progress).ifPresent(taskInfo::setProgress);
        Optional.ofNullable(status).ifPresent(taskInfo::setStatus);
        Optional.ofNullable(completeTime).ifPresent(taskInfo::setCompleteTime);
        EXPORT_TASK_REPOSITORY.updateById(taskInfo);
        }
        
        private Long calculateExportCount(Long total,Long singleExportNum){
        long exportCount = total / singleExportNum;
        long mod = total % singleExportNum;
        if(mod != 0){
        exportCount += 1;
        }
        
        return exportCount;
        }
        
        private String calculateProgress(Long thisCount,Long exportCount){
        BigDecimal progress = BigDecimal.valueOf(thisCount)
        .divide(BigDecimal.valueOf(exportCount), 2, RoundingMode.HALF_UP)
        .multiply(new BigDecimal(GlobalConstants.PROGRESS_OVER))
        .setScale(BigDecimal.ZERO.intValue(),RoundingMode.HALF_UP);
        
        return progress + GlobalConstants.PER_CENT;
        }
        
        @Override
        public Boolean call(){
        try{
        asyncExport();
        return Boolean.TRUE;
        }
        catch (Exception e){
        log.error("export task:{} error",taskInfo.getId(),e);
        return Boolean.FALSE;
        }
        finally {
        if(taskInfo != null && runMap != null){
        runMap.remove(taskInfo.getId());
        }
        }
        }
        
        /**
         * 需要实现的查询total的方法
         */
        public abstract Long getTotal() throws Exception;
        
        /**
         * 需要实现的组装表头的方法
         */
        public abstract List<List<String>> getHeader();
        
        /**
         * 需要实现的组装表体数据的方法
         */
        public abstract List<List<String>> getData(Long pageNo,Long pageSize) throws Exception;
        }

        噢,原来是一个抽象类,里面把需要用到的一些逻辑,例如生成文件,导入等都已经实现了,那么所有用到该架子导出的人 只需要继承当前类,实现里面的getTotal getHeader getData三个方法就可以啦,比如我们工厂类里面写的这段代码,demo_export由DemoExportHandler进行处理,现在最后来看看这个DemoExportHandler怎么实现的把

        static {
        HANDLER_MAP.put(ExportTypeEnums.DEMO_EXPORT.getExportKey(), DemoExportHandler::new);
        }

        DemoExportHandler代码:

        import java.util.*;
        import java.util.concurrent.Future;
        import java.util.stream.Collectors;
        
        @Slf4j
        public class DemoExportHandler extends ExportHandler {
        
        private static final StandardPointRepository STANDARD_POINT_REPOSITORY = SpringUtil.getBean(StandardPointRepository.class);
        private static final JsonService JSON_SERVICE = SpringUtil.getBean(JsonService.class);
        private static final I18nUtilService I_18_N_UTIL_SERVICE = SpringUtil.getBean(I18nUtilService.class);
        
        public DemoExportHandler(ExportTaskPO taskInfo, Map<String, Future<?>python> runMap) {
        super(taskInfo, runMap);
        }
        
        @Override
        @SuppressWarnings("unchecked")
        public Long getTotal() throws Exception{
        
        // 导出参数 反序列化为自己想要的类型的对象
        Map<String,Object> paramMap = getParamObj(taskInfo.getParams());
        
        // 取出自己想要的数据,根据条件进行导出
        List<String> pointList = (List<String>)paramMap.get("pointList");
        
        // 此处demo使用 total请根据自己的逻辑查询
        List<PointInfoBean> pointBeans = STANDARD_POINT_REPOSITORY.selectPointByIdList(pointList);
        return (long) pointBeans.size();
        }
        
        @Override
        public List<List<String>> getHeader() {
        // 组装表头
        return Arrays.asList(Collections.singletonList("测点id"),
        Collections.singletonList("测点名称"),Collections.singletonList("测点单位"));
        }
        
        @Override
        @SuppressWarnings("unchecked")
        public List<List<String>> getData(Long pageNo, Long pageSize) throws Exception {
        // 模拟执行导出逻辑执行20S
        Thread.slee编程p(20000L);
        
        // 导出参数 反序列化为自己想要的类型的对象
        Map<String,Object> paramMap = getParamObj(taskInfo.getParams());
        
        // 取出自己想要的数据,根据条件进行导出
        String lang = (String)paramMap.get("language-option");
        List<String> pointList = (List<String>)paramMap.get("pointList");
        
        // 查询数据
        List<PointInfoBean> pointBeans = STANDARD_POINT_REPOSITORY.selectPointByIdList(pointList);
        List<List<String>> allData = pointBeans
        .stream()
        .map(bean -> Arrays.asList(bean.getId(), I_18_N_UTIL_SERVICE
        .getMapFieldByLanguageOption(bean.getNameI18n(), lang), bean.getUnit()))
        .collect(Collectors.toList());
        
        // 分页或者不分页返回
        if(pageNo == null || pageSize == null){
        return allData;
        }
        
        return PageUtils.getPageList(allData,pageNo.intValue(),pageSize.intValue());
        }
        
        @SuppressWarnings("unchecked")
        private Map<String,Object> getParamObj(String params) throws UedmException {
        Map<String,Object> paramMap = JSON_SERVICE.jsonToObject(params,Map.class);
        log.debug("getParamObj params={},obj={}",params,paramMap);
        
        return paramMap;
        }
        }

        总结

        以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

        0

        上一篇:

        下一篇:

        精彩评论

        暂无评论...
        验证码 换一张
        取 消

        最新开发

        开发排行榜