天天看點

Redis(三)應用:分布式鎖

文章目錄

      • 一、分布式鎖
      • 二、逾時問題
          • Redis 中使用 Lua 腳本
      • 三、Spring 實作分布式鎖
        • 1、 RedisLockRegistry$RedisLock 類 lock()加鎖 和 解鎖 流程
        • 2、RedisLock#lock() 加鎖源碼實作
            • UNLINK 指令
            • RedLock 算法
      • 四、基于 Redission 實作分布式鎖

一、分布式鎖

    首先,Redis 是單線程的 ,整理的 “單線程“ 網絡請求子產品使用的是一個線程,也就是說,一個線程處理所有網絡請求(其他子產品仍用了多個線程),不需要考慮并發安全性。

    分布式應用進行邏輯處理時,經常會遇到并發問題。Java 提供的 Synchronized、ReentrantLock、ReentrantReadWriteLock…,僅能在單個JVM 程序内 對 多線程 對共享資源 保證線程安全,在分布式系統環境下統統不好使。

Redis(三)應用:分布式鎖

說個結論😏 :

  • 性能:緩存 > Zookeeper >= 資料庫
  • 可靠性:Zookeeper > 緩存 > 資料庫

    其他,比如:Chubby,是Google開發的粗粒度分布鎖的服務,但是并沒有開源,開放出了論文和一些相關文檔可以進一步了解;Tair,是阿裡開源的一個分布式 KV 存儲方案;Hazelcast,是基于記憶體的資料網格開源項目,提供彈性可擴充的分布式記憶體計算,并且被公認是提高應用程式性能和擴充性最好的方案… …

