Sentinel核心源码和熔断降级原理分析

Sentinel核心源码和熔断降级原理分析

Sentinel核心源码分析

sentinel简介

sentinel是alibaba开源的流控组件,以流量为核心,提供系统流量控制,熔断降级,系统保护等功能,保证系统的平稳运行。目前主要的流量控制手段主要有两种,一种是以Hystrix为代表,基于线程池隔离的方式,另一种则是通过信号量的方式,sentinel就是此方式来实现流控的。

使用介绍

本文不会对其详细的使用深入介绍,具体的可以参考

简单使用步骤:

1.引入依赖
 <dependency>
     <groupId>com.alibaba.csp</groupId>
     <artifactId>sentinel-core</artifactId>
     <version>1.7.2</version>
 </dependency>
2.在代码中定义资源和规则

资源 :在项目里的一段代码,一个方法等一切东西

规则 :你要对资源定义的流控规则,如qps等指标

  public static void main(String[] args) {
     initFlowRules();
     while (true) {
         Entry entry = null;
         try {
         entry = SphU.entry("HelloWorld");
             /*您的业务逻辑 - 开始*/
             System.out.println("hello world");
             /*您的业务逻辑 - 结束*/
     } catch (BlockException e1) {
             /*流控逻辑处理 - 开始*/
         System.out.println("block!");
             /*流控逻辑处理 - 结束*/
     } finally {
        if (entry != null) {
            entry.exit();
        }
     }
 }
  
 private static void initFlowRules(){
       List<FlowRule> rules = new ArrayList<>();
       FlowRule rule = new FlowRule();
       rule.setResource("HelloWorld");
       rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
       // Set limit QPS to 20.
       rule.setCount(20);
       rules.add(rule);
       FlowRuleManager.loadRules(rules);
    }
}

上述代码中业务代码被定义名为"HelloWorld"的资源,该资源的qps为20。

原理分析

从上面的代码可以看出,限流的入口是从一个叫 Entry 的对象开始的,Entry顾名思义可以理解为通行入口的意思。通过入口来把我们的代码即资源保护起来,任何请求到来时都要先经过入口的检查,如果检查通过了才能放行,否则的话拒绝放行。而通行规则则有开发人员自己来定。下图来源于sentinel官方,此图大概描述了sentinel运行的原理图。对于初次接触sentinel的人来说看到此图可能会比较懵逼,这里先简单说下大概的工作流程

当我们有请求到来时, Entry 会为每一个资源创建一个处理链 ProcessorSlotChain ,系统默认提供了8个Handler构成了此处理链,每个handler各司其职完成相应的功能,当然我们的流量校验处理对象也在其中名为 FlowSlot ,如果请求到来时能够通过 ProcessorSlotChain 的校验的话,就放行此请求,如果不通过,就会抛出相应的异常。Sentinel是以流量控制为核心,底层的流量统计是以滑动窗口来完成qps统计的,具体实现是通过名为 LeapArray 的对象来完成的。处理链如下图所示:

核心概念和类

在分析具体的源码之前,先介绍几个比较核心的概念和对象,不然进入代码会比较生涩。

Entry :前面说过了,对于要保护的资源须用Entry包裹起来即:

 Entry entry = SphU.entry("HelloWorld");
  ......//保护的资源
 entry.exit();

Context :上下文,每个entry都是存在特定上下文中,它是一个ThreadLocal变量,其内部通过name字段来区分不同的上下文,一个上下文即代表一个EntranceNode;

ResourceWrapper :资源包装类,上面SphU.entry(“HelloWorld”)语句实际上创建了一个HelloWorld的ResourceWrapper,资源具有全局唯一性

public abstract class ResourceWrapper {
    protected final String name; //资源名
    ......
}

ProcessorSlotChain : 这个是个核心组件,其各种限流,熔断等功能都是通过此对象来实现的。内部是一个个的Slot对象,每个Slot对象完成各自对应的功能,其Chain的构建是通过Spi的方式来构建的。

# Sentinel default ProcessorSlots

//为每个ProcessorSlotChain提供Node对象
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot

//为每个ProcessorSlotChain提clusterNode对象和originNode
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot

//记录日志
com.alibaba.csp.sentinel.slots.logger.LogSlot

//统计流量
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot

//系统规则校验保护
com.alibaba.csp.sentinel.slots.system.SystemSlot

//认证校验
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot

//流控检查
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot

//熔断降级检测
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

除了上面系统默认提供的8个Slot对象外,开发者还可以自己定义相关的Slot对象,按照SPI的方式加入即可。

Node : 用于完成统计相关的功能, ProcessorSlotChain 中的第一个Slot对象NodeSelectorSlot就是用于创建或获取Node对象,后续的每个Slot都会透传此Node对象,用于统计相关功能。

核心源码分析

有了上面几个比较核心的概念后,下面正式进入源码的分析。

1. 入口创建 SphU

此类提供了创建Entry的api,类似的类还有一个叫做SphO,两个类的区别在于前者是通过抛出异常的方式来拒绝请求通过,而后者则是通过返回Bool类型的结果来表示结果。

SphU

  public static Entry entry(String name) throws BlockException {
      return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0);
  }

SphO

  public static boolean entry(String name) {
      return entry(name, EntryType.OUT, 1, OBJECTS0);
  }

后面我们已SphU为入口来分析,在SphU的entry方法中调用了Env类:

public class Env {
    //创建CtSph对象,用于统计和规则校验
    public static final Sph sph = new CtSph();

    static {
        // If init fails, the process will exit.
        //初始化InitFunc相关接口,通过SPI定义
        InitExecutor.doInit();
    }
}

进入CtSph的entry方法,此类中entry方法有多个重载,我们只分析一个即可,其他的都是一样的

    @Override
    public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
        StringResourceWrapper resource = new StringResourceWrapper(name, type);
        return entry(resource, count, args);
    }
2.创建conext和processorSlotChain对象

在上面的方法中我们可以看出,每个entry都会关联一个资源,资源通过name和type来唯一关联。接着代码往下走,最后会进入一个entryWithPriority的方法,此方法是一个很重要的方法,在方法类会创建上下文对象Context,处理链对象ProcessorSlotChain,也是规则校验的入口

/**
* resourceWrapper:资源
* count:请求许可
* prioritized:优先级
* args:额外携带的参数
*/
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        //获取上下文对象,一个ThreadLocal变量
        Context context = ContextUtil.getContext();
        //如果创建的上下文数量达到上限(2000),会返回一个NullContext对象
        if (context instanceof NullContext) {
            //创建一个不执行规则校验的entry对象
            return new CtEntry(resourceWrapper, null, context);
        }
        //第一次进入会走到走到这里
        if (context == null) {
            //创建上下文对象,并设置默认名称为:sentinel_default_context
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }
        //全局控制开关,如果关闭则不执行规则校验
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }
        //创建或获取ProcessorSlotChain对象,用于执行规则校验
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        //如果走到这里,意味着创建了太多的资源,默认不能超过6000
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }
        //走到这里,意味着上下文对象创建和ProcessorSlotChain对象创建都ok
        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            //进入规则校验,如果不通过则会抛出异常
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            //限流触发的异常
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }

上面的代码实际上就包含了整个流控规则的校验流程。下面来看看上下文对象Context的创建

1.ContextUtil.getContext()

    private static ThreadLocal<Context> contextHolder = new ThreadLocal<>();
    ......
    public static Context getContext() {
        return contextHolder.get();
    }

可以看到context是一个线程本地变量,第一次进入的时候返回空,在ContextUtil类的trueEnter方法中会创建新的context对象

