天天看點

Heron CLI中update指令實作背後的源碼解析概述ApiServerRuntimeManagerMainAuroraScheduler結束語

概述

最近回顧了一下之前針對Heron的Blog内容,發現大部分都是在對Heron進行使用和實驗部分的内容。對其理論方面的分析比較少。而Heron作為很新的一代流式計算平台,在目前國内Flink的熱潮中,還是顯得很冷清。

從Spark streaming的模拟流,到storm,flink實作真正的流處理,再到Twitter自我革命開源Heron。作為在國内較早接觸Heron的一撥人(Heron2015年開源,自己2017年開始進行了實驗環境的搭建和相關的研究,當時國内連環境搭建的文章都查不到,目前除了自己的好像也沒見到,如果有人見到可以告訴我一下),也開始發現除了之前Heron的官方論文和文章,也多多少少多了一些關注。從在國内找不到資料,到目前CSDN中出現了一些文章,自己也成了一個見證者。是以後面計劃就理論和源碼層面對Heron寫一些文章,來對這個開源系統多多少少做一些能做的事情。下面就開始這次的這部分内容吧。

Heron CLI(command line interface)中提供了如下可用的Topology管理指令,在scheduler相關的源碼下可以看到相關的枚舉對象。

/***
 * This enum defines commands invoked from heron client
 */
public enum Command {
    // TODO(mfu): Move ACTIVATE & DEACTIVATE out? They are non-related to Scheduling
    SUBMIT,
    KILL,
    ACTIVATE,
    DEACTIVATE,
    UPDATE,
    RESTART;

    public static Command makeCommand(String commandString) {
        return Command.valueOf(commandString.toUpperCase());
    }
}           

Heron CLI的update指令允許使用者在Topology運作的過程中動态的修改component的并行度(parallelism),使用的指令格式如下:

heron update local/ads/PROD my-topology 
    --component-parallelism=my-spout:2 
    --component-parallelism=my-bolt:4
           

具體的update指令的使用方式,可以通過Heron的文檔進行了解。這裡對其中update指令背後的實作進行拆解和分析,來看看是否能為我們提供一些其他的思路。

ApiServer

在Heron中,ApiServer負責處理使用者從Heron CLI中送出的所有指令,其中當然包括了update的指令。是以我們從這裡的源碼開始。

package com.twitter.heron.apiserver.actions;

public class TopologyRuntimeAction implements Action {
    private final Config config;
    private final Command command;

    TopologyRuntimeAction(Config config, Command command) {
        this.config = config;
        this.command = command;
    }

    @Override
    public void execute() {
        final RuntimeManagerMain runtimeManagerMain = new RuntimeManagerMain(config, command);
        runtimeManagerMain.manageTopology();
    }
}           

可以看到在ApiServer中,構造函數中根據使用者傳入的指令建構了command對象(這裡省略了指令的解析和該類對象的調用過程)。然後在執行的execute方法中,根據Config對象(該對象就是使用者在Topology指定參數并啟動時的配置對象,封裝了topology送出時的所有靜态配置資訊)和command對象建構了RuntimeManagerMain類的對象并調用了manageTopology()方法。該類是在運作時管理拓撲的核心處理類,也就是在topology運作過程中,所有對topology的管理和動态的調整均是由該類中的方法進行處理的。下面我們開看一下該類中調用的manageTopology方法。

RuntimeManagerMain

上面在action類中建構該類對象的時候調用了構造方法,該構造方法隻是對config和command對象進行了初始化,沒有額外的操作:

public RuntimeManagerMain(Config config, Command command) {
        // initialize the options
        this.config = config;
        this.command = command;
    }           

下面來看一下manageTopology方法:

