天天看点

Redis(三)应用:分布式锁

文章目录

      • 一、分布式锁
      • 二、超时问题
          • Redis 中使用 Lua 脚本
      • 三、Spring 实现分布式锁
        • 1、 RedisLockRegistry$RedisLock 类 lock()加锁 和 解锁 流程
        • 2、RedisLock#lock() 加锁源码实现
            • UNLINK 命令
            • RedLock 算法
      • 四、基于 Redission 实现分布式锁

一、分布式锁

    首先,Redis 是单线程的 ,整理的 “单线程“ 网络请求模块使用的是一个线程,也就是说,一个线程处理所有网络请求(其他模块仍用了多个线程),不需要考虑并发安全性。

    分布式应用进行逻辑处理时,经常会遇到并发问题。Java 提供的 Synchronized、ReentrantLock、ReentrantReadWriteLock…,仅能在单个JVM 进程内 对 多线程 对共享资源 保证线程安全,在分布式系统环境下统统不好使。

Redis(三)应用:分布式锁

说个结论😏 :

  • 性能:缓存 > Zookeeper >= 数据库
  • 可靠性:Zookeeper > 缓存 > 数据库

    其他,比如:Chubby,是Google开发的粗粒度分布锁的服务,但是并没有开源,开放出了论文和一些相关文档可以进一步了解;Tair,是阿里开源的一个分布式 KV 存储方案;Hazelcast,是基于内存的数据网格开源项目,提供弹性可扩展的分布式内存计算,并且被公认是提高应用程序性能和扩展性最好的方案… …

