天天看点

关于Redisson的分布式锁对于分布式锁的要求

最近在工作中碰到分布式加锁的问题,平时一般用的reentrantlock已经无法满足分布式的需求的,目前市面上流行的分布式锁 zookeeper Redis 等待,现在是简要的学习了下 Redis的分布式锁,先凑活着用,先会用把,之后再去深入理解下原理,话不多说

对于分布式锁的要求

  • 互斥性:分布式锁需要保证在不同节点的不同线程的互斥。这是最根本的。
  • 可重入性:同一个节点上的同一个线程如果获取了锁之后也可以再次获取这个锁。
  • 锁超时:和本地锁一样支持锁超时,防止死锁。
  • 高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级。
  • 支持阻塞和非阻塞:和 ReentrantLock 一样支持 lock 和 trylock 以及 tryLock(long timeOut)。
  • 支持公平锁和非公平锁(可选):公平锁的意思是按照请求加锁的顺序获得锁,非公平锁就相反是无序的。这个一般来说实现的比较少。

这是最基本的

关于Redisson锁

其实我们都知道ReentrantLock已经有很好的锁的性能和实现,在互斥性、可重入性、锁超时、支持阻塞、支持公平锁都有很好的性能和实现,但不适用分布式场景。redisson是分布式锁,弥补这一缺憾(分布式锁有很多种,其他种,此文不做讨论),其中RLock接口继承了Lock接口,自然也会优雅的实现以上对锁的要求。

原理

关于Redisson的分布式锁对于分布式锁的要求

所需jar包

<!-- redis依赖commons-pool 这个依赖一定要添加 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>

        <!--Redis分布式锁-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.11.0</version>
        </dependency>
           

【application.yml】

相比properties文件结构更清晰【推荐使用yml文件】

# 实现Redis分布式锁
spring:
  redis:
    database: 0
    host: 你的主机名
    password: 你的密码
    port: 6379
    lettuce:
      pool:
        max-active: 100
        max-wait: -1
        max-idle: 8
        min-idle: 0
           

注意这里并不一定需要这样按指定名字配置,可以自定义

【RedissionConfig】

import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author Xiang
 * @date 2021/9/8 - 22:27
 */
@Slf4j
@Configuration
public class RedissionConfig {

    @Value("${spring.redis.password}")
    private String password;

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.database}")
    private int database;

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        String REDISSON_PREFIX = "redis://";
        String url = REDISSON_PREFIX + host + ":" + port;
        // 单台Redis
        config.useSingleServer()
                .setAddress(url)
                .setPassword(password)
                .setDatabase(database);

        // 实际开发过程中应该为cluster或者哨兵模式,这里以cluster为例
        //String[] urls = {"127.0.0.1:6379", "127.0.0.2:6379"};
        //config.useClusterServers()
        //        .addNodeAddress(urls);

        try {
            return Redisson.create(config);
        } catch (Exception e) {
            log.error("RedissonClient init redis url:[{}], Exception:", url, e);
            return null;
        }
    }
}
           

【DistributedRedisLock】

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
 * @author Xiang
 * @date 2021/9/8 - 22:36
 */
@Slf4j
@Component
public class DistributedRedisLock {

    @Autowired
    RedissonClient redissonClient;

    // 加锁
    public Boolean lock(String lockName) {
        if (null == redissonClient) {
            log.info("DistributedRedisLock redissonClient is null");
            return false;
        }

        try {
            RLock lock = redissonClient.getLock(lockName);
            // 锁10秒自动释放
            lock.lock(10, TimeUnit.SECONDS);
            log.info("Thread [{}] DistributedRedisLock lock [{}] success 上锁成功", Thread.currentThread().getName(), lockName);
            // 加锁成功
            return true;
        } catch (Exception e) {
            log.error("DistributedRedisLock lock [{}] Exception:", lockName, e);
            return false;
        }
    }

    // 释放锁
    public Boolean unlock(String lockName) {
        if (redissonClient == null) {
            log.info("DistributedRedisLock redissonClient is null");
            return false;
        }

        try {
            RLock lock = redissonClient.getLock(lockName);
            lock.unlock();
            log.info("Thread [{}] DistributedRedisLock unlock [{}] success 解锁", Thread.currentThread().getName(), lockName);
            // 释放锁成功
            return true;
        } catch (Exception e) {
            log.error("DistributedRedisLock unlock [{}] Exception:", lockName, e);
            return false;
        }
    }

}
           

【LockTestController】

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Xiang
 * @date 2021/9/8 - 22:40
 */
@Slf4j
@RestController
@RequestMapping("/lock")
public class LockController {


    @Autowired
    DistributedRedisLock distributedRedisLock;