/**
     * Manager a topology
     * 1. Instantiate necessary resources
     * 2. Valid whether the runtime management is legal
     * 3. Complete the runtime management for a specific command
     */
    public void manageTopology()
            throws TopologyRuntimeManagementException, TMasterException, PackingException {
        String topologyName = Context.topologyName(config);
        // 1. Do prepare work
        // create an instance of state manager
        String statemgrClass = Context.stateManagerClass(config);
        IStateManager statemgr;
        try {
            statemgr = ReflectionUtils.newInstance(statemgrClass);
        } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
            throw new TopologyRuntimeManagementException(String.format(
                    "Failed to instantiate state manager class '%s'",
                    statemgrClass), e);
        }

        // Put it in a try block so that we can always clean resources
        try {
            // initialize the statemgr
            statemgr.initialize(config);

            // TODO(mfu): timeout should read from config
            SchedulerStateManagerAdaptor adaptor = new SchedulerStateManagerAdaptor(statemgr, 5000);

            validateRuntimeManage(adaptor, topologyName);

            // 2. Try to manage topology if valid
            // invoke the appropriate command to manage the topology
            LOG.log(Level.FINE, "Topology: {0} to be {1}ed", new Object[]{topologyName, command});

            // build the runtime config
            Config runtime = Config.newBuilder()
                    .put(Key.TOPOLOGY_NAME, Context.topologyName(config))
                    .put(Key.SCHEDULER_STATE_MANAGER_ADAPTOR, adaptor)
                    .build();

            // Create a ISchedulerClient basing on the config
            ISchedulerClient schedulerClient = getSchedulerClient(runtime);

            callRuntimeManagerRunner(runtime, schedulerClient);
        } finally {
            // 3. Do post work basing on the result
            // Currently nothing to do here

            // 4. Close the resources
            SysUtils.closeIgnoringExceptions(statemgr);
        }
    }           

可以從注釋中看到這裡包含了幾個主要的步驟:

1. 準備工作。包括擷取topologyname,根據config配置對象擷取statemgrClass類名。然後使用反射,根據statemgrClass擷取statemgr對象。該對象就是對Heron狀态中繼資料進行保持并維護的對象,這裡對應的就是Zookeeper。是以後面使用config對象對statemgr進行了初始化并連接配接(逾時時間為5000ms)。最後開始根據topologyname驗證目前拓撲是否處于合法的狀态,能夠進行update操作。

2. 根據指令進行topology的管理。首先這裡建立了一個Config類的runtime對象,這個對象為在運作時的動态配置對象(相對于config對象,runtime中的配置項可以改變)。後面可以看到之後的runtime和config對象,均是從這裡傳入并貫穿整個過程的。

runtime建立完成後,調用了如下getSchedulerClient()的方法,來擷取了schedulerClient對象。為什麼需要擷取scheduler對象呢?如果看過Heron中scheduler的代碼可以知道,每個scheduler執行對topology進行update的具體過程都不相同,是以需要調用具體的scheduer中update方法以完成update指令。是以需要建構scheduelr對象。

protected ISchedulerClient getSchedulerClient(Config runtime)
            throws SchedulerException {
        return new SchedulerClientFactory(config, runtime).getSchedulerClient();
    }           

可以看到這裡使用了工廠模式來建構scheduler對象。工廠中的getSchedulerClient方法如下:

/**
     * Implementation of getSchedulerClient - Used to create objects
     * Currently it creates either HttpServiceSchedulerClient or LibrarySchedulerClient
     *
     * @return getSchedulerClient created. return null if failed to create ISchedulerClient instance
     */
    public ISchedulerClient getSchedulerClient() throws SchedulerException {
        LOG.fine("Creating scheduler client");
        ISchedulerClient schedulerClient;

        if (Context.schedulerService(config)) {
            // get the instance of the state manager
            SchedulerStateManagerAdaptor statemgr = Runtime.schedulerStateManagerAdaptor(runtime);

            Scheduler.SchedulerLocation schedulerLocation =
                    statemgr.getSchedulerLocation(Runtime.topologyName(runtime));

            if (schedulerLocation == null) {
                throw new SchedulerException("Failed to get scheduler location from state manager");
            }

            LOG.log(Level.FINE, "Scheduler is listening on location: {0} ", schedulerLocation.toString());

            schedulerClient =
                    new HttpServiceSchedulerClient(config, runtime, schedulerLocation.getHttpEndpoint());
        } else {
            // create an instance of scheduler
            final IScheduler scheduler = LauncherUtils.getInstance()
                    .getSchedulerInstance(config, runtime);

            LOG.fine("Invoke scheduler as a library");
            schedulerClient = new LibrarySchedulerClient(config, runtime, scheduler);
        }

        return schedulerClient;
    }           

