dubbo中提供了多種叢集調用政策:
1、FailbackClusterInvoker : 失敗自動恢複,背景記錄失敗請求,定時重發,通常用于消息通知操作;
2、FailfastClusterInvoker: 快速失敗,隻發起一次調用,失敗立即報錯,通常用于非幂等性的寫操作;
3、FailoverClusterInvoker: 失敗轉移,當出現失敗,重試其它伺服器,通常用于讀操作,但重試會帶來更長延遲;
4、FailsafeClusterInvoker: 失敗安全,出現異常時,直接忽略,通常用于寫入審計日志等操作;
5、FokingClusterInvoker: 并行調用,隻要一個成功即傳回,通常用于實時性要求較高的操作,但需要浪費更多服務資源;
6、MergeableClusterInvoker:合并多個組的傳回資料;
開發者可以根據實際情況選擇合适的政策,這裡我們選擇FailoverClusterInvoker(官方推薦)進行講解,通過它來了解叢集調用處理的問題,了解它以後其他的政策也很容易了。
由多個相同服務共同組成的一套服務,通過分布式的部署,達到服務的高可用,這就是叢集。與單機的服務不同的是,我們至少需要:1、位址服務(Directory);2、負載均衡(LoadBalance)。 位址服務用于位址的管理,如緩存位址、服務上下線的處理、對外提供位址清單等,通過位址服務,我們可以知道所有可用服務的位址資訊。負載均衡,則是通過一定的算法将壓力分攤到各個服務上。 好了,知道這兩個概念後,我們開始正式的代碼閱讀。
當應用需要調用服務時,會通過invoke方法發起調用(AbstractClusterInvoker):
public Result invoke(final Invocation invocation) throws RpcException {
// 是否被銷毀
checkWheatherDestoried();
LoadBalance loadbalance;
// 通過位址服務擷取所有可用的位址資訊
List<Invoker<T>> invokers = list(invocation);
if (invokers != null && invokers.size() > 0) {
// 如果存在位址資訊,則根據位址中的配置加載LoadBalance,注意負責均衡政策配置的優先級 privider > consumer
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
// 如果暫時沒有位址資訊,則使用預設的負載均衡政策政策(random)
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
// 如果是異步的話需要加入相應的資訊
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
// 根據位址及負載均衡政策發起調用
return doInvoke(invocation, invokers, loadbalance);
}
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
doInvoke則是各個子類來實作,以FailoverClusterInvoker為例:
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
// 檢查位址清單是否正确(需要確定有可用的位址)
checkInvokers(copyinvokers, invocation);
// 從retries參數擷取重試的次數,如retries=3,則最大可能調用的次數為4
// 需要注意的是預設的重試次數為2(及最多執行3次),對于一些寫服務來說,如果無法做到幂等,最好是将retries參數設為0,或者使用failfast政策
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
// 發起指定次數的調用,一旦其中一次成功則傳回
for (int i = 0; i < len; i++) {
//重試時,進行重新選擇,避免重試時invoker清單已發生變化.
//注意:如果清單發生了變化,那麼invoked判斷會失效,因為invoker示例已經改變
if (i > 0) {
checkWheatherDestoried();
copyinvokers = list(invocation);
//重新檢查一下
checkInvokers(copyinvokers, invocation);
}
// 根據負載均衡算法得到一個位址
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
// 記錄發起過調用的位址,防止重試時調用了已經調用過的位址
invoked.add(invoker);
//
RpcContext.getContext().setInvokers((List)invoked);
try {
// 通過之前選出的位址進行調用
Result result = invoker.invoke(invocation);
// 調用成功,判斷是否重試過,如果重試過,記錄下警告資訊,記錄失敗的位址
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + invocation.getMethodName()
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
// 如果是業務異常則直接抛出錯誤,其他(如逾時等錯誤)則不重試
if (e.isBiz()) { // biz exception.
throw e;
}
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
// 發生過調用的位址記錄下來
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "
+ invocation.getMethodName() + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyinvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);
}
可以看到failover政策實作很簡單,得到位址資訊後,通過負載均衡算法選取一個位址來發送請求, 如果産生了非業務異常則按照配置的次數進行重試。
下面我們來看看位址的選取過程:
/**
* 使用loadbalance選擇invoker.</br>
* a)先lb選擇,如果在selected清單中 或者 不可用且做檢驗時,進入下一步(重選),否則直接傳回</br>
* b)重選驗證規則:selected > available .保證重選出的結果盡量不在select中,并且是可用的
*
* @param availablecheck 如果設定true,在選擇的時候先選invoker.available == true
* @param selected 已選過的invoker.注意:輸入保證不重複
*
*/
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (invokers == null || invokers.size() == 0)
return null;
String methodName = invocation == null ? "" : invocation.getMethodName();
// 如果sticky為true,則該接口上的所有方法使用相同的provider
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName,Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY) ; {
//如果之前的provider已經不存在了則将其設定為null
if ( stickyInvoker != null && !invokers.contains(stickyInvoker) ){
stickyInvoker = null;
}
// 如果sticky為true,且之前有調用過的provider且該provider未失敗則繼續使用該provider
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))){
if (availablecheck && stickyInvoker.isAvailable()){
return stickyInvoker;
}
}
}
// 重新選擇invoker
Invoker<T> invoker = doselect(loadbalance, invocation, invokers, selected);
if (sticky){
stickyInvoker = invoker;
}
return invoker;
}
private Invoker<T> doselect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException{
if (invokers == null || invokers.size() == 0)
return null;
if (invokers.size() == 1)
return invokers.get(0);
// 如果隻有兩個invoker,且産生過失敗,則退化成輪循
if (invokers.size() == 2 && selected != null && selected.size() > 0) {
return selected.get(0) == invokers.get(0) ? invokers.get(1) : invokers.get(0);
}
// 通過負載均衡算法得到一個invoker
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//如果 selected中包含(優先判斷) 或者 需要檢測可用性且目前選擇的invoker不可用 則重新選擇.
if( (selected != null && selected.contains(invoker)) ||(!invoker.isAvailable() && getUrl()!=null && availablecheck)){
try{
Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if(rinvoker != null){
invoker = rinvoker;
}else{
//如果重新選擇沒有選出invoker,則選第一次選的下一個invoker.
int index = invokers.indexOf(invoker);
try{
//最後在避免碰撞
invoker = index <invokers.size()-1?invokers.get(index+1) :invoker;
}catch (Exception e) {
logger.warn(e.getMessage()+" may because invokers list dynamic change, ignore.",e);
}
}
}catch (Throwable t){
logger.error("clustor relselect fail reason is :"+t.getMessage() +" if can not slove ,you can set cluster.availablecheck=false in url",t);
}
}
return invoker;
}
/**
* 重選,先從非selected的清單中選擇,沒有在從selected清單中選擇.
* @param loadbalance
* @param invocation
* @param invokers
* @param selected
* @return
* @throws RpcException
*/
private Invoker<T> reselect(LoadBalance loadbalance,Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected ,boolean availablecheck)
throws RpcException {
//預先配置設定一個,這個清單是一定會用到的.
List<Invoker<T>> reselectInvokers = new ArrayList<Invoker<T>>(invokers.size()>1?(invokers.size()-1):invokers.size());
//先從非select中選
if( availablecheck ){ //選isAvailable 的非select
for(Invoker<T> invoker : invokers){
if(invoker.isAvailable()){
if(selected ==null || !selected.contains(invoker)){
reselectInvokers.add(invoker);
}
}
}
if(reselectInvokers.size()>0){
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}else{ //選全部非select
for(Invoker<T> invoker : invokers){
if(selected ==null || !selected.contains(invoker)){
reselectInvokers.add(invoker);
}
}
if(reselectInvokers.size()>0){
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
//最後從select中選可用的.
{
if(selected != null){
for(Invoker<T> invoker : selected){
if((invoker.isAvailable()) //優先選available
&& !reselectInvokers.contains(invoker)){
reselectInvokers.add(invoker);
}
}
}
if(reselectInvokers.size()>0){
return loadbalance.select(reselectInvokers, getUrl(), invocation);
}
}
return null;
}
選擇invoker的過程如下:
1、如果配置sticky為true,則檢視之前是否有已經調用過的invoker,如果有且可用則直接使用
2、如果隻有一個位址,則直接使用該位址
3、如果有兩個位址,且已經調用過一個位址,則使用另一個位址
4、第一次使用負載均衡算法得到一個invoker。滿足兩個條件則使用該invoker,如果不滿足則繼續第5步的重新選擇
條件1 該位址在該請求中未使用(注一次請求含第一次調用及後面的重試)
條件2 設定了檢測可用性且可用 或 沒設定檢測可用性
5、如果設定了檢測可用性則擷取所有可用且本次請求未使用過的invoker,如果未設定則擷取所有本次請求未使用過的invoker,
如果得到的invoker不為空,則使用負載均衡從這批invoker中選擇一個
6、如果還是沒有選出invoker則從已經使用過的invoker中找可用的invoker,從這些可用的invoker中利用負載均衡算法得到一個invoker
7、如果以上步驟均未選出invoker,則選擇第4步得到的invoker的下一個invoker,如果第4步得到的invoker已經是最後一個則直接選此invoker
可以看到雖然有負載均衡政策,但仍然有很多分支狀況需要處理。
選出invoker後就可以進行調用了。後面的過程由單獨的文章繼續講解。我們回過頭來看看負載均衡算法的實作, 首先看看負載均衡的接口:
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
/**
* select one invoker in list.
*
* @param invokers invokers.
* @param url refer url
* @param invocation invocation.
* @return selected invoker.
*/
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
負載均衡提供的select方法共有三個參數,invokers:可用的服務清單,url:包含consumer的資訊,invocation:目前調用的資訊。 預設的負載均衡算法為RandomLoadBalance, 這裡我們就先講講這個随機排程算法。
public class RandomLoadBalance extends AbstractLoadBalance {
public static final String NAME = "random";
private final Random random = new Random();
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // 總個數
int totalWeight = 0; // 總權重
boolean sameWeight = true; // 權重是否都一樣
for (int i = 0; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
totalWeight += weight; // 累計總權重
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false; // 計算所有權重是否一樣
}
}
if (totalWeight > 0 && ! sameWeight) {
// 如果權重不相同且權重大于0則按總權重數随機
int offset = random.nextInt(totalWeight);
// 并确定随機值落在哪個片斷上
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}
}
// 如果權重相同或權重為0則均等随機
return invokers.get(random.nextInt(length));
}
}
随機排程算法分兩種情況:
1、當所有服務提供者權重相同或者無權重時,根據清單size得到一個值,再随機出一個[0, size)的數值,根據這個數值取對應位置的服務提供者;
2、計算所有服務提供者權重之和,例如以下5個Invoker,總權重為25,則随機出[0, 24]的一個值,根據各個Invoker的區間來取Invoker,
如随機值為10,則選擇Invoker2;
需要注意的是取權重的方法getWeight不是直接取配置中的權重,其算法如下:
protected int getWeight(Invoker<?> invoker, Invocation invocation) {
// 先擷取provider配置的權重(預設100)
int weight = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.WEIGHT_KEY, Constants.DEFAULT_WEIGHT);
if (weight > 0) {
// 擷取provider的啟動時間
long timestamp = invoker.getUrl().getParameter(Constants.TIMESTAMP_KEY, 0L);
if (timestamp > 0L) {
// 計算出啟動時長
int uptime = (int) (System.currentTimeMillis() - timestamp);
// 擷取預熱時間(預設600000,即10分鐘),注意warmup不是provider的基本參數,需要通過dubbo:paramater配置
int warmup = invoker.getUrl().getParameter(Constants.WARMUP_KEY, Constants.DEFAULT_WARMUP);
// 如果啟動時長小于預熱時間,則需要降級。 權重計算方式為啟動時長占預熱時間的百分比乘以權重,
// 如啟動時長為20000ms,預熱時間為60000ms,權重為120,則最終權重為 120 * (1/3) = 40,
// 注意calculateWarmupWeight使用float進行計算,是以結果并不精确。
if (uptime > 0 && uptime < warmup) {
weight = calculateWarmupWeight(uptime, warmup, weight);
}
}
}
return weight;
}
static int calculateWarmupWeight(int uptime, int warmup, int weight) {
int ww = (int) ( (float) uptime / ( (float) warmup / (float) weight ) );
return ww < 1 ? 1 : (ww > weight ? weight : ww);
}
到這裡,負載均衡的随機排程算法就分析完了,實作還是比較簡單的。dubbo還實作了其他幾個算法:
RoundRobinLoadBalance:輪詢排程算法(2.5.3版本有bug,2.5.4-snapshot正常,有興趣的請看後面這個版本的代碼) 例如invoker0-權重3,invoker1-權重1,invoker2-權重2,則選取的invoker順序依次是:第一輪:0,1(本輪invoker1消耗完),2,0,2(本輪invoker2消耗完),0(本輪invoker0消耗完), 第二輪重複第一輪的順序。
LeastActiveLoadBalance:最少活躍調用數排程算法,通過活躍數統計,找出活躍數最少的provider,如果隻有一個最小的則直接選這個,如果活躍數最少的provider有多個,則用與RandomLoadBalance相同的政策來從這幾個provider中選取一個。
ConsistentHashLoadBalance:一緻性hash排程算法,根據參數計算得到一個provider,後續相同的參數使用同樣的provider。