protected static Context trueEnter(String name, String origin) {
        Context context = contextHolder.get();
        if (context == null) {
            //获取上下文node缓存,key为context的name
            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
            //获取上下文入口EntranceNode
            DefaultNode node = localCacheNameMap.get(name);
            if (node == null) {
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                    setNullContext();
                    return NULL_CONTEXT;
                } else {
                    LOCK.lock();
                    try {
                        //二次校验
                        node = contextNameNodeMap.get(name);
                        if (node == null) {
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {
                                //创建入口EntranceNode,每个上下文都有唯一的一个入口
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                // 将创建的entranceNode挂载到根node下
                                Constants.ROOT.addChild(node);
                                //将新创建的node加入到缓冲中
                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        LOCK.unlock();
                    }
                }
            }
            //创建上下文
            context = new Context(node, name);
            context.setOrigin(origin);
            contextHolder.set(context);
        }

        return context;
    }

上述代码就是context的创建的全过程,每个context都是线程本地变量,并且都会关联一个EntranceNode,并将其挂载根node节点下面。context对象创建完毕后就会创建ProcessorSlotChain对象,我们回到上面entryWithPriority方法中的lookProcessChain方法

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        //先从缓存中查询获取对象
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }
                    //通过SPI的方式创建对象
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }

ProcessorSlotChain对象的创建是通过spi的方式,其接口实现类定义在如下文件中

,将文件中的对象按照约定顺序组织起来就形成了ProcessorSlotChain对象,具体就不深入进去了。当相关的对象都创建好了以后就是具体的规则校验了,回到entryWithPriority方法中的chain.entry(context, resourceWrapper, null, count, prioritized, args)这行代码来,此方法就是规则校验的入口。进入处理链的一个对象是NodeSelectorSlot,来看看源码

@SpiOrder(-10000)
public class NodeSelectorSlot extends AbstractLinkedProcessorSlot<Object> {

    //DefaultNode缓存,key为context的名称
    private volatile Map<String, DefaultNode> map = new HashMap<String, DefaultNode>(10);

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
       
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    //创建node
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }

            }
        }
        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

这段代码还是比较简单,根据上下文的名字获取DefalutNode,如果没有则创建。这里先梳理一下context、entry、ProcesssorSlotChain、和DefaultNode的关系。

  • context代表的是一个上下文,每个entry都是在某个具体的context下运行的,同时每个context都会有一个EntranceNode;
  • 每个entry都关联一个具体resource;
  • 每个resource都会有一个ProcesssorSlotChain来做规则校验;
  • 每个ProcessSlotChain可以包含多个DefaultNode,但只会有一个clusterNode;

node的关系图如下(来自官网):

EntranceNode1、EntranceNode2分别代表两个上下文环境的入口node。处理链中第二个对象是ClusterBuilderSlot,此对象的作用维护ClusterNode和originNode,clusterNode的作用是针对resource的所有上下文来统计的,originNode的作用是针对具有origin属性的entry来说的,来看看主要代码

public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args)
        throws Throwable {
        if (clusterNode == null) {
            synchronized (lock) {
                if (clusterNode == null) {
                    //创建clusterNode
                    clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
                    HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
                    newMap.putAll(clusterNodeMap);
                    newMap.put(node.getId(), clusterNode);

                    clusterNodeMap = newMap;
                }
            }
        }
        //node为当前上下文的entranceNode
        node.setClusterNode(clusterNode);

        //如果上下文设置了origin属性
        if (!"".equals(context.getOrigin())) {
            Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
            context.getCurEntry().setOriginNode(originNode);
        }

        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

接下来的处理器就是LogSlot,此类是功能主要就是记录异常的日志,这里就不细说了。下一个处理器StatisticSlot是非常重要的一个对象,它维护着流量、线程、异常等统计信息。代码如下:

@Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                      boolean prioritized, Object... args) throws Throwable {
        try {
            // 后续流控、熔断降级等校验
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // 如果校验通过会走到这里,增加线程和pass的计数统计
            node.increaseThreadNum();
            node.addPassRequest(count);
            //如果设置了origin,增加originNode统计
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                context.getCurEntry().getOriginNode().addPassRequest(count);
            }
            //增加全局计数统计
            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
                Constants.ENTRY_NODE.addPassRequest(count);
            }

            // 调用回调
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (PriorityWaitException ex) {
            node.increaseThreadNum();
            if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseThreadNum();
            }
            // Handle pass event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onPass(context, resourceWrapper, node, count, args);
            }
        } catch (BlockException e) {
            // Blocked, set block exception to current entry.
            context.getCurEntry().setBlockError(e);

            // 增加block计数统计
            node.increaseBlockQps(count);
            if (context.getCurEntry().getOriginNode() != null) {
                context.getCurEntry().getOriginNode().increaseBlockQps(count);
            }

            if (resourceWrapper.getEntryType() == EntryType.IN) {
                // Add count for global inbound entry node for global statistics.
                Constants.ENTRY_NODE.increaseBlockQps(count);
            }

            // Handle block event with registered entry callback handlers.
            for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
                handler.onBlocked(e, context, resourceWrapper, node, count, args);
            }

            throw e;
        } catch (Throwable e) {
            // Unexpected internal error, set error to current entry.
            context.getCurEntry().setError(e);

            throw e;
        }
    }

