天天看點

5. SOFAJRaft源碼分析— RheaKV中如何存放資料?

概述

上一篇講了RheaKV是如何進行初始化的,因為RheaKV主要是用來做KV存儲的,RheaKV讀寫的是相當的複雜,一起寫會篇幅太長,是以這一篇主要來講一下RheaKV中如何存放資料。

我們這裡使用一個用戶端的例子來開始本次的講解:

public static void main(final String[] args) throws Exception {
    final Client client = new Client();
    client.init();
    //get(client.getRheaKVStore());
    RheaKVStore rheaKVStore = client.getRheaKVStore();
    final byte[] key = writeUtf8("hello");
    final byte[] value = writeUtf8("world");
    rheaKVStore.bPut(key, value);
    client.shutdown();
}
           

我們從這個main方法中啟動我們的執行個體,調用rheaKVStore.bPut(key, value)方法将資料放入到RheaKV中。

public class Client {

    private final RheaKVStore rheaKVStore = new DefaultRheaKVStore();

    public void init() {
        final List<RegionRouteTableOptions> regionRouteTableOptionsList = MultiRegionRouteTableOptionsConfigured
            .newConfigured() //
            .withInitialServerList(-1L /* default id */, Configs.ALL_NODE_ADDRESSES) //
            .config();
        final PlacementDriverOptions pdOpts = PlacementDriverOptionsConfigured.newConfigured() //
            .withFake(true) //
            .withRegionRouteTableOptionsList(regionRouteTableOptionsList) //
            .config();
        final RheaKVStoreOptions opts = RheaKVStoreOptionsConfigured.newConfigured() //
            .withClusterName(Configs.CLUSTER_NAME) //
            .withPlacementDriverOptions(pdOpts) //
            .config();
        System.out.println(opts);
        rheaKVStore.init(opts);
    }

    public void shutdown() {
        this.rheaKVStore.shutdown();
    }

    public RheaKVStore getRheaKVStore() {
        return rheaKVStore;
    }
}

public class Configs { 
    public static String ALL_NODE_ADDRESSES = "127.0.0.1:8181,127.0.0.1:8182,127.0.0.1:8183";

    public static String CLUSTER_NAME       = "rhea_example";
}
           

Client在調用init方法初始化rheaKVStore的時候和我們上一節中講的server例子很像,差別是少了StoreEngineOptions的設定和多配置了一個regionRouteTableOptionsList執行個體。

bPut存入資料

我們這裡存入資料會調用DefaultRheaKVStore的bPut方法:

DefaultRheaKVStore#bPut

public Boolean bPut(final byte[] key, final byte[] value) {
    return FutureHelper.get(put(key, value), this.futureTimeoutMillis);
}
           

bPut方法裡面主要的存放資料的操作在put方法裡面做的,put方法會傳回一個CompletableFuture給FutureHelper的get方法調用,并且在bPut方法裡面會放入一個逾時時間,在init方法中初始化的,預設是5秒。

接下來我們進入到put方法中:

DefaultRheaKVStore#put

public CompletableFuture<Boolean> put(final byte[] key, final byte[] value) {
    Requires.requireNonNull(key, "key");
    Requires.requireNonNull(value, "value");
    //是否嘗試進行批量的put
    return put(key, value, new CompletableFuture<>(), true);
}
           

這裡會調用put的重載的方法,第三個參數是表示傳入一個空的回調函數,第四個參數表示采用Batch 批量存儲

private CompletableFuture<Boolean> put(final byte[] key, final byte[] value,
                                       final CompletableFuture<Boolean> future, final boolean tryBatching) {
    //校驗一下是否已經init初始化了
    checkState();
    if (tryBatching) {
        //putBatching執行個體在init方法中被初始化
        final PutBatching putBatching = this.putBatching;
        if (putBatching != null && putBatching.apply(new KVEntry(key, value), future)) {
            //由于我們傳入的是一個空的執行個體,是以這裡直接傳回
            return future;
        }
    }
    //直接存入資料
    internalPut(key, value, future, this.failoverRetries, null);
    return future;
}
           

checkState方法會去校驗started這個屬性有沒有被設定,如果調用過DefaultRheaKVStore的init方法進行初始化過,那麼會設定started為ture。

這裡還會調用init方法裡面初始化過的putBatching執行個體,我們下面看看putBatching執行個體做了什麼。

putBatching批量存入資料

putBatching在init執行個體初始化的時候會傳入一個PutBatchingHandler作為處理器:

this.putBatching = new PutBatching(KVEvent::new, "put_batching",
        new PutBatchingHandler("put"));
           

我們下面看看PutBatching的構造方法:

public PutBatching(EventFactory<KVEvent> factory, String name, PutBatchingHandler handler) {
    super(factory, batchingOpts.getBufSize(), name, handler);
}
           

這裡由于PutBatching繼承了Batching這個抽象類,是以在執行個體化的時候直接調用父類的構造器執行個體化:

public Batching(EventFactory<T> factory, int bufSize, String name, EventHandler<T> handler) {
    this.name = name;
    this.disruptor = new Disruptor<>(factory, bufSize, new NamedThreadFactory(name, true));
    this.disruptor.handleEventsWith(handler);
    this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(name));
    this.ringBuffer = this.disruptor.start();
}
           

