天天看點

基于 redis 實作的分布式鎖(一)

分布式鎖的解決方式

  1. 基于資料庫表做樂觀鎖,用于分布式鎖。(适用于小并發)
  2. 使用memcached的add()方法,用于分布式鎖。
  3. 使用memcached的cas()方法,用于分布式鎖。(不常用)
  4. 使用redis的setnx()、expire()方法,用于分布式鎖。
  5. 使用redis的setnx()、get()、getset()方法,用于分布式鎖。
  6. 使用redis的watch、multi、exec指令,用于分布式鎖。(不常用)
  7. 使用zookeeper,用于分布式鎖。(不常用)

這裡主要介紹第四種和第五種:

使用redis的setnx()、expire()方法,用于分布式鎖

原理

對于使用redis的setnx()、expire()來實作分布式鎖,這個方案相對于memcached()的add()方案,redis占優勢的是,其支援的資料類型更多,而memcached隻支援String一種資料類型。除此之外,無論是從性能上來說,還是操作友善性來說,其實都沒有太多的差異,完全看你的選擇,比如公司中用哪個比較多,你就可以用哪個。

首先說明一下setnx()指令,setnx的含義就是SET if Not Exists,其主要有兩個參數 setnx(key, value)。該方法是原子的,如果key不存在,則設定目前key成功,傳回1;如果目前key已經存在,則設定目前key失敗,傳回0。但是要注意的是setnx指令不能設定key的逾時時間,隻能通過expire()來對key設定。

具體的使用步驟如下:

  1. setnx(lockkey, 1) 如果傳回0,則說明占位失敗;如果傳回1,則說明占位成功
  2. expire()指令對lockkey設定逾時時間,為的是避免死鎖問題。
  3. 執行完業務代碼後,可以通過delete指令删除key。

為了保證在某個Redis節點不可用的時候算法能夠繼續運作,這個擷取鎖的操作還有一個逾時時間(timeOut),它要遠小于鎖的有效時間(幾十毫秒量級)。

可能存在的問題

這個方案其實是可以解決日常工作中的需求的,但從技術方案的探讨上來說,可能還有一些可以完善的地方。比如,如果在第一步setnx執行成功後,在expire()指令執行成功前,發生了當機的現象,那麼就依然會出現死鎖的問題,是以如果要對其進行完善的話,可以使用redis的setnx()、get()和getset()方法來實作分布式鎖。

具體實作

鎖具體實作RedisLock:

package com.xiaolyuh.lock;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;

public class RedisLock {
    private static Logger logger = LoggerFactory.getLogger(RedisLock.class);

     靜态常量定義開始///
    /**
     * 存儲到redis中的鎖标志
     */
    private static final String LOCKED = "LOCKED";

    /**
     * 預設請求鎖的逾時時間(ms 毫秒)
     */
    private static final long TIME_OUT = 100;

    /**
     * 預設鎖的有效時間(s)
     */
    public static final int EXPIRE = 60;
     靜态常量定義結束///

    /**
     * 鎖标志對應的key
     */
    private String key;

    /**
     * 鎖的有效時間(s)
     */
    private int expireTime = EXPIRE;

    /**
     * 請求鎖的逾時時間(ms)
     */
    private long timeOut = TIME_OUT;

    /**
     * 鎖flag
     */
    private volatile boolean isLocked = false;
    /**
     * Redis管理模闆
     */
    private StringRedisTemplate redisTemplate;

    /**
     * 構造方法
     *
     * @param redisTemplate Redis管理模闆
     * @param key           鎖定key
     * @param expireTime    鎖過期時間 (秒)
     * @param timeOut       請求鎖逾時時間 (毫秒)
     */
    public RedisLock(StringRedisTemplate redisTemplate, String key, int expireTime, long timeOut) {
        this.key = key;
        this.expireTime = expireTime;
        this.timeOut = timeOut;
        this.redisTemplate = redisTemplate;
    }

    /**
     * 構造方法
     *
     * @param redisTemplate Redis管理模闆
     * @param key           鎖定key
     * @param expireTime    鎖過期時間
     */
    public RedisLock(StringRedisTemplate redisTemplate, String key, int expireTime) {
        this.key = key;
        this.expireTime = expireTime;
        this.redisTemplate = redisTemplate;
    }

