天天看點

Redis分布式鎖正确打開方式

作者:高可用架構

01為什麼要有分布式鎖

  • JUC提供的鎖機制,可以保證在同一個JVM程序中同一時刻隻有一個線程執行操作邏輯;
  • 多服務多節點的情況下,就意味着有多個JVM程序,要做到這樣,就需要有一個中間人;
  • 分布式鎖就是用來保證在同一時刻,僅有一個JVM程序中的一個線程在執行操作邏輯;

JUC的鎖和分布式鎖都是一種保護系統資源的措施。盡可能将并發帶來的不确定性轉換為同步的确定性;

02分布式鎖特性

特性1:互斥性。在任意時刻,隻有一個用戶端能持有鎖。

特性2:不會發生死鎖。即使有一個用戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證後續其他用戶端能加鎖。

特性3:解鈴還須系鈴人。加鎖和解鎖必須是同一個用戶端(線程),用戶端自己不能解别人加的鎖。

特性4:可重入性。同一個現線程已經擷取到鎖,可再次擷取到鎖。

特性5:具有容錯性。隻要大部分的分布式鎖節點正常運作,用戶端就可以加鎖和解鎖。

常見分布式鎖的三種實作方式:

1. 資料庫鎖;2. 基于ZooKeeper的分布式鎖;3. 基于Redis的分布式鎖。

03特性——不會發生死鎖

很多線程去上鎖,誰鎖成功誰就有權利執行操作邏輯,其他線程要麼直接走搶鎖失敗的邏輯,要麼自旋嘗試搶鎖;比方說A線程競争到了鎖,開始執行操作邏輯(代碼邏輯示範中,使用 Jedis用戶端為例);

public static void doSomething() {

// RedisLock是封裝好的一個類

RedisLock redisLock = new RedisLock(jedis); // 建立jedis執行個體的代碼省略,不是重點

try {

redisLock.lock(); // 上鎖

// 處理業務

System.out.println(Thread.currentThread().getName() + " 線程處理業務邏輯中...");

Thread.sleep(2000);

System.out.println(Thread.currentThread().getName() + " 線程處理業務邏輯完畢");

redisLock.unlock(); // 釋放鎖

} catch (Exception e) {

e.printStackTrace();

}

}

正常情況下,A 線程執行完操作邏輯後,應該将鎖釋放。如果說執行過程中抛出異常,程式不再繼續走正常的釋放鎖流程,沒有釋放鎖怎麼辦?是以想到:釋放鎖的流程一定要在 finally{} 塊中執行,當然,上鎖的流程一定要在 finally{} 對應的 try{} 塊中,否則 finally{} 就沒用了,如下:

public static void doSomething() {

RedisLock redisLock = new RedisLock(jedis); // 建立jedis執行個體的代碼省略,不是重點

try {

redisLock.lock(); // 上鎖,必須在 try{}中

// 處理業務

System.out.println(Thread.currentThread().getName() + " 線程處理業務邏輯中...");

Thread.sleep(2000);

System.out.println(Thread.currentThread().getName() + " 線程處理業務邏輯完畢");

} catch (Exception e) {

e.printStackTrace();

} finally {

redisLock.unlock(); // 在finally{} 中釋放鎖

}

}

寫法注意:redisLock.lock(); 上分布式鎖,必須在try{}中。在JAVA多線程中 lock.lock(); 單機多線程加鎖操作需要在try{}之前。3-1 redisLock.unlock() 放在 finally{} 塊中,還需要設定逾時時間如果在執行 try{} 中邏輯的時候,程式出現了 System.exit(0); 或者 finally{} 中執行異常,比方說連接配接不上 redis-server了;或者還未執行到 finally{}的時候,JVM程序挂掉了,服務當機;這些情況都會導緻沒有成功釋放鎖,别的線程一直拿不到鎖,怎麼辦?如果系統因為一個節點影響,别的節點也都無法正常提供服務了,系統必須要将風險降低,可以給鎖設定一個逾時時間,比方說 1秒,即便發生了上邊的情況,那鎖也會在1秒之後自動釋放,其他線程就可以擷取到鎖;

public static final String lock_key = "zjt-lock";

public void lock() {

while (!tryLock()) {

try {

Thread.sleep(50); // 在while中自旋,如果說讀者想設定一些自旋次數,等待最大時長等自己去擴充,不是此處的重點

} catch (InterruptedException e) {

e.printStackTrace();

}

}

System.out.println("線程:" + threadName + ",占鎖成功!★★★");

}

