天天看點

Redis分布式鎖—Redisson+RLock可重入鎖實作篇

前言

平時的工作中,由于生産環境中的項目是需要部署在多台伺服器中的,是以經常會面臨解決分布式場景下資料一緻性的問題,那麼就需要引入分布式鎖來解決這一問題。

針對分布式鎖的實作,目前比較常用的就如下幾種方案:

  1. 基于資料庫實作分布式鎖
  2. 基于 Redis 實作分布式鎖 【本文】
  3. 基于 Zookeeper 實作分布式鎖

接下來這個系列文章會跟大家一塊探讨這三種方案,本篇為 Redis 實作分布式鎖篇。

Redis分布式環境搭建推薦:基于Docker的Redis叢集搭建

Redis分布式鎖一覽

說到 Redis 鎖,能搜到的,或者說常用的無非就下面這兩個:

  • setNX + Lua腳本
  • Redisson + RLock可重入鎖 【本文】

接下來我們一一探索這兩個的實作,本文為 Redisson + RLock可重入鎖 實作篇。

1、setNX+Lua實作方式

跳轉連結:https://www.cnblogs.com/niceyoo/p/13711149.html

2、Redisson介紹

Redisson 是 java 的 Redis 用戶端之一,是 Redis 官網推薦的 java 語言實作分布式鎖的項目。

Redisson 提供了一些 api 友善操作 Redis。因為本文主要以鎖為主,是以接下來我們主要關注鎖相關的類,以下是 Redisson 中提供的多樣化的鎖:

  • 可重入鎖(Reentrant Lock)
  • 公平鎖(Fair Lock)
  • 聯鎖(MultiLock)
  • 紅鎖(RedLock)
  • 讀寫鎖(ReadWriteLock)
  • 信号量(Semaphore) 等等

總之,管你了解不了解,反正 Redisson 就是提供了一堆鎖... 也是目前大部分公司使用 Redis 分布式鎖最常用的一種方式。

本文中 Redisson 分布式鎖的實作是基于 RLock 接口,而 RLock 鎖接口實作源碼主要是 RedissonLock 這個類,而源碼中加鎖、釋放鎖等操作都是使用 Lua 腳本來完成的,并且封裝的非常完善,開箱即用。

接下來主要以 Redisson 實作 RLock 可重入鎖為主。

代碼中實作過程

一起來看看在代碼中 Redisson 怎麼實作分布式鎖的,然後再對具體的方法進行解釋。

源碼位址:https://github.com/niceyoo/redis-redlock

篇幅限制,文中代碼不全,請以上方源碼連結為主。

代碼大緻邏輯:首先會涉及資料庫 2 個表,order2(訂單表)、stock(庫存表),controller層會提供一個建立訂單的接口,建立訂單之前,先擷取 RedLock 分布式鎖,擷取鎖成功後,在一個事務下減庫存,建立訂單;最後通過建立大于庫存的并發數模拟是否出現超賣的情況。

代碼環境:

SpringBoot2.2.2.RELEASE

+

Spring Data JPA

Redisson

1)Maven 依賴 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>redis-redlock</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>redis-redlock</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.10</version>
        </dependency>

        <!-- redisson -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.11.1</version>
        </dependency>

        <!-- Gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>

        <!-- JPA -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <!-- Mysql Connector -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>

        <!-- 資料庫連接配接池 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.20</version>
        </dependency>

        <!-- Hutool工具包 -->
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>4.6.8</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
           

redisson、MySQL 等相關依賴。

2)application.yml 配置檔案
server:
  port: 6666
  servlet:
    context-path: /

spring:
  # 資料源
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/redis_demo?useUnicode=true&characterEncoding=utf-8&useSSL=false
    username: root
    password: 123456
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.jdbc.Driver
    logSlowSql: true
  jpa:
    # 顯示sql
    show-sql: false
    # 自動生成表結構
    generate-ddl: true
    hibernate:
      ddl-auto: update
  redis:
    redis:
      cluster:
        nodes: 10.211.55.4:6379, 10.211.55.4:6380, 10.211.55.4:6381
      lettuce:
        pool:
          min-idle: 0
          max-idle: 8
          max-active: 20

