天天看点

AQS 和 ReetrantLcok 特征和使用介绍

AbstractQueuedSynchronizer 简介

AbstractQueuedSynchronizer 为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁定和相关同步器(信号量、事件,等等)提供一个框架。此类的设计目标是成为依靠单个原子 int 值来表示状态的大多数同步器的一个有用基础。子类必须定义更改此状态的受保护方法,并定义哪种状态对于此对象意味着被获取或被释放。假定这些条件之后,此类中的其他方法就可以实现所有排队和阻塞机制。但只是为了获得同步而只追踪使用 getState()、setState(int) 和 compareAndSetState(int, int) 方法来操作以原子方式更新的 int 值。

AQS 的特征:

  1. 阻塞等待队列
  2. 共享/独占
  3. 公平/非公平
  4. 可重入
  5. 允许中断

核心属性 int sate

  1. state 表示共享属性被 volatile 修饰
AQS 和 ReetrantLcok 特征和使用介绍

三个核心方法:

  1. getState()
  2. setState()
  3. compareAndSetState()
AQS 和 ReetrantLcok 特征和使用介绍

两种资源共享方式:

  • Exclusive-独占,只有一个线程可以访问,如 ReetrantLock
  • Share 共享,多个线程可以同时执行,如:Semaphore/CountDownLatch

AQS 定义两种队列

  • 同步等待队列:主要是用于维护获取互斥失败时入队的线程
  • 条件等待队列:调用 await() 的时候会释放锁,点燃后线程会加入到套件队列,调用 signal() 唤醒的时候把条件队列的节点移动到同步队列中,等待再次获取锁。

AQS 队列节点中的 5 种状态

  1. 值为 0 表示初始化状态,表示当前节点在 sync 队列中,等待获取锁。
  2. CANCELLED , 值为1 , 表示当前的线程被取消;
  3. SIGNAL,值为 -1,表示当前的线程被取消;
  4. CONDITION,值为 -2,表示当前节点的后继节点的线程需要运行,也就是 unpark;
  5. PROPAGAGTE 值为-3,表示当前场景下后续的 acquireShard 能够继续执行。

不同的自定义同步器竞争共享资源的方式也不同,自定义同步器在实现时自需要实现共享资源 state 的获取与实现方式即可,至于具体线程等待队列的维护(如获取资源失败入队、出队等),AQS 已经实现好了,自定义同步器时主要实现一下几个方法(AQS 其实是一个典型的模板方法模式的运用):

  • isHeldExclusively() 该线程是否正在独占资源。只有使用到 condition 才需要去实现。
  • tryAcquire(int):独占方式。尝试获取资源,成功返回 true,失败返回 false。
AQS 和 ReetrantLcok 特征和使用介绍
  • tryRelease(int):独占方式。尝试释放资源,成功返回 true,失败返回 false。
  • tryAcquireShared(int): 共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int): 共享方式。尝试释放资源,如果释放后允许唤醒后续等待节点返回 true , 否则返回 false。

自定义独占锁

public class Liu666Lock extends AbstractQueuedSynchronizer {

    @Override
    protected boolean tryAcquire(int arg) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int arg) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock() {
        acquire(1);
    }

    public void unlock() {
        release(1);
    }
}      

测试一下:

AQS 和 ReetrantLcok 特征和使用介绍

输出结果如下:

AQS 和 ReetrantLcok 特征和使用介绍

多执行几次我们可以观察,虽然执行的顺序不一定有序,但是我们最终结果是始终 idx = 10

同步等待

AQS 当中的同步等待队列也称为 CLH 队列,CLH队列是 Craig、Landin、Hagersten 三人发明的一种基于双向链表数据结构的队列,是 FIFO 先进先出等待队列,Java 的 CLH 队列是原自 CLH 队列的一个变种实现,线程由原自旋机制改为阻塞机制。

AQS 依赖 CLH 同步队列来完成同步状态的管理:

  • 当前线程如果获取同步状态失败时,AQS 则会将当前线程已经等待状态信息构造成一个节点(Node)并将其加入到 CLH 同步队列,同时会阻塞当前线程
  • 当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。
  • 通过 signal 或者 signalAll 将条件队列中的节点转移到同步队列。(由条件队列转换为同步队列)
AQS 和 ReetrantLcok 特征和使用介绍

条件等待队列

AQS 中条件队列是使用单向链表保存的,用 nextWaiter 属性来连接

  • 调用 await 方法阻塞线程;
  • 当前线程存储同步队列头节点,调用 await 方法进行阻塞(从同步队列转换到条件队列)

Condition 接口

AQS 和 ReetrantLcok 特征和使用介绍
  1. 调用 Condition#await 方法会释放当前持有的锁,然后阻塞当前线程,同时像 Condition 队列尾部添加一个节点,所以调用 Condition#await 方法的时候必须持有锁。
  2. 调用 Condition#signal 方法会将 Condition 队列的首节点移动到队列尾部,然后唤醒调用 Condition#awite 方法而阻塞的线程(唤醒之后这个线程就可以去竞争锁了),所以调用 Condition#signal 方法必须持有锁,持有锁的线程唤醒被因调用 Condition#await 方法而阻塞的线程。

等待唤醒机制 await/signal 实验

@Slf4j
public class ConditionTest {

    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        new Thread(() -> {
            lock.lock();
            try {
                log.info(Thread.currentThread().getName() + "开始执行任务");
                condition.await();
                log.info(Thread.currentThread().getName() + "任务执行结束");
            } catch (Throwable e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "t1").start();
        new Thread(() -> {
            lock.lock();
            try {
                log.info(Thread.currentThread().getName() + "开始执行任务");
                TimeUnit.SECONDS.sleep(2);
                condition.signal();
                log.info(Thread.currentThread().getName() + "任务执行结束");
            } catch (Throwable e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "t2").start();
    }
}      

从下面的结果我们可以看到 t1 线程获取锁过后,调用 ​

​await​

​方法进入阻塞状态并且释放锁,然后 t2 线程获取锁,并且去唤醒 t1 线程继续执行,这就是一个简单的条件队列例子。

输出结果如下:

AQS 和 ReetrantLcok 特征和使用介绍

ReetrantLock 与 synchroinzed 比较

  • synchroinzed 是 JVM 层次的锁实现,ReetrantLock 是 JDK 层次的锁实现;
  • synchroinzed 的锁状态是无法在 Java 代码中直接判断的,但是 ReetrantLock 可以通过 ReetrantLock#isLock 判断;
  • synchroinzed 是非公平锁,ReetrantLock 是可以公平的也可以是非公平的;
  • synchroinzed 是不可以被中断的,而 ReetrantLock#lockInterruptibly 方法是可以中断锁的;
  • 在发生异常的时候 synchroinzed 会自动释放锁,而 ReetrentLock 需要开发者在 finaly 代码块中显示释放锁;
  • ReetrantLock 获取锁的形式有很多中:如立即返回是否成功的 tryLock(),以及等嗲指定时长的获取,更加灵活;
  • synchroinzed 在特定的情况下已经在等待的线程是后来的线程先获得锁(回顾一下 synchroinzed 唤醒策略),而 ReetranLock 对于已经正在等待的线程是先来的先获取锁。

ReetrantLcok 特征

reetrantlock 是一种基于 aqs 框架的应用实现。是基于 JDK 的一种线程同步手段,他的功能类似与 synchronized 是一种互斥锁,相对于 synchronized 具备一下特点:

  • 可中断
  • 可设置超时时间
  • 可设置公平锁
  • 支持多个条件变量
  • 与 synchronized 一样,都支持可重入

ReetrantLcok 部分源码:

AQS 和 ReetrantLcok 特征和使用介绍

ReetrantLock 使用范式

使用方式:

// 1. 创建锁(默认非公平锁)
ReentrantLock lock = new ReentrantLock(false);
// 2. 加锁
lock.lock();
try {
   // 3. todo 原子操作
} finally {
    // 4. 解锁
    lock.unlock();
}      

可重入特征

下面我们来测试一下 ReetrantLock 的几个特征:

  • 可重入,就是说在一个线程内可以多次获取锁。下面是一个简单的例子:
public static void lockReentrant() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        log.info("main 线程获取锁 1 次");
        lock.lock();
        log.info("main 线程获取锁 1 次");
        lock.unlock();
        log.info("main 线程解锁 1 次");
        lock.unlock();
        log.info("main 线程解锁 1 次");
    }      

输出结果如下:

AQS 和 ReetrantLcok 特征和使用介绍

可中断特征

代码如下:

ReentrantLock lock = new ReentrantLock();

Thread t1 = new Thread(() -> {
    log.info(Thread.currentThread().getName() + " 启动。。。");
    try {
        lock.lockInterruptibly();
        log.info(Thread.currentThread().getName() + " 成功获取锁。。。");
        lock.unlock();
    } catch (InterruptedException e) {
        e.printStackTrace();
        log.info(Thread.currentThread().getName() + " 等待锁的过程中被中断。。。");
    }
}, "t1");

lock.lock();
try {
    t1.start();

    log.info(Thread.currentThread().getName() + " 成功获取锁。。。");
    Thread.sleep(2000);

    t1.interrupt();
    log.info("t1 执行中断。。。");
} catch (InterruptedException e) {
    e.printStackTrace();
    log.info(Thread.currentThread().getName() + " 等待锁的过程中被中断。。。");
} finally {
    lock.unlock();
}      

这个场景主要是模仿,对于线程中断的场景,然后放弃锁的获取,减少锁的无效竞争者。

输出结果如下:

AQS 和 ReetrantLcok 特征和使用介绍
  • 设置获取锁的超时时间,比如我们对于一些互斥操作, 只能让一个线程获取成功,但是允许其他线程在允许的时候内重试,来保证最大的并发执行。
@Slf4j
public class ReentrantLockTest {


    public static void main(String[] args) {
        lockTimeOut();
    }

    public static void lockTimeOut() {
        // 1. 创建一个 ReentrantLock 实例
        ReentrantLock lock = new ReentrantLock();
        // 2. 创建线程 t1
        Thread t1 = new Thread(() -> {
            log.debug("t1 线程启动。。。。");
            try {
                // t1 尝试获取锁,锁获取超时时间 1s
                if (lock.tryLock(1, TimeUnit.SECONDS)) {
                    log.debug("t1 线程等待 1s 后 获取锁失败");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 如果是自己获取锁才去解锁
                if (lock.isHeldByCurrentThread()) { lock.unlock(); }
            }
        }, "t1");

        // 3. 主线程获取锁
        lock.lock();
        try {
            log.debug("main 线程获取锁成功");
            // 4. 启动 t1 线程
            t1.start();
            // 5. 休眠 2s 
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 如果是自己获取锁才去解锁
            if (lock.isHeldByCurrentThread()) { lock.unlock(); }
        }
    }
}      

公平与非公平特征

Condition 总结

参考资料

  1. ​​baike.baidu.com/item/Abstra…​​

继续阅读