天天看点

CountDownLatch & Semaphore 实现原理详解

CountDownLatch 和 Semaphore 是Java中常用的两个同步器。他们的用法百度一下一大堆,我就不多做介绍了。下面我将从源码的角度分析一下这两个类的实现原理。

阅读本篇文章之前,建议先理解AQS同步器的原理。可以看我之前写的一篇文章:

Java同步器框架-AQS原理&源码解析

理解了AQS的底层原理,再来看CountDownLatch 和 Semaphore 的实现原理就会发现很简单。

CountDownLatch 实现原理

CountDownLatch 内部定义了一个AQS的实现类

private final Sync sync;
private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            //判断state是否为0了,如果为0就返回1,表示允许通过。返回-1的话线程就要挂起等待state为0
            return (getState() == ) ?  : -;
        }

        protected boolean tryReleaseShared(int releases) {
            //通过自旋+CAS的操作将state-1。当state减到0的时候AQS就会去唤醒挂起中的线程
            for (;;) {
                int c = getState();
                if (c == )
                    return false;
                int nextc = c-;
                if (compareAndSetState(c, nextc))
                    return nextc == ;
            }
        }
}
public CountDownLatch(int count) {
        if (count < ) throw new IllegalArgumentException("count < 0");
        //初始化AQS同步器
        this.sync = new Sync(count);
}
public void await() throws InterruptedException {
        //尝试获取资源,等待的时候支持线程响应中断
        sync.acquireSharedInterruptibly();
}
public void countDown() {
        //释放资源
        sync.releaseShared();
}
           

可以看出,countDown()和await()方法的底层其实都是依赖于Sync。

Sync是一个AQS同步器,实现了 tryAcquireShared()和tryReleaseShared(),这是一个共享模式下的同步器。

CountDownLatch在初始化的时候传入一个同步线程数量count表示AQS的state。然后执行await()的时候,AQS会先判断state是否为0,state不为0的话就会阻塞当前线程,并让当前线程进入AQS的CLH队列中排队。

当有线程执行countDown的时候,Sync会通过cas+自旋的方式将state减一,然后判断state是否等于0。等于0的时候返回true,AQS发现tryReleaseShared()返回true,就会去唤醒正在CLH队列中排队等待的线程,先唤醒排在最前面的那个线程。由于是共享模式,那个线程被唤醒后,检查state=0了,就结束阻塞,并且会通知下一个排队线程,下一个线程醒来后,一样判断state是否等于0了,然后结束阻塞,通知下一个,一直循环下去,直到所有阻塞中的线程全部被唤醒。

Semaphore 实现原理

也是基于AQS实现的。内部也定义了一个Sync ,AQS的实现类

private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = L;
        //定义初始的许可证的数量
        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        //非公平的策略
        final int nonfairTryAcquireShared(int acquires) {
            //用CAS+自旋的方式获取资源
            for (;;) {
                //直接去获取资源
                //假设现在CLH队列中有人在排队了,然后刚好有线程释放了一个资源并且唤醒队列中等待的线程了
                //然后唤醒后还没来得及获取到资源,就被人插队获取了资源
                //下面的代码就是尝试不排队直接获取资源,获取不到资源再去排队,所以这是一种不公平的算法
                int available = getState();
                int remaining = available - acquires;
                if (remaining <  ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        //释放资源
        protected final boolean tryReleaseShared(int releases) {
            //用CAS+自旋的方式释放资源
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current ==  || compareAndSetState(current, ))
                    return current;
            }
        }
    }

    /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            //采用非公平的方式获取资源
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            //用CAS+自旋的方式尝试获取资源
            for (;;) {
                //判断CLH队列中是否有人在排队等待,如果是的话直接返回-1,表示也进入队列等待
                if (hasQueuedPredecessors())
                    return -;
                //如果队列中没有人在排队,就尝试获取资源了
                int available = getState();
                int remaining = available - acquires;
                if (remaining <  ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
}
//默认选择非公平的AQS同步器
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
      //可以选择公平的还是不公平的同步器
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly();
}

public void release() {
        sync.releaseShared();
}
           

在Semaphore 的例子中,AQS的state就可以直接看成是一种资源(许可证)。

acquire()

方法就是尝试获取资源,当资源不足的的时候,就进入CLH队列排队等待,也就是阻塞当前线程。

release()

方法就是释放一些资源,释放完后就会通知CLH队列中的线程。CLH队列中的线程被唤醒后就会再次检查是否有资源了,有的话就消耗资源,也就是state-n,然后就退出阻塞。由于是共享模式的AQS,线程被唤醒后还会继续通知下一个排在自己后面的线程,那个线程也醒来检查资源是否足够,以此类推下去。

比较有意思的是,Semaphore 内部提供了两个AQS同步器的实现类,一种是公平模式的,一种是非公平模式的。这两种同步器其实只是在获取资源的行为上有所不同,也就是

tryAcquireShared(int acquires)

的实现不同,而在释放资源的行为上是一样的。

在公平模式下,获取资源前会先去检查CLH队列是否有人在排队了,如果发现有人在排队了,就会自觉也去排队。而在非公平模式下,它会直接尝试先获取资源,获取不到再去排队。

我们可以理解为一间店在卖面包,面包的数量会不断产生,如果数量不够了,排队的人就要在那等着。在公平模式下,有一个人来买面包,发现大家都在排队,就自觉进入队列中也排队等待。在非公平的模式下,来买面包的人就直接先去店主那看有没有剩余的面包可以买,这时候可以店主刚生产出一个面包来,还没来得及交给排在队伍的第一个人,就被插队者买走了。所以,这就是不公平的模式,违反了先到先得的规则。

总结

CountDownLatch 和 Semaphore这两个类的原理其实都很好理解。看这两个类的源码也很快,结合AQS的原理一起理解,会对Java线程之间的同步有更深的理解。

最后,如果哪里有写的不对的地方,烦请指出,感激不尽!

继续阅读