[java隊列]——PriorityBlockingQueue
- PriorityBlockingQueue介紹
- PriorityBlockingQueue内部實作
-
- 基本屬性
- 構造方法
- 入隊
- 擴容
- 出隊
- PriorityBlockingQueue總結
PriorityBlockingQueue介紹
上一篇[[java隊列]——PriorityQueue介紹了優先級隊列,回顧一下PriorityQueue有哪些特點:
- 資料實作,自動擴容,無界
- 非線程安全
- 使用小頂堆作為實作,入隊就是堆的插入,出隊就是堆的删除堆頂元素
這一篇将介紹另一個優先級隊列PriorityBlockingQueue,阻塞式的優先級隊列,它相比PriorityQueue有以下特點
- 線程安全
- 阻塞
- 同樣是數組實作,自動擴容,無界,同樣底層是使用小頂堆實作優先級
PriorityBlockingQueue内部實作
基本屬性
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
//預設初始容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//數組最大容量
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
//存儲元素的數組
private transient Object[] queue;
//元素個數
private transient int size;
//比較器,用于比較元素之間的大小
private transient Comparator<? super E> comparator;
//可重入鎖,控制并發安全
private final ReentrantLock lock;
//可重入鎖上的非空條件
private final Condition notEmpty;
//???通過這個變量CAS更新成功的,就可以進行擴容
private transient volatile int allocationSpinLock;
//???據說用于序列化
private PriorityQueue<E> q;
結論:
- 數組實作
- 可重入鎖+非空條件控制并發
- 為什麼不需要非滿條件,因為隊列是無界的
- 結構基本與PriorityQueue一樣,隻是加多了鎖,擴容使用了一個volatile變量CAS來控制并發。
構造方法
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
結論:
- 構造方法非常簡單,直接看代碼吧
入隊
還是跟前面介紹的一樣,阻塞隊列一般有四個入隊方法。基本都是調用了offer(E e)方法
public boolean add(E e) {
return offer(e);
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
//入隊加鎖
lock.lock();
int n, cap;
Object[] array;
//循環判斷如果元素個數大于等于數組長度,則進行擴容。
while ((n = size) >= (cap = (array = queue).length))
//調用tryGrow完畢要繼續判斷,因為調用tryGrow傳回并不代表目前線程已經擴容完畢,有可能是其他線程正在擴容,
//如果其他線程正在擴容,目前線程的tryGrow将會直接傳回,要繼續while循環判斷,直到其他線程擴容完畢。
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
//入隊方法與PriorityQueue類似,這裡就不再介紹了
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
//入隊成功,發出信号
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}
結論:
- 入隊加鎖,入隊完畢,發出非空信号通知
- 循環判斷數組元素個數是否已滿,進行tryGrow擴容
擴容
private void tryGrow(Object[] array, int oldCap) {
//這裡先釋放鎖,目的是由于已經達到擴容條件了,如果有多個線程正在嘗試入隊,這些線程将會被阻塞
//為了減少阻塞的線程,這裡直接釋放鎖,使用CAS的方式進行擴容。
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
//CAS更新allocationSpinLock,更新成功才能進行擴容
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
//跟PriorityQueue一樣的擴容增量規則
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
//擴容後解鎖
allocationSpinLock = 0;
}
}
//若newArray為null,說明cas更新失敗,其他線程正在擴容。這個時候将目前線程讓出cpu
if (newArray == null) // back off if another thread is allocating
//yield是讓目前線程讓出cpu,從運作狀态變為就緒狀态,但并不代表,cpu就一定會先執行其他線程。因為是搶占式的,有可能又搶到cpu
Thread.yield();
//擴容成功,或者,線程讓出cpu後傳回回來,繼續加鎖
lock.lock();
//如果擴容成功并且就數組沒有被替換過。。沒看太懂這裡??
if (newArray != null && queue == array) {
//隊列更新為新數組
queue = newArray;
//複制新數組
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
結論:
- 擴容容量規則與PriorityQueue一緻
- 擴容前先釋放鎖,為了減少阻塞線程的數量。
- 擴容前先釋放鎖,利用CAS來控制擴容。CAS更新失敗,則讓目前線程讓出CPU
- 最後再次加鎖,再确認是否擴容成功
出隊
同樣出隊有四個方法
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//出隊加鎖,響應中斷
lock.lockInterruptibly();
E result;
try {
//若出隊隊列為空,阻塞等待,dequeue與PriorityQueue的出隊一樣,這裡就不介紹了
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
//解鎖
lock.unlock();
}
return result;
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
結論:
- 出隊加鎖,若dequeue傳回為空則一直阻塞,直到收到隊列不為空的通知或者中斷才會傳回
- dequeue方法基本跟PriorityQueue一緻
PriorityBlockingQueue總結
- 出入隊線程安全,使用鎖和非空條件實作
- 擴容線程安全,使用CAS
- 阻塞
- 與PriorityQueue同樣是數組實作,自動擴容,無界,同樣底層是使用小頂堆實作優先級,出入隊的堆操作也一樣