接下来的SystemSlot和AuthoritySlot这里就不做介绍了。重点说下FlowSlot这个类,整个流控的校验都是在这里面进行的。代码比较简单:

  @Override
  public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                    boolean prioritized, Object... args) throws Throwable {
      checkFlow(resourceWrapper, context, node, count, prioritized);

      fireEntry(context, resourceWrapper, node, count, prioritized, args);
  }

进入checkFlow方法,最终会调用FlowRuleChecker的checkFlow方法:

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                          Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) {
            return;
        }
        //获取流控规则,从这里可以看出,如果设置了多个规则,会逐一校验每一个规则
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }

进入canPassCheck方法,

    public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                                    boolean prioritized) {
        String limitApp = rule.getLimitApp();
        if (limitApp == null) {
            return true;
        }
        //如果是集群模式,进行集群模式校验
        if (rule.isClusterMode()) {
            return passClusterCheck(rule, context, node, acquireCount, prioritized);
        }
        //本地校验
        return passLocalCheck(rule, context, node, acquireCount, prioritized);
    }
    
     private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        //选取合适的node,根据其统计技术来判断是否通过
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }

        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }

最后会调用流量整形控制器的canPass方法,这里看下默认的流量整形控制器DefaultController的实现

  @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        //获取已使用的许可
        int curCount = avgUsedTokens(node);
        //如果 当前已使用许可 + 请求许可  > 设置的数量
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                //采用滑动窗口来统计,每个窗口分割成了多个小的窗体,通过判断后续窗体的容量来进行流控
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);

                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

上述的过程分析了sentinel限流的原理和主要工作流程。其核心在于ProcessorSlotChain,把握住它就可以抓住其主脉络。

sentinel熔断降级原理分析

上面介绍了sentinel的基本工作原理和限流原理。本文将从源码的角度对熔断降级的原理进行分析。熔断操作位于slot处理链的末尾,由名为DegradeSlot的类来处理的,来看看其源码

@SpiOrder(-1000)
public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
        throws Throwable {
        //熔断降级判断
        DegradeRuleManager.checkDegrade(resourceWrapper, context, node, count);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        fireExit(context, resourceWrapper, count, args);
    }
}

源码很简单,委托DegradeRuleManager来处理,进入DegradeRuleManager的checkDegrade方法

public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
        throws BlockException {
        //获取资源熔断规则
        Set<DegradeRule> rules = degradeRules.get(resource.getName());
        if (rules == null) {
            return;
        }
        //遍历每个熔断规则,校验是否满足熔断条件
        for (DegradeRule rule : rules) {
            //如果达到了熔断条件,就会抛出DegradeException的异常
            if (!rule.passCheck(context, node, count)) {
                throw new DegradeException(rule.getLimitApp(), rule);
            }
        }
    }

熔断的判断就是针对资源设置的规则,逐一判断处理。如果有一个条件不满足的话,就会抛出DegradeException异常。那么熔断判断具体是怎么做的呢?继续深入DegradeRule类中的passCheck方法,在分析passCheck方法之前,先介绍DegradeRule类几个比较重要的字段。

    //慢请求或异常请求的计数
    private double count;

    //熔断窗口
    private int timeWindow;
    
    //熔断策略 (0: 慢调用, 1: 异常率, 2: 异常数) 
    private int grade = RuleConstant.DEGRADE_GRADE_RT;

    /**
     * 针对慢调用,如果慢调用数小于其值(默认为5),是不会触发熔断的
     *
     * @since 1.7.0
     */
    private int rtSlowRequestAmount = RuleConstant.DEGRADE_DEFAULT_SLOW_REQUEST_AMOUNT;

    /**
     * 针对异常率,如果异常数小于其值(默认为5),是不会触发熔断的
     *
     * @since 1.7.0
     */
    private int minRequestAmount = RuleConstant.DEGRADE_DEFAULT_MIN_REQUEST_AMOUNT;