可以清楚的看到,過程根據if條件分為了兩部分。那麼這個if語句中是對config中的哪個配置項進行驗證呢?其實就是在scheduler.yaml檔案中的如下配置項:

heron.scheduler.is.service: false           

該值預設為false,也就是不會将scheduler作為service進行啟動。如果改為true,heron則會将scheduler作為一項http服務保持運作,在用到scheduler時,隻需要使用service的URL來調用即可。是以這也是if語句成立中的基本過程。而我們保持了預設false,那麼scheduler在送出拓撲之後執行個體就會被釋放,是以這裡需要根據配置檔案重新建立一個scheduler對象來在後面調用其中的udpate方法。LauncherUtils.getInstance().getSchedulerInstance(config, runtime)的具體過程如下。根據類名反射建立指定的scheduler執行個體對象,并初始化即可。很好了解。

/**
     * Creates and initializes scheduler instance
     *
     * @return initialized scheduler instances
     */
    public IScheduler getSchedulerInstance(Config config, Config runtime)
            throws SchedulerException {
        String schedulerClass = Context.schedulerClass(config);
        IScheduler scheduler;
        try {
            // create an instance of scheduler
            scheduler = ReflectionUtils.newInstance(schedulerClass);
        } catch (IllegalAccessException | InstantiationException | ClassNotFoundException e) {
            throw new SchedulerException(String.format("Failed to instantiate scheduler using class '%s'",
                    schedulerClass));
        }

        scheduler.initialize(config, runtime);
        return scheduler;
    }           

該部分過程的最後一個步驟是使用scheduler對象,建立了一個LibrarySchedulerClient對象,該對象正是在後面使用scheduler來調用scheduler.onUpdate方法的地方。該類中包含了scheduler中對應的幾個方法調用過程:

@Override
    public boolean restartTopology(Scheduler.RestartTopologyRequest restartTopologyRequest) {
        // ...
    }

    @Override
    public boolean killTopology(Scheduler.KillTopologyRequest killTopologyRequest) {
        // ...
    }

    @Override
    public boolean updateTopology(Scheduler.UpdateTopologyRequest updateTopologyRequest) {
        boolean ret = false;
        try {
            scheduler.initialize(config, runtime);
            ret = scheduler.onUpdate(updateTopologyRequest);
        } finally {
            SysUtils.closeIgnoringExceptions(scheduler);
        }
        return ret;
    }           

之後的過程,則是調用了callRuntimeManagerRunner(runtime, schedulerClient); 方法:

protected void callRuntimeManagerRunner(Config runtime, ISchedulerClient schedulerClient)
            throws TopologyRuntimeManagementException, TMasterException, PackingException {
        // create an instance of the runner class
        RuntimeManagerRunner runtimeManagerRunner =
                new RuntimeManagerRunner(config, runtime, command, schedulerClient);

        // invoke the appropriate handlers based on command
        runtimeManagerRunner.call();
    }           

該方法主要是建立了RuntimeManagerRunner對象來進行具體哪一個的command操作。(上面的分析中ApiServer隻是建構了command對象,但如何對應的調用update方法還沒有出現。正是這裡。)

public void call()
            throws TMasterException, TopologyRuntimeManagementException,
            PackingException, UpdateDryRunResponse {
        // execute the appropriate command
        String topologyName = Context.topologyName(config);
        switch (command) {
            case ACTIVATE: ...
            case DEACTIVATE: ...
            case RESTART: ...
            case KILL: ...
            case UPDATE: updateTopologyHandler(topologyName, config.getStringValue(NEW_COMPONENT_PARALLELISM_KEY));
                break;
            default:
                LOG.severe("Unknown command for topology: " + command);
        }
    }           

這裡主要關注update對應的過程:

@VisibleForTesting
    void updateTopologyHandler(String topologyName, String newParallelism)
            throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse {
        // 1. 擷取topo基本資訊
        LOG.fine(String.format("updateTopologyHandler called for %s with %s",
                topologyName, newParallelism));
        SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime);
        TopologyAPI.Topology topology = manager.getTopology(topologyName);
        Map<String, Integer> changeRequests = parseNewParallelismParam(newParallelism);
        PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName);

        // 2. 變化檢查并建構新packingplan
        if (!changeDetected(currentPlan, changeRequests)) {
            throw new TopologyRuntimeManagementException(
                    String.format("The component parallelism request (%s) is the same as the "
                            + "current topology parallelism. Not taking action.", newParallelism));
        }

        PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests,
                topology);

        // 3. dryRun模式
        if (Context.dryRun(config)) {
            PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer();
            PackingPlan oldPlan = deserializer.fromProto(currentPlan);
            PackingPlan newPlan = deserializer.fromProto(proposedPlan);
            throw new UpdateDryRunResponse(topology, config, newPlan, oldPlan, changeRequests);
        }

        // 4. 建構updateRequest并進行topo更新操作
        Scheduler.UpdateTopologyRequest updateTopologyRequest =
                Scheduler.UpdateTopologyRequest.newBuilder()
                        .setCurrentPackingPlan(currentPlan)
                        .setProposedPackingPlan(proposedPlan)
                        .build();

        LOG.fine("Sending Updating topology request: " + updateTopologyRequest);
        if (!schedulerClient.updateTopology(updateTopologyRequest)) {
            throw new TopologyRuntimeManagementException(String.format(
                    "Failed to update topology with Scheduler, updateTopologyRequest="
                            + updateTopologyRequest));
        }

        // Clean the connection when we are done.
        LOG.fine("Scheduler updated topology successfully.");
    }           

上面在注釋中對該過程進行了劃分。可以看到其中的主要過程如下:

  1. 擷取topo的基本資訊。包括從runtime中取回adapter(該對象為zookeer的連接配接對象,儲存在了runtime中,這樣在連接配接實效前都可以使用該對象連接配接zookeeper而不用重建連接配接,以提高實時性)。根據topologyName擷取完整的topo資訊。擷取update指令中指定的parallelism的更新資訊,以及擷取目前的packingplan。
  2. 驗證packingplan中的元件并行度是否發生了變化。
  3. 根據指定更新的parallelism建構新的packingplan(proposedPackingPlan)
  4. 使用新的packingplan來建立updateTopologyRequest對象。
  5. 最後調用schedulerClient.updateTopology()方法,傳入updateTopologyRequest參數,來實施具體的更新過程。
該過程很重要(for me),後面會在其他文章中進行解釋。

到這裡我們代碼是不是就回到了上面分析過的LibrarySchedulerClient中的updateTopology方法了,如下(和上面的一樣):

@Override
    public boolean updateTopology(Scheduler.UpdateTopologyRequest updateTopologyRequest) {
        boolean ret = false;
        try {
            scheduler.initialize(config, runtime);
            ret = scheduler.onUpdate(updateTopologyRequest);
        } finally {
            SysUtils.closeIgnoringExceptions(scheduler);
        }
        return ret;
    }           

也就是具體執行一個scheduler中的onUpdate方法,對應于我目前的狀況就是AuroraScheduler中的onUpdate方法。這裡先放一下,我們後面再看。

還記得這是上面整體步驟的第2部分嗎?【手動捂臉】。下面先看一下剩下的兩個收尾步驟。

3. 做一些response的工作。根據代碼,目前這部分還沒有任何過程。

4. 結束部分:就是關閉各種資源。其實就是statemgr,zookeeper的連接配接資源。

AuroraScheduler

到這裡,我們上面整體分析了所有的過程,但具體到某個scheduler中的onUpdate方法還沒有進行分析。現在就具體進行這個過程。AuroraScheduler中的onUpdate方法如下:

@Override
    public boolean onUpdate(Scheduler.UpdateTopologyRequest request) {
        try {
            updateTopologyManager.updateTopology(
                    request.getCurrentPackingPlan(), request.getProposedPackingPlan());
        } catch (ExecutionException | InterruptedException e) {
            LOG.log(Level.SEVERE, "Could not update topology for request: " + request, e);
            return false;
        }
        return true;
    }           

可以看到這裡就是調用了updateTopologyManager.updateTopology()方法,并傳入了目前的currentPackingPlan和新的proposedPackingPlan。updateTopologyManager的建立在scheduler的initialize方法中:

@Override
    public void initialize(Config mConfig, Config mRuntime) {
        this.config = Config.toClusterMode(mConfig);
        this.runtime = mRuntime;
        try {
            this.controller = getController();
        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
            LOG.severe("AuroraController initialization failed " + e.getMessage());
        }
        this.updateTopologyManager =
                new UpdateTopologyManager(config, runtime, Optional.<IScalable>of(this));
    }           

