开发者

Spring源码之事件监听机制(实现EventListener接口方式)

目录
  • 一、Spring实现自定义事件的发布订阅
    • 1、事件定义
    • 2、事件监听(泛型)
    • 3、模拟事件发送
    • 4、启动项目,调用 127.0.0.1:8080/publishOrderEvent
  • 二、Spring事件驱动原理分析(Spring版本为5.1.7)
    • 1、ApplicationContext委派ApplicationEventPublisher发送事件
    • 2、ApplicationEventMutulcaster类型的确认和初始化
    • 3、SimpleApplicationEventMulticaster的发送事件方法
    • 4、ResolvableType类型确认
    • 5、获取所有的监听列表,并且看看是怎么做到监听泛型类型
    • 6、根据监听列表,循环调用(同步或异步)
  • 总结

    一、Spring实现自定义事件的发布订阅

    github地址为:https://github.com/kevin-lihongmin/designpattern/tree/master/src/main/Java/com/kevin/designpattern/headfirst/observer/spring

    1、事件定义

    /**
     *  定义事件类型
     *
     * @author lihongmin
     * @date 2019/11/3 20:30
     */
    public class OrderEvent extends ApplicationEvent {
    
        public OrderEvent(Object source) {
            super(source);
        }
    }

    2、事件监听(泛型)

    /**
     *  订单事件监听
     * @author lihongmin
     * @date 2019/11/3 20:33
     */
    @Component
    public class OrderEventListener implements ApplicationListener<OrderEvent> {
    
        @Override
        public void onApplicationEvent(OrderEvent orderEvent) {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("我受到了一个事件:" + orderEvent.getSource());
        }
    }

    3、模拟事件发送

    /**
     *  事件触发模拟
     *
     *  我受到了一个事件:我发布了事件!!编程!
     *  我执行完毕了!!!
     *
     * @author lihongmin
     * @date 2019/11/3 20:35
     */
    @Controller
    public class OrderEventController implements ApplicationContextAware {
    
        private ApplicationContext applicationContext;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    
        @GetMapping("publishOrderEvent")
        public String publishOrderEvent() {
            applicationContext.publishEvent(new OrderEvent("我发布了事件!!!"));
            System.out.println("我执行完毕了!!!");
            return "发送事件了!";
        }
    }

    4、启动项目,调用 127.0.0.1:8080/publishOrderEvent

    我受到了一个事件:我发布了事件!!!

    我执行完毕了!!!

    总结:事件发送非常的简单,一个事件类型,一个监听,一个触发机制。并且该事件为同步机制(后续在Spring Boot中可以方便切换为异步)。

    二、Spring事件驱动原理分析(Spring版本为5.1.7)

    1、ApplicationContext委派ApplicationEventPublisher发送事件

    我们调用的是 ApplicationContext的

    publishEvent(new OrderEvent("我发布了事件!!!")); 

    查看ApplicationContext 结构,发现调用的是父类 ApplicationEventPublisher的接口

    如下:

    public interface ApplicationContext extends EnvironmentCapable, ListableBeanFactory,
     HierarchicalBeanFactory, MessageSource, ApplicationEventPublisher, ResourcePatternResolver {
        @Nullable
        String getId();
    
        String getApplicationName();
    
        String getDisplayName();
    
        long getStartupDate();
    
        @Nullable
        ApplicationContext getParent();
    
        AutowireCapableBeanFactory getAutowireCapableBeanFactory() throws IllegalStateException;
    }
    public interface ApplicationEventPublisher {
        default void publishEvent(ApplicationEvent event) {
            this.publishEvent((Object)event);
        }
    
        void publishEvent(Object var1);
    }

    那么就是其子类 AbstractApplicationContext 实现的发送操作

    public void publishEvent(Object event) {
            this.publishEvent(event, (ResolvableType)null);
        }
    
        protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
            Assert.notNull(event, "Event must not be null");
            Object applicationEvent;
            if (event instanceof ApplicationEvent) {
                applicationEvent = (ApplicationEvent)event;
            } else {
                applicationEvent = new PayloadApplicationEvent(this, event);
                if (eventType == null) {
                    eventType = ((PayloadApplicationEvent)applicationEvent).getResolvableType();
                }
            }
    
            if (this.earlyApplicationEvents != null) {
                this.earlyApplicationEvents.add(applicationEvent);
            } else {
                this.getApplicationEventMulticaster().multicastEvent((ApplicationEvent)applicationEvent, eventType);
            }
    
            if (this.parent != null) {
                if (this.parent instanceof AbstractApplicationContext) {
                    ((AbstractApplicationContext)this.parent).publishEvent(event, eventType);
                } else {
                    this.parent.publishEvent(event);
                }
            }
    
        }

    发现执行到

    getApplicationEventMulticaster().multicastEvent((ApplicationEvent)applicationEvent, eventType); 

    那么其实这里算是一个委派模式了(个人认为),spring容器将发送事件委派给 AbstractApplicationContext的ApplicationEventMulticaster applicationEventMulticaster对象。

    2、ApplicationEventMutulcaster类型的确认和初始化

    不难发现(或者对Spring ApplicationCopythonntext比较熟悉的话)是项目启动时,不同类型的ApplicationContext(如:ClassPathXMLApplicationContext)

    在调用父类 AbstractApplicationContext的refresh方法(之前分析过是一个模板方法)时, initApplicationEventMulticaster()

    如下:

    protected void initApplicationEventMulticaster() {
            ConfigurableListableBeanFactory beanFactory = this.getBeanFactory();
            if (beanFactory.containsLocalBean("applicationEventMulticaster")) {
                this.applicationEventMulticaster = (ApplicationEventMulticaster)beanFactory.getBean("applicationEventMulticaster", ApplicationEventMulticaster.class);
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
                }
            } else {
                this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
                beanFactory.registerSingleton("applicationEventMulticaster", this.applicationEventMulticaster);
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("No 'applicationEventMulticaster' bean, using [" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
                }
            }
    
        }

    逻辑比较简单,在BeanFactory中获取名称为 applicationEventMulticaster的Bean,当然如果我们没有自定义并且注册为该名称的Bean,肯定是获取不到的。

    那么会new一个 SimpleApplicationEventMulticaster类型的bean注册到容器中。

    也就是说上面的getApplicationEventMulticaster()获取到的就是SimpleApplicationEventMulticaster

    但是还需要注意使用的是有参数构造进行初始化,如下:

    public SimpleApplicationEventMulticaster(BeanFactory beanFactory) {
        this.setBeanFactory(beanFactory);
    }

    在父类中实现:

    public void setBeanFactory(BeanFactory beanFactory) {
    	this.beanFactory = beanFactory;
    	if (beanFactory instanceof ConfigurableBeanFactory) {
    		ConfigurableBeanFactory cbf = (ConfigurableBeanFactory)beanFactory;
    		if (this.beanClassLoader == null) {
    			this.beanClassLoader = cbf.getBeanClassLoader();
    		}
    
    		this.retrievalMutex = cbf.getSingletonMutex();
    	}
    
    }

    获取bean工厂中所以的所以单例对象放入属性retrievalMutex 中,将类加载器也进行赋值,后续会用到。

    3、SimpleApplicationEventMulticaster的发送事件方法

    public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
            ResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event);
            Iterator var4 = this.getApplicationListeners(event, type).iterator();
    
            while(var4.hasNext()) {
                ApplicationListener<?> listener = (ApplicationListener)var4.next();
                Executor executor = this.getTaskExecutor();
                if (executor != null) {
                    executor.execute(() -> {
                        this.invokeListener(listener, event);
                    });
                } else {
                    this.invokeListener(listener, event);
                }
            }
    
        }

    分析一下这个方法:

    • 1)、获取或确认 ResolvableType 类型
    • 2)、根据事件对象和ResolvableType 类型,获取订阅者列表
    • 3)、发现如果 SimpleApplicationEventMulticaster对象的线程池属性 Executor taskExecutor不为null则异步执行监听方法。但是我们看到的是自己new了一个对象,所以如果想 事件监听使用线程池异步执行的话(自己想到应该可以这样玩,自己比较喜欢自定义线程参数,心里有数,当前一般还会设置线程池前缀名称):
    @Component
    public class DesignpatternApplication implements BeanFactoryAware {
    
    	private BeanFactory beanFactory;
    	
    	@Override
    	public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
    		this.beanFactory = beanFactory;
    	}
    	
    	@Bean("APPLICATION_EVENT_MULTICASTER_BEAN_NAME")
    	public SimpleApplicationEventMulticaster init() {
    		ThreadPoolExecutor MulticasterExecutor = new ThreadPoolExecutor(5, 5, 60, TimeUnit.SECONDS,
    				new LinkedblockingDeque<>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
    		SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();
    		multicaster.setTaskExecutor(MulticasterExecutor);
    		multicaster.setBeanFactory(beanFactory);
    		return multicaster;
    	}
    }
    • 4)、最后肯定是invokeListener(listener, event);

    4、ResolvableType类型确认

    首先我们传入的eventType是null,所以先根据我们传入的对象调用resolveDefaultEventType方法

    如下:

    private ResolvableType resolveDefaultEventType(ApplicationEvent event) {
        return ResolvableType.forInstance(event);
    }

    再调用,肯定OrderEvent肯定没有实现ResolvableTypeProvider接口:

    public static ResolvableType forInstance(Object instance) {
    	Assert.notNull(instance, "Instance must not be null");
    	if (instance instanceof ResolvableTypeProvider) {
    		ResolvableType type = ((ResolvableTypeProvider) instance).getResolvableType();
    		if (type != null) {
    			return type;
    		}
    	}
    	return ResolvableType.forClass(instance.getClass());
    }

    再调用:

    public static ResolvableType forClass(@Nullable Class<?> clazz) {
    	retandroidurn new ResolvableType(clazz);
    }

    所以我们或者到了一个新创建的 ResolvableType 对象,对象的clazz字段为我们的 OrderEvent。

    为什么追这么深,是因为下面就是根据类型来获取监听器的。

    5、获取所有的监听列表,并且看看是怎么做到监听泛型类型

    protected Collection<ApplicationListener<?>> getApplicationListeners(ApplicationEvent event, ResolvableType eventType) {
            Object source = event.getSource();
            Class<?> sourceType = source != null ? source.getClass() : null;
            AbstractApplicationEventMulticaster.ListenerCacheKey cacheKey = new AbstractApplicationEventMulticaster.ListenerCacheKey(eventType, sourceType);
            AbstractApplicationEventMulticaster.ListenerRetriever retriever = (AbstractApplicationEventMulticaster.ListenerRetriever)this.retrieverCache.get(cacheKey);
            if (retriever != null) {
                return retriever.getApplicationListeners();
            } else if (this.beanClassLoader == null || ClassUtils.isCacheSafe(event.getClass(), this.beanClassLoader) && (sourceType == null || ClassUtils.isCacheSafe(sourceType, this.beanClassLoader))) {
                Object var7 = this.retrievalMutex;
                synchronized(this.retrievalMutex) {
                    retriever = (AbstractApplicationEventMulticaster.ListenerRetriever)this.retrieverCache.get(cacheKey);
                    if (retriever != null) {
                        return retriever.getApplicationListeners();
                    } else {
                        retriever = new AbstractApplicationEventMulticaster.ListenerRetriever(true);
                        Collection<ApplicationListener<?>> listeners = this.retrieveApplicationListeners(eventType, sourceType, retriever);
                        this.retrieverCache.put(cacheKey, retriever);
                        return listeners;
                    }
                }
            } else {
                return this.retrieveApplicationListeners(eventType, sourceType, (AbstractApplicationEventMulticaster.ListenerRetriever)null);
            }
        }

    在自己的 ConcurrentHashMap类型的retrieverCache缓存中获取,key是根据 OrderEvent类型和我发送的数据源(当前为String类型)如下:

    • Map的key:
    private static final class ListenerCacheKey implements 
    Comparable<AbstractApplicationEventMultiandroidcaster.ListenerCacheKey> {
        private final ResolvableType eventType;
        @Nullable
        private final Class<?> sourceType;
    
        // .....
    }
    • Map的value类型:
    private class ListenerRetriever {
        public final Set<ApplicationListener<?>> applicationListeners = 
            new LinkedHashSet();
        public final Set<String> applicationListenerBeans = new LinkedHashSet();
        private final boolean preFiltered;
    }

    很清楚的结构,两个LinkedHashSet, 就是为了保证两个Set个数相同,并且顺序一一对应。用于存放当前的监听对象和监听的类型。

    当前的缓存是在AbstractApplicationContext的refresh的registerBeanPostProcessors(注册所有的BeanPostProcess),的最后一步,注册了ApplicationListenerDetector类型。

    并且在refresh的最后会将所有懒加载的Bean都初始化,则会将所有的实现了该接口的Bean放入容器中。

    则重点是 retrieveApplicationListeners方法,比较长:

    private Collection<ApplicationListener<?>> retrieveApplicationListeners(ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable AbstractApplicationEventMulticaster.ListenerRetriever retriever) {
    	List<ApplicationListener<?>> allListeners = new ArrayList();
    	Object var7 = this.retrievalMutex;
    	LinkedHashSet listeners;
    	LinkedHashSet listenerBeans;
    	synchronized(this.retrievalMutex) {
    		listeners = new LinkedHashSet(thphpis.defaultRetriever.applicationListeners);
    		listenerBeans = new LinkedHashSet(this.defaultRetriever.applicationListenerBeans);
    	}
    
    	Iterator var14 = listeners.iterator();
    
    	while(var14.hasNext()) {
    		ApplicationListener<?> listener = (ApplicationListener)var14.next();
    		if (this.supportsEvent(listener, eventType, sourceType)) {
    			if (retriever != null) {
    				retriever.applicationListeners.add(listener);
    			}
    
    			allListeners.add(listener);
    		}
    	}
    
    	if (!listenerBeans.isEmpty()) {
    		BeanFactory beanFactory = this.getBeanFactory();
    		Iterator var16 = listenerBeans.iterator();
    
    		while(var16.hasNext()) {
    			String listenerBeanName = (String)var16.next();
    
    			try {
    				Class<?> listenerType = beanFactory.getType(listenerBeanName);
    				if (listenerType == null || this.supportsEvent(listenerType, eventType)) {
    					ApplicationListener<?> listener = (ApplicationListener)beanFactory.getBean(listenerBeanName, ApplicationListener.class);
    					if (!allListeners.contains(listener) && this.supportsEvent(listener, eventType, sourceType)) {
    						if (retriever != null) {
    							if (beanFactory.isSingleton(listenerBeanName)) {
    								retriever.applicationListeners.add(listener);
    							} else {
    								retriever.applicationListenerBeans.add(listenerBeanName);
    							}
    						}
    
    						allListeners.add(listener);
    					}
    				}
    			} catch (NoSuchBeanDefinitionException var13) {
    				;
    			}
    		}
    	}
    
    	AnnotationAwareOrderComparator.sort(allListeners);
    	if (retriever != null && retriever.applicationListenerBeans.isEmpty()) {
    		retriever.applicationListeners.clear();
    		retriever.applicationListeners.addAll(allListeners);
    	}
    
    	return allListeners;
    }

    分析该方法,上面锁住的是 retrievalMutex对象,现在又是同步锁该对象。

    为了保证LinkedHashSet中的值不会乱(monitor enter两次exit两次),去缓存中的每个查看每个监听器是否是对象的类型,检查了监听器的泛型对象和事件源类型。

    6、根据监听列表,循环调用(同步或异步)

    我们实现的 onApplicationEvent(OrderEvent orderEvent)方法

    protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
    	ErrorHandler errorHandler = this.getErrorHandler();
    	if (errorHandler != null) {
    		try {
    			this.doInvokeListener(listener, event);
    		} catch (Throwable var5) {
    			errorHandler.handleError(var5);
    		}
    	} else {
    		this.doInvokeListener(listener, event);
    	}
    
    }

    所以 ErrorHandler想在这里处理,则需要在该对象中创建该异常处理器(可以有很多中方式处理,利用bean的生命周期,这是一个很好的扩展点,后续可以去实现),继续 doInvokeListener方法

    private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
    	try {
    		listener.onApplicationEvent(event);
    	} catch (ClassCastException var6) {
    		String msg = var6.getMessage();
    		if (msg != null && !this.matchesClassCastMessage(msg, event.getClass())) {
    			throw var6;
    		}
    
    		Log logger = LogFactory.getLog(this.getClass());
    		if (logger.isTraceEnabled()) {
    			logger.trace("Non-matching event type for listener: " + listener, var6);
    		}
    	}
    
    }

    最后看见 listener.onApplicationEvent(event);

    it is over!!!

    总结

    1、ApplicationContext发送事件是委托给了一个 Spring容器在refresh时初始化的SimpleApplicationEventMulticaster bean(由于没有初始化内部线程池对象,所以事件是同步发送的)。

    2、发送前先获取事件的ResolvableType类型(当前为OrderEvent clazz)和事件源类型(当前为String)

    3、获取监听者列表。 先去自己Bean内部先查询缓存,否则从BeanFactory中获取所有单利bean进行匹配(再放入缓存ConturrentHashMap)。

    4、监听者列表循环(同步或异步)地调用我们自己写的监听方法OnApplicationEvent。

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