(数据库、Zookeeper 使用分布式锁 移步博客 https://blog.csdn.net/weixin_41750142/article/details/110956216)

    以下是分布式锁的一些特点,分布式锁 家族成员并不一定都满足这个要求,实现机制不大一样。

  • 互斥性: 分布式锁要保证在多个客户端之间的互斥。
  • 可重入性:同一客户端的相同线程,允许重复多次加锁。
  • 锁超时:和本地锁一样支持锁超时,防止死锁。
  • 非阻塞: 能与 ReentrantLock 一样支持 trylock() 非阻塞方式获得锁。
  • 支持公平锁和非公平锁:公平锁是指按照请求加锁的顺序获得锁,非公平锁真好相反请求加锁是无序的。

    用并发那块儿的知识理解分布式锁,就类似于获取资源 / 独占锁,(如果理解有误请在评论中指出🤗)如果一个进程已经获取了资源,当别的进程也要来尝试获取资源时,就会失败。

    获取资源 / 加锁 一般是使用 setnx(set if not exists)指令 ,先来先获取,使用完毕后调用 del 指令释放资源 / 释放锁。

Redis(三)应用:分布式锁

    但有问题,如果逻辑执行到中间出现异常了,可能会导致 del 指令没有被调用,锁永远得不到释放,这样就会出现可以使用

create /hjiajia

来创建节点,。

    于是,在拿到锁之后,再给锁加上一个过期时间,比如 10 s,这样,即使中间出现异常,也可以保证 10 s 后 锁会自动释放。

Redis(三)应用:分布式锁

    但是还有问题,如果在 setnx 和 expire 之间,服务器进程突然挂掉了(可能是因为机器掉电 或者 人为 kill 掉了),就会导致 expire 得不到执行,还是造成了死锁 👻。

    Redis 2.6.12 之前的版本中,采用 setnx + expire 方式实现分布式锁,代码如下所示:

public static boolean lock(Jedis jedis, String lockKey, String requestId, int expireTime) {
		//设置锁
        Long result = jedis.setnx(lockKey, requestId);
        //获取锁成功
        if (result == 1) {
            
            //若在这里程序突然崩溃,则无法设置过期时间,将发生死锁
           
            //给 lockKey 设置一个过期时间,确认 key 值删除
            jedis.expire(lockKey, expireTime);
            return true;
        }
        return false;
    }
           

     setnx + expire 可能出现死锁问题的根源在于,setnx 和 expire 是两条指令 而不是 原子指令,如果这两条指令可以一起执行,就不会出现问题。可能你会想到 “事务”【要么都执行,要么都不执行】,但是 expire 是依赖于 setnx 执行结果的,如果 setnx 没有抢到锁,expire 是不应该执行的,而 事务里没有 if-else 分支逻辑,所以事务是行不通的。

    为了解决这个问题,Redis 社区涌现了一堆分布式锁的 library,实现方式极为复杂(使用起来也极为复杂)。后来,Redis 2.8 版本作者加入了 set 指令的扩展参数,使得 setnx 和 expire 指令可以一起执行,彻底解决了分布式锁的问题。

Redis(三)应用:分布式锁

二、超时问题

    Redis 的分布式锁并不能解决超时问题,如果在加锁 和 释放锁 之间的逻辑 执行的时间太长,以至于超出了锁的超时限制,就会出现问题。如果这时候锁过期了,第二个线程重新持有了这把锁,但是 紧接着 第一个线程执行完了业务逻辑,可又把锁释放了,第三个线程可能会在第二个线程逻辑执行完毕之前拿到锁。

    为了避免这个问题,Redis 分布式锁不要用于较长时间的任务。如果真的偶尔出现了,数据出现的小波错乱可能需要人工介入解决。

    比如:把 key 设置成一个随机数 tag ,释放锁时查看 key 的值 与 tag 是否相等,如果相等再删除 key(删除 key 就相当于释放锁)。

tag = str(uuid.uuid4())  # 随机值
if redis.set(key, tag, nx=True, ex=5):
    do_something()
    redis.delifequals(key, tag)  # 假想的 delifequals 指令
# 查看取出 key 的值 与 tag 是否相等,相等再删除
           

    有一个更加安全的方案,是 为 set 指令的 value 参数设置一个随机数,释放锁时,先匹配随机数是否一致,然后再删除 key。但是,匹配 value 和 删除 key 不是一个原子操作,Redis 也没有提供类似于 delifequals 这样的指令,这就需要使用 Lua 脚本来处理,因为 Lua 脚本可以保证 连续多个指令的原子性执行。

# delifequals
if redis.call("get",KEYS[1]) == ARGV[1] then
 return redis.call("del",KEYS[1])
else
 return 0
end
           
Redis 中使用 Lua 脚本

    Redis 从 2.6 版本开始引入对 Lua 脚本的支持,通过在 服务器中嵌入 Lua 环境,Redis 客户端可以使用 Lua 脚本,直接在服务器端 原子地执行多个 Redis 指令。 除此之外,使用脚本的好处还有:减少网络往返时延开销;客户端发送的脚本会永久存储在 Redis 中,其他客户端可复用。

    Lua 是动态类型语言,变量不要类型定义,只需要为变量赋值。 值可以存储在变量中,作为参数传递或结果返回。

    Lua 中有 8 个基本类型分别为:nil、boolean、number、string、userdata、function、thread 和 table。

Redis 内置 Lua 执行命令

  • EVAL 命令语法

各参数含义:

(1)EVAL :Lua 程序的运行环境上下文

(2)script :Lua 脚本

(3)numkeys :参数的个数

(4)key:Redis 键,访问下标从 1 开始,比如 KEYS[1]

(5)arg:Redis 键的附加参数

  • EVALSHA 命令语法

EVALSHA 命令允许通过脚本的 SHA1 来执行(节省带宽),Redis 在执行 EVAL/SCRIPT LOAD 后会计算脚本 SHA1【Secure Hash Algorithm 1,安全散列算法1】 缓存,EVALSHA 根据 SHA1 取出缓存脚本执行。

= 使用流程如下:

(1)编写脚本

(2)脚本提交到 Redis 并获取 SHA

(3)使用 SHA 调用 Redis 脚本

Redis 运行 Lua 脚本

  • EVAL 直接运行脚本
    Redis(三)应用:分布式锁
  • EVALSHA 使用:需要 SCRIPT LOAD 和 EVALSHA 配合使用

    (1)SCRIPT LOAD 加载到内存,返回 SHA 签名

    (2)EVALSHA 使用已存在的签名

        这样只用加载一次,便可重复使用已经加载的签名脚本,可以多次使用,避免长脚本输入。

    Redis(三)应用:分布式锁

    在 Redis 下使用脚本文件执行

    👀 例1: set、get 操作数据

        首先,在 Redis 路径下创建 lua 源文件:

    (1)set.lua

--[[ set.lua, redis的set命令使用 
redis: set key val
--]]
local key = KEYS[1]
local val = ARGV[1]

return redis.call('set', key, val)
           

(2)get.lua

--[[ get.lua, redis的get命令使用 
redis: get key
--]]

local key = KEYS[1]
local val = redis.call("GET", key);

return val;
           

    接下来执行以下命令,设置 key、value 值:

redis-cli --eval set.lua foo , bar
           

✨ 注意 ✨: foo 和 bar 之间的逗号 左 右 都要有空格分隔开,否则会被当成一个字符串。

Redis(三)应用:分布式锁

    查看 value 值:

Redis(三)应用:分布式锁
Redis(三)应用:分布式锁

    可以看到,这就是个简单使用 Lua 脚本操作 Redis 数据的例子。

👀 例2:访问次数限制

    先编写 retelimiting.lua

local times = redis.call('incr',KEYS[1])

if times == 1 then
    redis.call('expire',KEYS[1], ARGV[1])
end

if times > tonumber(ARGV[2]) then
    return 0
end
return 1
           

运行脚本:

Redis(三)应用:分布式锁

    可以看到,rata.limiting:127.0.0.1 是前缀 + ip 组成的 KEY,用 KEYS[1] 获取,后面的 10 和 3 是参数,在脚本中可以通过 ARGV[1] 和 ARGV[2] 获取。

    该脚本的作用是将访问频率限制为 每 10 秒最多 3 次,所以在终端不断运行此命令会发现当访问频率在 10 s 内 小于或等于 3 次时返回 1 ,否则返回 0.

Redis(三)应用:分布式锁

三、Spring 实现分布式锁

    除了使用 Jedis 客户端外,完全可以直接使用 Spring 官方提供的 企业集成模式 框架,其中提供了很多分布式锁的方式,Spring 提供了一个统一的分布式锁抽象,具体实现目前支持:Gemfire、Jdbc、Zookeeper、Redis。

    分布式锁的代码在 Spring Integration 中。项目地址:https://github.com/spring-projects/spring-integration

    Spring 对 Lock 分布式锁做了全局抽象,抽象结构:

Redis(三)应用:分布式锁

LockRegistry 作为顶层抽象接口,源码:

/**
 * Strategy for maintaining a registry of shared locks
 *
 * @author Oleg Zhurakousky
 * @author Gary Russell
 * @since 2.1.1
 */
 
@FunctionalInterface
public interface LockRegistry {

    /**
     * Obtains the lock associated with the parameter object.
     * @param lockKey The object with which the lock is associated.
     * @return The associated lock.
     */
    Lock obtain(Object lockKey);

}
           

obtain()

方法获得具体的 Lock 实现类,分别在对应的 XxxLockRegitry 实现类来创建。

    RedisLockRegistry 中 obtain()方法 对应的 实现类为 RedisLock,RedisLock内部,在 Springboot2.x(Spring5)版本中是通过

SET

+

PEXIPRE

命令结合 Lua脚本实现的,在 Springboot1.x(Spring4)版本中,是通过

SETNX

命令实现的。

    ZookeeperLockRegistry 里 obtain() 方法 实现类为 ZkLock,ZkLock 内部基于 Apache Curator 框架实现的。

    JdbcLockRegistry 里 obtain() 方法实现类为 JdbcLock,JdbcLock 内部基于一张 INT_LOCK 数据库锁表实现的,通过 JdbcTemplate 来操作。

  • 客户端使用方法:
private final String registryKey = "sb2";
  RedisLockRegistry lockRegistry = new RedisLockRegistry(getConnectionFactory(), this.registryKey);
  Lock lock = lockRegistry.obtain("foo");
  
  // (一)
  lock.lock();
    try {
    // doSth...
      }
 finally {
    lock.unlock();
   }
           

1、 RedisLockRegistry$RedisLock 类 lock()加锁 和 解锁 流程

Redis(三)应用:分布式锁

加锁步骤:

(1)lockKey 为 registryKey:path ,例子中为 sb2:foo ,客户端 C1 优先申请加锁。

(2)执行 Lua 脚本,get lockKey 不存在,则 set lockKey 成功,值为 clientid(UUID),过期时间默认 60 秒。

(3) 客户端 C1 同一线程重复加锁,pexpire lockKey ,重置过期时间为 60 秒。

(4)客户端 C2 申请加锁,执行 Lua 脚本,get lockKey 已经存在,并且 clientid 也不同,加锁失败。

(5)客户端 C2 挂起,每个 100 ms 再次尝试加锁。

2、RedisLock#lock() 加锁源码实现

Redis(三)应用:分布式锁

源码:

@Override
public void lock() {
    this.localLock.lock();
    while (true) {
        try {
        
        	    // (一)
            while (!obtainLock()) {
                Thread.sleep(100); //NOSONAR
            }
            break;
        }
        catch (InterruptedException e) {
            /*
             * This method must be uninterruptible so catch and ignore
             * interrupts and only break out of the while loop when
             * we get the lock.
             */
        }
        catch (Exception e) {
        	               //(三)
            this.localLock.unlock();
            rethrowAsLockException(e);
        }
    }
}
           

(一) obtainLock(), 基于 Spring 封装的 RedisTemplate 来操作的:

private boolean obtainLock() {
    Boolean success =
																				//(二) 
            RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript,
                    Collections.singletonList(this.lockKey), RedisLockRegistry.this.clientId,
                    String.valueOf(RedisLockRegistry.this.expireAfter));

    boolean result = Boolean.TRUE.equals(success);

    if (result) {
        this.lockedAt = System.currentTimeMillis();
    }
    return result;
}
           

