天天看點

dubbo源碼分析-consumer端4-ClusterInvoker與LoadBalance

        dubbo中提供了多種叢集調用政策:

        1、FailbackClusterInvoker :  失敗自動恢複,背景記錄失敗請求,定時重發,通常用于消息通知操作;

        2、FailfastClusterInvoker: 快速失敗,隻發起一次調用,失敗立即報錯,通常用于非幂等性的寫操作;

        3、FailoverClusterInvoker: 失敗轉移,當出現失敗,重試其它伺服器,通常用于讀操作,但重試會帶來更長延遲;

        4、FailsafeClusterInvoker: 失敗安全,出現異常時,直接忽略,通常用于寫入審計日志等操作;

        5、FokingClusterInvoker: 并行調用,隻要一個成功即傳回,通常用于實時性要求較高的操作,但需要浪費更多服務資源;

        6、MergeableClusterInvoker:合并多個組的傳回資料;

        開發者可以根據實際情況選擇合适的政策,這裡我們選擇FailoverClusterInvoker(官方推薦)進行講解,通過它來了解叢集調用處理的問題,了解它以後其他的政策也很容易了。

        由多個相同服務共同組成的一套服務,通過分布式的部署,達到服務的高可用,這就是叢集。與單機的服務不同的是,我們至少需要:1、位址服務(Directory);2、負載均衡(LoadBalance)。  位址服務用于位址的管理,如緩存位址、服務上下線的處理、對外提供位址清單等,通過位址服務,我們可以知道所有可用服務的位址資訊。負載均衡,則是通過一定的算法将壓力分攤到各個服務上。 好了,知道這兩個概念後,我們開始正式的代碼閱讀。

        當應用需要調用服務時,會通過invoke方法發起調用(AbstractClusterInvoker):

public Result invoke(final Invocation invocation) throws RpcException {
        // 是否被銷毀
        checkWheatherDestoried();

        LoadBalance loadbalance;
        // 通過位址服務擷取所有可用的位址資訊
        List<Invoker<T>> invokers = list(invocation);
       
        if (invokers != null && invokers.size() > 0) {
            // 如果存在位址資訊,則根據位址中的配置加載LoadBalance,注意負責均衡政策配置的優先級 privider > consumer            
           loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            // 如果暫時沒有位址資訊,則使用預設的負載均衡政策政策(random)
           loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        // 如果是異步的話需要加入相應的資訊
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // 根據位址及負載均衡政策發起調用
        return doInvoke(invocation, invokers, loadbalance);
    }

    protected  List<Invoker<T>> list(Invocation invocation) throws RpcException {
     List<Invoker<T>> invokers = directory.list(invocation);
     return invokers;
   }
           

        doInvoke則是各個子類來實作,以FailoverClusterInvoker為例:

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    	List<Invoker<T>> copyinvokers = invokers;
        // 檢查位址清單是否正确(需要確定有可用的位址)
    	checkInvokers(copyinvokers, invocation);
        // 從retries參數擷取重試的次數,如retries=3,則最大可能調用的次數為4
        // 需要注意的是預設的重試次數為2(及最多執行3次),對于一些寫服務來說,如果無法做到幂等,最好是将retries參數設為0,或者使用failfast政策
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        // 發起指定次數的調用,一旦其中一次成功則傳回
        for (int i = 0; i < len; i++) {
        	//重試時,進行重新選擇,避免重試時invoker清單已發生變化.
        	//注意:如果清單發生了變化,那麼invoked判斷會失效,因為invoker示例已經改變
        	if (i > 0) {
        		checkWheatherDestoried();
        		copyinvokers = list(invocation);
        		//重新檢查一下
        		checkInvokers(copyinvokers, invocation);
        	}
            // 根據負載均衡算法得到一個位址
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            // 記錄發起過調用的位址,防止重試時調用了已經調用過的位址
            invoked.add(invoker);
            // 
            RpcContext.getContext().setInvokers((List)invoked);
            try {
                // 通過之前選出的位址進行調用
                Result result = invoker.invoke(invocation);
                // 調用成功,判斷是否重試過,如果重試過,記錄下警告資訊,記錄失敗的位址
                if (le != null && logger.isWarnEnabled()) {
                    logger.warn("Although retry the method " + invocation.getMethodName()
                            + " in the service " + getInterface().getName()
                            + " was successful by the provider " + invoker.getUrl().getAddress()
                            + ", but there have been failed providers " + providers 
                            + " (" + providers.size() + "/" + copyinvokers.size()
                            + ") from the registry " + directory.getUrl().getAddress()
                            + " on the consumer " + NetUtils.getLocalHost()
                            + " using the dubbo version " + Version.getVersion() + ". Last error is: "
                            + le.getMessage(), le);
                }
                return result;
            } catch (RpcException e) {
                // 如果是業務異常則直接抛出錯誤,其他(如逾時等錯誤)則不重試
                if (e.isBiz()) { // biz exception.
                    throw e;
                }
                le = e;
            } catch (Throwable e) {
                le = new RpcException(e.getMessage(), e);
            } finally {
                // 發生過調用的位址記錄下來
                providers.add(invoker.getUrl().getAddress());
            }
        }
        throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
                + invocation.getMethodName() + " in the service " + getInterface().getName() 
                + ". Tried " + len + " times of the providers " + providers 
                + " (" + providers.size() + "/" + copyinvokers.size() 
                + ") from the registry " + directory.getUrl().getAddress()
                + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
                + Version.getVersion() + ". Last error is: "
                + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
    }
           

        可以看到failover政策實作很簡單,得到位址資訊後,通過負載均衡算法選取一個位址來發送請求, 如果産生了非業務異常則按照配置的次數進行重試。

        下面我們來看看位址的選取過程:

/**
     * 使用loadbalance選擇invoker.</br>
     * a)先lb選擇,如果在selected清單中 或者 不可用且做檢驗時,進入下一步(重選),否則直接傳回</br>
     * b)重選驗證規則:selected > available .保證重選出的結果盡量不在select中,并且是可用的 
     * 
     * @param availablecheck 如果設定true,在選擇的時候先選invoker.available == true
     * @param selected 已選過的invoker.注意:輸入保證不重複
     * 
     */
    protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
        if (invokers == null || invokers.size() == 0)
            return null;
        String methodName = invocation == null ? "" : invocation.getMethodName();
    
        // 如果sticky為true,則該接口上的所有方法使用相同的provider    
        boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ; {
            //如果之前的provider已經不存在了則将其設定為null 
            if ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){ 
                stickyInvoker = null; 
            } 
            // 如果sticky為true,且之前有調用過的provider且該provider未失敗則繼續使用該provider 
            if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))){ 
                if (availablecheck && stickyInvoker.isAvailable()){ 
                    return stickyInvoker; 
                } 
            } 
        } 
        // 重新選擇invoker
        Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected); 
        if (sticky){ 
           stickyInvoker = invoker; 
        } 

        return invoker; 
    } 

    private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException{
       if (invokers == null || invokers.size() == 0) 
            return null; 

        if (invokers.size() == 1) 
            return invokers.get(0); 
        // 如果隻有兩個invoker,且産生過失敗,則退化成輪循 
        if (invokers.size() == 2 && selected != null && selected.size() > 0) { 
            return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0); 
        } 

        // 通過負載均衡算法得到一個invoker
        Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation); 
        //如果 selected中包含(優先判斷) 或者 需要檢測可用性且目前選擇的invoker不可用 則重新選擇. 
        if( (selected != null && selected.contains(invoker)) ||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){ 
            try{ 
                Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); 
                if(rinvoker != null){ 
                    invoker = rinvoker; 
                }else{ 
                    //如果重新選擇沒有選出invoker,則選第一次選的下一個invoker. 
                    int index = invokers.indexOf(invoker); 
                    try{ 
                        //最後在避免碰撞 
                        invoker = index <invokers.size()-1?invokers.get(index+1) :invoker; 
                    }catch (Exception e) { 
                       logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e); 
                    } 
                } 
            }catch (Throwable t){ 
                logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t);
           } 
         }
 
         return invoker; 
    } 

     /**
     * 重選,先從非selected的清單中選擇,沒有在從selected清單中選擇.
     * @param loadbalance
     * @param invocation
     * @param invokers
     * @param selected
     * @return
     * @throws RpcException
     */
    private Invoker<T> reselect(LoadBalance loadbalance,Invocation invocation,
                                List<Invoker<T>> invokers, List<Invoker<T>> selected ,boolean availablecheck)
            throws RpcException {
        
        //預先配置設定一個,這個清單是一定會用到的.
        List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size()>1?(invokers.size()-1):invokers.size());
        
        //先從非select中選
        if( availablecheck ){ //選isAvailable 的非select
            for(Invoker<T> invoker : invokers){
                if(invoker.isAvailable()){
                    if(selected ==null || !selected.contains(invoker)){
                        reselectInvokers.add(invoker);
                    }
                }
            }
            if(reselectInvokers.size()>0){
                return  loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }else{ //選全部非select
            for(Invoker<T> invoker : invokers){
                if(selected ==null || !selected.contains(invoker)){
                    reselectInvokers.add(invoker);
                }
            }
            if(reselectInvokers.size()>0){
                return  loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }
        //最後從select中選可用的. 
        {
            if(selected != null){
                for(Invoker<T> invoker : selected){
                    if((invoker.isAvailable()) //優先選available 
                            && !reselectInvokers.contains(invoker)){
                        reselectInvokers.add(invoker);
                    }
                }
            }
            if(reselectInvokers.size()>0){
                return  loadbalance.select(reselectInvokers, getUrl(), invocation);
            }
        }
        return null;
    }

           

         選擇invoker的過程如下:

        1、如果配置sticky為true,則檢視之前是否有已經調用過的invoker,如果有且可用則直接使用

         2、如果隻有一個位址,則直接使用該位址

         3、如果有兩個位址,且已經調用過一個位址,則使用另一個位址

         4、第一次使用負載均衡算法得到一個invoker。滿足兩個條件則使用該invoker,如果不滿足則繼續第5步的重新選擇

                 條件1 該位址在該請求中未使用(注一次請求含第一次調用及後面的重試)

                 條件2 設定了檢測可用性且可用  或  沒設定檢測可用性

         5、如果設定了檢測可用性則擷取所有可用且本次請求未使用過的invoker,如果未設定則擷取所有本次請求未使用過的invoker,

               如果得到的invoker不為空,則使用負載均衡從這批invoker中選擇一個

         6、如果還是沒有選出invoker則從已經使用過的invoker中找可用的invoker,從這些可用的invoker中利用負載均衡算法得到一個invoker

          7、如果以上步驟均未選出invoker,則選擇第4步得到的invoker的下一個invoker,如果第4步得到的invoker已經是最後一個則直接選此invoker

          可以看到雖然有負載均衡政策,但仍然有很多分支狀況需要處理。

          選出invoker後就可以進行調用了。後面的過程由單獨的文章繼續講解。我們回過頭來看看負載均衡算法的實作, 首先看看負載均衡的接口:

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

	/**
	 * select one invoker in list.
	 * 
	 * @param invokers invokers.
	 * @param url refer url
	 * @param invocation invocation.
	 * @return selected invoker.
	 */
    @Adaptive("loadbalance")
	<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}
           

        負載均衡提供的select方法共有三個參數,invokers:可用的服務清單,url:包含consumer的資訊,invocation:目前調用的資訊。 預設的負載均衡算法為RandomLoadBalance, 這裡我們就先講講這個随機排程算法。

