前言:
項目需求,搞了搞
- 實作了鎖的重入
- 參考了别人的博文實作了AOP注解形式的鎖、統一配置
參考博文位址:
https://www.cnblogs.com/lijiasnong/p/9952494.html
這邊看了下比較主流幾個分布式鎖的應用,最終選擇的redis
原因是:
1、懶(伺服器已有redis做緩存,不想再去安裝zuukeeper)
2、評估認為redis的分布式鎖已能滿足當下應用
正文 - 摘錄核心代碼:
- RedisReentrantLock
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* redis分布式鎖 - 使用 ThreadLocal 做線程隔離,實作鎖的重入機制
* @date 2020/12/11 9:13
* @author wei.heng
*/
@Component
@Log4j2
public class RedisReentrantLock {
/** 擷取鎖逾時的時間 */
private static final int ACQUIRE_LOCK_TIME_OUT_IN_MILLISECONDS = 3 * 1000;
/** 自旋重試間隔 */
private static final int WAIT_INTERVAL_IN_MILLISECONDS = 100;
/** 鎖的失效時間 */
private static final Integer EXPIRE_SECONDS = 5;
/** 鎖的字首,友善統一檢視 */
private static final String LOCK_PREFIX = "LOCK:";
private ThreadLocal<Map<String, Integer>> lockers = new ThreadLocal<>();
private RedisService redisService;
@Autowired
public RedisReentrantLock(RedisService redisService) {
this.redisService = redisService;
}
/**
*
* 添加分布式鎖
* @param key 鍵
* @return boolean 請求鎖是否成功
* @date 2020/12/11 10:52
* @author wei.heng
*/
public boolean lock(String key){
key = LOCK_PREFIX + key;
Map<String, Integer> refs = currentLockers();
Integer refCnt = refs.get(key);
if(refCnt != null){
// 目前線程已經加鎖,這裡屬于鎖的重入,計數器加1
refs.put(key, refCnt + 1);
return true;
}
boolean ok = this._lock(key);
if(!ok){
return false;
}
refs.put(key, 1);
return true;
}
/**
*
* 釋放鎖
* @param key 鍵
* @return boolean 釋放是否成功
* @date 2020/12/11 10:53
* @author wei.heng
*/
public boolean unlock(String key){
key = LOCK_PREFIX + key;
Map<String, Integer> refs = currentLockers();
Integer refCnt = refs.get(key);
if(refCnt == null){
return false;
}
refCnt -= 1;
if(refCnt > 0){
refs.put(key, refCnt);
} else {
refs.remove(key);
this._unlock(key);
}
return true;
}
private boolean _lock(String key){
// 逾時時間點
long timeoutAt = System.currentTimeMillis() + ACQUIRE_LOCK_TIME_OUT_IN_MILLISECONDS;
while(true){
boolean isSuccess = redisService.setIfAbsent(key, "", EXPIRE_SECONDS);
if(isSuccess){
// 如果加鎖成功,就傳回
return isSuccess;
}
// 如果加鎖失敗,證明目前鎖被其他線程占用,進入自旋
if(System.currentTimeMillis() < timeoutAt){
try {
TimeUnit.MILLISECONDS.sleep(WAIT_INTERVAL_IN_MILLISECONDS);
} catch (InterruptedException e) {
log.error("redis鎖自旋等待被打斷...", e);
e.printStackTrace();
}
} else {
// 自旋等待時間逾時,傳回擷取鎖失敗
isSuccess = false;
return isSuccess;
}
}
}
private void _unlock(String key){
redisService.deleteObject(key);
}
private Map<String, Integer> currentLockers(){
Map<String, Integer> refs = lockers.get();
if(refs != null){
return refs;
}
lockers.set(new HashMap<>());
return lockers.get();
}
}
- RedisService 核心代碼
/**
*
* 如果key不存在,則設值
* @param key 鍵
* @param value 值
* @param exptime 過期時間 - 機關:秒
* @return boolean 設值是否成功(如果已有該key存在,則傳回false)
* @date 2020/12/11 10:42
* @author wei.heng
*/
public boolean setIfAbsent(final String key, final Serializable value, final long exptime) {
Boolean result = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
RedisSerializer valueSerializer = redisTemplate.getValueSerializer();
RedisSerializer keySerializer = redisTemplate.getKeySerializer();
Object obj = connection.execute("set", keySerializer.serialize(key),
valueSerializer.serialize(value),
"NX".getBytes(StandardCharsets.UTF_8),
"EX".getBytes(StandardCharsets.UTF_8),
String.valueOf(exptime).getBytes(StandardCharsets.UTF_8));
return obj != null;
});
return result;
}
到這裡,redis的分布式鎖已經可以使用了。
看到網上别人的文章,為了友善,
下面繼續做了下抄襲、修改
再次重申,抄襲的位址:
https://www.cnblogs.com/lijiasnong/p/9952494.html
- RedisLockable 建立注解
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* <pre>
* redis分布式鎖
* 使用示例(可參見 AppUserController 裡被注釋掉的 test 函數示例):
* 1、按對象屬性加鎖@RedisLockable("#ObjectName.attributeName")
* 2、按函數入參加鎖@RedisLockable("#functionParamName")
* 3、按對象參數 + 函數參數加鎖:@RedisLockable("#ObjectName.attributeName.concat('#').concat(#functionParamName)
* </pre>
* @date 2020/12/11 11:48
* @author wei.heng
*/
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisLockable {
/** redis鎖的關鍵字(keys),可使用SpEL表達式代表參數值 */
String[] value() default "";
}
-
ReflectParamNames
這個類需要引入jar包,我這裡搜了個最新的
<dependency>
<groupId>javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.12.1.GA</version>
</dependency>
import javassist.*;
import javassist.bytecode.CodeAttribute;
import javassist.bytecode.LocalVariableAttribute;
import javassist.bytecode.MethodInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 反射擷取參數名
* @date 2020/12/11 14:03
* @author wei.heng
*/
public class ReflectParamNames {
private static Logger log = LoggerFactory.getLogger(ReflectParamNames.class);
private static ClassPool pool = ClassPool.getDefault();
static{
ClassClassPath classPath = new ClassClassPath(ReflectParamNames.class);
pool.insertClassPath(classPath);
}
public static String[] getNames(String className,String methodName) {
CtClass cc = null;
try {
cc = pool.get(className);
CtMethod cm = cc.getDeclaredMethod(methodName);
// 使用javaassist的反射方法擷取方法的參數名
MethodInfo methodInfo = cm.getMethodInfo();
CodeAttribute codeAttribute = methodInfo.getCodeAttribute();
LocalVariableAttribute attr = (LocalVariableAttribute) codeAttribute.getAttribute(LocalVariableAttribute.tag);
if (attr == null) return new String[0];
int begin = 0;
String[] paramNames = new String[cm.getParameterTypes().length];
int count = 0;
int pos = Modifier.isStatic(cm.getModifiers()) ? 0 : 1;
for (int i = 0; i < attr.tableLength(); i++){
// 為什麼 加這個判斷,發現在windows 跟linux執行時,參數順序不一緻,通過觀察,實際的參數是從this後面開始的
if (attr.variableName(i).equals("this")){
begin = i;
break;
}
}
for (int i = begin+1; i <= begin+paramNames.length; i++){
paramNames[count] = attr.variableName(i);
count++;
}
return paramNames;
} catch (Exception e) {
e.printStackTrace();
}finally{
try {
if(cc != null) cc.detach();
} catch (Exception e2) {
log.error(e2.getMessage());
}
}
return new String[0];
}
}
- RedisLockAspect
import com.google.common.base.Joiner;
import com.alibaba.fastjson.JSON;
import lombok.extern.log4j.Log4j2;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* 注解形式的redis分布式鎖
* @date 2020/12/11 14:10
* @author wei.heng
*/
@Aspect
@Log4j2
@Component
public class RedisLockAspect {
private RedisReentrantLock redisLock;
/** 釋放鎖的逾時毫秒數 - 超過該時間則不進行鎖的釋放(鎖已失效,此時再進行釋放,會導緻其他持有該鎖的線程異常) */
private static int EXPIRE_MILLISECONDS = RedisReentrantLock.EXPIRE_SECONDS * 1000;
@Autowired
public RedisLockAspect(RedisReentrantLock redisLock) {
this.redisLock = redisLock;
}
@Around("@annotation(com.ruoyi.common.security.annotation.RedisLockable)")
public Object around(ProceedingJoinPoint point) throws Throwable {
Signature signature = point.getSignature();
MethodSignature methodSignature = (MethodSignature) signature;
Method method = methodSignature.getMethod();
String targetName = point.getTarget().getClass().getName();
String methodName = point.getSignature().getName();
Object[] arguments = point.getArgs();
RedisLockable redisLockable = method.getAnnotation(RedisLockable.class);
String[] values = redisLockable.value();
String redisKey;
if(values.length > 1) {
//擷取參數名
String[] parameterNames = methodSignature.getParameterNames();
redisKey = getLockKeyMap(targetName, methodName,redisLockable.value(), parameterNames, arguments);
} else {
redisKey = getLockKey(targetName, methodName, redisLockable.value(), arguments);
}
long timeOutAt = System.currentTimeMillis() + EXPIRE_MILLISECONDS;
boolean isLock = redisLock.lock(redisKey);
if(isLock) {
try {
return point.proceed();
} finally {
if(System.currentTimeMillis() < timeOutAt){
// 目前時間超過逾時時間點,則不進行鎖的釋放(鎖已失效,此時再進行釋放,會導緻其他持有該鎖的線程異常)
redisLock.unlock(redisKey);
}
}
} else {
log.info("未擷取到鎖:" + redisKey);
throw new GetRedisLockFailureException();
}
}
private String getLockKey(String targetName, String methodName, String[] keys, Object[] arguments) {
StringBuilder sb = new StringBuilder();
sb.append(targetName).append(".").append(methodName);
if(keys != null) {
String keyStr = Joiner.on(".").skipNulls().join(keys);
String[] parameters = ReflectParamNames.getNames(targetName, methodName);
ExpressionParser parser = new SpelExpressionParser();
Expression expression = parser.parseExpression(keyStr);
EvaluationContext context = new StandardEvaluationContext();
int length = parameters.length;
if (length > 0) {
for (int i = 0; i < length; i++) {
context.setVariable(parameters[i], arguments[i]);
}
}
String keysValue = expression.getValue(context, String.class);
sb.append("#").append(keysValue);
}
return sb.toString();
}
/**
* 改方法擷取 參數值必須隻有一級,轉換map擷取 如:keys = {"name1","name2"}
* @param targetName
* @param methodName
* @param keys
* @param parameterNames
* @param arguments
* @return
* @author liugang
*/
private String getLockKeyMap(String targetName, String methodName, String[] keys,String[] parameterNames, Object[] arguments) {
StringBuilder sb = new StringBuilder();
sb.append(targetName).append(".").append(methodName).append(":");
if(arguments != null && arguments.length > 0) {
Map<String, Object> map = new HashMap<String, Object>();
for(int i = 0; i < arguments.length; i++) {
Object ob = arguments[i];
if(ob == null) {
continue;
}
if(ob instanceof String || ob instanceof Integer || ob instanceof Long
|| ob instanceof Number || ob instanceof Float || ob instanceof Double
|| ob instanceof BigDecimal || ob instanceof Date || ob instanceof Boolean ) {
map.put(parameterNames[i], ob);
} else if(ob instanceof Serializable) {
String json = JSON.toJSONString(ob);
map.putAll(JSON.parseObject(json, Map.class));
} else if(ob instanceof LinkedHashMap || ob instanceof HashMap || ob instanceof Map) {
map.putAll((Map)ob);
} else {
map.put(parameterNames[i], ob);
}
}
for(String key : keys) {
if(map.get(key) != null) {
sb.append("#").append(map.get(key));
}
}
}
return sb.toString();
}
}
到這裡全部結束,可以直接使用@RedisLockable 進行分布式鎖的同步控制了(▽)
測試:
發出兩個postman請求,參數相同,第一個OK,第二個傳回409
修改其中某個參數、導緻不是同一把鎖,兩個請求均通過(通過的場景就不截圖了)
PS:
2020年12月14日某同僚回報這個鎖有BUG,多次請求發現ThreadLocal裡的計數器在累加,
是以提問:按照線程隔離的字面意思,每次請求都是一個新的線程,計數器應該都從0開始才對
這裡有個知識點是,ThreadLocal的線程隔離,是根據線程名來的
經過測試,發現請求N次後,會再次循環之前的線程名,這裡程式就識别為和之前的請求是同一個線程了(線程名稱一樣),于是出現了計數器的累加
這個是ThreadLocal實作原理的問題了,不屬于BUG範疇,我們隻要正确使用就好了
該同僚非正常使用鎖導緻的問題,既沒有使用這裡寫的注解方式做鎖的同步操作,在顯式地使用鎖後又未進行釋放
我們知道Java所有的手動鎖,都是使用後需要手動釋放資源的,在正确使用的情況下,不存在這個問題
2021年01月21日有朋友回報 redis鎖安全問題
鑒于上文中讨論的問題,個人了解,問題三描述的是真實存在的BUG(這個也是我在使用之前就已知的)
簡單描述下:程序1逾時導緻鎖自動釋放,程序2拿到鎖,這時程序1突然又響應了、把程序2的鎖釋放掉了
這裡我想了一種補償機制:在擷取到鎖資源的時候,生成時間戳,在釋放鎖之前通過時間戳判斷是否超過了redis的逾時時間X秒,若超過了則認為鎖已自動釋放、不再進行鎖的釋放(redis unlock)操作
雖然這個極限情況下,BUG仍然存在的,因為生成時間戳和擷取redis鎖資源,不是原子操作,會有間隔時間(N納秒或M微秒),但已能極大地規避該問題的發生。對于極小機率就等于不存在這句話我還是不贊同,但根據自身的業務需求評估,認為已能滿足當下需求(這裡業務出錯率零容忍的可以止步了)
現在redis還有個什麼redlock,暫時沒有去研究,有興趣的朋友可以去了解下
文中描述的問題2和問題4,我了解是不存在的:
我們在加鎖的時候,使用了redisTemplate.execute,Spring Data Redis 提供的 SessionCallback 的接口支援多個操作的執行都在同一個連接配接中,是以文中描述的SETNX和EXPIRE操作,實際上是原子的
而鎖的重入是在本地LocalThread中實作的,由于線程隔離,是以也不存在交叉釋放的問題