(二)obtainLockScript 对应的 Lua 脚本代码:

private static final String OBTAIN_LOCK_SCRIPT =
    "local lockClientId = redis.call('GET', KEYS[1])\n" +
            "if lockClientId == ARGV[1] then\n" +
            "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
            "  return true\n" +
            "elseif not lockClientId then\n" +
            "  redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n" +
            "  return true\n" +
            "end\n" +
            "return false";
           

(三)unlock():

解锁流程:

Redis(三)应用:分布式锁

源码:

@Override
public void unlock() {
    if (!this.localLock.isHeldByCurrentThread()) {
        throw new IllegalStateException("You do not own lock at " + this.lockKey);
    }
    if (this.localLock.getHoldCount() > 1) {
        this.localLock.unlock();
        return;
    }
    try {
        if (!isAcquiredInThisProcess()) {
            throw new IllegalStateException("Lock was released in the store due to expiration. " +
                    "The integrity of data protected by this lock may have been compromised.");
        }

        if (Thread.currentThread().isInterrupted()) {
            RedisLockRegistry.this.executor.execute(this::removeLockKey);
        }
        else {
        	// (四)
            removeLockKey();
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Released lock; " + this);
        }
    }
    catch (Exception e) {
        ReflectionUtils.rethrowRuntimeException(e);
    }
    finally {
        this.localLock.unlock();
    }
}
           