在Batching構造器裡面會初始化一個Disruptor執行個體,并将我們傳入的PutBatchingHandler處理器作為Disruptor的處理器,所有傳入PutBatching的資料都會經過PutBatchingHandler來處理。

我們下面看看PutBatchingHandler是怎麼處理資料的:

PutBatchingHandler#onEvent

public void onEvent(final KVEvent event, final long sequence, final boolean endOfBatch) throws Exception {
    //1.把傳入的時間加入到集合中
    this.events.add(event);
    //加上key和value的長度
    this.cachedBytes += event.kvEntry.length();
    final int size = this.events.size();
    //BatchSize等于100 ,并且maxWriteBytes位元組數32768
    //2. 如果不是最後一個event,也沒有這麼多數量的資料,那麼就不發送
    if (!endOfBatch && size < batchingOpts.getBatchSize() && this.cachedBytes < batchingOpts.getMaxWriteBytes()) {
        return;
    }
    //3.如果傳入的size為1,那麼就重新調用put方法放入到Batching裡面
    if (size == 1) {
        //重置events和cachedBytes
        reset();
        final KVEntry kv = event.kvEntry;
        try {
            put(kv.getKey(), kv.getValue(), event.future, false);
        } catch (final Throwable t) {
            exceptionally(t, event.future);
        }
    //    4.如果size不為1,那麼把資料周遊到集合裡面批量處理
    } else {
        //初始化一個長度為size的list
        final List<KVEntry> entries = Lists.newArrayListWithCapacity(size);
        final CompletableFuture<Boolean>[] futures = new CompletableFuture[size];
        for (int i = 0; i < size; i++) {
            final KVEvent e = this.events.get(i);
            entries.add(e.kvEntry);
            //使用CompletableFuture建構異步應用
            futures[i] = e.future;
        }
        //周遊完events資料到entries之後,重置
        reset();
        try {
            //當put方法完成後執行whenComplete中的内容
            put(entries).whenComplete((result, throwable) -> {
                //如果沒有抛出異常,那麼通知所有future已經執行完畢了
                if (throwable == null) {
                    for (int i = 0; i < futures.length; i++) {
                        futures[i].complete(result);
                    }
                    return;
                }
                exceptionally(throwable, futures);
            });
        } catch (final Throwable t) {
            exceptionally(t, futures);
        }
    }
} 
           
  1. 進入這個方法的時候會把這個event加入到events集合中,然後把彙總長度和events的size
  2. 由于所有的event都是發往Disruptor,然後分發到PutBatchingHandler進行處理,是以可以通過endOfBatch參數判斷這個分發過來的event是不是最後一個,如果不是最後一個,并且總共的event數量沒有超過預設的100,cachedBytes沒有超過32768,那麼就直接傳回,等湊夠了批次再處理
  3. 走到這個判斷,說明隻有一條資料過來,那麼就重新調用put方法,設定tryBatching為false,那麼會直接走internalPut方法
  4. 如果size不等于1,那麼就會把所有的event都加入到集合裡面,然後調用put方法批量處理,當處理完之後調用whenComplete方法對傳回的結果進行一場或回調處理

往RheaKV中批量put設值

下面我來講一下PutBatchingHandler#onEvent中的put(entries)這個方法是怎麼處理批量資料的,這個方法會調用到DefaultRheaKVStore的put方法。

public CompletableFuture<Boolean> put(final List<KVEntry> entries) {
    //檢查狀态
    checkState();
    Requires.requireNonNull(entries, "entries");
    Requires.requireTrue(!entries.isEmpty(), "entries empty");
    //存放資料
    final FutureGroup<Boolean> futureGroup = internalPut(entries, this.failoverRetries, null);
    //處理傳回狀态
    return FutureHelper.joinBooleans(futureGroup);
}
           

該方法會調用internalPut進行設值操作。

DefaultRheaKVStore#internalPut

private FutureGroup<Boolean> internalPut(final List<KVEntry> entries, final int retriesLeft,
                                         final Throwable lastCause) {
    //組裝Region和KVEntry的映射關系
    final Map<Region, List<KVEntry>> regionMap = this.pdClient
            .findRegionsByKvEntries(entries, ApiExceptionHelper.isInvalidEpoch(lastCause));
    final List<CompletableFuture<Boolean>> futures = Lists.newArrayListWithCapacity(regionMap.size());
    final Errors lastError = lastCause == null ? null : Errors.forException(lastCause);
    for (final Map.Entry<Region, List<KVEntry>> entry : regionMap.entrySet()) {
        final Region region = entry.getKey();
        final List<KVEntry> subEntries = entry.getValue();
        //設定重試回調函數,并将重試次數減一
        final RetryCallable<Boolean> retryCallable = retryCause -> internalPut(subEntries, retriesLeft - 1,
                retryCause);
        final BoolFailoverFuture future = new BoolFailoverFuture(retriesLeft, retryCallable);
        //把資料存放到region中
        internalRegionPut(region, subEntries, future, retriesLeft, lastError);
        futures.add(future);
    }
    return new FutureGroup<>(futures);
}
           

因為一個Store裡面會有很多的Region,是以這個方法首先會去組裝Region和KVEntry的關系,确定這個KVEntry是屬于哪個Region的。

然後設定好回調函數後調用internalRegionPut方法将subEntries存入到Region中。

組裝Region和KVEntry的映射關系

