开发者

SpringBoot @Schedule的使用注意与原理分析

目录
  • 简介
  • 注意事项
    • @Schedule默认线程池大小
    • 固定延迟与固定速率
  • SpringBoot @Schedule原理
    • ScheduledAnnotationBeanPostProcessor
      • DestructionAwareBeanPostProcessor封装任务
      • ApplicationListener执行任务
    • 总结

      简介

      之前使用@Schedjsule一直没有遇到什么问题,那种拿来就用的感觉还挺好,最近使用@Schedule遇到一点问题,才仔细的研究了一下@Schedule的一些细节和原理问题。

      这篇文章就将分享一下,使用@Schedule一些可能被忽略的问题。

      注意事项

      @Schedule默认线程池大小

      我相信@Schedule默认线程池大小的问题肯定是被很多拿来就用的朋友忽略的问题,默认情况下@Schedule使用线程池的大小为1。

      一般情况下没有什么问题,但是如果有多个定时任务,每个定时任务执行时间可能不短的情况下,那么有的定时任务可能一直没有机会执行。

      有兴趣的朋友,可以试一下:

      @Component
      public class BrigeTask {
      
          private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
      
          @Scheduled(cron = "*/5 * * * * ?")
          private void cron() throws InterruptedException {
              System.out.println(Thread.currentThread().getName() + "-cron:" + LocalDateTime.now().format(FORMATTER));
              TimeUnit.SECONDS.sleep(6);
          }
      
          @Scheduled(fixedDelay = 5000)
          private void fixedDelay() throws InterruptedException {
              System.out.println(Thread.currentThread().getName() + "-fixedDelay:" + LocalDateTime.now().format(FORMATTER));
          python    TimeUnit.SECONDS.sleep(6);
          }
      
          @Scheduled(fixedRate = 5000)
          private void fixedRate() throws InterruptedException {
              System.out.println(Thread.currentThread().getName() + "-fixedRate:" + LocalDateTime.now().format(FORMATTER));
              TimeUnit.SECONDS.sleep(6);
          }
      }

      上面的任务中,fixedDelay与cron,可能很久都不会被执行。

      SpringBoot @Schedule的使用注意与原理分析

      要解决上面的问题,可以把执行任务的线程池设置大一点,怎样设置通过javascript实现SchedulingConfigurer接口,在configureTasks方法中配置,这种方式参见后面的代码,这里可以直接注入一个TaskScheduler来解决问题。

      @Bean
      public TaskScheduler taskScheduler() {
          ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
          taskScheduler.setPoolSize(5);
          return taskScheduler;
      }

      当然也可以使用ScheduledExecutorService:

      @Bean
      public ScheduledExecutorService scheduledExecutorService() {
          return Executors.newScheduledThreadPool(10);
      }

      为啥这样有效,请参考后面@Schedule原理。

      固定延迟与固定速率

      @Schedule的三种方式cron、fixedDelay、fixedRate不管线程够不够都会阻塞到上一次执行完成,才会执行下一次。

      如果任务方法执行时间非常短,上面三种方式其实基本没有太多的区别。

      如果,任务方法执行时间比较长,大于了设置的执行周期,那么就有很大的区别。例如,假设执行任务的线程足够,执行周期是5s,任务方法会执行6s。

      • cron的执行方式是,任务方法执行完,遇到下一次匹配的时间再次执行,基本就会10s执行一次,因为执行任务方法的时间区间会错过一次匹配。
      • fixedDelay的执行方式是,方法执行了6s,然后会再等5s再执行下一次,在上面的条件下,基本就是每11s执行一次。
      • fixedRate的执行方式就变成了每隔6s执行一次,因为按固定区间执行它没5s就应该执行一次,但是任务方法执行了6s,没办法,只好6s执行一次。

      上面的结论都可以通过,最上面的示例验证,有兴趣的朋友可以调整一下休眠时间测试一下。

      SpringBoot @Schedule原理

      在SpringBoot中,我们使用@EnableScheduling来启用@Schedule。

      @Target(ElementType.TYPE)
      @Retention(RetentionPolicy.RUNTIME)
      @Import(SchedulingConfiguration.class)
      @Documented
      public @interface EnableScheduling {
      
      }

      EnableSchedEBGyOjRuling注解没什么特殊,需要注意import了SchedulingConfiguration。

      SchedulingConfiguration一看名字就知道是一个配置类,肯定是为了添加相应的依赖类。

      @Configuration
      @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
      public class SchedulingConfiguration {
      
      	@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
      	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
      	public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
      		return new ScheduledAnnotationBeanPostProcessor();
      	}
      }

      我们可以看到在SchedulingConfiguration创建了一个ScheduledAnnotationBeanPostProcessor。

      看样子SpringBoot定时任务的核心就是ScheduledAnnotationBeanPostProcessor类了,下面我们来看一看ScheduledAnnotationBeanPostProcessor类。

      ScheduledAnnotationBeanPostProcessor

      public class ScheduledAnnotationBeanPostProcessor
      		implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
      		Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
      		SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
      }

      ScheduledAnnotationBeanPostProcessor实现了很多接口,这里重点关注2个,ApplicationListener和DestructionAwareBeanPostProcessor。

      DestructionAwareBeanPostProcessor封装任务

      DestructionAwareBeanPostProcessor继承了BeanPostProcessor。

      BeanPostProcessor相信大家已经非常熟悉了,就是在Bean创建执行setter之后,在自定义的afterPropertiesSet和init-method前后提供拦截点,大致执行的先后顺序是:

      Bean实例化 -> setter -> BeanPostProcessor#postProcessBeforeInitialization ->
      -> InitializingBean#afterPropertiesSet -> init-method -> BeanPostProcessor#postProcessAfterInitialization

      我们看一下ScheduledAnnotationBeanPostProcessor的postProcessAfterInitialization方法:

      @Override
      public Object postProcessAfterInitialization(Object bean, String beanName) {
          if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
                  bean instanceof ScheduledExecutorService) {
              // Ignore AOP infrastructure such as scoped proxies.
              return bean;
          }
      
          Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
          if (!this.nonAnnotatedClasses.contains(targetClass) &&
                  AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
              Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                      (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                          Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                  method, Scheduled.class, Schedules.class);
                          return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                      });
              if (annotatedMethods.isEmpty()) {
                  this.nonAnnotatedClasses.add(targetClass);
                  if (logger.isTraceEnabled()) {
                      logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
                  }
              }
              else {
                  // Non-empty set of methods
                  annotatedMethods.forEach((method, scheduledMethods) ->
                          scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
                  if (logger.isTraceEnabled()) {
                      logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                              "': " + annotatedMethods);
                  }
              }
          }
          return bean;
      }

      简单说一下流程:

      找到所有的Schedule方法,把它封装为ScheduledMethodRunnabpythonle类(ScheduledMethodRunnable类实现了Runnable接口),并把其做为一个任务注册到ScheduledTaskRegistrar中。

      如果对具体的逻辑感兴趣,可以从postProcessAfterInitialization方法顺着processScheduled方法一次debug。

      ApplicationListener执行任务

      前面我们介绍通过BeanPostProcessor解析出了所有的任务,接下来要做的事情就是提交任务了。

      @Override
      public void onApplicationEvent(ContextRefreshedEvent event) {
          if (event.getApplicationContext() == this.applicationContext) {
              // Running in an ApplicationContext -> register tasks this late...
              // giving other ContextRefreshedEvent listeners a chance to perform
              // their work at the same time (e.g. Spring BATch's job registration).
              finishRegistration();
          }
      }

      ScheduledAnnotationBeanPostProcessor监听的事件是ContextRefreshedEvent,就是在容器初始化,或者刷新的时候被调用。

      监听到ContextRefreshedEvent事件之后,值调用了finishRegistration方法,这个方法的基本流程如下:

      • 1.找到容器中的SchedulingConfigurer,并调用它的configureTasks,SchedulingConfigurer的作用主要就是配置ScheduledTaskRegistrar类,例如线程池等参数,例如:
      import org.springframework.context.annotation.Configuration;
      import org.springframework.scheduling.annotation.SchedulingConfigurer;
      import org.springframework.scheduling.config.ScheduledTaskRegistrar;
      
      import Java.util.concurrent.Executors;
      
      @Configuration
      public class MyScheduleConfig implements SchedulingConfigurer {
      
          @Override
          public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
              taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
          }
      }
      • 2.调用ScheduledTaskRegistrar的afterPropertiesSet方法执行任务,如果对具体的逻辑感兴趣,可以阅读ScheduledTaskRegistrar的scheduleTasks方法。

      关于为啥直接在容器中注入一个TaskScheduler、ScheduledExecutorService也可以有效,也可以在finishRegistration方法中找到答案。

      总结

      以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

      0

      上一篇:

      下一篇:

      精彩评论

      暂无评论...
      验证码 换一张
      取 消

      最新开发

      开发排行榜