(四):

// 删除缓存Key
private void removeLockKey() {
    if (this.unlinkAvailable) {
        try {
            RedisLockRegistry.this.redisTemplate.unlink(this.lockKey);
        }
        catch (Exception ex) {
            LOGGER.warn("The UNLINK command has failed (not supported on the Redis server?); " +
                    "falling back to the regular DELETE command", ex);
            this.unlinkAvailable = false;
            RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
        }
    }
    else {
        RedisLockRegistry.this.redisTemplate.delete(this.lockKey);
    }
}
           

    可以看到,并不是直接调用 Redis 的 DEL 命令删除 key,这也是在 Sptingboot2.x 版本中的一个优化,Redis 4.0 版本以上提供了 UNLINK 命令。

UNLINK 命令

Redis 官网关于 UNLINK 给出的一段解释:

This command is very similar to DEL: it removes the specified keys.
Just like DEL a key is ignored if it does not exist. However the
command performs the actual memory reclaiming in a different thread,
so it is not blocking, while DEL is. This is where the command name
comes from: the command just unlinks the keys from the keyspace. The
actual removal will happen later asynchronously.
           

译为:

    这个命令与 DEL 非常相似:它删除指定的键。就像 DEL 一样,如果键不存在,则忽略这个命令。然而,命令在其他的线程中执行实际的内存回收再利用,所以它是非阻塞的,这也是它命名的由来:UNLINK 命令没有将 key 和 keyspace 连接在一起。实际的删除将在以后异步地发生。

    可以看到,UNLINK 是非阻塞的,DEL 命令对于大型 list 或者 hash ,如果值太大,分配的空间太多,会长时间阻止 Redis,正是为了解决这样的问题, UNLINK 命令,是 非阻塞地删除。 不管如果值很小,DEL 一般和 UNLINK 效率差不多。

    本质上,RedisLock#lock() 加锁方式还是使用 SETNX 实现的,而且 Spring 只是做了一层薄薄的封装,支持 可重入加锁、超时等待、可中断加锁。

    但是有个问题,锁的过期时间不能灵活设置,客户端初始化时,创建 RedisLockRegistry 时运行设置,参数为 long 类型的 expireAfter,但是它是全局的。

