Spring Retry框架入门

软件架构从当初的单机,演变到后来的集群,再到后来的分布式应用。原本看似可以信任的服务调用,加上了网络因素就变得不再可靠。再考虑到一些调用链路的特殊性,又要保证性能,又要尽可能增加成功率,所以调用方必须肩负起重试的责任。

自己写,怎样实现?

重试并不复杂,首先来分析下重试的调用场景,可以想到业务当中不止一处会需要重试能力,并且业务其实更关乎自己的代码块被重试就可以了,而不在乎如何实现的重试。

所以变化的是一段可以重复执行的代码块,以及重试次数等。

① 代码块可以用Java8支持的函数式编程解决

② 次数可以用入参/配置实现

首先定义一个执行的模版,结合上面我们的分析,这个模版需要一个待实行的 **方法块** ,以及 **配置。** (次数等区分场景,用枚举定义,方便全局管控):
@AllArgsConstructor
@Getter
public enum RetrySceneEnums {

    QUERY_USER_INFO("查询用户信息", 2),
    ;

    private String desc;

    private int retryTimes;
}
abstract class MyRetryTemplate<Req, Resp> {
    /** 配置 */
    private IntegrationRetryEnums retryEnum;
    /** 方法块 */
    private Function<Req, Resp> fuction;

    MyRetryTemplate(IntegrationRetryEnums retryEnum, Function<Req, Resp> fuction) {
        this.retryEnum = retryEnum;
        this.fuction = fuction;
    }

    /**
     * 构建函数
     */
    private Supplier<Function<Req, Resp>> retry = () -> (param) -> {
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        return execute(param, 1, stopWatch);
    };

    private Resp execute(Req param, int index, StopWatch stopWatch) {
        if (index > retryEnum.getRetryTimes()) {
            stopWatch.split();
            // 最大的重试次数后仍失败,可以打印摘要日志
            return null;
        }
        try {
            // 执行方法块
            Resp result = fuction.apply(param);
            stopWatch.split();
            // 认为成功
            return result;
        } catch (Throwable e) {
            // 递归重试
            return execute(param, ++index, stopWatch);
        }
    }

    Function<Req, Resp> getRetryService() {
        return retry.get();
    }
}
逻辑很简单,利用递归思想(当然也可以用循环),默认调用方法抛出异常才会重试,成功则返回,否则递归直到最大次数,使用 StopWatch 统计总耗时,业务可以打印摘要日志并配置监控等,代码简短,但基本实现了功能。

抛出异常才会重试 稍微解释下,为什么不支持对业务成功/失败进行重试。首先重试框架感知不到具体的业务响应结构,进而也无法判断业务的成功与失败,当然重试框架也可以自定义一个响应体并强制使用,但这便于业务代码形成耦合。再退一步讲,也没有必要对业务失败进行重试,因为框架毕竟是框架,只需要做通用的事情,业务的失败到底需不需要重试,需要看具体的场景。

模板完工了,看下具体怎样改造一个已有的方法,使其支持重试功能,假设当前有一个查询用户信息的方法  *UserService#queryById(String id)*  。需要在 Spring 启动后将其包装以支持重试,通过事件机制监听 ContextRefreshedEvent 事件。
@Component
public class RetryServiceFactory implements ApplicationListener<ContextRefreshedEvent> {

    private static AtomicBoolean INIT = new AtomicBoolean(false);

    @Autowired
    private UserService userService;

    private Function<String, WorkOrderDTO> uerQueryFunction;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if (INIT.compareAndSet(false, true)) {
            this.uerQueryFunction = new MyRetryTemplate<String, WorkOrderDTO>(QUERY_USER_INFO, userService::queryById) {
            }.getRetryService();
        }
    }

    public Function<String, WorkOrderDTO> getUserQueryService() {
        return uerQueryFunction;
    }
}
需要使用此能力的地方,这么调用即可:
@Autowired
private RetryServiceFactory retryServiceFactory;

public void businessLogic(String userId){

    // 调用工厂获取自带重试功能的服务,并执行
    UserInfo userInfo = retryServiceFactory.getUserQueryService().apply(userId);
}
到此,自己手写的重试工具已经完成,如果执行时遇到网络抖动,超时等,都会进行最大2次的重试(次数由 RetrySceneEnums 设置)。



