一:功能介紹
基于數組的有界阻塞隊列,基于FIFO的存儲模式,支援公平非公平鎖。
二:源碼分析
//數組
final Object[] items;
//出隊索引
int takeIndex;
//入隊索引
int putIndex;
//隊列大小
int count;
//可重入鎖
final ReentrantLock lock;
//等待通知條件
private final Condition notEmpty;
//等待通知條件
private final Condition notFull;
構造函數
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
//初始化數組容量
this.items = new Object[capacity];
//内容采用可重入鎖ReentrantLock實作,支援公平非公平選擇
lock = new ReentrantLock(fair);
//阻塞隊列,等待條件
notEmpty = lock.newCondition();
//阻塞隊列,等待條件
notFull = lock.newCondition();
}
入隊操作
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//可中斷擷取鎖,如果出現了interrupted,不用一直阻塞
lock.lockInterruptibly();
try {
//如果隊列已滿
while (count == items.length)
//入隊線程阻塞
notFull.await();
//插入資料
insert(e);
} finally {
lock.unlock();
}
}
private void insert(E x) {
//将新的資料指派在數組的某一個索引處
items[putIndex] = x;
//重新指派putIndex,設定下一個被取出元素的索引
putIndex = inc(putIndex);
//隊列大小+1
++count;
//喚醒take線程
notEmpty.signal();
}
final int inc(int i) {
//如果隊列滿了,重新初始化為0
return (++i == items.length) ? 0 : i;
}
出隊操作
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//同上,擷取中斷鎖
lock.lockInterruptibly();
try {
//隊列沒有值,阻塞
while (count == 0)
notEmpty.await();
//傳回被取走的資料
return extract();
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
//擷取takeIndex處的元素
E x = this.<E>cast(items[takeIndex]);
//置空takeIndex處的元素,引用不存在,便于GC,釋放記憶體
items[takeIndex] = null;
//重新指派takeIndex,設定下一個被取出的元素
takeIndex = inc(takeIndex);
//隊列大小-1
--count;
//喚醒put線程
notFull.signal();
return x;
}
移除資料
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//從takeIndex處開始計算,每次i加1,最大為隊列最大容量count
for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
//如果移除元素在數組某個下标找到
if (o.equals(items[i])) {
removeAt(i);
return true;
}
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(int i) {
final Object[] items = this.items;
//如果準備移除的索引和下一個被取出的元素索引一樣,直接移除
if (i == takeIndex) {
//指派null,便于GC
items[takeIndex] = null;
//重新設定下一個被取出元素的索引
takeIndex = inc(takeIndex);
//如果需要删除的元素索引不是目前被取出的索引
} else {
//一直循環,直到删除為止
for (;;) {
//假設隊列容量是4,目前存了3個元素,即takeIndex=0,putIndex=3,目前我打算删除數組下标為1的元素
// nexti第一次為2
int nexti = inc(i);
if (nexti != putIndex) {
//相當于将隊列往前移
items[i] = items[nexti];
//相當于i+1
i = nexti;
//待删除的索引與待put的索引相等,比如putIndex=2,i=1,inc(i) = 2
} else {
//索引i處置null,偏于GC
items[i] = null;
//重新指派下一個即将放入元素的索引
putIndex = i;
break;
}
}
}
//隊列大小-1
--count;
//喚醒put線程,公平的話按FIFO順序,非公平的話可以搶占
notFull.signal();
}
周遊隊列
public Iterator<E> iterator() {
return new Itr();
}
private class Itr implements Iterator<E> {
//隊列裡面還剩的元素個數
private int remaining;
//下一次調用next()傳回的索引
private int nextIndex;
//下一次調用next()傳回的元素
private E nextItem;
//上一次調用next()傳回的元素
private E lastItem;
//上一次調用next()傳回的索引
private int lastRet;
Itr() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
lastRet = -1;
//隻有隊列裡面還有元素
if ((remaining = count) > 0)
//擷取takeIndex處的元素
nextItem = itemAt(nextIndex = takeIndex);
} finally {
lock.unlock();
}
}
public boolean hasNext() {
return remaining > 0;
}
public E next() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
//如果隊列沒有值
if (remaining <= 0)
throw new NoSuchElementException();
lastRet = nextIndex;
//擷取下一次擷取索引處的元素
E x = itemAt(nextIndex); // check for fresher value
if (x == null) {
x = nextItem; // we are forced to report old value
lastItem = null; // but ensure remove fails
}
else
//将剛擷取的元素當做上一次擷取的元素
lastItem = x;
//當下一次擷取的元素不存在的時候
while (--remaining > 0 && // skip over nulls
(nextItem = itemAt(nextIndex = inc(nextIndex))) == null)
;
return x;
} finally {
lock.unlock();
}
}
public void remove() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
int i = lastRet;
if (i == -1)
throw new IllegalStateException();
lastRet = -1;
E x = lastItem;
lastItem = null;
// only remove if item still at index
if (x != null && x == items[i]) {
boolean removingHead = (i == takeIndex);
removeAt(i);
if (!removingHead)
nextIndex = dec(nextIndex);
}
} finally {
lock.unlock();
}
}
}