    /**
     * 構造方法(預設請求鎖逾時時間30秒,鎖過期時間60秒)
     *
     * @param redisTemplate Redis管理模闆
     * @param key           鎖定key
     */
    public RedisLock(StringRedisTemplate redisTemplate, String key) {
        this.key = key;
        this.redisTemplate = redisTemplate;
    }

    public boolean lock() {
        // 系統目前時間,納秒
        long nowTime = System.nanoTime();
        // 請求鎖逾時時間,納秒
        long timeout = timeOut * 1000000;
        final Random random = new Random();

        // 不斷循環向Master節點請求鎖,當請求時間(System.nanoTime() - nano)超過設定的逾時時間則放棄請求鎖
        // 這個可以防止一個用戶端在某個宕掉的master節點上阻塞過長時間
        // 如果一個master節點不可用了,應該盡快嘗試下一個master節點
        while ((System.nanoTime() - nowTime) < timeout) {
            // 将鎖作為key存儲到redis緩存中,存儲成功則獲得鎖
            if (redisTemplate.opsForValue().setIfAbsent(key, LOCKED)) {
                isLocked = true;
                // 設定鎖的有效期,也是鎖的自動釋放時間,也是一個用戶端在其他用戶端能搶占鎖之前可以執行任務的時間
                // 可以防止因異常情況無法釋放鎖而造成死鎖情況的發生
                redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);

                // 上鎖成功結束請求
                break;
            }
            // 擷取鎖失敗時,應該在随機延時後進行重試,避免不同用戶端同時重試導緻誰都無法拿到鎖的情況出現
            // 睡眠10毫秒後繼續請求鎖
            try {
                Thread.sleep(10, random.nextInt(50000));
            } catch (InterruptedException e) {
                logger.error("擷取分布式鎖休眠被中斷:", e);
            }
        }
        return isLocked;

    }

    public boolean isLock() {
        redisTemplate.getConnectionFactory().getConnection().time();
        return redisTemplate.hasKey(key);
    }

    public void unlock() {
        // 釋放鎖
        // 不管請求鎖是否成功,隻要已經上鎖,用戶端都會進行釋放鎖的操作
        if (isLocked) {
            redisTemplate.delete(key);
        }
    }

}


package com.xiaolyuh.lock;

import java.util.Random;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;

public class RedisLock {
    private static Logger logger = LoggerFactory.getLogger(RedisLock.class);

     靜态常量定義開始///
    /**
     * 存儲到redis中的鎖标志
     */
    private static final String LOCKED = "LOCKED";

    /**
     * 預設請求鎖的逾時時間(ms 毫秒)
     */
    private static final long TIME_OUT = 100;

    /**
     * 預設鎖的有效時間(s)
     */
    public static final int EXPIRE = 60;
     靜态常量定義結束///

    /**
     * 鎖标志對應的key
     */
    private String key;

    /**
     * 鎖的有效時間(s)
     */
    private int expireTime = EXPIRE;

    /**
     * 請求鎖的逾時時間(ms)
     */
    private long timeOut = TIME_OUT;

    /**
     * 鎖flag
     */
    private volatile boolean isLocked = false;
    /**
     * Redis管理模闆
     */
    private StringRedisTemplate redisTemplate;

    /**
     * 構造方法
     *
     * @param redisTemplate Redis管理模闆
     * @param key           鎖定key
     * @param expireTime    鎖過期時間 (秒)
     * @param timeOut       請求鎖逾時時間 (毫秒)
     */
    public RedisLock(StringRedisTemplate redisTemplate, String key, int expireTime, long timeOut) {
        this.key = key;
        this.expireTime = expireTime;
        this.timeOut = timeOut;
        this.redisTemplate = redisTemplate;
    }

    /**
     * 構造方法
     *
     * @param redisTemplate Redis管理模闆
     * @param key           鎖定key
     * @param expireTime    鎖過期時間
     */
    public RedisLock(StringRedisTemplate redisTemplate, String key, int expireTime) {
        this.key = key;
        this.expireTime = expireTime;
        this.redisTemplate = redisTemplate;
    }

