在并發程式設計中,有時候需要使用線程安全的隊列。如果要實作一個線程安全的隊列有兩 種方式:一種是使用阻塞算法,另一種是使用非阻塞算法。使用阻塞算法的隊列可以用一個鎖 (入隊和出隊用同一把鎖)或兩個鎖(入隊和出隊用不同的鎖)等方式來實作。非阻塞的實作方式則可以使用循環CAS的方式來實作。
阻塞隊列
阻塞隊列(BlockingQueue)是一個支援兩個附加操作的隊列。這兩個附加的操作支援阻塞的插入和移除方法。
- 支援阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
- 支援阻塞的移除方法:意思是在隊列為空時,擷取元素的線程會等待隊列變為非空。
應用場景
阻塞隊列常用于生産者和消費者的場景,生産者是向隊列裡添加元素的線程,消費者是 從隊列裡取元素的線程。阻塞隊列就是生産者用來存放元素、消費者用來擷取元素的容器。
插入和移除操作的4中處理方式
- 抛出異常:當隊列滿時,如果再往隊列裡插入元素,會抛出IllegalStateException(“Queue full”)異常。當隊列空時,從隊列裡擷取元素會抛出NoSuchElementException異常。
- 傳回特殊值:當往隊列插入元素時,會傳回元素是否插入成功,成功傳回true。如果是移除方法,則是從隊列裡取出一個元素,如果沒有則傳回null。
- 一直阻塞:當阻塞隊列滿時,如果生産者線程往隊列裡put元素,隊列會一直阻塞生産者線程,直到隊列可用或者響應中斷退出。當隊列空時,如果消費者線程從隊列裡take元素,隊列會阻塞消費者線程,直到隊列不為空。
- 逾時退出:當阻塞隊列滿時,如果生産者線程往隊列裡插入元素,隊列會阻塞生産者線程 一段時間,如果超過了指定的時間,生産者線程就會退出。
注意: 如果是無界阻塞隊列,隊列不可能會出現滿的情況,是以使用put或offer方法永 遠不會被阻塞,而且使用offer方法時,該方法永遠傳回true。
Java裡的阻塞隊列
- ArrayBlockingQueue:一個由數組結構組成的有界阻塞隊列。
- LinkedBlockingQueue:一個由連結清單結構組成的有界阻塞隊列。
- PriorityBlockingQueue:一個支援優先級排序的無界阻塞隊列。
- DelayQueue:一個使用優先級隊列實作的無界阻塞隊列。
- SynchronousQueue:一個不存儲元素的阻塞隊列。
- LinkedTransferQueue:一個由連結清單結構組成的無界阻塞隊列。
- LinkedBlockingDeque:一個由連結清單結構組成的雙向阻塞隊列。
ArrayBlockingQueue
ArrayBlockingQueue是一個用數組實作的有界阻塞隊列,在初始化時需要指定隊列的長度。此隊列按照先進先出(FIFO)的原則對元素進行排序。預設情況下不保證線程公平的通路隊列,但是在初始化的隊列的時候指定阻塞隊列的公平性,如:
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);
。它使用
ReentrantLock
來實作隊列的線程安全。
核心屬性
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
核心方法
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
LinkedBlockingQueue
LinkedBlockingQueue是一個用連結清單實作的有界阻塞隊列。此隊列的預設和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。出隊和入隊使用兩把鎖來實作。
核心屬性
/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
PriorityBlockingQueue
PriorityBlockingQueue是一個支援優先級的無界阻塞隊列。預設情況下元素采取自然順序升序排列。也可以自定義類實作compareTo()方法來指定元素排序規則,或者初始化 PriorityBlockingQueue時,指定構造參數Comparator來對元素進行排序。需要注意的是不能保證 同優先級元素的順序。底層使用數組實作,預設初始容量是11,最大值是
Integer.MAX_VALUE - 8
。容量不夠時會進行擴容
核心方法
// 入隊
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
// 擴容
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
DelayQueue
DelayQueue是一個支援延時擷取元素的無界阻塞隊列。隊列使用PriorityQueue來實作。隊 列中的元素必須實作Delayed接口和Comparable接口,在建立元素時可以指定多久才能從隊列中擷取目前元素。 隻有在延遲期滿時才能從隊列中提取元素。
應用場景
- 緩存系統的設計:可以用DelayQueue儲存緩存元素的有效期,使用一個線程循環查詢 DelayQueue,一旦能從DelayQueue中擷取元素時,表示緩存有效期到了。
- 定時任務排程:使用DelayQueue儲存當天将會執行的任務和執行時間,一旦從 DelayQueue中擷取到任務就開始執行,比如TimerQueue就是使用DelayQueue實作的。
如何實作Delayed接口
DelayQueue隊列的元素必須實作Delayed接口。我們可以參考ScheduledThreadPoolExecutor 裡ScheduledFutureTask類的實作,一共有三步。
第一步:在對象建立的時候,初始化基本資料。使用time記錄目前對象延遲到什麼時候可 以使用,使用sequenceNumber來辨別元素在隊列中的先後順序。代碼如下:
private static final AtomicLong sequencer = new AtomicLong(0);
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
ScheduledFutureTask(Runnable r, V result, long ns, long period){
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
}
第二步:實作getDelay方法,該方法傳回目前元素還需要延時多長時間,機關是納秒,代碼 如下:
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
注意當time小于目前時間時,getDelay會傳回負數,這時才可以出隊。
第三步:實作compareTo方法來指定元素的順序。例如,讓延時時間最長的放在隊列的末 尾。實作代碼如下:
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledThreadPoolExecutor.ScheduledFutureTask) {
ScheduledThreadPoolExecutor.ScheduledFutureTask<?> x = (ScheduledThreadPoolExecutor.ScheduledFutureTask<?>)other;
// 過期時間小的排前面,大的排後面,如果一樣就使用sequenceNumber 來排序。
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
// 快要過期的排在前面
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
如何實作延時阻塞隊列
延時阻塞隊列的實作很簡單,當消費者從隊列裡擷取元素時,如果元素沒有達到延時時 間,就阻塞目前線程。
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) {
if (nanos <= 0)
return null;
else
// 隊列為NULL,阻塞線程直到逾時
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
// 等待時間小于第一個元素的過期時間
if (nanos < delay || leader != null)
// 阻塞線程直到逾時
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待時間大于第一個元素的過期時間,阻塞線程直到第一個元素過期
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
// 喚醒其他阻塞線程
available.signal();
lock.unlock();
}
}
SynchronousQueue
SynchronousQueue是一個不存儲元素的阻塞隊列。每一個put操作必須等待一個take操作, 否則不能繼續添加元素。 它支援公平通路隊列。預設情況下線程采用非公平性政策通路隊列。
SynchronousQueue可以看成是一個傳球手,負責把生産者線程處理的資料直接傳遞給消費 者線程。隊列本身并不存儲任何元素,非常适合傳遞性場景。SynchronousQueue的吞吐量高于 LinkedBlockingQueue和ArrayBlockingQueue。
LinkedTransferQueue
LinkedTransferQueue是一個由連結清單結構組成的無界阻塞TransferQueue隊列。相對于其他阻 塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。
transfer方法
如果目前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法 時),transfer方法可以把生産者傳入的元素立刻transfer(傳輸)給消費者。如果沒有消費者在等 待接收元素,transfer方法會将元素存放在隊列的tail節點,并等到該元素被消費者消費了才返 回。
LinkedBlockingDeque
LinkedBlockingDeque是一個由連結清單結構組成的雙向阻塞隊列。所謂雙向隊列指的是可以 從隊列的兩端插入和移出元素。雙向隊列因為多了一個操作隊列的入口,在多線程同時入隊 時,也就減少了一半的競争。相比其他的阻塞隊列,LinkedBlockingDeque多了addFirst、 addLast、offerFirst、offerLast、peekFirst和peekLast等方法,以First單詞結尾的方法,表示插入、 擷取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入、擷取或移除雙 端隊列的最後一個元素。另外,插入方法add等同于addLast,移除方法remove等效于 removeFirst。但是take方法卻等同于takeFirst,不知道是不是JDK的bug,使用時還是用帶有First 和Last字尾的方法更清楚。
在初始化LinkedBlockingDeque時可以設定容量防止其過度膨脹。另外,雙向阻塞隊列可以運用在“工作竊取”模式中。
參考
《java并發程式設計的藝術》
源碼
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases