天天看點

Java之集合(二十三)SynchronousQueue

  轉載請注明源出處:http://www.cnblogs.com/lighten/p/7515729.html

1.前言

  本章介紹阻塞隊列SynchronousQueue。之前介紹過LinkedTransferQueue,特點提供了讓生産者知道消費者消費了其産出,沒消費就等待的模式,本章介紹的這個類則必須是生産者生産後消費者消費了才會繼續下去,反之亦然,消費者必須等待生産者産出。SynchronousQueue隻有這一種模式,而LinkedTransferQueue是可選的,SynchronousQueue不存儲元素,像接力棒一樣,沒有交接就一直等。是以其特定是沒有容量,不能peek檢視,如果沒有消費者不能插入,不能周遊,該隊清單現的就像一個空的集合。同樣的,該隊列不接受空元素。預設情況下,線程的等待喚醒是非公平的,可以設定成公平模式,保證線程是先入先出(先到先得)。通常兩種模式的性能差不多,非公平模式可以維持更多的線程,公平模式則支援更高的吞吐量。

2.SynchronousQueue

2.1 實作原理

  該類的實作是基于dual stack和dual queue算法,dual queue在LinkedTransferQueue中介紹過。queue和stack都包含資料節點和請求節點,其特點就是任何操作都能明确目前隊列的所處模式(資料--沒有被消費者消費或請求--沒有生産者)。stack和queue都繼承自抽象類Transferer,其定義了唯一方法transfer用來put或者take,在dual資料結構中,定義成一個方法原因在于put和take操作是對稱的。

  SynchronousQueue的實作與原算法有些不同的地方:1、原算法使用bit-marked指針,這裡使用mode bits,導緻了一系列的改動。2、SynchronousQueue會阻塞線程等待到裝滿。3、通過逾時和中斷,支援取消操作,包括清除所有取消節點/線程,避免垃圾存留或記憶體損耗。

  阻塞操作大多通過LockSupport類的park或unpark方法,除非是在多核CPU上該節點看起來是下一個首個填滿的結點,通過自旋一位。在非常忙碌的隊列中,自旋可以顯著提升吞吐量。cleaning操作在queue和stack中不同,queue中remove操作是O(1)時間,但是stack為O(n)時間。

2.2 資料結構

Java之集合(二十三)SynchronousQueue

  Transferer就是上面所說的抽象類,裡面隻有一個方法。後面也有Stack和Queue的實作。

  NCPUS:目前主機CPU核數

  maxTimedSpins:限時等待阻塞前自旋的次數,單核為0,多核32

  maxUntimedSpins:不限時等待阻塞前自旋的次數,maxTimedSpins * 16

  spinForTimeoutThreshold:納秒數,這個為了更快的自旋而不是使用park時間。初略估計足夠了,預設1000

  transferer:具體使用的實作對象。

Java之集合(二十三)SynchronousQueue

  通過構造函數可以看出,公平模式使用的是queue,非公平模式使用的是stack,預設非公平。

2.3 基本操作

  該類的基本操作都是基于transferer實作的,是以這裡就不進行介紹。

Java之集合(二十三)SynchronousQueue
Java之集合(二十三)SynchronousQueue

  存入取出的不同之處隻在于第一參數是否是null,不為null就是存入,為null就是取出。是以該隊列也不能存入null元素。其它的方法都是空。

2.4 TransferQueue

Java之集合(二十三)SynchronousQueue

  資料結構和之前所講LinkedTransferQueue基本一緻,方法也類似。主要看transfer(E,boolean,long)方法。基本的算法就是循環做兩件事情:1、如果隊列為空或者持有相同的模式的結點,嘗試添加隊列結點,等待fulfilled或cancelled,并傳回比對項。2、如果隊列不為空,放入的和其模式相反,即可以比對就通過CAS操作填充該節點的item字段并出隊列,傳回比對項。

  代碼過長不給出,描述一下相關過長:

  1、通過E來判斷目前調用是一個什麼模式的結點。

  2、死循環處理:

    1.頭尾節點存在null,為初始化進行循環。

    2.隊列為空或模式一緻:

      判斷t是否是目前的尾,不是意味丢失尾,重新循環

      判斷目前尾的下一個是否為null,不為null就是尾結點滞後了,重新設定尾結點,重新循環

      不等待就傳回null

      建立該節點

      設定尾結點的下一個節點失敗,被搶先,重新循環

      成功重置尾結點。

      進行等待指定時間。

      逾時被取消,清除傳回null。

      丢失順序,重置頭

      傳回結果。

    3.隊列不為空且模式不一緻:

      頭結點的下一個節點,如果為空或者頭尾結點被改變了,讀取不一緻重新循環。

      此刻沒有亂序,取出節點的item,進行CAS操作判斷是否被搶先了,被搶先了移除該節點,繼續循環嘗試。

      成功了移除該節點,解除waiter的等待。

2.5 TransferStack

Java之集合(二十三)SynchronousQueue

  stack的結點資料結構,和queue的有些不同,就是多了一個match結點。node有四個方法:1、CAS設定next結點。2、CAS設定match結點,傳回比對結構。3、取消目前結點。4、傳回目前結點是否取消。

  transfer方法的邏輯和queue的類似,stack的transfer循環需要做三件事情:1、如果棧為空或者模式相同,生成結點入棧等待比對,傳回結果或空如果逾時。2、如果棧不為空且模式不同,比對等待的結點,兩個都出棧,傳回比對值。由于其他線程可能執行第3點,比對或者斷開連接配接可能不是必須的。3、如果棧頂元素比對成功,幫助其出棧比對,然後繼續循環。

  整個流程就是上面3點,其他的照着看代碼應該較為簡單,和queue的思路差不多。由于第三點需要幫助其他線程出棧,這個過程可能被其它後到線程搶先,是以是非公平的。

3.使用例子

@Test
	public void testSynchronous() {
		SynchronousQueue<Integer> queue = new SynchronousQueue<>();
		System.out.println(queue.offer(1));	// 立即傳回,必須要有消費者
		System.out.println(queue.poll());	// 立即傳回,必須要有生産者
		long start = System.currentTimeMillis();
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println(Thread.currentThread().getName()+"-" + 
				queue.take()+",耗時:"+(System.currentTimeMillis()-start)); // 沒有生産者一直阻塞
					Thread.sleep(2000);
					System.out.println(Thread.currentThread().getName()+"-" + queue.take());
					Thread.sleep(1500);
					System.out.println(Thread.currentThread().getName()+"-" + queue.poll(1, TimeUnit.SECONDS));
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
			}
		},"consumer").start();
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					Thread.sleep(1000);
					System.out.println(Thread.currentThread().getName()+"-等待2被消耗:"+queue.offer(2));
					System.out.println(Thread.currentThread().getName()+"-等待3被消耗:");
					long one = System.currentTimeMillis();
					queue.put(3);
					System.out.println("3被消耗,耗時:" + (System.currentTimeMillis() - one));
					System.out.println(Thread.currentThread().getName()+"-等待4被消耗:" + 
							queue.offer(4, 1, TimeUnit.SECONDS));
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		},"prodcuer").start();
		try {
			System.in.read();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}      
Java之集合(二十三)SynchronousQueue