阻塞隊列繼承結構說明:
根據繼承結構可知:
1.Array 和 Linked 實作了統一的繼承結構,不同的地方在于内部實作不用阻塞機制和隊列資料存放結構
2.它們都實作了Iterable 接口,表示它們都是可疊代的,即實作了iterator() 方法,iterator()方法會傳回一個Iterator ,Iterator為疊代器 其中主要的方法為next hasNext 等, 那麼隊列中對疊代器的實作使用内部類實作,在ArrayBlockingQueue 中即為Itr 這個私有的内部類,我們發現在每次調用iterator() 方法時都會生成一個Itr , 在ArrayBlockingQueue 中還有一個Itrs内部類 Itrs使用單向連結清單的方式維護生成的多個疊代器,疊代器的實作原理會在其它的部落格中進行剖析,這裡就不多說了
3. ArrayBlockingQueue與ReentrantLock是組合關系,ArrayBlockingQueue中包含一個ReentrantLock對象(lock)。ReentrantLock是可重入的互斥鎖,ArrayBlockingQueue就是根據該互斥鎖實作“多線程對競争資源的互斥通路”。而且,ReentrantLock分為公平鎖和非公平鎖,關于具體使用公平鎖還是非公平鎖,在建立ArrayBlockingQueue時可以指定;而且,ArrayBlockingQueue預設會使用非公平鎖。
主要方法:
要想了解這兩個阻塞隊列,先分析一下隊列提供的主要方法 BlockingQueue
// 将指定元素插入到隊列尾部 如果成功傳回true
// 如果隊列已滿 抛出IllegalStateException 其内部調用的offer
boolean add(E e);
//将指定元素插入隊列尾部,如果成功傳回true 如果隊列已滿傳回false
boolean offer(E e);
//将指定元素插入隊列尾部,如果隊列已滿進行阻塞 直到notFull條件喚醒
void put(E e) throws InterruptedException;
//将指定元素插入隊列尾部,如果隊列已滿則等待指定時間
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//移除并擷取隊列頭部元素,如果隊列為空則阻塞 直到notEmpty條件喚醒
E take() throws InterruptedException;
//移除并擷取隊列頭部元素,如果隊列為空則等待指定時間
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//隊列剩餘可使用容量 items.length - count
int remainingCapacity();
//移除隊列中指定元素 改方法調用removeAt 兩個方法之後會詳細說明,因為remove移除的可能不是頭部元素是以需要隊列中元素重新排列和修改putIndex值
boolean remove(Object o);
//是否包含指定元素
public boolean contains(Object o);
//移除隊列中所有的可用元素,并添加到Collection中
int drainTo(Collection<? super E> c);
//移除指定個數的可用元素,并添加到Collection中,從takeIndex開始
int drainTo(Collection<? super E> c, int maxElements);
//移除并擷取隊列頭部元素,如果為空則抛NoSuchElementException 内部調用poll
E remove();
//移除并擷取隊列頭元素,如果為空則傳回null
E poll();
//隻擷取不移除隊列頭部元素,如果為空則抛NoSuchElementException 内部調用peek
E element();
//隻擷取不移除隊列頭部元素,如果隊列為空則傳回null 内部調用itemAt
E peek();
ArrayBlockingQueue 構造方法和成員表量說明:
//存放隊列元素的數組
final Object[] items;
//下次take, poll, peek or remove 元素的位置
int takeIndex;
//下次 put, offer, or add 元素的位置
int putIndex;
//隊列元素的數量
int count;
//進行隊列同步操作的重入鎖
final ReentrantLock lock;
//不為空條件 當調用take隊列為空時進行wait
private final Condition notEmpty;
//不滿條件 當調用put隊列已滿時進行wait
private final Condition notFull;
//維護隊列疊代器 Itrs 内部使用單向連結清單維護多個疊代器
transient Itrs itrs = null;
提供三個構造方法:
//指定數組容量
public ArrayBlockingQueue(int capacity) {
//預設ReentrantLock 為非公平模式
this(capacity, false);
}
//指定數組容量并設定ReentrantLock 公平非公平模式
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//除了上面兩個設定,還設定隊列初始元素
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
ArrayBlockingQueue中的元素存在公平通路與非公平通路的差別,對于公平通路隊列,被阻塞的線程可以按照阻塞的先後順序通路隊列,即先阻塞的線程先通路隊列。而非公平隊列,當隊列可用時,阻塞的線程将進入争奪通路資源的競争中,也就是說誰先搶到誰就執行,沒有固定的先後順序。
非阻塞方法我們先不說明,先詳細剖析一下兩個阻塞方法put take
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果隊列已滿則進行阻塞等待 即添加到notFull Condition的等待隊列中
while (count == items.length)
notFull.await();
//如果沒滿則直接入隊
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
//索引自增,如果已是最後一個位置,重新設定 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
count++;
//有元素入隊之後喚醒阻塞在notEmpty Condition 上的take操作
notEmpty.signal();
}
put方法是一個阻塞的方法,如果隊列元素已滿,那麼目前線程将會被notFull條件對象挂起加到等待隊列中,直到隊列有空檔才會喚醒執行添加操作。但如果隊列沒有滿,那麼就直接調用enqueue(e)方法将元素加入到數組隊列中。說白了就是當隊列滿時通過條件對象Condtion來阻塞目前調用put方法的線程,直到線程又再次被喚醒執行。總得來說添加線程的執行存在以下兩種情況,一是,隊列已滿,那麼新到來的put線程将添加到notFull的條件隊列中等待,二是,有移除線程執行移除操作,移除成功同時喚醒put線程,如下圖所示
上圖為隊列已滿進行put操作的示意圖
當有線程進行take操作取出元素,示意圖如下:
接下來我們看下take操作,take操作在隊列為空的時候阻塞,當有put操作時喚醒notEmpty Condition
我們先看下源碼:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果隊列為空進行等待
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
//取值索引到頭之後重置為0
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//喚醒等待在notFull上的put操作
notFull.signal();
return x;
}
從代碼上看邏輯還是很簡單的, 這裡就不畫示意圖了, 邏輯跟put相反
到這裡主要的方法基本上已經都說明了, 但是最後還要說一下remove方法,這個方法的邏輯稍微有點複雜,現在我們剖析一下remove方法
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//不為空才删除,為空則直接傳回false
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
//循環隊列中的可用元素,如果有指定remove的對象則調用removeAt
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
//如果要移除的元素位置正好是takeIndex 即在上面的remove循環中第一次就命中
//這是一種特殊情況,其實就相當于take操作了
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
//重新設定疊代器元素
itrs.elementDequeued();
//如果移除的并非takeIndex元素
} else {
// an "interior" remove
// slide over all others up through putIndex.
final int putIndex = this.putIndex;
//此處循環正是移除的邏輯,根據removeIndex 把removeIndex 後面的可用元素統一向前
//挪動 然後把putIndex之前的值設定為null 重新設定putIndex
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
//對應更新疊代器資料
itrs.removedAt(removeIndex);
}
//喚醒notFull put 操作
notFull.signal();
}
隻看代碼對于移除可能還不是很清除,那我們就畫下圖了解一下:
到此位置ArrayBlockingQueue的實作原理基本上算是說完了,但是關于疊代器的實作,還在在并發操作中疊代器原理,還有在移除時疊代器是如何整理資料的, 這個之後單獨說明