文章目錄
-
- 并發 Queue
-
- 非阻塞隊列-ConcurrentLinkedQueue
-
- 簡介
- 源碼分析
- 簡單示例
- 阻塞隊列-BlockingQueue
-
- 常用隊列
- 常用方法
- LinkedBlockingQueue 與 ArrayBlockingQueue 差別
- 簡單示例
- Deque(Double-Ended-Queue)雙向隊列(JDK1.6 以後)
-
- 簡介
- LinkedBlockingDeque
- 參考
并發 Queue
非阻塞隊列-ConcurrentLinkedQueue
簡介
-
是一個基于連結節點的無界線程安全隊列,使用ConcurrentLinkedQueue
(CAS)算法實作Compare And Swap
- 無鎖實作,效率比阻塞隊列更好
- Tomcat 中
中的每個NioEndPoint
裡面就維護一個poller
用來作為緩沖存放任務ConcurrentLinkedQueue<Runnable>
源碼分析
- 屬性: volatile 類型的
節點Node
-
:存放連結清單第一個item為null的節點head
-
:則并不是總指向最後一個節點tail
-
節點内部則維護一個變量Node
用來存放節點的值,item
用來存放下一個節點,進而連結為一個單向無界清單next
private transient volatile Node<E> head;
private transient volatile Node<E> tail;
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
- 方法
-
:内部調用add
offer
-
:添加資料,添加到尾部(tail)offer
public boolean offer(E e) {
checkNotNull(e); e為null則抛出空指針異常
// 構造Node節點構造函數内部調用unsafe.putObject
final Node<E> newNode = new Node<E>(e);
// 從尾節點插入
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 如果q=null說明p是尾節點則插入
if (q == null) {
// p is last node
if (p.casNext(null, newNode)) {
// cas成功說明新增節點已經被放傳入連結表,然後設定目前尾節點(包含head,1,3,5.。。個節點為尾節點)
if (p != t) // hop two nodes at a time
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// 多線程操作時候,由于poll時候會把老的head變為自引用,然後head的next變為新head,是以這裡需要
// 重新找新的head,因為新的head後面的節點才是激活的節點
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
-
:取資料,從頭部擷取資料(head)并且移除poll
public E poll() {
restartFromHead:
// 死循環
for (;;) {
// 死循環
for (Node<E> h = head, p = h, q;;) {
// 儲存目前節點值
E item = p.item;
// 目前節點有值則cas變為null(1)
if (item != null && p.casItem(item, null)) {
//cas成功标志目前節點以及從連結清單中移除
if (p != h) // 類似tail間隔2設定一次頭節點(2)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 目前隊列為空則傳回null(3)
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
//自引用了,則重新找新的隊列頭節點(4)
else if (p == q)
continue restartFromHead;
else//(5)
p = q;
}
}
}
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
-
:取資料,從頭部擷取資料(head),不移除。代碼與peek
類似,隻是少了poll
castItem()
-
:擷取目前隊列元素個數,因為使用CAS沒有加鎖是以從調用size函數到傳回結果期間有可能增删元素,導緻統計的元素個數不精确size
-
:都是調用isEmpty
,略有不同first()
Node<E> first() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
boolean hasItem = (p.item != null);
if (hasItem || (q = p.next) == null) {
updateHead(h, p);
return hasItem ? p : null;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
-
:如果隊列裡面存在該元素則删除給元素,如果存在多個則删除第一個,并傳回true,否者傳回 falseremove
簡單示例
// 使用 `Compare And Swap`(CAS) 算法實作無鎖隊列,效率更好
ConcurrentLinkedQueue<String> noBlockQueue = new ConcurrentLinkedQueue<>();
/** 添加方法 */
// add 内部調用 offer 方法
noBlockQueue.add("a");
noBlockQueue.offer("b");
//[a, b]
System.out.println(noBlockQueue);
/** 擷取資料的方法 */
// 擷取資料,并且從隊列移除
System.out.println(noBlockQueue.poll());
System.out.println(noBlockQueue);
// 擷取,但是不會從隊列移除
System.out.println(noBlockQueue.peek());
System.out.println(noBlockQueue);
// isEmpty 和 size 内部都是調用 first()方法。
// 因為使用CAS沒有加鎖,是以是一個不精确的資料
System.out.println(noBlockQueue.isEmpty());
System.out.println(noBlockQueue.size());
輸出:
[a, b]
a
[b]
b
[b]
false
1
阻塞隊列-BlockingQueue
阻塞隊列的主要功能其實并不是在于提高并發時的性能,更多的是簡化多線程之間的資料共享,如生産者-消費者模式
常用隊列
-
:有限隊列,底層數組。内部維護定長數組用于緩存對象,維護兩個整型常量,分别辨別隊列頭部和尾部在數組中的位置。預設非公平鎖ArrayBlockingQueue
-
:可以設定無限,底層連結清單。維護一個資料緩沖隊列(連結清單構成)LinkedBlockingQueue
-
:帶有優先級的阻塞隊列PriorityBlockingQueue
常用方法
-
:增加一個元索,如果隊列已滿,則抛出一個異常boolean add(E e)
IllegalStateException("Queue full")
-
:添加一個元素并傳回true,如果隊列已滿,則傳回false。不阻塞boolean offer(E e)
-
:隊列滿時指定阻塞時間,指定時間内還不能加入傳回falseboolean offer(E e, long timeout, TimeUnit unit)
-
:添加一個元素,如果隊列滿,則阻塞,直到有空間void put(E e)
-
:移除并傳回隊列頭部的元素,如果隊列為空,則阻塞E take()
-
:取出隊首的對象。如果在指定時間内,有資料可取這傳回;逾時後依然沒有資料則傳回nullE poll(long timeout, TimeUnit unit)
-
:移除資料boolean remove(Object o)
-
:一次性從隊列擷取所有可用的資料對象。可以提升擷取資料的效率int drainTo(Collection<? super E> c)
LinkedBlockingQueue 與 ArrayBlockingQueue 差別
- 底層實作不同
-
底層是數組ArrayBlockingQueue
-
底層是連結清單(Node對象)LinkedBlockingQueue
- 隊列容量不同
-
是有限隊列,構造的時候一定要指定初始化大小。當然如果手動指定其ArrayBlockingQueue
為 `Integer.MAX_VALUE`` 也可以了解為無限capacity
-
預設是無限(Integer.MAX_VALUE),也可以手動縮小其容量,指定LinkedBlockingQueue
。capacity
- 在使用無限隊列的過程中,注意當生産大大的快于消費時的 OOM 情況
- 同步實作原理不同(鎖實作不一樣)
-
是一把鎖。即其添加與移除操作使用的是同一個ArrayBlockingQueue
對象,隻是有兩個同步ReenterLock
進行同步。Condition
-
是鎖分離,提高并發吞吐量。添加與移除操作是不同的鎖,由于連結清單的緣故,添加和移除操作分别作用于隊列的前端和尾端,使用兩把不同的鎖分離了添加和移除操作。即take與take之間一把鎖,put與put之間一把鎖LinkedBlockingQueue
-
鎖分離源碼LinkedBlockingQueue
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
-
一把鎖源碼ArrayBlockingQueue
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
....
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
簡單示例
- 添加資料
LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(2);
// 1- 增加一個元索,如果隊列已滿,則抛出一個 IllegalStateException: Queue full 異常
blockingQueue.add("a");
System.out.println(blockingQueue);
// blockingQueue.add("a2");
// IllegalStateException: Queue full
// blockingQueue.add("a3");
// 2-添加一個元素并傳回true, 如果隊列已滿,則傳回false,不會有異常
boolean b = blockingQueue.offer("b");
System.out.println(b);
boolean b2 = blockingQueue.offer("b2");
System.out.println(b2);
System.out.println(blockingQueue);
// 3-隊列滿時指定阻塞時間,在規定時間内如果無法添加則傳回false,不會有異常
boolean c = blockingQueue.offer("c", 2, TimeUnit.SECONDS);
System.out.println(c);
boolean c1 = blockingQueue.offer("c1", 2, TimeUnit.SECONDS);
System.out.println(c1);
System.out.println(blockingQueue);
// 清除資料
blockingQueue.clear();
// 4- 添加一個元素,如果隊列滿,則阻塞
blockingQueue.put("d");
System.out.println(blockingQueue);
輸出:
[a]
true
false
[a, b]
false
false
[a, b]
[d]
- 取出資料
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(2);
blockingQueue.add("a");
blockingQueue.add("b");
// 1- 移除并返問隊列頭部的元素, 如果隊列為空,則傳回null,不會有異常
String poll = blockingQueue.poll();
System.out.println(poll);
System.out.println(blockingQueue);
// 2- 有資料直接傳回;隊列空時指定阻塞時間,在規定時間内還是沒有資料,則傳回 null
String poll1 = blockingQueue.poll(2, TimeUnit.SECONDS);
System.out.println(poll1);
String poll2 = blockingQueue.poll(2, TimeUnit.SECONDS);
System.out.println(poll2);
System.out.println(blockingQueue);
blockingQueue.add("a");
blockingQueue.add("b");
// 4-傳回但不移除隊列頭部的元素, 如果隊列為空,則傳回 null,不會有異常
String peek = blockingQueue.peek();
System.out.println(peek);
System.out.println(blockingQueue);
// blockingQueue.clear();
// 6-移除并傳回隊列頭部的元素,如果隊列為空,則阻塞
blockingQueue.take();
System.out.println(blockingQueue);
blockingQueue.clear();
blockingQueue.add("a");
blockingQueue.add("b");
// 7- 批量擷取
ArrayList<String> list = new ArrayList<>();
int i = blockingQueue.drainTo(list);
System.out.println(i);
System.out.println(list);
輸出:
a
[b]
b
null
[]
a
[a, b]
[b]
2
[a, b]
Deque(Double-Ended-Queue)雙向隊列(JDK1.6 以後)
簡介
- Deque雙向隊列,允許在隊列的頭部或者尾部執行出隊和入隊操作
- 與 Queue 相比,具有更加複雜的功能
-
、LinkedList
、ArrayDeque
是實作了LinkedBlockingDeque
接口的常見雙向隊列。Deque
- 由于
基于數組實作,擁有高效的随機通路性能,但是由于其擴充時需要重新配置設定記憶體并進行數組複制,寫性能不如ArrayDeque
LinkedList
-
是線程安全的雙向隊列LinkedBlockingDeque
LinkedBlockingDeque
- 使用連結清單結構,每一個隊列節點都維護一個前驅節點和後驅節點
- 沒有進行讀寫鎖的分離,效率較低
參考
- 源碼位址
- 非阻塞算法在并發容器中的實作
- 并發隊列-無界非阻塞隊列 ConcurrentLinkedQueue 原理探究