天天看点

Sentinel流控实现原理+代码实现分析原理分析源码分析

目录

原理分析

第一点、拦截点

第二点、流控的具体实现

NodeSelectorSlot

ClusterBuilderSlot

StatisticSlot

FlowSlot

DegradeSlot

SystemSlot

源码分析

创建SlotChain

执行SlotChain的entry方法

执行Slot的entry方法

原理分析

Sentinel流控涉及两个方面:

1、方法的拦截、处理

2、多种流控规则、策略的实现

第一点、拦截点

以java web服务为例进行讲解,在过滤器对请求进行拦截来实现。

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-web-servlet</artifactId>
    <version>x.y.z</version>
</dependency>
           

在这个实现包中提供了具体的filter实现类:

com.alibaba.csp.sentinel.adapter.servlet.CommonFilter

@Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
            throws IOException, ServletException {
        HttpServletRequest sRequest = (HttpServletRequest) request;
        Entry urlEntry = null;

        try {
            String target = FilterUtil.filterTarget(sRequest);
            // Clean and unify the URL.
            // For REST APIs, you have to clean the URL (e.g. `/foo/1` and `/foo/2` -> `/foo/:id`), or
            // the amount of context and resources will exceed the threshold.
            UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();
            if (urlCleaner != null) {
                target = urlCleaner.clean(target);
            }

            // If you intend to exclude some URLs, you can convert the URLs to the empty string ""
            // in the UrlCleaner implementation.
            if (!StringUtil.isEmpty(target)) {
                // Parse the request origin using registered origin parser.
                String origin = parseOrigin(sRequest);
                String contextName = webContextUnify ? WebServletConfig.WEB_SERVLET_CONTEXT_NAME : target;
                ContextUtil.enter(contextName, origin);

                if (httpMethodSpecify) {
                    // Add HTTP method prefix if necessary.
                    String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + COLON + target;
                    urlEntry = SphU.entry(pathWithHttpMethod, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
                } else {
                    urlEntry = SphU.entry(target, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
                }
            }
            chain.doFilter(request, response);
        } catch (BlockException e) {
            HttpServletResponse sResponse = (HttpServletResponse) response;
            // Return the block page, or redirect to another URL.
            WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);
        } catch (IOException | ServletException | RuntimeException e2) {
            Tracer.traceEntry(e2, urlEntry);
            throw e2;
        } finally {
            if (urlEntry != null) {
                urlEntry.exit();
            }
            ContextUtil.exit();
        }
    }

    private String parseOrigin(HttpServletRequest request) {
        RequestOriginParser originParser = WebCallbackManager.getRequestOriginParser();
        String origin = EMPTY_ORIGIN;
        if (originParser != null) {
            origin = originParser.parseOrigin(request);
            if (StringUtil.isEmpty(origin)) {
                return EMPTY_ORIGIN;
            }
        }
        return origin;
    }
           

从代码来看,先进行:ContextUtil.enter(contextName, origin);

接着通过SphU.entry(String name, int resourceType, EntryType type)方法判断是否放行,如果出现BlockException,则执行阻塞处理器里设置的动作:WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);

第二点、流控的具体实现

在Sentinel里,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个Entry对象。Entry可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用SphU API显式创建。Entry创建的时候,同时也会创建一系列功能插槽(slot chain),这些插槽有不同的职责,例如:

1、NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;

2、ClusterBuilderSlot则用于存储资源的统计信息以及调用者信息,例如该资源的RT、QPS、thread count等等,这些信息将用作多维度限流、降级的依据

3、StatisticSlot则用于记录、统计不同维度的runtime指标监控信息

4、FlowSlot则用于根据预设的限流规则以及前面slot统计的状态,来进行流量控制

5、AuthoritySlot则根据配置的黑白名单和调用来源信息,来做黑白名单控制

6、DegradeSlot则通过统计信息以及预设的规则,来做熔断降级

