天天看点

[源码]Concurrent包之BlockingQueue、BlockingDeque

Java程序员不懂concurrent包可能很难说的过去,准备搞一系列文章,按照某外国小哥的思路看一遍concurrent包。小哥的文章主要是用法,我就主要看源码了。

BlockingQueue

同步的FIFO容器,主要方法:

[源码]Concurrent包之BlockingQueue、BlockingDeque

实现有五种:ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue

ArrayBlockingQueue

继承自AbstractQueue,用数组存数据,大小固定。真正需要实现的方法是:

- offer:使用ReentrantLock#lock,保证同步

- put:使用ReentrantLock#lockInterruptibly,原因应该是offer一定要有一个返回值,而put可以中断掉。区别

- offer带超时时间:使用ReentrantLock#lockInterruptibly来做数组同步。等待时间是指队列满时等待的时间,用的是一个Condition#awaitNanos,这个Condition(notFull)专门用于判断是否满

- enqueue和dequeue专门用来操作数组(private,无同步),会在这进行Condition的signal和Iterator操作

标准的数组,没有特别的地方,Iteration是用锁同步的。

DelayQueue

只有元素delay时长超出才能take走。继承自AbstractQueue,用PriorityQueue存储对象,ReentrantLock做锁。整体上很像Handler#postDelay的逻辑,最先到时间的Element在队首,每次都看第一个元素是否可用。真正需要实现的方法是:

- offer:使用ReentrantLock#lock,保证同步。如果第一个元素是刚刚offer的,证明最早时间有变化,会通知(Condition#signal)线程查看当前元素是否可用

- take:看第一个元素是否可用,可用返回,不可用使用Condition#awaitNanos等待,等待时间是delay时长。等待时会记录等待的线程

LinkedBlockingQueue

LinkedList和ArrayList的关系,这个的容量可以不限制。实现特色是用了“Two Lock Queue”算法的变种,有一个入队锁、一个出队锁,大小用AtomicInteger减少互锁的可能。看起来性能会比ArrayBlockingQueue好。还是那几个方法:

- offer:仍然是ReentrantLock#lock,大小用AtomicInteger#getAndIncrement增加。如果不满,会notFull.signal(和Array不一样)

- Condition#signal的时候,会把对应的锁锁上。这个细节比较重要

- 随机indexing的时候会锁两个锁,性能比Array更差了

PriorityBlockingQueue

和DelayedQueue很像,只是排序用Comparator,存储使用的是数组,逻辑是堆排的逻辑。锁和ArrayBlockingQueue一样。

SynchronousQueue

是一个队列长度为0的同步队列,用于同步读写线程的动作。使用了Dual stack and dual queue算法。所有操作,都是对一个Transfer对象进行操作的。

大概看了一下思路(不一定参透了道理),基本就是一个队列/栈,保存一个行为,这个行为就像左右括号的匹配,匹配上就去掉一个节点,匹配不上就入队。队列使用链表,所有交换都是使用UNSAFE的位操作,保证原子性。

分为公平和不公平队列,区别在Transfer的存储方式实现不同:

公平-TransferQueue:FIFO

不公平-TransferStack:LIFO

BlockingDeque

双向队列,首先注意是BlockingDeque extends BlockingQueue

方法如下:

[源码]Concurrent包之BlockingQueue、BlockingDeque

实现只有一个LinkedBlockingDeque,就是一个用锁保护起来的双向链表。