开发者

Spring AOP实现断路器方式

目录
  • 1. 概述
  • 2. 实现方案
  • 3. 代码实现
    • 3.1 自定义注解
    • 3.2 自定义断路器的状态
    • 3.3 切面定义
  • 总结

    环境:Spring5.3.23

    1. 概述

    Spring Cloud体系中,断路器有Hystrix,Resilience4j,Sentinel等组件,它们的核心功能是当某个服务不可用时,断路器会屏蔽相关故障,返回一个用户预设的fallback。

    具体来说,断路器有以下一些作用:

    • 阻止故障的向上传递:对服务的健康状况进行监控和防护。
    • 对故障快速失败并积极回复:回退并优雅降级。
    • 提供三种容错方式来帮助达成目标:资源隔离,熔断和降级。

    总的来说,断路器的核心作用是增强分布式系统的弹性,避免级联故障,以提高系统的整体可用性。

    2. 实现方案

    我们将通过使用AOP和自定义注解,实现断路器功能。根据自己的需要在关键的方法上添加注解,然后在运行时通过AOP拦截这些注解,并执行相应的断路器逻辑。

    断路器的主要作用是防止故障的扩散,并保护系统的稳定性。当某个服务出现故障时,断路器可以快速中断与该服务的连接,并返回一个预设的fallback响应,从而避免故障对整个系统的影响。

    通过自定义注解和AOP的结合,我们可以实现以下功能:

    在需要的接口上添加自定义注解,注解中可以包含与断路器相关的配置信息,如:错误次数,时间窗口等。

    通过AOP拦截这些注解,并在运行时动态地创建断路器。

    当服务调用时,断路器会根据配置的逻辑判断是否需要中断连接或返回fallback响应。

    如果服务正常,断路器将不会进行任何操作;如果服务故障,断路器将根据预设的逻辑进行处理。

    3. 代码实现

    3.1 自定义注解

    AOP只会拦截该注解的方法或类。

    @Target({ElementType.TYPE, ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @Inherited
    public @interface Pac编程客栈kFuse {
    
      /**降级方法*/
      String fallback() default "" ;
    
      /**失败次数*/
      int fails() default 5 ;
    
      /**窗口时间:s*/
      int Windowsize() default 10 ;
    
    }

    3.2 自定义断路器的状态

    断路器的状态有以下几种:

    • Closed(关闭状态):默认情况下,断路器处于关闭状态,允许远程服务调用正常进行。
    • Open(打开状态):当远程服务调用失败次数达到预设的阈值时,断路器会自动打开,中断与该服务的所有调用,并返回fallback响应。
    • Half-Open(半开状态):在一段时间后,断路器会自动从Open状态转换到Half-Open状态。在Half-Open状态下,断路器会尝试少量请求以测试服务是否已恢复。如果测试请求成功,断路器将自动关闭并恢复到Closed状态;否则,将保持Half-Open状态,如果超过指定的错误次数,则再次转变为Open状态。

    状态定义

    public enum EnumState {
    
      CLOSE, HALF_OPEN, OPEN ;
      
    }

    每个断路器都会自己的状态

    public class PackFuseState {
    
      /**当前状态*/
      private EnumState state = EnumState.CLOSE ;
      /**失败次数*/
      private AtomicInteger failCount = new AtomicInteger(0) ;
      /**最大失败次数*/
      private int maxFailCount = 5 ;
      /**窗口大小;默认每10秒重置*/
      private int windowTime = 10 ;
      
      private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayblockingQueue<>(1)) ;
      
      private Object lock = new Object() ;
      
      public PackFuseState(int maxFailCount, int windowTime) {
        this.maxFailCount = maxFailCount ;
        this.windowTime = windowTime ;
        executor.execute(() -> {
          while(true) {
            if (state == EnumState.CLOSE) {
              try {
                TimeUnit.SECONDS.sleep(windowTime) ;
                if (state == EnumState.CLOSE) {
                  failCount.set(0) ;
                }
              } catch (InterruptedException e) {
                e.printStackTrace() ;
              }
            } else {
              synchronized (lock) {
                try {
                  lock.wait() ;
                } catch (InterruptedException e) {
                  e.printStackTrace() ;
                }
              }
            }
          }
        }) ;
      }
      
      public EnumState getState() {
        return state;
      }
      public void setState(EnumState state) {
        this.state = state;
      }
      public AtomicInteger getFailCount() {
        return failCount;
      }
      public void setFailCount(AtomicInteger failCount) {
        this.failCount = failCount;
      }
      public int getwindowTime() {
        return windowTime;
      }
      public void setwindowTime(int windowTime) {
        this.windowTime = windowTime;
      }
      
      public PackFuseState addFailCount() {
        int count = this.failCount.incrementAndGet() ;
        if (count >= maxFailCount) {
          this.setState(EnumState.OPEN) ;
          executor.execute(() -> {
            try {
              TiphpmeUnit.SECONDS.sleep(windowTime) ;
              setState(EnumState.HALF_OPEN) ;
              failCount.set(0) ;
            } catch (InterruptedException e) {
              e.printStackTrace() ;
            }
          }) ;
        }
        return this ;
      }
      
      public PackFuseState closeStapythonte() {
        this.setState(EnumState.CLOSE) ;
        this.failCount.set(0) ;
        return this ;
      }
    
      public Object getLock() {
        return lock;
      }
    
    }

    3.3 切面定义

    该切面拦截所有标有@PackFuse注解的方法

    @ASPect
    @Component
    public class PackFuseAspect {
    
      private static final Map<String, PackFuseState> META_HOLDER_MAP = new ConcurrentHashMap<>() ;
      private static final Map<String, Object> FALLBACK = new ConcurrentHashMap<>() ;
      private static final String DEFAULT_RET_DATA = "服务不可用" ;
      
      @Pointcut("@annotation(fuse)")
      private void fuse(PackFuse fuse) {}
      
      @Around("fusandroide(fuse)")
      public Object packFuse(ProceedingJoinPoint pjp, PackFuse fuse) {
        MethodSignature joinPointObject = (MethodSignature) pjp.getSignature() ;
        Class<?> targetType = joinPointObject.getDeclaringType() ;
        Method method = joinPointObject.getMethod() ;
        String targetKey = getKey(targetType, method);
        
        String fallback = fuse.fallback() ;
        if (!FALLBACK.containsKey(targetKey)) {
          if (StringUtils.hasLength(fallback)) {
            try {
              Method fallbackMethod = targetType.getDeclaredMethod(fallback) ;
              FALLBACK.put(targetKey, fallbackMethod.invoke(pjp.getTarget())) ;
            } catch (Exception e) {
              e.printStackTrace() ;
            }
          } else {
            FALLBACK.put(targetKey, DEFAULT_RET_DATA) ;
          }
        }
        int fails = fuse.fails() ;
        int windowSize = fuse.windowSize() ;
        
        PackFuseState fuseState = null ;
        try {
          fuseState = META_HOLDER_MAP.computeIfAbsent(targetKey, key -> new PackFuseState(fails, windowSize)) ;
          switch (fuseState.getState()) {
            case CLOSE:
              return pjp.proceed() ;
            case HALF_OPEN:
              Random rd = new Random() ;
              int c = rd.nextInt(fails) ;
              if (c >= (fails / 2)) {
                Object ret = pjp.proceed() ;
                fuseState.closeState() ;
                synchronized (fuseState.getLock()) {
                  fuseState.getLock().notifyAll() ; 
                }
                return ret ;
              }
              return FALLBACK.get(targetKey) ;
            case OPEN:
              return FALLBACK.get(targetKey) ;
          }
        } catch (Throwable e) {
          fuseState.addFailCount() ;
        }
        return FALLBACK.get(targetKey) ;
      }
      
      private String getKey(Class<?> targetType, Method method) {
        StringBuilder builder = new StringBuilder();
        builder.append(targetType.getSimpleName());
        builder.append('#').append(method.getName()).append('(');
        if (method.getParameterTypes().length > 0) {
          builder.deleteCharAt(builder.length() - 1);
        }
        return builder.append(')').toString().replaceAll("[^a-zA-Z0-9]", "") ;
      }
      
    }

    以上就实现了一个简单的断路器功能。

    通过使用AOP+自定义注解的方式成功地实现了断路器功能。这种方法给予了我们很大的灵活性和扩展性,可以轻松地对特定的服务进行故障隔离,避免故障扩散,保护整个系统的稳定性。同时,通过自定义注解,我们能够清晰地定义断路器的配置和逻辑,使代码更易于阅读和维护。

    这里只是一个非常简单的小例子给大家一个实现的思路,大家可以根据自己的想法或者编程结合Hystrix的实现来丰富功能。

    总结

    以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。

    0

    上一篇:

    下一篇:没有了

    精彩评论

    暂无评论...
    验证码 换一张
    取 消

    最新开发

    开发排行榜