7、SystemSlot则通过系统的状态,例如load1等,来控制总的入口流量

总体框架如下:

Sentinel流控实现原理+代码实现分析原理分析源码分析

Sentinel将SlotChainBuilder作为SPI接口进行扩展,使得Slot Chain具备了扩展的能力。当然我们可以自行加入自定义的slot并编排slot间的顺序,从而可以给Sentinel添加自定义的功能。

Sentinel流控实现原理+代码实现分析原理分析源码分析

下面我们来看看每一个slot的功能

NodeSelectorSlot

这个slot主要负责收集资源的路径,并将这些资源的调用路径以树状结构存储起来,用于根据调用路径进行流量控制。

ContextUtil.enter("entrance1", "appA");
Entry nodeA = SphU.entry("nodeA");
if (nodeA != null) {
    nodeA.exit();
}
ContextUtil.exit();
           

上述代码通过Context.enter()创建了一个名为entrance1的上下文,同时指定调用发起者的appA;接着通过SphU.entry()请求一个token,如果该方法顺利执行没有抛BlockException,表明token请求成功。

以上代码将在内存中生成以下结构:

Sentinel流控实现原理+代码实现分析原理分析源码分析

注意:每个DefaultNode由资源ID和输入名称来标识。换句话说,一个资源ID可以有多个不同入口的DefaultNode。

ContextUtil.enter("entrance1", "appA");
Entry nodeA = SphU.entry("nodeA");
if (nodeA != null) {
    nodeA.exit();
}
ContextUtil.exit();

ContextUtil.enter("entrance2", "appA");
nodeA = SphU.entry("nodeA");
if (nodeA != null) {
    nodeA.exit();
}
ContextUtil.exit();
           

以上代码将在内存中生成以下结构:

上面的结构可以通过调用 curl http://localhost:8719/tree?type=root来显示

Sentinel流控实现原理+代码实现分析原理分析源码分析

ClusterBuilderSlot

此插槽用于构建资源的ClusterNode以及调用来源节点。ClusterNode保持某个资源运行统计信息(响应时间、QPS、block数目、线程数、异常数等)以及调用来源统计信息列表。调用来源的名称由ContextUtil.enter(contextName, origin)中的origin标记。可通过如下命令查看某个资源不同调用者的访问情况:

curl http://localhost:8719/origin?id=caller:

Sentinel流控实现原理+代码实现分析原理分析源码分析

StatisticSlot

StatisticSlot是Sentinel的核心功能插槽之一,用于统计实时的调用数据。

clusterNode:资源唯一标识的ClusterNode的runtime统计

origin:根据来自不同调用者的统计信息

defaultNode:根据上下文条目名称和资源ID的runtime统计

入口流量的统计

Sentinel底层采用高性能的滑动窗口数据结构LeapArray来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。

Sentinel流控实现原理+代码实现分析原理分析源码分析

FlowSlot

这个slot主要根据预设的资源的统计信息,按照固定的次序,依次生效。如果一个资源对应两条或者多条流控规则,则会根据如下次序依次检验,直到全部通过或者有一个规则生效为止:

  • 指定应用生效的规则,即针对调用方限流的
  • 调用方为other的规则
  • 调用方为default的规则

DegradeSlot

这个slot主要针对资源的平均响应时间(RT)以及异常比率,来决定资源是否在接下来的时间被自动熔断掉。

SystemSlot

这个slot会根据对于当前系统的整体情况,对入口的资源进行调配。其原理是让入口的流量和当前系统的load达到一个动态平衡。注意这个功能的两个限制:

只对入口流量起作用(调用类型为EntryType.IN),对出口流量无效。可通过SphU.entry()指定调用类型,如果不指定,默认是EntryType.OUT。

Entry entry = SphU.entry("resourceName", EntryType.IN);
           

只在Unix-like的操作系统上生效。

源码分析

