目錄
- Semaphore概述及案例學習
- 類圖結構及重要字段
- void acquire()
- 非公平
- 公平政策
- void acquire(int permits)
- void acquireUninterruptibly()
- void acquireUninterruptibly(int permits)
- boolean tryAcquire()
- boolean tryAcquire(int permits)
- boolean tryAcquire(int permits, long timeout, TimeUnit unit)
- void release()
- void release(int permits)
- 其他方法
- 總結
- 參考閱讀
Semaphore概述及案例學習
Semaphore信号量用來控制同時通路特定資源的線程數量,它通過協調各個線程,以保證合理地使用公共資源。
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10); //10個許可證數量,最大并發數為10
public static void main(String[] args) {
for(int i = 0; i < THREAD_COUNT; i ++){ //執行30個線程
threadPool.execute(new Runnable() {
@Override
public void run() {
s.tryAcquire(); //嘗試擷取一個許可證
System.out.println("save data");
s.release(); //使用完之後歸還許可證
}
});
}
threadPool.shutdown();
}
}
- 建立一個大小為30的線程池,但是信号量規定在10,保證許可證數量為10。
- 每次線程調用
或者tryAcquire()
方法都會原子性的遞減許可證的數量,release()會原子性遞增許可證數量。acquire()
類圖結構及重要字段
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
/** All mechanics via AbstractQueuedSynchronizer subclass */
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
// permits指定初始化信号量個數
Sync(int permits) {
setState(permits);
}
// ...
}
static final class NonfairSync extends Sync {...}
static final class FairSync extends Sync {...}
// 預設采用非公平政策
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 可以指定公平政策
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
//...
}
- 基于AQS,類似于ReentrantLock,Sync繼承自AQS,有公平政策和非公平政策兩種實作。
- 類似于CountDownLatch,state在這裡也是通過構造器指定,表示初始化信号量的個數。
本篇文章閱讀需要建立在一定的AQS基礎之上,這邊推薦幾篇前置文章,可以瞅一眼:
- Java并發包源碼學習系列:AbstractQueuedSynchronizer
- Java并發包源碼學習系列:CLH同步隊列及同步資源擷取與釋放
- Java并發包源碼學習系列:AQS共享式與獨占式擷取與釋放資源的差別
- Java并發包源碼學習系列:詳解Condition條件隊列、signal和await
- Java并發包源碼學習系列:挂起與喚醒線程LockSupport工具類
void acquire()
調用該方法時,表示希望擷取一個信号量資源,相當于
acquire(1)
。
如果目前信号量個數大于0,CAS将目前信号量值減1,成功後直接傳回。
如果目前信号量個數等于0,則目前線程将被置入AQS的阻塞隊列。
該方法是響應中斷的,其他線程調用了該線程的
interrupt()
方法,将會抛出中斷異常傳回。
// Semaphore.java
public void acquire() throws InterruptedException {
// 傳遞的 arg 為 1 , 擷取1個信号量資源
sync.acquireSharedInterruptibly(1);
}
// AQS.java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 線程被 中斷, 抛出中斷異常
if (Thread.interrupted())
throw new InterruptedException();
// 子類實作, 公平和非公平兩種政策
if (tryAcquireShared(arg) < 0)
// 如果擷取失敗, 則置入阻塞隊列,
// 再次進行嘗試, 嘗試失敗則挂起目前線程
doAcquireSharedInterruptibly(arg);
}
非公平
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
// 這裡直接調用Sync定義的 非公平共享模式擷取方法
return nonfairTryAcquireShared(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
// 擷取目前信号量的值
int available = getState();
// 減去需要擷取的值, 得到剩餘的信号量個數
int remaining = available - acquires;
// 不剩了,表示目前信号量個數不能滿足需求, 傳回負數, 線程置入AQS阻塞
// 還有的剩, CAS設定目前信号量值為剩餘值, 并傳回剩餘值
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
你會發現,非公平政策是無法保證【AQS隊列中阻塞的線程】和【目前線程】擷取的順序的,目前線程是有可能在排隊的線程之前就拿到資源,産生插隊現象。
公平政策就不一樣了,它會通過
hasQueuedPredecessors()
方法看看隊列中是否存在前驅節點,以保證公平性。
公平政策
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
// 如果隊列中在此之前已經有線程在排隊了,直接放棄擷取
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
void acquire(int permits)
在acquire()的基礎上,指定了擷取信号量的數量permits。
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
void acquireUninterruptibly()
該方法與
acquire()
類似,但是不響應中斷。
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
void acquireUninterruptibly(int permits)
該方法與
acquire(permits)
類似,但是不響應中斷。
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
boolean tryAcquire()
tryAcquire和acquire非公平政策公用一個邏輯,但是差別在于,如果擷取信号量失敗,或者CAS失敗,将會直接傳回false,而不會置入阻塞隊列中。
一般try開頭的方法的特點就是這樣,嘗試一下,成功是最好,失敗也不至于被阻塞,而是立刻傳回false。
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
boolean tryAcquire(int permits)
相比于普通的
tryAcquire()
,指定了permits的值。
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
boolean tryAcquire(int permits, long timeout, TimeUnit unit)
相比于
tryAcquire(int permits)
,增加了逾時控制。
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
void release()
将信号量值加1,如果有線程因為調用acquire方法而被阻塞在AQS阻塞隊列中,将根據公平政策選擇一個信号量個數滿足需求的線程喚醒,線程喚醒後也會嘗試擷取新增的信号量。
參考文章:Java并發包源碼學習系列:AQS共享模式擷取與釋放資源
// Semaphore.java
public void release() {
sync.releaseShared(1);
}
// AQS.java
public final boolean releaseShared(int arg) {
// 嘗試釋放鎖
if (tryReleaseShared(arg)) {
// 釋放成功, 喚醒AQS隊列裡面最先挂起的線程
// https://blog.csdn.net/Sky_QiaoBa_Sum/article/details/112386838
doReleaseShared();
return true;
}
return false;
}
// Semaphore#Sync.java
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 擷取目前信号量
int current = getState();
// 期望加上releases
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS操作,更新
if (compareAndSetState(current, next))
return true;
}
}
}
void release(int permits)
和
release()
相比指定了permits的值。
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
其他方法
Semaphore還提供其他一些方法,實作比較簡單,這邊就簡單寫一下吧:
// 傳回此信号量中目前可用的許可證數量, 其實就是得到目前的 state值 getState()
public int availablePermits() {
return sync.getPermits();
}
// 将state更新為0, 傳回0
public int drainPermits() {
return sync.drainPermits();
}
// 減少reduction個許可證
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
// 判斷公平政策
public boolean isFair() {
return sync instanceof FairSync;
}
// 判斷是否有線程證在等待擷取許可證
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
// 傳回正在等待擷取許可證的線程數
public final int getQueueLength() {
return sync.getQueueLength();
}
// 傳回所有等待擷取許可證的線程集合
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
總結
Semaphore信号量用來控制同時通路特定資源的線程數量,它通過協調各個線程,以保證合理地使用公共資源。
- 基于AQS,類似于ReentrantLock,Sync繼承自AQS,有公平政策和非公平政策兩種實作。
- 類似于CountDownLatch,state在這裡也是通過構造器指定,表示初始化信号量的個數。
每次線程調用
tryAcquire()
或者
acquire()
方法都會原子性的遞減許可證的數量,release()會原子性遞增許可證數量,隻要有許可證就可以重複使用。
參考閱讀
- 《Java并發程式設計之美》
- 《Java并發程式設計的藝術》