    /**
     * 構造方法(預設請求鎖逾時時間30秒,鎖過期時間60秒)
     *
     * @param redisTemplate Redis管理模闆
     * @param key           鎖定key
     */
    public RedisLock(StringRedisTemplate redisTemplate, String key) {
        this.key = key;
        this.redisTemplate = redisTemplate;
    }

    public boolean lock() {
        // 系統目前時間,納秒
        long nowTime = System.nanoTime();
        // 請求鎖逾時時間,納秒
        long timeout = timeOut * 1000000;
        final Random random = new Random();

        // 不斷循環向Master節點請求鎖,當請求時間(System.nanoTime() - nano)超過設定的逾時時間則放棄請求鎖
        // 這個可以防止一個用戶端在某個宕掉的master節點上阻塞過長時間
        // 如果一個master節點不可用了,應該盡快嘗試下一個master節點
        while ((System.nanoTime() - nowTime) < timeout) {
            // 将鎖作為key存儲到redis緩存中,存儲成功則獲得鎖
            if (redisTemplate.opsForValue().setIfAbsent(key, LOCKED)) {
                isLocked = true;
                // 設定鎖的有效期,也是鎖的自動釋放時間,也是一個用戶端在其他用戶端能搶占鎖之前可以執行任務的時間
                // 可以防止因異常情況無法釋放鎖而造成死鎖情況的發生
                redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);

                // 上鎖成功結束請求
                break;
            }
            // 擷取鎖失敗時,應該在随機延時後進行重試,避免不同用戶端同時重試導緻誰都無法拿到鎖的情況出現
            // 睡眠10毫秒後繼續請求鎖
            try {
                Thread.sleep(10, random.nextInt(50000));
            } catch (InterruptedException e) {
                logger.error("擷取分布式鎖休眠被中斷:", e);
            }
        }
        return isLocked;

    }

    public boolean isLock() {
        redisTemplate.getConnectionFactory().getConnection().time();
        return redisTemplate.hasKey(key);
    }

    public void unlock() {
        // 釋放鎖
        // 不管請求鎖是否成功,隻要已經上鎖,用戶端都會進行釋放鎖的操作
        if (isLocked) {
            redisTemplate.delete(key);
        }
    }

}      

調用鎖:

public void redisLock(int i) {
        RedisLock redisLock = new RedisLock(redisTemplate, "redisLockKey:"+i % 10, 5*60 , 500);
        try {
            long now = System.currentTimeMillis();
            if (redisLock.lock()) {
                logger.info("=" + (System.currentTimeMillis() - now));
                // TODO 擷取到鎖要執行的代碼塊
                logger.info("j:" + j ++);
            } else {
                logger.info("k:" + k ++);
            }
        } catch (Exception e) {
            logger.info(e.getMessage(), e);
        } finally {
            // 一定要釋放鎖
            redisLock.unlock();
        }
    }

public void redisLock(int i) {
        RedisLock redisLock = new RedisLock(redisTemplate, "redisLockKey:"+i % 10, 5*60 , 500);
        try {
            long now = System.currentTimeMillis();
            if (redisLock.lock()) {
                logger.info("=" + (System.currentTimeMillis() - now));
                // TODO 擷取到鎖要執行的代碼塊
                logger.info("j:" + j ++);
            } else {
                logger.info("k:" + k ++);
            }
        } catch (Exception e) {
            logger.info(e.getMessage(), e);
        } finally {
            // 一定要釋放鎖
            redisLock.unlock();
        }
    }      

使用redis的setnx()、get()、getset()方法,用于分布式鎖

原理

這個方案的背景主要是在setnx()和expire()的方案上針對可能存在的死鎖問題,做了一版優化。

那麼先說明一下這三個指令,對于setnx()和get()這兩個指令,相信不用再多說什麼。那麼getset()指令?這個指令主要有兩個參數 getset(key,newValue)。該方法是原子的,對key設定newValue這個值,并且傳回key原來的舊值。假設key原來是不存在的,那麼多次執行這個指令,會出現下邊的效果:

  1. getset(key, "value1") 傳回nil 此時key的值會被設定為value1
  2. getset(key, "value2") 傳回value1 此時key的值會被設定為value2
  3. 依次類推!

