JUC:ReentrantLock
关键词
- 互斥锁:ReentrantLock
- 公平锁和非公平锁:ReentrantLock(CAS+AQS队列)
- 可重入锁:ReentrantLock(state变量+CAS操作)
- 响应中断(一个线程获取不到锁,不会一直等下去)(tryLock方法,传入时间参数,表示等待指定的时间)
一、概述
Concurrent 包中和互斥锁(ReentrantLock)相关类之间的继承层次
Lock是一个接口,其定义如下:
public interface Lock {
void lock(); //不能被中断
void lockInterruptibly() throws InterruptedException; //可以被中断
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
常用的方法是lock()/unlock()。lock()不能被中断,对应的lockInterruptibly()可以被中断。
1.2 ReentrantLock(可重入锁)本身没有代码逻辑,实现都在其内部类Sync中:
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
public void lock() {
sync.acquire(1);
}
public void unlock() {
sync.release(1);
}
// ...
}
1.3 锁的公平性vs.非公平性
Sync是一个抽象类,它有两个子类FairSync与NonfairSync,分别对应公平锁和非公平锁。从下面的ReentrantLock构造方法可以看出,会传入一个布尔类型的变量fair指定锁是公平的还是非公平的,默认为非公平的。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
什么叫公平锁和非公平锁呢?先举个现实生活中的例子,一个人去火车站售票窗口买票,发现现场有人排队,于是他排在队伍末尾,遵循先到者优先服务的规则,这叫公平;如果他去了不排队,直接冲到窗口买票,这叫作不公平。
对应到锁的例子,一个新的线程来了之后,看到有很多线程在排队,自己排到队伍末尾,这叫公平;线程来了之后直接去抢锁,这叫作不公平。默认设置的是非公平锁,其实是为了提高效率,减少线程切换。
1.4 锁实现的基本原理
Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器(AQS),这个类非常重要,该类的父类是AbstractOwnableSynchronizer。
此处的锁具备synchronized功能,即可以阻塞一个线程。为了实现一把具有阻塞或唤醒功能的锁,需要几个核心要素:
- 需要一个state变量,标记该锁的状态。state变量至少有两个值:0、1。对state变量的操作,使用CAS保证线程安全。
- 需要记录当前是哪个线程持有锁。
- 需要底层支持对一个线程进行阻塞或唤醒操作。
- 需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要使用CAS。
针对要素1和2,在上面两个类中有对应的体现:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
// ...
private transient Thread exclusiveOwnerThread; // 记录持有锁的线程
}
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private volatile int state; // 记录锁的状态,通过CAS修改state的值。
// ...
}
state取值不仅可以是0、1,还可以大于1,就是为了支持锁的可重入性。例如,同样一个线程,调用5次lock,state会变成5;然后调用5次unlock,state减为0。
- 当state=0时,没有线程持有锁,exclusiveOwnerThread=null;
- 当state=1时,有一个线程持有锁,exclusiveOwnerThread=该线程;
- 当state > 1时,说明该线程重入了该锁
对于要素3,Unsafe类提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark。
public native void unpark(Object thread);
public native void park(boolean isAbsolute, long time);
有一个LockSupport的工具类,对这一对原语做了简单封装:
public class LockSupport {
// ...
private static final Unsafe U = Unsafe.getUnsafe();
public static void park() {
U.park(false, 0L);
}
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
}
在当前线程中调用park(),该线程就会被阻塞;在另外一个线程中,调用unpark(Thread thread),传入一个被阻塞的线程,就可以唤醒阻塞在park()地方的线程。
- unpark(Thread thread),它实现了一个线程对另外一个线程的“精准唤醒”。
- notify也只是唤醒某一个线程,但无法指定具体唤醒哪个线程。
针对要素4,在AQS(AbstractQueuedSynchronizer)中利用双向链表和CAS实现了一个阻塞队列。如下所示:
public abstract class AbstractQueuedSynchronizer {
// ...
static final class Node {
volatile Thread thread; // 每个Node对应一个被阻塞的线程
volatile Node prev;
volatile Node next;
// ...
}
private transient volatile Node head;
private transient volatile Node tail;
// ...
}
阻塞队列是整个AQS核心中的核心。如下图所示,head指向双向链表头部,tail指向双向链表尾部。
- 入队就是把新的Node加到tail后面,然后对tail进行CAS操作;
- 出队就是对head进行CAS操作,把head向后移一个位置。
初始的时候,head=tail=NULL;然后,在往队列中加入阻塞的线程时,会新建一个空的Node,让head和tail都指向这个空Node;之后,在后面加入被阻塞的线程对象。所以,当head=tail的时候,说明队列为空。
1.5 公平与非公平的lock()实现差异
下面分析基于AQS,ReentrantLock在公平性和非公平性上的实现差异。
非公平锁:直接使用当前线程获取锁,不排队
公平锁:判断当前队列中没有等待线程,后使用当前线程获取锁,需要排队
1.6 阻塞队列与唤醒机制
下面进入锁的最为关键的部分,即acquireQueued(…)方法内部一探究竟。
先说addWaiter(…)方法,就是为当前线程生成一个Node,然后把Node 放入双向链表的尾部 。要注意的是,这只是把Thread对象放入了一个队列中而已,线程本身并未阻塞。
创建节点,尝试将节点追加到队列尾部。获取tail节点,将tail节点的next设置为当前节点。
如果tail不存在,就初始化队列。
在addWaiter(…)方法把Thread对象加入阻塞队列之后的工作就要靠acquireQueued(…)方法完成。
线程一旦进入acquireQueued(…)就会被无限期阻塞,即使有其他线程调用interrupt()方法也不能将其唤醒,除非有其他线程释放了锁,并且该线程拿到了锁,才会从accquireQueued(…)返回。
进入acquireQueued(…),该线程被阻塞。在该方法返回的一刻,就是拿到锁的那一刻,也就是被唤醒的那一刻,此时会删除队列的第一个元素(head指针前移1个节点)。
首先,acquireQueued(…)方法有一个返回值,表示什么意思呢?虽然该方法不会中断响应,但它会记录被阻塞期间有没有其他线程向它发送过中断信号。如果有,则该方法会返回true;否则,返回false。
基于这个返回值,才有了下面的代码:
当 acquireQueued(…)返回 true 时,会调用 selfInterrupt(),自己给自己发送中断信号,也就是自己把自己的中断标志位设为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中断信号没有及时响应,现在要进行补偿。这样一来,如果该线程在lock代码块内部有调用sleep()之类的阻塞方法,就可以抛出异常,响应该中断信号。
阻塞就发生在下面这个方法中:
线程调用 park()方法,自己把自己阻塞起来,直到被其他线程唤醒,该方法返回。
park()方法返回有两种情况。
- 其他线程调用了unpark(Thread t)。
- 其他线程调用了t.interrupt()。这里要注意的是,lock()不能响应中断,但LockSupport.park()会响应中断。
也正因为LockSupport.park()可能被中断唤醒,acquireQueued(…)方法才写了一个for死循环。唤醒之后,如果发现自己排在队列头部,就去拿锁;如果拿不到锁,则再次自己阻塞自己。不断重复此过程,直到拿到锁。
被唤醒之后,通过Thread.interrupted()来判断是否被中断唤醒。如果是情况1,会返回false;如果是情况2,则返回true。
1.7 unlock()实现分析
说完了lock,下面分析unlock的实现。unlock不区分公平还是非公平。
上图中,当前线程要释放锁,先调用tryRelease(arg)方法,如果返回true,则取出head,让head获取锁。
对于tryRelease方法:
首先计算当前线程释放锁后的state值。
如果当前线程不是排他线程,则抛异常,因为只有获取锁的线程才可以进行释放锁的操作。
此时设置state,没有使用CAS,因为是单线程操作。
再看unparkSuccessor方法:
release()里面做了两件事:tryRelease(…)方法释放锁;unparkSuccessor(…)方法唤醒队列中的后继者。
1.8 lockInterruptibly()实现分析
上面的 lock 不能被中断,这里的 lockInterruptibly()可以被中断:
这里的 acquireInterruptibly(…)也是 AQS 的模板方法,里面的 tryAcquire(…)分别被 FairSync和NonfairSync实现。
主要看doAcquireInterruptibly(…)方法:
当parkAndCheckInterrupt()返回true的时候,说明有其他线程发送中断信号,直接抛出InterruptedException,跳出for循环,整个方法返回。
1.9 tryLock()实现分析
tryLock()实现基于调用非公平锁的tryAcquire(…),对state进行CAS操作,如果操作成功就拿到锁;如果操作不成功则直接返回false,也不阻塞。
二、公平锁、非公平锁、响应中断
- ReentrantLock主要利用CAS+AQS队列来实现。它支持公平锁和非公平锁,两者的实现类似。
- ReentrantLock是独占锁,加锁和解锁的过程需要手动进行,不易操作,但非常灵活。
- ReentrantLock可重入,加锁和解锁需要手动进行,且次数需一样,否则其他线程无法获得锁。
- ReentrantLock可以相应中断。
- ReentrantLock还可以实现公平锁机制。在锁上等待时间最长的线程将获得锁的使用权。通俗的理解就是谁排队时间最长谁先执行获取锁。
2.1 锁的基本使用
分别lock和unlock
public class BaseDemo {
private static final Lock lock = new ReentrantLock();
public static void main(String[] args) {
new Thread(()->run(),"线程1").start();
new Thread(()->run(),"线程2").start();
}
public static void run(){
try {
lock.lock();
System.out.println(Thread.currentThread().getName()+"获取锁");
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(Thread.currentThread().getName()+"锁释放");
lock.unlock();
}
}
}
2.2 公平锁使用
public class FairSyncDemo {
/**
* 参数传递true,表示使用公平锁
*/
private static final Lock lock = new ReentrantLock(true);
public static void main(String[] args) {
new Thread(() -> run(), "线程1").start();
new Thread(() -> run(), "线程2").start();
new Thread(() -> run(), "线程3").start();
new Thread(() -> run(), "线程4").start();
}
public static void run() {
for (int i=0 ; i<2 ; i++) {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "获取锁");
TimeUnit.SECONDS.sleep(3);
} catch (Exception e) {
e.printStackTrace();
} finally {
//System.out.println(Thread.currentThread().getName() + "锁释放");
lock.unlock();
}
System.out.println("-----------------");
}
}
}
2.3 非公平锁使用
public class NoFairSyncDemo {
/**
* 参数传递false,表示使用公平锁,默认值就是false
*/
private static final Lock lock = new ReentrantLock(false);
public static void main(String[] args) {
new Thread(() -> run(), "线程1").start();
new Thread(() -> run(), "线程2").start();
new Thread(() -> run(), "线程3").start();
new Thread(() -> run(), "线程4").start();
}
public static void run() {
for (int i=0 ; i<2 ; i++) {
try {
lock.lock();
System.out.println(Thread.currentThread().getName() + "获取锁");
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
} finally {
//System.out.println(Thread.currentThread().getName() + "锁释放");
lock.unlock();
}
System.out.println("-----------------");
}
}
}
2.4 响应中断
/**
* 响应中断就是一个线程获取不到锁,不会傻傻的一直等下去,
* ReentrantLock会给予一个中断回应
*/
public class InterruptDemo {
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
public static void main(String[] args) {
Thread thread1 = new Thread(new MyThread(lock1,lock2));
Thread thread2 = new Thread(new MyThread(lock1,lock2));
thread1.start();
thread2.start();
thread1.interrupt();//第一个线程中断
}
static class MyThread implements Runnable{
Lock innerLock1;
Lock innerLock2;
public MyThread(Lock innerLock1, Lock innerLock2) {
this.innerLock1 = innerLock1;
this.innerLock2 = innerLock2;
}
@Override
public void run() {
try {
innerLock1.lockInterruptibly();
TimeUnit.MICROSECONDS.sleep(55);
innerLock2.lockInterruptibly();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
innerLock1.unlock();
innerLock2.unlock();
System.out.println(Thread.currentThread().getName()+"线程结束!");
}
}
}
}
定义了两个锁lock1和lock2
然后使用两个线程thread1和thread2构造死锁场景
正常情况下,这两个线程相互等待获取资源而处于死循环状态。但是我们此时innerLock1中断,另外一个线程就可以获取资源,正常地执行了
2.5 限时等待
通过tryLock方法来实现,可以选择传入时间参数,表示等待指定的时间,无参则表示立即返回锁申请的结果:true表示获取锁成功,false表示获取锁失败。我们可以将这种方法用来解决死锁问题。
public class InterruptWaitTimeDemo {
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
public static void main(String[] args) {
Thread thread1 = new Thread(new InterruptDemo.MyThread(lock1,lock2));
Thread thread2 = new Thread(new InterruptDemo.MyThread(lock1,lock2));
thread1.start();
thread2.start();
thread1.interrupt();//第一个线程中断
}
static class MyThread implements Runnable{
Lock innerLock1;
Lock innerLock2;
public MyThread(Lock innerLock1, Lock innerLock2) {
this.innerLock1 = innerLock1;
this.innerLock2 = innerLock2;
}
@Override
public void run() {
try {
if(!innerLock1.tryLock()){
TimeUnit.MILLISECONDS.sleep(150);
}
if(!innerLock2.tryLock()){
TimeUnit.MILLISECONDS.sleep(150);
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
innerLock1.unlock();
innerLock2.unlock();
System.out.println(Thread.currentThread().getName()+"线程结束!");
}
}
}
}