我們下面看看是怎麼組裝的:

pdClient是FakePlacementDriverClient的執行個體,繼承了AbstractPlacementDriverClient,是以調用的是父類的findRegionsByKvEntries方法

AbstractPlacementDriverClient#findRegionsByKvEntries

public Map<Region, List<KVEntry>> findRegionsByKvEntries(final List<KVEntry> kvEntries, final boolean forceRefresh) {
    if (forceRefresh) {
        refreshRouteTable();
    }
    //regionRouteTable裡面存了region的路由資訊
    return this.regionRouteTable.findRegionsByKvEntries(kvEntries);
}
           

因為我們這裡是用的FakePlacementDriverClient,是以refreshRouteTable傳回的是一個空方法,是以往下走是調用RegionRouteTable的findRegionsByKvEntries的方法

RegionRouteTable#findRegionsByKvEntries

public Map<Region, List<KVEntry>> findRegionsByKvEntries(final List<KVEntry> kvEntries) {
    Requires.requireNonNull(kvEntries, "kvEntries");
    //執行個體化一個map
    final Map<Region, List<KVEntry>> regionMap = Maps.newHashMap();
    final StampedLock stampedLock = this.stampedLock;
    final long stamp = stampedLock.readLock();
    try {
        for (final KVEntry kvEntry : kvEntries) {
            //根據kvEntry的key去找和region的startKey最接近的region
            final Region region = findRegionByKeyWithoutLock(kvEntry.getKey());
            //設定region和KVEntry的映射關系
            regionMap.computeIfAbsent(region, k -> Lists.newArrayList()).add(kvEntry);
        }
        return regionMap;
    } finally {
        stampedLock.unlockRead(stamp);
    }
}

private Region findRegionByKeyWithoutLock(final byte[] key) {
    // return the greatest key less than or equal to the given key
    //rangeTable裡面存的是region的startKey,value是regionId
    // 這裡傳回小于等于key的第一個元素
    final Map.Entry<byte[], Long> entry = this.rangeTable.floorEntry(key);
    if (entry == null) {
        reportFail(key);
        throw reject(key, "fail to find region by key");
    }
    //regionTable裡面存的regionId,value是region
    return this.regionTable.get(entry.getValue());
}
           

findRegionsByKvEntries方法會周遊所有的KVEntry集合,然後調用findRegionByKeyWithoutLock去rangeTable裡面找合适的region,由于rangeTable是一個treemap,是以調用了floorEntry傳回的是小于等于key的第一個region。

然後将region放入到regionMap裡,key是regionMap,value是一個KVEntry集合。

regionRouteTable裡面的資料是在DefaultRheaKVStore初始化的時候傳入的,不記得的同學我給出了初始化路由表的過程:

DefaultRheaKVStore#init->FakePlacementDriverClient#init->
AbstractPlacementDriverClient#init->AbstractPlacementDriverClient#initRouteTableByRegion->regionRouteTable#addOrUpdateRegion
           
資料存放到相應的region中

我們接着DefaultRheaKVStore的internalPut的方法往下看到internalRegionPut方法,這個方法是真正存儲資料的地方:

DefaultRheaKVStore#internalRegionPut

private void internalRegionPut(final Region region, final List<KVEntry> subEntries,
                               final CompletableFuture<Boolean> future, final int retriesLeft,
                               final Errors lastCause) {
    //擷取regionEngine
    final RegionEngine regionEngine = getRegionEngine(region.getId(), true);
    //重試函數,會回調目前的方法
    final RetryRunner retryRunner = retryCause -> internalRegionPut(region, subEntries, future,
            retriesLeft - 1, retryCause);
    final FailoverClosure<Boolean> closure = new FailoverClosureImpl<>(future, false, retriesLeft,
            retryRunner);
    if (regionEngine != null) {
        if (ensureOnValidEpoch(region, regionEngine, closure)) {
            //擷取MetricsRawKVStore
            final RawKVStore rawKVStore = getRawKVStore(regionEngine);
            //在init方法中根據useParallelKVExecutor屬性決定是不是空
            if (this.kvDispatcher == null) {
                //調用RockDB的api進行插入
                rawKVStore.put(subEntries, closure);
            } else {
                //把put操作分發到kvDispatcher中異步執行
                this.kvDispatcher.execute(() -> rawKVStore.put(subEntries, closure));
            }
        }
    } else {
        //如果目前節點不是leader,那麼則傳回的regionEngine為null
        //那麼發起rpc調用到leader節點中
        final BatchPutRequest request = new BatchPutRequest();
        request.setKvEntries(subEntries);
        request.setRegionId(region.getId());
        request.setRegionEpoch(region.getRegionEpoch());
        this.rheaKVRpcService.callAsyncWithRpc(request, closure, lastCause);
    }
}
           

這個方法首先調用getRegionEngine擷取regionEngine,因為我們這裡是client節點,沒有初始化RegionEngine,是以這裡擷取的為空,會直接通過rpc請求發送,然後交由KVCommandProcessor進行處理。

如果目前的節點是server,并且該RegionEngine是leader,那麼會調用rawKVStore然後調用put方法插入到RockDB中。

我們最後再看看rheaKVRpcService發送的rpc請求是怎麼被處理的。

向服務端發送BatchPutRequest請求插入資料

向服務端發送put請求是通過調用DefaultRheaKVRpcService的callAsyncWithRpc方法發起的:

DefaultRheaKVRpcService#callAsyncWithRpc

public <V> CompletableFuture<V> callAsyncWithRpc(final BaseRequest request, final FailoverClosure<V> closure,
                                                 final Errors lastCause) {
    return callAsyncWithRpc(request, closure, lastCause, true);
}

public <V> CompletableFuture<V> callAsyncWithRpc(final BaseRequest request, final FailoverClosure<V> closure,
                                                 final Errors lastCause, final boolean requireLeader) {
    final boolean forceRefresh = ErrorsHelper.isInvalidPeer(lastCause);
    //擷取leader的endpoint
    final Endpoint endpoint = getRpcEndpoint(request.getRegionId(), forceRefresh, this.rpcTimeoutMillis,
            requireLeader);
    //發起rpc調用
    internalCallAsyncWithRpc(endpoint, request, closure);
    return closure.future();
}
           

在這個方法裡會調用getRpcEndpoint方法來擷取region所對應server的endpoint,然後對這個節點調用rpc請求。調用rpc請求都是sofa的bolt架構進行調用的,是以下面我們重點看怎麼擷取endpoint

DefaultRheaKVRpcService#getRpcEndpoint

public Endpoint getRpcEndpoint(final long regionId, final boolean forceRefresh, final long timeoutMillis,
                               final boolean requireLeader) {
    if (requireLeader) {
        //擷取leader
        return getLeader(regionId, forceRefresh, timeoutMillis);
    } else {
        //輪詢擷取一個不是自己的節點
        return getLuckyPeer(regionId, forceRefresh, timeoutMillis);
    }
}
           

這裡有兩個分支,一個是擷取leader節點,一個是輪詢擷取節點。由于這兩個方法挺有意思的,是以我們下面兩個方法都講一下

根據regionId擷取leader節點

根據regionId擷取leader節點是由getLeader方法觸發的,在我們調用DefaultRheaKVStore的init方法執行個體化DefaultRheaKVRpcService的時候會重寫getLeader方法:

DefaultRheaKVStore#init

this.rheaKVRpcService = new DefaultRheaKVRpcService(this.pdClient, selfEndpoint) {

    @Override
    public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
        final Endpoint leader = getLeaderByRegionEngine(regionId);
        if (leader != null) {
            return leader;
        }
        return super.getLeader(regionId, forceRefresh, timeoutMillis);
    }
};
           

重寫的getLeader方法會調用getLeaderByRegionEngine方法區根據regionId找Endpoint,如果找不到,那麼會調用父類的getLeader方法。

DefaultRheaKVStore#getLeaderByRegionEngine

private Endpoint getLeaderByRegionEngine(final long regionId) {
    final RegionEngine regionEngine = getRegionEngine(regionId);
    if (regionEngine != null) {
        final PeerId leader = regionEngine.getLeaderId();
        if (leader != null) {
            final String raftGroupId = JRaftHelper.getJRaftGroupId(this.pdClient.getClusterName(), regionId);
            RouteTable.getInstance().updateLeader(raftGroupId, leader);
            return leader.getEndpoint();
        }
    }
    return null;
}
           

這個方法這裡會擷取RegionEngine,但是我們這裡是client節點,是沒有初始化RegionEngine的,是以這裡就會傳回null,接着傳回到上一級中調用父類的getLeader方法。

DefaultRheaKVRpcService#getLeader

public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
    return this.pdClient.getLeader(regionId, forceRefresh, timeoutMillis);
}
           

這裡會調用pdClient的getLeader方法,這裡我們傳入的pdClient是FakePlacementDriverClient,它繼承了AbstractPlacementDriverClient,是以會調用到父類的getLeader方法中。

AbstractPlacementDriverClient#getLeader

public Endpoint getLeader(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
    //這裡會根據clusterName和regionId拼接出raftGroupId
    final String raftGroupId = JRaftHelper.getJRaftGroupId(this.clusterName, regionId);
    //去路由表裡找這個叢集的leader
    PeerId leader = getLeader(raftGroupId, forceRefresh, timeoutMillis);
    if (leader == null && !forceRefresh) {
        // Could not found leader from cache, try again and force refresh cache
        // 如果第一次沒有找到,那麼執行強制重新整理的方法再找一次
        leader = getLeader(raftGroupId, true, timeoutMillis);
    }
    if (leader == null) {
        throw new RouteTableException("no leader in group: " + raftGroupId);
    }
    return leader.getEndpoint();
}
           

這個方法裡面會根據clusterName和regionId拼接raftGroupId,如果傳入的clusterName為demo,regionId為1,那麼拼接出來的raftGroupId就是:

demo--1

然後會去調用getLeader擷取leader的PeerId,第一次調用這個方法傳入的forceRefresh為false,表示不用重新整理,如果傳回的為null,那麼會執行強制重新整理再去找一次。