(資料庫、Zookeeper 使用分布式鎖 移步部落格 https://blog.csdn.net/weixin_41750142/article/details/110956216)

    以下是分布式鎖的一些特點,分布式鎖 家族成員并不一定都滿足這個要求,實作機制不大一樣。

  • 互斥性: 分布式鎖要保證在多個用戶端之間的互斥。
  • 可重入性:同一用戶端的相同線程,允許重複多次加鎖。
  • 鎖逾時:和本地鎖一樣支援鎖逾時,防止死鎖。
  • 非阻塞: 能與 ReentrantLock 一樣支援 trylock() 非阻塞方式獲得鎖。
  • 支援公平鎖和非公平鎖:公平鎖是指按照請求加鎖的順序獲得鎖,非公平鎖真好相反請求加鎖是無序的。

    用并發那塊兒的知識了解分布式鎖,就類似于擷取資源 / 獨占鎖,(如果了解有誤請在評論中指出🤗)如果一個程序已經擷取了資源,當别的程序也要來嘗試擷取資源時,就會失敗。

    擷取資源 / 加鎖 一般是使用 setnx(set if not exists)指令 ,先來先擷取,使用完畢後調用 del 指令釋放資源 / 釋放鎖。

Redis(三)應用:分布式鎖

    但有問題,如果邏輯執行到中間出現異常了,可能會導緻 del 指令沒有被調用,鎖永遠得不到釋放,這樣就會出現可以使用

create /hjiajia

來建立節點,。

    于是,在拿到鎖之後,再給鎖加上一個過期時間,比如 10 s,這樣,即使中間出現異常,也可以保證 10 s 後 鎖會自動釋放。

Redis(三)應用:分布式鎖

    但是還有問題,如果在 setnx 和 expire 之間,伺服器程序突然挂掉了(可能是因為機器掉電 或者 人為 kill 掉了),就會導緻 expire 得不到執行,還是造成了死鎖 👻。

    Redis 2.6.12 之前的版本中,采用 setnx + expire 方式實作分布式鎖,代碼如下所示:

public static boolean lock(Jedis jedis, String lockKey, String requestId, int expireTime) {
		//設定鎖
        Long result = jedis.setnx(lockKey, requestId);
        //擷取鎖成功
        if (result == 1) {
            
            //若在這裡程式突然崩潰,則無法設定過期時間,将發生死鎖
           
            //給 lockKey 設定一個過期時間,确認 key 值删除
            jedis.expire(lockKey, expireTime);
            return true;
        }
        return false;
    }
           

     setnx + expire 可能出現死鎖問題的根源在于,setnx 和 expire 是兩條指令 而不是 原子指令,如果這兩條指令可以一起執行,就不會出現問題。可能你會想到 “事務”【要麼都執行,要麼都不執行】,但是 expire 是依賴于 setnx 執行結果的,如果 setnx 沒有搶到鎖,expire 是不應該執行的,而 事務裡沒有 if-else 分支邏輯,是以事務是行不通的。

    為了解決這個問題,Redis 社群湧現了一堆分布式鎖的 library,實作方式極為複雜(使用起來也極為複雜)。後來,Redis 2.8 版本作者加入了 set 指令的擴充參數,使得 setnx 和 expire 指令可以一起執行,徹底解決了分布式鎖的問題。

Redis(三)應用:分布式鎖

二、逾時問題

    Redis 的分布式鎖并不能解決逾時問題,如果在加鎖 和 釋放鎖 之間的邏輯 執行的時間太長,以至于超出了鎖的逾時限制,就會出現問題。如果這時候鎖過期了,第二個線程重新持有了這把鎖,但是 緊接着 第一個線程執行完了業務邏輯,可又把鎖釋放了,第三個線程可能會在第二個線程邏輯執行完畢之前拿到鎖。

    為了避免這個問題,Redis 分布式鎖不要用于較長時間的任務。如果真的偶爾出現了,資料出現的小波錯亂可能需要人工介入解決。

    比如:把 key 設定成一個随機數 tag ,釋放鎖時檢視 key 的值 與 tag 是否相等,如果相等再删除 key(删除 key 就相當于釋放鎖)。

tag = str(uuid.uuid4())  # 随機值
if redis.set(key, tag, nx=True, ex=5):
    do_something()
    redis.delifequals(key, tag)  # 假想的 delifequals 指令
# 檢視取出 key 的值 與 tag 是否相等,相等再删除
           

    有一個更加安全的方案,是 為 set 指令的 value 參數設定一個随機數,釋放鎖時,先比對随機數是否一緻,然後再删除 key。但是,比對 value 和 删除 key 不是一個原子操作,Redis 也沒有提供類似于 delifequals 這樣的指令,這就需要使用 Lua 腳本來處理,因為 Lua 腳本可以保證 連續多個指令的原子性執行。

# delifequals
if redis.call("get",KEYS[1]) == ARGV[1] then
 return redis.call("del",KEYS[1])
else
 return 0
end
           
Redis 中使用 Lua 腳本

    Redis 從 2.6 版本開始引入對 Lua 腳本的支援,通過在 伺服器中嵌入 Lua 環境,Redis 用戶端可以使用 Lua 腳本,直接在伺服器端 原子地執行多個 Redis 指令。 除此之外,使用腳本的好處還有:減少網絡往返時延開銷;用戶端發送的腳本會永久存儲在 Redis 中,其他用戶端可複用。

    Lua 是動态類型語言,變量不要類型定義,隻需要為變量指派。 值可以存儲在變量中,作為參數傳遞或結果傳回。

    Lua 中有 8 個基本類型分别為:nil、boolean、number、string、userdata、function、thread 和 table。

Redis 内置 Lua 執行指令

  • EVAL 指令文法

各參數含義:

(1)EVAL :Lua 程式的運作環境上下文

(2)script :Lua 腳本

(3)numkeys :參數的個數

(4)key:Redis 鍵,通路下标從 1 開始,比如 KEYS[1]

(5)arg:Redis 鍵的附加參數

  • EVALSHA 指令文法

EVALSHA 指令允許通過腳本的 SHA1 來執行(節省帶寬),Redis 在執行 EVAL/SCRIPT LOAD 後會計算腳本 SHA1【Secure Hash Algorithm 1,安全雜湊演算法1】 緩存,EVALSHA 根據 SHA1 取出緩存腳本執行。

= 使用流程如下:

(1)編寫腳本

(2)腳本送出到 Redis 并擷取 SHA

(3)使用 SHA 調用 Redis 腳本

Redis 運作 Lua 腳本

  • EVAL 直接運作腳本
    Redis(三)應用:分布式鎖
  • EVALSHA 使用:需要 SCRIPT LOAD 和 EVALSHA 配合使用

    (1)SCRIPT LOAD 加載到記憶體,傳回 SHA 簽名

    (2)EVALSHA 使用已存在的簽名

        這樣隻用加載一次,便可重複使用已經加載的簽名腳本,可以多次使用,避免長腳本輸入。

    Redis(三)應用:分布式鎖

    在 Redis 下使用腳本檔案執行

    👀 例1: set、get 操作資料

        首先,在 Redis 路徑下建立 lua 源檔案:

    (1)set.lua

--[[ set.lua, redis的set指令使用 
redis: set key val
--]]
local key = KEYS[1]
local val = ARGV[1]

return redis.call('set', key, val)
           

(2)get.lua

--[[ get.lua, redis的get指令使用 
redis: get key
--]]

local key = KEYS[1]
local val = redis.call("GET", key);

return val;
           

    接下來執行以下指令,設定 key、value 值:

redis-cli --eval set.lua foo , bar
           

✨ 注意 ✨: foo 和 bar 之間的逗号 左 右 都要有空格分隔開,否則會被當成一個字元串。

Redis(三)應用:分布式鎖

    檢視 value 值:

Redis(三)應用:分布式鎖
Redis(三)應用:分布式鎖

    可以看到,這就是個簡單使用 Lua 腳本操作 Redis 資料的例子。

👀 例2:通路次數限制

    先編寫 retelimiting.lua

local times = redis.call('incr',KEYS[1])

if times == 1 then
    redis.call('expire',KEYS[1], ARGV[1])
end

if times > tonumber(ARGV[2]) then
    return 0
end
return 1
           

運作腳本:

Redis(三)應用:分布式鎖

    可以看到,rata.limiting:127.0.0.1 是字首 + ip 組成的 KEY,用 KEYS[1] 擷取,後面的 10 和 3 是參數,在腳本中可以通過 ARGV[1] 和 ARGV[2] 擷取。

    該腳本的作用是将通路頻率限制為 每 10 秒最多 3 次,是以在終端不斷運作此指令會發現當通路頻率在 10 s 内 小于或等于 3 次時傳回 1 ,否則傳回 0.

Redis(三)應用:分布式鎖

三、Spring 實作分布式鎖

    除了使用 Jedis 用戶端外,完全可以直接使用 Spring 官方提供的 企業內建模式 架構,其中提供了很多分布式鎖的方式,Spring 提供了一個統一的分布式鎖抽象,具體實作目前支援:Gemfire、Jdbc、Zookeeper、Redis。

    分布式鎖的代碼在 Spring Integration 中。項目位址:https://github.com/spring-projects/spring-integration

    Spring 對 Lock 分布式鎖做了全局抽象,抽象結構:

Redis(三)應用:分布式鎖

LockRegistry 作為頂層抽象接口,源碼:

/**
 * Strategy for maintaining a registry of shared locks
 *
 * @author Oleg Zhurakousky
 * @author Gary Russell
 * @since 2.1.1
 */
 
@FunctionalInterface
public interface LockRegistry {

    /**
     * Obtains the lock associated with the parameter object.
     * @param lockKey The object with which the lock is associated.
     * @return The associated lock.
     */
    Lock obtain(Object lockKey);

}
           

obtain()

方法獲得具體的 Lock 實作類,分别在對應的 XxxLockRegitry 實作類來建立。

    RedisLockRegistry 中 obtain()方法 對應的 實作類為 RedisLock,RedisLock内部,在 Springboot2.x(Spring5)版本中是通過

SET

+

PEXIPRE

指令結合 Lua腳本實作的,在 Springboot1.x(Spring4)版本中,是通過

SETNX

指令實作的。

    ZookeeperLockRegistry 裡 obtain() 方法 實作類為 ZkLock,ZkLock 内部基于 Apache Curator 架構實作的。

    JdbcLockRegistry 裡 obtain() 方法實作類為 JdbcLock,JdbcLock 内部基于一張 INT_LOCK 資料庫鎖表實作的,通過 JdbcTemplate 來操作。

  • 用戶端使用方法:
private final String registryKey = "sb2";
  RedisLockRegistry lockRegistry = new RedisLockRegistry(getConnectionFactory(), this.registryKey);
  Lock lock = lockRegistry.obtain("foo");
  
  // (一)
  lock.lock();
    try {
    // doSth...
      }
 finally {
    lock.unlock();
   }
           

1、 RedisLockRegistry$RedisLock 類 lock()加鎖 和 解鎖 流程

Redis(三)應用:分布式鎖

加鎖步驟:

(1)lockKey 為 registryKey:path ,例子中為 sb2:foo ,用戶端 C1 優先申請加鎖。

(2)執行 Lua 腳本,get lockKey 不存在,則 set lockKey 成功,值為 clientid(UUID),過期時間預設 60 秒。

(3) 用戶端 C1 同一線程重複加鎖,pexpire lockKey ,重置過期時間為 60 秒。

(4)用戶端 C2 申請加鎖,執行 Lua 腳本,get lockKey 已經存在,并且 clientid 也不同,加鎖失敗。

(5)用戶端 C2 挂起,每個 100 ms 再次嘗試加鎖。

2、RedisLock#lock() 加鎖源碼實作

Redis(三)應用:分布式鎖

源碼:

@Override
public void lock() {
    this.localLock.lock();
    while (true) {
        try {
        
        	    // (一)
            while (!obtainLock()) {
                Thread.sleep(100); //NOSONAR
            }
            break;
        }
        catch (InterruptedException e) {
            /*
             * This method must be uninterruptible so catch and ignore
             * interrupts and only break out of the while loop when
             * we get the lock.
             */
        }
        catch (Exception e) {
        	               //(三)
            this.localLock.unlock();
            rethrowAsLockException(e);
        }
    }
}
           

(一) obtainLock(), 基于 Spring 封裝的 RedisTemplate 來操作的:

private boolean obtainLock() {
    Boolean success =
																				//(二) 
            RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript,
                    Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId,
                    String.valueOf(RedisLockRegistry.this.expireAfter));

    boolean result = Boolean.TRUE.equals(success);

    if (result) {
        this.lockedAt = System.currentTimeMillis();
    }
    return result;
}
           

