spring schedule任务调度方式
目录
- spring schedule任务调度
- 查找 @Scheduled 注解
- 封装任务
- 注册任务
- taskScheduler
- 任务执行
- 总结
spring schedule任务调度
启用 Spring 的任务调度功能需要使用@EnableScheduling注解,该注解会引入ScheduledAnnotationBeanPostProcessor。
beanprocessor是一个bean后置处理器,负责扫描带有 @Scheduled 注解的方法,将其转换为可执行的任务,并根据注解的属性将其注册到 TaskScheduler 中进行管理和执行。
这样,开发者只需要在普通 Spring Bean 的方法上添加 @Scheduled 注解,Spring 就能自动地按照指定的时间策略执行这些方法,而无需手动创建和管理线程。其内部有一个registrar是 ScheduledTaskRegistrar用来注册任务。
查找 @Scheduled 注解
在ScheduledAnnotationBeanPostProcessor.postProcessAfterInitialization()方法处理所有的@Scheduled,具体处理每个注解方法是
public Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler || bean instanceof ScheduledExecutorService) { // Ignore AOP infrastructure such as scoped proxies. return bean; } //解析所有的@Scheduled注解 Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean); if (!this.nonAnnotatedClasses.contains(targetClass) && AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) { Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> { Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations( method, Scheduled.class, Schedules.class); return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null); }); if (annotatedMethods.isEmpty()) { this.nonAnnotatedClasses.add(targetClass); } else { // 调用processScheduled()方法初始化调度任务 annotatedMethods.forEach((method, scheduledAnnotations) -> scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean))); } } return bean; }
processScheduled(Scheduled scheduled, Method method, Object bean)
scheduled
:注解配置任务周期相关信息method
:注解所在方法bean
:注解所在实例对象。有bean和method就可以通过反射 进行方法调用。
processScheduled()方法首先将schedueld注解上的方法封装传给你一个runnable任务,然后
封装任务
protected Runnable createRunnable(Object target, Method method) { //@Scheduled 注解修饰的方法必须是无参的 Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled"); Method invocableMethod = AopUtils.selectInvocableMethod(method, target.gjsetClass()); return new ScheduledMethodRunnable(target, invocableMethod); }
这里就是将实例方法包装成一个ScheduledMethodRunnable对象,
ScheduledMethodRunnable.run方法就是通过反射调用该方法。
ReflectionUtils.makeAccessible(this.method); this.method.invoke(this.target);
注册任务
下一步会解析 @Scheduled 注解中的属性,如
fixedRate
: 以固定的时间间隔(毫秒)执行任务,在上一次任务开始后等待指定的时间。fixedDelay
: 在上一次任务完成后,等待指定的时间(毫秒)再执行下一次任务。cron
: 使用 Cron 表达式定义任务的执行时间。initialDelay
: 任务第一次执行前的延迟时间(毫秒)。
不同的类型会通过registrar不同方法进行注册。
this.registrar.scheduleCronTask() this.registrar.scheduleFixedDelayTask() this.registrar.scheduleFixedRateTask()
这里又引入了一个重要的类ScheduledTaskRegistrar来注册任务。
这里以scheduleCronTask方法为例来看下cron表达式类型任务的注册:
public ScheduledTask scheduleCronTask(CronTask task) { ScheduledTask scheduledTask = this.unresolvedTasks.remove(task); boolean newTask = false; if (scheduledTask == null) { scheduledTask = new ScheduledTask(task); newTask = true; } //taskScheduler是否初始化 if (this.taskScheduler != null) { 编程//创建任务 scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger()); } else {//taskScheduler未初始化,将任务放到未处理列表里 addCronTask(task); this.unresolvedTasks.put(task, scheduledTask); } return (newTask ? scheduledTask : null); }
入参是一个CronTask类型,在上面的processScheduled()方法调用实例是
/** runnable就是封装的ScheduledMethodRunnable */ this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron /**cron表达式*/, timeZone)))
最后调用taskScheduler.schedule(task.getRunnable(), task.getTrigger())来启动任务。
taskScheduler
在看taskScheduler.schedule()方法前首先来看taskScheduler是怎么初始化的。
这还要看ScheduledAnnotationBeanPostProcessor ,其实现了SmartInitializingSingleton接口。在所有Singleton Bean 初始化完成后被调用afterSingletonsInstantiated()方法。该方法又会调用finishRegistration();来完成惹我你的注册。
private void finishRegistration() { if (this.scheduler != null) { this.registrar.setScheduler(this.scheduler); } if (this.beanFactory instanceof ListableBeanFactory) { Map<String, SchedulingConfigurer> beans = ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class); List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values()); AnnotationAwareOrderComparator.sort(configurers); for (SchedulingConfigurer configurer : configurers) { configurer.configureTasks(this.registrar); } } if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) { Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type"); try { // Search for TaskScheduler bean... this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false)); } catch (NoUniqueBeanDefinitionException ex) { try { this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true)); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskScheduler bean exists within the context, and " + "none jsis named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " + "(possibly as an alias); or implement the SchedulingConfigurer interface and call " + "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { // Search for ScheduledExecutorService bean next... try { this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false)); } catch (NoUniqueBeanDefinitionException ex2) { try { this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true)); } catch (NoSuchBeanDefinitionException ex3) { } } catch (NoSuchBeanDefinitionException ex2) { sPHoHv // Giving up -> falling back to default scheduler within the registrar... logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing"); } } } this.registrar.afterPropertiesSet(); }
这里首先会从beanfacotry中查找SchedulingConfigurer类型的bean,然后调用configureTasks来加载自定义schedule配置信息,这里入参是registrar,然后从容器中查找TaskScheduler、ScheduledExecutorService类型的bean来初始化taskScheduler。最后调用registrar.afterPropertiesSet()。这里还有一步兜底,如果schedule还是为空,则默认创建一个ConcurrentTaskScheduler类型的scheduler。
registrar.afterPropertiesSet()方法
protected void scheduleTasks() { if (this.taskScheduler == null) { this.localExecutor = Executors.newSingleThreadScheduledExecutor(); this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor); } if (this.triggerTasks != null) { for (TriggerTask task : this.triggerTasks) { addScheduledTask(scheduleTriggerTask(task)); } } if (this.cronTasks != null) { for (CronTask task : this.cronTasks) { addScheduledTask(scheduleCronTask(task)); } } if (this.fixedRateTasks != null) { for (IntervalTask task : this.fixedRateTasks) { addScheduledTask(scheduleFixedRateTask(task)); } } if (this.fixedDelayTasks != null) { for (IntervalTask task : this.fixedDelayTasks) { addScheduledTask(scheduleFixedDelayTask(task)); } } }
任务执行
回过头来继续看taskScheduler.schedule()任务的注册,这里就看默认的ConcurrentTaskScheduler.schedule()方法。
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) { try { if (this.enterpriseConcurrentScheduler) { return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger); } else { ErrorHandler errorHandler = (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true)); return new ReschedulingRunnable(task, trigger, this.clock, this.scheduledExecutor, errorHandler).schedule(); } } catch (RejectedExecutionException ex) { throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex); } }
最后调用ReschedulingRunnable(task, trigger, this.clock, executor, errorHandler).schedule()
来看ReschedulingRunnable的schedule方法
public ScheduledFuture<?> schedule() { synchronized (this.triggerContextMonitor) { this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext); if (this.scheduledExecutionTime == null) { return null; } long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis(); this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS); return this; } }
计算好下次执行时间initialDelay,使用线程池executor延迟执行当前ReschedulingRunnable。
run方法
public void run() { Date actualExecutionTime = new Date(this.triggerContext.getClock().millis()); super.run(); Date completionTime = new Date(this.triggerContext.getClock().millis()); synchronized (this.triggerContextMonitor) { Assert.state(this.scheduledExecutionTime != null, "No scheduled ejsxecution"); this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime); if (!obtainCurrentFuture().isCancelled()) { schedule(); } } }
这里super.run()就是调用ReschedulingRunnable extends DelegatingErrorHandlingRunnable构造方法创建传入的task,也就是原始schedule注解方法。
super.run()就是DelegatingErrorHandlingRunnable的run方法
public DelegatingErrorHandlingRunnable(Runnable delegate, ErrorHandler errorHandler) { Assert.notNull(delegate, "Delegate must not be null"); Assert.notNull(errorHandler, "ErrorHandler must not be null"); this.delegate = delegate; this.errorHandler = errorHandler; } public void run() { try { this.delegate.run(); } catch (UndeclaredThrowableException ex) { this.errorHandler.handleError(ex.getUndeclaredThrowable()); } catch (Throwable ex) { this.errorHandler.handleError(ex); } }
回到ReschedulingRunnable的run方法,在执行完被代理task后,如果任务没有被取消,又调用schedule()方法进行下一次任务执行。这样就完成了任务的周期性执行。
怎么动态控制定时任务?
如果想取消或修改某个任务执行周期,这个时候该如何做呢?
这个时候可以使用上面说的SchedulingConfigurer接口,该接口回暴露ScheduledTaskRegistrar类实例。上面代码分析可以看到,所有的任务都是通过该类进行初始化的,通过该类可以动态的添加任务。并且schedule()方法返回的是一个ScheduledFuture,可以通过调用cancel方法来取消任务。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。
精彩评论