private boolean tryLock() {

SetParams setParams = new SetParams();

setParams.ex(1); // 逾時時間1s

setParams.nx(); // nx

String response = jedis.set(lock_key, "", setParams); // 轉換為redis指令就是:set zjt-key "" ex 1 nx

return "OK".equals(response);

}

注意,上鎖時,設定key和設定逾時時間這兩個操作是原子性的,要麼都執行,要麼都不執行。

Redis原生支援:

// http://redis.io/commands/set.html              SET key value [EX seconds] [PX milliseconds] [NX|XX]           

不要在代碼裡邊分兩次調用:

set k v

exipre k time

3-2 鎖的逾時時間該怎麼計算?

剛才假設的逾時時間1s是怎麼計算的?這個時間該設多少合适呢?鎖中的業務邏輯的執行時間,一般是在測試環境進行多次測試,然後在壓測環境多輪壓測之後,比方說計算出平均的執行時間是 200ms,鎖的逾時時間放大3-5倍,比如這裡設定為 1s,因為如果鎖的操作邏輯中有網絡 IO操作,線上的網絡不會總一帆風順,需要給網絡抖動留有緩沖時間。另外,如果設定 10s,果真發生了當機,那意味着這 10s中間,這個分布式鎖的服務全部節點都是不可用的,這個和業務以及系統的可用性有挂鈎,要衡量,要慎重(後邊3-13會再詳細聊)。那如果一個節點當機之後可以通知 redis-server釋放鎖嗎?注意,由于是當機存在不可控力,斷電不能通知。如果是停機,不是 kill -9,也不是斷電,這樣似乎可以去做一些編碼去釋放鎖,可以參考 JVM的鈎子、Dubbo的優雅停機、或者 linux程序級通信技術來做這件事情。當然也可以手動停服務後,手動删除掉 redis中的鎖。

04特性——解鈴還須系鈴人

如果說 A線程在執行操作邏輯的過程中,别的線程直接進行了釋放鎖的操作,是不是就出問題了?

聯想 ReentrantLock中的 isHeldByCurrentThread()方法,是以必須在鎖上加個标記,隻有上鎖的線程 A線程知道,相當于是一個密語,也就是說釋放鎖的時候,首先先把密語和鎖上的标記進行比對,如果比對不上,就沒有權利釋放鎖;

