天天看点

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

JUC:ReentrantLock

关键词

  • 互斥锁:ReentrantLock
  • 公平锁和非公平锁:ReentrantLock(CAS+AQS队列)
  • 可重入锁:ReentrantLock(state变量+CAS操作)
  • 响应中断(一个线程获取不到锁,不会一直等下去)(tryLock方法,传入时间参数,表示等待指定的时间)

一、概述

Concurrent 包中和互斥锁(ReentrantLock)相关类之间的继承层次

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

Lock是一个接口,其定义如下:

public interface Lock {
  void lock();  //不能被中断
  void lockInterruptibly() throws InterruptedException;  //可以被中断
  boolean tryLock();
  boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  void unlock();
  Condition newCondition();
}
           

常用的方法是lock()/unlock()。lock()不能被中断,对应的lockInterruptibly()可以被中断。

1.2 ReentrantLock(可重入锁)本身没有代码逻辑,实现都在其内部类Sync中:

public class ReentrantLock implements Lock, java.io.Serializable {
  private final Sync sync;
 
  public void lock() {
    sync.acquire(1);
 }
 
  public void unlock() {
    sync.release(1);
 }
  // ...
}
           

1.3 锁的公平性vs.非公平性

Sync是一个抽象类,它有两个子类FairSync与NonfairSync,分别对应公平锁和非公平锁。从下面的ReentrantLock构造方法可以看出,会传入一个布尔类型的变量fair指定锁是公平的还是非公平的,默认为非公平的。

public ReentrantLock() {
	sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
	sync = fair ? new FairSync() : new NonfairSync();
}
           

什么叫公平锁和非公平锁呢?先举个现实生活中的例子,一个人去火车站售票窗口买票,发现现场有人排队,于是他排在队伍末尾,遵循先到者优先服务的规则,这叫公平;如果他去了不排队,直接冲到窗口买票,这叫作不公平。

对应到锁的例子,一个新的线程来了之后,看到有很多线程在排队,自己排到队伍末尾,这叫公平;线程来了之后直接去抢锁,这叫作不公平。默认设置的是非公平锁,其实是为了提高效率,减少线程切换。

1.4 锁实现的基本原理

Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器(AQS),这个类非常重要,该类的父类是AbstractOwnableSynchronizer。

此处的锁具备synchronized功能,即可以阻塞一个线程。为了实现一把具有阻塞或唤醒功能的锁,需要几个核心要素:

  1. 需要一个state变量,标记该锁的状态。state变量至少有两个值:0、1。对state变量的操作,使用CAS保证线程安全。
  2. 需要记录当前是哪个线程持有锁。
  3. 需要底层支持对一个线程进行阻塞或唤醒操作。
  4. 需要有一个队列维护所有阻塞的线程。这个队列也必须是线程安全的无锁队列,也需要使用CAS。

针对要素1和2,在上面两个类中有对应的体现:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
  // ...
  private transient Thread exclusiveOwnerThread;  // 记录持有锁的线程
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
 
  private volatile int state; // 记录锁的状态,通过CAS修改state的值。
  // ...
 
}
           

state取值不仅可以是0、1,还可以大于1,就是为了支持锁的可重入性。例如,同样一个线程,调用5次lock,state会变成5;然后调用5次unlock,state减为0。

  • 当state=0时,没有线程持有锁,exclusiveOwnerThread=null;
  • 当state=1时,有一个线程持有锁,exclusiveOwnerThread=该线程;
  • 当state > 1时,说明该线程重入了该锁

对于要素3,Unsafe类提供了阻塞或唤醒线程的一对操作原语,也就是park/unpark。

public native void unpark(Object thread);

public native void park(boolean isAbsolute, long time);
           

有一个LockSupport的工具类,对这一对原语做了简单封装:

public class LockSupport {
  // ...
 
  private static final Unsafe U = Unsafe.getUnsafe();
 
  public static void park() {
    U.park(false, 0L);
 }
 
  public static void unpark(Thread thread) {
    if (thread != null)
      U.unpark(thread);
 }
}
           

在当前线程中调用park(),该线程就会被阻塞;在另外一个线程中,调用unpark(Thread thread),传入一个被阻塞的线程,就可以唤醒阻塞在park()地方的线程。

  • unpark(Thread thread),它实现了一个线程对另外一个线程的“精准唤醒”。
  • notify也只是唤醒某一个线程,但无法指定具体唤醒哪个线程。

针对要素4,在AQS(AbstractQueuedSynchronizer)中利用双向链表和CAS实现了一个阻塞队列。如下所示:

public abstract class AbstractQueuedSynchronizer {
  // ...
  static final class Node {
    volatile Thread thread; // 每个Node对应一个被阻塞的线程
    volatile Node prev;
    volatile Node next;
    // ...
 }
 
  private transient volatile Node head;
  private transient volatile Node tail;
  // ...
}
           

阻塞队列是整个AQS核心中的核心。如下图所示,head指向双向链表头部,tail指向双向链表尾部。

  • 入队就是把新的Node加到tail后面,然后对tail进行CAS操作;
  • 出队就是对head进行CAS操作,把head向后移一个位置。
JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

初始的时候,head=tail=NULL;然后,在往队列中加入阻塞的线程时,会新建一个空的Node,让head和tail都指向这个空Node;之后,在后面加入被阻塞的线程对象。所以,当head=tail的时候,说明队列为空。

1.5 公平与非公平的lock()实现差异

下面分析基于AQS,ReentrantLock在公平性和非公平性上的实现差异。

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

非公平锁:直接使用当前线程获取锁,不排队

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断
JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

公平锁:判断当前队列中没有等待线程,后使用当前线程获取锁,需要排队

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

1.6 阻塞队列与唤醒机制

下面进入锁的最为关键的部分,即acquireQueued(…)方法内部一探究竟。

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

先说addWaiter(…)方法,就是为当前线程生成一个Node,然后把Node 放入双向链表的尾部 。要注意的是,这只是把Thread对象放入了一个队列中而已,线程本身并未阻塞。

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

创建节点,尝试将节点追加到队列尾部。获取tail节点,将tail节点的next设置为当前节点。

如果tail不存在,就初始化队列。

在addWaiter(…)方法把Thread对象加入阻塞队列之后的工作就要靠acquireQueued(…)方法完成。

线程一旦进入acquireQueued(…)就会被无限期阻塞,即使有其他线程调用interrupt()方法也不能将其唤醒,除非有其他线程释放了锁,并且该线程拿到了锁,才会从accquireQueued(…)返回。

进入acquireQueued(…),该线程被阻塞。在该方法返回的一刻,就是拿到锁的那一刻,也就是被唤醒的那一刻,此时会删除队列的第一个元素(head指针前移1个节点)。

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

首先,acquireQueued(…)方法有一个返回值,表示什么意思呢?虽然该方法不会中断响应,但它会记录被阻塞期间有没有其他线程向它发送过中断信号。如果有,则该方法会返回true;否则,返回false。

基于这个返回值,才有了下面的代码:

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断
JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

当 acquireQueued(…)返回 true 时,会调用 selfInterrupt(),自己给自己发送中断信号,也就是自己把自己的中断标志位设为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中断信号没有及时响应,现在要进行补偿。这样一来,如果该线程在lock代码块内部有调用sleep()之类的阻塞方法,就可以抛出异常,响应该中断信号。

阻塞就发生在下面这个方法中:

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

线程调用 park()方法,自己把自己阻塞起来,直到被其他线程唤醒,该方法返回。

park()方法返回有两种情况。

  1. 其他线程调用了unpark(Thread t)。
  2. 其他线程调用了t.interrupt()。这里要注意的是,lock()不能响应中断,但LockSupport.park()会响应中断。

也正因为LockSupport.park()可能被中断唤醒,acquireQueued(…)方法才写了一个for死循环。唤醒之后,如果发现自己排在队列头部,就去拿锁;如果拿不到锁,则再次自己阻塞自己。不断重复此过程,直到拿到锁。

被唤醒之后,通过Thread.interrupted()来判断是否被中断唤醒。如果是情况1,会返回false;如果是情况2,则返回true。

1.7 unlock()实现分析

说完了lock,下面分析unlock的实现。unlock不区分公平还是非公平。

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断
JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

上图中,当前线程要释放锁,先调用tryRelease(arg)方法,如果返回true,则取出head,让head获取锁。

对于tryRelease方法:

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

首先计算当前线程释放锁后的state值。

如果当前线程不是排他线程,则抛异常,因为只有获取锁的线程才可以进行释放锁的操作。

此时设置state,没有使用CAS,因为是单线程操作。

再看unparkSuccessor方法:

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断
JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

release()里面做了两件事:tryRelease(…)方法释放锁;unparkSuccessor(…)方法唤醒队列中的后继者。

1.8 lockInterruptibly()实现分析

上面的 lock 不能被中断,这里的 lockInterruptibly()可以被中断:

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断
JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

这里的 acquireInterruptibly(…)也是 AQS 的模板方法,里面的 tryAcquire(…)分别被 FairSync和NonfairSync实现。

主要看doAcquireInterruptibly(…)方法:

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

当parkAndCheckInterrupt()返回true的时候,说明有其他线程发送中断信号,直接抛出InterruptedException,跳出for循环,整个方法返回。

1.9 tryLock()实现分析

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

tryLock()实现基于调用非公平锁的tryAcquire(…),对state进行CAS操作,如果操作成功就拿到锁;如果操作不成功则直接返回false,也不阻塞。

二、公平锁、非公平锁、响应中断

  • ReentrantLock主要利用CAS+AQS队列来实现。它支持公平锁和非公平锁,两者的实现类似。
  • ReentrantLock是独占锁,加锁和解锁的过程需要手动进行,不易操作,但非常灵活。
  • ReentrantLock可重入,加锁和解锁需要手动进行,且次数需一样,否则其他线程无法获得锁。
  • ReentrantLock可以相应中断。
  • ReentrantLock还可以实现公平锁机制。在锁上等待时间最长的线程将获得锁的使用权。通俗的理解就是谁排队时间最长谁先执行获取锁。