(二)obtainLockScript 對應的 Lua 腳本代碼:

private static final String OBTAIN_LOCK_SCRIPT =
    "local lockClientId = redis.call('GET', KEYS[1])\n" +
            "if lockClientId == ARGV[1] then\n" +
            "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
            "  return true\n" +
            "elseif not lockClientId then\n" +
            "  redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
            "  return true\n" +
            "end\n" +
            "return false";
           

(三)unlock():

解鎖流程:

Redis(三)應用:分布式鎖

源碼:

@Override
public void unlock() {
    if (!this.localLock.isHeldByCurrentThread()) {
        throw new IllegalStateException("You do not own lock at " + this.lockKey);
    }
    if (this.localLock.getHoldCount() > 1) {
        this.localLock.unlock();
        return;
    }
    try {
        if (!isAcquiredInThisProcess()) {
            throw new IllegalStateException("Lock was released in the store due to expiration. " +
                    "The integrity of data protected by this lock may have been compromised.");
        }

        if (Thread.currentThread().isInterrupted()) {
            RedisLockRegistry.this.executor.execute(this::removeLockKey);
        }
        else {
        	// (四)
            removeLockKey();
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Released lock; " + this);
        }
    }
    catch (Exception e) {
        ReflectionUtils.rethrowRuntimeException(e);
    }
    finally {
        this.localLock.unlock();
    }
}
           