protected PeerId getLeader(final String raftGroupId, final boolean forceRefresh, final long timeoutMillis) {
    final RouteTable routeTable = RouteTable.getInstance();
    //是否要強制重新整理路由表
    if (forceRefresh) {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        final StringBuilder error = new StringBuilder();
        // A newly launched raft group may not have been successful in the election,
        // or in the 'leader-transfer' state, it needs to be re-tried
        Throwable lastCause = null;
        for (;;) {
            try {
                //重新整理節點路由表
                final Status st = routeTable.refreshLeader(this.cliClientService, raftGroupId, 2000);
                if (st.isOk()) {
                    break;
                }
                error.append(st.toString());
            } catch (final InterruptedException e) {
                ThrowUtil.throwException(e);
            } catch (final Throwable t) {
                lastCause = t;
                error.append(t.getMessage());
            }
            //如果還沒有到截止時間,那麼sleep10毫秒之後再重新整理
            if (System.currentTimeMillis() < deadline) {
                LOG.debug("Fail to find leader, retry again, {}.", error);
                error.append(", ");
                try {
                    Thread.sleep(10);
                } catch (final InterruptedException e) {
                    ThrowUtil.throwException(e);
                }
            //    到了截止時間,那麼抛出異常
            } else {
                throw lastCause != null ? new RouteTableException(error.toString(), lastCause)
                    : new RouteTableException(error.toString());
            }
        }
    }
    //傳回路由表裡面的leader
    return routeTable.selectLeader(raftGroupId);
}
           

如果要執行強制重新整理,那麼會計算一下逾時時間,然後調用死循環,在循環體裡面會去重新整理路由表,如果沒有重新整理成功也沒有逾時,那麼會sleep10毫秒重新再刷。

RouteTable#refreshLeader

public Status refreshLeader(final CliClientService cliClientService, final String groupId, final int timeoutMs)
                                                                                                               throws InterruptedException,
                                                                                                               TimeoutException {
    Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id");
    Requires.requireTrue(timeoutMs > 0, "Invalid timeout: " + timeoutMs);
    //根據叢集的id去擷取叢集的配置資訊,裡面包括叢集的ip和端口号
    final Configuration conf = getConfiguration(groupId);
    if (conf == null) {
        return new Status(RaftError.ENOENT,
            "Group %s is not registered in RouteTable, forgot to call updateConfiguration?", groupId);
    }
    final Status st = Status.OK();
    final CliRequests.GetLeaderRequest.Builder rb = CliRequests.GetLeaderRequest.newBuilder();
    rb.setGroupId(groupId);
    //發送擷取leader節點的請求
    final CliRequests.GetLeaderRequest request = rb.build();
    TimeoutException timeoutException = null;
    for (final PeerId peer : conf) {
        //如果連接配接不上,先設定狀态為error,然後continue
        if (!cliClientService.connect(peer.getEndpoint())) {
            if (st.isOk()) {
                st.setError(-1, "Fail to init channel to %s", peer);
            } else {
                final String savedMsg = st.getErrorMsg();
                st.setError(-1, "%s, Fail to init channel to %s", savedMsg, peer);
            }
            continue;
        }
        //向這個節點發送擷取leader的GetLeaderRequest請求
        final Future<Message> result = cliClientService.getLeader(peer.getEndpoint(), request, null);
        try {
            final Message msg = result.get(timeoutMs, TimeUnit.MILLISECONDS);
            //異常情況的處理
            if (msg instanceof RpcRequests.ErrorResponse) {
                if (st.isOk()) {
                    st.setError(-1, ((RpcRequests.ErrorResponse) msg).getErrorMsg());
                } else {
                    final String savedMsg = st.getErrorMsg();
                    st.setError(-1, "%s, %s", savedMsg, ((RpcRequests.ErrorResponse) msg).getErrorMsg());
                }
            } else {
                final CliRequests.GetLeaderResponse response = (CliRequests.GetLeaderResponse) msg;
                //重置leader
                updateLeader(groupId, response.getLeaderId());
                return Status.OK();
            }
        } catch (final TimeoutException e) {
            timeoutException = e;
        } catch (final ExecutionException e) {
            if (st.isOk()) {
                st.setError(-1, e.getMessage());
            } else {
                final String savedMsg = st.getErrorMsg();
                st.setError(-1, "%s, %s", savedMsg, e.getMessage());
            }
        }
    }
    if (timeoutException != null) {
        throw timeoutException;
    }

    return st;
}
           

大家不要一開始就被這樣的長的方法給迷惑住了,這個方法實際上非常的簡單:

  1. 根據groupId擷取叢集節點的配置資訊,其中包括了其他節點的ip和端口号
  2. 周遊conf裡面的叢集節點
  3. 嘗試連接配接被周遊的節點,如果連接配接不上直接continue換到下一個節點
  4. 向這個節點發送GetLeaderRequest請求,如果在逾時時間内可以傳回正常的響應,那麼就調用updateLeader更新leader資訊

updateLeader方法相當節點,裡面就是更新一下路由表的leader屬性,我們這裡看看server是怎麼處理GetLeaderRequest請求的

GetLeaderRequest由GetLeaderRequestProcessor處理器來進行處理。

GetLeaderRequestProcessor#processRequest