这时候再回头审视下代码,可以发现诸多局限性:
  • 比如 Function 的使用,就限定了入参只能是一个,且必须有返回(即不支持 void);
  • 每次接入一个服务,就要在 RetryServiceFactory 扩充;
  • 考虑网络抖动可能有区间性,盲目连续请求可能全部失败,如要实现等待一段实现再重试,怎么办?
  • 假使底层服务真的挂了,但上层业务系统依旧重试,重试每次都以超时告终(占用线程资源),流量上来以后,必然会牵连业务系统也不可用。自然想到熔断,那重试如何加入断路器(Circuit Breaker)模式?

遇到共性问题,首先要想到有没有现成的框架可以使用,而不是造轮子。目前我已知的重试框架,有 spring-retry 以及 guava-retrying ,接下来将对 spring-retry 的使用和原理进行讲解。

Spring Retry 使用

同spring事务框架一样,retry框架同样支持 编程式 和 声明式 两种。仅需要一个maven依赖。如果使用声明式,需要额外引入 aspectj 。
<dependency>
    <groupId>org.springframework.retry</groupId>
    <artifactId>spring-retry</artifactId>
    <version>1.3.0</version>
</dependency>

<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
    <version>1.9.2</version>
    <scope>runtime</scope>
</dependency>

编程式

先来介绍下编程式使用到的几个主要组件:
  • 重试模板类:org.springframework.retry.support.RetryTemplate

  • 重试策略类:org.springframework.retry.RetryPolicy

  • 重试回退策略(两次重试间等待策略):org.springframework.retry.backoff.BackOffPolicy

  • 重试上下文:org.springframework.retry.RetryContext

  • 监听器:org.springframework.retry.RetryListener

    接下来来看下如何使用,然后穿插讲下不同策略的区别。

    首先是开启重试,并声明一个模板bean。因为只是模板,所以可以借助 Spring IOC 声明为单例。

@Configuration
@EnableRetry(proxyTargetClass = true)
public class RetryConfig {

    @Bean
    public RetryTemplate simpleRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        // 最大重试次数策略
        SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
        retryTemplate.setRetryPolicy(simpleRetryPolicy);
        FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
        // 每隔1s后再重试
        fixedBackOffPolicy.setBackOffPeriod(1000);
        retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
        return retryTemplate;
    }

}
例子代码使用了 SimpleRetryPolicy 和 FixedBackOffPolicy,当然还有其他重试策略和回退策略。枚举如下:

重试策略

策略类 效果 关键参数
MaxAttemptsRetryPolicy 设置最大的重试次数,超过之后执行recover maxAttempts:最大重试次数
BinaryExceptionClassifierRetryPolicy 可以指定哪些异常需要重试,哪些异常不需要重试 exceptionClassifier:BinaryExceptionClassifier,异常识别类,其实就是存在一个Map映射,Map<Class<? extends Throwable>, Boolean>,value为true的话就说明需要重试。
SimpleRetryPolicy 上面两者的结合,支持次数和自定义异常重试 maxAttempts 和 exceptionClassifier
TimeoutRetryPolicy 在指定的一段时间内重试 timeout:单位ms
ExceptionClassifierRetryPolicy 支持异常和重试策略的映射,比如异常A对应重试策略A,异常B对应重试策略B exceptionClassifier:本质就是异常和重试策略的映射
CompositeRetryPolicy 策略类的组合类,支持两种模式,乐观模式:只要一个重试策略满足即执行,悲观模式:另一种是所有策略模式满足再执行 policies:策略数组
optimistic:true-乐观;false-悲观
CircuitBreakerRetryPolicy 熔断器模式 delegate:策略代理类openTimeout:[0,openTimeout]会一直执行代理类策略,(openTimeout, resetTimeout] 会半打开,如果代理类判断不可以重试,就会熔断,执行recover逻辑,如果代理类判断还可以重试且重试成功,开关会闭合。resetTimeout:超过指定时间,开关重置闭合
ExpressionRetryPolicy 符合表达式就重试 expression:表达式,见 org.springframework.expression.Expression实现
AlwaysRetryPolicy 一直重试
NeverRetryPolicy 从不重试

回退策略

策略类 效果 关键参数
FixedBackOffPolicy 间隔固定时间重试 sleeper:支持线程sleep和Object#wait,区别在于是否释放锁
backOffPeriod:等待间隔
NoBackOffPolicy 无等待直接重试
UniformRandomBackOffPolicy 在一个设置的时间区间内。随机等待后重试 minBackOffPeriod:区间下限 maxBackOffPeriod:区间上限 sleeper:同上
ExponentialBackOffPolicy 在一个设置的时间区间内,等待时长为上一次时长的递增 initialInterval:默认起始等待时间 maxInterval:最大等待时间 multiplier:递增倍数 sleeper:同上
ExponentialRandomBackOffPolicy 在 ExponentialBackOffPolicy 基础上,乘数随机

