前言
Semaphore,信号量,一般用于控制同時通路資源的線程數量。可以認為Synchronized代表的是一把鎖,那麼Semaphore就是多把鎖。
常用方法
public class Semaphore implements java.io.Serializable {
//構造方法,傳入令牌數,預設執行個體化一個非公平鎖
public Semaphore(int permits);
//擷取一個令牌,在擷取成功之前,以及被其他線程中斷之前,目前線程會被阻塞
public void acquire() throws InterruptedException;
//擷取一個令牌,在擷取成功之前,目前線程會被阻塞(中斷被忽略)
public void acquireUninterruptibly() ;
//嘗試擷取令牌,立即傳回擷取成功與否,不阻塞目前線程
public boolean tryAcquire();
//釋放一個令牌
public void release();
//傳回目前可用的令牌數
public int availablePermits();
}
現在有這樣的一個例子:
某衛生間隻有3個坑位,把坑前面的擋門了解為令牌,是以這裡有3個令牌,現在模拟5個人搶坑位的場景。
package com.xue.testSemaphore;
import java.util.concurrent.Semaphore;
public class Main {
public static void main(String[] args) {
//最多支援3個人同時蹲坑
Semaphore semaphore = new Semaphore(3);
//5個人來搶坑位
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "已經在蹲坑");
//模拟蹲坑時長
Thread.sleep((long) (Math.random() * 10 * 1000));
//離開坑位
System.out.println(Thread.currentThread().getName() + "即将離開坑位");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, i + "号").start();
}
}
}
輸出如下:
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiI0gTMx81dsQWZ4lmZf1GLlpXazVmcvwFciV2dsQXYtJ3bm9CX9s2RkBnVHFmb1clWvB3MaVnRtp1XlBXe0xCMy81dvRWYoNHLwEzX5xCMx8FesU2cfdGLwMzX0xiRGZkRGZ0Xy9GbvNGLpZTY1EmMZVDUSFTU4VFRR9Fd4VGdsYTMfVmepNHLrJXYtJXZ0F2dvwVZnFWbp1zczV2YvJHctM3cv1Ce-cmbw5SN0UDMzkjNwYDNmRjM2ITNzYzXzQTMxcTMxIzLcBTMyIDMy8CXn9Gbi9CXzV2Zh1WavwVbvNmLvR3YxUjLyM3Lc9CX6MHc0RHaiojIsJye.png)
首先0、1、2号已經搶完了所有的坑位,3與4号隻能在外面等候,對的他們沒排隊(預設執行個體化了一個非公平鎖)。2号出來後,3号才能進去。接着0号出來,4号才能進去。
這個例子雖然有點俗,這确實能讓人印象深刻呀。
原了解析
類圖
Semaphore有2個内部類,FairSync與NonfairSync,它們都繼承自Sync,Sync又繼承自AQS。可以看的出來,Semaphore與CountDownLatch的結構類似,都需要借助于AQS。
對CountDownLatch不熟悉的同學,可以先參考我的另外一篇文章CountDownLatch實作原理
構造方法
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
預設執行個體化了一個非公平鎖,當然也可以進行指定。這裡的permits最終會傳到AQS的state變量中,代表目前可用的令牌數。
acquire()
擷取一個令牌,擷取到線程可以繼續執行,否則将會被阻塞。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
調用了AQS中的模版方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//嘗試擷取arg個令牌,該方法傳回可用令牌數-需求數,如果小于0,則進行阻塞
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
其中tryAcquireShared()由具體的子類(AQS的子類Sync的子類NonfailSync)進行實作
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
這裡又調用了父類Sync的方法
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
remaining =可用令牌數-需求數<0時,直接傳回remaining 。否則利用CAS進行更新,同樣傳回remaining 。對CAS機制不熟悉的同學,可以先參考我的另外一篇文章淺探CAS實作原理
該方法傳回一個小于0的值時,将會調用以下方法,這段代碼的作用主要就是将擷取不到令牌的線程封裝為節點,加入到阻塞隊列中。
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//建立一個共享類型的節點加入到阻塞隊列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
release()
釋放一個令牌,接着喚醒所有同步隊列中的阻塞的共享模式的節點線程。被喚醒的線程重新嘗試去擷取令牌,擷取成功則繼續執行,否則重新加入到阻塞隊列中。
public void release() {
sync.releaseShared(1);
}
releaseShared是AQS中的模版方法
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
調用了Sync中的tryReleaseShared方法
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}