# 日志
logging:
  # 輸出級别
  level:
    root: info
  file:
    # 指定路徑
    path: redis-logs
    # 最大儲存天數
    max-history: 7
    # 每個檔案最大大小
    max-size: 5MB
           

配置redis,指定資料庫位址。

3)Redisson配置類 RedissonConfig.java
/**
 * redisson配置類
 */
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
       Config config = new Config();
       config.useClusterServers()
               .setScanInterval(2000)
               .addNodeAddress("redis://10.211.55.4:6379", "redis://redis://10.211.55.4:6380")
               .addNodeAddress("redis://redis://10.211.55.4:6381");
       RedissonClient redisson = Redisson.create(config);
       return redisson;
    }

}
           
4)StockServerImpl 庫存實作類,其他參考源碼
import com.example.redisredlock.bean.Stock;
import com.example.redisredlock.dao.StockDao;
import com.example.redisredlock.server.StockService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Slf4j
@Service
@Transactional
public class StockServerImpl implements StockService {

    @Autowired
    private StockDao stockDao;

    @Override
    public StockDao getRepository() {
        return stockDao;
    }

    /**
     * 減庫存
     *
     * @param productId
     * @return
     */
    @Override
    public boolean decrease(String productId) {
        Stock one = stockDao.getOne(productId);
        int stockNum = one.getStockNum() - 1;
        one.setStockNum(stockNum);
        stockDao.saveAndFlush(one);
        return true;
    }
}
           

庫存實作類,就一個接口,完成對庫存的-1操作。

5)OrderServerImpl 訂單實作類(核心代碼)
package com.example.redisredlock.server.impl;

import com.example.redisredlock.bean.Order;
import com.example.redisredlock.dao.OrderDao;
import com.example.redisredlock.server.OrderServer;
import com.example.redisredlock.server.StockService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@Slf4j
@Service
@Transactional
public class OrderServerImpl implements OrderServer {

    /**
     * 庫存service
     */
    @Resource
    private StockService stockService;

    /**
     * 訂單order dao
     */
    @Resource
    private OrderDao orderDao;

    @Override
    public OrderDao getRepository() {
        return orderDao;
    }

    @Resource
    private RedissonClient redissonClient;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public boolean createOrder(String userId, String productId) {

        /**  如果不加鎖,必然超賣 **/
        RLock lock = redissonClient.getLock("stock:" + productId);

        try {
            lock.lock(10, TimeUnit.SECONDS);

            int stock = stockService.get(productId).getStockNum();
            log.info("剩餘庫存:{}", stock);
            if (stock <= 0) {
                return false;
            }

            String orderNo = UUID.randomUUID().toString().replace("-", "").toUpperCase();
       /** 減庫存操作 **/
            if (stockService.decrease(productId)) {
                Order order = new Order();
                order.setUserId(userId);
                order.setProductId(productId);
                order.setOrderNo(orderNo);
                Date now = new Date();
                order.setCreateTime(now);
                order.setUpdateTime(now);
                orderDao.save(order);
                return true;
            }

        } catch (Exception ex) {
            log.error("下單失敗", ex);
        } finally {
            lock.unlock();
        }

        return false;
    }
}
           
6)Order 訂單實體類
@Data
@Entity
@Table(name = "order2")
public class Order extends BaseEntity {

    private static final long serialVersionUID = 1L;

    /**
     * 訂單編号
     */
    private String orderNo;

    /**
     * 下單使用者id
     */
    private String userId;

    /**
     * 産品id
     */
    private String productId;

}
           
7)Stock 庫存實體類
@Data
@Entity
@Table(name = "stock")
public class Stock extends BaseEntity {

    private static final long serialVersionUID = 1L;

    /**
     * 用産品id,設定為庫存id
     */

    /**
     * 庫存數量
     */
    private Integer stockNum;

}
           
8)OrderController 訂單接口
package com.example.redisredlock.controller;