现在以SphU.entry方法为切入点来开始分析。这个方法会去申请一个entry,如果能够申请成功,则说明没有被限流,否则会抛出BlockException,表明已经被限流了。

从SphU.entry()方法往下执行会进入到Sph.entry(),Sph的默认实现类是CtSph,在CtSph中最终会执行到entry(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException这个方法。

我们来看一下这个方法的具体实现:

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        Context context = ContextUtil.getContext();
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        // 获取资源对应的SlotChain
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

        /*
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            // 执行Slot的entry方法
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            //抛出BlockException
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }
           

这个方法可以分为以下几个部分:

1、对参数和全局配置项做检测,如果不符合要求就直接返回一个CtEntry对象,不会再进行后面的限流检测,否则进入下面的检测流程。

2、根据包装过的资源对象获取对应的SlotChain

3、执行SlotChain的entry方法

    3.1、如果SlotChain的entry方法抛出了BlockException,则将该异常抛到上层

    3.2、如果SlotChain的entry方法正常执行了,则最后会将该entry对象返回

4、如果上层方法捕获了BlockException,则说明请求被限流了,否则请求能正常执行

创建SlotChain

首先看一下lookProcessChain的方法实现:

/**
     * Get {@link ProcessorSlotChain} of the resource. new {@link ProcessorSlotChain} will
     * be created if the resource doesn't relate one.
     *
     * <p>Same resource({@link ResourceWrapper#equals(Object)}) will share the same
     * {@link ProcessorSlotChain} globally, no matter in witch {@link Context}.<p/>
     *
     * <p>
     * Note that total {@link ProcessorSlot} count must not exceed {@link Constants#MAX_SLOT_CHAIN_SIZE},
     * otherwise null will return.
     * </p>
     *
     * @param resourceWrapper target resource
     * @return {@link ProcessorSlotChain} of the resource
     */
    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }

                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }
           

该方法使用了一个HashMap做缓存,key是资源对象。这里加了锁,并且做了double check。具体构造chain的方法是通过spi创建的。

/**
 * Builder for a default {@link ProcessorSlotChain}.
 *
 * @author qinan.qn
 * @author leyou
 */
public class DefaultSlotChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new FlowSlot());
        chain.addLast(new DegradeSlot());

        return chain;
    }

}
           

Chain是链条的意思,从build的方法可看出,ProcessorSlotChain是一个链表,里面添加了很多个Slot。具体的实现需要到DefaultProcessorSlotChain中去看。

public class DefaultProcessorSlotChain extends ProcessorSlotChain {

    AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() {

        @Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }

        @Override
        public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
            super.fireExit(context, resourceWrapper, count, args);
        }

    };
    AbstractLinkedProcessorSlot<?> end = first;

    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(first.getNext());
        first.setNext(protocolProcessor);
        if (end == first) {
            end = protocolProcessor;
        }
    }

    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        end.setNext(protocolProcessor);
        end = protocolProcessor;
    }

    /**
     * Same as {@link #addLast(AbstractLinkedProcessorSlot)}.
     *
     * @param next processor to be added.
     */
    @Override
    public void setNext(AbstractLinkedProcessorSlot<?> next) {
        addLast(next);
    }

    @Override
    public AbstractLinkedProcessorSlot<?> getNext() {
        return first.getNext();
    }

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        throws Throwable {
        first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
    }

    @Override
    public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
        first.exit(context, resourceWrapper, count, args);
    }

}
           

DefaultProcessorSlotChain中有两个AbstractLinkedProcessorSlot类型的变量:first和end,这就是链表的头结点和尾结点。

创建DefaultProcessorSlotChain对象时,首先创建了首节点,然后把首节点赋值给了尾结点,可以用下图表示:

Sentinel流控实现原理+代码实现分析原理分析源码分析

将第一个节点添加到链表中后,整个链表的结构变成了如下图这样:

Sentinel流控实现原理+代码实现分析原理分析源码分析

将所有的节点都加入到链表中后,整个链表的结构变成了如下图所示:

Sentinel流控实现原理+代码实现分析原理分析源码分析

这样就将所有的Slot对象添加到链表中去了,每一个Slot都是继承自AbstractLinkedProcessorSlot。而AbstractLinkedProcessorSlot是一种责任链的设计,每个对象中都有一个next属性,指向的是另一个AbstractLinkedProcessorSlot对象。

执行SlotChain的entry方法

lookProcessChain方法获得的ProecssorSlotChain的实例是DefaultProcessorSlotChain,那么执行chain.entry方法,就会执行DefaultProcessorSlotChain的entry方法,而DefaultProcessorSlotChain的entry方法是这样的:

@Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
        throws Throwable {
        first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
    }
           

也就是说,DefaultProcessorSlotChain的entry实际是执行的first属性的transformEntry方法。

而transformEntry方法会执行当前节点的entry方法,在DefaultProcessorSlotChain中first节点中重写了entry方法,具体如下:

@Override
        public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
            throws Throwable {
            super.fireEntry(context, resourceWrapper, t, count, prioritized, args);
        }
           

first节点的entry方法,实际又是执行的super的fireEntry方法:

@Override
    public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        if (next != null) {
            next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
        }
    }
           

从这里可以看到,从fireEntry方法中就开始执行entry了,这里会执行当前节点的下一个节点transformEntry方法,上面已经分析过了,transformEntry方法会触发当前节点的entry,也就是说fireEntry方法实际是触发了下一个节点的entry方法。具体的流程如下图所示:

Sentinel流控实现原理+代码实现分析原理分析源码分析

从图中可以看出,从最初的调用Chain的entry()方法,转变成调用SlotChain中Slot的entry()方法。从上面的分析可以知道,SlotChain中的第一个Slot节点是NodeSelectorSlot。

执行Slot的entry方法

现在看SlotChain中第一个节点NodeSelectorSlot的entry方法,具体代码如下:

@Override
    public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
        throws Throwable {
        /*
         * It's interesting that we use context name rather resource name as the map key.
         *
         * Remember that same resource({@link ResourceWrapper#equals(Object)}) will share
         * the same {@link ProcessorSlotChain} globally, no matter in which context. So if
         * code goes into {@link #entry(Context, ResourceWrapper, DefaultNode, int, Object...)},
         * the resource name must be same but context name may not.
         *
         * If we use {@link com.alibaba.csp.sentinel.SphU#entry(String resource)} to
         * enter same resource in different context, using context name as map key can
         * distinguish the same resource. In this case, multiple {@link DefaultNode}s will be created
         * of the same resource name, for every distinct context (different context name) each.
         *
         * Consider another question. One resource may have multiple {@link DefaultNode},
         * so what is the fastest way to get total statistics of the same resource?
         * The answer is all {@link DefaultNode}s with same resource name share one
         * {@link ClusterNode}. See {@link ClusterBuilderSlot} for detail.
         */
        DefaultNode node = map.get(context.getName());
        if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }

            }
        }

        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
           

从代码中可以看到,NodeSelectorSlot节点做了一些自己的业务逻辑处理,接着调用fireEntry()方法,由此触发下一个节点的entry方法。此时我们知道sentinel的责任链的传递方式:每个Slot节点执行完自己的业务后,会调用fireEntry来触发下一个节点的entry方法。所以可以将上面的图完整了,具体如下:

Sentinel流控实现原理+代码实现分析原理分析源码分析

至此,就通过SlotChain完成了对每个节点的entry()方法的调用,每个节点会根据创建的规则,进行自己的逻辑处理,当统计的结果达到设置的阈值时,就会触发限流、降级等事件,具体就是抛出BlockException异常。

类关系图

Sentinel流控实现原理+代码实现分析原理分析源码分析