开发者

java异步导出的实现过程

目录
  • 背景
  • 数据库设计
  • 代码实现
  • 总结

背景

假设我们有一个在线学习平台,管理员需要定期生成销售报告,包括课程销售情况和用户购买情况等重要数据。然而,由于数据量较大,生成报告可能需要较长时间,并且可能会占用大量系统资源,从而影响用户的使用体验。为了解决这个问题,我们考虑采用异步导出的方案。

异步导出的工作原理是将导出操作放在一个异步任务中执行,而不是立即在用户发起导出请求后执行导出操作。这样一来,用户无需等待导出任务完成,就可以继续进行其他操作,而系统则在后台完成导出任务。

这种方案有以下优点:

  • 提高系统响应速度: 用户发起导出请求后,系统可以立即响应而不必等待导出任务完成,从而提高了系统的响应速度。
  • 改善用户体验: 用户无需等待导出任务完成,可以继续使用系统进行其他操作,这有助于提升用户体验。
  • 降低系统负载: 将耗时的导出操作放在异步任务中执行,可以避免阻塞系统资源,从而降低系统的负载,确保其他用户js的操作不受影响。

异步导出在许多需要处理大量数据或耗时操作的场景中都非常有用,可以有效提升系统的性能和用户体验。

数据库设计

首先我们需要设计一个保存导出任务的表,需要记录流转状态、操作人、任务参数,后续任务的创建、导出完成/失败都需要操作这张表

CREATE TABLE `t_export_task`
(
    `id`               bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
    `task_id`          varchar(50)   NOT NULL COMMENT '任务id',
    `task_type`        tinyint(4) NOT NULL COMMENT '任务类型',
    `task_param`       varchar(1000) NOT NULL COMMENT '任务参数',
    `status`           tinyint(3) NOT NULL DEFAULT 0 COMMENT '状态 0-处理中 1-成功 -1失败',
    `file_url`         varchar(500)           DEFAULT NULL COMMENT '文件url',
    `remark`         varchar(200)           DEFAULT NULL COMMENT '备注',
    `create_user_id`   int(11) NOT NULL COMMENT '操作人id',
    `create_user_name` varchar(50)   NOT NULL COMMENT '操作人名称',
    `create_time`      datetime      NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `update_time`      datetime      NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB COMMENT='导出任务记录';

代码实现

导出工具类 :

负责提交导出任务、取消任务以及上传导出文件到OSS服务器等功能。

  • 导出任务线程池: 通过 ExecutorService 线程池来执行导出任务,并确保线程池的单例化,防止重复创建,提高性能。
  • 导出类型对应的任务: 使用 Map<Integer, CompletableFuture<String>> 来保存正在运行的导出任务,以便后续取消任务或跟踪任务状态。
  • 提交导出任务: 提交导出任务时,先初始化线程池,然后使用 CompletableFuture.supplyAsync() 方法执行异步任务,并在异步任务中生成导出文件,然后上传到OSS服务器,最后返回导出文件的URL。
  • 取消任务: 取消任务时,从保存的任务映射中获取对应的 CompletableFuture 实例,并调用 cancel() 方法取消任务。
  • 上传文件到OSS服务器: uploadToOSS() 方法负责实际的文件上传逻辑,将导出的文件上传到OSS服务器,并返回文件的URL
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class ExportTaskUtil {

    /**
     * 导出任务线程池
     */
    private static volatile ExecutorService executorService;

    /**
     * 导出类型对应的任务
     */
    private final Map<Integer, CompletableFuture<String>> runningTasks =javascript Maps.newConcurrentMap();

    private final ExportTaskHandlerFactory exportTaskHandlerFactory;
    private final FileUploadService fileUploadService;

    /**
     * @description 提交导出任务
     * @author youmu
     * @date 2024/1/26 17:58
     * @param exportTask 导出任务
     */
    public CompletableFuture<String> submit(ExportTask exportTask) {
        // 初始化线程池
        initThreadPool();
        CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
            File exportFile = null;
            // 获取handler
            ExportTaskHandler handler = exportTaskHandlerFactory.getHandler(exportTask.getTaskType());
            // 生成文件
            try {
                exportFile = handler.generateExportFile(exportTask.getTaskParam());
                if (exportFile == null) {
                    throw new BizException(CodeEnum.NOT_FOUND, "导出文件为空");
                }
                // 上传文件到OSS服务器,获取文件URL
                return uploadToOSS(exportFile);
            } catch (BizException e) {
                throw e;
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                if (exportFile != null) {
                    FileUtil.del(exportFile);
                }
            }
        },exejscutorService);
        runningTasks.put(exportTask.getTaskType(), future);
        return future;
    }

    private static void initThreadPool() {
        if (executorService == null) {
            synchronized (ExportTaskUtil.class) {
                if (executorService == null) {
                    executorService = ThreadUtil.newFixedExecutor(4, "asyncExport", false);;
                }
            }
        }
    }

    /**
     * @description 取消任务
     * @author youmu
     * @date 2024/1/26 19:04
     * @param exportTask 任务
     */
    public void cancel(ExportTask exportTask) {
        CompletableFuture<String> future = runningTasks.get(exportTask.getTaskType());
        if (future != null && !future.isDone()) {
            future.cancel(true);
        }
    }

    /**
     * @description 上传文件到OSS服务器
     * @author youmu
     * @date 2024/1/29 16:56
     */
    private String uploadToOSS(File exportFile) {
        // 实现文件上传逻辑,返回文件URL
        return fileUploadService.uploadFileBySize(exportFile,"export/" + exportFile.getName());
    }


}