import com.example.redisredlock.bean.Order;
import com.example.redisredlock.server.OrderServer;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;

/**
 * @author niceyoo
 */
@RestController
@RequestMapping("/order")
public class OrderController {

    @Resource
    private OrderServer orderServer;

    @PostMapping("/createOrder")
    public boolean createOrder(Order order) {
        return orderServer.createOrder(order.getUserId(), order.getProductId());
    }

}
           

表結構說明及接口測試部分

因為項目中使用 Spring Data JPA,是以會自動建立資料庫表結構,大緻為:

stock(庫存表)

id(商品id) stock_num(庫存數量) create_time(建立時間) update_time(更新時間)
1234 100 xxxx

order2(訂單表)

id(訂單id) order_no(訂單号) user_id(使用者id) product_id(商品id)

如下是詳細表結構+資料:

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for order2
-- ----------------------------
DROP TABLE IF EXISTS `order2`;
CREATE TABLE `order2` (
  `id` varchar(64) NOT NULL,
  `create_time` datetime(6) DEFAULT NULL,
  `update_time` datetime(6) DEFAULT NULL,
  `order_no` varchar(255) DEFAULT NULL,
  `user_id` varchar(64) DEFAULT NULL,
  `product_id` varchar(64) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for stock
-- ----------------------------
DROP TABLE IF EXISTS `stock`;
CREATE TABLE `stock` (
  `id` varchar(255) NOT NULL,
  `create_time` datetime(6) DEFAULT NULL,
  `update_time` datetime(6) DEFAULT NULL,
  `stock_num` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of stock
-- ----------------------------
BEGIN;
INSERT INTO `stock` VALUES ('1234', '2020-09-21 21:38:09.000000', '2020-09-22 08:32:17.883000', 0);
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;
           

建立訂單的過程就是消耗庫存表 stock_num 的過程,如果沒有分布式鎖的情況下,在高并發下很容易出現商品超賣的情況,是以引入了分布式鎖的概念,如下是在庫存100,并發1000的情況下,測試超賣情況:

JMeter 模拟程序截圖

Redis分布式鎖—Redisson+RLock可重入鎖實作篇

JMeter 調用接口截圖

Redis分布式鎖—Redisson+RLock可重入鎖實作篇

stock 庫存表截圖

Redis分布式鎖—Redisson+RLock可重入鎖實作篇

訂單表截圖

Redis分布式鎖—Redisson+RLock可重入鎖實作篇

加了鎖之後并沒有出現超賣情況。

核心代碼說明

整個 demo 核心代碼在建立訂單 createOrder() 加鎖的過程,如下:

@Override
@Transactional(rollbackFor = Exception.class)
public boolean createOrder(String userId, String productId) {

    //  如果不加鎖,必然超賣
    RLock lock = redissonClient.getLock("stock:" + productId);

    try {
        lock.lock(10, TimeUnit.SECONDS);

        int stock = stockService.get(productId).getStockNum();
        log.info("剩餘庫存:{}", stock);
        if (stock <= 0) {
            return false;
        }

        String orderNo = UUID.randomUUID().toString().replace("-", "").toUpperCase();

        if (stockService.decrease(productId)) {
            Order order = new Order();
            order.setUserId(userId);
            order.setProductId(productId);
            order.setOrderNo(orderNo);
            Date now = new Date();
            order.setCreateTime(now);
            order.setUpdateTime(now);
            orderDao.save(order);
            return true;
        }

    } catch (Exception ex) {
        log.error("下單失敗", ex);
    } finally {
        lock.unlock();
    }

    return false;
}
           

去除業務邏輯,加鎖架構結構為:

RLock lock = redissonClient.getLock("xxx");

lock.lock();

try {
    ...
} finally {
    lock.unlock();
}
           

關于 RedLock 中的方法

因為 RLock 本身繼承自 Lock 接口,如下分為兩部分展示:

public interface RLock extends Lock, RLockAsync {

    //----------------------Lock接口方法-----------------------

    /**
     * 加鎖 鎖的有效期預設30秒
     */
    void lock();
    
    /**
     * tryLock()方法是有傳回值的,它表示用來嘗試擷取鎖,如果擷取成功,則傳回true,如果擷取失敗(即鎖已被其他線程擷取),則傳回false .
     */
    boolean tryLock();
    
    /**
     * tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,隻不過差別在于這個方法在拿不到鎖時會等待一定的時間,
     * 在時間期限之内如果還拿不到鎖,就傳回false。如果如果一開始拿到鎖或者在等待期間内拿到了鎖,則傳回true。
     *
     * @param time 等待時間
     * @param unit 時間機關 小時、分、秒、毫秒等
     */
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    
    /**
     * 解鎖
     */
    void unlock();
    
    /**
     * 中斷鎖 表示該鎖可以被中斷 假如A和B同時調這個方法,A擷取鎖,B為擷取鎖,那麼B線程可以通過
     * Thread.currentThread().interrupt(); 方法真正中斷該線程
     */
    void lockInterruptibly();

    //----------------------RLock接口方法-----------------------
    /**
     * 加鎖 上面是預設30秒這裡可以手動設定鎖的有效時間
     *
     * @param leaseTime 鎖有效時間
     * @param unit      時間機關 小時、分、秒、毫秒等
     */
    void lock(long leaseTime, TimeUnit unit);
    
    /**
     * 這裡比上面多一個參數,多添加一個鎖的有效時間
     *
     * @param waitTime  等待時間
     * @param leaseTime 鎖有效時間
     * @param unit      時間機關 小時、分、秒、毫秒等
     */
    boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
    
    /**
     * 檢驗該鎖是否被線程使用,如果被使用傳回True
     */
    boolean isLocked();
    
    /**
     * 檢查目前線程是否獲得此鎖(這個和上面的差別就是該方法可以判斷是否目前線程獲得此鎖,而不是此鎖是否被線程占有)
     * 這個比上面那個實用
     */
    boolean isHeldByCurrentThread();
    
    /**
     * 中斷鎖 和上面中斷鎖差不多,隻是這裡如果獲得鎖成功,添加鎖的有效時間
     * @param leaseTime  鎖有效時間
     * @param unit       時間機關 小時、分、秒、毫秒等
     */
    void lockInterruptibly(long leaseTime, TimeUnit unit);  
}
           

1、加鎖

首先重點在 getLock() 方法,到底是怎麼拿到分布式鎖的,我們點進該方法:

public RLock getLock(String name) {
    return new RedissonLock(this.connectionManager.getCommandExecutor(), name);
}
           

調用 getLock() 方法後實際傳回一個 RedissonLock 對象,此時就有點呼應了,文章前面提到的

Redisson 普通的鎖實作源碼主要是 RedissonLock 這個類,而源碼中加鎖、釋放鎖等操作都是使用 Lua 腳本來完成的,封裝的非常完善,開箱即用。

在 RedissonLock 對象中,主要實作 lock() 方法,而 lock() 方法主要調用 tryAcquire() 方法:

tryAcquire() 方法又繼續調用 tryAcquireAsync() 方法:

Redis分布式鎖—Redisson+RLock可重入鎖實作篇

到這,由于 leaseTime == -1,于是又調用 tryLockInnerAsync()方法,感覺有點無限套娃那種感覺了...

咳咳,不過這個方法是最關鍵的了:

Redis分布式鎖—Redisson+RLock可重入鎖實作篇

這個方法就有點意思了,看到了一些熟悉的東西,還記得上一篇裡的 Lua 腳本嗎?

我們來分析一下這個部分的 Lua 腳本:

commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
     "if (redis.call('exists', KEYS[1]) == 0) then " +
         "redis.call('hset', KEYS[1], ARGV[2], 1); " +
         "redis.call('pexpire', KEYS[1], ARGV[1]); " +
         "return nil; " +
     "end; " +
     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
         "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
         "redis.call('pexpire', KEYS[1], ARGV[1]); " +
         "return nil; " +
     "end; " +
     "return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
           

腳本裡,一共是有兩個參數 KEYS[1]、通過後面的參數可以得知: KEYS[1] 為 getName(),ARGV[2] 為 getLockName(threadId)。

假設傳遞加鎖參數時傳入的 name 值為 "niceyoo",

假設線程調用的 ID 為 thread-1,

假設 RedissonLock 類的成員變量 UUID 類型的 id 值為 32063ed-98522fc-80287ap,

結合 getLockName(threadId)) 方法:

protected String getLockName(long threadId) {
    return this.id + ":" + threadId;
}
           

即,KEYS[1] = niceyoo,ARGV[2] = 32063ed-98522fc-80287ap:thread-1

然後将假設值帶入語句中:

  1. 判斷是否存在名為 “niceyoo” 的 key;
  2. 如果沒有,則在其下設定一個字段為 “32063ed-98522fc-80287ap:thread-1”,值為 1 的鍵值對 ,并設定它的過期時間,也就是第一個 if 語句體;
  3. 如果存在,則進一步判斷 “32063ed-98522fc-80287ap:thread-1” 是否存在,若存在,則其值加 1,并重新設定過期時間,這個過程可以看做鎖重入;
  4. 傳回 “niceyoo” 的生存時間(毫秒);

如果放在鎖這個場景下就是,key 表示目前線程名稱,argv 為目前獲得鎖的線程,所有競争這把鎖的線程都要判斷這個 key 下有沒有自己的,也就是上邊那些 if 判斷,如果沒有就不能獲得鎖,如果有,則進入重入鎖,字段值+1。

2、解鎖

解鎖調用的是 unlockInnerAsync() 方法:

Redis分布式鎖—Redisson+RLock可重入鎖實作篇

該方法同樣還是調用的 Lua 腳本實作的。

同樣還是假設 name=niceyoo,假設線程 ID 是 thread-1

同理,我們可以得到:

KEYS[1] 是 getName(),即 KEYS[1]=niceyoo,

KEYS[2] 是 getChannelName(),即 KEYS[2]=redisson_lock__channel:{niceyoo},

ARGV[1] 是 LockPubSub.unlockMessage,即ARGV[1]=0,

ARGV[2] 是生存時間,

ARGV[3] 是 getLockName(threadId),即 ARGV[3]=32063ed-98522fc-80287ap:thread-1

是以,上面腳本的意思是:

  1. 判斷是否存在 name 為 “niceyoo” 的key;
  2. 如果不存在,向 Channel 中廣播一條消息,廣播的内容是0,并傳回1
  3. 如果存在,進一步判斷字段 32063ed-98522fc-80287ap:thread-1 是否存在
  4. 若字段不存在,傳回空,若字段存在,則字段值減1
  5. 若減完以後,字段值仍大于0,則傳回0
  6. 減完後,若字段值小于或等于0,則廣播一條消息,廣播内容是0,并傳回1;

可以猜測,廣播0表示資源可用,即通知那些等待擷取鎖的線程現在可以獲得鎖了。

3、加鎖解鎖小結

Redis分布式鎖—Redisson+RLock可重入鎖實作篇
Redis分布式鎖—Redisson+RLock可重入鎖實作篇

4、其他補充

4.1 lock() 方法

通常在獲得 RLock 時,需要調用 lock() 方法,那麼設定過期時間跟不設定有啥差別:

RLock lock = redissonClient.getLock("xxx");

/*最常見的使用方法*/
lock.lock();
           

如果沒有設定過期時間,預設還是會有一個30秒的過期時間,等價于:

RLock lock = redissonClient.getLock("xxx");

/*支援過期解鎖,30秒之後自動釋放鎖,無須調用unlock方法手動解鎖*/
lock.lock(30, TimeUnit.SECONDS);
           
4.1 tryLock() 方法

有的小夥在在擷取分布式鎖時,使用的是 tryLock() 方法,跟 lock() 方法有啥差別:

RLock lock = redissonClient.getLock("xxx");

/*嘗試加鎖,最多等待10秒,上鎖以後10秒自動解鎖,傳回true表示加鎖成功*/
if(lock.tryLock(10,10, TimeUnit.SECONDS)){
 xxx
}
           

首先我們來看一下 tryLock() 方法源碼:

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    //1、 擷取鎖同時擷取成功的情況下,和lock(...)方法是一樣的 直接傳回True,擷取鎖False再往下走
    if (ttl == null) {
        return true;
    }
    //2、如果超過了嘗試擷取鎖的等待時間,當然傳回false 了。
    time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        acquireFailed(threadId);
        return false;
    }

    // 3、訂閱監聽redis消息,并且建立RedissonLockEntry,其中RedissonLockEntry中比較關鍵的是一個 Semaphore屬性對象,用來控制本地的鎖請求的信号量同步,傳回的是netty架構的Future實作。
    final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    //  阻塞等待subscribe的future的結果對象,如果subscribe方法調用超過了time,說明已經超過了用戶端設定的最大wait time,則直接傳回false,取消訂閱,不再繼續申請鎖了。
    //  隻有await傳回true,才進入循環嘗試擷取鎖
    if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                @Override
                public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                    if (subscribeFuture.isSuccess()) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                }
            });
        }
        acquireFailed(threadId);
        return false;
    }

   //4、如果沒有超過嘗試擷取鎖的等待時間,那麼通過While一直擷取鎖。最終隻會有兩種結果
    //1)、在等待時間内擷取鎖成功 傳回true。2)等待時間結束了還沒有擷取到鎖那麼傳回false。
    while (true) {
        long currentTime = System.currentTimeMillis();
        ttl = tryAcquire(leaseTime, unit, threadId);
        // 擷取鎖成功
        if (ttl == null) {
            return true;
        }
       //   擷取鎖失敗
        time -= System.currentTimeMillis() - currentTime;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
    }
}
           

