最近在工作中碰到分布式加鎖的問題,平時一般用的reentrantlock已經無法滿足分布式的需求的,目前市面上流行的分布式鎖 zookeeper Redis 等待,現在是簡要的學習了下 Redis的分布式鎖,先湊活着用,先會用把,之後再去深入了解下原理,話不多說
對于分布式鎖的要求
- 互斥性:分布式鎖需要保證在不同節點的不同線程的互斥。這是最根本的。
- 可重入性:同一個節點上的同一個線程如果擷取了鎖之後也可以再次擷取這個鎖。
- 鎖逾時:和本地鎖一樣支援鎖逾時,防止死鎖。
- 高可用:加鎖和解鎖需要高效,同時也需要保證高可用防止分布式鎖失效,可以增加降級。
- 支援阻塞和非阻塞:和 ReentrantLock 一樣支援 lock 和 trylock 以及 tryLock(long timeOut)。
- 支援公平鎖和非公平鎖(可選):公平鎖的意思是按照請求加鎖的順序獲得鎖,非公平鎖就相反是無序的。這個一般來說實作的比較少。
這是最基本的
關于Redisson鎖
其實我們都知道ReentrantLock已經有很好的鎖的性能和實作,在互斥性、可重入性、鎖逾時、支援阻塞、支援公平鎖都有很好的性能和實作,但不适用分布式場景。redisson是分布式鎖,彌補這一缺憾(分布式鎖有很多種,其他種,此文不做讨論),其中RLock接口繼承了Lock接口,自然也會優雅的實作以上對鎖的要求。
原理
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIiNx8FesU2cfdGLwczX0xiRGZkRGZ0Xy9GbvNGLwIzXlpXazxCVHllNydVW1ATLLZDaPlkNFFXN1AVd3UjcXlVNfNVW1omMxVTQClGVF5UMR9Fd4VGdsATNfd3bkFGazxycykFaKdkYzZUbapXNXlleSdVY2pESa9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLwcDZ5MGZmJmYyMWM3czMhVjYkRzYilzYhJ2M1ITN3U2Lc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
所需jar包
<!-- redis依賴commons-pool 這個依賴一定要添加 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!--Redis分布式鎖-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.0</version>
</dependency>
【application.yml】
相比properties檔案結構更清晰【推薦使用yml檔案】
# 實作Redis分布式鎖
spring:
redis:
database: 0
host: 你的主機名
password: 你的密碼
port: 6379
lettuce:
pool:
max-active: 100
max-wait: -1
max-idle: 8
min-idle: 0
注意這裡并不一定需要這樣按指定名字配置,可以自定義
【RedissionConfig】
import lombok.extern.slf4j.Slf4j;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Xiang
* @date 2021/9/8 - 22:27
*/
@Slf4j
@Configuration
public class RedissionConfig {
@Value("${spring.redis.password}")
private String password;
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.database}")
private int database;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
String REDISSON_PREFIX = "redis://";
String url = REDISSON_PREFIX + host + ":" + port;
// 單台Redis
config.useSingleServer()
.setAddress(url)
.setPassword(password)
.setDatabase(database);
// 實際開發過程中應該為cluster或者哨兵模式,這裡以cluster為例
//String[] urls = {"127.0.0.1:6379", "127.0.0.2:6379"};
//config.useClusterServers()
// .addNodeAddress(urls);
try {
return Redisson.create(config);
} catch (Exception e) {
log.error("RedissonClient init redis url:[{}], Exception:", url, e);
return null;
}
}
}
【DistributedRedisLock】
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* @author Xiang
* @date 2021/9/8 - 22:36
*/
@Slf4j
@Component
public class DistributedRedisLock {
@Autowired
RedissonClient redissonClient;
// 加鎖
public Boolean lock(String lockName) {
if (null == redissonClient) {
log.info("DistributedRedisLock redissonClient is null");
return false;
}
try {
RLock lock = redissonClient.getLock(lockName);
// 鎖10秒自動釋放
lock.lock(10, TimeUnit.SECONDS);
log.info("Thread [{}] DistributedRedisLock lock [{}] success 上鎖成功", Thread.currentThread().getName(), lockName);
// 加鎖成功
return true;
} catch (Exception e) {
log.error("DistributedRedisLock lock [{}] Exception:", lockName, e);
return false;
}
}
// 釋放鎖
public Boolean unlock(String lockName) {
if (redissonClient == null) {
log.info("DistributedRedisLock redissonClient is null");
return false;
}
try {
RLock lock = redissonClient.getLock(lockName);
lock.unlock();
log.info("Thread [{}] DistributedRedisLock unlock [{}] success 解鎖", Thread.currentThread().getName(), lockName);
// 釋放鎖成功
return true;
} catch (Exception e) {
log.error("DistributedRedisLock unlock [{}] Exception:", lockName, e);
return false;
}
}
}
【LockTestController】
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Xiang
* @date 2021/9/8 - 22:40
*/
@Slf4j
@RestController
@RequestMapping("/lock")
public class LockController {
@Autowired
DistributedRedisLock distributedRedisLock;
AtomicInteger ID = new AtomicInteger(0);
AtomicInteger ID1 = new AtomicInteger(0);
// 測試不釋放鎖
@GetMapping("/testLock")
public void testLock() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
// distributedRedisLock.lock(LOCK);
try {
System.out.println(ID.addAndGet(1)+"進入等待");
cyclicBarrier.await();
System.out.println("開始執行");
post();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
// 測試不釋放鎖
@GetMapping("/testLock1")
public void testLock1() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
// distributedRedisLock.lock(LOCK);
try {
System.out.println(ID1.addAndGet(1)+"進入等待");
cyclicBarrier.await();
System.out.println("開始執行");
post1();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
// 實際業務開發使用分布式鎖的方式
public void post() throws InterruptedException {
final String LOCK = "LOCK2LOCK";
try {
if (distributedRedisLock.lock(LOCK)) {
log.info("第e二個準備開始業務邏輯");
TimeUnit.SECONDS.sleep(1);
// 業務邏輯
log.info("第e二個開始業務邏輯");
TimeUnit.SECONDS.sleep(1);
} else {
// 處理擷取鎖失敗的邏輯
log.info("擷取鎖失敗");
}
} catch (Exception e) {
log.error("處理異常:", e);
} finally {
distributedRedisLock.unlock(LOCK);
TimeUnit.SECONDS.sleep(1);
}
}
// 實際業務開發使用分布式鎖的方式
public void post1() throws InterruptedException {
final String LOCK = "LOCK1LOCK";
try {
if (distributedRedisLock.lock(LOCK)) {
// 業務邏輯
log.info("第一個開始業務邏輯");
TimeUnit.SECONDS.sleep(1);
} else {
// 處理擷取鎖失敗的邏輯
log.info("擷取鎖失敗");
}
} catch (Exception e) {
log.error("處理異常:", e);
} finally {
distributedRedisLock.unlock(LOCK);
TimeUnit.SECONDS.sleep(1);
}
}
}
這裡的測試程式用了apache-jmeter-5.4.1進行性能壓力測試
測試小demo
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author Xiang
* @date 2021/9/8 - 22:40
*/
@Slf4j
@RestController
@RequestMapping("/lock")
public class LockController {
@Autowired
DistributedRedisLock distributedRedisLock;
AtomicInteger ID = new AtomicInteger(0);
AtomicInteger ID1 = new AtomicInteger(0);
// 測試不釋放鎖
@GetMapping("/testLock")
public void testLock() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(100);
for (int i = 0; i < 100; i++) {
new Thread(() -> {
// distributedRedisLock.lock(LOCK);
try {
System.out.println(ID.addAndGet(1)+"進入等待");
cyclicBarrier.await();
System.out.println("開始執行");
post();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
// 測試不釋放鎖
@GetMapping("/testLock1")
public void testLock1() {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
// distributedRedisLock.lock(LOCK);
try {
System.out.println(ID1.addAndGet(1)+"進入等待");
cyclicBarrier.await();
System.out.println("開始執行");
post1();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
// 測試不釋放鎖
@GetMapping("/testLock2")
public void testLock2() throws InterruptedException {
System.out.println("開始執行");
post();
}
// 實際業務開發使用分布式鎖的方式
public void post() throws InterruptedException {
final String LOCK = "LOCK2LOCK";
try {
if (distributedRedisLock.lock(LOCK)) {
log.info("第e二個獲得鎖準備開始業務邏輯");
TimeUnit.SECONDS.sleep(5);
// 業務邏輯
} else {
// 處理擷取鎖失敗的邏輯
log.info("擷取鎖失敗");
}
} catch (Exception e) {
log.error("處理異常:", e);
} finally {
distributedRedisLock.unlock(LOCK);
TimeUnit.SECONDS.sleep(1);
}
}
// 實際業務開發使用分布式鎖的方式
public void post1() throws InterruptedException {
final String LOCK = "LOCK1LOCK";
try {
if (distributedRedisLock.lock(LOCK)) {
// 業務邏輯
log.info("第一個開始業務邏輯");
TimeUnit.SECONDS.sleep(1);
} else {
// 處理擷取鎖失敗的邏輯
log.info("擷取鎖失敗");
}
} catch (Exception e) {
log.error("處理異常:", e);
} finally {
distributedRedisLock.unlock(LOCK);
TimeUnit.SECONDS.sleep(1);
}
}
}
不同的鎖不會影響其他鎖的正常進行
同一把鎖會一會一直等待