开发者

源码分析Nacos如何动态刷新配置

目录
  • 一 Nacos 刷新配置的源码阅读
  • 二 @refreshScope注解
    • 定义
    • 使用背景
    • 与Nacos配合使用demo
  • 总结

    一 Nacos 刷新配置的源码阅读

    在 ClientWorker 中配置了 定义了一个 的内部类 LongPollingRunnable 并实现了Runnable 接口 直接到 cacheData.checkListenerMd5() 这个方法

    public void run() {
                 // 获取定义的Group
                List<CacheData> cacheDatas = new ArrayList<CacheData>();
                List<String> inInitializingCacheList = new ArrayList<String>();
                try {
                    // check failover config
                    for (CacheData cacheData : cacheMap.values()) {
                        if (cacheData.getTaskId() == taskId) {
                            cacheDatas.add(cacheData);
                            try {
                                checkLocalConfig(cacheData);
                                if (cacheData.isUseLocalConfigInfo()) {
                                    cacheData.checkListenerMd5();
                                }
                            } catch (Exception e) {
                                LOGGER.error("get local config info error", e);
                            }
                        }
                    }
                
                    // check server config
                    List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
                    if (!CollectionUtils.isEmpty(changedGroupKeys)) {
                        LOGGER.info("gehttp://www.devze.comt changedGroupKeys:" + changedGroupKeys);
                    }
                
                    for (String groupKey : changedGroupKeys) {
                        String[] key = GroupKey.parseKey(groupKey);
                        String dataId = key[0];
                        String group = key[1];
                        String tenant = null;
                        if (key.length == 3) {
                            tenant = key[2];
                        }
                        try {
                            String[] ct = getServerConfig(dataId, group, tenan编程客栈t, 3000L);
                            CacheData cache = cacheMap.get(GroupKey.getKeyTenant(dataId, group, tenant));
                            cache.setContent(ct[0]);
                            if (null != ct[1]) {
                                cache.setType(ct[1]);
                            }
                            LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                                    agent.getName(), dataId, group, tenant, cache.getMd5(),
                                    ContentUtils.truncateContent(ct[0]), ct[1]);
                        } catch (NacosException ioe) {
                            String message = String
                                    .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s",
                                            agent.getName(), dataId, group, tenant);
                            LOGGER.error(message, ioe);
                        }
                    }
                    for (CacheData cacheData : cacheDatas) {
                        if (!cacheData.isInitializing() || inInitializingCacheList
                                .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                           // 检查当前 配置文件的md5值是否改变
                            cacheData.checkListenerMd5();
                            cacheData.setInitializing(false);
                        }
                    }
                    inInitializingCacheList.clear();
                
                    executorService.execute(this);
                
                } catch (Throwable e) {
                
                    // If the rotation training task is abnormal, the next execution time of the task will be punished
                    LOGGER.error("longPolling error : ", e);
                    executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
                }
            }
        }
    

    检查当前的md5值是否更改

    void checkListenerMd5() {
            for (ManagerListenerWrap wrap : listeners) {
                if (!md5.equals(wrap.lastCallMd5)) {
                  // 如果md5值变了,就发送对应事件通知
                    safeNotifyListener(dataId, group, content, type, md5, wrap);
                }
            }
        }
    

    安全的通知监听器配置改变:

    private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
                                    final String md5, final ManagerListenerWrap listenerWrap) {
        // 从包装类中取出监听器
        final Listener listener = listenerWrap.listener;
    
        // 创建一个通知任务(异步或同步执行)
        Runnable job = new Runnable() {
            @Override
            public void run() {
                // 当前线程的原始类加载器
                ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
                // 获取监听器所属类的类加载器(用于类加载隔离)
                ClassLoader appClassLoader = listener.getClass().getClassLoader();
    
                try {
                    // 如果监听器是共享监听器的子类,设置上下文信息
                    if (listener instanceof AbstractSharedListener) {
                        AbstractSharedListener adapter = (AbstractSharedListener) listener;
                        adapter.fillContext(dataId, group);
                        LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
                    }
    
                    // 设置线程上下文类加载器为应用加载器(避免多应用部署时,SPI等加载错类)
                    Thread.currentThread().setContextClassLoader(appClassLoader);
    
                    // 构造配置响应对象
                    ConfigResponse cr = new ConfigResponse();
                    cr.setDataId(dataId);
                    cr.setGroup(group);
                    cr.setContent(content);
    
                    // 通过过滤链处理配置(比如解密、转换等)
                    configFilterChainManager.doFilter(null, cr);
    
                    // 获取处理后的配置内容
                    String contentTmp = cr.getContent();
    
                    // 调用监听器的 receiveConfigInfo 方法通知变更
                    listener.receiveConfigInfo(contentTmp);
    
                    // 如果是支持配置变更事件的监听器,触发对应事件
                    if (listener instanceof AbstractConfigChangeListener) {
                        // 解析变更内容(对比老配置和新配置)
                        Map data = ConfigChangeHandler.getInstance()
                                .parseChangeData(listenerWrap.lastContent, content, type);
                    
                        // 构造事件对象并通知监听器
                        ConfigChangeEvent event = new ConfigChangeEvent(data);
                        ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
    
                        // 记录这次通知的内容
                        listenerWrap.lastContent = content;
                    }
    
                    // 更新上一次调用的 MD5 值
                    listenerWrap.lastCallMd5 = md5;
    
                    // 打印通知成功日志
                    LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5,
                            listener);
                } catch (NacosException ex) {
                    // 特定 Nacos 异常处理
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}",
                            name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
                } catch (Throwable t) {
                    // 捕获所有其他异常,避免通知失败影响主线程
                    LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId,
                            group, md5, listener, t.getCause());
                } finally {
                    // 恢复原始线程类加载器,避免线程池复用带来问题
                    Thread.currentThread().setContextClassLoader(myClassLoader);
                }
            }
        };
    
        // 记录通知开始时间
        final long startNotify = System.currentTimeMillis();
        try {
            // 如果监听器提供了自定义线程池,则用线程池异步执行
            if (null != listener.getExecutor()) {
                listener.getExecutor().execute(job);
            } else {
                // 否则直接当前线程执行
                job.run();
            }
        } catch (Throwable t) {
            // 执行过程出错日志打印
            LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                    group, md5, listener, t.getCause());
        }
    
        // 记录通知完成时间
        final long finishNotify = System.currentTimeMillis();
        LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
                name, (finishNotify - startNotify), dataId, group, md5, listener);
    }
    

    NacosContextRefresher 中 registerNacosListenersForApplications的方法

    /**
     * 为指定的 dataId + group 注册一个 Nacos 配置监听器
     * @param groupKey 配置分组(group)
     * @param dataKey 配置标识(dataId)
     */
    private void registerNacosListener(final String groupKey, final String dataKey) {
        // 生成一个唯一 key,用于标识监听器(key = group + "++" + dataId)
        String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
    
        // 从 listenerMap 中获取对应 key 的监听器,如果不存在则创建一个 AbstractSharedListener
        Listener listener = listenerMap.computeIfAbsent(key,
            lst -> new AbstractSharedListener() {
                /**
                 * 当配置变更时,会触发该方法
                 */
                @Override
                public void innerReceive(String dataId, String group, String configInfo) {
                    // 刷新次数 +1(用于监控/统计)
                    refreshCountIncrement();
    
                    // android记录刷新历史
                    nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
    
                    // 发布 Spring 的 RefreshEvent,通知上下文环境配置已变更
                    // 注意:这里是全量刷新,
                    applicationContext.publishEvent(
                        new RefreshEvent(this, null, "Refresh Nacos config"));
    
                    // 如果开启了 debug 日志,打印变更信息
                    if (log.isDebugEnabled()) {
                        log.debug(String.format(
                            "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                            group, dataId, configInfo));
                    }
                }
            });
    
        try {
            // 调用 Nacos 客户端 API,注册监听器
            configService.addListener(dataKey, groupKey, listener);
        }
        catch (NacosException e) {
            // 注册失败,记录警告日志
            log.warn(String.format(
                "register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
                groupKey), e);
        }
    }
    

    在SpringClould 中的 RefreshEventListener

    public void onApplicationEvent(ApplicationEvent event) {
    		if (event instanceof ApplicationReadyEvent) {
    			handle((ApplicationReadyEvent) event);
    		}
    		else if (event instanceof RefreshEvent) {
    			handle((RefreshEvent) event);
    		}
    	}
    
    public void handle(RefreshEvent event) {
    	if (this.ready.get()) { // don't handle events before app is ready
    		log.debug("Event received " + event.getEventDesc());
    		Set<String> keys = this.refresh.refresh();
    		log.info("Refresh keys changed: " + keys);
    	}
    }
    

    ContextRefresher 中的 refresh 方法刷新所有作用域为 refresh 的bean

    public synchronized Set<String> refresh() {
    		Set<String> keys = refreshEnvironment();
            // 刷新所有的
    		this.scope.refreshAll();
    		return keys;
    	}
    

    二 @refreshScope注解

    定义

    @RefreshScope 是 Spring Cloud 提供的注解,主要用于 ​支持配置的动态刷新​,特别是在结合像 Nacos、Consul、Spring Cloud Config 等配置中心时使用。@RefreshScope 使得标注的 Bean 在配置变更并发布刷新事件时,能够被重新实例化,从而实现“​配置热更新​”。

    使用背景

    Spring Boot 默认的 Bean 是单例的(@Singleton),一旦初始化完成,其属性就不会再变化。如果你想在运行时通过配置中心动态刷新某个 Bean 中的属性,就必须加上 @RefreshScope

    与Nacos配合使用demo

    1、依赖引入

    确保你引入了以下依赖(以 Spring Boot 2.x / Spring Cloud Alibaba 为例):

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
    </dependency>
    

    2、application.yml配置

    server:
      port: 8080
    
    spring:
      application:
        name: nacos-refresh-demo
      cloud:
        nacos编程客栈:
          config:
            server-addr: 127.0.0.1:8848
            file-extension: yaml
            group: DEFAULT_GROUP
            namespace: public
            refresh-enabled: true
    

    3、编写配置类(使用 @RefreshScope)

    package com.example.nacosdemo.config;
    
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.cloud.context.config.annotation.RefreshScope;
    import org.springframework.stereotype.Component;
    
    @Data
    @Component
    @RefreshScope // 开启动态刷新
    @ConfigurationProperties(prefix = "custom")
    public class CustomConfig {
        private String name;
        private Integer age;
    }
    

    4、测试 Controller

    package com.example.nacosdemo.controller;
    
    import com.example.nacosdemo.config.CustomConfig;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @RequiredArgsConstructor
    public class TestController {
    
        private final CustomConfig customConfig;
    
        @GetMapping("/config")
        public String getConfig() {
            return "name: " + customConfig.getName() + ", age: " + customConfig.getAge();
        }
    }
    

    然后更改你的Nacos中的配置,查看是否被更新呢

    总结

    触发流程

    • NacosConfigService 内部有 ClientWorker 线程定时轮询配置变化;
    • 当检测到配置变更后,会回调配置监听器;
    • NacosContextRefresher 是 Spring Cloud Alibaba 提供的监听器;
    • 它触发 RefreshEvent 事件;
    • Spring Cloud Context 的 RefreshScope 监听 RefreshEvent
    • 清除旧 Bean 实例,下次注入重新构建。

    到此这篇关于源码分析Nacos如何动态刷新配置的文章就介绍到这了,更多相关Nacos动编程客栈态刷新配置内容请搜索编程客栈(www.devze.com)以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程客栈(www.devze.com)!

    0

    上一篇:

    下一篇:

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