天天看點

ElasticJob‐Lite:作業分片政策介紹與源碼分析

分片

彈性排程是​

​ElasticJob​

​​最重要的功能,也是這款産品名稱的由來。它是一款能夠讓任務通過分片進行水

平擴充的任務處理系統。

​ElasticJob​

​​中任務分片項的概念,使得任務可以在分布式的環境下運作,每台任務伺服器隻運作配置設定給該伺服器的分片。随着伺服器的增加或當機,​

​ElasticJob​

​​會近乎實時的感覺伺服器數量的變更,進而重新為分布式的任務伺服器配置設定更加合理的任務分片項,使得任務可以随着資源的增加而提升效率。

任務的分布式執行,需要将一個任務拆分為多個獨立的任務項,然後由分布式的伺服器分别執行某一個或幾個分片項。

舉例說明,如果作業分為​

​4​

​​片,用兩台伺服器執行,則每個伺服器分到​

​2​

​​片,分别負責作業的​

​50%​

​的負載,如下圖所示。

ElasticJob‐Lite:作業分片政策介紹與源碼分析

​ElasticJob​

​​并不直接提供資料處理的功能,而是将分片項配置設定至各個運作中的作業伺服器,開發者需要自行處理分片項與業務的對應關系。分片項為數字,始于​

​0​

​​而終于分片總數減​

​1​

​​。以上是​

​ElasticJob​

​​的官方文檔對分片的描述,而文檔對作業分片政策的介紹非常簡單,隻給了作業分片政策的​

​SPI​

​名稱,如下圖所示:

ElasticJob‐Lite:作業分片政策介紹與源碼分析

作業分片政策

部落客目前使用的是​

​3.0.1​

​​版本的​

​ElasticJob‐Lite​

​(目前最新版本)。

<dependency>
            <groupId>org.apache.shardingsphere.elasticjob</groupId>
            <artifactId>elasticjob-lite-core</artifactId>
            <version>3.0.1</version>
        </dependency>      
ElasticJob‐Lite:作業分片政策介紹與源碼分析

作業分片政策的​

​SPI​

​​名稱是​

​JobShardingStrategy​

​,是作業分片政策的頂層設計。

package org.apache.shardingsphere.elasticjob.infra.handler.sharding;

import org.apache.shardingsphere.elasticjob.infra.spi.TypedSPI;

import java.util.List;
import java.util.Map;

/**
 * 作業分片政策
 */
public interface JobShardingStrategy extends TypedSPI {
    
    /**
     * 作業分片
     * jobInstances – 參與分片的所有作業執行個體(作業伺服器)
     * jobName -作業名稱
     * shardingTotalCount – 分片總數
     */
    Map<JobInstance, List<Integer>> sharding(List<JobInstance> jobInstances, String jobName, int shardingTotalCount);
}      

​JobShardingStrategy​

​​接口的​

​sharding​

​​方法就是用來定義作業分片的邏輯,供子類實作,目前有三個實作類:​

​AverageAllocationJobShardingStrategy​

​​、​

​OdevitySortByNameJobShardingStrategy​

​​以及​

​RoundRobinByNameJobShardingStrategy​

​。

ElasticJob‐Lite:作業分片政策介紹與源碼分析

AverageAllocationJobShardingStrategy

源碼如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class AverageAllocationJobShardingStrategy implements JobShardingStrategy {
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        if (jobInstances.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<JobInstance, List<Integer>> result = shardingAliquot(jobInstances, shardingTotalCount);
        addAliquant(jobInstances, shardingTotalCount, result);
        return result;
    }
    
    private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
        int count = 0;
        for (JobInstance each : shardingUnits) {
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                shardingItems.add(i);
            }
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }
    
    private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
        int aliquant = shardingTotalCount % shardingUnits.size();
        int count = 0;
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
            if (count < aliquant) {
                entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
            }
            count++;
        }
    }
    
    @Override
    public String getType() {
        return "AVG_ALLOCATION";
    }
}      

這是一種盡量平均配置設定的分片政策,如果作業的分片項無法平均配置設定給所有的作業伺服器,即作業的分片項數​

​%​

​作業伺服器數不為零,則将無法平均配置設定的備援分片項依次添加到序号較小的伺服器中。 例如:

  • 如果有​

    ​3​

    ​​個作業伺服器,總分片數為​

    ​9​

    ​​,每個作業伺服器的分片項為:​

    ​1=[0,1,2]​

    ​​,​

    ​2=[3,4,5]​

    ​​,​

    ​3=[6,7,8]​

    ​。
  • 如果有​

    ​3​

    ​​個作業伺服器,總分片數為​

    ​8​

    ​​,每個作業伺服器的分片項為:​

    ​1=[0,1,6]​

    ​​,​

    ​2=[2,3,7]​

    ​​,​

    ​3=[4,5]​

    ​。
  • 如果有​

    ​3​

    ​​個作業伺服器,總分片數為​

    ​10​

    ​​,每個作業伺服器的分片項為:​

    ​1=[0,1,2,9]​

    ​​,​

    ​2=[3,4,5]​

    ​​,​

    ​3=[6,7,8]​

    ​。