源码:

/**
     * Constructs a lock registry with the supplied lock expiration.
     * @param connectionFactory The connection factory.
     * @param registryKey The key prefix for locks.
     * @param expireAfter The expiration in milliseconds.
     */
public RedisLockRegistry(RedisConnectionFactory connectionFactory, String registryKey, long expireAfter) {
    Assert.notNull(connectionFactory, "'connectionFactory' cannot be null");
    Assert.notNull(registryKey, "'registryKey' cannot be null");
    this.redisTemplate = new StringRedisTemplate(connectionFactory);
    this.obtainLockScript = new DefaultRedisScript<>(OBTAIN_LOCK_SCRIPT, Boolean.class);
    this.registryKey = registryKey;
    this.expireAfter = expireAfter;
           

RedLock 算法

    从 Redis 主从架构上考虑,依然存在问题,因为 Redis 集群数据 同步到各个节点时,是异步的 ,如果在 Master 节点获取到锁后,在没有同步到其他节点时,Master 节点崩溃了,此时 新的 Master 节点依然可以获取锁,这样的话, 多个应用服务可以同时获取到锁。

    为了解决这样的问题,Redis 之父 Antirez 提出了 RedLock 算法。

  • RedLock 算法实现过程分析

        假设 Redis 部署模式是 Redis Cluster,总共有 5 个 master 节点,获取锁的步骤如下:

    (1)获取当前时间戳,单位是 毫秒。

    (2)轮流尝试在每个 master 节点上创建锁,过期时间设置较短。

    (3)尝试在大多数节点上建立一个锁,比如 5 个 master 节点,就要求 n/2 + 1 = 3 个节点。

    (4)客户端计算建立好锁的时间,如果建立锁的时间小于超时时间,就算建立成功了。

    (5)要是锁建立失败了,那么就依次删除这个锁。

    (6)只要有客户端创建成功了分布式锁,其他客户端就要不断轮询去尝试获取锁

  • RedLock 算法可能存在问题

    (1)节点崩溃重启,会出现多个客户端持有锁

        假设一共有 5 个 Redis 节点:A、B、C、D、E,设想发生了如下的事件序列:

    ① 客户端 C1 成功对 Redis 集群中 A、B、C 三个节点加锁成功,但是 D、E 加锁失败。

    ② 节点 C 崩溃重启了,但是客户端 C1 在节点 C 加锁未 持久化 完成。

    ③ 节点 C 重启后,客户端 C2 成功对 Redis 集群中 C、D、E 尝试加锁成功了。

        这样客户端 C1 和 C2 同时获取了同一把分布式锁。

        为了应对这种节点重启引起的锁失效问题,Antirez 提出了 延迟重启 的概念 ,即 一个节点崩溃后,先不立即重启它,而是等待一段时间后再重启,等待时间大于锁的有效时间。采用这种方式,这个节点在重启前,所参与的锁都会过期,重启后就不会对现有的锁造成影响。

    (2)时钟跳跃

        假设一共有 5 个 Redis 节点:A、B、C、D、E,设想发生了如下的事件序列:

    ① 客户端 C1 成功对 Redis 集群种 A、B、C 三个节点成功加锁,但是因为网络问题,与 D 和 E 的通信失败。

    ② 节点 C 上的时钟发生了向前跳跃,导致它维护的锁快速过期。

    ③ 客户端 C2 对 Redis 集群种节点 C、D、E 成功加上了同一把锁。

        这样客户端 C1 和 C2 同时获取了同一把分布式锁。

        为了应对这种时钟跳跃引起的锁失效问题,Antirez 提出了应该禁止人为修改系统时间,使用一个不会进行 跳跃式 调整系统时钟的 ntpd 程序。

        但是,RedLock 算法并没有解决操作共享资源超时 导致 锁失效的问题。所以这个算法还是不推荐使用的。

四、基于 Redission 实现分布式锁

    Redission 是 Redis 的 Java 实现的客户端,其 API 提供了比较全面的 Redis 命令的支持。Jedis 简单使用阻塞的 I/O 和 Redis 交互,Redission 通过 Netty 支持非阻塞 I/O。

    Redission 封装了锁的实现,还对集合、对象、常用缓存框架等做了友好的封装,易于使用。Redisson分布式锁Github:https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers

    Redission 可以便携支持多种 Redis 部署架构:

(1)Redis 单机

(2)Master-Slave + Sentinel 哨兵

Master-Slave 配置:

Config config = new Config();
MasterSlaveServersConfig serverConfig = config.useMasterSlaveServers()
            .setMasterAddress("")
            .addSlaveAddress("")
            .setReadMode(ReadMode.SLAVE)
            .setMasterConnectionPoolSize(maxActiveSize)
            .setMasterConnectionMinimumIdleSize(maxIdleSize)
            .setSlaveConnectionPoolSize(maxActiveSize)
            .setSlaveConnectionMinimumIdleSize(maxIdleSize)
            .setConnectTimeout(CONNECTION_TIMEOUT_MS) // 默认10秒
            .setTimeout(socketTimeout)
            ;
            
RedissonClient redisson = Redisson.create(config);
RLock lock = redisson.getLock("myLock");

// 获得锁
lock.lock();

lock.lock(10, TimeUnit.SECONDS);

				 //(一)
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       lock.unlock();
   }
}
           

    其中,Config 类中私有属性 long 类型的 lockWatchdog :