介紹完要使用的指令後,具體的使用步驟如下:

  1. setnx(lockkey, 目前時間+過期逾時時間) ,如果傳回1,則擷取鎖成功;如果傳回0則沒有擷取到鎖,轉向2。
  2. get(lockkey)擷取值oldExpireTime ,并将這個value值與目前的系統時間進行比較,如果小于目前系統時間,則認為這個鎖已經逾時,可以允許别的請求重新擷取,轉向3。
  3. 計算newExpireTime=目前時間+過期逾時時間,然後getset(lockkey, newExpireTime) 會傳回目前lockkey的值currentExpireTime。
  4. 判斷currentExpireTime與oldExpireTime 是否相等,如果相等,說明目前getset設定成功,擷取到了鎖。如果不相等,說明這個鎖又被别的請求擷取走了,那麼目前請求可以直接傳回失敗,或者繼續重試。
  5. 在擷取到鎖之後,目前線程可以開始自己的業務處理,當處理完畢後,比較自己的處理時間和對于鎖設定的逾時時間,如果小于鎖設定的逾時時間,則直接執行delete釋放鎖;如果大于鎖設定的逾時時間,則不需要再鎖進行處理。

可能存在的問題

問題1: 在“get(lockkey)擷取值oldExpireTime ”這個操作與“getset(lockkey, newExpireTime) ”這個操作之間,如果有N個線程在get操作擷取到相同的oldExpireTime後,然後都去getset,會不會傳回的newExpireTime都是一樣的,都會是成功,進而都擷取到鎖???

我認為這套方案是不存在這個問題的。依據有兩條: 第一,redis是單程序單線程模式,串行執行指令。 第二,在串行執行的前提條件下,getset之後會比較傳回的currentExpireTime與oldExpireTime 是否相等。

問題2: 在“get(lockkey)擷取值oldExpireTime ”這個操作與“getset(lockkey, newExpireTime) ”這個操作之間,如果有N個線程在get操作擷取到相同的oldExpireTime後,然後都去getset,假設第1個線程擷取鎖成功,其他鎖擷取失敗,但是擷取鎖失敗的線程它發起的getset指令确實執行了,這樣會不會造成第一個擷取鎖的線程設定的鎖逾時時間一直在延長???

我認為這套方案确實存在這個問題的可能。但我個人認為這個微笑的誤差是可以忽略的,不過技術方案上存在缺陷,大家可以自行抉擇哈。

問題3: 這個方案必須要保證分布式伺服器的時間一定要同步,否則這個鎖就會出問題。

具體實作

鎖具體實作RedisLock:

package com.xiaolyuh.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Redis分布式鎖(這種方式伺服器時間一定要同步,否則會出問題)
 * 
 * @author yuhao.wangwang
 * @version 1.0
 * @date 2017年11月3日 上午10:21:27
 */
public class RedisLock2 {

    /**
     * 預設請求鎖的逾時時間(ms 毫秒)
     */
    private static final long TIME_OUT = 100;

    /**
     * 預設鎖的有效時間(s)
     */
    public static final int EXPIRE = 60;

    private static Logger logger = LoggerFactory.getLogger(RedisLock2.class);

    private StringRedisTemplate redisTemplate;

    /**
     * 鎖标志對應的key
     */
    private String lockKey;
    /**
     * 鎖的有效時間(s)
     */
    private int expireTime = EXPIRE;

    /**
     * 請求鎖的逾時時間(ms)
     */
    private long timeOut = TIME_OUT;

    /**
     * 鎖的有效時間
     */
    private long expires = 0;

    /**
     * 鎖标記
     */
    private volatile boolean locked = false;

    final Random random = new Random();