熔断的实现原理简单说来就是在一个设定的窗口时间内,根据设置的具体熔断策略,判断相应的计数统计是否超过了门限值,如果超过了则会触发熔断机制。深入passCheck的源码

 //满调用计数
 private AtomicLong passCount = new AtomicLong(0);
 //熔断降级标记位,如果为true,则表示触发了熔断
 private final AtomicBoolean cut = new AtomicBoolean(false);
 
 public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
        //如果标记位为真,表示已触发熔断
        if (cut.get()) {
            return false;
        }
        //获取资源计数统计node
        ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
        if (clusterNode == null) {
            return true;
        }
        //如果熔断降级策略为慢调用
        if (grade == RuleConstant.DEGRADE_GRADE_RT) {
           //获取慢调用平均响应时间
            double rt = clusterNode.avgRt();
            //如果调用平均响应时间小于设定的门限值,则重置慢调用计数统计
            if (rt < this.count) {
                passCount.set(0);
                return true;
            }

            //如果满调用数小于默认的最小门限数(5),则不进行熔断降级
            if (passCount.incrementAndGet() < rtSlowRequestAmount) {
                return true;
            }
          //如果熔断降级策略是异常率
        } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
            //每秒的异常数
            double exception = clusterNode.exceptionQps();
            //每秒成功调用数
            double success = clusterNode.successQps();
            //每秒总调用数
            double total = clusterNode.totalQps();
            //如果总调用数小于默认的门限值(5),则不会触发熔断降级   
            if (total < minRequestAmount) {
                return true;
            }
            //此句需要好好理解下,它表达的意思是:在异常数小于最小门限的条件是不进行熔断降级的,但前提是所用调用都不能全是异常调用
            double realSuccess = success - exception;
            if (realSuccess <= 0 && exception < minRequestAmount) {
                return true;
            }
            //异常率小于设置的门限,则不熔断降级
            if (exception / success < count) {
                return true;
            }
            
            //如果熔断降级策略是异常数
        } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
            //注意,这个异常数是每分钟统计的
            double exception = clusterNode.totalException();
            //小于设置的门限值,则不熔断
            if (exception < count) {
                return true;
            }
        }
        //如果走到了这里,则表示将要触发熔断降级了
        //重置慢调用统计时间窗口,此处用了CAS的方法来设置标志位,防止并发。时间窗口的重置是依赖于定时任务来完成的,当timeWindow时间后,会重置熔断标志位和计数统计
        if (cut.compareAndSet(false, true)) {
            ResetTask resetTask = new ResetTask(this);
            pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
        }

        return false;
    }
    
    //重置时间窗口
    private static final class ResetTask implements Runnable {

        private DegradeRule rule;

        ResetTask(DegradeRule rule) {
            this.rule = rule;
        }

        @Override
        public void run() {
            //重置慢调用计数
            rule.passCount.set(0);
            //熔断标志位
            rule.cut.set(false);
        }
    }

上面的代码描述了熔断降级核心流程,针对上面代码需要注意的是:

  • 慢调用是通过一个 时间窗口 来计数满调用的次数来实现的
  • 异常率是针对 每秒 的异常数和成功数的比值来判断是否满足触发条件的
  • 异常数是针对 每分钟 的异常数统计来实现的

当熔断被触发后,标志位会被设置为true,并会持续timeWindow长的时间,这个时间就是开发者在设置熔断降级规则时设置的。上述就是整个熔断降级的实现过程,从代码来看,熔断窗口通过一个定时任务来更新,设计的还是比较新颖的。


作者:呼儿嘿呦本尊
原文:https://juejin.im/post/6857382397059694599
原文:https://juejin.im/post/6875577249078935566