导出任务处理的工厂类以及相关的接口和枚举定义 

导出任务采用来工厂+策略的设计模式,工厂模式将对象的创建逻辑封装到工厂类中,策略模式将不同的行为封装到不同的策略类中,使得代码具有良好的可扩展性、灵活性和可维护性。

  • ExportTaskHandlerFactory: 这是一个工厂类,用于根据导出任务类型获取对应的任务处理器。在初始化时,它会将所有实现了 ExportTaskHandler 接口的处理器注入进来,并根据任务类型建立起映射关系。
  • ExportTaskHandler 接口: 这是一个导出任务处理器的接口,定义了生成导出文件和获取导出任务类型的方法,具体的导出任务处理器需要实现该接口。
  • ExportTaskTypeEnum 枚举: 这是一个枚举类型,定义了导出任务的类型,包括了任务类型的代码和描述信息。
  • OrderExportHandler 类: 这是一个具体的导出任务处理器的实现类,用于处理订单导出任务。它实现了 ExportTaskHandler 接口,根据具体业务逻辑生成导出文件,并提供了获取任务类型的方法。
/**
 * @description ExportTaskHandler 工厂类
 * @author youmu
 * @date 2024/1/26 18:04
 */
@Slf4j
@Component
public class ExportTaskHandlerFactory {
    private final Map<Integer, ExportTaskHandler> handlerMap = Maps.newHashMap();

    @Autowired
    public ExportTaskHandlerFactory(List<ExportTaskHandler> handlers) {
        for (ExportTaskHandler taskHandler : handlers) {
            handlerMap.put(taskHandler.getExportType().getCode(), taskHandler);
        }
    }

    public ExportTaskHandler getHandler(Integer exportType) {
        return handlerMap.get(exportType);
    }
}
/**
 * @description ExportTaskHandler
 * @author youmu
 * @date 2024/1/26 18:06
 */
public interface ExportTaskHandler {
    File generateExportFile编程客栈(String param) throws Exception;

    ExportTaskTypeEnum getExportType();

}
/**
 * @description 导出任务类型
 * @author youmu
 * @date 2024/1/29 11:01
 */
@AllArgsConstructor
@Getter
public enum ExportTaskTypeEnum implements IEnum<Integer, String>  {
    CROWD_PACKAGE(1, "人群包"),
    ;

    private final Integer code;
    private final String message;

}
public class OrderExportHandler implements ExportTaskHandler{
    @Override
    public File generateExportFile(String param) throws Exception {
        return null;
    }

    @Override
    public ExportTaskTypeEnum getExportType() {
        return null;
    }
}

业务调用

导出任务的门面类 ExportTaskFacade,它提供了一系列方法来提交、取消、重试导出任务,并提供了查询导出任务的分页接口。

  • 提交任务(submitTask): 提交导出任务时,根据是否传入 taskId 参数来判断是新建任务还是更新任务。如果是新建任务,则创建一个新的 ExportTask 实例并保存到数据库中,然后调用 DOSubmit 方法提交任务;如果是更新任务,则更新任务的状态为正在处理,并调用 doSubmit 方法提交任务。
  • 任务提交处理(doSubmit): 使用 exportTaskUtil.submit(exportTask) 提交异步导出任务,并定义了任务完成后的处理逻辑。如果任务执行成功,则更新任务状态为成功,并设置文件的URL;如果任务执行失败,则记录失败日志,并更新任务状态为失败,同时记录异常信息。
  • 取消任务(cancelTask): 根据传入的 taskId 获取对应的导出任务,然后调用 exportTaskUtil.cancel(exportTask) 取消任务。
  • 重试任务(retryTask): 根据传入的 taskId 获取对应的导出任务,先取消任务以防止异常情况,然后重新提交任务。
@Slf4j
@Component
@RequiredArgsConstructor(onConstructor_ = @Autowired)
public class ExportTaskFacade {
    private final ExportTaskService exportTaskService;
    private final UserService userService;
    private final ExportTaskUtil exportTaskUtil;

    public void submitTask(Integer exportType, String param) {
        submitTask(null,exportType,param);
    }

    public void cancelTask(Long taskId) {
        ExportTask exportTask = exportTaskService.getById(taskId);
        AssertUtils.notNull(exportTask, new BizException(CodeEnum.NOT_FOUND,"导出任务不存在"));
        exportTaskUtil.cancel(exportTask);
    }