tryLock() 方法是申請鎖并傳回鎖有效期還剩的時間,如果為空說明鎖未被其他線程申請,那麼就直接擷取鎖并傳回,如果擷取到時間,則進入等待競争邏輯。

tryLock() 方法一般用于特定滿足需求的場合,但不建議作為一般需求的分布式鎖,一般分布式鎖建議用 lock(long leaseTime, TimeUnit unit) 方法。因為從性能上考慮,在高并發情況下後者效率是前者的好幾倍。

Redis分布式鎖的缺點

在上一節中我們提到了 「setNX+Lua腳本」實作分布式鎖在叢集模式下的缺陷,

我們再來回顧一下,通常我們為了實作 Redis 的高可用,一般都會搭建 Redis 的叢集模式,比如給 Redis 節點挂載一個或多個 slave 從節點,然後采用哨兵模式進行主從切換。但由于 Redis 的主從模式是異步的,是以可能會在資料同步過程中,master 主節點當機,slave 從節點來不及資料同步就被選舉為 master 主節點,進而導緻資料丢失,大緻過程如下:

  1. 使用者在 Redis 的 master 主節點上擷取了鎖;
  2. master 主節點當機了,存儲鎖的 key 還沒有來得及同步到 slave 從節點上;
  3. slave 從節點更新為 master 主節點;
  4. 使用者從新的 master 主節點擷取到了對應同一個資源的鎖,同把鎖擷取兩次。