2.1 锁的基本使用

分别lock和unlock

public class BaseDemo {

    private static final Lock lock = new ReentrantLock();

    public static void main(String[] args) {
        new Thread(()->run(),"线程1").start();
        new Thread(()->run(),"线程2").start();
    }

    public static void run(){
        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName()+"获取锁");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println(Thread.currentThread().getName()+"锁释放");
            lock.unlock();
        }
    }
}
           
JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

2.2 公平锁使用

public class FairSyncDemo {

    /**
     * 参数传递true,表示使用公平锁
     */
    private static final Lock lock = new ReentrantLock(true);

    public static void main(String[] args) {
        new Thread(() -> run(), "线程1").start();
        new Thread(() -> run(), "线程2").start();
        new Thread(() -> run(), "线程3").start();
        new Thread(() -> run(), "线程4").start();
    }


    public static void run() {
        for (int i=0 ; i<2 ; i++) {
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName() + "获取锁");
                TimeUnit.SECONDS.sleep(3);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //System.out.println(Thread.currentThread().getName() + "锁释放");
                lock.unlock();
            }
            System.out.println("-----------------");
        }
    }
}
           
JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

2.3 非公平锁使用

public class NoFairSyncDemo {

    /**
     * 参数传递false,表示使用公平锁,默认值就是false
     */
    private static final Lock lock = new ReentrantLock(false);

    public static void main(String[] args) {
        new Thread(() -> run(), "线程1").start();
        new Thread(() -> run(), "线程2").start();
        new Thread(() -> run(), "线程3").start();
        new Thread(() -> run(), "线程4").start();
    }

    public static void run() {
        for (int i=0 ; i<2 ; i++) {
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName() + "获取锁");
                TimeUnit.SECONDS.sleep(2);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //System.out.println(Thread.currentThread().getName() + "锁释放");
                lock.unlock();
            }
            System.out.println("-----------------");
        }
    }

}
           
JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

2.4 响应中断

/**
 * 响应中断就是一个线程获取不到锁,不会傻傻的一直等下去,
 * ReentrantLock会给予一个中断回应
 */
public class InterruptDemo {

    static Lock lock1 = new ReentrantLock();
    static Lock lock2 = new ReentrantLock();

    public static void main(String[] args) {
        Thread thread1 = new Thread(new MyThread(lock1,lock2));
        Thread thread2 = new Thread(new MyThread(lock1,lock2));
        thread1.start();
        thread2.start();
        thread1.interrupt();//第一个线程中断
    }

    static class MyThread implements Runnable{
        Lock innerLock1;
        Lock innerLock2;

        public MyThread(Lock innerLock1, Lock innerLock2) {
            this.innerLock1 = innerLock1;
            this.innerLock2 = innerLock2;
        }

        @Override
        public void run() {

            try {
                innerLock1.lockInterruptibly();
                TimeUnit.MICROSECONDS.sleep(55);
                innerLock2.lockInterruptibly();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                innerLock1.unlock();
                innerLock2.unlock();
                System.out.println(Thread.currentThread().getName()+"线程结束!");
            }
        }

    }

}
           

定义了两个锁lock1和lock2

然后使用两个线程thread1和thread2构造死锁场景

正常情况下,这两个线程相互等待获取资源而处于死循环状态。但是我们此时innerLock1中断,另外一个线程就可以获取资源,正常地执行了

JUC:ReentrantLock互斥锁关键词一、概述二、公平锁、非公平锁、响应中断

2.5 限时等待

通过tryLock方法来实现,可以选择传入时间参数,表示等待指定的时间,无参则表示立即返回锁申请的结果:true表示获取锁成功,false表示获取锁失败。我们可以将这种方法用来解决死锁问题。

public class InterruptWaitTimeDemo {

    static Lock lock1 = new ReentrantLock();
    static Lock lock2 = new ReentrantLock();

    public static void main(String[] args) {
        Thread thread1 = new Thread(new InterruptDemo.MyThread(lock1,lock2));
        Thread thread2 = new Thread(new InterruptDemo.MyThread(lock1,lock2));
        thread1.start();
        thread2.start();
        thread1.interrupt();//第一个线程中断
    }

    static class MyThread implements Runnable{
        Lock innerLock1;
        Lock innerLock2;

        public MyThread(Lock innerLock1, Lock innerLock2) {
            this.innerLock1 = innerLock1;
            this.innerLock2 = innerLock2;
        }

        @Override
        public void run() {

            try {
                if(!innerLock1.tryLock()){
                    TimeUnit.MILLISECONDS.sleep(150);
                }
                if(!innerLock2.tryLock()){
                    TimeUnit.MILLISECONDS.sleep(150);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                innerLock1.unlock();
                innerLock2.unlock();
                System.out.println(Thread.currentThread().getName()+"线程结束!");
            }
        }

    }

}
           

继续阅读