先給每個作業伺服器配置設定相同數量的作業分片項(數量為:作業的分片項數​

​/​

​作業伺服器數)。

private Map<JobInstance, List<Integer>> shardingAliquot(final List<JobInstance> shardingUnits, final int shardingTotalCount) {
        Map<JobInstance, List<Integer>> result = new LinkedHashMap<>(shardingUnits.size(), 1);
        // 每個作業伺服器最少應該配置設定的作業分片項數
        int itemCountPerSharding = shardingTotalCount / shardingUnits.size();
        int count = 0;
        for (JobInstance each : shardingUnits) {
            // 每個作業伺服器申請的作業分片項清單(容量為itemCountPerSharding + 1)
            // itemCountPerSharding + 1為每個作業伺服器最多應該配置設定的作業分片項數
            List<Integer> shardingItems = new ArrayList<>(itemCountPerSharding + 1);
            for (int i = count * itemCountPerSharding; i < (count + 1) * itemCountPerSharding; i++) {
                // 給作業分片項清單添加第i個作業分片項
                shardingItems.add(i);
            }
            // 将作業伺服器與它執行的作業分片項清單進行關聯
            result.put(each, shardingItems);
            count++;
        }
        return result;
    }      

如果作業的分片項無法平均配置設定給所有的作業伺服器,則将無法平均配置設定的備援分片項依次添加到序号較小的伺服器中。

private void addAliquant(final List<JobInstance> shardingUnits, final int shardingTotalCount, final Map<JobInstance, List<Integer>> shardingResults) {
        // 無法平均配置設定的分片項數
        int aliquant = shardingTotalCount % shardingUnits.size();
        // 已配置設定的無法平均配置設定的分片項數
        int count = 0;
        for (Map.Entry<JobInstance, List<Integer>> entry : shardingResults.entrySet()) {
            // 是否還有無法平均配置設定的分片項
            if (count < aliquant) {
                // 配置設定給序号較小的作業伺服器
                entry.getValue().add(shardingTotalCount / shardingUnits.size() * shardingUnits.size() + count);
            }
            // 已配置設定數更新
            count++;
        }
    }      

OdevitySortByNameJobShardingStrategy

源碼如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public final class OdevitySortByNameJobShardingStrategy implements JobShardingStrategy {
    
    private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        long jobNameHash = jobName.hashCode();
        if (0 == jobNameHash % 2) {
            Collections.reverse(jobInstances);
        }
        return averageAllocationJobShardingStrategy.sharding(jobInstances, jobName, shardingTotalCount);
    }
    
    @Override
    public String getType() {
        return "ODEVITY";
    }
}      

其實還是使用​

​AverageAllocationJobShardingStrategy​

​​作業分片政策進行配置設定,隻是會先根據作業名稱的哈希碼的奇偶性來決定是否對作業伺服器清單進行​

​reverse​

​操作。例如:

  • 如果有​

    ​3​

    ​​個作業伺服器,總分片數為​

    ​2​

    ​​,作業名稱的哈希碼為奇數(對作業伺服器清單不進行​

    ​reverse​

    ​​操作),每個作業伺服器的分片項為:​

    ​1=[0]​

    ​​,​

    ​2=[1]​

    ​​,​

    ​3=[]​

    ​。
  • 如果有​

    ​3​

    ​​個作業伺服器,總分片數為​

    ​2​

    ​​,作業名的哈希碼是偶數(對作業伺服器清單進行​

    ​reverse​

    ​​操作),每個作業伺服器的分片項為:​

    ​3=[0]​

    ​​,​

    ​2=[1]​

    ​​,​

    ​1=[]​

    ​。

RoundRobinByNameJobShardingStrategy

源碼如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding.impl;

import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobShardingStrategy;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public final class RoundRobinByNameJobShardingStrategy implements JobShardingStrategy {
    
    private final AverageAllocationJobShardingStrategy averageAllocationJobShardingStrategy = new AverageAllocationJobShardingStrategy();
    
    @Override
    public Map<JobInstance, List<Integer>> sharding(final List<JobInstance> jobInstances, final String jobName, final int shardingTotalCount) {
        return averageAllocationJobShardingStrategy.sharding(rotateServerList(jobInstances, jobName), jobName, shardingTotalCount);
    }
    
    private List<JobInstance> rotateServerList(final List<JobInstance> shardingUnits, final String jobName) {
        int shardingUnitsSize = shardingUnits.size();
        int offset = Math.abs(jobName.hashCode()) % shardingUnitsSize;
        if (0 == offset) {
            return shardingUnits;
        }
        List<JobInstance> result = new ArrayList<>(shardingUnitsSize);
        for (int i = 0; i < shardingUnitsSize; i++) {
            int index = (i + offset) % shardingUnitsSize;
            result.add(shardingUnits.get(index));
        }
        return result;
    }
    
    @Override
    public String getType() {
        return "ROUND_ROBIN";
    }
}      