public Message processRequest(GetLeaderRequest request, RpcRequestClosure done) {
    List<Node> nodes = new ArrayList<>();
    String groupId = getGroupId(request);
    //如果請求是指定某個PeerId
    //那麼則則去叢集裡找到指定Peer所對應的node
    if (request.hasPeerId()) {
        String peerIdStr = getPeerId(request);
        PeerId peer = new PeerId();
        if (peer.parse(peerIdStr)) {
            Status st = new Status();
            nodes.add(getNode(groupId, peer, st));
            if (!st.isOk()) {
                return RpcResponseFactory.newResponse(st);
            }
        } else {
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Fail to parse peer id %", peerIdStr);
        }
    } else {
        //擷取叢集所有的節點
        nodes = NodeManager.getInstance().getNodesByGroupId(groupId);
    }
    if (nodes == null || nodes.isEmpty()) {
        return RpcResponseFactory.newResponse(RaftError.ENOENT, "No nodes in group %s", groupId);
    }
    //周遊叢集node,擷取leaderId
    for (Node node : nodes) {
        PeerId leader = node.getLeaderId();
        if (leader != null && !leader.isEmpty()) {
            return GetLeaderResponse.newBuilder().setLeaderId(leader.toString()).build();
        }
    }
    return RpcResponseFactory.newResponse(RaftError.EAGAIN, "Unknown leader");
}
           

這裡由于我們穿過來的request并沒有攜帶PeerId,是以不會去擷取指定的peer對應node節點的leaderId,而是會去找到叢集groupId對應的所有節點,然後周遊節點找到對應的leaderId。

getLuckyPeer輪詢擷取一個節點

在上面我們講完了getLeader是怎麼實作的,下面我們講一下getLuckyPeer這個方法裡面是怎麼操作的。

public Endpoint getLuckyPeer(final long regionId, final boolean forceRefresh, final long timeoutMillis) {
    return this.pdClient.getLuckyPeer(regionId, forceRefresh, timeoutMillis, this.selfEndpoint);
}
           

這裡和getLeader方法一樣會調用到AbstractPlacementDriverClient的getLuckyPeer方法中

AbstractPlacementDriverClient#getLuckyPeer

public Endpoint getLuckyPeer(final long regionId, final boolean forceRefresh, final long timeoutMillis,
                             final Endpoint unExpect) {
    final String raftGroupId = JRaftHelper.getJRaftGroupId(this.clusterName, regionId);
    final RouteTable routeTable = RouteTable.getInstance();
    //是否要強制重新整理一下最新的叢集節點資訊
    if (forceRefresh) {
        final long deadline = System.currentTimeMillis() + timeoutMillis;
        final StringBuilder error = new StringBuilder();
        // A newly launched raft group may not have been successful in the election,
        // or in the 'leader-transfer' state, it needs to be re-tried
        for (;;) {
            try {
                final Status st = routeTable.refreshConfiguration(this.cliClientService, raftGroupId, 5000);
                if (st.isOk()) {
                    break;
                }
                error.append(st.toString());
            } catch (final InterruptedException e) {
                ThrowUtil.throwException(e);
            } catch (final TimeoutException e) {
                error.append(e.getMessage());
            }
            if (System.currentTimeMillis() < deadline) {
                LOG.debug("Fail to get peers, retry again, {}.", error);
                error.append(", ");
                try {
                    Thread.sleep(5);
                } catch (final InterruptedException e) {
                    ThrowUtil.throwException(e);
                }
            } else {
                throw new RouteTableException(error.toString());
            }
        }
    }
    final Configuration configs = routeTable.getConfiguration(raftGroupId);
    if (configs == null) {
        throw new RouteTableException("empty configs in group: " + raftGroupId);
    }
    final List<PeerId> peerList = configs.getPeers();
    if (peerList == null || peerList.isEmpty()) {
        throw new RouteTableException("empty peers in group: " + raftGroupId);
    }
    //如果這個叢集裡隻有一個節點了,那麼直接傳回就好了
    final int size = peerList.size();
    if (size == 1) {
        return peerList.get(0).getEndpoint();
    }
    //擷取負載均衡器,這裡用的是輪詢政策
    final RoundRobinLoadBalancer balancer = RoundRobinLoadBalancer.getInstance(regionId);
    for (int i = 0; i < size; i++) {
        final PeerId candidate = balancer.select(peerList);
        final Endpoint luckyOne = candidate.getEndpoint();
        if (!luckyOne.equals(unExpect)) {
            return luckyOne;
        }
    }
    throw new RouteTableException("have no choice in group(peers): " + raftGroupId);
}
           

這個方法裡面也有一個是否要強制重新整理的判斷,和getLeader方法一樣,不再贅述。然後會判斷一下叢集裡面如果不止一個有效節點,那麼會調用輪詢政策來選取節點,這個輪詢的操作十分簡單,就是一個全局的index每次調用加一,然後和傳入的peerList集合的size取模。

到這裡DefaultRheaKVRpcService的callAsyncWithRpc方法就差不多講解完畢了,然後會向server端發起請求,在KVCommandProcessor處理BatchPutRequest請求。

Server端處理BatchPutRequest請求

BatchPutRequest的請求在KVCommandProcessor中被處理。

KVCommandProcessor#handleRequest