ok,然後為了解決這個問題,Redis 作者提出了 RedLock 算法,步驟如下(五步):

在下面的示例中,我們假設有 5 個完全獨立的 Redis Master 節點,他們分别運作在 5 台伺服器中,可以保證他們不會同時當機。

  1. 擷取目前 Unix 時間,以毫秒為機關。
  2. 依次嘗試從 N 個執行個體,使用相同的 key 和随機值擷取鎖。在步驟 2,當向 Redis 設定鎖時,用戶端應該設定一個網絡連接配接和響應逾時時間,這個逾時時間應該小于鎖的失效時間。例如你的鎖自動失效時間為 10 秒,則逾時時間應該在 5-50 毫秒之間。這樣可以避免伺服器端 Redis 已經挂掉的情況下,用戶端還在死死地等待響應結果。如果伺服器端沒有在規定時間内響應,用戶端應該盡快嘗試另外一個 Redis 執行個體。
  3. 用戶端使用目前時間減去開始擷取鎖時間(步驟 1 記錄的時間)就得到擷取鎖使用的時間。當且僅當從大多數(這裡是 3 個節點)的 Redis 節點都取到鎖,并且使用的時間小于鎖失效時間時,鎖才算擷取成功。
  4. 如果取到了鎖,key 的真正有效時間等于有效時間減去擷取鎖所使用的時間(步驟 3 計算的結果)。
  5. 如果因為某些原因,擷取鎖失敗(沒有在至少 N/2+1 個Redis執行個體取到鎖或者取鎖時間已經超過了有效時間),用戶端應該在所有的 Redis 執行個體上進行解鎖(即便某些 Redis 執行個體根本就沒有加鎖成功)。

