
Elasticsearch in Practice: The Rebalance Feature and Source Code Analysis

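  1. How rebalance tasks show up in an ES cluster:

Call the GET _cat/tasks?v API.

In the response, the action is internal:index/shard/recovery/start_recovery (this action is not exclusive to rebalancing; other peer recoveries use it too).

  2. Checking how shards are moving:

Call the GET _cat/recovery?v API.

In the response, type is peer; source_node and target_node show the direction of the move; stage shows how far the move has progressed: INIT -> ...... -> DONE

  3. Checking shard state

Call the GET _cat/shards?v API.

In the response, shards that are being moved are in the RELOCATING state.

  4. Checking the number of shards on each node

Use Kibana monitoring, or call GET _nodes/stats/indices?level=shards and count the length of each node's shards array. (It feels like the _cat/nodes API ought to expose a per-node shard count; in the meantime, GET _cat/allocation?v reports one in its shards column.)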
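As a rough illustration of the counting in step 4, the sketch below tallies shards per node from the text returned by GET _cat/shards?h=index,shard,prirep,state,node. The class name ShardCountSketch, the helper countByNode, and the sample rows are all hypothetical; the sample text is made up rather than captured from a real cluster, and the parsing assumes whitespace-separated columns in exactly that order.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: counts shards per node from _cat/shards text output
// requested with h=index,shard,prirep,state,node (column order is an assumption).
class ShardCountSketch {
    static Map<String, Integer> countByNode(String catShardsOutput) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : catShardsOutput.split("\n")) {
            String[] cols = line.trim().split("\\s+");
            // Unassigned shards have no node column, so they are skipped.
            if (cols.length < 5) {
                continue;
            }
            // For a RELOCATING shard the first node token is the source node.
            counts.merge(cols[4], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample rows, not taken from a real cluster.
        String sample = String.join("\n",
            "logs 0 p STARTED    node-1",
            "logs 0 r STARTED    node-2",
            "logs 1 p RELOCATING node-1 -> 10.0.0.3 x1y2 node-3",
            "logs 1 r UNASSIGNED");
        System.out.println(countByNode(sample)); // prints {node-1=2, node-2=1}
    }
}
```

A relocating shard is counted on its source node only, which matches the idea that the move has not finished yet.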

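There are 3+3 configuration parameters related to rebalancing:

cluster.routing.rebalance.enable // which kinds of shards may be rebalanced (all, primaries, replicas, none)
cluster.routing.allocation.allow_rebalance // when rebalancing is allowed (always, indices_primaries_active, indices_all_active)
cluster.routing.allocation.cluster_concurrent_rebalance // how many shard rebalances may run concurrently across the cluster

cluster.routing.allocation.balance.shard // weight factor for the total number of shards on each node; raising it pushes the total shard count per node toward equality
cluster.routing.allocation.balance.index // weight factor for the number of shards per index on each node; raising it spreads each index's shards more evenly across the nodes
cluster.routing.allocation.balance.threshold // threshold; raising it makes the cluster less eager to rebalance

See the detailed analysis below.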

Source code analysis

The abstract base class AllocationDecider provides two methods for deciding whether a rebalance is allowed:

public abstract class AllocationDecider {
    // Decides whether the given shard may be rebalanced
    public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) {
        return Decision.ALWAYS;
    }
    // Decides whether the cluster as a whole may rebalance (the main focus here)
    public Decision canRebalance(RoutingAllocation allocation) {
        return Decision.ALWAYS;
    }
}

The AllocationDeciders class extends this base class and combines the decisions of a group of deciders into the final decision.

public Decision canRebalance(RoutingAllocation allocation) {
    Decision.Multi ret = new Decision.Multi();
    for (AllocationDecider allocationDecider : allocations) {
        Decision decision = allocationDecider.canRebalance(allocation);
        // short track if a NO is returned.
        if (decision == Decision.NO) {
            if (!allocation.debugDecision()) {
                return decision;
            } else {
                ret.add(decision);
            }
        } else {
            addDecision(ret, decision, allocation);
        }
    }
    return ret;
}           
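The aggregation rule can be tried out in isolation with the minimal sketch below. It is a simplified model, not the Elasticsearch classes: the class DeciderAggregationSketch, the enum Type, and the method combine are names invented here. It assumes that the combined result is NO if any decider says NO, THROTTLE if any says THROTTLE, and YES otherwise.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Simplified, hypothetical model of combining several deciders' verdicts.
class DeciderAggregationSketch {
    enum Type { YES, THROTTLE, NO }

    // Any NO wins, otherwise any THROTTLE, otherwise YES (assumed rule).
    static Type combine(List<Supplier<Type>> deciders, boolean debug) {
        List<Type> collected = new ArrayList<>();
        for (Supplier<Type> decider : deciders) {
            Type verdict = decider.get();
            if (verdict == Type.NO && !debug) {
                // Short-circuit: the remaining deciders are not consulted.
                return Type.NO;
            }
            // In debug mode every verdict is kept so all reasons can be reported.
            collected.add(verdict);
        }
        if (collected.contains(Type.NO)) {
            return Type.NO;
        }
        return collected.contains(Type.THROTTLE) ? Type.THROTTLE : Type.YES;
    }

    public static void main(String[] args) {
        List<Supplier<Type>> deciders = List.of(
            () -> Type.YES,  // e.g. rebalancing is enabled
            () -> Type.NO,   // e.g. too many rebalances already running
            () -> Type.YES);
        System.out.println(combine(deciders, false)); // prints NO
    }
}
```

Note that balance(), shown later, proceeds only when the combined type is YES, so in this model a THROTTLE result also skips the rebalance.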

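The deciders that determine whether the cluster may rebalance are:

  • EnableAllocationDecider

Handles the cluster.routing.rebalance.enable setting (index.routing.rebalance.enable is the per-index override, applied in the shard-level check)

  • ClusterRebalanceAllocationDecider

Handles the cluster.routing.allocation.allow_rebalance setting

  • ConcurrentRebalanceAllocationDecider

Handles the cluster.routing.allocation.cluster_concurrent_rebalance setting

The rebalance itself is carried out in the allocate() method of the BalancedShardsAllocator class, which ends up calling the Balancer's balanceByWeights() method.

When BalancedShardsAllocator is initialized, it uses the last three settings listed above to set the weightFunction (settings 4 and 5) and the threshold (setting 6).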

public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) {
    setWeightFunction(INDEX_BALANCE_FACTOR_SETTING.get(settings), SHARD_BALANCE_FACTOR_SETTING.get(settings));
    setThreshold(THRESHOLD_SETTING.get(settings));
    clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, SHARD_BALANCE_FACTOR_SETTING, this::setWeightFunction);
    clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
}

private void setWeightFunction(float indexBalance, float shardBalanceFactor) {
    weightFunction = new WeightFunction(indexBalance, shardBalanceFactor);
}

private void setThreshold(float threshold) {
    this.threshold = threshold;
}           

The WeightFunction balances two things: the total number of shards across nodes, and the number of shards of each index across nodes. See the comments:

private static class WeightFunction {

    private final float indexBalance;
    private final float shardBalance;
    private final float theta0;
    private final float theta1;
    // Defaults: indexBalance = 0.55 and shardBalance = 0.45, which sum to 1
    WeightFunction(float indexBalance, float shardBalance) {
        float sum = indexBalance + shardBalance;
        if (sum <= 0.0f) {
            throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
        }
        // Normalize the factors; when they already sum to 1 the configured values are used as-is
        theta0 = shardBalance / sum;
        theta1 = indexBalance / sum;
        this.indexBalance = indexBalance;
        this.shardBalance = shardBalance;
    }
    // Computes the weight of a node for a given index, using the Balancer's averages
    float weight(Balancer balancer, ModelNode node, String index) {
        // Shards on this node minus the average number of shards per node
        final float weightShard = node.numShards() - balancer.avgShardsPerNode();
        // Shards of this index on this node minus the average number of this index's shards per node
        final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
        // Multiply by the coefficients and add up
        return theta0 * weightShard + theta1 * weightIndex;
    }
}
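To get a feel for the numbers, here is a stand-alone re-implementation of the same formula applied to a made-up three-node cluster. The class WeightSketch and its parameter names are assumptions for illustration only; the formula and the 0.55 / 0.45 factors follow the listing above.

```java
// Hypothetical stand-alone version of the weight formula, for experimentation only.
class WeightSketch {
    static float weight(float indexBalance, float shardBalance,
                        int nodeShards, float avgShardsPerNode,
                        int nodeIndexShards, float avgIndexShardsPerNode) {
        float sum = indexBalance + shardBalance;
        if (sum <= 0.0f) {
            throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
        }
        float theta0 = shardBalance / sum; // share given to the total shard count
        float theta1 = indexBalance / sum; // share given to the per-index shard count
        return theta0 * (nodeShards - avgShardsPerNode)
             + theta1 * (nodeIndexShards - avgIndexShardsPerNode);
    }

    public static void main(String[] args) {
        // Made-up cluster: 12 shards in total, of which 6 belong to index "logs".
        int[] totalShards = {6, 3, 3};
        int[] logsShards = {4, 1, 1};
        float avgTotal = 12 / 3.0f; // 4 shards per node on average
        float avgLogs = 6 / 3.0f;   // 2 "logs" shards per node on average
        for (int i = 0; i < totalShards.length; i++) {
            float w = weight(0.55f, 0.45f, totalShards[i], avgTotal, logsShards[i], avgLogs);
            System.out.println("node-" + i + " weight for logs: " + w);
        }
    }
}
```

In this example the first node's weight comes out at roughly 2.0 and the other two at roughly -1.0, so the delta between the heaviest and lightest node is about 3.0, well above a threshold of 1.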

Now for the Balancer. Its three jobs are shown below (this article focuses on balance):

public void allocate(RoutingAllocation allocation) {
    if (allocation.routingNodes().size() == 0) {
        failAllocationOfNewPrimaries(allocation);
        return;
    }
    final Balancer balancer = new Balancer(logger, allocation, weightFunction, threshold);
    // Allocate the unassigned shards
    balancer.allocateUnassigned();
    // Move shards that can no longer stay where they are (because of allocation rules)
    balancer.moveShards();
    // Even out the number of shards across the nodes as far as possible
    balancer.balance(); // eventually calls balanceByWeights()
}

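Next, balance():

  • To follow the balance process, first enable trace-level logging
  • Issue 14387: rebalancing is skipped while shard store fetches are still in flight; otherwise shards might be moved only to be moved back once the fetches return
  • Call canRebalance(), described above, to check whether rebalancing is allowed
  • With only one node there is nothing to rebalance
  • Start the rebalance

private void balance() {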
    if (logger.isTraceEnabled()) {
        logger.trace("Start balancing cluster");
    }
    if (allocation.hasPendingAsyncFetch()) {
        /*
         * see https://github.com/elastic/elasticsearch/issues/14387
         * if we allow rebalance operations while we are still fetching shard store data
         * we might end up with unnecessary rebalance operations which can be super confusion/frustrating
         * since once the fetches come back we might just move all the shards back again.
         * Therefore we only do a rebalance if we have fetched all information.
         */
        logger.debug("skipping rebalance due to in-flight shard/store fetches");
        return;
    }
    if (allocation.deciders().canRebalance(allocation).type() != Type.YES) {
        logger.trace("skipping rebalance as it is disabled");
        return;
    }
    if (nodes.size() < 2) { /* skip if we only have one node */
        logger.trace("skipping rebalance as single node only");
        return;
    }
    balanceByWeights(); // the core method
}           

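Next, balanceByWeights():

This is the core code.

It is fairly long. The upstream English comments have been removed and detailed annotations added in their place; it is well worth walking through once.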

private void balanceByWeights() {
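    // The deciders that judge whether rebalancing is allowed
    final AllocationDeciders deciders = allocation.deciders();
    // Node info: each node's shard count and its shard count per index
    final ModelNode[] modelNodes = sorter.modelNodes;
    // Weight of each node for the index being processed
    final float[] weights = sorter.weights;
    // Process each index
    for (String index : buildWeightOrderedIndices()) {
        IndexMetadata indexMetadata = metadata.index(index);
        // Find the nodes that hold shards of this index, or to which its shards could be allocated, and move them to the front of the ModelNode array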
        int relevantNodes = 0;
        for (int i = 0; i < modelNodes.length; i++) {
            ModelNode modelNode = modelNodes[i];
            if (modelNode.getIndex(index) != null
                || deciders.canAllocate(indexMetadata, modelNode.getRoutingNode(), allocation).type() != Type.NO) {
                // swap nodes at position i and relevantNodes
                modelNodes[i] = modelNodes[relevantNodes];
                modelNodes[relevantNodes] = modelNode;
                relevantNodes++;
            }
        }
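        // Skip the index when there are fewer than two relevant nodes
        if (relevantNodes < 2) {
            continue;
        }
        // Recompute the weights of the relevant nodes and sort them
        sorter.reset(index, 0, relevantNodes);
        // From here on, work only on the relevant nodes, i.e. the first relevantNodes entries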
        int lowIdx = 0;
        int highIdx = relevantNodes - 1;
        while (true) {
            final ModelNode minNode = modelNodes[lowIdx];
            final ModelNode maxNode = modelNodes[highIdx];
            advance_range:
            if (maxNode.numShards(index) > 0) {
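                // Compute the weight delta between the lightest and heaviest relevant nodes; if it does not exceed the threshold (setting 6 above), no move is made for this pair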
                final float delta = absDelta(weights[lowIdx], weights[highIdx]);
                if (lessThan(delta, threshold)) {
                    if (lowIdx > 0 && highIdx-1 > 0 && (absDelta(weights[0], weights[highIdx-1]) > threshold) ) {
                        break advance_range;
                    }
                    if (logger.isTraceEnabled()) {
                        logger.trace("Stop balancing index [{}]  min_node [{}] weight: [{}]" +
                                "  max_node [{}] weight: [{}]  delta: [{}]",
                                index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
                    }
                    break;
                }
                if (logger.isTraceEnabled()) {
                    logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}]  delta: [{}]",
                            maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
                }
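                // Skip when the weight delta is at most 1? This 1 is hard-coded; arguably it should be a setting.
                if (delta <= 1.0f) {
                    logger.trace("Couldn't find shard to relocate from node [{}] to node [{}]",
                        maxNode.getNodeId(), minNode.getNodeId());
                    // Otherwise (the else-if below), try to relocate a shard of this index from maxNode to minNode, going through the candidate ShardRoutings.
                } else if (tryRelocateShard(minNode, maxNode, index)) {
                    // After a move the nodes' shard counts have changed, so recompute their weights, re-sort, and start the next round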
                    weights[lowIdx] = sorter.weight(modelNodes[lowIdx]);
                    weights[highIdx] = sorter.weight(modelNodes[highIdx]);
                    sorter.sort(0, relevantNodes);
                    lowIdx = 0;
                    highIdx = relevantNodes - 1;
                    continue;
                }
            }
            // If nothing moved in this round, the weights are unchanged, so move on to the next pair of relevant nodes
            if (lowIdx < highIdx - 1) {
                lowIdx++;
            } else if (lowIdx > 0) {
                lowIdx = 0;
                highIdx--;
            } else {
                // This index is now balanced
                break;
            }
        }
    }
}           
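The overall shape of the loop may be easier to see in a stripped-down model. The sketch below is an assumption-laden simplification, not the Elasticsearch algorithm: it handles a single index (so a node's weight reduces to its shard count minus the average), ignores the deciders and the lowIdx/highIdx walk, and simply moves one shard at a time from the heaviest node to the lightest. The class BalanceLoopSketch and the 0.001 tolerance are choices made here for illustration.

```java
import java.util.Arrays;

// Hypothetical single-index model of "move from max to min until the delta is small enough".
class BalanceLoopSketch {
    // shardsPerNode[i] is the number of shards of one index on node i.
    static int[] balance(int[] shardsPerNode, float threshold) {
        int[] shards = shardsPerNode.clone();
        if (shards.length < 2) {
            return shards; // a single node has nothing to balance against
        }
        while (true) {
            int min = 0;
            int max = 0;
            for (int i = 1; i < shards.length; i++) {
                if (shards[i] < shards[min]) min = i;
                if (shards[i] > shards[max]) max = i;
            }
            // With one index, the weight delta equals the shard count difference.
            float delta = shards[max] - shards[min];
            // Stop when within the threshold (small tolerance for float noise),
            // or when a move could only swap the two nodes' weights.
            if (delta <= threshold + 0.001f || delta <= 1.0f) {
                break;
            }
            shards[max]--;
            shards[min]++;
        }
        return shards;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(balance(new int[] {6, 0, 0}, 1.0f))); // prints [2, 2, 2]
        System.out.println(Arrays.toString(balance(new int[] {6, 0, 0}, 3.0f))); // prints [4, 1, 1]
    }
}
```

The second call shows the effect of a larger threshold: the loop stops as soon as the delta drops to 3, leaving the cluster less evenly balanced but with fewer shard moves.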

Next, the tryRelocateShard() method, which balances shards between two nodes:

//TODO
