天天看點

JDK多線程基礎(13):并發資料結構(隊列)

文章目錄

    • 并發 Queue
      • 非阻塞隊列-ConcurrentLinkedQueue
        • 簡介
        • 源碼分析
        • 簡單示例
      • 阻塞隊列-BlockingQueue
        • 常用隊列
        • 常用方法
        • LinkedBlockingQueue 與 ArrayBlockingQueue 差別
        • 簡單示例
    • Deque(Double-Ended-Queue)雙向隊列(JDK1.6 以後)
      • 簡介
      • LinkedBlockingDeque
    • 參考

并發 Queue

非阻塞隊列-ConcurrentLinkedQueue

簡介

  1. ConcurrentLinkedQueue

    是一個基于連結節點的無界線程安全隊列,使用

    Compare And Swap

    (CAS)算法實作
  2. 無鎖實作,效率比阻塞隊列更好
  3. Tomcat 中

    NioEndPoint

    中的每個

    poller

    裡面就維護一個

    ConcurrentLinkedQueue<Runnable>

    用來作為緩沖存放任務

源碼分析

  1. 屬性: volatile 類型的

    Node

    節點
  • head

    :存放連結清單第一個item為null的節點
  • tail

    :則并不是總指向最後一個節點
  • Node

    節點内部則維護一個變量

    item

    用來存放節點的值,

    next

    用來存放下一個節點,進而連結為一個單向無界清單
private transient volatile Node<E> head;
private transient volatile Node<E> tail;

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}
           
  1. 方法
  • add

    :内部調用

    offer

  • offer

    :添加資料,添加到尾部(tail)
public boolean offer(E e) {
    checkNotNull(e);    e為null則抛出空指針異常
    
    // 構造Node節點構造函數内部調用unsafe.putObject
    final Node<E> newNode = new Node<E>(e);
    
    // 從尾節點插入
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        // 如果q=null說明p是尾節點則插入
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                // cas成功說明新增節點已經被放傳入連結表,然後設定目前尾節點(包含head,1,3,5.。。個節點為尾節點)
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // 多線程操作時候,由于poll時候會把老的head變為自引用,然後head的next變為新head,是以這裡需要
            // 重新找新的head,因為新的head後面的節點才是激活的節點
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}
           
  • poll

    :取資料,從頭部擷取資料(head)并且移除
public E poll() {
    restartFromHead:
 
    // 死循環
    for (;;) {
 
        // 死循環
        for (Node<E> h = head, p = h, q;;) {
 
            // 儲存目前節點值
            E item = p.item;
 
            // 目前節點有值則cas變為null(1)
            if (item != null && p.casItem(item, null)) {
                //cas成功标志目前節點以及從連結清單中移除
                if (p != h) // 類似tail間隔2設定一次頭節點(2)
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // 目前隊列為空則傳回null(3)
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            //自引用了,則重新找新的隊列頭節點(4)
            else if (p == q)
                continue restartFromHead;
            else//(5)
                p = q;
        }
    }
}

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}
           
  • peek

    :取資料,從頭部擷取資料(head),不移除。代碼與

    poll

    類似,隻是少了

    castItem()

  • size

    :擷取目前隊列元素個數,因為使用CAS沒有加鎖是以從調用size函數到傳回結果期間有可能增删元素,導緻統計的元素個數不精确
  • isEmpty

    :都是調用

    first()

    ,略有不同