    /**
     * 使用預設的鎖過期時間和請求鎖的逾時時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }

    /**
     * 使用預設的請求鎖的逾時時間,指定鎖的過期時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param expireTime    鎖的過期時間(機關:秒)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, int expireTime) {
        this(redisTemplate, lockKey);
        this.expireTime = expireTime;
    }

    /**
     * 使用預設的鎖的過期時間,指定請求鎖的逾時時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param timeOut       請求鎖的逾時時間(機關:毫秒)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, long timeOut) {
        this(redisTemplate, lockKey);
        this.timeOut = timeOut;
    }

    /**
     * 鎖的過期時間和請求鎖的逾時時間都是用指定的值
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param expireTime    鎖的過期時間(機關:秒)
     * @param timeOut       請求鎖的逾時時間(機關:毫秒)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, int expireTime, long timeOut) {
        this(redisTemplate, lockKey, expireTime);
        this.timeOut = timeOut;
    }

    /**
     * @return 擷取鎖的key
     */
    public String getLockKey() {
        return lockKey;
    }

    /**
     * 獲得 lock.
     * 實作思路: 主要是使用了redis 的setnx指令,緩存了鎖.
     * reids緩存的key是鎖的key,所有的共享, value是鎖的到期時間(注意:這裡把過期時間放在value了,沒有時間上設定其逾時時間)
     * 執行過程:
     * 1.通過setnx嘗試設定某個key的值,成功(目前沒有這個鎖)則傳回,成功獲得鎖
     * 2.鎖已經存在則擷取鎖的到期時間,和目前時間比較,逾時的話,則設定新的值
     *
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public boolean lock() {
        // 請求鎖逾時時間,納秒
        long timeout = timeOut * 1000000;
        // 系統目前時間,納秒
        long nowTime = System.nanoTime();

        while ((System.nanoTime() - nowTime) < timeout) {
            // 分布式伺服器有時差,這裡給1秒的誤內插補點
            expires = System.currentTimeMillis() + expireTime + 1;
            String expiresStr = String.valueOf(expires); //鎖到期時間

            if (redisTemplate.opsForValue().setIfAbsent(lockKey, expiresStr)) {
                locked = true;
                // 設定鎖的有效期,也是鎖的自動釋放時間,也是一個用戶端在其他用戶端能搶占鎖之前可以執行任務的時間
                // 可以防止因異常情況無法釋放鎖而造成死鎖情況的發生
                redisTemplate.expire(lockKey, expireTime, TimeUnit.SECONDS);

                // 上鎖成功結束請求
                return true;
            }

            String currentValueStr = redisTemplate.opsForValue().get(lockKey); //redis裡的時間
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判斷是否為空,不為空的情況下,如果被其他線程設定了值,則第二個條件判斷是過不去的
                // lock is expired

                String oldValueStr = redisTemplate.opsForValue().getAndSet(lockKey, expiresStr);
                //擷取上一個鎖到期時間,并設定現在的鎖到期時間,
                //隻有一個線程才能擷取上一個線上的設定時間,因為jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止誤删(覆寫,因為key是相同的)了他人的鎖——這裡達不到效果,這裡值會被覆寫,但是因為什麼相差了很少的時間,是以可以接受

                    //[分布式的情況下]:如過這個時候,多個線程恰好都到了這裡,但是隻有一個線程的設定值和目前值相同,他才有權利擷取鎖
                    // lock acquired
                    locked = true;
                    return true;
                }
            }

            /*
                延遲10 毫秒,  這裡使用随機時間可能會好一點,可以防止饑餓程序的出現,即,當同時到達多個程序,
                隻會有一個程序獲得鎖,其他的都用同樣的頻率進行嘗試,後面有來了一些進行,也以同樣的頻率申請鎖,這将可能導緻前面來的鎖得不到滿足.
                使用随機的等待時間可以一定程度上保證公平性
             */
            try {
                Thread.sleep(10, random.nextInt(50000));
            } catch (InterruptedException e) {
                logger.error("擷取分布式鎖休眠被中斷:", e);
            }

        }
        return locked;
    }


    /**
     * 解鎖
     */
    public synchronized void unlock() {
        // 隻有加鎖成功并且鎖還有效才去釋放鎖
        if (locked && expires > System.currentTimeMillis()) {
            redisTemplate.delete(lockKey);
            locked = false;
        }
    }

}