    public void retryTask(Long taskId) {
        ExportTask exportTask = exportTaskService.getById(taskId);
        AssertUtils.notNull(exportTask, new BizException(CodeEnum.NOT_FOUND,"导出任务不存在"));
        // 取消任务,防止异常情况还在执行
        exportTaskUtil.cancel(exportTask);
        // 提交任务
        submitTask(taskId,exportTask.getTaskType(),exportTask.getTaskParam());
    }

    private void submitTask(Long taskId, Integer exportType, String param) {
        ExportTask exportTask;
        if(taskId == null) {
            // 保存导出任务
            exportTask = new ExportTask();
            Integer userId = AuthInfoHolder.getUserId();
            exportTask.setTaskId(CodeGenUtil.genCode(GenCodeTypeEnum.DL));
            exportTask.setCreateUserId(userId);
            exportTask.setCreateUserName(userService.findById(userId).getUserName());
            exportTask.setTaskType(exportType);
            exportTask.setTaskParam(param);
            exportTaskService.save(exportTask);
        } else {
            // 更新导出任务
            exportTask = exportTaskService.getById(taskId);
            exportTaskService.lambdaUpdate()
                    .eq(ExportTask::getId, exportTask.getId())
                    .set(ExportTask::getStatus, ExportStatusEnum.PROCESSING.getCode())
                    .update();
        }
        doSubmit(exportTask);
    }


    private void doSubmit(ExportTask exportTask) {
        exportTaskUtil.submit(exportTask).thenAccept(url->{
            exportTaskService.lambdaUpdate()
                    .eq(ExportTask::getId, exportTask.getId())
                    .set(ExportTask::getStatus, ExportStatusEnum.SUCCESS.getCode())
                    .update();
        }).exceptionally(ex->{
            log.error("[导出任务]执行失败,{}", exportTask.getTaskId(),ex);
            exportTaskService.lambdaUpdate()
                    .eq(ExportTask::getId, exportTask.getId())
                    .set(ExportTask::getStatus, ExportStatusEnum.FAILURE.getCode())
                    .set(ExportTask::getRemark, ex instanceof BizException ? ex.getMessage() : "未知异常")
                    .update();
            return null;
        });
    }

    public Page<ExportTaskVO> findPage(ExportTaskRequest request) {
        Page<ExportTask> page = exportTaskService.findPage(request);
        List<ExportTaskVO> voList = ConverterUtil.toVO(ExportTaskConverter.class, page.getRecords());
        Page<ExportTaskVO> pageVO = new Page<>();
        pageVO.setTotal(page.getTotal());
        pageVO.setSize(page.getSize());
        pageVO.setCurrent(page.getCurrent());
        pageVO.setPages(page.getPages());
        pageVO.setRecords(voList);
        return pageVO;
    }
}

流程图

java异步导出的实现过程

总结

过以上实践,我们成功实现了一个轻量级的异步导出方案,具有以下优点:

  • 使用线程池管理异步任务,确保了任务的并发执行和资源的合理利用。
  • 采用 CompletableFuture 实现异步导出和回调更新,简化了异步任务的编写和管理。
  • 使用工厂模式和策略模式实现导出任务处理器,使得系统具有良好的可扩展性和灵活性。

然而,这种方案也存在一些缺点:

  • 资源管理不足: 如果异步导出任务的并发量过大,而线程池的资源配置不足,则可能导致任务排队等待执行,影响任务的实时性和响应速度。
  • 任务执行效率低下: 如果导出任务的处理时间过长,且线程池的工作线程数量有限,则可能导致任务执行效率低下,无法及时完成任务,影响系统的整体性能。
  • 可靠性不高,无法保证任务一定会被执行或执行成功,特别是在系统故障或异常情况下。

针对这些缺点,可以考虑以下优化方案:

  • 合理调整线程池配置: 根据系统的实际负载情况和性能需求,合理配置线程池的大小和工作线程数量,确保资源的有效利用和任务的及时执行。
  • 优化任务处理逻辑: 对任务的处理逻辑进行优化,尽量减少任务的执行时间和资源消耗,提高任务的执行效率和响应速度。
  • 引入异步消息处理机制: 使用消息队列或事件驱动模型来实现任务的异步处理,进一步解耦任务提交和任务执行过程,提高系统的可扩展性和灵活性。
  • 引入定时任务调度器: 使用定时任务调度器(如 xxl-job)来定期扫描和重试执行异常任务。当任务执行时间超过一定阈值(如2小时)或者任务执行异常时,自动触发重试机制,保证任务的及时执行。
  • 增加任务监控和告警机制: 实时监控任务的执行情况,当发现任务执行异常或超时时编程客栈,及时发送告警通知,以便运维人员及时处理和修复。

通过以上优化方案,可以提高异步导出方案的可靠性和稳定性,确保任务能够及时执行并完成,同时降低了系统的维护成本和风险。

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

0

上一篇:

下一篇:

精彩评论

暂无评论...
验证码 换一张
取 消

最新开发

开发排行榜