天天看點

Java并發阻塞隊列之ArrayBlockingQueueArrayBlockingQueueJava并發阻塞隊列之ArrayBlockingQueueArrayBlockingQueue

文章目錄

  • Java并發阻塞隊列之ArrayBlockingQueueArrayBlockingQueue
    • JUC簡介
    • ArrayBlockingQueue簡介
    • ArrayBlockingQueue函數清單
    • 源代碼分析
    • 加入隊列
    • 取出隊列
    • 實戰

Java并發阻塞隊列之ArrayBlockingQueueArrayBlockingQueue

JUC簡介

在 Java 5.0 提供了java.util.concurrent(簡稱JUC)包,在此包中增加了在并發程式設計中很常用的工具類,用于定義類似于線程的自定義子系統,包括線程池,異步IO和輕量級任務架構;還提供了設計用于多線程上下文中的Collection實作等;

今天要講的ArrayBlockingQueue便是JUC包下的一個工具類。

ArrayBlockingQueue簡介

ArrayBlockingQueue是數組實作的線程安全的有界的阻塞隊列。線程安全是指類内部通過“互斥鎖”保護競争資源,實作多線程對競争資源的互斥通路。

“有界”則是指ArrayBlockingQueue對應的數組是有界限且固定的,在建立對象時由構造函數指定,一旦指定則無法更改。

阻塞隊列,是指多線程通路競争資源時,當競争資源已被某線程擷取時,其它要擷取該資源的線程需要阻塞等待;

ArrayBlockingQueue是按FIFO(先進先出)原則對元素進行排序,元素都是從尾部插入到隊列,從頭部開始傳回。

ArrayBlockingQueue函數清單

// 建立一個帶有給定的(固定)容量和預設通路政策的ArrayBlockingQueue。
ArrayBlockingQueue(int capacity)
// 建立一個具有給定的(固定)容量和指定通路政策的ArrayBlockingQueue。
ArrayBlockingQueue(int capacity, boolean fair)
// 建立一個具有給定的(固定)容量和指定通路政策的 ArrayBlockingQueue,它最初包含給定collection的元素,并以collection疊代器的周遊順序添加元素。
ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c)

// 将指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量),在成功時傳回true,如果此隊列已滿,則抛出IllegalStateException。
boolean add(E e)
// 自動移除此隊列中的所有元素。
void clear()
// 如果此隊列包含指定的元素,則傳回true。
boolean contains(Object o)
// 移除此隊列中所有可用的元素,并将它們添加到給定collection中。
int drainTo(Collection<? super E> c)
// 最多從此隊列中移除給定數量的可用元素,并将這些元素添加到給定 collection中。
int drainTo(Collection<? super E> c, int maxElements)
// 傳回在此隊列中的元素上按适當順序進行疊代的疊代器。
Iterator<E> iterator()
// 将指定的元素插入到此隊列的尾部(如果立即可行且不會超過該隊列的容量),在成功時傳回 true,如果此隊列已滿,則傳回false。
boolean offer(E e)
// 将指定的元素插入此隊列的尾部,如果該隊列已滿,則在到達指定的等待時間之前等待可用的空間。
boolean offer(E e, long timeout, TimeUnit unit)
// 擷取但不移除此隊列的頭;如果此隊列為空,則傳回null。
E peek()
// 擷取并移除此隊列的頭,如果此隊列為空,則傳回null。
E poll()
// 擷取并移除此隊列的頭部,在指定的等待時間前等待可用的元素(如果有必要)。
E poll(long timeout, TimeUnit unit)
// 将指定的元素插入此隊列的尾部,如果該隊列已滿,則等待可用的空間。
void put(E e)
// 傳回在無阻塞的理想情況下(不存在記憶體或資源限制)此隊列能接受的其他元素數量。
int remainingCapacity()
// 從此隊列中移除指定元素的單個執行個體(如果存在)。
boolean remove(Object o)
// 傳回此隊列中元素的數量。
int size()
// 擷取并移除此隊列的頭部,在元素變得可用之前一直等待(如果有必要)。
E take()
// 傳回一個按适當順序包含此隊列中所有元素的數組。
Object[] toArray()
// 傳回一個按适當順序包含此隊列中所有元素的數組;傳回數組的運作時類型是指定數組的運作時類型。
<T> T[] toArray(T[] a)
// 傳回此 collection 的字元串表示形式。
String toString()
           