至此,完成了重试模板的定义。接下来是需要在具体的业务场景下调用模板方法即可。同样的,为了保证复用性,仍然借助函数式编程定义:

@Service
public class RetryTemplateService {

    @Autowired
    private RetryTemplate simpleRetryTemplate;

    public <R, T> R retry(Function<T, R> method, T param) throws Throwable {
        return simpleRetryTemplate.execute(new RetryCallback<R, Throwable>() {
            @Override
            public R doWithRetry(RetryContext context) throws Throwable {
                return method.apply(param);
            }
        }, new RecoveryCallback<R>() {
            @Override
            public R recover(RetryContext context) throws Exception {
                return null;
            }
        });
    }
}
@Service
public class UserService {

    @Autowired
    private RetryTemplateService retryTemplateService;

    public User queryById(String id) {
        // TODO 业务逻辑
    }

    public User queryByIdWithRetry(String id) throws Throwable {
        return retryTemplateService.retry(this::queryById, id);
    }
}
通过调用  *org.springframework.retry.support.RetryTemplate#execute* ,指定需要支持重试的业务代码回调  *org.springframework.retry.RetryCallback* ,以及全部重试失败的兜底回调  *org.springframework.retry.RecoveryCallback* 。

RetryTemplate 支持四种重载的 execute 方法,这里不全部展开分析,大体十分相似,变化的是策略(见上面表格),以及 RetryState(有无状态,下面分析)。

声明式

通过上面的编码式,还需要针对不同场景编写一到多套模板方法,那有没有什么可以简化这一步呢,毕竟业务没必要关注这些具体的模板代码。就像事务一样,只需要方法上加  *@Transactional*  就可以了。当然,spring-retry 也支持,直接上代码:
@Service
public class UserService {

    @Retryable(include = RetryException.class, exclude = IllegalArgumentException.class, 
      listeners = {"defaultRetryListener"}, backoff = @Backoff(delay = 1000, maxDelay = 2000))
    public User queryById(String id) {
        // TODO 业务逻辑
    }

    @Recover
    public User recover(RetryException e, String id) {
        User user = new User();
        user.setName("兜底");
        return user;
    }
}
@Service
public class DefaultRetryListener implements RetryListener {

    @Override
    public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
        System.out.println("==前置回调==");
        return true;
    }

    @Override
    public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        System.out.println("==后置回调==");
    }

    @Override
    public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
        System.out.println("==执行报错==");
    }
}
这样,直接调用  *UserService#queryById*  就支持重试了,比编程式节省了不少工作量。

我这里一股脑设置了很多注解参数,其实都对应上面编程式的策略,毕竟编程式有的,我声明式也得要。使用时按需即可,接下来对照着  *@Retryable*  中定义的所有属性,解释下含义:
  • recover:指定兜底/补偿的方法名。如果不指定,默认对照 @Recover 标识的,第一入参为重试异常,其余入参和出参一致的方法;
  • interceptor:指定方法切面 bean, org.aopalliance.intercept.MethodInterceptor 实现类
  • value / include:两者用途一致,指出哪些类型需要重试;
  • exclude:和 include 相反,指出哪些异常不需要重试;
  • label:可以指定唯一标签,用于统计;
  • stateful:默认false。重试是否是有状态的;
  • maxAttempts:最大的重试次数;
  • backoff:指定 @Backoff ,回退策略;
  • listeners:指定 org.springframework.retry.RetryListener 实现 bean。
对照着上面编程式,可以看出回退策略用注解  *@Backoff*  支持了。这里还补充了上面编程式没有用到的  *RetryListener* ,它定义了3个方法:

<T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback);

<T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);

<T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable);

open 和 close 分别在重试整体的前置和后置被回调一次,onError 则是重试整体过程中,每次异常都会被回调。(具体细节见下面  **源码分析** )

@Backoff 的属性如下,基本都对应上面编程式的几种策略,仅简单解释下

  • value / delay:两者都标识延迟时间,为 0则对应 NoBackOffPolicy 策略。
  • maxDelay:最大延迟时间
  • multiplier:递增乘数
  • random:递增乘数是否随机
重试模式中的断路器,由于其场景的特殊性以及属性的复杂性,则被单独定义成了一个注解  *@CircuitBreaker* ,从注释的定义可以看出,它是在  *@Retryable*  基础之上的扩展。
@Target({ ElementType.METHOD, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Retryable(stateful = true)
public @interface CircuitBreaker {
   ....
}
其中属性的定义,也是在  *@Retryable*  的属性基础上,额外扩充了断路器特有的几个属性,对应上面重试策略表格中—— *CircuitBreakerRetryPolicy*  的关键参数:
  • resetTimeout:重置闭合超时时间
  • openTImeout:半打开状态超时时间

源码解析

那具体 spring-retry 的原理是怎样的呢?是递归还是循环,注解又是怎样实现的?带着这些问题开始源码分析,顺带着可以看下上面没有讲到的 org.springframework.retry.RetryContext 组件在业务中如何使用。 

首先从模板方法源码开始:
public class RetryTemplate implements RetryOperations {

	@Override
	public final <T, E extends Throwable> T execute(RetryCallback<T, E> retryCallback,
			RecoveryCallback<T> recoveryCallback) throws E {
		return doExecute(retryCallback, recoveryCallback, null);
	}

    protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
                                                   RecoveryCallback<T> recoveryCallback, RetryState state) throws E, ExhaustedRetryException {

        // 通过 set方法设置的重试策略,默认 SimpleRetryPolicy
        RetryPolicy retryPolicy = this.retryPolicy;
        // 通过 set方法设置的回退策略,默认 NoBackOffPolicy
        BackOffPolicy backOffPolicy = this.backOffPolicy;

        // 无状态的:重试策略自定义 org.springframework.retry.RetryPolicy.open
        // 有状态的:根据策略,每次新建/从缓存获取
        RetryContext context = open(retryPolicy, state);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("RetryContext retrieved: " + context);
        }
        // 绑定到 ThreadLocal 以便线程内全局获取
        RetrySynchronizationManager.register(context);

        Throwable lastException = null;

        boolean exhausted = false;
        try {
            // 回调所有的 org.springframework.retry.RetryListener.open
            boolean running = doOpenInterceptors(retryCallback, context);
            // 任意一个监听器的open返回 false则抛出异常
            if (!running) {
                throw new TerminatedRetryException("Retry terminated abnormally by interceptor before first attempt");
            }

            // Get or Start the backoff context...
            BackOffContext backOffContext = null;
            Object resource = context.getAttribute("backOffContext");

            if (resource instanceof BackOffContext) {
                backOffContext = (BackOffContext) resource;
            }

            if (backOffContext == null) {
                // 回调重试策略的 start 方法
                backOffContext = backOffPolicy.start(context);
                if (backOffContext != null) {
                    context.setAttribute("backOffContext", backOffContext);
                }
            }

            // 回调 org.springframework.retry.RetryPolicy.canRetry 判断是否可以重试
            // 同时可以调用 org.springframework.retry.RetryContext.setExhaustedOnly进行干预,不再进行重试
            while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {

                try {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Retry: count=" + context.getRetryCount());
                    }
                    lastException = null;
                    // 回调业务代码块
                    return retryCallback.doWithRetry(context);
                } catch (Throwable e) {
                    lastException = e;
                    try {
                        // 回调 org.springframework.retry.RetryPolicy.registerThrowable
                        registerThrowable(retryPolicy, state, context, e);
                    } catch (Exception ex) {
                        throw new TerminatedRetryException("Could not register throwable", ex);
                    } finally {
                        // 回调 org.springframework.retry.RetryListener.onError
                        doOnErrorInterceptors(retryCallback, context, e);
                    }
                    // 同上面的判断一样,允许重试的话,会回调回退策略
                    if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                        try {
                            // 这一步会根据策略进行等待/立即执行
                            backOffPolicy.backOff(backOffContext);
                        } catch (BackOffInterruptedException ex) {
                            lastException = e;
                            if (this.logger.isDebugEnabled()) {
                                this.logger.debug("Abort retry because interrupted: count=" + context.getRetryCount());
                            }
                            throw ex;
                        }
                    }

                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Checking for rethrow: count=" + context.getRetryCount());
                    }
                    // 如果指定了 org.springframework.retry.RetryState,会判断是否针对该异常进行抛出,即进行重试阻断
                    if (shouldRethrow(retryPolicy, context, state)) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Rethrow in retry for policy: count=" + context.getRetryCount());
                        }
                        throw RetryTemplate.<E>wrapIfNecessary(e);
                    }

                }

                if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                    break;
                }
            }

            if (state == null && this.logger.isDebugEnabled()) {
                this.logger.debug("Retry failed last attempt: count=" + context.getRetryCount());
            }

            exhausted = true;
            // 回调兜底/补偿 org.springframework.retry.RecoveryCallback.recover
            return handleRetryExhausted(recoveryCallback, context, state);

        } catch (Throwable e) {
            throw org.springframework.retry.support.RetryTemplate.<E>wrapIfNecessary(e);
        } finally {
            // 回调 org.springframework.retry.RetryPolicy.close
            close(retryPolicy, context, state, lastException == null || exhausted);
            // 回调 org.springframework.retry.RetryListener.close
            doCloseInterceptors(retryCallback, context, lastException);
            // 清除 ThreadLoacal中存储的 org.springframework.retry.RetryContext
            RetrySynchronizationManager.clear();
        }

    }
}
整体模板方法不是很复杂,上面注释也标明具体回调哪些方法。大体逻辑就是:

初始化上下文并绑定 -> 回调监听器open判断是否可以重试 -> 回调重试策略start方法 -> 通过重试策略判断是否可以重试 -> 可以的话执行业务代码块 -> 下面两个分支 成功/异常

成功:回调重试策略的close -> 回调监听器的close -> 清除上下文 链路 ①

异常:回调重试策略的registerThrowable -> 回调监听器的onError -> 回调回退策略的backoff -> 如果设置了 Retrystate,判断是否需要抛异常阻断重试 -> 兜底/补偿逻辑 -> 链路 ①

大体流程了解后,先来看下上下文接口的定义:
public interface RetryContext extends AttributeAccessor {

    String NAME = "context.name";  // 上下文的自定义名称

    String STATE_KEY = "context.state";  // RetryState定义的key

    String CLOSED = "context.closed"; // 标识重试是否close

    String RECOVERED = "context.recovered";  // 标识是否执行了兜底/补偿

    String EXHAUSTED = "context.exhausted";  // 标识重试最大次数仍失败

	void setExhaustedOnly();

	boolean isExhaustedOnly();

	RetryContext getParent();

	int getRetryCount();

	Throwable getLastThrowable();
}
public interface AttributeAccessor {
    void setAttribute(String name, @Nullable Object value);

    @Nullable
    Object getAttribute(String name);

    @Nullable
    Object removeAttribute(String name);

    boolean hasAttribute(String name);

    String[] attributeNames();
}
加上继承类  *AttributeAccessor*  支持的key-value存储,作为上下文,贯穿重试框架的一次调用。框架定义的key常量接口内定义了部分,还有一些分散在实现类以及策略类中。如果业务所需,也可以放置数据到上下文,在一次重试策略调用周期内共享。

那就来看下上下文的初始化逻辑:
public class RetryTemplate implements RetryOperations {

    protected RetryContext open(RetryPolicy retryPolicy, RetryState state) {
        // 如果是无状态的,每次都调用 org.springframework.retry.RetryPolicy.open创建
        if (state == null) {
            return doOpenInternal(retryPolicy);
        }

        Object key = state.getKey();
        // 如果有状态,但设置了强制刷新,同样 org.springframework.retry.RetryPolicy.open创建,并写入 state.key
        if (state.isForceRefresh()) {
            return doOpenInternal(retryPolicy, state);
        }
        // 尝试从缓存获取,不存在走新建
        if (!this.retryContextCache.containsKey(key)) {
            // The cache is only used if there is a failure.
            return doOpenInternal(retryPolicy, state);
        }
        // 缓存获取
        RetryContext context = this.retryContextCache.get(key);
        if (context == null) {
            // 异常场景
            if (this.retryContextCache.containsKey(key)) {
                throw new RetryException("Inconsistent state for failed item: no history found. "
                        + "Consider whether equals() or hashCode() for the item might be inconsistent, "
                        + "or if you need to supply a better ItemKeyGenerator");
            }
            // 因为整体没有锁,所以有这一步作为补偿(没仔细琢磨,会不会有并发逻辑)
            return doOpenInternal(retryPolicy, state);
        }

        // 因为是缓存,所以同一个 state.key 会共享这个上下文,清除会影响其他正在校验这几个key的地方
        context.removeAttribute(RetryContext.CLOSED);
        context.removeAttribute(RetryContext.EXHAUSTED);
        context.removeAttribute(RetryContext.RECOVERED);
        return context;

    }
}
从这里可以看出,如果个别场景想要在多次调用间共享  *RetryContext* ,就需要定义 state.key,且设置 forceRefresh=false。每次重试独占上下文的话,要么就使用无状态的重试,要么就设置 forceRefresh=true。

到此为止,基本线索都集中到  *RetryPolicy*  的实现上了,基本都是回调  *RetryPolicy*  定义的方法。话不多说,来看两个重试策略的实现。
public class SimpleRetryPolicy implements RetryPolicy {

    private volatile int maxAttempts;

    private BinaryExceptionClassifier retryableClassifier = new BinaryExceptionClassifier(false);

    public SimpleRetryPolicy(int maxAttempts, BinaryExceptionClassifier classifier) {
        super();
        this.maxAttempts = maxAttempts;
        this.retryableClassifier = classifier;
    }

    @Override
    public boolean canRetry(RetryContext context) {
        // 获取最新一次重试的异常
        Throwable t = context.getLastThrowable();
        // 允许重试的异常 并且 次数<最大重试次数
        return (t == null || retryForException(t)) && context.getRetryCount() < this.maxAttempts;
    }

    @Override
    public void close(RetryContext status) {
    }

    @Override
    public void registerThrowable(RetryContext context, Throwable throwable) {
        SimpleRetryContext simpleContext = ((SimpleRetryContext) context);
        // 记录最近一次异常,并自增重试次数
        simpleContext.registerThrowable(throwable);
    }

    @Override
    public RetryContext open(RetryContext parent) {
        return new SimpleRetryContext(parent);
    }

    private static class SimpleRetryContext extends RetryContextSupport {

        public SimpleRetryContext(RetryContext parent) {
            super(parent);
        }

    }

    private boolean retryForException(Throwable ex) {
        return this.retryableClassifier.classify(ex);
    }

}
public class CircuitBreakerRetryPolicy implements RetryPolicy {

    public static final String CIRCUIT_OPEN = "circuit.open";

    public static final String CIRCUIT_SHORT_COUNT = "circuit.shortCount";

    private static Log logger = LogFactory.getLog(CircuitBreakerRetryPolicy.class);

    private final RetryPolicy delegate;

    private long resetTimeout = 20000;

    private long openTimeout = 5000;

    public CircuitBreakerRetryPolicy(RetryPolicy delegate) {
        this.delegate = delegate;
    }

    public void setResetTimeout(long timeout) {
        this.resetTimeout = timeout;
    }

    public void setOpenTimeout(long timeout) {
        this.openTimeout = timeout;
    }

    @Override
    public boolean canRetry(RetryContext context) {
        CircuitBreakerRetryContext circuit = (CircuitBreakerRetryContext) context;
        // 判断断路器开关是否打开
        if (circuit.isOpen()) {
            // 打开则不允许重试
            circuit.incrementShortCircuitCount();
            return false;
        } else {
            circuit.reset();
        }
        // 断路器开关闭合 or 半打开,是否允许重试交给代理实现
        return this.delegate.canRetry(circuit.context);
    }

    @Override
    public RetryContext open(RetryContext parent) {
        return new CircuitBreakerRetryContext(parent, this.delegate, this.resetTimeout, this.openTimeout);
    }

    @Override
    public void close(RetryContext context) {
        CircuitBreakerRetryContext circuit = (CircuitBreakerRetryContext) context;
        // 代理
        this.delegate.close(circuit.context);
    }

    @Override
    public void registerThrowable(RetryContext context, Throwable throwable) {
        CircuitBreakerRetryContext circuit = (CircuitBreakerRetryContext) context;
        circuit.registerThrowable(throwable);
        // 代理
        this.delegate.registerThrowable(circuit.context, throwable);
    }

    static class CircuitBreakerRetryContext extends RetryContextSupport {

        private volatile RetryContext context;

        private final RetryPolicy policy;

        private volatile long start = System.currentTimeMillis();

        private final long timeout;

        private final long openWindow;

        private final AtomicInteger shortCircuitCount = new AtomicInteger();

        public CircuitBreakerRetryContext(RetryContext parent, RetryPolicy policy, long timeout, long openWindow) {
            super(parent);
            this.policy = policy;
            this.timeout = timeout;
            this.openWindow = openWindow;
            this.context = createDelegateContext(policy, parent);
            setAttribute("state.global", true);
        }

        public void reset() {
            shortCircuitCount.set(0);
            setAttribute(CIRCUIT_SHORT_COUNT, shortCircuitCount.get());
        }

        public void incrementShortCircuitCount() {
            shortCircuitCount.incrementAndGet();
            setAttribute(CIRCUIT_SHORT_COUNT, shortCircuitCount.get());
        }

        private RetryContext createDelegateContext(RetryPolicy policy, RetryContext parent) {
            RetryContext context = policy.open(parent);
            reset();
            return context;
        }

        /* 判断断路器开关是否打开  **/
        public boolean isOpen() {
            // 计算时间间隔
            long time = System.currentTimeMillis() - this.start;
            // 判断是否允许重试
            boolean retryable = this.policy.canRetry(this.context);
            if (!retryable) {
                // 闭合重置开关超时时间
                if (time > this.timeout) {
                    logger.trace("Closing");
                    // 重建上下文
                    this.context = createDelegateContext(policy, getParent());
                    this.start = System.currentTimeMillis();
                    // 判断是否允许重试
                    retryable = this.policy.canRetry(this.context);
                }
                // [0,openTimeout)
                else if (time < this.openWindow) {
                    // 指定开关打开
                    if ((Boolean) getAttribute(CIRCUIT_OPEN) == false) {
                        logger.trace("Opening circuit");
                        setAttribute(CIRCUIT_OPEN, true);
                    }
                    this.start = System.currentTimeMillis();
                    return true;
                }
            }
            // 允许重试
            else {
                // (openWindow,resetTimeout]
                if (time > this.openWindow) {
                    logger.trace("Resetting context");
                    // 重建上下文
                    this.start = System.currentTimeMillis();
                    this.context = createDelegateContext(policy, getParent());
                }
            }
            if (logger.isTraceEnabled()) {
                logger.trace("Open: " + !retryable);
            }
            // 设置开关打开的标志位,不能重试=开关打开
            setAttribute(CIRCUIT_OPEN, !retryable);
            return !retryable;
        }

        @Override
        public int getRetryCount() {
            return this.context.getRetryCount();
        }

        @Override
        public String toString() {
            return this.context.toString();
        }

    }

}
除了断路器策略( *CircuitBreakerRetryPolicy* )稍微复杂点,其他基本都像  *SimpleRetryPolicy*  一样,逻辑简单。

首先  *canRetry*  在模板方法内是判断是否要重试的实现。 *SimpleRetryPolicy*  实现就是判断了次数和异常,是否满足要求。 *CircuitBreakerRetryPolicy* ,主要逻辑集中在  *org.springframework.retry.policy.CircuitBreakerRetryPolicy.CircuitBreakerRetryContext#isOpen* ,开关打开不允许重试,否则根据代理类判断是否允许重试。

重试策略看完,再来看回退策略代码就更简单了,看一个固定时长的回退策略实现:
public class FixedBackOffPolicy extends StatelessBackOffPolicy implements SleepingBackOffPolicy<FixedBackOffPolicy> {
    // 默认使用 Thread.sleep等待
	private Sleeper sleeper = new ThreadWaitSleeper();

    // 可以指定 org.springframework.retry.backoff.ObjectWaitSleeper
	public void setSleeper(Sleeper sleeper) {
		this.sleeper = sleeper;
	}

	protected void doBackOff() throws BackOffInterruptedException {
		try {
			sleeper.sleep(backOffPeriod);
		}
		catch (InterruptedException e) {
			throw new BackOffInterruptedException("Thread interrupted while sleeping", e);
		}
	}
}
默认的就是  *Thread.sleep*  一段时间。  



至此,编程式的源码简单的流程已经了解了。接下来分析下注解的实现原理,可预见的,注解也借助了编程式这套模板。

大体逻辑主要分为应用启动时,对全局内 @Retryable方法的收集,以及运行期的动态代理。
@Configuration
public class RetryConfiguration extends AbstractPointcutAdvisor implements IntroductionAdvisor, BeanFactoryAware {


