天天看点

JAVA LOCK代码浅析

java lock总体来说关键要素主要包括3点:

1.unsafe.compareandswapxxx(object o,long offset,int expected,int x)

2.unsafe.park() 和 unsafe.unpark()

3.单向链表结构或者说存储线程的数据结构

第1点

主要为了保证锁的原子性,相当于一个锁是否正在被使用的标记,并且比较和设置这个标记的操作是原子的(硬件提供的swap和test_and_set指令,单cpu下同一指令的多个指令周期不可中断,smp中通过锁总线支持上诉两个指令的原子性),这基本等于软件级别所能达到的最高级别隔离。

第2点

主要将未得到锁的线程禁用(park)和唤醒(unpark),也是直接native实现(这几个native方法的实现代码在hotspotsrcsharevmprimsunsafe.cpp文件中,但是关键代码park的最终实现是和操作系统相关的,比如windows下实现是在os_windows.cpp中,有兴趣的同学可以下载jdk源码查看)。唤醒一个被park()线程主要手段包括以下几种

1. 其他线程调用以被park()线程为参数的unpark(thread thread).

2. 其他线程中断被park()线程,如waiters.peek().interrupt();waiters为存储线程对象的队列.

3. 不知原因的返回。

park()方法返回并不会报告到底是上诉哪种返回,所以返回好最好检查下线程状态,如

[java]

locksupport.park(); //禁用当前线程

if(thread.interrupted){

//dosomething

}[/java]

abstractqueuedsynchronizer(aqs)对于这点实现得相当巧妙,如下所示

private void doacquiresharedinterruptibly(int arg)throws interruptedexception {

final node node = addwaiter(node.shared);

try {

for (;;) {

final node p = node.predecessor();

if (p == head) {

int r = tryacquireshared(arg);

if (r >= 0) {

setheadandpropagate(node, r);

p.next = null; // help gc

return;

}

//parkandcheckinterrupt()会返回park住的线程在被unpark后的线程状态,如果线程中断,跳出循环。

if (shouldparkafterfailedacquire(p, node) &&

parkandcheckinterrupt())

break;

} catch (runtimeexception ex) {

cancelacquire(node);

throw ex;

//在park()住的线程被unpark()后,第一时间返回当前线程是否被打断

private final boolean parkandcheckinterrupt() {

locksupport.park(this);

return thread.interrupted();

[/java]

第3点对于一个synchronizer的实现非常重要,存储等待线程,并且unlock时唤醒等待线程,这中间有很多工作需要做,唤醒策略,等待线程意外终结处理,公平非公平,可重入不可重入等。

以上简单说明了下java locks关键要素,现在我们来看下java.util.concurrent.locks大致结构

JAVA LOCK代码浅析

上图中,lock的实现类其实都是构建在abstractqueuedsynchronizer上,为何图中没有用uml线表示呢,这是每个lock实现类都持有自己内部类sync的实例,而这个sync就是继承abstractqueuedsynchronizer(aqs)。为何要实现不同的sync呢?这和每种lock用途相关。另外还有aqs的state机制。

基于aqs构建的synchronizer包括reentrantlock,semaphore,countdownlatch, reetrantread writelock,futuretask等,这些synchronizer实际上最基本的东西就是原子状态的获取和释放,只是条件不一样而已。

reentrantlock需要记录当前线程获取原子状态的次数,如果次数为零,那么就说明这个线程放弃了锁(也有可能其他线程占据着锁从而需要等待),如果次数大于1,也就是获得了重进入的效果,而其他线程只能被park住,直到这个线程重进入锁次数变成0而释放原子状态。以下为reetranlock的fairsync的tryacquire实现代码解析。

//公平获取锁

protected final boolean tryacquire(int acquires) {

final thread current = thread.currentthread();

int c = getstate();

//如果当前重进入数为0,说明有机会取得锁

if (c == 0) {

//如果是第一个等待者,并且设置重进入数成功,那么当前线程获得锁

if (isfirst(current) &&

compareandsetstate(0, acquires)) {

setexclusiveownerthread(current);

return true;

//如果当前线程本身就持有锁,那么叠加重进入数,并且继续获得锁

else if (current == getexclusiveownerthread()) {

int nextc = c + acquires;

if (nextc < 0)

throw new error("maximum lock count exceeded");

setstate(nextc);

//以上条件都不满足,那么线程进入等待队列。

return false;

semaphore则是要记录当前还有多少次许可可以使用,到0,就需要等待,也就实现并发量的控制,semaphore一开始设置许可数为1,实际上就是一把互斥锁。以下为semaphore的fairsync实现

protected int tryacquireshared(int acquires) {

thread current = thread.currentthread();

thread first = getfirstqueuedthread();

//如果当前等待队列的第一个线程不是当前线程,那么就返回-1表示当前线程需要等待

if (first != null && first != current)

return -1;

//如果当前队列没有等待者,或者当前线程就是等待队列第一个等待者,那么先取得semaphore还有几个许可证,并且减去当前线程需要的许可证得到剩下的值

int available = getstate();

int remaining = available - acquires;

//如果remining<0,那么反馈给aqs当前线程需要等待,如果remaining>0,并且设置availble成功设置成剩余数,那么返回剩余值(>0),也就告知aqs当前线程拿到许可,可以继续执行。

if (remaining < 0 ||compareandsetstate(available, remaining))

return remaining;

countdownlatch闭锁则要保持其状态,在这个状态到达终止态之前,所有线程都会被park住,闭锁可以设定初始值,这个值的含义就是这个闭锁需要被countdown()几次,因为每次countdown是sync.releaseshared(1),而一开始初始值为10的话,那么这个闭锁需要被countdown()十次,才能够将这个初始值减到0,从而释放原子状态,让等待的所有线程通过。

//await时候执行,只查看当前需要countdown数量减为0了,如果为0,说明可以继续执行,否则需要park住,等待countdown次数足够,并且unpark所有等待线程

public int tryacquireshared(int acquires) {

return getstate() == 0? 1 : -1;

//countdown 时候执行,如果当前countdown数量为0,说明没有线程await,直接返回false而不需要唤醒park住线程,如果不为0,得到剩下需要 countdown的数量并且compareandset,最终返回剩下的countdown数量是否为0,供aqs判定是否释放所有await线程。

public boolean tryreleaseshared(int releases) {

if (c == 0)

int nextc = c-1;

if (compareandsetstate(c, nextc))

return nextc == 0;

futuretask需要记录任务的执行状态,当调用其实例的get方法时,内部类sync会去调用aqs的acquiresharedinterruptibly()方法,而这个方法会反向调用sync实现的tryacquireshared()方法,即让具体实现类决定是否让当前线程继续还是park,而futuretask的tryacquireshared方法所做的唯一事情就是检查状态,如果是running状态那么让当前线程park。而跑任务的线程会在任务结束时调用futuretask 实例的set方法(与等待线程持相同的实例),设定执行结果,并且通过unpark唤醒正在等待的线程,返回结果。

//get时待用,只检查当前任务是否完成或者被cancel,如果未完成并且没有被cancel,那么告诉aqs当前线程需要进入等待队列并且park住

protected int tryacquireshared(int ignore) {

return innerisdone()? 1 : -1;

//判定任务是否完成或者被cancel

boolean innerisdone() {

return ranorcancelled(getstate()) && runner == null;

//get时调用,对于cancel与其他异常进行抛错

v innerget(long nanostimeout) throws interruptedexception, executionexception, timeoutexception {

if (!tryacquiresharednanos(0,nanostimeout))

throw new timeoutexception();

if (getstate() == cancelled)

throw new cancellationexception();

if (exception != null)

throw new executionexception(exception);

return result;

//任务的执行线程执行完毕调用(set(v v))

void innerset(v v) {

int s = getstate();

//如果线程任务已经执行完毕,那么直接返回(多线程执行任务?)

if (s == ran)

//如果被cancel了,那么释放等待线程,并且会抛错

if (s == cancelled) {

releaseshared(0);

//如果成功设定任务状态为已完成,那么设定结果,unpark等待线程(调用get()方法而阻塞的线程),以及后续清理工作(一般由futruetask的子类实现)

if (compareandsetstate(s, ran)) {

result = v;

done();

以上4个aqs的使用是比较典型,然而有个问题就是这些状态存在哪里呢?并且是可以计数的。从以上4个example,我们可以很快得到答案,aqs提供给了子类一个int state属性。并且暴露给子类getstate()和setstate()两个方法(protected)。这样就为上述状态解决了存储问题,retrantlock可以将这个state用于存储当前线程的重进入次数,semaphore可以用这个state存储许可数,countdownlatch则可以存储需要被countdown的次数,而future则可以存储当前任务的执行状态(runing,ran,cancell)。其他的synchronizer存储他们的一些状态。

aqs留给实现者的方法主要有5个方法,其中tryacquire,tryrelease和isheldexclusively三个方法为需要独占形式获取的synchronizer实现的,比如线程独占reetranlock的sync,而tryacquireshared和tryreleasedshared为需要共享形式获取的synchronizer实现。

reentrantlock内部sync类实现的是tryacquire,tryrelease, isheldexclusively三个方法(因为获取锁的公平性问题,tryacquire由继承该sync类的内部类fairsync和nonfairsync实现)semaphore内部类sync则实现了tryacquireshared和tryreleasedshared(与countdownlatch相似,因为公平性问题,tryacquireshared由其内部类fairsync和nonfairsync实现)。countdownlatch内部类sync实现了tryacquireshared和tryreleasedshared。futuretask内部类sync也实现了tryacquireshared和tryreleasedshared。

其实使用过一些java synchronizer的之后,然后结合代码,能够很快理解其到底是如何做到各自的特性的,在把握了基本特性,即获取原子状态和释放原子状态,其实我们自己也可以构造synchronizer。如下是一个lock api的一个例子,实现了一个先入先出的互斥锁。

public class fifomutex {

private atomicboolean locked=new atomicboolean(false);

private queue<thread> waiters=new concurrentlinkedqueue<thread>();

总结,java lock机制对于整个java concurrent包的成员意义重大,了解这个机制对于使用java并发类有着很多的帮助,文章中可能存在着各种错误,请各位多多谅解并且能够提出来,谢谢。

文章参考:jdk 1.6 source

java 并发编程实践

jdk 1.6 api 文档

本文来源于"阿里中间件团队播客",原文发表时间" 2010-09-30"