源代碼分析

構造函數

ArrayBlockingQueue提供了三個構造函數。

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}
           

其中,第一個構造函數隻需指定隊列(數組)初始化大小,這正是前面提到的“有界”的邊界所在。同時,它調用了第二個構造函數,預設将fair參數傳值為false。

fair是“可重入的獨占鎖(ReentrantLock)”的類型。fair為true,表示是公平鎖;fair為false,表示是非公平鎖。公平性通常會降低吞吐量,但是減少了可變性和避免了“不平衡性”。

構造函數中this.items對應的代碼為:

/** The queued items */
final Object[] items;
           

這是存儲阻塞隊列資料的數組。在第三個構造函數中提供了初始化隊列數組中資料的方法。

加入隊列

ArrayBlockingQueue提供了4個方法将元素添加入隊列。

  • add(E e) :如果立即可行且不會超過該隊列的容量,将指定的元素插入到隊列的尾部。成功傳回true,隊列已滿則抛出IllegalStateException(“Queue full”)異常。
  • offer(E e) :如果立即可行且不會超過該隊列的容量,将指定的元素插入到此隊列的尾部。成功傳回true,隊列已滿則傳回false。
  • offer(E e, long timeout, TimeUnit unit) :将指定的元素插入此隊列的尾部,如果該隊列已滿,則在到達指定的等待時間之前等待可用的空間
  • put(E e) :将指定的元素插入此隊列的尾部,如果隊列已滿則等待可用的空間。成功則傳回true,等待逾時則傳回false。

源代碼說明:

// 方法一
public boolean add(E e) {
    return super.add(e);
}
// 方法二
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
// 方法三
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();  // 一直等到擷取鎖
    try {
        if (count == items.length)  //假如目前容納的元素個數已經等于數組長度,那麼傳回false
            return false;
        else {
            enqueue(e);		// 将元素插入到隊列中,傳回true
            return true;
        }
    } finally {
        lock.unlock();		//釋放鎖
    }
}
           

add方法調用了父類的add方法(方法二),通過父類的add方法可以得出,最終還是調用了offer方法(方法三)。可以看出,父類方法調用offer之後,如果offer傳回false,則表示隊列已滿,父類方法會抛出異常。

而offer方法首先校驗添加的對象是否為null,如果null則直接抛出空指針異常。然後獲得鎖進行隊列大小(count記錄了隊列中元素的個數)比較,如果目前隊列中的元素個數與count相等,則傳回false,不進行插入。否則,将元素插入隊列,并傳回true。

下面再看一下offer中調用的enqueue方法:

private void enqueue(E x) {	

    //調用enqueue的方法都已經進行過同步處理
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;		//putIndex是下一個被添加元素的坐标
    if (++putIndex == items.length)	
    //putIndex+1, 并且與數組長度進行比較,相同則從數組開頭
        putIndex = 0;			//插入元素,這就是循環數組的奧秘了
    count++;				//目前元素總量+1
    notEmpty.signal();			//給等到在數組非空的線程一個信号,喚醒他們。
}
           

enqueue方法才是所有添加隊列方法真正調用來操作添加的方法,其中putIndex是下一個被添加元素的坐标。整個方法的業務邏輯是這樣的:首先将待添加的元素添加到putIndex所在的位置,并且對putIndex進行自增(指向下一個待添加的位置)。然後比較下一個待添加的位置是否和數組的長度相同,如果相同則将putIndex指向數組開頭(進入此方法的前提條件是隊列數組未滿)。然後隊列總量加1。