(四):

// 删除緩存Key
private void removeLockKey() {
    if (this.unlinkAvailable) {
        try {
            RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
        }
        catch (Exception ex) {
            LOGGER.warn("The UNLINK command has failed (not supported on the Redis server?); " +
                    "falling back to the regular DELETE command", ex);
            this.unlinkAvailable = false;
            RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
        }
    }
    else {
        RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
    }
}
           

    可以看到,并不是直接調用 Redis 的 DEL 指令删除 key,這也是在 Sptingboot2.x 版本中的一個優化,Redis 4.0 版本以上提供了 UNLINK 指令。

UNLINK 指令

Redis 官網關于 UNLINK 給出的一段解釋:

This command is very similar to DEL: it removes the specified keys.
Just like DEL a key is ignored if it does not exist. However the
command performs the actual memory reclaiming in a different thread,
so it is not blocking, while DEL is. This is where the command name
comes from: the command just unlinks the keys from the keyspace. The
actual removal will happen later asynchronously.
           

譯為:

    這個指令與 DEL 非常相似:它删除指定的鍵。就像 DEL 一樣,如果鍵不存在,則忽略這個指令。然而,指令在其他的線程中執行實際的記憶體回收再利用,是以它是非阻塞的,這也是它命名的由來:UNLINK 指令沒有将 key 和 keyspace 連接配接在一起。實際的删除将在以後異步地發生。

    可以看到,UNLINK 是非阻塞的,DEL 指令對于大型 list 或者 hash ,如果值太大,配置設定的空間太多,會長時間阻止 Redis,正是為了解決這樣的問題, UNLINK 指令,是 非阻塞地删除。 不管如果值很小,DEL 一般和 UNLINK 效率差不多。

    本質上,RedisLock#lock() 加鎖方式還是使用 SETNX 實作的,而且 Spring 隻是做了一層薄薄的封裝,支援 可重入加鎖、逾時等待、可中斷加鎖。

    但是有個問題,鎖的過期時間不能靈活設定,用戶端初始化時,建立 RedisLockRegistry 時運作設定,參數為 long 類型的 expireAfter,但是它是全局的。

