一、 前言
上節介紹了無界連結清單方式的阻塞隊列LinkedBlockingQueue,本節來研究下有界使用數組方式實作的阻塞隊列ArrayBlockingQueue
二、 ArrayBlockingQueue類圖結構
如圖ArrayBlockingQueue内部
- 有個數組items用來存放隊列元素,
- putindex下标标示入隊元素下标,takeIndex是出隊下标,count統計隊列元素個數,從定義可知道并沒有使用volatile修飾,這是因為通路這些變量使用都是在鎖塊内,并不存在可見性問題。
- 另外有個獨占鎖lock用來對出入隊操作加鎖,這導緻同時隻有一個線程可以通路入隊出隊,
- 另外notEmpty,notFull條件變量用來進行出入隊的同步。
另外構造函數必須傳入隊列大小參數,是以為有界隊列,預設是Lock為非公平鎖。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
三、offer操作
在隊尾插入元素,如果隊列滿則傳回false,否者入隊傳回true。
public boolean offer(E e) {
//e為null,則抛出NullPointerException異常
checkNotNull(e);
//擷取獨占鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
//如果隊列滿則傳回false
if (count == items.length)
return false;
else {
//否者插入元素
insert(e);
return true;
}
} finally {
//釋放鎖
lock.unlock();
}
}
private void insert(E x) {
//元素入隊
items[putIndex] = x;
//計算下一個元素應該存放的下标
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
//循環隊列,計算下标
final int inc(int i) {
return (++i == items.length) ? 0 : i;
}
這裡由于在操作共享變量前加了鎖,是以不存在記憶體不可見問題。
加過鎖後擷取的共享變量都是從主記憶體擷取的,而不是在CPU緩存或者寄存器裡面的值。
釋放鎖後修改的共享變量值會重新整理會主記憶體中。
另外這個隊列是使用循環數組實作,是以計算下一個元素存放下标時候有些特殊。
另外insert後調用 notEmpty.signal();是為了激活調用notEmpty.await()阻塞後放入notEmpty條件隊列中的線程。
四、put操作
在隊列尾部添加元素,如果隊列滿則等待隊列有空位置插入後傳回
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//擷取可被中斷鎖
lock.lockInterruptibly();
try {
//如果隊列滿,則把目前線程放入notFull管理的條件隊列
while (count == items.length)
notFull.await();
//插入元素
insert(e);
} finally {
lock.unlock();
}
}
需要注意的是如果隊列滿了那麼目前線程會阻塞,直到出隊操作調用了notFull.signal方法激活該線程。
代碼邏輯很簡單,但是這裡需要思考一個問題為啥調用lockInterruptibly方法而不是Lock方法。
我的了解是因為調用了條件變量的await()方法,而await()方法會在中斷标志設定後抛出InterruptedException異常後退出,是以還不如在加鎖時候先看中斷标志是不是被設定了,如果設定了直接抛出InterruptedException異常,就不用再去擷取鎖了。
然後看了其他并發類裡面凡是調用了await的方法擷取鎖時候都是使用的lockInterruptibly方法而不是Lock也驗證了這個想法。
五、poll操作
從隊頭擷取并移除元素,隊列為空,則傳回null。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//目前隊列為空則傳回null,否者
return (count == 0) ? null : extract();
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
//擷取元素值
E x = this.<E>cast(items[takeIndex]);
//數組中值值為null;
items[takeIndex] = null;
//隊頭指針計算,隊列元素個數減一
takeIndex = inc(takeIndex);
--count;
//發送信号激活notFull條件隊列裡面的線程
notFull.signal();
return x;
}
六、take操作
從隊頭擷取元素,如果隊列為空則阻塞直到隊列有元素。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//隊列為空,則等待,直到隊列有元素
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
需要注意的是如果隊列為空,目前線程會被挂起放到notEmpty的條件隊列裡面,直到入隊操作執行調用notEmpty.signal後目前線程才會被激活,await才會傳回。
七、peek操作
傳回隊列頭元素但不移除該元素,隊列為空,傳回null
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//隊列為空傳回null,否者傳回頭元素
return (count == 0) ? null : itemAt(takeIndex);
} finally {
lock.unlock();
}
}
final E itemAt(int i) {
return this.<E>cast(items[i]);
}
八、 size操作
擷取隊列元素個數,非常精确因為計算size時候加了獨占鎖,其他線程不能入隊或者出隊或者删除元素
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
九、總結
ArrayBlockingQueue通過使用全局獨占鎖實作同時隻能有一個線程進行入隊或者出隊操作。
這個鎖的粒度比較大,有點類似在方法上添加synchronized的意味。
其中offer,poll操作通過簡單的加鎖進行入隊出隊操作。
而put,take則使用了條件變量實作如果隊列滿則等待,如果隊列空則等待,然後分别在出隊和入隊操作中發送信号激活等待線程實作同步。
另外相比LinkedBlockingQueue,ArrayBlockingQueue的size操作的結果是精确的,因為計算前加了全局鎖。