天天看点

Dubbo负载均衡策略之最小活跃策略

大家好,又见面了,我是你们的朋友全栈君。

今天我来学习一下Dubbo负载均衡之一的最小活跃策略-LeastActiveLoadBalance

首先,让我们对负载均衡做一个简单的介绍。所谓集负载均衡,其含义就是指将负载(工作任务)进行平衡、分摊到多个操作单元上进行运行。负载均衡、集群容错、服务降级这三个概念在微服务中非常重要。从调用顺序来看,一次完整的RPC调用首先是负载均衡、其次是集群容错、最后是服务降级:负载均衡解决了选哪一个的问题、集群容错解决了换哪一个的问题、而服务降级则是解决了全错了怎么办的问题

今天我们要学习的策略是最小活跃策略-LeastActiveLoadBalance。顾名思义,这种策略就是寻找调用活跃度最低的服务提供者。

话不多说,直接分析源码

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // The least active value of all invokers
        int leastActive = -1;
        // The number of invokers having the same least active value (leastActive)
        int leastCount = 0;
        // The index of invokers having the same least active value (leastActive)
        int[] leastIndexes = new int[length];
        // the weight of every invokers
        int[] weights = new int[length];
        // The sum of the warmup weights of all the least active invokers
        int totalWeight = 0;
        // The weight of the first least active invoker
        int firstWeight = 0;
        // Every least active invoker has the same weight value?
        boolean sameWeight = true;


        // Filter out all the least active invokers
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // Get the active number of the invoker
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            // Get the weight of the invoker's configuration. The default value is 100.
            int afterWarmup = getWeight(invoker, invocation);
            // save for later use
            weights[i] = afterWarmup;
            // If it is the first invoker or the active number of the invoker is less than the current least active number
            if (leastActive == -1 || active < leastActive) {
                // Reset the active number of the current invoker to the least active number
                leastActive = active;
                // Reset the number of least active invokers
                leastCount = 1;
                // Put the first least active invoker first in leastIndexes
                leastIndexes[0] = i;
                // Reset totalWeight
                totalWeight = afterWarmup;
                // Record the weight the first least active invoker
                firstWeight = afterWarmup;
                // Each invoke has the same weight (only one invoker here)
                sameWeight = true;
                // If current invoker's active value equals with leaseActive, then accumulating.
            } else if (active == leastActive) {
                // Record the index of the least active invoker in leastIndexes order
                leastIndexes[leastCount++] = i;
                // Accumulate the total weight of the least active invoker
                totalWeight += afterWarmup;
                // If every invoker has the same weight?
                if (sameWeight && afterWarmup != firstWeight) {
                    sameWeight = false;
                }
            }
        }
        // Choose an invoker from all the least active invokers
        if (leastCount == 1) {
            // If we got exactly one invoker having the least active value, return this invoker directly.
            return invokers.get(leastIndexes[0]);
        }
        if (!sameWeight && totalWeight > 0) {
            // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on 
            // totalWeight.
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
                offsetWeight -= weights[leastIndex];
                if (offsetWeight < 0) {
                    return invokers.get(leastIndex);
                }
            }
        }
        // If all invokers have the same weight value or totalWeight=0, return evenly.
        return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
    }           

复制

通过代码我们可以看出, 作者通过

RpcStatus.getStatus(invoker.getUrl(),invocation.getMethodName()).getActive()获取了每个提供者的活跃度,通过 getWeight(invoker, invocation) 获取了每个提供者的权重;最后再返回最低活跃提供者           

复制

如果只有一个最低活跃提供者,直接返回

if (leastCount == 1) {
            // 如果只存在一个最低活跃的提供者  直接返回
            return invokers.get(leastIndexes[0]);
        }           

复制

如果存在多个、但是不是所有提供者的权重都是一样的:先生成一个随机数(0-最小总权重),循环用这个随机数-最小活跃度提供者们的权重 当减到小于0时 返回此时的最小活跃提供者

if (!sameWeight && totalWeight > 0) {
            // 生成一个0-最小活跃度提供者总权重之间的整数
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            // 循环用这个随机数-最小活跃度提供者们的权重 当减到小于0时 返回此时的最小活跃提供者
            for (int i = 0; i < leastCount; i++) {
                int leastIndex = leastIndexes[i];
                offsetWeight -= weights[leastIndex];
                if (offsetWeight < 0) {
                    return invokers.get(leastIndex);
                }
            }
        }           

复制

当所有最小活跃提供者权重都一样:随机返回其中一个

invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);           

复制

通过上面的代码我们可以看出这个最小活跃的大致逻辑,那么每个提供者的活跃度又是怎么算的呢?

public static RpcStatus getStatus(URL url, String methodName) {
        String uri = url.toIdentityString();
        ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.computeIfAbsent(uri, k -> new ConcurrentHashMap<>());
        return map.computeIfAbsent(methodName, k -> new RpcStatus());
    }           

复制

是通过url和methodName来查出来的活跃度,url我们可以理解为一个提供者的ip,methodName的dubbo接口的方法,可见该活跃度是方法级别的活跃度而非接口级别!此处是取活跃度的地方,那么什么地方又是增加活跃度的呢?请接着往下看

public static boolean beginCount(URL url, String methodName, int max) {
        max = (max <= 0) ? Integer.MAX_VALUE : max;
        RpcStatus appStatus = getStatus(url);
        RpcStatus methodStatus = getStatus(url, methodName);
        if (methodStatus.active.get() == Integer.MAX_VALUE) {
            return false;
        }
        for (int i; ; ) {
            i = methodStatus.active.get();

            if (i == Integer.MAX_VALUE || i + 1 > max) {
                return false;
            }

            if (methodStatus.active.compareAndSet(i, i + 1)) {
                break;
            }
        }

        appStatus.active.incrementAndGet();

        return true;
    }           

复制

在RpcStatus中有一个beginCount的方法就是用来计数的,而调用这个方法的ActiveLimitFilter的dubbo的filter,这说明每一次发生rpc调用的时都会执行beginCount方法。

由此,我们可以清晰地看出最小活跃策略就是取目标rpc接口方法调用次数最少的一个提供者来作为本次rpc调用的提供者。但是这种最小活跃只是针对当前消费者,而不能针对整个集群,所以该最小活跃策略仍然不能将资源充分运用起来。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/157494.html原文链接:https://javaforall.cn