通過這段代碼我們就可以真正了解到ArrayBlockingQueue是如何循環使用數組的。首先建立一個定長空數組,然後依次填滿數組的0,1,2,……,items.length-1 位置。與此同時,隊列中的0,1,2,……位置的元素也在不停的被消費掉。當數組的items.length-1也被填充了元素,次數隊列依舊未滿,那麼新增的元素将放置在哪裡?對了,就是像上面的代碼一樣,會從數組的0坐标重新依次開始添加新的元素。通過這種方式,ArrayBlockingQueue實作了在定長數組下FIFO的隊列。

取出隊列

ArrayBlockingQueue提供了以下方法支援取出隊列:

  • poll() :擷取并移除此隊列的頭,如果隊列為空,則傳回null。
  • poll(long timeout, TimeUnit unit) :擷取并移除此隊列的頭部,在指定的等待時間前等待可用的元素,逾時則傳回null。
  • remove(Object o) :從此隊列中移除指定元素的單個執行個體(如果存在多個則隻移除第一個)。如果不存在要移除的元素則傳回false。
  • take() :擷取并移除此隊列的頭部,如果隊列為空,則一直等待可用元素,也就是說必須要拿到一個元素,除非線程中斷。
  • peek():擷取隊列中takeIndex(待擷取元素索引)位置的元素,如果為null則傳回空。

源代碼:

public E poll() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (count == 0) ? null : dequeue();
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return itemAt(takeIndex); // null when queue is empty
    } finally {
        lock.unlock();
    }
}

public boolean remove(Object o) {
    if (o == null) return false;
    final Object[] items = this.items;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count > 0) {
            final int putIndex = this.putIndex;
            int i = takeIndex;
            do {
                if (o.equals(items[i])) {
                    removeAt(i);
                    return true;
                }
                if (++i == items.length)
                    i = 0;
            } while (i != putIndex);
        }
        return false;
    } finally {
        lock.unlock();
    }
}
           

下面重點分析一下取出隊列原因調用的dequeue方法:

/**
 * Extracts element at current take position, advances, and signals.
 * Call only when holding lock.
 */
private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    // 取出指定元素
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    // 并将取出元素索引内容置為null
    items[takeIndex] = null;
    // 将待取出索引+1,并與隊列長度做比較,如果超出數組長度則從0重新開始
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    // 如果疊代器不為null,則進行疊代處理
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
           

在enqueue中了解了添加元素進入隊列的操作之後就不難了解從隊列中取出資料的過程了。首先調用dequeue的取出操作,都會先将元素取出,然後再将數組對應位置置null。然後對takeIndex的位置進行後移1位,如果takeIndex處于數組的最後一位,則重新從0開始。

實戰

在學習了基礎理論知識之後,我們用一個執行個體來練習一下。

package com.secbro2.juc;

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

/**
 * @author zzs
 */
public class ArrayBlockingQueueDemo {

	private static Queue<String> queue = new ArrayBlockingQueue<String>(20);

	public static void main(String[] args) {
		new QueueThread("QTA").start();
		new QueueThread("QTB").start();
	}

	private static class QueueThread extends Thread {
		QueueThread(String name) {
			super(name);
		}

		@Override
		public void run() {
			for (int i = 0; i < 6; i++) {
				// 線程名稱+序号
				String str = Thread.currentThread().getName() + "-" + i;
				queue.add(str);
				printQueue();
			}

		}
	}

	private static void printQueue() {
		StringBuilder sb = new StringBuilder();
		for (Object aQueue : queue) {
			sb.append(aQueue).append(",");
		}

		System.out.println(sb.toString());
	}
}
           
QTA-0,QTB-0,
QTA-0,QTB-0,
QTA-0,QTB-0,QTA-1,
QTA-0,QTB-0,QTA-1,QTB-1,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,QTA-4,QTB-4,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,QTA-4,QTB-4,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,QTA-4,QTB-4,QTB-5,
QTA-0,QTB-0,QTA-1,QTB-1,QTA-2,QTB-2,QTA-3,QTB-3,QTA-4,QTB-4,QTB-5,QTA-5,
           

繼續閱讀