解读Sentinel滑动窗口算法实现源码

在前面搞清楚了Sentinel的使用后,大致理了一下Sentinel的责任链,搞清楚了这个,基本就已经梳理清楚sentinel-core模块的大部分内容,顺着这条链路可以继续梳理很多东西。

知其然、知其所以然。而阅读源码就是最好的知其所以然的方式。这一次找了一些空闲时间,捋了一下它的滑动窗口算法,在这里做一个记录。后面会继续去梳理它的令牌算法和漏桶算法。

关于滑动窗口的原理,Sentinel为什么要使用滑动窗口,Sentinel是怎样使用的滑动,直接使用下面这两张图。一图胜千言,一张好的图足以说明问题,在这里我引用两张图。


图片说明:第一张图为Sentinel github上的图片,因为有时加载不出来,所以拷贝出来了。图二为一张微信公众号的图片,具体公众号见水印。这里引用只是为了学习使用,但是还是注明一下来源。

首先从StatisticSlot类开始,它是Sentinel统计的核心功能槽,先看它的entry[^对这个方法做了一下精简,只保留了几行能够说明问题的代码。]方法:

@SpiOrder(-7000)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // 先执行后续限流、降级等功能
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // 上面执行通过,更新通过请求数据
            node.addPassRequest(count);

        } catch (PriorityWaitException ex) {

        } catch (BlockException e) {
            // 上面执行阻塞,更新阻塞请求数据
            node.increaseBlockQps(count);
        } catch (Throwable e) {
        }
    }
}

通过代码可以看出,它是先执行后面的限流、降级等,然后以后面的执行结果为基础来更新对应资源的通过、阻塞、异常等统计数据。上面的执行通过和异常处理逻辑大体一致。这里就以执行通过这条线来说明问题,所以对应代码就是node.addPassRequest(count);进入到这一行代码,经过几次调用转到了StatisticNode这个类上,根据类名可以知道这个表示一个统计节点,调用的方法是addPassRequest:

@Override
public void addPassRequest(int count) {
    rollingCounterInSecond.addPass(count);
    rollingCounterInMinute.addPass(count);
}

由这个方法可以看出,StatisticNode在处理统计数据的时候,分了两个维度,分别是秒级的和分钟级的。对应的rollingCounterInSecond和rollingCounterInMinute是它的两个成员属性。其定义如下:

public class StatisticNode implements Node {

    /**
     * Holds statistics of the recent {@code INTERVAL} seconds. The {@code INTERVAL} is divided into time spans
     * by given {@code sampleCount}.
     */
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
        IntervalProperty.INTERVAL);

    /**
     * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
     * meaning each bucket per second, in this way we can get accurate statistics of each second.
     */
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
}

Metric是一个度量单位接口,其具体实现下面需要提一下,在这里先不展开,只需要知道它存储的是一些执行数据,如成功数、异常数等。而在上面的StatisticNode.addPassRequest方法中就是分别调用两个维度的统计单位增加请求通过数量。从rollingCounterInSecond.addPass(count)这一句进入,对应的方法在ArrayMetric类中,这一个就是Metric的唯一实现。ArrayMetric.addPass这个方法代码如下:

@Override
public void addPass(int count) {
    // 获取当前时间对应的窗口,返回的是当前窗口的一个包装类
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    wrap.value().addPass(count);
}

可以看出这里就已经是滑动窗口算法的入口了。通过滑动窗口算法,使用当前时间获取一个合适的窗口,然后在这个窗口中增加通过的请求数。进入到代码里面,最终落实到了LeapArray的currentWindow方法中了。就LeapArray这个类名来说,非常有我在开篇第二张图的那味道了。

在看LeapArray.currentWindow这个方法之前,先来看一个短小简单但是足够核心的一个方法LeapArray.calculateTimeIdx,整个方法只有两行代码,如下:

private int calculateTimeIdx(long timeMillis) {
    // 将传入的当前时间按照窗口时长进行分段,拿到当前时间对应的分段ID
    long timeId = timeMillis / windowLengthInMs;
    // 将当前时间的分段段ID对应到窗口数组的下标ID上
    return (int)(timeId % array.length());
}

上面的array定义如下:

protected final AtomicReferenceArray<WindowWrap<T>> array;

其赋值在构造方法中,如下语句:

this.array = new AtomicReferenceArray<>(sampleCount);

通过上面这个方法,我们就能够得到当前时间对应的窗口在窗口数组中的位置了,接下来我们要做的事情就是根据这个位置取出对应的窗口返回去给对应的统计逻辑使用。

直接看LeapArray.currentWindow[^为了方便阅读,精简了它的注释]方法定义:

public WindowWrap<T> currentWindow(long timeMillis) {
    if (timeMillis < 0) {
        return null;
    }
	// 获取当前时间在窗口数组中映射的下标
    int idx = calculateTimeIdx(timeMillis);
	// 计算当前时间对应的窗口的开始时间,具体方法见下面
    long windowStart = calculateWindowStart(timeMillis);

    /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
    while (true) {
        WindowWrap<T> old = array.get(idx);
        if (old == null) {
            // 第一次进入,新建窗口,并使用cas的方式设置,如果出现争抢导致设置失败,暂时让出执行权待其它线程成功设置
            WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            if (array.compareAndSet(idx, null, window)) {
                return window;
            } else {
                Thread.yield();
            }
        } else if (windowStart == old.windowStart()) {
            // 当前时间对应的窗口开始时间等于获取到的窗口开始时间,那么当前获取到的窗口就是我们需要的
            return old;
        } else if (windowStart > old.windowStart()) {
            // 当前时间对应的窗口开始时间大于获取到的窗口开始时间,那么当前获取到的窗口为已过期窗口,加锁重置
            if (updateLock.tryLock()) {
                try {
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            } else {
                Thread.yield();
            }
        } else if (windowStart < old.windowStart()) {
            // Should not go through here, as the provided time is already behind.
            return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        }
    }
}

LeapArray.calculateWindowStart方法:

protected long calculateWindowStart(long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}

总结上面的代码就是:先将当前时间按照统计时长分段,得到当前时间对应的分段ID。因为窗口数组是固定的,所以随着时间线向前发展,会不断的顺序循环使用数组中的窗口。所以使用当前时间对应的分段ID与窗口数组的长度求余得到当前时间对应的窗口在窗口数组中的下标,拿到这个下标后,接着就是在循环中获取这个下标对应的窗口了。

在获取指定下标对应的窗口时,要分情况进行处理:

  • 如果对应下标窗口为null,那么就是第一次进入,创建新窗口并使用cas设置。如果非空走下面的逻辑。
  • 如果获取到的窗口开始时间等于当前时间计算出来的对应窗口开始时间,那么就拿到了当前时间需要的窗口,直接返回。
  • 如果获取到的窗口开始时间小于当前时间计算出来的对应窗口开始时间,那么就说明这个窗口已经过期了,所以加锁重置,然后重复使用。
  • 当前时间小于旧的窗口的开始时间,理论上来说是不应该出现这种情况的,如果存在这种情况,那么返回一个无效的空窗口。

整个Sentinel滑动窗口算法的使用就上面这些代码,看完后第一感觉是代码如此简介,但是功能却如此高效强大。


原文:Sentinel滑动窗口算法 - 小白先生哦 - 博客园
作者:小白先生哦