下面就具體來看一下updateTopologyManager.updateTopology(currentPackingPlan, proposedPackingPlan)的過程:

/**
     * Scales the topology out or in based on the proposedPackingPlan
     *
     * @param existingProtoPackingPlan the current plan. If this isn't what's found in the state
     *                                 manager, the update will fail
     * @param proposedProtoPackingPlan packing plan to change the topology to
     */
    public void updateTopology(final PackingPlans.PackingPlan existingProtoPackingPlan,
                               final PackingPlans.PackingPlan proposedProtoPackingPlan)
            throws ExecutionException, InterruptedException, ConcurrentModificationException {
        String topologyName = Runtime.topologyName(runtime);
        SchedulerStateManagerAdaptor stateManager = Runtime.schedulerStateManagerAdaptor(runtime);
        Lock lock = stateManager.getLock(topologyName, IStateManager.LockName.UPDATE_TOPOLOGY);

        if (lock.tryLock(5, TimeUnit.SECONDS)) {
            try {
                PackingPlans.PackingPlan foundPackingPlan = getPackingPlan(stateManager, topologyName);

                if (!deserializer.fromProto(existingProtoPackingPlan)
                        .equals(deserializer.fromProto(foundPackingPlan))) {
                    throw new ConcurrentModificationException(String.format(
                            "The packing plan in state manager is not the same as the submitted existing "
                                    + "packing plan for topology %s. Another actor has changed it and has likely"
                                    + "performed an update on it. Failing this request, try again once other "
                                    + "update is complete", topologyName));
                }

                updateTopology(existingProtoPackingPlan, proposedProtoPackingPlan, stateManager);
            } finally {
                lock.unlock();
            }
        } else {
            throw new ConcurrentModificationException(String.format(
                    "The update lock can not be obtained for topology %s. Another actor is performing an "
                            + "update on it. Failing this request, try again once current update is complete",
                    topologyName));
        }
    }           

從上面的過程可以明顯看到了分布式鎖,Heron中使用了Zookeer實作的分布式鎖,來保證這種更新過程的分布式一緻性。具體分析一下這部分過程:

1. 擷取topo的基本資訊。同樣還是包括topologyName,stateManager對象。

2. 擷取分布式鎖并進行更新操作。該過程中擷取的鎖key對應于topologyName,鎖名為指定的常量updateTopology。然後加鎖過程為5s,鎖中進行同步的過程如下:

  • (1)使用stateManager,根據topologyName從zookeeper中擷取目前之前儲存的packingPlan資訊。然後通過反序列化之後與方法中傳入的currentPackingPlan進行比較,看是否一緻。這裡其實是一個校驗的過程,防止待更新的packingPlan和zookeeper中維護的packingPlan資料不一緻的問題。
  • (2)調用同名的重載方法執行具體的更新過程。
  • (3)釋放鎖。

下面就來看一下調用的同名重載方法的内容:

private void updateTopology(final PackingPlans.PackingPlan existingProtoPackingPlan,
                                final PackingPlans.PackingPlan proposedProtoPackingPlan,
                                SchedulerStateManagerAdaptor stateManager)
            throws ExecutionException, InterruptedException {
        // 1. 擷取反序列化後的topo基本資訊
        String topologyName = Runtime.topologyName(runtime);
        PackingPlan existingPackingPlan = deserializer.fromProto(existingProtoPackingPlan);
        PackingPlan proposedPackingPlan = deserializer.fromProto(proposedProtoPackingPlan);

        Preconditions.checkArgument(proposedPackingPlan.getContainers().size() > 0, String.format(
                "proposed packing plan must have at least 1 container %s", proposedPackingPlan));
        
        // 2. 驗證并計算container的update數量
        ContainerDelta containerDelta = new ContainerDelta(
                existingPackingPlan.getContainers(), proposedPackingPlan.getContainers());
        int newContainerCount = containerDelta.getContainersToAdd().size();
        int removableContainerCount = containerDelta.getContainersToRemove().size();

        String message = String.format("Topology change requires %s new containers and removing %s "
                        + "existing containers, but the scheduler does not support scaling, aborting. "
                        + "Existing packing plan: %s, proposed packing plan: %s",
                newContainerCount, removableContainerCount, existingPackingPlan, proposedPackingPlan);
        Preconditions.checkState(newContainerCount + removableContainerCount == 0
                || scalableScheduler.isPresent(), message);

        // 3. 驗證目前topo的狀态是否合法
        TopologyAPI.Topology topology = getTopology(stateManager, topologyName);
        boolean initiallyRunning = topology.getState() == TopologyAPI.TopologyState.RUNNING;

        // 4. 開始更新操作
        // (1)暫定topo運作
        // deactivate and sleep
        if (initiallyRunning) {
            // Update the topology since the state should have changed from RUNNING to PAUSED
            // Will throw exceptions internally if tmaster fails to deactivate
            deactivateTopology(stateManager, topology, proposedPackingPlan);
        }

        // (2)調用目前scheduler的addContainer或者removeAll方法來更新topo
        Set<PackingPlan.ContainerPlan> updatedContainers =
                new HashSet<>(proposedPackingPlan.getContainers());
        // request new resources if necessary. Once containers are allocated we should make the changes
        // to state manager quickly, otherwise the scheduler might penalize for thrashing on start-up
        if (newContainerCount > 0 && scalableScheduler.isPresent()) {
            Set<PackingPlan.ContainerPlan> containersToAdd = containerDelta.getContainersToAdd();
            Set<PackingPlan.ContainerPlan> containersAdded =
                    scalableScheduler.get().addContainers(containersToAdd);
            // Update the PackingPlan with new container-ids
            if (containersAdded != null) {
                updatedContainers.removeAll(containersToAdd);
                updatedContainers.addAll(containersAdded);
            }
        }

        // (3)根據更新後的container數量,重新建構一個新的packingPlan對象(updatedPackingPlan)
        PackingPlan updatedPackingPlan =
                new PackingPlan(proposedPackingPlan.getId(), updatedContainers);
        PackingPlanProtoSerializer serializer = new PackingPlanProtoSerializer();
        PackingPlans.PackingPlan updatedProtoPackingPlan = serializer.toProto(updatedPackingPlan);
        LOG.fine("The updated Packing Plan: " + updatedProtoPackingPlan);

        // (4)序列化後更新stateManager中的packingplan資料
        // update packing plan to trigger the scaling event
        logInfo("Update new PackingPlan: %s",
                stateManager.updatePackingPlan(updatedProtoPackingPlan, topologyName));

        // 5. 重新激活拓撲
        // reactivate topology
        if (initiallyRunning) {
            // wait before reactivating to give the tmaster a chance to receive the packing update and
            // delete the packing plan. Instead we could message tmaster to invalidate the physical plan
            // and/or possibly even update the packing plan directly
            SysUtils.sleep(Duration.ofSeconds(10));
            // Will throw exceptions internally if tmaster fails to deactivate
            reactivateTopology(stateManager, topology, removableContainerCount);
        }

        if (removableContainerCount > 0 && scalableScheduler.isPresent()) {
            scalableScheduler.get().removeContainers(containerDelta.getContainersToRemove());
        }
    }           

具體的過程分解在代碼中進行了标注。在5步驟中,可以看到在執行update操作後,拓撲從deactivate到reactivate過程中,會睡眠10s的時間,以為tmaster留出時間來更新packingplan和physicalplan的資料資訊。

結束語

至此,我們分析完了Heron CLI中update指令背後的實作過程。其中可能有些細節未涉及到,請參考源碼,自己後續還會進行一些補充。感謝您的閱讀,如有錯誤之處,歡迎指正。

對一項技術的掌握,熟練使用應該是第一個層面,而從底層出發來看一項功能,一個元件的具體實作時,則會對該技術掌握的更加清晰。那麼什麼是底層,源碼和架構是我認為的學習的兩個角度。同時,開源項目的源碼有很多可以學習和深究的内容。例如,上面的使用zookeeper實作分布式鎖的實作部分,就是一個很好的學習機會。後面有機會也會對這方面的内容進行拆解和分析。而這次的update指令實作解析,也是為了之後的内容做了一個基礎,一個鋪墊。

部落客碼字不易,轉載請注明出處。