6. SOFAJRaft Source Code Analysis: Linearizable Reads Through RheaKV

Introduction

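I had originally planned to cover linearizable reads right after the article on leader election. Jumping straight into the topic without any context would have made it hard to follow, though, so I wrote the RheaKV series first. With RheaKV covered, we can finally look at how SOFAJRaft implements linearizable reads. A simple way to describe linearizability: if a value is written at time T1, any read after T1 must return that value and can never return a value from before T1.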

Parts of this article draw on the SOFAJRaft documentation:

An Analysis of SOFAJRaft's Linearizable Read Implementation | SOFAJRaft Implementation Principles

SOFAJRaft Implementation Principles - How SOFAJRaft-RheaKV Uses Raft

Reading Data in RheaKV

The entry point for reading data in RheaKV is DefaultRheaKVStore#bGet.

DefaultRheaKVStore#bGet

public byte[] bGet(final String key) {
    return FutureHelper.get(get(key), this.futureTimeoutMillis);
}

bGet eventually calls down into the following get overload of DefaultRheaKVStore:

DefaultRheaKVStore#get

private CompletableFuture<byte[]> get(final byte[] key, final boolean readOnlySafe,
                                      final CompletableFuture<byte[]> future, final boolean tryBatching) {
    // make sure the store has been started
    checkState();
    Requires.requireNonNull(key, "key");
    if (tryBatching) {
        final GetBatching getBatching = readOnlySafe ? this.getBatchingOnlySafe : this.getBatching;
        if (getBatching != null && getBatching.apply(key, future)) {
            return future;
        }
    }
    internalGet(key, readOnlySafe, future, this.failoverRetries, null, this.onlyLeaderRead);
    return future;
}

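get decides from its arguments whether to read the data in batches. readOnlySafe indicates whether linearizable reads are enabled; since we called the plain get method, both readOnlySafe and tryBatching are true.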

So here getBatchingOnlySafe.apply is called with the key and the future.

getBatchingOnlySafe is created when DefaultRheaKVStore is initialized:

DefaultRheaKVStore#init

.....
this.getBatchingOnlySafe = new GetBatching(KeyEvent::new, "get_batching_only_safe",
        new GetBatchingHandler("get_only_safe", true));
.....

The handler passed in when creating getBatchingOnlySafe is a GetBatchingHandler.

Now back to getBatchingOnlySafe#apply to see what it does:

public boolean apply(final byte[] message, final CompletableFuture<byte[]> future) {
    // the published event is consumed by GetBatchingHandler
    return this.ringBuffer.tryPublishEvent((event, sequence) -> {
        event.reset();
        event.key = message;
        event.future = future;
    });
}

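apply publishes an event to the Disruptor for asynchronous processing, storing our key in event.key and the future in event.future; the event is then consumed by GetBatchingHandler, the handler of getBatchingOnlySafe. If the ring buffer is full, tryPublishEvent returns false and get falls back to calling internalGet directly, without batching.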
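A minimal sketch of this non-blocking hand-off, assuming a bounded ArrayBlockingQueue as a stand-in for the Disruptor ring buffer; BatchingFrontEnd and its methods are hypothetical names, not RheaKV API. The request is offered to the queue without blocking, and when the queue is full the caller takes the direct path instead, the way get falls through to internalGet when apply returns false.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: a bounded queue stands in for the Disruptor ring buffer.
class BatchingFrontEnd {
    // One pending read: the key plus the future the caller is waiting on.
    static final class KeyEvent {
        final byte[] key;
        final CompletableFuture<byte[]> future;

        KeyEvent(byte[] key, CompletableFuture<byte[]> future) {
            this.key = key;
            this.future = future;
        }
    }

    private final BlockingQueue<KeyEvent> queue;
    private final Function<byte[], byte[]> directRead; // non-batched fallback path

    BatchingFrontEnd(int capacity, Function<byte[], byte[]> directRead) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.directRead = directRead;
    }

    CompletableFuture<byte[]> get(byte[] key) {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        // offer() never blocks, like tryPublishEvent: it returns false when the queue is full.
        if (queue.offer(new KeyEvent(key, future))) {
            return future; // a batch consumer is expected to complete it later
        }
        // Queue full: serve this request directly instead of waiting for space.
        future.complete(directRead.apply(key));
        return future;
    }

    int pending() {
        return queue.size();
    }
}
```

Falling back to the direct path keeps latency bounded under load, at the cost of losing the batching benefit for that one request.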

Fetching Data in Batches

GetBatchingHandler#onEvent

public void onEvent(final KeyEvent event, final long sequence, final boolean endOfBatch) throws Exception {
    this.events.add(event);
    this.cachedBytes += event.key.length;
    final int size = this.events.size();
    // keep accumulating: return unless this is the last event of the batch or batchSize / maxReadBytes is reached
    if (!endOfBatch && size < batchingOpts.getBatchSize() && this.cachedBytes < batchingOpts.getMaxReadBytes()) {
        return;
    }

    if (size == 1) {
        reset();
        try {
            // only one get request: no need for batch processing
            get(event.key, this.readOnlySafe, event.future, false);
        } catch (final Throwable t) {
            exceptionally(t, event.future);
        }
    } else {
        // allocate collections of exactly the right size
        final List<byte[]> keys = Lists.newArrayListWithCapacity(size);
        final CompletableFuture<byte[]>[] futures = new CompletableFuture[size];
        for (int i = 0; i < size; i++) {
            final KeyEvent e = this.events.get(i);
            keys.add(e.key);
            futures[i] = e.future;
        }
        // the events have been copied into keys/futures, so reset the buffer
        reset();
        try {
            multiGet(keys, this.readOnlySafe).whenComplete((result, throwable) -> {
                // asynchronous callback: complete each caller's future with its value
                if (throwable == null) {
                    for (int i = 0; i < futures.length; i++) {
                        final ByteArray realKey = ByteArray.wrap(keys.get(i));
                        futures[i].complete(result.get(realKey));
                    }
                    return;
                }
                exceptionally(throwable, futures);
            });
        } catch (final Throwable t) {
            exceptionally(t, futures);
        }
    }
}

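onEvent first checks whether the buffered events have reached a threshold (batchSize events or maxReadBytes bytes) or whether the current event is the last one in the current Disruptor batch (endOfBatch); if none of these holds, it returns and keeps accumulating. It then takes a different path depending on how many events are buffered. As an optimization, a single event is not batched at all and goes straight to get; otherwise, all keys are collected into the keys list and passed to multiGet as one batch.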
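The flush rule can be sketched as follows. KeyBatcher is a hypothetical class, and its two Consumer callbacks stand in for the get and multiGet calls; only the thresholds (batch size, max read bytes, endOfBatch) and the single-key fast path are taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the flush rule used by GetBatchingHandler#onEvent.
class KeyBatcher {
    private final int batchSize;
    private final long maxReadBytes;
    private final Consumer<byte[]> singleGet;       // fast path: exactly one key
    private final Consumer<List<byte[]>> multiGet;  // batch path: several keys
    private final List<byte[]> keys = new ArrayList<>();
    private long cachedBytes;

    KeyBatcher(int batchSize, long maxReadBytes,
               Consumer<byte[]> singleGet, Consumer<List<byte[]>> multiGet) {
        this.batchSize = batchSize;
        this.maxReadBytes = maxReadBytes;
        this.singleGet = singleGet;
        this.multiGet = multiGet;
    }

    void onEvent(byte[] key, boolean endOfBatch) {
        keys.add(key);
        cachedBytes += key.length;
        // Keep accumulating unless a threshold is hit or no more events are waiting.
        if (!endOfBatch && keys.size() < batchSize && cachedBytes < maxReadBytes) {
            return;
        }
        List<byte[]> batch = new ArrayList<>(keys);
        keys.clear();      // reset before dispatching, as the original does
        cachedBytes = 0;
        if (batch.size() == 1) {
            singleGet.accept(batch.get(0)); // no point batching a single key
        } else {
            multiGet.accept(batch);
        }
    }
}
```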

multiGet calls internalMultiGet, which returns a Future, so the results are delivered asynchronously.

DefaultRheaKVStore#internalMultiGet

private FutureGroup<Map<ByteArray, byte[]>> internalMultiGet(final List<byte[]> keys, final boolean readOnlySafe,
                                                             final int retriesLeft, final Throwable lastCause) {
    // keys may be stored in different regions and one region can hold many keys, so group them in a map
    final Map<Region, List<byte[]>> regionMap = this.pdClient
            .findRegionsByKeys(keys, ApiExceptionHelper.isInvalidEpoch(lastCause));
    // one future per region; together they form the return value
    final List<CompletableFuture<Map<ByteArray, byte[]>>> futures =
            Lists.newArrayListWithCapacity(regionMap.size());
    // lastCause is null on the first attempt
    final Errors lastError = lastCause == null ? null : Errors.forException(lastCause);

    for (final Map.Entry<Region, List<byte[]>> entry : regionMap.entrySet()) {
        final Region region = entry.getKey();
        final List<byte[]> subKeys = entry.getValue();
        // retry function: re-run this region's keys with one fewer retry left
        final RetryCallable<Map<ByteArray, byte[]>> retryCallable = retryCause -> internalMultiGet(subKeys,
                readOnlySafe, retriesLeft - 1, retryCause);
        final MapFailoverFuture<ByteArray, byte[]> future = new MapFailoverFuture<>(retriesLeft, retryCallable);
        // fetch this region's data (sends a MultiGetRequest when the region is remote)
        internalRegionMultiGet(region, subKeys, readOnlySafe, future, retriesLeft, lastError, this.onlyLeaderRead);
        futures.add(future);
    }
    return new FutureGroup<>(futures);
}

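internalMultiGet groups the keys by region. Data is stored in regions and different keys may belong to different regions, so the data has to be fetched from each region separately; since one region can hold many keys, the grouping is kept in a map. It then iterates over regionMap and calls internalRegionMultiGet once per region, passing that region's keys as one batch, and that method fetches the data in whichever way fits the situation.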
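Grouping keys by region can be sketched with a sorted map, assuming (as in RheaKV) that each region owns a contiguous key range [startKey, endKey) and that the regions together cover the whole key space starting from the empty key. RegionRouter is a hypothetical helper; in the real code the lookup is done by pdClient.findRegionsByKeys, whose internals are not shown here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: regions are contiguous key ranges identified by their start key.
class RegionRouter {
    // startKey -> regionId; assumes the first region starts at the empty key.
    private final TreeMap<byte[], Long> regionsByStartKey = new TreeMap<>(RegionRouter::compareUnsigned);

    void addRegion(byte[] startKey, long regionId) {
        regionsByStartKey.put(startKey, regionId);
    }

    // A key belongs to the region with the greatest start key <= key.
    long findRegion(byte[] key) {
        return regionsByStartKey.floorEntry(key).getValue();
    }

    // Group keys per region: one region, many keys.
    Map<Long, List<byte[]>> groupByRegion(List<byte[]> keys) {
        Map<Long, List<byte[]>> grouped = new LinkedHashMap<>();
        for (byte[] key : keys) {
            grouped.computeIfAbsent(findRegion(key), id -> new ArrayList<>()).add(key);
        }
        return grouped;
    }

    // Lexicographic comparison treating bytes as unsigned (0x00..0xFF).
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return Integer.compare(a.length, b.length);
    }
}
```

Each group then becomes one per-region sub-request, which is why a single client batch can turn into several RPCs.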

DefaultRheaKVStore#internalRegionMultiGet

private void internalRegionMultiGet(final Region region, final List<byte[]> subKeys, final boolean readOnlySafe,
                                    final CompletableFuture<Map<ByteArray, byte[]>> future, final int retriesLeft,
                                    final Errors lastCause, final boolean requireLeader) {
    // on a pure client there is no local RegionEngine, so this is null
    final RegionEngine regionEngine = getRegionEngine(region.getId(), requireLeader);
    // require leader on retry
    // retry function
    final RetryRunner retryRunner = retryCause -> internalRegionMultiGet(region, subKeys, readOnlySafe, future,
            retriesLeft - 1, retryCause, true);
    final FailoverClosure<Map<ByteArray, byte[]>> closure = new FailoverClosureImpl<>(future,
            false, retriesLeft, retryRunner);
    if (regionEngine != null) {
        if (ensureOnValidEpoch(region, regionEngine, closure)) {
            // a local RegionEngine exists: read directly from its rawKVStore
            final RawKVStore rawKVStore = getRawKVStore(regionEngine);
            if (this.kvDispatcher == null) {
                rawKVStore.multiGet(subKeys, readOnlySafe, closure);
            } else {
                // a kvDispatcher is configured: run the read on it asynchronously
                this.kvDispatcher.execute(() -> rawKVStore.multiGet(subKeys, readOnlySafe, closure));
            }
        }
    } else {
        final MultiGetRequest request = new MultiGetRequest();
        request.setKeys(subKeys);
        request.setReadOnlySafe(readOnlySafe);
        request.setRegionId(region.getId());
        request.setRegionEpoch(region.getRegionEpoch());
        // send the request via RPC
        this.rheaKVRpcService.callAsyncWithRpc(request, closure, lastCause, requireLeader);
    }
}

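Because internalRegionMultiGet is called from the client here, no regionEngine is available, so a MultiGetRequest is sent over RPC to a server node of that region (its leader whenever requireLeader is true, which is always the case on retries).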

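These methods are essentially the same as on the put path, which was covered in "5. SOFAJRaft Source Code Analysis: How Does RheaKV Store Data?", so they are not repeated here.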

Server Side: Handling MultiGetRequest

A MultiGetRequest is handled by KVCommandProcessor, which decides how to process a request based on the value returned by the request's magic() method. In our case, the request ends up in DefaultRegionKVService#handleMultiGetRequest.

public void handleMultiGetRequest(final MultiGetRequest request,
                                  final RequestProcessClosure<BaseRequest, BaseResponse<?>> closure) {
    final MultiGetResponse response = new MultiGetResponse();
    response.setRegionId(getRegionId());
    response.setRegionEpoch(getRegionEpoch());
    try {
        KVParameterRequires.requireSameEpoch(request, getRegionEpoch());
        final List<byte[]> keys = KVParameterRequires.requireNonEmpty(request.getKeys(), "multiGet.keys");
        // rawKVStore is a MetricsRawKVStore here
        this.rawKVStore.multiGet(keys, request.isReadOnlySafe(), new BaseKVStoreClosure() {

            @SuppressWarnings("unchecked")
            @Override
            public void run(final Status status) {
                if (status.isOk()) {
                    response.setValue((Map<ByteArray, byte[]>) getData());
                } else {
                    setFailure(request, response, status, getError());
                }
                closure.sendResponse(response);
            }
        });
    } catch (final Throwable t) {
        LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t));
        response.setError(Errors.forException(t));
        closure.sendResponse(response);
    }
}

handleMultiGetRequest calls MetricsRawKVStore#multiGet to fetch the data in a batch.

MetricsRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    // wrap the closure in a MetricsKVClosureAdapter to record metrics
    final KVStoreClosure c = metricsAdapter(closure, MULTI_GET, keys.size(), 0);
    // delegate to RaftRawKVStore#multiGet
    this.rawKVStore.multiGet(keys, readOnlySafe, c);
}

multiGet wraps the closure in a MetricsKVClosureAdapter instance, through which the response is called back asynchronously, and then calls RaftRawKVStore#multiGet.

RaftRawKVStore#multiGet

public void multiGet(final List<byte[]> keys, final boolean readOnlySafe, final KVStoreClosure closure) {
    if (!readOnlySafe) {
        this.kvStore.multiGet(keys, false, closure);
        return;
    }
    // linearizable read for the KV store:
    // call readIndex and wait for the callback
    this.node.readIndex(BytesUtil.EMPTY_BYTES, new ReadIndexClosure() {

        @Override
        public void run(final Status status, final long index, final byte[] reqCtx) {
            // readIndex succeeded: the local store is now safe to read
            if (status.isOk()) {
                RaftRawKVStore.this.kvStore.multiGet(keys, true, closure);
                return;
            }
            // readIndex failed: on the leader, try applying the read as a task to the state machine (KVStoreStateMachine)
            RaftRawKVStore.this.readIndexExecutor.execute(() -> {
                if (isLeader()) {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}, try to applying to the state machine.",
                            status);
                    // If 'read index' read fails, try to applying to the state machine at the leader node
                    applyOperation(KVOperation.createMultiGet(keys), closure);
                } else {
                    LOG.warn("Fail to [multiGet] with 'ReadIndex': {}.", status);
                    // Client will retry to leader node
                    new KVClosureAdapter(closure, null).run(status);
                }
            });
        }
    });
}

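multiGet calls node.readIndex to perform a linearizable read and registers a callback. If readIndex succeeds, the callback reads the data directly from the underlying store (RocksRawKVStore). If it fails and the current node is the leader, the read is submitted as a task to the leader's state machine, KVStoreStateMachine; on any other node, the error is returned so that the client retries against the leader.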
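The three outcomes of that callback can be summarized as a small decision function. ReadPath is a hypothetical name used only for illustration; it mirrors the branches of the ReadIndexClosure in RaftRawKVStore#multiGet.

```java
// Hypothetical sketch of the three outcomes in RaftRawKVStore#multiGet's ReadIndexClosure.
enum ReadPath {
    LOCAL_READ,     // readIndex succeeded: read the local KV store
    APPLY_VIA_LOG,  // readIndex failed on the leader: run the read through the state machine
    RETURN_ERROR;   // readIndex failed elsewhere: return the error so the client retries on the leader

    static ReadPath choose(boolean readIndexOk, boolean isLeader) {
        if (readIndexOk) {
            return LOCAL_READ;
        }
        return isLeader ? APPLY_VIA_LOG : RETURN_ERROR;
    }
}
```

Falling back to the log on the leader is slower, since the read goes through a full Raft replication round, but it remains linearizable because the read is ordered together with all writes in the log.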

Linearizable Reads: readIndex

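What is a linearizable read? A simple example: if we write a value at time t1, then after t1 we must be able to read that value and can never read an older value from before t1. Think of Java's volatile keyword: a linearizable read amounts to providing Java volatile semantics in a distributed system. That is, once a client's write to the cluster has received a success response, the result of that write must be visible to every subsequent read. The difference is that volatile provides visibility between threads, whereas SOFAJRaft has to provide visibility between servers.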

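SOFAJRaft's linearizable read is implemented with the ReadIndex approach from the Raft protocol. Node#readIndex(byte[] requestContext, ReadIndexClosure done) issues a linearizable read request; the supplied closure is invoked once it is safe to read, and in the normal case it reads the data from the state machine and returns it to the client.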

Node#readIndex

public void readIndex(final byte[] requestContext, final ReadIndexClosure done) {
    if (this.shutdownLatch != null) {
        // run the callback asynchronously with an error status
        Utils.runClosureInThread(done, new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
        throw new IllegalStateException("Node is shutting down");
    }
    Requires.requireNonNull(done, "Null closure");
    // requestContext is BytesUtil.EMPTY_BYTES in this call path
    this.readOnlyService.addRequest(requestContext, done);
}

readIndex calls ReadOnlyServiceImpl#addRequest, passing in requestContext and the callback done; here requestContext is BytesUtil.EMPTY_BYTES.

Let's keep going.

ReadOnlyServiceImpl#addRequest

public void addRequest(final byte[] reqCtx, final ReadIndexClosure closure) {
    if (this.shutdownLatch != null) {
        Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Was stopped"));
        throw new IllegalStateException("Service already shutdown.");
    }
    try {
        EventTranslator<ReadIndexEvent> translator = (event, sequence) -> {
            event.done = closure;
            // BytesUtil.EMPTY_BYTES in this call path
            event.requestContext = new Bytes(reqCtx);
            event.startTime = Utils.monotonicMs();
        };
        int retryTimes = 0;
        while (true) {
            // published events are consumed by ReadIndexEventHandler
            if (this.readIndexQueue.tryPublishEvent(translator)) {
                break;
            } else {
                retryTimes++;
                if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
                    Utils.runClosureInThread(closure,
                        new Status(RaftError.EBUSY, "Node is busy, has too many read-only requests."));
                    this.nodeMetrics.recordTimes("read-index-overload-times", 1);
                    LOG.warn("Node {} ReadOnlyServiceImpl readIndexQueue is overload.", this.node.getNodeId());
                    return;
                }
                ThreadHelper.onSpinWait();
            }
        }
    } catch (final Exception e) {
        Utils.runClosureInThread(closure, new Status(RaftError.EPERM, "Node is down."));
    }
}

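addRequest wraps reqCtx and closure into an event and publishes it to the readIndexQueue. Once published, the event is handled by ReadIndexEventHandler. If publishing fails, it retries up to 3 times (MAX_ADD_REQUEST_RETRY_TIMES), spinning briefly between attempts; if it still fails, the closure is completed with EBUSY ("Node is busy, has too many read-only requests.").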
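The publish loop follows a bounded-retry pattern. The sketch below reproduces it with an ArrayBlockingQueue in place of the Disruptor queue (an assumption for illustration); BoundedRetryPublisher is a hypothetical name, the retry limit of 3 is the value given above, and the JDK's Thread.onSpinWait (Java 9+) takes the place of ThreadHelper.onSpinWait.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of addRequest's publish loop: retry a bounded number of times, then report "busy".
class BoundedRetryPublisher<T> {
    static final int MAX_ADD_REQUEST_RETRY_TIMES = 3; // limit stated in the article

    private final BlockingQueue<T> queue;

    BoundedRetryPublisher(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Returns true if published, false if the caller should fail the request with EBUSY.
    boolean publish(T event) {
        int retryTimes = 0;
        while (true) {
            if (queue.offer(event)) {
                return true;
            }
            retryTimes++;
            if (retryTimes > MAX_ADD_REQUEST_RETRY_TIMES) {
                return false; // overloaded: too many pending read-only requests
            }
            Thread.onSpinWait(); // brief busy-wait hint before the next attempt
        }
    }
}
```

Spinning instead of sleeping keeps the retry window tiny; the point is to absorb a momentary burst, not to wait out a sustained overload, which is reported to the caller instead.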

ReadIndexEventHandler

private class ReadIndexEventHandler implements EventHandler<ReadIndexEvent> {
    // task list for batch
    private final List<ReadIndexEvent> events = new ArrayList<>(
                                                  ReadOnlyServiceImpl.this.raftOptions.getApplyBatch());

    @Override
    public void onEvent(final ReadIndexEvent newEvent, final long sequence, final boolean endOfBatch)
                                                                                                     throws Exception {
        if (newEvent.shutdownLatch != null) {
            executeReadIndexEvents(this.events);
            this.events.clear();
            newEvent.shutdownLatch.countDown();
            return;
        }

        this.events.add(newEvent);
        // flush when the batch is full or no more events are waiting
        if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
            executeReadIndexEvents(this.events);
            this.events.clear();
        }
    }
}

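ReadIndexEventHandler is an inner class of ReadOnlyServiceImpl. It keeps an events list used to batch events: once the list holds 32 events (RaftOptions#applyBatch, whose default is 32) or the current event is the last one in the current Disruptor batch, it calls ReadOnlyServiceImpl#executeReadIndexEvents to process the batch.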

ReadOnlyServiceImpl#executeReadIndexEvents

private void executeReadIndexEvents(final List<ReadIndexEvent> events) {
    if (events.isEmpty()) {
        return;
    }
    // build the ReadIndexRequest
    final ReadIndexRequest.Builder rb = ReadIndexRequest.newBuilder() //
        .setGroupId(this.node.getGroupId()) //
        .setServerId(this.node.getServerId().toString());

    final List<ReadIndexState> states = new ArrayList<>(events.size());

    for (final ReadIndexEvent event : events) {
        rb.addEntries(ZeroByteStringHelper.wrap(event.requestContext.get()));
        states.add(new ReadIndexState(event.requestContext, event.done, event.startTime));
    }
    final ReadIndexRequest request = rb.build();

    this.node.handleReadIndexRequest(request, new ReadIndexResponseClosure(states, request));
}

executeReadIndexEvents builds the ReadIndexRequest and wraps the list of ReadIndexState objects in a ReadIndexResponseClosure, preparing for the steps that follow.

NodeImpl#handleReadIndexRequest

public void handleReadIndexRequest(final ReadIndexRequest request, final RpcResponseClosure<ReadIndexResponse> done) {
    final long startMs = Utils.monotonicMs();
    this.readLock.lock();
    try {
        switch (this.state) {
            case STATE_LEADER:
                readLeader(request, ReadIndexResponse.newBuilder(), done);
                break;
            case STATE_FOLLOWER:
                readFollower(request, done);
                break;
            case STATE_TRANSFERRING:
                done.run(new Status(RaftError.EBUSY, "Is transferring leadership."));
                break;
            default:
                done.run(new Status(RaftError.EPERM, "Invalid state for readIndex: %s.", this.state));
                break;
        }
    } finally {
        this.readLock.unlock();
        this.metrics.recordLatency("handle-read-index", Utils.monotonicMs() - startMs);
        this.metrics.recordSize("handle-read-index-entries", request.getEntriesCount());
    }
}

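Because a linearizable read can be initiated on any node in the cluster, it does not have to run on the leader and is allowed on followers, which greatly reduces the read load on the leader.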

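When a linearizable read runs on a follower, the follower actually calls RpcService#readIndex(leaderId.getEndpoint(), newRequest, -1, closure) to send a ReadIndex request to the leader, and the leader does the work of the linearizable read. This article therefore focuses on the leader's side.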
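The state dispatch in handleReadIndexRequest can be reduced to the following sketch. NodeState and ReadIndexRouter are simplified, hypothetical stand-ins for the real node state enum and NodeImpl; the returned strings only label which branch is taken.

```java
// Hypothetical, simplified node states; the real enum has more values.
enum NodeState { LEADER, FOLLOWER, TRANSFERRING, CANDIDATE }

// Hypothetical sketch of handleReadIndexRequest's dispatch by node state.
class ReadIndexRouter {
    static String route(NodeState state) {
        switch (state) {
            case LEADER:
                return "readLeader";   // confirm leadership, answer with lastCommittedIndex
            case FOLLOWER:
                return "readFollower"; // forward the ReadIndex request to the leader
            case TRANSFERRING:
                return "EBUSY";        // leadership transfer in progress
            default:
                return "EPERM";        // any other state cannot serve readIndex
        }
    }
}
```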

Continuing, NodeImpl#readLeader is called:

NodeImpl#readLeader

private void readLeader(final ReadIndexRequest request, final ReadIndexResponse.Builder respBuilder,
                        final RpcResponseClosure<ReadIndexResponse> closure) {
    // 1. get the quorum size, i.e. the number of votes that forms a majority
    final int quorum = getQuorum();
    if (quorum <= 1) {
        // Only one peer, fast path.
        // single-node cluster: invoke the callback with success right away
        respBuilder.setSuccess(true) //
                .setIndex(this.ballotBox.getLastCommittedIndex());
        closure.setResponse(respBuilder.build());
        closure.run(Status.OK());
        return;
    }

    final long lastCommittedIndex = this.ballotBox.getLastCommittedIndex();
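    // 2. the terms must match:
    // LogManager looks up the term of the entry at BallotBox's lastCommittedIndex and compares it with the current term;
    // if they differ, this leader has not committed any log entry in its own term and must reject the read-only request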
    if (this.logManager.getTerm(lastCommittedIndex) != this.currTerm) {
        // Reject read only request when this leader has not committed any log entry at its term
        closure
                .run(new Status(
                        RaftError.EAGAIN,
                        "ReadIndex request rejected because leader has not committed any log entry at its term, " +
                         "logIndex=%d, currTerm=%d.",
                        lastCommittedIndex, this.currTerm));
        return;
    }
    respBuilder.setIndex(lastCommittedIndex);

    if (request.getPeerId() != null) {
        // request from follower, check if the follower is in current conf.
        final PeerId peer = new PeerId();
        peer.parse(request.getServerId());
        // 3. a request from a follower: check that the follower is in the current configuration
        if (!this.conf.contains(peer)) {
            closure
                    .run(new Status(RaftError.EPERM, "Peer %s is not in current configuration: %s.", peer,
                     this.conf));
            return;
        }
    }

    ReadOnlyOption readOnlyOpt = this.raftOptions.getReadOnlyOptions();
    // 4. with ReadOnlyLeaseBased, check whether the leader lease is still valid
    if (readOnlyOpt == ReadOnlyOption.ReadOnlyLeaseBased && !isLeaderLeaseValid()) {
        // If leader lease timeout, we must change option to ReadOnlySafe
        readOnlyOpt = ReadOnlyOption.ReadOnlySafe;
    }

    switch (readOnlyOpt) {
        // 5. ReadOnlySafe: confirm leadership with one round of heartbeats
        case ReadOnlySafe:
            final List<PeerId> peers = this.conf.getConf().getPeers();
            Requires.requireTrue(peers != null && !peers.isEmpty(), "Empty peers");
            // response callback for the heartbeats
            final ReadIndexHeartbeatResponseClosure heartbeatDone = new ReadIndexHeartbeatResponseClosure(closure,
                    respBuilder, quorum, peers.size());
            // Send heartbeat requests to followers
            // send one round of heartbeats to the followers; if more than half of the nodes
            // return a heartbeat response, the leader knows it is still the leader
            for (final PeerId peer : peers) {
                if (peer.equals(this.serverId)) {
                    continue;
                }
                this.replicatorGroup.sendHeartbeat(peer, heartbeatDone);
            }
            break;
        // 6. ReadOnlyLeaseBased: no election can happen within the lease, so the leader cannot have changed;
        // respond with success directly
        case ReadOnlyLeaseBased:
            // Responses to followers and local node.
            respBuilder.setSuccess(true);
            closure.setResponse(respBuilder.build());
            closure.run(Status.OK());
            break;
    }
}
           
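  1. Compute the quorum size, i.e. n/2 + 1 for an n-node cluster. If the cluster has only one node, return success immediately and invoke the callback.
  2. Check that the term of the entry at lastCommittedIndex equals the current term. If it does, set the response's index to BallotBox's lastCommittedIndex; otherwise reject the request with EAGAIN.
  3. For a request coming from a follower, check that the follower is in the current configuration; if it is not, invoke the callback with an error.
  4. Read the ReadIndex option ReadOnlyOption from the configuration; its default is ReadOnlySafe. If it is set to ReadOnlyLeaseBased, call isLeaderLeaseValid to check whether the leader lease is still valid, and fall back to ReadOnlySafe if it is not.
  5. With ReadOnlySafe, call Replicator#sendHeartbeat(rid, closure) to send heartbeats to the followers, with ReadIndexHeartbeatResponseClosure as the response callback. The callback checks whether more than half of the nodes, counting the leader itself, have responded successfully; if so, the leader is confirmed and the ReadIndex is returned. A linearizable read can then be served once applyIndex reaches the ReadIndex, because by then the log up to ReadIndex has been applied.
  6. With ReadOnlyLeaseBased, the current leader is considered the only valid leader in the Raft group while its lease is valid, so the heartbeat confirmation is skipped and success is returned directly to the follower or the local node. The requesting node still waits for its state machine until applyIndex reaches the ReadIndex before it serves the linearizable read.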
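The ReadOnlySafe confirmation in step 5 boils down to counting heartbeat acknowledgements against a quorum of n/2 + 1, with the leader counting its own vote. HeartbeatQuorum is a hypothetical sketch of that counting, not the actual ReadIndexHeartbeatResponseClosure; in particular, the rule for giving up early (failures exceeding n - quorum, at which point a quorum can no longer be reached) is an assumption.

```java
// Hypothetical sketch: the leader counts itself, then waits for follower heartbeat
// responses until a quorum is reached or becomes unreachable.
class HeartbeatQuorum {
    enum Result { PENDING, CONFIRMED, FAILED }

    private final int peers;   // total voters, including the leader
    private final int quorum;  // n / 2 + 1
    private int acks = 1;      // the leader's own vote
    private int failures;
    private Result result = Result.PENDING;

    HeartbeatQuorum(int peers) {
        this.peers = peers;
        this.quorum = quorumOf(peers);
        if (acks >= quorum) {
            result = Result.CONFIRMED; // single-node cluster: fast path
        }
    }

    static int quorumOf(int peers) {
        return peers / 2 + 1;
    }

    // Called once per follower heartbeat response.
    synchronized Result onResponse(boolean success) {
        if (result != Result.PENDING) {
            return result; // already decided; ignore late responses
        }
        if (success) {
            acks++;
        } else {
            failures++;
        }
        if (acks >= quorum) {
            result = Result.CONFIRMED;
        } else if (failures > peers - quorum) {
            // Even if every remaining follower acked, a quorum is no longer reachable.
            result = Result.FAILED;
        }
        return result;
    }
}
```

In a 3-node cluster a single follower ack is enough (1 + 1 = 2 = quorum), which is why ReadOnlySafe costs one heartbeat round trip to the fastest follower rather than to all of them.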

Whether ReadOnlySafe or ReadOnlyLeaseBased is used, the response is ultimately delivered by calling ReadIndexResponseClosure#run.

ReadIndexResponseClosure#run

public void run(final Status status) {
    // failure: the status is not OK
    if (!status.isOk()) {
        notifyFail(status);
        return;
    }
    final ReadIndexResponse readIndexResponse = getResponse();
    // failure: the response reports success = false
    if (!readIndexResponse.getSuccess()) {
        notifyFail(new Status(-1, "Fail to run ReadIndex task, maybe the leader stepped down."));
        return;
    }
    // success: leadership confirmed and the read index is known
    final ReadIndexStatus readIndexStatus = new ReadIndexStatus(this.states, this.request,
        readIndexResponse.getIndex());
    for (final ReadIndexState state : this.states) {
        // Records current commit log index.
        state.setIndex(readIndexResponse.getIndex());
    }

    boolean doUnlock = true;
    ReadOnlyServiceImpl.this.lock.lock();
    try {
        // check whether the applied index has reached the read index
        if (readIndexStatus.isApplied(ReadOnlyServiceImpl.this.fsmCaller.getLastAppliedIndex())) {
            // Already applied, notify readIndex request.
            ReadOnlyServiceImpl.this.lock.unlock();
            doUnlock = false;
            // the log up to ReadIndex has been applied, so a linearizable read can be served
            notifySuccess(readIndexStatus);
        } else {
            // Not applied, add it to pending-notify cache.
            ReadOnlyServiceImpl.this.pendingNotifyStatus
                .computeIfAbsent(readIndexStatus.getIndex(), k -> new ArrayList<>(10)) //
                .add(readIndexStatus);
        }
    } finally {
        if (doUnlock) {
            ReadOnlyServiceImpl.this.lock.unlock();
        }
    }
}

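run first checks whether it has to report a failure. On success, it sets the index of every wrapped ReadIndexState to the returned read index and then checks whether applyIndex has reached the ReadIndex. Once it has, every log entry replicated to a majority (which can be thought of as a write) up to the ReadIndex is considered safe, and the data it reflects can be made visible to the client. If not, the ReadIndexStatus is put into pendingNotifyStatus, keyed by its index, to be notified once the state machine has applied up to that index.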
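The "wait until applied" step can be sketched as a sorted map of parked reads keyed by read index, released as the applied index advances. ReadIndexWaiters and its onApplied method are hypothetical names; the idea mirrors pendingNotifyStatus in the code above, and, like the original, the sketch runs the callbacks outside the lock.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical sketch: reads that are not yet safe are parked under their read index
// and released once the state machine's applied index catches up.
class ReadIndexWaiters {
    private final TreeMap<Long, List<Runnable>> pending = new TreeMap<>();
    private long lastAppliedIndex;

    // Like ReadIndexResponseClosure#run: serve now if already applied, otherwise park the read.
    void onReadIndex(long readIndex, Runnable read) {
        synchronized (this) {
            if (lastAppliedIndex < readIndex) {
                pending.computeIfAbsent(readIndex, k -> new ArrayList<>()).add(read);
                return;
            }
        }
        read.run(); // already applied: notify outside the lock
    }

    // Called as the state machine applies entries: release every read with index <= appliedIndex.
    void onApplied(long appliedIndex) {
        List<Runnable> ready = new ArrayList<>();
        synchronized (this) {
            lastAppliedIndex = Math.max(lastAppliedIndex, appliedIndex);
            Iterator<List<Runnable>> it = pending.headMap(appliedIndex, true).values().iterator();
            while (it.hasNext()) {
                ready.addAll(it.next());
                it.remove(); // removing through the view also removes from the backing map
            }
        }
        for (Runnable read : ready) {
            read.run();
        }
    }
}
```

Keying by index lets one applied-index update release every waiting read in a single range scan, instead of checking each request individually.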

ReadOnlyServiceImpl#notifySuccess

private void notifySuccess(final ReadIndexStatus status) {
    final long nowMs = Utils.monotonicMs();
    final List<ReadIndexState> states = status.getStates();
    final int taskCount = states.size();
    for (int i = 0; i < taskCount; i++) {
        final ReadIndexState task = states.get(i);
        final ReadIndexClosure done = task.getDone(); // stack copy
        if (done != null) {
            this.nodeMetrics.recordLatency("read-index", nowMs - task.getStartTimeMs());
            done.setResult(task.getIndex(), task.getRequestContext().get());
            done.run(Status.OK());
        }
    }
}

On success, notifySuccess is called. It iterates over the ReadIndexState list wrapped in the status and runs each state's closure.

That closure is the ReadIndexClosure we passed to node.readIndex in RaftRawKVStore#multiGet, shown earlier.

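On success, its run method calls kvStore.multiGet on the underlying store (RocksRawKVStore when RocksDB is the storage engine) to read the data directly from RocksDB.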

Summary

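Starting from the client-side get method of RheaKVStore, this article followed the call all the way to the server side, where RheaKVStore uses JRaft to perform linearizable reads, and explained how those reads are implemented. This walkthrough should give you a reasonably solid understanding of linearizable reads.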