源碼:

/**
     * Constructs a lock registry with the supplied lock expiration.
     * @param connectionFactory The connection factory.
     * @param registryKey The key prefix for locks.
     * @param expireAfter The expiration in milliseconds.
     */
public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey, long expireAfter) {
    Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
    Assert.notNull(registryKey, "'registryKey' cannot be null");
    this.redisTemplate = new StringRedisTemplate(connectionFactory);
    this.obtainLockScript = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
    this.registryKey = registryKey;
    this.expireAfter = expireAfter;
           

RedLock 算法

    從 Redis 主從架構上考慮,依然存在問題,因為 Redis 叢集資料 同步到各個節點時,是異步的 ,如果在 Master 節點擷取到鎖後,在沒有同步到其他節點時,Master 節點崩潰了,此時 新的 Master 節點依然可以擷取鎖,這樣的話, 多個應用服務可以同時擷取到鎖。

    為了解決這樣的問題,Redis 之父 Antirez 提出了 RedLock 算法。

  • RedLock 算法實作過程分析

        假設 Redis 部署模式是 Redis Cluster,總共有 5 個 master 節點,擷取鎖的步驟如下:

    (1)擷取目前時間戳,機關是 毫秒。

    (2)輪流嘗試在每個 master 節點上建立鎖,過期時間設定較短。

    (3)嘗試在大多數節點上建立一個鎖,比如 5 個 master 節點,就要求 n/2 + 1 = 3 個節點。

    (4)用戶端計算建立好鎖的時間,如果建立鎖的時間小于逾時時間,就算建立成功了。

    (5)要是鎖建立失敗了,那麼就依次删除這個鎖。

    (6)隻要有用戶端建立成功了分布式鎖,其他用戶端就要不斷輪詢去嘗試擷取鎖

  • RedLock 算法可能存在問題

    (1)節點崩潰重新開機,會出現多個用戶端持有鎖

        假設一共有 5 個 Redis 節點:A、B、C、D、E,設想發生了如下的事件序列:

    ① 用戶端 C1 成功對 Redis 叢集中 A、B、C 三個節點加鎖成功,但是 D、E 加鎖失敗。

    ② 節點 C 崩潰重新開機了,但是用戶端 C1 在節點 C 加鎖未 持久化 完成。

    ③ 節點 C 重新開機後,用戶端 C2 成功對 Redis 叢集中 C、D、E 嘗試加鎖成功了。

        這樣用戶端 C1 和 C2 同時擷取了同一把分布式鎖。

        為了應對這種節點重新開機引起的鎖失效問題,Antirez 提出了 延遲重新開機 的概念 ,即 一個節點崩潰後,先不立即重新開機它,而是等待一段時間後再重新開機,等待時間大于鎖的有效時間。采用這種方式,這個節點在重新開機前,所參與的鎖都會過期,重新開機後就不會對現有的鎖造成影響。

    (2)時鐘跳躍

        假設一共有 5 個 Redis 節點:A、B、C、D、E,設想發生了如下的事件序列:

    ① 用戶端 C1 成功對 Redis 叢集種 A、B、C 三個節點成功加鎖,但是因為網絡問題,與 D 和 E 的通信失敗。

    ② 節點 C 上的時鐘發生了向前跳躍,導緻它維護的鎖快速過期。

    ③ 用戶端 C2 對 Redis 叢集種節點 C、D、E 成功加上了同一把鎖。

        這樣用戶端 C1 和 C2 同時擷取了同一把分布式鎖。

        為了應對這種時鐘跳躍引起的鎖失效問題,Antirez 提出了應該禁止人為修改系統時間,使用一個不會進行 跳躍式 調整系統時鐘的 ntpd 程式。

        但是,RedLock 算法并沒有解決操作共享資源逾時 導緻 鎖失效的問題。是以這個算法還是不推薦使用的。

四、基于 Redission 實作分布式鎖

    Redission 是 Redis 的 Java 實作的用戶端,其 API 提供了比較全面的 Redis 指令的支援。Jedis 簡單使用阻塞的 I/O 和 Redis 互動,Redission 通過 Netty 支援非阻塞 I/O。

    Redission 封裝了鎖的實作,還對集合、對象、常用緩存架構等做了友好的封裝,易于使用。Redisson分布式鎖Github:https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers

    Redission 可以便攜支援多種 Redis 部署架構:

(1)Redis 單機

(2)Master-Slave + Sentinel 哨兵

Master-Slave 配置:

Config config = new Config();
MasterSlaveServersConfig serverConfig = config.useMasterSlaveServers()
            .setMasterAddress("")
            .addSlaveAddress("")
            .setReadMode(ReadMode.SLAVE)
            .setMasterConnectionPoolSize(maxActiveSize)
            .setMasterConnectionMinimumIdleSize(maxIdleSize)
            .setSlaveConnectionPoolSize(maxActiveSize)
            .setSlaveConnectionMinimumIdleSize(maxIdleSize)
            .setConnectTimeout(CONNECTION_TIMEOUT_MS) // 預設10秒
            .setTimeout(socketTimeout)
            ;
            
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getLock("myLock");

// 獲得鎖
lock.lock();

lock.lock(10, TimeUnit.SECONDS);

				 //(一)
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}
           

    其中,Config 類中私有屬性 long 類型的 lockWatchdog :