package com.xiaolyuh.lock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * Redis分布式鎖(這種方式伺服器時間一定要同步,否則會出問題)
 * 
 * @author yuhao.wangwang
 * @version 1.0
 * @date 2017年11月3日 上午10:21:27
 */
public class RedisLock2 {

    /**
     * 預設請求鎖的逾時時間(ms 毫秒)
     */
    private static final long TIME_OUT = 100;

    /**
     * 預設鎖的有效時間(s)
     */
    public static final int EXPIRE = 60;

    private static Logger logger = LoggerFactory.getLogger(RedisLock2.class);

    private StringRedisTemplate redisTemplate;

    /**
     * 鎖标志對應的key
     */
    private String lockKey;
    /**
     * 鎖的有效時間(s)
     */
    private int expireTime = EXPIRE;

    /**
     * 請求鎖的逾時時間(ms)
     */
    private long timeOut = TIME_OUT;

    /**
     * 鎖的有效時間
     */
    private long expires = 0;

    /**
     * 鎖标記
     */
    private volatile boolean locked = false;

    final Random random = new Random();

    /**
     * 使用預設的鎖過期時間和請求鎖的逾時時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey) {
        this.redisTemplate = redisTemplate;
        this.lockKey = lockKey + "_lock";
    }

    /**
     * 使用預設的請求鎖的逾時時間,指定鎖的過期時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param expireTime    鎖的過期時間(機關:秒)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, int expireTime) {
        this(redisTemplate, lockKey);
        this.expireTime = expireTime;
    }

    /**
     * 使用預設的鎖的過期時間,指定請求鎖的逾時時間
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param timeOut       請求鎖的逾時時間(機關:毫秒)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, long timeOut) {
        this(redisTemplate, lockKey);
        this.timeOut = timeOut;
    }

    /**
     * 鎖的過期時間和請求鎖的逾時時間都是用指定的值
     *
     * @param redisTemplate
     * @param lockKey       鎖的key(Redis的Key)
     * @param expireTime    鎖的過期時間(機關:秒)
     * @param timeOut       請求鎖的逾時時間(機關:毫秒)
     */
    public RedisLock2(StringRedisTemplate redisTemplate, String lockKey, int expireTime, long timeOut) {
        this(redisTemplate, lockKey, expireTime);
        this.timeOut = timeOut;
    }

    /**
     * @return 擷取鎖的key
     */
    public String getLockKey() {
        return lockKey;
    }

    /**
     * 獲得 lock.
     * 實作思路: 主要是使用了redis 的setnx指令,緩存了鎖.
     * reids緩存的key是鎖的key,所有的共享, value是鎖的到期時間(注意:這裡把過期時間放在value了,沒有時間上設定其逾時時間)
     * 執行過程:
     * 1.通過setnx嘗試設定某個key的值,成功(目前沒有這個鎖)則傳回,成功獲得鎖
     * 2.鎖已經存在則擷取鎖的到期時間,和目前時間比較,逾時的話,則設定新的值
     *
     * @return true if lock is acquired, false acquire timeouted
     * @throws InterruptedException in case of thread interruption
     */
    public boolean lock() {
        // 請求鎖逾時時間,納秒
        long timeout = timeOut * 1000000;
        // 系統目前時間,納秒
        long nowTime = System.nanoTime();

        while ((System.nanoTime() - nowTime) < timeout) {
            // 分布式伺服器有時差,這裡給1秒的誤內插補點
            expires = System.currentTimeMillis() + expireTime + 1;
            String expiresStr = String.valueOf(expires); //鎖到期時間

            if (redisTemplate.opsForValue().setIfAbsent(lockKey, expiresStr)) {
                locked = true;
                // 設定鎖的有效期,也是鎖的自動釋放時間,也是一個用戶端在其他用戶端能搶占鎖之前可以執行任務的時間
                // 可以防止因異常情況無法釋放鎖而造成死鎖情況的發生
                redisTemplate.expire(lockKey, expireTime, TimeUnit.SECONDS);

                // 上鎖成功結束請求
                return true;
            }

            String currentValueStr = redisTemplate.opsForValue().get(lockKey); //redis裡的時間
            if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
                //判斷是否為空,不為空的情況下,如果被其他線程設定了值,則第二個條件判斷是過不去的
                // lock is expired

                String oldValueStr = redisTemplate.opsForValue().getAndSet(lockKey, expiresStr);
                //擷取上一個鎖到期時間,并設定現在的鎖到期時間,
                //隻有一個線程才能擷取上一個線上的設定時間,因為jedis.getSet是同步的
                if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
                    //防止誤删(覆寫,因為key是相同的)了他人的鎖——這裡達不到效果,這裡值會被覆寫,但是因為什麼相差了很少的時間,是以可以接受

                    //[分布式的情況下]:如過這個時候,多個線程恰好都到了這裡,但是隻有一個線程的設定值和目前值相同,他才有權利擷取鎖
                    // lock acquired
                    locked = true;
                    return true;
                }
            }