public void handleRequest(final BizContext bizCtx, final AsyncContext asyncCtx, final T request) {
    Requires.requireNonNull(request, "request");
    final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure = new RequestProcessClosure<>(request,
        bizCtx, asyncCtx);
    //根據傳入的RegionId去找到對應的RegionKVService
    //每個 RegionKVService 對應一個 Region,隻處理本身 Region 範疇内的請求
    final RegionKVService regionKVService = this.storeEngine.getRegionKVService(request.getRegionId());
    if (regionKVService == null) {
        //如果不存在則傳回空
        final NoRegionFoundResponse noRegion = new NoRegionFoundResponse();
        noRegion.setRegionId(request.getRegionId());
        noRegion.setError(Errors.NO_REGION_FOUND);
        noRegion.setValue(false);
        closure.sendResponse(noRegion);
        return;
    }
    switch (request.magic()) {
        case BaseRequest.PUT:
            regionKVService.handlePutRequest((PutRequest) request, closure);
            break;
        case BaseRequest.BATCH_PUT:
            regionKVService.handleBatchPutRequest((BatchPutRequest) request, closure);
            break;
        .....
        default:
            throw new RheaRuntimeException("Unsupported request type: " + request.getClass().getName());
    }
}
           

handleRequest首先會根據RegionId去找RegionKVService,RegionKVService在初始化RegionEngine的時候會注冊到regionKVServiceTable中。

然後根據請求的類型判斷request是什麼請求。這裡我們省略其他請求,隻看BATCH_PUT是怎麼做的。

在往下講代碼之前,我先來給個流程調用指指路:

5. SOFAJRaft源碼分析— RheaKV中如何存放資料?

BATCH_PUT對應會調用到DefaultRegionKVService的handleBatchPutRequest方法中 。

DefaultRegionKVService#handleBatchPutRequest

public void handlePutRequest(final PutRequest request,
                             final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure) {
    //設定一個響應response
    final PutResponse response = new PutResponse();
    response.setRegionId(getRegionId());
    response.setRegionEpoch(getRegionEpoch());
    try {
        KVParameterRequires.requireSameEpoch(request, getRegionEpoch());
        final byte[] key = KVParameterRequires.requireNonNull(request.getKey(), "put.key");
        final byte[] value = KVParameterRequires.requireNonNull(request.getValue(), "put.value");
        //這個執行個體是MetricsRawKVStore
        this.rawKVStore.put(key, value, new BaseKVStoreClosure() {

            //設定回調函數
            @Override
            public void run(final Status status) {
                if (status.isOk()) {
                    response.setValue((Boolean) getData());
                } else {
                    setFailure(request, response, status, getError());
                }
                closure.sendResponse(response);
            }
        });
    } catch (final Throwable t) {
        LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t));
        response.setError(Errors.forException(t));
        closure.sendResponse(response);
    }
}
           

handlePutRequest方法十分地簡單,通過擷取key和value之後調用MetricsRawKVStore的put方法,傳入key和value并設定回調函數。

MetricsRawKVStore#put

public void put(final byte[] key, final byte[] value, final KVStoreClosure closure) {
    final KVStoreClosure c = metricsAdapter(closure, PUT, 1, value.length);
    //rawKVStore是RaftRawKVStore的執行個體
    this.rawKVStore.put(key, value, c);
}
           

put方法會繼續調用RaftRawKVStore的put方法。

RaftRawKVStore#put

public void put(final byte[] key, final byte[] value, final KVStoreClosure closure) {
    applyOperation(KVOperation.createPut(key, value), closure);
}
           

Put方法會調用KVOperation的靜态方法建立一個類型為put的KVOperation執行個體,然後調用applyOperation方法。

RaftRawKVStore#applyOperation

private void applyOperation(final KVOperation op, final KVStoreClosure closure) {
    //這裡必須保證 Leader 節點操作申請任務
    if (!isLeader()) {
        closure.setError(Errors.NOT_LEADER);
        closure.run(new Status(RaftError.EPERM, "Not leader"));
        return;
    }
    final Task task = new Task();
    //封裝資料
    task.setData(ByteBuffer.wrap(Serializers.getDefault().writeObject(op)));
    //封裝回調方法
    task.setDone(new KVClosureAdapter(closure, op));
    //調用NodeImpl的apply方法
    this.node.apply(task);
}
           

applyOperation方法裡面會校驗是不是leader,如果不是leader那麼就不能執行任務申請的操作。然後執行個體化一個Task執行個體,設定資料和回調Adapter後調用NodeImple的apply釋出任務。

NodeImpl#apply

public void apply(final Task task) {
    //檢查Node是不是被關閉了
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    //校驗不能為空
    Requires.requireNonNull(task, "Null task");

    //将task裡面的資料放入到LogEntry中
    final LogEntry entry = new LogEntry();
    entry.setData(task.getData());
    //重試次數
    int retryTimes = 0;
    try {
        //執行個體化一個Disruptor事件
        final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
            event.reset();
            event.done = task.getDone();
            event.entry = entry;
            event.expectedTerm = task.getExpectedTerm();
        };
        while (true) {
            //釋出事件後交給LogEntryAndClosureHandler事件處理器處理
            if (this.applyQueue.tryPublishEvent(translator)) {
                break;
            } else {
                retryTimes++;
                //最多重試3次
                if (retryTimes > MAX_APPLY_RETRY_TIMES) {
                    //不成功則進行回調,通知處理狀态
                    Utils.runClosureInThread(task.getDone(),
                            new Status(RaftError.EBUSY, "Node is busy, has too many tasks."));
                    LOG.warn("Node {} applyQueue is overload.", getNodeId());
                    this.metrics.recordTimes("apply-task-overload-times", 1);
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }

    } catch (final Exception e) {
        Utils.runClosureInThread(task.getDone(), new Status(RaftError.EPERM, "Node is down."));
    }
}
           

在apply方法裡面會将資料封裝到LogEntry執行個體中,然後将LogEntry打包成一個Disruptor事件釋出到applyQueue隊列裡面去。applyQueue隊列在NodeImpl的init方法裡面初始化,并設定處理器為LogEntryAndClosureHandler。

LogEntryAndClosureHandler#onEvent

private final List<LogEntryAndClosure> tasks = new ArrayList<>(NodeImpl.this.raftOptions.getApplyBatch());

@Override
public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch)
        throws Exception {
    //如果接收到了要關閉的請求
    if (event.shutdownLatch != null) {
        //tasks隊列裡面的任務又不為空,那麼先處理隊列裡面的資料
        if (!this.tasks.isEmpty()) {
            //處理tasks
            executeApplyingTasks(this.tasks);
        }
        final int num = GLOBAL_NUM_NODES.decrementAndGet();
        LOG.info("The number of active nodes decrement to {}.", num);
        event.shutdownLatch.countDown();
        return;
    }
    //将新的event加入到tasks中
    this.tasks.add(event);
    //因為設定了32為一個批次,是以如果tasks裡面的任務達到了32或者已經是最後一個event,
    // 那麼就執行tasks集合裡面的資料
    if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
        executeApplyingTasks(this.tasks);
        this.tasks.clear();
    }
}
           