public class RandomLoadBalance extends AbstractLoadBalance {

    public static final String NAME = "random";

    private final Random random = new Random();

    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        int length = invokers.size(); // 總個數
        int totalWeight = 0; // 總權重
        boolean sameWeight = true; // 權重是否都一樣
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            totalWeight += weight; // 累計總權重
            if (sameWeight && i > 0
                    && weight != getWeight(invokers.get(i - 1), invocation)) {
                sameWeight = false; // 計算所有權重是否一樣
            }
        }
        if (totalWeight > 0 && ! sameWeight) {
            // 如果權重不相同且權重大于0則按總權重數随機
            int offset = random.nextInt(totalWeight);
            // 并确定随機值落在哪個片斷上
            for (int i = 0; i < length; i++) {
                offset -= getWeight(invokers.get(i), invocation);
                if (offset < 0) {
                    return invokers.get(i);
                }
            }
        }
        // 如果權重相同或權重為0則均等随機
        return invokers.get(random.nextInt(length));
    }

}
           

        随機排程算法分兩種情況:

        1、當所有服務提供者權重相同或者無權重時,根據清單size得到一個值,再随機出一個[0, size)的數值,根據這個數值取對應位置的服務提供者;

        2、計算所有服務提供者權重之和,例如以下5個Invoker,總權重為25,則随機出[0, 24]的一個值,根據各個Invoker的區間來取Invoker,

               如随機值為10,則選擇Invoker2;