到這,基本看出來,隻要是大多數的 Redis 節點可以正常工作,就可以保證 Redlock 的正常工作。這樣就可以解決前面單點 Redis 的情況下我們讨論的節點挂掉,由于異步通信,導緻鎖失效的問題。

但是細想後, Redlock 還是存在如下問題:

假設一共有5個Redis節點:A, B, C, D, E。設想發生了如下的事件序列:

  1. 用戶端1 成功鎖住了 A, B, C,擷取鎖成功(但 D 和 E 沒有鎖住)。
  2. 節點 C 崩潰重新開機了,但用戶端1在 C 上加的鎖沒有持久化下來,丢失了。
  3. 節點 C 重新開機後,用戶端2 鎖住了 C, D, E,擷取鎖成功。
  4. 這樣,用戶端1 和 用戶端2 同時獲得了鎖(針對同一資源)。

哎,還是不能解決故障重新開機後帶來的鎖的安全性問題...

針對節點重後引發的鎖失效問題,Redis 作者又提出了 延遲重新開機 的概念,大緻就是說,一個節點崩潰後,不要立刻重新開機他,而是等到一定的時間後再重新開機,等待的時間應該大于鎖的過期時間,采用這種方式,就可以保證這個節點在重新開機前所參與的鎖都過期,聽上去感覺 延遲重新開機 解決了這個問題...

