天天看點

java 隊列 array_java阻塞隊列之ArrayBlockingQueue

在Java的java.util.concurrent包中定義了和多線程并發相關的操作,有許多好用的工具類,今天就來看下阻塞隊列。阻塞隊列很好的解決了多線程中資料的安全傳輸問題,其中最典型的例子就是客園很好的解決“生産者--消費者”問題。下面來看其中一個實作類ArrayBlockingQueue。看到這個名字,就很好了解這個隊列肯定是使用數組實作的隊列,即使用數組實作的“先進先出”的隊列,下面看其具體的實作。(均為JDK8)

一、構造方法

在ArrayBlockingQueue類中有下面的3個構造方法,

java 隊列 array_java阻塞隊列之ArrayBlockingQueue

1、ArrayBlockingQueue(int)

接收一個整型的參數,這個整型參數指的是隊列的長度,其定義如下,

public ArrayBlockingQueue(intcapacity) {this(capacity, false);

}

可以看到這個方法調用的是ArrayBlockingQueue(int,boolean)方法,那麼看下這個方法,

2、ArrayBlockingQueue(int,boolean)

接收兩個參數,一個整型,一個boolean類型,前邊已經知道整型參數是隊列的長度,那麼boolean類型參數代表什麼意思那,其定義如下,

public ArrayBlockingQueue(int capacity, booleanfair) {if (capacity <= 0)throw newIllegalArgumentException();this.items = newObject[capacity];

lock= newReentrantLock(fair);

notEmpty=lock.newCondition();

notFull=lock.newCondition();

}

可以看到在這個構造方法中進行了相關邏輯實作,對items進行了數組初始化,boolean類型的參數是作為可重入鎖的參數進行初始化,規定可重入鎖是公平還是不公平,預設為false,另外初始化了notEmpty、notFull兩個信号量。

3、ArrayBlockingQueue(int,boolean,Collection extends E>)

接收兩三個參數,第一個整型,第二個boolean類型,第三個集合類型,此構造方法不常用,其定義如下,

public ArrayBlockingQueue(int capacity, booleanfair,

Collection extends E>c) {this(capacity, fair);final ReentrantLock lock = this.lock;

lock.lock();//Lock only for visibility, not mutual exclusion

try{int i = 0;try{for(E e : c) {

checkNotNull(e);

items[i++] =e;

}

}catch(ArrayIndexOutOfBoundsException ex) {throw newIllegalArgumentException();

}

count=i;

putIndex= (i == capacity) ? 0: i;

}finally{

lock.unlock();

}

}

通過上面的三個構造方法可以構造一個ArrayBlockingQueue的隊列,在構造方法中初始化了實列變量,下面是一些執行個體變量,

private static final long serialVersionUID = -817911632652898426L;

//儲存隊列元素的數組

finalObject[] items;

//取出元素的位置

inttakeIndex;

//添加元素的位置

intputIndex;

//隊列中元素的數量

intcount;

//鎖對象

finalReentrantLock lock;

//不空的信号量

private finalCondition notEmpty;

//不滿的信号量

private finalCondition notFull;

transient Itrs itrs = null;

二、隊列的操作

需要使用阻塞隊列,那麼就需要向隊列中添加或取出元素,在ArrayBlocking中已經實作了相關操作,對于添加/取出均是成對出現,提供的方法中有抛出異常、傳回false、線程阻塞等幾種情形。

1、add/peek

add/peek是一對互斥的操作,add向隊列種放入元素,peek取出元素。

1.1、add

add的定義如下

public booleanadd(E e) {return super.add(e);

}

從上面可以看出add方法調用了其父類的實作,父類實作如下

public booleanadd(E e) {if(offer(e))return true;else

throw new IllegalStateException("Queue full");

}

父類實作種調用的offer方法,在offer方法傳回true時,add方法傳回true,其他則抛出“Queue full”的異常。offer方法下面會講到。

1.2、peek

peek方法定義如下

publicE peek() {final ReentrantLock lock = this.lock;

lock.lock();try{return itemAt(takeIndex); //null when queue is empty

} finally{

lock.unlock();

}

}

在peek方法種使用可重入鎖,傳回takeIndex處的元素,前面注釋中寫道,此變量代表的是待取出元素的索引。itemAt方法定義如下,

@SuppressWarnings("unchecked")final E itemAt(inti) {return(E) items[i];

}

此方法未進行任何的判斷直接傳回takeIndex出的數組元素。

2、put/take

put/take是一對互斥的操作,put向隊列種放入元素,take取出元素,其實作方式和add/peek不一樣。

2.1、put

put的定義如下

public void put(E e) throwsInterruptedException {

checkNotNull(e);final ReentrantLock lock = this.lock;

lock.lockInterruptibly();try{while (count ==items.length)

notFull.await();

enqueue(e);

}finally{

lock.unlock();

}

}

此方法會判斷元素是否null,然後判斷目前隊列中的元素數量和隊列的長度,如果二者相等則阻塞目前線程;如果不相等則執行enqueue(e)方法,其定義如下,