lockWatchdogTimeout = 30 * 1000;

    其中 ,RedissonClient 客户端提供了众多的接口实现,支持 可重入锁、公平锁、读写锁、锁超时、RedLock 等。

(一)tryLock() 源码:

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        
        // (二)尝试获取锁,并返回剩余超时时间
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        
        // 如果ttl为空则说明锁未被其他客户端持有
        if (ttl == null) {
            return true;
        }
        
        // 检查是否超过等待时间 超过则返回 false
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        
        // 当前线程进行订阅
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }
 
        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
            // 在等待时间内 重复尝试获取锁 直到超过等待时间或成功获取锁
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }
           

    leaseTime 参数指定加锁时间,超过这个时间,锁就自动解开了。

(二)tryAcquire:

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
			   // (三)
    return get(tryAcquireAsync(leaseTime, unit, threadId));
} 
           

(三)tryAcquireAsync:

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    if (leaseTime != -1) {
    
        	//(四)
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.addListener(new FutureListener<Long>() {
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            if (!future.isSuccess()) {
                return;
            }
 
            Long ttlRemaining = future.getNow();
            // lock acquired
            if (ttlRemaining == null) {
            
            	// (五)
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
           

(四)tryLockInnerAsync:

    为了兼容老的版本,Redission 里都是通过 Lua 脚本执行 Redis 命令的,同时保证了原子性操作。

加锁执行的 Lua 脚本:

<T> RFuture<T> tryLockInnerAsync(long leaseTime,TimeUnit unit,long threadId,RedisStricCommand<T> command){
    internalLockLeaseTime = unit.toMillis(leaseTime);

	return commandExecutor.evalWriterAsync(getName(),LongCoder.INSTANCE,command,
	--检查 key 是否被占用,如果没有则设置超时时间和唯一标识,初始化 value = 1 
	 "if (redis.call('exists',KEYS[1] == 0) then " +
	  		"redis.call('hset',KEY[1], ARGV[2],1);" +
	  		"redis.call('pexpire', KEY[1],ARGV[1]);" +
	  		"return nil;" +
	  "end: " +
	  
	  --如果锁重入,需要判断锁的 key、field,都一致的情况下 value +1
	  "if (redis.call('hexists',KEY[1], ARGV[2]) == 1) then" +
	  		"redis.call('hincrby'.KEY[1],ARGV[2],1);" +
	  		--锁重入需要重新设置超时时间
	  		"redis.call('pexpire',KEY[1],ARGV[1]);" +
	  		"return nil;"
	 "end;" +
	 		--返回剩余的过期时间
	 		"return redis.call('pttl',KEY[1]);",
	 		Collections.<~> singletonList(getName()),interanlLockLeaseTime,getLockName(threadId);
}	 
           

参数含义:

  • KEY[1]:要加锁的 KEY 名称,比如上例中的 mylock。
  • ARGV[1]:针对加锁的 KEY 设置的过期时间。
  • ARGV[2]:Hash 结构中 KEY 的名称。
  • LockName 是 UUID:线程 ID。

(五)看门狗🐕 scheduleExpirationRenewal:

private void scheduleExpirationRenewal(final long threadId) {
    if (expirationRenewalMap.containsKey(getEntryName())) {
        return;
    }
    
    //新建定时任务,每隔1/3过期时间则刷新过期时间
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            
            RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                    "end; " +
                    "return 0;",                  Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
            
            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    expirationRenewalMap.remove(getEntryName());
                    if (!future.isSuccess()) {
                        log.error("Can't update lock " + getName() + " expiration", future.cause());
                        return;
                    }
                    
                    if (future.getNow()) {
                        // reschedule itself
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    // 如果传入 key 对应的 value 已经存在,就返回存在的 value,不进行替换。如果不存在,就添加 key 和 value,返回null
    if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
        task.cancel();
    }
}
           

    看门狗的作用是在 Redission 实例被关闭前,不断延迟锁的有效期, 默认情况下,看门狗的检查锁的超时时间是 30 秒钟 ,也可以通过修改 Config.lockWatchTimeout 来令行指定,internalLockLease 和 lockWatchdogTimeout 两个参数是相等的。

    整理了加锁的调用流程大概是这样:

Redis(三)应用:分布式锁

    可以看到,上例中,假设 客户端 C1 申请加锁,KEY 为 mylock,如果 KEY 不存在,则 通过 hset 设置值 value =1 ,通过 pexpire 设置过期时间,同时开启开门狗 watchdog 任务,默认加锁时间是 30 秒, 每隔 10 秒判断一下,如果 key 还在,重置过期时间到 30 秒,如此解决 业务处理时间比过期时间长的问题。(也就是说,默认加锁时间是 30 秒,如果加锁的业务没有执行完,那么到 30-10=20 秒时,就会进行一次续期,把锁的过期时间重置为 30 秒,万一机器宕机了,定时任务就不会跑,就不会续期,那 30 秒后锁就自动释放了,如此避免死锁问题。)

    接下来,客户端 C1 相同线程再次加锁,key 存在,判断 Redis 里 Hash 中的 lockName 跟当前线程 lockName 相同,则将 Hash 中的 lockName 的值 value 加1,代表支持可重入加锁。

    接下来,客户端 C2 申请加锁,如果 key 存在,判断 Redis 里 Hash 中的 lockName 跟当前线程 lockName “mylock” 不同,则 返回剩余过期时间。

    接下来,客户端 C2 线程在 tryLock() 方法内不断尝试获取锁,此处是基于 Semaphore 信号量实现的,有许可立即返回,否则继续重试。

    官网中关于 WatchDog 的描述:

译为:

    如果获得锁的 Redisson 实例崩溃,那么该锁可能会在获得的状态下永远挂起。为了避免 Redisson 维护看门狗,它会在锁持有者 Redisson 实例处于 alive 活动状态时延长锁过期时间。默认情况下,锁看门狗超时时间为30秒,可以通过配置进行更改。

    以上是加锁分析,接下来是解锁逻辑:

@Override
    public void unlock() {
    
        // 1.通过 Lua 脚本执行 Redis 命令释放锁
        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE,
                RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end;" +
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                        "end; " +
                        "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()),
                LockPubSub.unlockMessage, internalLockLeaseTime,
                getLockName(Thread.currentThread().getId()));
                
        // 2.非锁的持有者释放锁时抛出异常
        if (opStatus == null) {
            throw new IllegalMonitorStateException(
                    "attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + Thread.currentThread().getId());
        }
        
        // 3.释放锁后取消刷新锁失效时间的调度任务
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }
           

    可以看到,如果 key 不存在,说明锁已释放,直接执行 publish 命令发布释放锁消息并返回 1;如果 key 存在,但是 field 在 Hash 中不存在,说明自己不是锁持有者,无权释放锁,返回 nil。因为锁是可重入的,所以释放锁时不能把所有已获取的锁全都释放掉,一次只能释放一把锁,因此执行 hincrby 对锁的值减一。释放一把锁后,如果还有剩余的锁,则刷新锁的失效时间并返回 0;如果刚才释放的已经是最后一把锁,则执行 del 命令删除锁的 key,并发布锁释放消息,返回 1。

(3)Redis-Cluster 集群

参考文章:

(1)https://mp.weixin.qq.com/s?__biz=MjM5MDAxOTk2MQ==&mid=2650283760&idx=1&sn=dc65028aadb0136ea348bbc1f62c7f75&chksm=be4786e689300ff0393f8355a090bbf89dd541311c4bc94f5c28d6ceafb1d3edf2a32b9df95e&mpshare=1&scene=23&srcid=1209Py9XcrfJsuUQN1AwmWxr&sharer_sharetime=1607571927696&sharer_shareid=969ef7742555284a7918c681ba9e8479#rd

(2)https://www.sohu.com/a/326080287_100212268

(3)https://blog.csdn.net/lbh199466/article/details/90176059

继续阅读