天天看點

redis分布式鎖的應用

前言:

項目需求,搞了搞

  1. 實作了鎖的重入
  2. 參考了别人的博文實作了AOP注解形式的鎖、統一配置

參考博文位址:

https://www.cnblogs.com/lijiasnong/p/9952494.html
           

這邊看了下比較主流幾個分布式鎖的應用,最終選擇的redis

原因是:

1、懶(伺服器已有redis做緩存,不想再去安裝zuukeeper)

2、評估認為redis的分布式鎖已能滿足當下應用

正文 - 摘錄核心代碼:

  1. RedisReentrantLock
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * redis分布式鎖 - 使用 ThreadLocal 做線程隔離,實作鎖的重入機制
 * @date 2020/12/11 9:13
 * @author wei.heng
 */
@Component
@Log4j2
public class RedisReentrantLock {

	/** 擷取鎖逾時的時間 */
	private static final int ACQUIRE_LOCK_TIME_OUT_IN_MILLISECONDS = 3 * 1000;
	/** 自旋重試間隔 */
	private static final int WAIT_INTERVAL_IN_MILLISECONDS = 100;
	/** 鎖的失效時間 */
	private static final Integer EXPIRE_SECONDS = 5;
	/** 鎖的字首,友善統一檢視 */
	private static final String LOCK_PREFIX = "LOCK:";

	private ThreadLocal<Map<String, Integer>> lockers = new ThreadLocal<>();

	private RedisService redisService;

	@Autowired
	public RedisReentrantLock(RedisService redisService) {
		this.redisService = redisService;
	}

	/**
	 *
	 * 添加分布式鎖
	 * @param key 鍵
	 * @return boolean 請求鎖是否成功
	 * @date 2020/12/11 10:52
	 * @author wei.heng
	 */
	public boolean lock(String key){
		key = LOCK_PREFIX + key;
		Map<String, Integer> refs = currentLockers();
		Integer refCnt = refs.get(key);
		if(refCnt != null){
			// 目前線程已經加鎖,這裡屬于鎖的重入,計數器加1
			refs.put(key, refCnt + 1);
			return true;
		}
		boolean ok = this._lock(key);
		if(!ok){
			return false;
		}
		refs.put(key, 1);
		return true;
	}

	/**
	 *
	 * 釋放鎖
	 * @param key 鍵
	 * @return boolean 釋放是否成功
	 * @date 2020/12/11 10:53
	 * @author wei.heng
	 */
	public boolean unlock(String key){
		key = LOCK_PREFIX + key;
		Map<String, Integer> refs = currentLockers();
		Integer refCnt = refs.get(key);
		if(refCnt == null){
			return false;
		}
		refCnt -= 1;
		if(refCnt > 0){
			refs.put(key, refCnt);
		} else {
			refs.remove(key);
			this._unlock(key);
		}
		return true;
	}

	private boolean _lock(String key){
		// 逾時時間點
		long timeoutAt = System.currentTimeMillis() + ACQUIRE_LOCK_TIME_OUT_IN_MILLISECONDS;
		while(true){
			boolean isSuccess = redisService.setIfAbsent(key, "", EXPIRE_SECONDS);
			if(isSuccess){
				// 如果加鎖成功,就傳回
				return isSuccess;
 			}
			// 如果加鎖失敗,證明目前鎖被其他線程占用,進入自旋
			if(System.currentTimeMillis() < timeoutAt){
				try {
					TimeUnit.MILLISECONDS.sleep(WAIT_INTERVAL_IN_MILLISECONDS);
				} catch (InterruptedException e) {
					log.error("redis鎖自旋等待被打斷...", e);
					e.printStackTrace();
				}
			} else {
				// 自旋等待時間逾時,傳回擷取鎖失敗
				isSuccess = false;
				return isSuccess;
			}
		}
	}

	private void _unlock(String key){
		redisService.deleteObject(key);
	}