private voidenqueue(E x) {//assert lock.getHoldCount() == 1;//assert items[putIndex] == null;

final Object[] items = this.items;

items[putIndex]=x;if (++putIndex ==items.length)

putIndex= 0;

count++;

notEmpty.signal();

}

把待插入的元素放在數組的putIndex位置,如果執行完插入後putIndex等于數組的長度,說明隊列已經滿了,那麼把putIndex的值置為0,即下次插入的位置為0,下次要插入成功的必要條件是取出了一個元素,取出的位置為takeIndex。

2.2、take

take方法是取出一個元素,其定義如下,

public E take() throwsInterruptedException {final ReentrantLock lock = this.lock;

lock.lockInterruptibly();try{while (count == 0)

notEmpty.await();returndequeue();

}finally{

lock.unlock();

}

}

此方法中首先判斷目前隊列的元素數量如果為0,則目前線程進行等待,等待notEmpty.singal(),如果不為空則執行dequeue()方法,其定義如下,

privateE dequeue() {//assert lock.getHoldCount() == 1;//assert items[takeIndex] != null;

final Object[] items = this.items;

@SuppressWarnings("unchecked")

E x=(E) items[takeIndex];

items[takeIndex]= null;if (++takeIndex ==items.length)

takeIndex= 0;

count--;if (itrs != null)

itrs.elementDequeued();

notFull.signal();returnx;

}

此方法取出takeIndex位置的元素,把數組中此位置的引用置為null,判斷takeIndex和數組的長度,如果相等證明,已經取到了最後一個元素,下次再取元素需要從位置0開始,為此把takeIndex置為0。

3、offer(E)/poll

offer/poll是一對互斥的操作,offer向隊列種放入元素,poll取出元素,

3.1、offer

offer的定義如下

public booleanoffer(E e) {

checkNotNull(e);final ReentrantLock lock = this.lock;

lock.lock();try{if (count ==items.length)return false;else{

enqueue(e);return true;

}

}finally{

lock.unlock();

}

}

此方法判斷如果隊列中的元素數量和隊列長度相等,則直接傳回false,否則執行enqueue方法,put方法會将線程挂起,直到被中斷或插入成功。

3.2、poll

poll方法如下

publicE poll() {final ReentrantLock lock = this.lock;

lock.lock();try{return (count == 0) ? null: dequeue();

}finally{

lock.unlock();

}

}

此方法判斷目前隊列中的元素個數如果為0傳回null,否則執行dequeue操作,take方法會将線程挂起,直到被中斷或取出成功。

4、offer(E,long,TimeUnit)/poll(long,TimeUnnit)

這兩個方法是普通offer/poll方法的加強版,在隊列滿時指定了重試的時間,如果超過指定的時間後還是無法添加或取出則傳回false。

4.1、offer

offer方法如下

public boolean offer(E e, longtimeout, TimeUnit unit)throwsInterruptedException {

checkNotNull(e);

//逾時時間long nanos =unit.toNanos(timeout);final ReentrantLock lock = this.lock;

lock.lockInterruptibly();try{while (count ==items.length) {if (nanos <= 0)return false;//超過規定時間,傳回false

nanos=notFull.awaitNanos(nanos);

}

enqueue(e);return true;

}finally{

lock.unlock();

}

}

此方法會在指定的規定時間内一直重試,如果規定時間内無法退出循環即添加元素,則傳回false。

4.2、poll

poll方法如下,

public E poll(long timeout, TimeUnit unit) throwsInterruptedException {long nanos =unit.toNanos(timeout);final ReentrantLock lock = this.lock;

lock.lockInterruptibly();try{while (count == 0) {if (nanos <= 0)return null;

nanos=notEmpty.awaitNanos(nanos);

}returndequeue();

}finally{

lock.unlock();

}

}

此方法同樣是在取出元素時進行規定時間的重試,超過規定時間則傳回null。

三、方法比較

1、添加方法比較

序号

方法名

隊列滿時處理方式

方法傳回值

1

add(E e)

抛出“Queue full”異常

boolean

2

offer(E e)

傳回false

boolean

3

put(E e)

線程阻塞,直到中斷或被喚醒

void

4

offer(E e, long timeout, TimeUnit unit)

在規定時間内重試,超過規定時間傳回false

boolean

2、取出方法比較

序号

方法名

隊列空時處理方式

方法傳回值

1

peek()

傳回null

E

2

poll()

傳回null

E

3

take()

線程阻塞,指定中斷或被喚醒

E

4

poll(long timeout, TimeUnit unit)

在規定時間内重試,超過規定時間傳回null

E

四、總結

以上時關于ArrayBlockingQueue這個阻塞隊列的相關實作及方法介紹,此隊列以數組為載體,配合可重入鎖實作生産線程和消費線程共享資料,ArrayBlockingQueue作為共享池,實作了并發條件下的添加及取出等方法。

有不正之處歡迎指正,感謝!