	@PostConstruct
	public void init() {
		Set<Class<? extends Annotation>> retryableAnnotationTypes = new LinkedHashSet<Class<? extends Annotation>>(1);
		retryableAnnotationTypes.add(Retryable.class);
        // 切点定义,也就是所有被 @Retryable标识的方法
		this.pointcut = buildPointcut(retryableAnnotationTypes);
        // 切面构建
		this.advice = buildAdvice();
		if (this.advice instanceof BeanFactoryAware) {
			((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
		}
	}

	protected Advice buildAdvice() {
        // 看这个实现
		AnnotationAwareRetryOperationsInterceptor interceptor = new AnnotationAwareRetryOperationsInterceptor();
		if (retryContextCache != null) {
			interceptor.setRetryContextCache(retryContextCache);
		}
		if (retryListeners != null) {
			interceptor.setListeners(retryListeners);
		}
		if (methodArgumentsKeyGenerator != null) {
			interceptor.setKeyGenerator(methodArgumentsKeyGenerator);
		}
		if (newMethodArgumentsIdentifier != null) {
			interceptor.setNewItemIdentifier(newMethodArgumentsIdentifier);
		}
		if (sleeper != null) {
			interceptor.setSleeper(sleeper);
		}
		return interceptor;
	}
}
上面配置类,init 生命中期内定义了切点和切面,切点很简单,就是所有  *@Retryable*  标识的方法;切面的话需要移步  *AnnotationAwareRetryOperationsInterceptor* 。
public class AnnotationAwareRetryOperationsInterceptor implements IntroductionInterceptor, BeanFactoryAware {

	@Override
	public Object invoke(MethodInvocation invocation) throws Throwable {
        // 获取切面代理
		MethodInterceptor delegate = getDelegate(invocation.getThis(), invocation.getMethod());
		if (delegate != null) {
            // 执行代理方法
			return delegate.invoke(invocation);
		}
		else {
			return invocation.proceed();
		}
	}
}
这里切面的构建并不是我们关心的,所以 getDelegate 感兴趣的可以自行研究,里面加了一道缓存,保证代理对象只会创建一次。

继续跟源码,可以找到代理切面实现类根据有无状态有两种(根据  *@Retryable#stateful()*  设置):
  • org.springframework.retry.interceptor.RetryOperationsInterceptor
  • org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor (@CircuitBreaker 强制是这种
public class RetryOperationsInterceptor implements MethodInterceptor {

    private RetryOperations retryOperations = new RetryTemplate();

    public Object invoke(final MethodInvocation invocation) throws Throwable {

        String name;
        if (StringUtils.hasText(label)) {
            name = label;
        } else {
            name = invocation.getMethod().toGenericString();
        }
        final String label = name;
        // 构建 org.springframework.retry.RetryCallback
        RetryCallback<Object, Throwable> retryCallback = new MethodInvocationRetryCallback<Object, Throwable>(
                invocation, label) {

            @Override
            public Object doWithRetry(RetryContext context) throws Exception {

                context.setAttribute(RetryContext.NAME, label);

                if (invocation instanceof ProxyMethodInvocation) {
                    try {
                        // 回调业务代码块
                        return ((ProxyMethodInvocation) invocation).invocableClone().proceed();
                    } catch (Exception e) {
                        throw e;
                    } catch (Error e) {
                        throw e;
                    } catch (Throwable e) {
                        throw new IllegalStateException(e);
                    }
                } else {
                    throw new IllegalStateException(
                            "MethodInvocation of the wrong type detected - this should not happen with Spring AOP, "
                                    + "so please raise an issue if you see this exception");
                }
            }

        };

        // 根据是否有 @Recover 兜底/补偿方法,调用模板的不同方法
        if (recoverer != null) {
            RetryOperationsInterceptor.ItemRecovererCallback recoveryCallback = new RetryOperationsInterceptor.ItemRecovererCallback(invocation.getArguments(), recoverer);
            return this.retryOperations.execute(retryCallback, recoveryCallback);
        }

        return this.retryOperations.execute(retryCallback);

    }

}
public class StatefulRetryOperationsInterceptor implements MethodInterceptor {

    private RetryOperations retryOperations;

    @Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {

        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Executing proxied method in stateful retry: " + invocation.getStaticPart() + "("
                    + ObjectUtils.getIdentityHexString(invocation) + ")");
        }

        Object[] args = invocation.getArguments();
        Object defaultKey = Arrays.asList(args);
        if (args.length == 1) {
            defaultKey = args[0];
        }

        Object key = createKey(invocation, defaultKey);
        RetryState retryState = new DefaultRetryState(key,
                this.newMethodArgumentsIdentifier != null && this.newMethodArgumentsIdentifier.isNew(args),
                this.rollbackClassifier);

        Object result = this.retryOperations.execute(new StatefulRetryOperationsInterceptor.StatefulMethodInvocationRetryCallback(invocation, label),
                this.recoverer != null ? new StatefulRetryOperationsInterceptor.ItemRecovererCallback(args, this.recoverer) : null, retryState);

        if (this.logger.isDebugEnabled()) {
            this.logger.debug("Exiting proxied method in stateful retry with result: (" + result + ")");
        }

        return result;
    }
}
最终  *invoke*  的实现里面,可以看到调用模板方法。利用切面,省得每个方法都写一遍  *execute*  方法。    

总结

总之,重试框架并不复杂,已经有现成的工具,就不要重复造轮子。本文也是通过阅读源码后写出来的,如果有理解不正确的地方,欢迎各位指正。

原文:https://my.oschina.net/marvelcode/blog/4563352
作者: MarvelCode