但是,還是有個問題,節點重新開機後,在等待的時間内,這個節點對外是不工作的。那麼如果大多數節點都挂了,進入了等待,就會導緻系統的不可用,因為系統在過期時間内任何鎖都無法加鎖成功...

巴拉巴拉那麼多,關于 Redis 分布式鎖的缺點顯然進入了一個無解的步驟,包括後來的 神仙打架事件(Redis 作者 antirez 和 分布式領域專家 Martin Kleppmann)...

總之,首先我們要明确使用分布式鎖的目的是什麼?

無外乎就是保證同一時間内隻有一個用戶端可以對共享資源進行操作,也就是共享資源的原子性操作。

總之,在 Redis 分布式鎖的實作上還有很多問題等待解決,我們需要認識到這些問題并清楚如何正确實作一個 Redis 分布式鎖,然後在工作中合理的選擇和正确的使用分布式鎖。

目前我們項目中也有在用分布式鎖,也有用到 Redis 實作分布式鎖的場景,然後有的小夥伴就可能問,啊,你們就不怕出現上邊提到的那種問題嗎~

其實實作分布式鎖,從中間件上來選,也有 Zookeeper 可選,并且 Zookeeper 可靠性比 Redis 強太多,但是效率是低了點,如果并發量不是特别大,追求可靠性,那麼肯定首選 Zookeeper。

如果是為了效率,就首選 Redis 實作。

好了,之後一起探索 Zookeeper 實作分布式鎖。

Redis分布式鎖:https://www.cnblogs.com/niceyoo/category/1615830.html

最近換工作了,一切都是重新開始,新的系統、新的人、新的業務… 未來路很長,大家一起加油。

繼續閱讀