private boolean tryLock() {              SetParams setParams = new SetParams();              setParams.ex(1); // 逾時時間1s              setParams.nx(); // nx              String response = jedis.set(lock_key, "", setParams); // 轉換為redis指令就是:set zjt_key "" ex 1 nx              return "OK".equals(response);              }                  // 别的線程直接調用釋放鎖操作,分布式鎖崩潰!              public void unlock() {              jedis.del(encode(lock_key));              System.out.println("線程:" + threadName + " 釋放鎖成功!☆☆☆");              }                  private byte[] encode(String param) {              return param.getBytes();              }           
4-1 這個密語value(約定)設定成什麼呢?上鎖之前,在該線程代碼中生成一個 UUID,将這個作為秘鑰,存在鎖鍵的 value中,釋放鎖的時候,用這個進行校驗,因為隻有上鎖的線程知道這個秘鑰,别的線程是不知道的。
String releaseLock_lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1] \n" +               "then\n" +               " return redis.call(\"del\", KEYS[1])\n" +               "else\n" +               " return 0\n" +               "end";                  private boolean tryLock(String uuid) {              SetParams setParams = new SetParams();              setParams.ex(1); // 逾時時間1s              setParams.nx(); // nx              String response = jedis.set(lock_key, uuid, setParams); // 轉換為redis指令就是:set zjt-key "" ex 1 nx              return "OK".equals(response);              }                  public void unlock(String uuid) {                  List<byte[]> keys = Arrays.asList(encode(lock_key));              List<byte[]> args = Arrays.asList(encode(uuid));                  // 使用lua腳本,保證原子性              long eval = (Long) jedis.eval(encode(releaseLock_lua), keys, args);              if (eval == 1) {              System.out.println("線程:" + threadName + " 釋放鎖成功!☆☆☆");              } else {              System.out.println("線程:" + threadName + " 釋放鎖失敗!該線程未持有鎖!!!");              }                  }                  private byte[] encode(String param) {              return param.getBytes();              }           

為什麼使用 lua腳本?因為保證原子性。

因為是兩個操作,如果分兩步那就是:

get k // 進行秘鑰 value的比對              del k // 比對成功後,删除k           

如果第一步比對成功後,第二步還沒來得及執行的時候,鎖到期,然後緊接着别的線程擷取到鎖,裡邊的uuid已經變了,也就是說持有鎖的線程已經不是該線程了,此時再執行第二步的删除鎖操作,肯定是錯誤的。

05特性——可重入性

作為一把鎖,在使用 synchronized、ReentrantLock的時候是不是有可重入性?

那分布式鎖該如何實作可重入呢?如果 A線程的鎖方法邏輯中調用了 x()方法,x()方法中也需要擷取這把鎖,按照這個邏輯,x()方法中的鎖應該重入進去即可,那是不是需要将剛才生成的這個 UUID秘鑰傳遞給 x()方法?怎麼傳遞?用參數傳遞就會侵入業務代碼。

5-1、不侵入業務代碼實作可重入:Thread-Id

若想給上鎖的 A線程設定一個隻有它自己知道的秘鑰。線程本身的 id(Thread.currentThread().getId())就是一個唯一辨別。把秘鑰 value設定為線程的 id。

String releaseLock_lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1] \n" +               "then\n" +               " return redis.call(\"del\", KEYS[1])\n" +               "else\n" +               " return 0\n" +               "end";              String addLockLife_lua = "if redis.call(\"exists\", KEYS[1]) == 1\n" +               "then\n" +               " return redis.call(\"expire\", KEYS[1], ARGV[1])\n" +               "else\n" +               " return 0\n" +               "end";                  public void lock() {              // 判斷是否可重入              if (isHeldByCurrentThread()) {              return;              }                  while (!tryLock()) {              try {              Thread.sleep(50); // 自旋              } catch (InterruptedException e) {              e.printStackTrace();              }              }                  System.out.println("線程:" + threadName + ",占鎖成功!★★★");              }                  // 是否是目前線程占有鎖,同時将逾時時間重新設定,這個很重要,同樣也是原子操作              private boolean isHeldByCurrentThread() {                  List<byte[]> keys = Arrays.asList(encode(lock_key));              List<byte[]> args = Arrays.asList(encode(String.valueOf(threadId)), encode(String.valueOf(1)));                  long eval = (Long) jedis.eval(encode(addLockLife_lua), keys, args);              return eval == 1;              }                  private boolean tryLock(String uuid) {              SetParams setParams = new SetParams();              setParams.ex(1); // 逾時時間1s              setParams.nx(); // nx              String response = jedis.set(lock_key, String.valueOf(threadId), setParams); // 轉換為redis指令就是:set zjt-key xxx ex 1 nx              return "OK".equals(response);              }                  public void unlock(String uuid) {                  List<byte[]> keys = Arrays.asList(encode(lock_key));              List<byte[]> args = Arrays.asList(encode(String.valueOf(threadId)));                  // 使用lua腳本,保證原子性              long eval = (Long) jedis.eval(encode(releaseLock_lua), keys, args);              if (eval == 1) {              System.out.println("線程:" + threadName + " 釋放鎖成功!☆☆☆");              } else {              System.out.println("線程:" + threadName + " 釋放鎖失敗!該線程未持有鎖!!!");              }                  }                  private byte[] encode(String param) {              return param.getBytes();              }               

5-2、Thread-Id 真能行嗎?不行。很多人認為Thread的id是唯一的,是在同一個 JVM程序中,是在一個作業系統中,也就是在一個機器中。而現實是,我們的部署是叢集部署,多個執行個體節點,那意味着會存在這樣一種情況,S1機器上的線程上鎖成功,此時鎖中秘鑰 value是線程id=1,如果說同一時間 S2機器中,正好線程id=1的線程嘗試獲得這把鎖,比對秘鑰發現成功,結果也重入了這把鎖,也開始執行邏輯,此時,如果分布式鎖崩潰!隻需要在每個節點中維護不同的辨別即可。怎麼維護呢?應用啟動的時候,使用 UUID生成一個唯一辨別 APP_ID,放在記憶體中(或者使用zookeeper去配置設定機器id等等)。此時,秘鑰 value通過APP_ID+ThreadId存即可:

// static變量,final修飾,加載在記憶體中,JVM程序生命周期中不變              private static final String APP_ID = UUID.randomUUID().toString();                  String releaseLock_lua = "if redis.call(\"get\",KEYS[1]) == ARGV[1] \n" +               "then\n" +               " return redis.call(\"del\", KEYS[1])\n" +               "else\n" +               " return 0\n" +               "end";              String addLockLife_lua = "if redis.call(\"exists\", KEYS[1]) == 1\n" +               "then\n" +               " return redis.call(\"expire\", KEYS[1], ARGV[1])\n" +               "else\n" +               " return 0\n" +               "end";                  public void lock() {              // 判斷是否可重入              if (isHeldByCurrentThread()) {              return;              }                  while (!tryLock()) {              try {              Thread.sleep(50); // 自旋              } catch (InterruptedException e) {              e.printStackTrace();              }              }                  System.out.println("線程:" + threadName + ",占鎖成功!★★★");              }                  // 是否是目前線程占有鎖,同時将逾時時間重新設定,這個很重要,同樣也是原子操作              private boolean isHeldByCurrentThread() {                  List<byte[]> keys = Arrays.asList(encode(lock_key));              List<byte[]> args = Arrays.asList(encode(APP_ID + String.valueOf(threadId)), encode(String.valueOf(1)));                  long eval = (Long) jedis.eval(encode(addLockLife_lua), keys, args);              return eval == 1;              }                  private boolean tryLock(String uuid) {              SetParams setParams = new SetParams();              setParams.ex(1); // 逾時時間1s              setParams.nx(); // nx              String response = jedis.set(lock_key, APP_ID + String.valueOf(threadId), setParams); // 轉換為redis指令就是:set zjt-key xxx ex 1 nx              return "OK".equals(response);              }                  public void unlock(String uuid) {                  List<byte[]> keys = Arrays.asList(encode(lock_key));              List<byte[]> args = Arrays.asList(encode(APP_ID + String.valueOf(threadId)));                  // 使用lua腳本,保證原子性              long eval = (Long) jedis.eval(encode(releaseLock_lua), keys, args);              if (eval == 1) {              System.out.println("線程:" + threadName + " 釋放鎖成功!☆☆☆");              } else {              System.out.println("線程:" + threadName + " 釋放鎖失敗!該線程未持有鎖!!!");              }                  }                  private byte[] encode(String param) {              return param.getBytes();              }           

5-3、APP_ID(執行個體唯一辨別) + ThreadId 還是 UUID 好呢?

如果 A線程執行邏輯中間開啟了一個子線程執行任務,這個子線程任務中也需要重入這把鎖,因為子線程擷取到的線程 id不一樣,導緻重入失敗。那意味着需要将這個秘鑰繼續傳遞給子線程,JUC中 InheritableThreadLocal 派上用場。這種方式存在一些問題,因為線程間傳遞的是父線程的 id。

微服務中多服務間調用的話可以借用系統自身有的 traceId作為秘鑰即可。比如sgm中的traceId或者利用RPC架構的隐式傳參。

「至于選擇哪種 value的方式,根據實際的系統設計 + 業務場景,選擇最合适的即可,沒有最好,隻有最合适。」5-4、鎖重入的逾時時間怎麼設定?

以上内容的主要注意力在怎麼重入進去,而分布式鎖要考慮的事情還有很多,重入進去後,逾時時間随便設嗎?

比方說 A線程在鎖方法中調用了 x()方法,而 x()方法中也有擷取鎖的邏輯,如果 A線程擷取鎖後,執行過程中,到 x()方法時,這把鎖是要重入進去的,但是請注意,這把鎖的逾時時間如果小于第一次上鎖的時間,比方說 A線程設定的逾時時間是 1s,在 100ms的時候執行到 x()方法中,而 x()方法中設定的逾時時間是 100ms,那麼意味着 100ms之後鎖就釋放了,而這個時候 A線程的主方法還沒有執行完呢!卻被重入鎖設定的時間搞壞了!這個怎麼解決?

如果說在記憶體中設定一個這把鎖設定過的最大的逾時時間,重入的時候判斷下傳進來的時間,重入時expire的時候始終設定成最大的時間,而不是由重入鎖随意降低鎖時間導緻上一步的主鎖出現問題。

上邊舉例中,調用的 x()方法是在一個 JVM中,如果是調用遠端的一個 RPC服務呢(像這種調用的話就需要将秘鑰value通過 RpcContext傳遞過去了)到另一個節點的服務中進行鎖重入,這個時間依然是要用目前設定過鎖的最大時間的,是以這個最大的時間要存在 redis中而非 JVM記憶體中。經過這一步的分析,重入 lua腳本就修改為這樣了:

ADD_LOCK_LIFE("if redis.call(\"get\", KEYS[1]) == ARGV[1]\n" + // 判斷是否是鎖持有者              "then\n" +               " local thisLockMaxTimeKeepKey=KEYS[1] .. \":maxTime\"\n" + // 記錄鎖最大時間的key是:鎖名字:maxTime              " local nowTime=tonumber(ARGV[2])\n" + // 目前傳參進來的time              " local maxTime=redis.call(\"incr\", thisLockMaxTimeKeepKey)\n" + // 取出目前鎖設定的最大的逾時時間,如果這個保持時間的key不存在傳回的是字元串nil,這裡為了lua腳本的易讀性,用incr操作,這樣讀出來的都是number類型的操作              " local bigerTime=maxTime\n" + // 臨時變量bigerTime=maxTime              " if nowTime>maxTime-1\n" + // 如果傳參進來的時間>記錄的最大時間              " then\n" +               " bigerTime=nowTime\n" + // 則更新bigerTime              " redis.call(\"set\", thisLockMaxTimeKeepKey, tostring(bigerTime))\n" + // 設定逾時時間為最大的time,是最安全的              " else \n" +               " redis.call(\"decr\", thisLockMaxTimeKeepKey)\n" + // 目前傳參time<maxTime,将剛才那次incr減回來              " end\n" +               " return redis.call(\"expire\", KEYS[1], tostring(bigerTime))\n" + // 重新設定逾時時間為目前鎖過的最大的time              "else\n" +               " return 0\n" +               "end")           

其實,還有另外一種方案比較簡單,就是鎖的逾時時間=第一次上鎖的時間+後面所有重入鎖的時間。也就是(expire = 主ttl + 重入exipre),這種方案是放大的思想,一放大就又有上邊提到過的一個問題:expire太大怎麼辦,參考上邊。5-5、重入鎖的方法中直接執行 unlock?考慮重入次數A線程執行一共需要500ms,執行中需要調用 x()方法,x()方法中有一個重入鎖,執行用了 50ms,然後執行完後,x()方法的 finally{} 塊中将鎖進行釋放。為啥能釋放掉?因為秘鑰我有,比對成功了我就直接釋放了。這當然是有問題的,是以我們要通過鎖重入次數來進行釋放鎖時候的判斷,也就是說上鎖的時候需要多元護一個 key來儲存目前鎖的重入次數,如果執行釋放鎖時,先進行重入次數 -1,-1後如果是0,可以直接 del,如果>0,說明還有重入的鎖在,不能直接 del。5-6、考慮如何存儲鎖的屬性(鎖的key 重入次數key 最大逾時時間key)目前為止,算上上一步中設定最大逾時時間的key,加上這一步重入次數的key,加上鎖本身的key,已經有3個key,需要注意的事情是,這三個key的逾時時間是都要設定的!為什麼?假如說重入次數的 key沒有設定逾時時間,服務A節點中在一個JVM中重入了5次後,調用一次 RPC服務,RPC服務中同樣重入鎖,此時,鎖重入次數是 6,這個時候A服務當機,就意味着無論怎樣,這把鎖不可能釋放了,這個分布式鎖提供的完整能力,全線不可用了!

是以,這幾個 key是要設定逾時時間的,如何設定?最大逾時時間的 key和重入次數的 key,都附屬于鎖,它們都是鎖的屬性,如果鎖不在了,談它們就毫無意義,這個時候用什麼存儲呢?redis的 hash資料結構,就可以做,key是鎖,裡邊的 hashKey分别是鎖的屬性, hashValue是屬性值,逾時時間隻設定鎖本身 key就可以了。這個時候鎖的資料結構就要改變一下了。

06看門狗機制

在設定逾時時間時,預估鎖方法執行時間是 200ms,放大5倍後,設定逾時時間是 1s(過期時間确定)。假想一下,如果生産環境中,鎖方法中的 IO操作,極端情況下逾時嚴重,比方說 IO就消耗了 2s(業務執行時長不确定),那就意味着,在這次 IO還沒有結束的時候,這把鎖已經到期釋放掉了,就意味着别的線程趁虛而入,分布式鎖崩潰!

一把分布式鎖的目的是同一時刻隻有一個線程持有鎖,作為服務而言,這個鎖現在不管是被哪個線程上鎖成功了,服務應該保證這個線程執行的安全性,怎麼辦?鎖續命(看門狗機制)。當鎖出現了上鎖操作,就意味着這把鎖開始投入使用,這時服務中需要有一個 daemon線程定時去守護我的鎖的安全性,怎麼守護?比如說鎖逾時時間設定的是 1s,那麼這個定時任務是每隔 300ms去 redis服務端做一次檢查,就像 session會話的活躍機制一樣。看個例子,逾時時間設定的是 1s,實際方法執行時間是 3s,定時線程每隔 300ms就會去把這把鎖的逾時時間重新設定為 1s,每隔 300ms一次,成功将鎖續命成功。

public class RedisLockIdleThreadPool {              private String threadAddLife_lua = "if redis.call(\"exists\", KEYS[1]) == 1\n" +               "then\n" +               " return redis.call(\"expire\", KEYS[1], ARGV[1])\n" +               "else\n" +               " return 0\n" +               "end";                  private volatile ScheduledExecutorService scheduledThreadPool;                  public RedisLockIdleThreadPool() {                  if (scheduledThreadPool == ) {              synchronized (this) {              if (scheduledThreadPool == ) {              scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(); // 我這樣建立線程池是為了代碼的易讀性,大家務必使用ThreadPoolExecutor去建立                  scheduledThreadPool.scheduleAtFixedRate(() -> {              addLife();              }, 0, 300, TimeUnit.MILLISECONDS);              }              }              }              }                  private void addLife() {              // ... 省略jedis的初始化過程                  List<byte[]> keys = Arrays.asList(RedisLock.lock_key.getBytes());              List<byte[]> args = Arrays.asList(String.valueOf(1).getBytes());                  jedis.eval(threadAddLife_lua.getBytes(), keys, args);              }                  }           

除此之外,如果每個服務中都像如此續命鎖,假如說A服務還在執行過程中的時候,還沒有執行完,就是說還沒有手動釋放鎖的時候,當機,此時 redis中鎖還在有效期。

續命的前提是,得判斷是不是目前程序持有的鎖,也就是 APP_ID,如果不是就不進行續命。續命鎖的 lua腳本發生改變,如下:

THREAD_ADD_LIFE("local v=redis.call(\"get\", KEYS[1]) \n" + // get key              "if v==false \n" + // 如果不存在key,讀出結果v是false              "then \n" + // 不存在不處理              "else \n" +               " local match = string.find(v, ARGV[1]) \n" + // 存在,判斷是否能和APP_ID比對,比對不上時match是nil              " if match==\"nil\" \n" +               " then \n" +               " else \n" +               " return redis.call(\"expire\", KEYS[1], ARGV[2]) \n" + // 比對上了傳回的是索引位置,如果比對上了意味着就是目前程序占有的鎖,就延長時間              " end \n" +               "end")           

6-1 等待鎖逾時釋放即便設定了一個很合理的 expire,比如 10s,但是線上如果真出現了A節點剛拿到鎖就當機了,那其他節點也隻能幹等10s,之後才能拿到鎖。主要還是業務能不能接受。而如果是 To C的業務中,大部分場景無法接受的,因為可能會導緻使用者流失。是以需要另外一個監控服務,定時去監控 redis中鎖的獲得者的健康狀态,如果擷取者超過n次無法通信,由監控服務負責将鎖摘除掉,讓别的線程繼續去擷取到鎖去幹活。

07結語

分布式鎖的五大特性:互斥、不會發生死鎖、解鈴還須系鈴人、可重入性、具有容錯性 非常重要。不同的分布式鎖的實作方式中蘊含着相同的五大特性,目前的系統開發中普遍存在五大特性對應的場景,是以了解每個特性背後的技術尤其重要,了解分布式鎖的背後原理,在适合的場景使用鎖,不要盲目加鎖,不要盲目擴大鎖的範圍,知其然知其是以然。

本文由高可用架構轉載。技術原創及架構實踐文章,歡迎通過公衆号菜單「聯系我們」進行投稿

繼續閱讀