dubbo源碼分析-consumer端4-ClusterInvoker與LoadBalance

        需要注意的是取權重的方法getWeight不是直接取配置中的權重,其算法如下:

protected int getWeight(Invoker<?> invoker, Invocation invocation) {
        // 先擷取provider配置的權重(預設100)
        int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
        if (weight > 0) {
                // 擷取provider的啟動時間
	        long timestamp = invoker.getUrl().getParameter(Constants.TIMESTAMP_KEY, 0L);
	    	if (timestamp > 0L) {
                        // 計算出啟動時長
	    		int uptime = (int) (System.currentTimeMillis() - timestamp);
                        // 擷取預熱時間(預設600000,即10分鐘),注意warmup不是provider的基本參數,需要通過dubbo:paramater配置
	    		int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
                        // 如果啟動時長小于預熱時間,則需要降級。 權重計算方式為啟動時長占預熱時間的百分比乘以權重,
                        // 如啟動時長為20000ms,預熱時間為60000ms,權重為120,則最終權重為 120 * (1/3) = 40,
                        // 注意calculateWarmupWeight使用float進行計算,是以結果并不精确。    		
                        if (uptime > 0 && uptime < warmup) {
	    			weight = calculateWarmupWeight(uptime, warmup, weight);
	    		}
	    	}
        }
    	return weight;
    }
    
    static int calculateWarmupWeight(int uptime, int warmup, int weight) {
    	int ww = (int) ( (float) uptime / ( (float) warmup / (float) weight ) );
    	return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }
           

        到這裡,負載均衡的随機排程算法就分析完了,實作還是比較簡單的。dubbo還實作了其他幾個算法:

        RoundRobinLoadBalance:輪詢排程算法(2.5.3版本有bug,2.5.4-snapshot正常,有興趣的請看後面這個版本的代碼) 例如invoker0-權重3,invoker1-權重1,invoker2-權重2,則選取的invoker順序依次是:第一輪:0,1(本輪invoker1消耗完),2,0,2(本輪invoker2消耗完),0(本輪invoker0消耗完), 第二輪重複第一輪的順序。

        LeastActiveLoadBalance:最少活躍調用數排程算法,通過活躍數統計,找出活躍數最少的provider,如果隻有一個最小的則直接選這個,如果活躍數最少的provider有多個,則用與RandomLoadBalance相同的政策來從這幾個provider中選取一個。

       ConsistentHashLoadBalance:一緻性hash排程算法,根據參數計算得到一個provider,後續相同的參數使用同樣的provider。

繼續閱讀