onEvent方法會校驗收到的事件是否是請求關閉隊列,如果是的話,那麼會先把tasks集合裡面的資料執行完畢再傳回。如果是正常的事件,那麼校驗一下tasks集合裡面的個數是不是已經到達了32個,或者是不是已經是最後一個事件了,那麼會執行executeApplyingTasks進行批量處理資料。

NodeImpl#executeApplyingTasks

private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
    this.writeLock.lock();
    try {
        final int size = tasks.size();
        //如果目前節點不是leader,那麼就不往下進行
        if (this.state != State.STATE_LEADER) {
            final Status st = new Status();

            if (this.state != State.STATE_TRANSFERRING) {
                st.setError(RaftError.EPERM, "Is not leader.");
            } else {
                st.setError(RaftError.EBUSY, "Is transferring leadership.");
            }
            LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
            //處理所有的LogEntryAndClosure,發送回調響應
            for (int i = 0; i < size; i++) {
                Utils.runClosureInThread(tasks.get(i).done, st);
            }
            return;
        }
        final List<LogEntry> entries = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            final LogEntryAndClosure task = tasks.get(i);
            //如果任其不對,那麼直接調用回調函數發送Error
            if (task.expectedTerm != -1 && task.expectedTerm != this.currTerm) {
                LOG.debug("Node {} can't apply task whose expectedTerm={} doesn't match currTerm={}.", getNodeId(),
                    task.expectedTerm, this.currTerm);
                if (task.done != null) {
                    final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d",
                        task.expectedTerm, this.currTerm);
                    Utils.runClosureInThread(task.done, st);
                }
                continue;
            }
            //儲存應用上下文
            if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
                this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) {
                Utils.runClosureInThread(task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
                continue;
            }
            // set task entry info before adding to list.
            task.entry.getId().setTerm(this.currTerm);
            //設定entry的類型為ENTRY_TYPE_DATA
            task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
            entries.add(task.entry);
        }
        //批量送出申請任務日志寫入 RocksDB
        this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
        // update conf.first
        this.conf = this.logManager.checkAndSetConfiguration(this.conf);
    } finally {
        this.writeLock.unlock();
    }
}
           

executeApplyingTasks中會校驗目前的節點是不是leader,因為Raft 副本節點 Node 執行申請任務檢查目前狀态是否為 STATE_LEADER,必須保證 Leader 節點操作申請任務。

循環周遊節點服務事件判斷任務的預估任期是否等于目前節點任期,Leader 沒有發生變更的階段内送出的日志擁有相同的 Term 編号,節點 Node 任期滿足預期則 Raft 協定投票箱 BallotBox 調用 appendPendingTask(conf, oldConf, done) 日志複制之前儲存應用上下文,即基于目前節點配置以及原始配置建立選票 Ballot 添加到選票雙向隊列 pendingMetaQueue。

然後日志管理器 LogManager 調用底層日志存儲 LogStorage#appendEntries(entries) 批量送出申請任務日志寫入 RocksDB。

接下來通過 Node#apply(task) 送出的申請任務最終将會複制應用到所有 Raft 節點上的狀态機,RheaKV 狀态機通過繼承 StateMachineAdapter 狀态機擴充卡的 KVStoreStateMachine 表示。

Raft 狀态機 KVStoreStateMachine 調用 onApply(iterator) 方法按照送出順序應用任務清單到狀态機。

KVStoreStateMachine 狀态機疊代狀态輸出清單積攢鍵值狀态清單批量申請 RocksRawKVStore 調用 batch(kvStates) 方法運作相應鍵值操作存儲到 RocksDB。

總結

這一篇是相當的長流程也是非常的複雜,裡面的各個地方代碼寫的都非常的缜密。我們主要介紹了putBatching皮處理器是怎麼使用Disruptor批量的處理資料,進而做到提升整體的吞吐量。還講解了在發起請求的時候是如何擷取server端的endpoint的。然後還了解了BatchPutRequest請求是怎麼被server處理的,以及在代碼中怎麼展現通過Batch + 全異步機制大幅度提升吞吐的。