开发者

spring event 事件异步处理方式(发布,监听,异步处理)

目录
  • spring event 事件异步处理(发布,监听,异步处理)
  • spring事件之异步线程执行
    • Spring 事件
    • 发布
    • 监听的多种实现
    • 异步方法
    • 解决方案
  • 总结

    spring event 事件异步处理(发布,监听,异步处理)

    // 定义事件
    public class EventDemo extends ApplicationEvent {
     
        private String supplierCode;
        private String productCode;
     
        public EventDemo(Object source, String supplierCode, String productCode) {
            super(source);
            this.supplierCode = supplierCode;
            this.productCode = productCode;
        }
     
        public String getSupplierCode() {
            return supplierCode;
        }
     
        public String getProductCode() {
            return productCode;
        }
    }
    // 发布事件
    @Component
    public class EventDemoPub编程lish {
     
        @Autowired
        private ApplicationEventPublisher applicationEventPublisher;
     
        public void publish(String message) {
            EventDemo demo = new EventDemo(this, message);
            applicationEventPublisher.publishEvent(demo);
            System.out.println("发布事件执行结束");
        }
    }
    // 监听事件
    @Component
    public class EventDemoListener implements ApplicationListener<EventDemo> {
        @Override
        public void onApplicationEvent(EventDemo event) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("事件监听开始...... " + "商家编码:" + event.getSupplierCode() + ",商品编码:" + event.getProductCode());
        }
    }
    <!--定义事件异步处理-->
     
    <bean id="commonTaskExecutor"
    		  class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    		<!-- 线程池维持处于Keep-alive状态的线程数量。如果设置了allowCoreThreadTimeOut为true,该值可能为0。
                并发线程数,想达到真正的并发效果,最好对应CPU的线程数及核心数 -->
    		<property name="corePoolSize" value="2" />
    		<!-- 最大线程池容量 -->
    		<property name="maxPoolSize" value="2" />www.devze.com
    		<!-- 超过最大线程池容量后,允许的线程队列数 -->
    		<property name="queueCapacity" value="2" />
    		<!-- 线程池维护线程所允许的空闲时间 .单位毫秒,默认为60s,超过这个时间后会将大于corePoolSize的线程关闭,保持corePoolSize的个数 -->
    		<property name="keepAliveSeconds" value="1000" />
    		<!-- 允许核心线程超时: false(默认值)不允许超时,哪怕空闲;true则使用keepAliveSeconds来控制等待超时时间,最终corePoolSize的个数可能为0 -->
    		<property name="allowCoreThreadTimeOut" value="true" />
     
    		<!-- 线程池对拒绝任务(无线程可用)的处理策略 -->
    		<property name="rejectedExecutionHandler">
    			<bean class="Java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
    			<!-- java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中 -->
    			<!-- java.util.concurrent.ThreadPoolExecutor$AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
    		</property>
    	</bean>
     
    	<!--名字必须是applicationEventMulticaster,因为AbstractApplicationContext默认找个-->
    	<bean id="applicationEventMulticaster" class="org.springframework.context.event.SimpleApplicationEventMulticaster">
    		<!--注入任务执行器 这样就实现了异步调用-->
    		<property name="taskExecutor" ref="commonTaskExecutor"></property>
    	</bean>

    spring事件之异步线程执行

    Spring 不仅为我们提供了IOC , AOP功能外,还在这个基础上提供了许多的功能,我们用的最多的可能就是Spring MVC了吧,但是让我们来看下spring-context包,其中包含了缓存、调度、校验功能等等

    spring event 事件异步处理方式(发布,监听,异步处理)

    这里主要想介绍一下Spring提供的观察者模式实现(事件发布监听)及异步方法执行,这些功能也都是基于AOP实现的

    Spring 事件

    观察者模式大家都了解,它可以解耦各个功能,但是自己实现的话比较麻烦,Spring为我们提供了一种事件发布机制,可以按需要发布事件,之后由监听此事件的类或方法来执行各自对应的功能,代码互相不影响,以后修改订单后续的逻辑时不会影响到订单创建,有点类似于使用MQ的感觉~

    比如在配置中心apollo项目中,在portal创建了app后会发送app创建事件,监听此事件的逻辑处将此消息同步到各个环境的admin sevice中,大家有兴趣可以看下相关代码

    现在我们来看看具体如何使用:假设一个下单场景,订单创建成功后可能有一些后续逻辑要处理,但是和创建订单本身没有关系,此时就可以在创建订单完成后,发送一个消息,又相应部分的代码进行监听处理,避免代码耦合到一起

    首先创建对应的事件

    import org.springframework.context.ApplicationEvent;
     
    public class CreatedOrderEvent extends ApplicationEvent {
    	
        private final String orderSn;
        
        public CreatedOrderEvent(Object source, String orderSn) {
        	super(source);
            this.orderSn = orderSn;
        }
        
        public String getOrderSn() {
        	return this.orderSn;
        }
    }

    现在还需要一个事件发布者和监听者,创建一下

    发布

    import org.springframework.context.ApplicationEventPublisher;
     
    private ApplicationEventPublisher applicationEventPublisher;
     
    applicationEventPublisher.publishEvent(new CreatedOrderEvent(this, orderSn));

    监听的多种实现

    1:注解实现  @EventListener

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.event.EventListener;
    import org.springframework.stereotype.Component;
     
    @Slf4j
    @Component
    public class OrderEventListener {
        
        @EventListener
        public void orderEventListener(CreatedOrderEvent event) {
        	
        }
    }

    2:代码实现

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.ApplicationListener;
     
    @Slf4j
    @Component
    public class OrderEventListener implements ApplicationListener<CreatedOrderEvent> {
        
        @Override
        public void onApplicationEvent(CreatedOrderEvent event) {
        	
        }
    }

    简单的事件发布就完成了,其中的其他复杂逻辑由Spring替我们处理了

    这里我们要注意一点:发布和监听后处理的逻辑是在一个线程中执行的,不是异步执行

    异步方法

    有时候我们为了提高响应速度,有些方法可以异步去执行,一般情况下我们可能是手动将方法调用提交到线程池中去执行,但是Spring 为我们提供了简化的写法,在开启了异步情况下,不用修改代码,只使用注解即可完成此功能

    这时只需要在要异步执行的方法上添加@Async注解即可异步执行;@EnableAsync 启动异步线程, 如

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.event.EventListener;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.stereotype.Component;
     
    @Slf4j
    @Component
    @EnableAsync
    public class OrderEventListener {
        @Async
        @EventListener
        public void orderEventListener(CreatedOrderEvent event) {
        	
        }
    }

    在使用@Async会有一些问题建议看各位看下相关文档及源码

    我们通过Spring事件同步线程改为异步线程,默认的线程池是不复用线程

    我觉得这是这个注解最坑的地方,没有之一!我们来看看它默认使用的线程池是哪个,在前文的源码分析中,我们可以看到决定要使用线程池的方法是

    org.springframework.aop.interceptor.AsyncExecutionASPectSupport#determineAsyncExecutor

    其源码如下:

    protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
        AsyncTaskExecutor executor = this.executors.get(method);
        if (executor == null) {
            Executor targetExecutor;
            // 可以在@Async注解中配置线程池的名字
            String qualifier = getExecutorQualifier(method);
            if (StringUtils.hasLength(qualifier)) {
                targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
            }
            else {
                // 获取默认的线程池
                targetExecutor = this.defaultExecutor.get();
            }
            if (targetExecutor == null) {
                return null;
            }
            executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
                        (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
            this.executors.put(method, executor);
        }
        return executor;
    }

    最终会调用到

    org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor

    这个方法中

    protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
       Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
       return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
    }

    可以看到,它默认使用的线程池是SimpleAsyncTaskExecutor。我们不看这个类的源码,只看它上面的文档注释,如下:

    spring event 事件异步处理方式(发布,监听,异步处理)

    主要说了三点

    • 为每个任务新起一个线程
    • 默认线程数不做限制
    • 不复用线程

    就这三点,你还敢用吗?只要你的任务耗时长一点,说不定服务器就给你来个OOM

    解决方案

    最好的办法就是使用自定义的线程池,主要有这么几种配置方法

    1.在之前的源码分析中,我们可以知道,可以通过AsyncConfigurer来配置使用的线程池

    如下:

    import lombok.extern.slf4j.Slf4j;
    import org.slf4j.MDC;
    import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
    import org.springframework.lang.NonNull;
    import org.springframework.scheduling.annotation.AsyncConfigurer;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    import org.springframework.stereotype.Component;
     
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Future;
    import java.util.concurrent.ThreadPoolExecutor;
     
    /**
     * 异步线程池配置
     */
    @Slf4j
    @Component
    public class AsyncConfig implements AsyncConfigurer {
     
        @Override
        public Executor getAsyncExecutor() {
            MdcThreadPoolTaskExecutor executor = new MdcThreadPoolTaskExecutor();
            executor.setCorePoolSize(5);
            executor.setMaxPoolSize(200);
            executor.setKeepAliveSeconds(5 * 60);
            executor.setQueueCapacity(1000);
            // 自定义实现拒绝策略
            executor.setRejectedExecutionHandler((Runnable runnable, ThreadPoolExecutor exe) -> log.error("当前任务线程池队列已满."));
            // 或者选择已经定义好的其中一种拒绝策略
            // 丢弃任务并抛出RejectedExecutionException异常
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
            // 丢弃任务,但是不抛出异常
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            // 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            // 由调用线程处理该任务
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
            // 线程名称前缀
            executor.setThreadNamePrefix("Async-");
            executor.initialize();
            return executor;
        }
     
        @Override
        public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
            return (ex, method, params) -> log.error("线程池执行任务发生未知异常.", ex);
        }
     
        /**
         * 增加日志MDC
         */
    	public static class MdcThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
     
            /**
             * Gets context for task *
             *
             * @return context for task
             */
            private Map<String, String> getContextForTask() {
                return MDC.getCopyOfContextMap();
            }
     
            /**
             * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code execute()} etc.)
             * all delegate to this.
             */
            @Override
            public void execute(@NonNull Runnable command) {
                super.execute(wrap(command, getContextForTask()));
            }
     
            /**
             * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
             * all delegate to this.
             */
            @NonNull
            @Override
            public Future<?> submit(@NonNull Runnable task) {
                return super.submit(wrap(task, getContextForTask()));
            }
     
            /**
             * All executions will have MDC injected. {@code ThreadPoolExecutor}'s submission methods ({@code submit()} etc.)
             * all delegate to this.
             */
            @NonNull
            @Override
            public <T> Future<T> submit(@NonNull Callable<T> task) {
                return super.submit(wrap(task, getContextForTask()));
            }
     
            /**
             * Wrap callable
             *
             * @param <T>     parameter
             * @param task    task
             * @param context context
             * @return the callable
             */
            private <T> Callable<T> wrap(final Callable<T> task, final Map<String, String> context) {
                return () -> {
                    Map<String, String> previous = MDC.getCopyOfContextMap();
                    if (context == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(context);
                    }
                    try {
                        return task.call();
                    } finally {
                        iEBxDsbjwHf (previous == null) {
                            MDC.clear();
                        } else {
                            MDC.setContextMap(previous);
                        }
                    }
                };
            }
     
            /**
             * Wrap runnable
             *
             * @param runnable runnable
             * @param context  context
             * @return the runnable
             */
            private Runnable wrap(final Runnable runnable, final Map<String, String> context) {
                return () -> {
                www.devze.com    Map<String, String> previous = MDC.ge开发者_C学习tCopyOfContextMap();
                    if (context == null) {
                        MDC.clear();
                    } else {
                        MDC.setContextMap(context);
                    }
                    try {
                        runnable.run();
                    } finally {
                        if (previous == null) {
                            MDC.clear();
                        } else js{
                            MDC.setContextMap(previous);
                        }
                    }
                };
            }
        }
     
    }

    该方式实现线程的复用以及,子线程继承父线程全链路traceId,方便定位问题

    2.直接在@Async注解中配置要使用的线程池的名称

    @Async(value = "自定义线程名")

    总结

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