天天看点

Java之集合(二十三)SynchronousQueue

  转载请注明源出处:http://www.cnblogs.com/lighten/p/7515729.html

1.前言

  本章介绍阻塞队列SynchronousQueue。之前介绍过LinkedTransferQueue,特点提供了让生产者知道消费者消费了其产出,没消费就等待的模式,本章介绍的这个类则必须是生产者生产后消费者消费了才会继续下去,反之亦然,消费者必须等待生产者产出。SynchronousQueue只有这一种模式,而LinkedTransferQueue是可选的,SynchronousQueue不存储元素,像接力棒一样,没有交接就一直等。所以其特定是没有容量,不能peek查看,如果没有消费者不能插入,不能遍历,该队列表现的就像一个空的集合。同样的,该队列不接受空元素。默认情况下,线程的等待唤醒是非公平的,可以设置成公平模式,保证线程是先入先出(先到先得)。通常两种模式的性能差不多,非公平模式可以维持更多的线程,公平模式则支持更高的吞吐量。

2.SynchronousQueue

2.1 实现原理

  该类的实现是基于dual stack和dual queue算法,dual queue在LinkedTransferQueue中介绍过。queue和stack都包含数据节点和请求节点,其特点就是任何操作都能明确当前队列的所处模式(数据--没有被消费者消费或请求--没有生产者)。stack和queue都继承自抽象类Transferer,其定义了唯一方法transfer用来put或者take,在dual数据结构中,定义成一个方法原因在于put和take操作是对称的。

  SynchronousQueue的实现与原算法有些不同的地方:1、原算法使用bit-marked指针,这里使用mode bits,导致了一系列的改动。2、SynchronousQueue会阻塞线程等待到装满。3、通过超时和中断,支持取消操作,包括清除所有取消节点/线程,避免垃圾存留或内存损耗。

  阻塞操作大多通过LockSupport类的park或unpark方法,除非是在多核CPU上该节点看起来是下一个首个填满的结点,通过自旋一位。在非常忙碌的队列中,自旋可以显著提升吞吐量。cleaning操作在queue和stack中不同,queue中remove操作是O(1)时间,但是stack为O(n)时间。

2.2 数据结构

Java之集合(二十三)SynchronousQueue

  Transferer就是上面所说的抽象类,里面只有一个方法。后面也有Stack和Queue的实现。

  NCPUS:当前主机CPU核数

  maxTimedSpins:限时等待阻塞前自旋的次数,单核为0,多核32

  maxUntimedSpins:不限时等待阻塞前自旋的次数,maxTimedSpins * 16

  spinForTimeoutThreshold:纳秒数,这个为了更快的自旋而不是使用park时间。初略估计足够了,默认1000

  transferer:具体使用的实现对象。

Java之集合(二十三)SynchronousQueue

  通过构造函数可以看出,公平模式使用的是queue,非公平模式使用的是stack,默认非公平。

2.3 基本操作

  该类的基本操作都是基于transferer实现的,所以这里就不进行介绍。

Java之集合(二十三)SynchronousQueue
Java之集合(二十三)SynchronousQueue

  存入取出的不同之处只在于第一参数是否是null,不为null就是存入,为null就是取出。所以该队列也不能存入null元素。其它的方法都是空。

2.4 TransferQueue

Java之集合(二十三)SynchronousQueue

  数据结构和之前所讲LinkedTransferQueue基本一致,方法也类似。主要看transfer(E,boolean,long)方法。基本的算法就是循环做两件事情:1、如果队列为空或者持有相同的模式的结点,尝试添加队列结点,等待fulfilled或cancelled,并返回匹配项。2、如果队列不为空,放入的和其模式相反,即可以匹配就通过CAS操作填充该节点的item字段并出队列,返回匹配项。

  代码过长不给出,描述一下相关过长:

  1、通过E来判断当前调用是一个什么模式的结点。

  2、死循环处理:

    1.头尾节点存在null,为初始化进行循环。

    2.队列为空或模式一致:

      判断t是否是当前的尾,不是意味丢失尾,重新循环

      判断当前尾的下一个是否为null,不为null就是尾结点滞后了,重新设置尾结点,重新循环

      不等待就返回null

      创建该节点

      设置尾结点的下一个节点失败,被抢先,重新循环

      成功重置尾结点。

      进行等待指定时间。

      超时被取消,清除返回null。

      丢失顺序,重置头

      返回结果。

    3.队列不为空且模式不一致:

      头结点的下一个节点,如果为空或者头尾结点被改变了,读取不一致重新循环。

      此刻没有乱序,取出节点的item,进行CAS操作判断是否被抢先了,被抢先了移除该节点,继续循环尝试。

      成功了移除该节点,解除waiter的等待。

2.5 TransferStack

Java之集合(二十三)SynchronousQueue

  stack的结点数据结构,和queue的有些不同,就是多了一个match结点。node有四个方法:1、CAS设置next结点。2、CAS设置match结点,返回匹配结构。3、取消当前结点。4、返回当前结点是否取消。

  transfer方法的逻辑和queue的类似,stack的transfer循环需要做三件事情:1、如果栈为空或者模式相同,生成结点入栈等待匹配,返回结果或空如果超时。2、如果栈不为空且模式不同,匹配等待的结点,两个都出栈,返回匹配值。由于其他线程可能执行第3点,匹配或者断开连接可能不是必须的。3、如果栈顶元素匹配成功,帮助其出栈匹配,然后继续循环。

  整个流程就是上面3点,其他的照着看代码应该较为简单,和queue的思路差不多。由于第三点需要帮助其他线程出栈,这个过程可能被其它后到线程抢先,所以是非公平的。

3.使用例子

@Test
	public void testSynchronous() {
		SynchronousQueue<Integer> queue = new SynchronousQueue<>();
		System.out.println(queue.offer(1));	// 立即返回,必须要有消费者
		System.out.println(queue.poll());	// 立即返回,必须要有生产者
		long start = System.currentTimeMillis();
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					System.out.println(Thread.currentThread().getName()+"-" + 
				queue.take()+",耗时:"+(System.currentTimeMillis()-start)); // 没有生产者一直阻塞
					Thread.sleep(2000);
					System.out.println(Thread.currentThread().getName()+"-" + queue.take());
					Thread.sleep(1500);
					System.out.println(Thread.currentThread().getName()+"-" + queue.poll(1, TimeUnit.SECONDS));
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
			}
		},"consumer").start();
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					Thread.sleep(1000);
					System.out.println(Thread.currentThread().getName()+"-等待2被消耗:"+queue.offer(2));
					System.out.println(Thread.currentThread().getName()+"-等待3被消耗:");
					long one = System.currentTimeMillis();
					queue.put(3);
					System.out.println("3被消耗,耗时:" + (System.currentTimeMillis() - one));
					System.out.println(Thread.currentThread().getName()+"-等待4被消耗:" + 
							queue.offer(4, 1, TimeUnit.SECONDS));
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		},"prodcuer").start();
		try {
			System.in.read();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}      
Java之集合(二十三)SynchronousQueue