lockWatchdogTimeout = 30 * 1000;

    其中 ,RedissonClient 用戶端提供了衆多的接口實作,支援 可重入鎖、公平鎖、讀寫鎖、鎖逾時、RedLock 等。

(一)tryLock() 源碼:

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        
        // (二)嘗試擷取鎖,并傳回剩餘逾時時間
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        
        // 如果ttl為空則說明鎖未被其他用戶端持有
        if (ttl == null) {
            return true;
        }
        
        // 檢查是否超過等待時間 超過則傳回 false
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        
        // 目前線程進行訂閱
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }
 
        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
            // 在等待時間内 重複嘗試擷取鎖 直到超過等待時間或成功擷取鎖
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }
           

    leaseTime 參數指定加鎖時間,超過這個時間,鎖就自動解開了。

(二)tryAcquire:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
			   // (三)
    return get(tryAcquireAsync(leaseTime, unit, threadId));
} 
           

(三)tryAcquireAsync:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
    
        	//(四)
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
 
            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
            
            	// (五)
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
           

(四)tryLockInnerAsync:

    為了相容老的版本,Redission 裡都是通過 Lua 腳本執行 Redis 指令的,同時保證了原子性操作。

加鎖執行的 Lua 腳本:

<T> RFuture<T> tryLockInnerAsync(long leaseTime,TimeUnit unit,long threadId,RedisStricCommand<T> command){
    internalLockLeaseTime = unit.toMillis(leaseTime);

	return commandExecutor.evalWriterAsync(getName(),LongCoder.INSTANCE,command,
	--檢查 key 是否被占用,如果沒有則設定逾時時間和唯一辨別,初始化 value = 1 
	 "if (redis.call('exists',KEYS[1] == 0) then " +
	  		"redis.call('hset',KEY[1], ARGV[2],1);" +
	  		"redis.call('pexpire', KEY[1],ARGV[1]);" +
	  		"return nil;" +
	  "end: " +
	  
	  --如果鎖重入,需要判斷鎖的 key、field,都一緻的情況下 value +1
	  "if (redis.call('hexists',KEY[1], ARGV[2]) == 1) then" +
	  		"redis.call('hincrby'.KEY[1],ARGV[2],1);" +
	  		--鎖重入需要重新設定逾時時間
	  		"redis.call('pexpire',KEY[1],ARGV[1]);" +
	  		"return nil;"
	 "end;" +
	 		--傳回剩餘的過期時間
	 		"return redis.call('pttl',KEY[1]);",
	 		Collections.<~> singletonList(getName()),interanlLockLeaseTime,getLockName(threadId);
}	 
           

參數含義:

  • KEY[1]:要加鎖的 KEY 名稱,比如上例中的 mylock。
  • ARGV[1]:針對加鎖的 KEY 設定的過期時間。
  • ARGV[2]:Hash 結構中 KEY 的名稱。
  • LockName 是 UUID:線程 ID。

(五)看門狗🐕 scheduleExpirationRenewal:

private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }
    
    //建立定時任務,每隔1/3過期時間則重新整理過期時間
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            
            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",                  Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
            
            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }
                    
                    if (future.getNow()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    // 如果傳入 key 對應的 value 已經存在,就傳回存在的 value,不進行替換。如果不存在,就添加 key 和 value,傳回null
    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}
           

    看門狗的作用是在 Redission 執行個體被關閉前,不斷延遲鎖的有效期, 預設情況下,看門狗的檢查鎖的逾時時間是 30 秒鐘 ,也可以通過修改 Config.lockWatchTimeout 來令行指定,internalLockLease 和 lockWatchdogTimeout 兩個參數是相等的。

    整理了加鎖的調用流程大概是這樣:

Redis(三)應用:分布式鎖

    可以看到,上例中,假設 用戶端 C1 申請加鎖,KEY 為 mylock,如果 KEY 不存在,則 通過 hset 設定值 value =1 ,通過 pexpire 設定過期時間,同時開啟開門狗 watchdog 任務,預設加鎖時間是 30 秒, 每隔 10 秒判斷一下,如果 key 還在,重置過期時間到 30 秒,如此解決 業務處理時間比過期時間長的問題。(也就是說,預設加鎖時間是 30 秒,如果加鎖的業務沒有執行完,那麼到 30-10=20 秒時,就會進行一次續期,把鎖的過期時間重置為 30 秒,萬一機器當機了,定時任務就不會跑,就不會續期,那 30 秒後鎖就自動釋放了,如此避免死鎖問題。)

    接下來,用戶端 C1 相同線程再次加鎖,key 存在,判斷 Redis 裡 Hash 中的 lockName 跟目前線程 lockName 相同,則将 Hash 中的 lockName 的值 value 加1,代表支援可重入加鎖。

    接下來,用戶端 C2 申請加鎖,如果 key 存在,判斷 Redis 裡 Hash 中的 lockName 跟目前線程 lockName “mylock” 不同,則 傳回剩餘過期時間。

    接下來,用戶端 C2 線程在 tryLock() 方法内不斷嘗試擷取鎖,此處是基于 Semaphore 信号量實作的,有許可立即傳回,否則繼續重試。

    官網中關于 WatchDog 的描述:

譯為:

    如果獲得鎖的 Redisson 執行個體崩潰,那麼該鎖可能會在獲得的狀态下永遠挂起。為了避免 Redisson 維護看門狗,它會在鎖持有者 Redisson 執行個體處于 alive 活動狀态時延長鎖過期時間。預設情況下,鎖看門狗逾時時間為30秒,可以通過配置進行更改。

    以上是加鎖分析,接下來是解鎖邏輯:

@Override
    public void unlock() {
    
        // 1.通過 Lua 腳本執行 Redis 指令釋放鎖
        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,
                RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end;" +
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                        "end; " +
                        "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()),
                LockPubSub.unlockMessage, internalLockLeaseTime,
                getLockName(Thread.currentThread().getId()));
                
        // 2.非鎖的持有者釋放鎖時抛出異常
        if (opStatus == null) {
            throw new IllegalMonitorStateException(
                    "attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + Thread.currentThread().getId());
        }
        
        // 3.釋放鎖後取消重新整理鎖失效時間的排程任務
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }
           

    可以看到,如果 key 不存在,說明鎖已釋放,直接執行 publish 指令釋出釋放鎖消息并傳回 1;如果 key 存在,但是 field 在 Hash 中不存在,說明自己不是鎖持有者,無權釋放鎖,傳回 nil。因為鎖是可重入的,是以釋放鎖時不能把所有已擷取的鎖全都釋放掉,一次隻能釋放一把鎖,是以執行 hincrby 對鎖的值減一。釋放一把鎖後,如果還有剩餘的鎖,則重新整理鎖的失效時間并傳回 0;如果剛才釋放的已經是最後一把鎖,則執行 del 指令删除鎖的 key,并釋出鎖釋放消息,傳回 1。

(3)Redis-Cluster 叢集

參考文章:

(1)https://mp.weixin.qq.com/s?__biz=MjM5MDAxOTk2MQ==&mid=2650283760&idx=1&sn=dc65028aadb0136ea348bbc1f62c7f75&chksm=be4786e689300ff0393f8355a090bbf89dd541311c4bc94f5c28d6ceafb1d3edf2a32b9df95e&mpshare=1&scene=23&srcid=1209Py9XcrfJsuUQN1AwmWxr&sharer_sharetime=1607571927696&sharer_shareid=969ef7742555284a7918c681ba9e8479#rd

(2)https://www.sohu.com/a/326080287_100212268

(3)https://blog.csdn.net/lbh199466/article/details/90176059

繼續閱讀