Node<E> first() {
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            boolean hasItem = (p.item != null);
            if (hasItem || (q = p.next) == null) {
                updateHead(h, p);
                return hasItem ? p : null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}
           
  • remove

    :如果隊列裡面存在該元素則删除給元素,如果存在多個則删除第一個,并傳回true,否者傳回 false

簡單示例

// 使用 `Compare And Swap`(CAS) 算法實作無鎖隊列,效率更好
ConcurrentLinkedQueue<String> noBlockQueue = new ConcurrentLinkedQueue<>();

/** 添加方法 */
// add 内部調用 offer 方法
noBlockQueue.add("a");
noBlockQueue.offer("b");
//[a, b]
System.out.println(noBlockQueue);

/** 擷取資料的方法 */
// 擷取資料,并且從隊列移除
System.out.println(noBlockQueue.poll());
System.out.println(noBlockQueue);

// 擷取,但是不會從隊列移除
System.out.println(noBlockQueue.peek());
System.out.println(noBlockQueue);

// isEmpty 和 size 内部都是調用 first()方法。
// 因為使用CAS沒有加鎖,是以是一個不精确的資料
System.out.println(noBlockQueue.isEmpty());
System.out.println(noBlockQueue.size());

輸出:
[a, b]
a
[b]
b
[b]
false
1
           

阻塞隊列-BlockingQueue

阻塞隊列的主要功能其實并不是在于提高并發時的性能,更多的是簡化多線程之間的資料共享,如生産者-消費者模式

常用隊列

  1. ArrayBlockingQueue

    :有限隊列,底層數組。内部維護定長數組用于緩存對象,維護兩個整型常量,分别辨別隊列頭部和尾部在數組中的位置。預設非公平鎖
  2. LinkedBlockingQueue

    :可以設定無限,底層連結清單。維護一個資料緩沖隊列(連結清單構成)
  3. PriorityBlockingQueue

    :帶有優先級的阻塞隊列

常用方法

  1. boolean add(E e)

    :增加一個元索,如果隊列已滿,則抛出一個異常

    IllegalStateException("Queue full")

  2. boolean offer(E e)

    :添加一個元素并傳回true,如果隊列已滿,則傳回false。不阻塞
  3. boolean offer(E e, long timeout, TimeUnit unit)

    :隊列滿時指定阻塞時間,指定時間内還不能加入傳回false
  4. void put(E e)

    :添加一個元素,如果隊列滿,則阻塞,直到有空間
  5. E take()

    :移除并傳回隊列頭部的元素,如果隊列為空,則阻塞
  6. E poll(long timeout, TimeUnit unit)

    :取出隊首的對象。如果在指定時間内,有資料可取這傳回;逾時後依然沒有資料則傳回null
  7. boolean remove(Object o)

    :移除資料
  8. int drainTo(Collection<? super E> c)

    :一次性從隊列擷取所有可用的資料對象。可以提升擷取資料的效率

LinkedBlockingQueue 與 ArrayBlockingQueue 差別

  1. 底層實作不同
  • ArrayBlockingQueue

    底層是數組
  • LinkedBlockingQueue

    底層是連結清單(Node對象)
  1. 隊列容量不同
  • ArrayBlockingQueue

    是有限隊列,構造的時候一定要指定初始化大小。當然如果手動指定其

    capacity

    為 `Integer.MAX_VALUE`` 也可以了解為無限
  • LinkedBlockingQueue

    預設是無限(Integer.MAX_VALUE),也可以手動縮小其容量,指定

    capacity

  • 在使用無限隊列的過程中,注意當生産大大的快于消費時的 OOM 情況
  1. 同步實作原理不同(鎖實作不一樣)
  • ArrayBlockingQueue

    是一把鎖。即其添加與移除操作使用的是同一個

    ReenterLock

    對象,隻是有兩個同步

    Condition

    進行同步。
  • LinkedBlockingQueue

    是鎖分離,提高并發吞吐量。添加與移除操作是不同的鎖,由于連結清單的緣故,添加和移除操作分别作用于隊列的前端和尾端,使用兩把不同的鎖分離了添加和移除操作。即take與take之間一把鎖,put與put之間一把鎖
  1. LinkedBlockingQueue

    鎖分離源碼
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
           
  1. ArrayBlockingQueue

    一把鎖源碼
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
....
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull =  lock.newCondition();
           

簡單示例

  1. 添加資料
LinkedBlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>(2);

// 1- 增加一個元索,如果隊列已滿,則抛出一個 IllegalStateException: Queue full 異常
blockingQueue.add("a");
System.out.println(blockingQueue);
// blockingQueue.add("a2");
// IllegalStateException: Queue full
// blockingQueue.add("a3");

// 2-添加一個元素并傳回true, 如果隊列已滿,則傳回false,不會有異常
boolean b = blockingQueue.offer("b");
System.out.println(b);
boolean b2 = blockingQueue.offer("b2");
System.out.println(b2);
System.out.println(blockingQueue);

// 3-隊列滿時指定阻塞時間,在規定時間内如果無法添加則傳回false,不會有異常
boolean c = blockingQueue.offer("c", 2, TimeUnit.SECONDS);
System.out.println(c);
boolean c1 = blockingQueue.offer("c1", 2, TimeUnit.SECONDS);
System.out.println(c1);
System.out.println(blockingQueue);

// 清除資料
blockingQueue.clear();

// 4- 添加一個元素,如果隊列滿,則阻塞
blockingQueue.put("d");
System.out.println(blockingQueue);

輸出:
[a]
true
false
[a, b]
false
false
[a, b]
[d]
           
  1. 取出資料
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(2);

blockingQueue.add("a");
blockingQueue.add("b");

// 1- 移除并返問隊列頭部的元素, 如果隊列為空,則傳回null,不會有異常
String poll = blockingQueue.poll();
System.out.println(poll);
System.out.println(blockingQueue);

// 2- 有資料直接傳回;隊列空時指定阻塞時間,在規定時間内還是沒有資料,則傳回 null
String poll1 = blockingQueue.poll(2, TimeUnit.SECONDS);
System.out.println(poll1);
String poll2 = blockingQueue.poll(2, TimeUnit.SECONDS);
System.out.println(poll2);
System.out.println(blockingQueue);

blockingQueue.add("a");
blockingQueue.add("b");

// 4-傳回但不移除隊列頭部的元素, 如果隊列為空,則傳回 null,不會有異常
String peek = blockingQueue.peek();
System.out.println(peek);
System.out.println(blockingQueue);

//        blockingQueue.clear();

// 6-移除并傳回隊列頭部的元素,如果隊列為空,則阻塞
blockingQueue.take();
System.out.println(blockingQueue);

blockingQueue.clear();
blockingQueue.add("a");
blockingQueue.add("b");

// 7- 批量擷取
ArrayList<String> list = new ArrayList<>();
int i = blockingQueue.drainTo(list);
System.out.println(i);
System.out.println(list);

輸出:
a
[b]
b
null
[]
a
[a, b]
[b]
2
[a, b]
           

Deque(Double-Ended-Queue)雙向隊列(JDK1.6 以後)

簡介

  1. Deque雙向隊列,允許在隊列的頭部或者尾部執行出隊和入隊操作
  2. 與 Queue 相比,具有更加複雜的功能
  3. LinkedList

    ArrayDeque

    LinkedBlockingDeque

    是實作了

    Deque

    接口的常見雙向隊列。
  • 由于

    ArrayDeque

    基于數組實作,擁有高效的随機通路性能,但是由于其擴充時需要重新配置設定記憶體并進行數組複制,寫性能不如

    LinkedList

  • LinkedBlockingDeque

    是線程安全的雙向隊列

LinkedBlockingDeque

  1. 使用連結清單結構,每一個隊列節點都維護一個前驅節點和後驅節點
  2. 沒有進行讀寫鎖的分離,效率較低
JDK多線程基礎(13):并發資料結構(隊列)

參考

  1. 源碼位址
  2. 非阻塞算法在并發容器中的實作
  3. 并發隊列-無界非阻塞隊列 ConcurrentLinkedQueue 原理探究
JDK多線程基礎(13):并發資料結構(隊列)

繼續閱讀