	private Map<String, Integer> currentLockers(){
		Map<String, Integer> refs = lockers.get();
		if(refs != null){
			return refs;
		}
		lockers.set(new HashMap<>());
		return lockers.get();
	}

}


           
  1. RedisService 核心代碼
/**
     *
     * 如果key不存在,則設值
     * @param key 鍵
     * @param value 值
     * @param exptime 過期時間 - 機關:秒
     * @return boolean 設值是否成功(如果已有該key存在,則傳回false)
     * @date 2020/12/11 10:42
     * @author wei.heng
     */
    public boolean setIfAbsent(final String key, final Serializable value, final long exptime) {
        Boolean result = (Boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
            RedisSerializer valueSerializer = redisTemplate.getValueSerializer();
            RedisSerializer keySerializer = redisTemplate.getKeySerializer();
            Object obj = connection.execute("set", keySerializer.serialize(key),
                valueSerializer.serialize(value),
                "NX".getBytes(StandardCharsets.UTF_8),
                "EX".getBytes(StandardCharsets.UTF_8),
                String.valueOf(exptime).getBytes(StandardCharsets.UTF_8));
            return obj != null;
        });
        return result;
    }
           

到這裡,redis的分布式鎖已經可以使用了。

看到網上别人的文章,為了友善,

下面繼續做了下抄襲、修改

再次重申,抄襲的位址:

https://www.cnblogs.com/lijiasnong/p/9952494.html
           
  1. RedisLockable 建立注解
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * <pre>
 * redis分布式鎖
 * 使用示例(可參見 AppUserController 裡被注釋掉的 test 函數示例):
 * 1、按對象屬性加鎖@RedisLockable("#ObjectName.attributeName")
 * 2、按函數入參加鎖@RedisLockable("#functionParamName")
 * 3、按對象參數 + 函數參數加鎖:@RedisLockable("#ObjectName.attributeName.concat('#').concat(#functionParamName)
 * </pre>
 * @date 2020/12/11 11:48
 * @author wei.heng
 */
@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisLockable {
    /** redis鎖的關鍵字(keys),可使用SpEL表達式代表參數值 */
    String[] value() default "";
}
           
  1. ReflectParamNames

    這個類需要引入jar包,我這裡搜了個最新的

<dependency>
            <groupId>javassist</groupId>
            <artifactId>javassist</artifactId>
            <version>3.12.1.GA</version>
        </dependency>
           
import javassist.*;
import javassist.bytecode.CodeAttribute;
import javassist.bytecode.LocalVariableAttribute;
import javassist.bytecode.MethodInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 反射擷取參數名
 * @date 2020/12/11 14:03
 * @author wei.heng
 */
public class ReflectParamNames {

	private static Logger log = LoggerFactory.getLogger(ReflectParamNames.class);
	private  static ClassPool pool = ClassPool.getDefault();

	static{
		ClassClassPath classPath = new ClassClassPath(ReflectParamNames.class);
		pool.insertClassPath(classPath);
	}

	public static String[] getNames(String className,String methodName) {
		CtClass cc = null;
		try {
			cc = pool.get(className);
			CtMethod cm = cc.getDeclaredMethod(methodName);
			// 使用javaassist的反射方法擷取方法的參數名
			MethodInfo methodInfo = cm.getMethodInfo();
			CodeAttribute codeAttribute = methodInfo.getCodeAttribute();
			LocalVariableAttribute attr = (LocalVariableAttribute) codeAttribute.getAttribute(LocalVariableAttribute.tag);
			if (attr == null) return new String[0];

			int begin = 0;

			String[] paramNames = new String[cm.getParameterTypes().length];
			int count = 0;
			int pos = Modifier.isStatic(cm.getModifiers()) ? 0 : 1;

			for (int i = 0; i < attr.tableLength(); i++){
				//  為什麼 加這個判斷,發現在windows 跟linux執行時,參數順序不一緻,通過觀察,實際的參數是從this後面開始的
				if (attr.variableName(i).equals("this")){
					begin = i;
					break;
				}
			}

			for (int i = begin+1; i <= begin+paramNames.length; i++){
				paramNames[count] = attr.variableName(i);
				count++;
			}
			return paramNames;
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			try {
				if(cc != null) cc.detach();
			} catch (Exception e2) {
				log.error(e2.getMessage());
			}


		}
		return new String[0];
	}
}
           
  1. RedisLockAspect
import com.google.common.base.Joiner;
import com.alibaba.fastjson.JSON;
import lombok.extern.log4j.Log4j2;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.Signature;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * 注解形式的redis分布式鎖
 * @date 2020/12/11 14:10
 * @author wei.heng
 */
@Aspect
@Log4j2
@Component
public class RedisLockAspect {

	private RedisReentrantLock redisLock;
	/** 釋放鎖的逾時毫秒數 - 超過該時間則不進行鎖的釋放(鎖已失效,此時再進行釋放,會導緻其他持有該鎖的線程異常) */
	private static int EXPIRE_MILLISECONDS = RedisReentrantLock.EXPIRE_SECONDS * 1000;

	@Autowired
	public RedisLockAspect(RedisReentrantLock redisLock) {
		this.redisLock = redisLock;
	}

	@Around("@annotation(com.ruoyi.common.security.annotation.RedisLockable)")
	public Object around(ProceedingJoinPoint point) throws Throwable {
		Signature signature = point.getSignature();
		MethodSignature methodSignature = (MethodSignature) signature;
		Method method = methodSignature.getMethod();
		String targetName = point.getTarget().getClass().getName();
		String methodName = point.getSignature().getName();
		Object[] arguments = point.getArgs();

		RedisLockable redisLockable = method.getAnnotation(RedisLockable.class);
		String[] values = redisLockable.value();
		String redisKey;
		if(values.length > 1) {
			//擷取參數名
			String[] parameterNames = methodSignature.getParameterNames();
			redisKey = getLockKeyMap(targetName, methodName,redisLockable.value(), parameterNames, arguments);
		} else {
			redisKey = getLockKey(targetName, methodName, redisLockable.value(), arguments);
		}
		long timeOutAt = System.currentTimeMillis() + EXPIRE_MILLISECONDS;
		boolean isLock = redisLock.lock(redisKey);
		if(isLock) {
			try {
				return point.proceed();
			} finally {
				if(System.currentTimeMillis() < timeOutAt){
					// 目前時間超過逾時時間點,則不進行鎖的釋放(鎖已失效,此時再進行釋放,會導緻其他持有該鎖的線程異常)
					redisLock.unlock(redisKey);
				}
			}
		} else {
			log.info("未擷取到鎖:" + redisKey);
			throw new GetRedisLockFailureException();
		}
	}

	private String getLockKey(String targetName, String methodName, String[] keys, Object[] arguments) {

		StringBuilder sb = new StringBuilder();
		sb.append(targetName).append(".").append(methodName);

		if(keys != null) {
			String keyStr = Joiner.on(".").skipNulls().join(keys);
			String[] parameters = ReflectParamNames.getNames(targetName, methodName);
			ExpressionParser parser = new SpelExpressionParser();
			Expression expression = parser.parseExpression(keyStr);
			EvaluationContext context = new StandardEvaluationContext();
			int length = parameters.length;
			if (length > 0) {
				for (int i = 0; i < length; i++) {
					context.setVariable(parameters[i], arguments[i]);
				}
			}
			String keysValue = expression.getValue(context, String.class);
			sb.append("#").append(keysValue);
		}
		return sb.toString();
	}
	
	
	
	/**
	 * 改方法擷取 參數值必須隻有一級,轉換map擷取 如:keys = {"name1","name2"}
	 * @param targetName
	 * @param methodName
	 * @param keys
	 * @param parameterNames
	 * @param arguments
	 * @return
	 * @author liugang
	 */
	private String getLockKeyMap(String targetName, String methodName, String[] keys,String[] parameterNames, Object[] arguments) {
		StringBuilder sb = new StringBuilder();
		sb.append(targetName).append(".").append(methodName).append(":");
		if(arguments != null && arguments.length > 0) {
			Map<String, Object> map = new HashMap<String, Object>();
			for(int i = 0; i < arguments.length; i++) {
				Object ob = arguments[i];
				if(ob == null) {
					continue;
				}
				if(ob instanceof String || ob instanceof Integer || ob instanceof Long
						|| ob instanceof Number || ob instanceof Float || ob instanceof Double 
						|| ob instanceof BigDecimal || ob instanceof Date || ob instanceof Boolean ) {
					map.put(parameterNames[i], ob);
				} else if(ob instanceof Serializable) {
					String json = JSON.toJSONString(ob);
					map.putAll(JSON.parseObject(json, Map.class));
				} else if(ob instanceof LinkedHashMap || ob instanceof HashMap || ob instanceof Map) {
					map.putAll((Map)ob);
				} else {
					map.put(parameterNames[i], ob);
				}
			}
			for(String key : keys) {
				if(map.get(key) != null) {
					sb.append("#").append(map.get(key));
				}
			}
		}
		return sb.toString();
	}

}
           

到這裡全部結束,可以直接使用@RedisLockable 進行分布式鎖的同步控制了(▽)

測試:

redis分布式鎖的應用

發出兩個postman請求,參數相同,第一個OK,第二個傳回409

修改其中某個參數、導緻不是同一把鎖,兩個請求均通過(通過的場景就不截圖了)

redis分布式鎖的應用

PS:

2020年12月14日某同僚回報這個鎖有BUG,多次請求發現ThreadLocal裡的計數器在累加,

是以提問:按照線程隔離的字面意思,每次請求都是一個新的線程,計數器應該都從0開始才對

這裡有個知識點是,ThreadLocal的線程隔離,是根據線程名來的

經過測試,發現請求N次後,會再次循環之前的線程名,這裡程式就識别為和之前的請求是同一個線程了(線程名稱一樣),于是出現了計數器的累加

這個是ThreadLocal實作原理的問題了,不屬于BUG範疇,我們隻要正确使用就好了

該同僚非正常使用鎖導緻的問題,既沒有使用這裡寫的注解方式做鎖的同步操作,在顯式地使用鎖後又未進行釋放

我們知道Java所有的手動鎖,都是使用後需要手動釋放資源的,在正确使用的情況下,不存在這個問題

2021年01月21日有朋友回報 redis鎖安全問題

鑒于上文中讨論的問題,個人了解,問題三描述的是真實存在的BUG(這個也是我在使用之前就已知的)

簡單描述下:程序1逾時導緻鎖自動釋放,程序2拿到鎖,這時程序1突然又響應了、把程序2的鎖釋放掉了

這裡我想了一種補償機制:在擷取到鎖資源的時候,生成時間戳,在釋放鎖之前通過時間戳判斷是否超過了redis的逾時時間X秒,若超過了則認為鎖已自動釋放、不再進行鎖的釋放(redis unlock)操作

雖然這個極限情況下,BUG仍然存在的,因為生成時間戳和擷取redis鎖資源,不是原子操作,會有間隔時間(N納秒或M微秒),但已能極大地規避該問題的發生。對于極小機率就等于不存在這句話我還是不贊同,但根據自身的業務需求評估,認為已能滿足當下需求(這裡業務出錯率零容忍的可以止步了)

現在redis還有個什麼redlock,暫時沒有去研究,有興趣的朋友可以去了解下

文中描述的問題2和問題4,我了解是不存在的:

我們在加鎖的時候,使用了redisTemplate.execute,Spring Data Redis 提供的 SessionCallback 的接口支援多個操作的執行都在同一個連接配接中,是以文中描述的SETNX和EXPIRE操作,實際上是原子的

而鎖的重入是在本地LocalThread中實作的,由于線程隔離,是以也不存在交叉釋放的問題

redis分布式鎖的應用
redis分布式鎖的應用