            /*
                延遲10 毫秒,  這裡使用随機時間可能會好一點,可以防止饑餓程序的出現,即,當同時到達多個程序,
                隻會有一個程序獲得鎖,其他的都用同樣的頻率進行嘗試,後面有來了一些進行,也以同樣的頻率申請鎖,這将可能導緻前面來的鎖得不到滿足.
                使用随機的等待時間可以一定程度上保證公平性
             */
            try {
                Thread.sleep(10, random.nextInt(50000));
            } catch (InterruptedException e) {
                logger.error("擷取分布式鎖休眠被中斷:", e);
            }

        }
        return locked;
    }


    /**
     * 解鎖
     */
    public synchronized void unlock() {
        // 隻有加鎖成功并且鎖還有效才去釋放鎖
        if (locked && expires > System.currentTimeMillis()) {
            redisTemplate.delete(lockKey);
            locked = false;
        }
    }

}      

調用方式:

public void redisLock2(int i) {
    RedisLock2 redisLock2 = new RedisLock2(redisTemplate, "redisLock:" + i % 10, 5 * 60, 500);
    try {
        long now = System.currentTimeMillis();
        if (redisLock2.lock()) {
            logger.info("=" + (System.currentTimeMillis() - now));
            // TODO 擷取到鎖要執行的代碼塊
            logger.info("j:" + j++);
        } else {
            logger.info("k:" + k++);
        }
    } catch (Exception e) {
        logger.info(e.getMessage(), e);
    } finally {
        redisLock2.unlock();
    }
}

public void redisLock2(int i) {
    RedisLock2 redisLock2 = new RedisLock2(redisTemplate, "redisLock:" + i % 10, 5 * 60, 500);
    try {
        long now = System.currentTimeMillis();
        if (redisLock2.lock()) {
            logger.info("=" + (System.currentTimeMillis() - now));
            // TODO 擷取到鎖要執行的代碼塊
            logger.info("j:" + j++);
        } else {
            logger.info("k:" + k++);
        }
    } catch (Exception e) {
        logger.info(e.getMessage(), e);
    } finally {
        redisLock2.unlock();
    }
}      

對于上面兩種redis實作分布式鎖的方案都有一個問題:

  • 就是你擷取鎖後執行業務邏輯的代碼隻能在redis鎖的有效時間之内,因為,redis的key到期後會自動清除,這個鎖就算釋放了。是以這個鎖的有效時間一定要結合業務做好評估。
  • 這兩種方式解鎖的時候是直接删除key,假如C1擷取到了鎖,這個時候redis挂了,并且資料沒有持久化,等redis服務啟動起來,C2請求過來擷取到了鎖。但是C1請求現在執行完了删除了key,這個時候就把C2的鎖删掉了。(在下一篇文章中有解決方案)

使用redis的SET resource-name anystring NX EX max-lock-time方式來實作分布式鎖

下一篇文章介紹

​​​Spring-data-redis + redis 分布式鎖(二)​​

  • ​​http://zhangtielei.com/posts/blog-redlock-reasoning.html​​
  • ​​http://strawhatfy.github.io/2015/07/09/Distributed%20locks%20with%20Redis/​​

繼續閱讀