其實跟​

​OdevitySortByNameJobShardingStrategy​

​​作業分片政策類似,都是使用​

​AverageAllocationJobShardingStrategy​

​​作業分片政策進行配置設定,并且在配置設定前都會根據作業名稱的哈希碼将作業伺服器清單中的作業伺服器項改變順序,隻是變序規則不一樣而已,​

​OdevitySortByNameJobShardingStrategy​

​​作業分片政策根據作業名稱的哈希碼的奇偶性來決定是否對作業伺服器清單進行​

​reverse​

​​操作,而​

​RoundRobinByNameJobShardingStrategy​

​​作業分片政策根據作業名稱的哈希碼的絕對值​

​%​

​​作業伺服器數的值對作業伺服器清單進行​

​rotate​

​操作。例如:

  • 如果有​

    ​3​

    ​​個作業伺服器,總分片數為​

    ​2​

    ​​,作業名稱的哈希碼的絕對值​

    ​%​

    ​​作業伺服器數的值為​

    ​0​

    ​​,每個作業伺服器的分片項為:​

    ​1=[0]​

    ​​,​

    ​2=[1]​

    ​​,​

    ​3=[]​

    ​。
  • 如果有​

    ​3​

    ​​個作業伺服器,總分片數為​

    ​2​

    ​​,作業名稱的哈希碼的絕對值​

    ​%​

    ​​作業伺服器數的值為​

    ​1​

    ​​,每個作業伺服器的分片項為:​

    ​2=[0]​

    ​​,​

    ​3=[1]​

    ​​,​

    ​1=[]​

    ​。
  • 如果有​

    ​3​

    ​​個作業伺服器,總分片數為​

    ​2​

    ​​,作業名稱的哈希碼的絕對值​

    ​%​

    ​​作業伺服器數的值為​

    ​2​

    ​​,每個作業伺服器的分片項為:​

    ​3=[0]​

    ​​,​

    ​1=[1]​

    ​​,​

    ​2=[]​

    ​。

JobShardingStrategyFactory

作業的分片政策通過​

​JobShardingStrategyFactory​

​​類(作業分片政策工廠類)的​

​getStrategy​

​方法擷取,源碼如下:

package org.apache.shardingsphere.elasticjob.infra.handler.sharding;

import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.elasticjob.infra.exception.JobConfigurationException;
import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobShardingStrategyFactory {
    
    private static final String DEFAULT_STRATEGY = "AVG_ALLOCATION";
    
    static {
        ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);
    }
    
    public static JobShardingStrategy getStrategy(final String type) {
        if (Strings.isNullOrEmpty(type)) {
            return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, DEFAULT_STRATEGY).get();
        }
        return ElasticJobServiceLoader.getCachedTypedServiceInstance(JobShardingStrategy.class, type)
                .orElseThrow(() -> new JobConfigurationException("Cannot find sharding strategy using type '%s'.", type));
    }
}      

在​

​JobShardingStrategyFactory​

​​類的靜态塊中使用​

​ElasticJobServiceLoader​

​​類的​

​registerTypedService​

​方法加載所有作業分片政策。

static {
        ElasticJobServiceLoader.registerTypedService(JobShardingStrategy.class);
    }      

​ElasticJobServiceLoader​

​​類的相關代碼如下所示,通過​

​Java​

​​提供的​

​SPI​

​​機制(​

​ServiceLoader​

​類)加載所有作業分片政策。

private static final ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, TypedSPI>> TYPED_SERVICES = new ConcurrentHashMap<>();
    
    private static final ConcurrentMap<Class<? extends TypedSPI>, ConcurrentMap<String, Class<? extends TypedSPI>>> TYPED_SERVICE_CLASSES = new ConcurrentHashMap<>();
 
    public static <T extends TypedSPI> void registerTypedService(final Class<T> typedService) {
        if (TYPED_SERVICES.containsKey(typedService)) {
            return;
        }
        ServiceLoader.load(typedService).forEach(each -> registerTypedServiceClass(typedService, each));
    }
    
    private static <T extends TypedSPI> void registerTypedServiceClass(final Class<T> typedService, final TypedSPI instance) {
        TYPED_SERVICES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance);
        TYPED_SERVICE_CLASSES.computeIfAbsent(typedService, unused -> new ConcurrentHashMap<>()).putIfAbsent(instance.getType(), instance.getClass());
    }      

預設為​

​AverageAllocationJobShardingStrategy​

​作業分片政策,和官方文檔給的示意圖是對應的。

private static final String DEFAULT_STRATEGY = "AVG_ALLOCATION";      
@Override
    public String getType() {
        return "AVG_ALLOCATION";
    }