    AtomicInteger ID = new AtomicInteger(0);
    AtomicInteger ID1 = new AtomicInteger(0);

    // 测试不释放锁
    @GetMapping("/testLock")
    public void testLock() {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
//                distributedRedisLock.lock(LOCK);
                try {
                    System.out.println(ID.addAndGet(1)+"进入等待");
                    cyclicBarrier.await();
                    System.out.println("开始执行");
                    post();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    // 测试不释放锁
    @GetMapping("/testLock1")
    public void testLock1() {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
//                distributedRedisLock.lock(LOCK);
                try {
                    System.out.println(ID1.addAndGet(1)+"进入等待");
                    cyclicBarrier.await();
                    System.out.println("开始执行");
                    post1();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    // 实际业务开发使用分布式锁的方式
    public void post() throws InterruptedException {
        final String LOCK = "LOCK2LOCK";
        try {
            if (distributedRedisLock.lock(LOCK)) {
                log.info("第e二个准备开始业务逻辑");
                TimeUnit.SECONDS.sleep(1);
                // 业务逻辑
                log.info("第e二个开始业务逻辑");
                TimeUnit.SECONDS.sleep(1);
            } else {
                // 处理获取锁失败的逻辑
                log.info("获取锁失败");
            }
        } catch (Exception e) {
            log.error("处理异常:", e);
        } finally {
            distributedRedisLock.unlock(LOCK);
            TimeUnit.SECONDS.sleep(1);
        }
    }


    // 实际业务开发使用分布式锁的方式
    public void post1() throws InterruptedException {
        final String LOCK = "LOCK1LOCK";
        try {
            if (distributedRedisLock.lock(LOCK)) {
                // 业务逻辑
                log.info("第一个开始业务逻辑");
                TimeUnit.SECONDS.sleep(1);
            } else {
                // 处理获取锁失败的逻辑
                log.info("获取锁失败");
            }
        } catch (Exception e) {
            log.error("处理异常:", e);
        } finally {
            distributedRedisLock.unlock(LOCK);
            TimeUnit.SECONDS.sleep(1);
        }
    }

}
           

这里的测试程序用了apache-jmeter-5.4.1进行性能压力测试

测试小demo

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Xiang
 * @date 2021/9/8 - 22:40
 */
@Slf4j
@RestController
@RequestMapping("/lock")
public class LockController {


    @Autowired
    DistributedRedisLock distributedRedisLock;

    AtomicInteger ID = new AtomicInteger(0);
    AtomicInteger ID1 = new AtomicInteger(0);

    // 测试不释放锁
    @GetMapping("/testLock")
    public void testLock() {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(100);
        for (int i = 0; i < 100; i++) {
            new Thread(() -> {
//                distributedRedisLock.lock(LOCK);
                try {
                    System.out.println(ID.addAndGet(1)+"进入等待");
                    cyclicBarrier.await();
                    System.out.println("开始执行");
                    post();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    // 测试不释放锁
    @GetMapping("/testLock1")
    public void testLock1() {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
//                distributedRedisLock.lock(LOCK);
                try {
                    System.out.println(ID1.addAndGet(1)+"进入等待");
                    cyclicBarrier.await();
                    System.out.println("开始执行");
                    post1();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }

    // 测试不释放锁
    @GetMapping("/testLock2")
    public void testLock2() throws InterruptedException {
        System.out.println("开始执行");
        post();
    }

    // 实际业务开发使用分布式锁的方式
    public void post() throws InterruptedException {
        final String LOCK = "LOCK2LOCK";
        try {
            if (distributedRedisLock.lock(LOCK)) {
                log.info("第e二个获得锁准备开始业务逻辑");
                TimeUnit.SECONDS.sleep(5);
                // 业务逻辑
            } else {
                // 处理获取锁失败的逻辑
                log.info("获取锁失败");
            }
        } catch (Exception e) {
            log.error("处理异常:", e);
        } finally {
            distributedRedisLock.unlock(LOCK);
            TimeUnit.SECONDS.sleep(1);
        }
    }


    // 实际业务开发使用分布式锁的方式
    public void post1() throws InterruptedException {
        final String LOCK = "LOCK1LOCK";
        try {
            if (distributedRedisLock.lock(LOCK)) {
                // 业务逻辑
                log.info("第一个开始业务逻辑");
                TimeUnit.SECONDS.sleep(1);
            } else {
                // 处理获取锁失败的逻辑
                log.info("获取锁失败");
            }
        } catch (Exception e) {
            log.error("处理异常:", e);
        } finally {
            distributedRedisLock.unlock(LOCK);
            TimeUnit.SECONDS.sleep(1);
        }
    }

}
           

不同的锁不会影响其他锁的正常进行

同一把锁会一会一直等待