Spring AOP实现断路器方式
目录
- 1. 概述
- 2. 实现方案
- 3. 代码实现
- 3.1 自定义注解
- 3.2 自定义断路器的状态
- 3.3 切面定义
- 总结
环境:Spring5.3.23
1. 概述
Spring Cloud体系中,断路器有Hystrix,Resilience4j,Sentinel等组件,它们的核心功能是当某个服务不可用时,断路器会屏蔽相关故障,返回一个用户预设的fallback。
具体来说,断路器有以下一些作用:
- 阻止故障的向上传递:对服务的健康状况进行监控和防护。
- 对故障快速失败并积极回复:回退并优雅降级。
- 提供三种容错方式来帮助达成目标:资源隔离,熔断和降级。
总的来说,断路器的核心作用是增强分布式系统的弹性,避免级联故障,以提高系统的整体可用性。
2. 实现方案
我们将通过使用AOP和自定义注解,实现断路器功能。根据自己的需要在关键的方法上添加注解,然后在运行时通过AOP拦截这些注解,并执行相应的断路器逻辑。
断路器的主要作用是防止故障的扩散,并保护系统的稳定性。当某个服务出现故障时,断路器可以快速中断与该服务的连接,并返回一个预设的fallback响应,从而避免故障对整个系统的影响。
通过自定义注解和AOP的结合,我们可以实现以下功能:
在需要的接口上添加自定义注解,注解中可以包含与断路器相关的配置信息,如:错误次数,时间窗口等。
通过AOP拦截这些注解,并在运行时动态地创建断路器。
当服务调用时,断路器会根据配置的逻辑判断是否需要中断连接或返回fallback响应。
如果服务正常,断路器将不会进行任何操作;如果服务故障,断路器将根据预设的逻辑进行处理。
3. 代码实现
3.1 自定义注解
AOP只会拦截该注解的方法或类。
@Target({ElementType.TYPE, ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface Pac编程客栈kFuse { /**降级方法*/ String fallback() default "" ; /**失败次数*/ int fails() default 5 ; /**窗口时间:s*/ int Windowsize() default 10 ; }
3.2 自定义断路器的状态
断路器的状态有以下几种:
- Closed(关闭状态):默认情况下,断路器处于关闭状态,允许远程服务调用正常进行。
- Open(打开状态):当远程服务调用失败次数达到预设的阈值时,断路器会自动打开,中断与该服务的所有调用,并返回fallback响应。
- Half-Open(半开状态):在一段时间后,断路器会自动从Open状态转换到Half-Open状态。在Half-Open状态下,断路器会尝试少量请求以测试服务是否已恢复。如果测试请求成功,断路器将自动关闭并恢复到Closed状态;否则,将保持Half-Open状态,如果超过指定的错误次数,则再次转变为Open状态。
状态定义
public enum EnumState { CLOSE, HALF_OPEN, OPEN ; }
每个断路器都会自己的状态
public class PackFuseState { /**当前状态*/ private EnumState state = EnumState.CLOSE ; /**失败次数*/ private AtomicInteger failCount = new AtomicInteger(0) ; /**最大失败次数*/ private int maxFailCount = 5 ; /**窗口大小;默认每10秒重置*/ private int windowTime = 10 ; private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayblockingQueue<>(1)) ; private Object lock = new Object() ; public PackFuseState(int maxFailCount, int windowTime) { this.maxFailCount = maxFailCount ; this.windowTime = windowTime ; executor.execute(() -> { while(true) { if (state == EnumState.CLOSE) { try { TimeUnit.SECONDS.sleep(windowTime) ; if (state == EnumState.CLOSE) { failCount.set(0) ; } } catch (InterruptedException e) { e.printStackTrace() ; } } else { synchronized (lock) { try { lock.wait() ; } catch (InterruptedException e) { e.printStackTrace() ; } } } } }) ; } public EnumState getState() { return state; } public void setState(EnumState state) { this.state = state; } public AtomicInteger getFailCount() { return failCount; } public void setFailCount(AtomicInteger failCount) { this.failCount = failCount; } public int getwindowTime() { return windowTime; } public void setwindowTime(int windowTime) { this.windowTime = windowTime; } public PackFuseState addFailCount() { int count = this.failCount.incrementAndGet() ; if (count >= maxFailCount) { this.setState(EnumState.OPEN) ; executor.execute(() -> { try { TiphpmeUnit.SECONDS.sleep(windowTime) ; setState(EnumState.HALF_OPEN) ; failCount.set(0) ; } catch (InterruptedException e) { e.printStackTrace() ; } }) ; } return this ; } public PackFuseState closeStapythonte() { this.setState(EnumState.CLOSE) ; this.failCount.set(0) ; return this ; } public Object getLock() { return lock; } }
3.3 切面定义
该切面拦截所有标有@PackFuse注解的方法
@ASPect @Component public class PackFuseAspect { private static final Map<String, PackFuseState> META_HOLDER_MAP = new ConcurrentHashMap<>() ; private static final Map<String, Object> FALLBACK = new ConcurrentHashMap<>() ; private static final String DEFAULT_RET_DATA = "服务不可用" ; @Pointcut("@annotation(fuse)") private void fuse(PackFuse fuse) {} @Around("fusandroide(fuse)") public Object packFuse(ProceedingJoinPoint pjp, PackFuse fuse) { MethodSignature joinPointObject = (MethodSignature) pjp.getSignature() ; Class<?> targetType = joinPointObject.getDeclaringType() ; Method method = joinPointObject.getMethod() ; String targetKey = getKey(targetType, method); String fallback = fuse.fallback() ; if (!FALLBACK.containsKey(targetKey)) { if (StringUtils.hasLength(fallback)) { try { Method fallbackMethod = targetType.getDeclaredMethod(fallback) ; FALLBACK.put(targetKey, fallbackMethod.invoke(pjp.getTarget())) ; } catch (Exception e) { e.printStackTrace() ; } } else { FALLBACK.put(targetKey, DEFAULT_RET_DATA) ; } } int fails = fuse.fails() ; int windowSize = fuse.windowSize() ; PackFuseState fuseState = null ; try { fuseState = META_HOLDER_MAP.computeIfAbsent(targetKey, key -> new PackFuseState(fails, windowSize)) ; switch (fuseState.getState()) { case CLOSE: return pjp.proceed() ; case HALF_OPEN: Random rd = new Random() ; int c = rd.nextInt(fails) ; if (c >= (fails / 2)) { Object ret = pjp.proceed() ; fuseState.closeState() ; synchronized (fuseState.getLock()) { fuseState.getLock().notifyAll() ; } return ret ; } return FALLBACK.get(targetKey) ; case OPEN: return FALLBACK.get(targetKey) ; } } catch (Throwable e) { fuseState.addFailCount() ; } return FALLBACK.get(targetKey) ; } private String getKey(Class<?> targetType, Method method) { StringBuilder builder = new StringBuilder(); builder.append(targetType.getSimpleName()); builder.append('#').append(method.getName()).append('('); if (method.getParameterTypes().length > 0) { builder.deleteCharAt(builder.length() - 1); } return builder.append(')').toString().replaceAll("[^a-zA-Z0-9]", "") ; } }
以上就实现了一个简单的断路器功能。
通过使用AOP+自定义注解的方式成功地实现了断路器功能。这种方法给予了我们很大的灵活性和扩展性,可以轻松地对特定的服务进行故障隔离,避免故障扩散,保护整个系统的稳定性。同时,通过自定义注解,我们能够清晰地定义断路器的配置和逻辑,使代码更易于阅读和维护。
这里只是一个非常简单的小例子给大家一个实现的思路,大家可以根据自己的想法或者编程结合Hystrix的实现来丰富功能。
总结
以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程客栈(www.devze.com)。
精彩评论