阻塞队列(blockingqueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。
阻塞队列提供了四种处理方法:
方法\处理方式
抛出异常
返回特殊值
一直阻塞
超时退出
插入方法
add(e)
offer(e)
put(e)
offer(e,time,unit)
移除方法
remove()
poll()
take()
poll(time,unit)
检查方法
element()
peek()
不可用
抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出illegalstateexception(“queue full”)异常。当队列为空时,从队列里获取元素时会抛出nosuchelementexception异常 。
返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
jdk7提供了7个阻塞队列。分别是
arrayblockingqueue :一个由数组结构组成的有界阻塞队列。
linkedblockingqueue :一个由链表结构组成的有界阻塞队列。
priorityblockingqueue :一个支持优先级排序的无界阻塞队列。
delayqueue:一个使用优先级队列实现的无界阻塞队列。
synchronousqueue:一个不存储元素的阻塞队列。
linkedtransferqueue:一个由链表结构组成的无界阻塞队列。
linkedblockingdeque:一个由链表结构组成的双向阻塞队列。
arrayblockingqueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(fifo)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列:
<code>1</code>
<code>arrayblockingqueue fairqueue =</code><code>new</code> <code>arrayblockingqueue(</code><code>1000</code><code>,</code><code>true</code><code>);</code>
访问者的公平性是使用可重入锁实现的,代码如下:
<code>public</code> <code>arrayblockingqueue(</code><code>int</code> <code>capacity,</code><code>boolean</code> <code>fair) {</code>
<code>2</code>
<code> </code><code>if</code> <code>(capacity <=</code><code>0</code><code>)</code>
<code>3</code>
<code> </code><code>throw</code> <code>new</code> <code>illegalargumentexception();</code>
<code>4</code>
<code> </code><code>this</code><code>.items =</code><code>new</code> <code>object[capacity];</code>
<code>5</code>
<code> </code><code>lock =</code><code>new</code> <code>reentrantlock(fair);</code>
<code>6</code>
<code> </code><code>notempty = lock.newcondition();</code>
<code>7</code>
<code> </code><code>notfull = lock.newcondition();</code>
<code>8</code>
<code>}</code>
linkedblockingqueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为integer.max_value。此队列按照先进先出的原则对元素进行排序。
priorityblockingqueue是一个支持优先级的无界队列。默认情况下元素采取自然顺序排列,也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。
delayqueue是一个支持延时获取元素的无界阻塞队列。队列使用priorityqueue来实现。队列中的元素必须实现delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。我们可以将delayqueue运用在以下应用场景:
缓存系统的设计:可以用delayqueue保存缓存元素的有效期,使用一个线程循环查询delayqueue,一旦能从delayqueue中获取元素时,表示缓存有效期到了。
定时任务调度。使用delayqueue保存当天将会执行的任务和执行时间,一旦从delayqueue中获取到任务就开始执行,从比如timerqueue就是使用delayqueue实现的。
队列中的delayed必须实现compareto来指定元素的顺序。比如让延时时间最长的放在队列的末尾。实现代码如下:
<code>01</code>
<code>public</code> <code>int</code> <code>compareto(delayed other) {</code>
<code>02</code>
<code> </code><code>if</code> <code>(other ==</code><code>this</code><code>)</code><code>// compare zero only if same object</code>
<code>03</code>
<code> </code><code>return</code> <code>0</code><code>;</code>
<code>04</code>
<code> </code><code>if</code> <code>(other</code><code>instanceof</code> <code>scheduledfuturetask) {</code>
<code>05</code>
<code> </code><code>scheduledfuturetask x = (scheduledfuturetask)other;</code>
<code>06</code>
<code> </code><code>long</code> <code>diff = time - x.time;</code>
<code>07</code>
<code> </code><code>if</code> <code>(diff <</code><code>0</code><code>)</code>
<code>08</code>
<code> </code><code>return</code> <code>-</code><code>1</code><code>;</code>
<code>09</code>
<code> </code><code>else</code> <code>if</code> <code>(diff ></code><code>0</code><code>)</code>
<code>10</code>
<code> </code><code>return</code> <code>1</code><code>;</code>
<code>11</code>
<code> </code><code>else</code> <code>if</code> <code>(sequencenumber < x.sequencenumber)</code>
<code>12</code>
<code>13</code>
<code> </code><code>else</code>
<code>14</code>
<code>15</code>
<code> </code><code>}</code>
<code>16</code>
<code> </code><code>long</code> <code>d = (getdelay(timeunit.nanoseconds) -</code>
<code>17</code>
<code> </code><code>other.getdelay(timeunit.nanoseconds));</code>
<code>18</code>
<code> </code><code>return</code> <code>(d ==</code><code>0</code><code>) ?</code><code>0</code> <code>: ((d <</code><code>0</code><code>) ? -</code><code>1</code> <code>:</code><code>1</code><code>);</code>
<code>19</code>
<code> </code><code>}</code>
<b>如何实现delayed接口</b>
我们可以参考scheduledthreadpoolexecutor里scheduledfuturetask类。这个类实现了delayed接口。首先:在对象创建的时候,使用time记录前对象什么时候可以使用,代码如下:
<code>scheduledfuturetask(runnable r, v result,</code><code>long</code> <code>ns,</code><code>long</code> <code>period) {</code>
<code> </code><code>super</code><code>(r, result);</code>
<code> </code><code>this</code><code>.time = ns;</code>
<code> </code><code>this</code><code>.period = period;</code>
<code> </code><code>this</code><code>.sequencenumber = sequencer.getandincrement();</code>
然后使用getdelay可以查询当前元素还需要延时多久,代码如下:
通过构造函数可以看出延迟时间参数ns的单位是纳秒,自己设计的时候最好使用纳秒,因为getdelay时可以指定任意单位,一旦以纳秒作为单位,而延时的时间又精确不到纳秒就麻烦了。使用时请注意当time小于当前时间时,getdelay会返回负数。
<b>如何实现延时队列</b>
延时队列的实现很简单,当消费者从队列里获取元素时,如果元素没有达到延时时间,就阻塞当前线程。
<code>long</code> <code>delay = first.getdelay(timeunit.nanoseconds);</code>
<code> </code><code>if</code> <code>(delay <=</code><code>0</code><code>)</code>
<code> </code><code>return</code> <code>q.poll();</code>
<code> </code><code>else</code> <code>if</code> <code>(leader !=</code><code>null</code><code>)</code>
<code> </code><code>available.await();</code>
synchronousqueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。synchronousqueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合于传递性场景,比如在一个线程中使用的数据,传递给另外一个线程使用,synchronousqueue的吞吐量高于linkedblockingqueue 和 arrayblockingqueue。
linkedtransferqueue是一个由链表结构组成的无界阻塞transferqueue队列。相对于其他阻塞队列linkedtransferqueue多了trytransfer和transfer方法。
transfer方法。如果当前有消费者正在等待接收元素(消费者使用take()方法或带时间限制的poll()方法时),transfer方法可以把生产者传入的元素立刻transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。transfer方法的关键代码如下:
<code>node pred = tryappend(s, havedata);</code>
<code>return</code> <code>awaitmatch(s, pred, e, (how == timed), nanos);</code>
第一行代码是试图把存放当前元素的s节点作为tail节点。第二行代码是让cpu自旋等待消费者消费元素。因为自旋会消耗cpu,所以自旋一定的次数后使用thread.yield()方法来暂停当前正在执行的线程,并执行其他线程。
trytransfer方法。则是用来试探下生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和transfer方法的区别是trytransfer方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。
对于带有时间限制的trytransfer(e e, long timeout, timeunit unit)方法,则是试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。
linkedblockingdeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的你可以从队列的两端插入和移出元素。双端队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,linkedblockingdeque多了addfirst,addlast,offerfirst,offerlast,peekfirst,peeklast等方法,以first单词结尾的方法,表示插入,获取(peek)或移除双端队列的第一个元素。以last单词结尾的方法,表示插入,获取或移除双端队列的最后一个元素。另外插入方法add等同于addlast,移除方法remove等效于removefirst。但是take方法却等同于takefirst,不知道是不是jdk的bug,使用时还是用带有first和last后缀的方法更清楚。在初始化linkedblockingdeque时可以初始化队列的容量,用来防止其再扩容时过渡膨胀。另外双向阻塞队列可以运用在“工作窃取”模式中。
如果队列是空的,消费者会一直等待,当生产者添加元素时候,消费者是如何知道当前队列有元素的呢?如果让你来设计阻塞队列你会如何设计,让生产者和消费者能够高效率的进行通讯呢?让我们先来看看jdk是如何实现的。
使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。通过查看jdk源码发现arrayblockingqueue使用了condition来实现,代码如下:
<code>private</code> <code>final</code> <code>condition notfull;</code>
<code>private</code> <code>final</code> <code>condition notempty;</code>
<code> </code><code>//省略其他代码</code>
<code> </code><code>}</code>
<code>public</code> <code>void</code> <code>put(e e)</code><code>throws</code> <code>interruptedexception {</code>
<code> </code><code>checknotnull(e);</code>
<code> </code><code>final</code> <code>reentrantlock lock =</code><code>this</code><code>.lock;</code>
<code> </code><code>lock.lockinterruptibly();</code>
<code> </code><code>try</code> <code>{</code>
<code> </code><code>while</code> <code>(count == items.length)</code>
<code> </code><code>notfull.await();</code>
<code> </code><code>insert(e);</code>
<code> </code><code>}</code><code>finally</code> <code>{</code>
<code> </code><code>lock.unlock();</code>
<code>20</code>
<code>21</code>
<code>22</code>
<code>23</code>
<code>public</code> <code>e take()</code><code>throws</code> <code>interruptedexception {</code>
<code>24</code>
<code>25</code>
<code>26</code>
<code>27</code>
<code> </code><code>while</code> <code>(count ==</code><code>0</code><code>)</code>
<code>28</code>
<code> </code><code>notempty.await();</code>
<code>29</code>
<code> </code><code>return</code> <code>extract();</code>
<code>30</code>
<code> </code><code>}</code><code>finally</code> <code>{</code>
<code>31</code>
<code>32</code>
<code>33</code>
<code>34</code>
<code>35</code>
<code>private</code> <code>void</code> <code>insert(e x) {</code>
<code>36</code>
<code> </code><code>items[putindex] = x;</code>
<code>37</code>
<code> </code><code>putindex = inc(putindex);</code>
<code>38</code>
<code> </code><code>++count;</code>
<code>39</code>
<code> </code><code>notempty.signal();</code>
<code>40</code>
当我们往队列里插入一个元素时,如果队列不可用,阻塞生产者主要通过locksupport.park(this);来实现
<code>public</code> <code>final</code> <code>void</code> <code>await()</code><code>throws</code> <code>interruptedexception {</code>
<code> </code><code>if</code> <code>(thread.interrupted())</code>
<code> </code><code>throw</code> <code>new</code> <code>interruptedexception();</code>
<code> </code><code>node node = addconditionwaiter();</code>
<code> </code><code>int</code> <code>savedstate = fullyrelease(node);</code>
<code> </code><code>int</code> <code>interruptmode =</code><code>0</code><code>;</code>
<code> </code><code>while</code> <code>(!isonsyncqueue(node)) {</code>
<code> </code><code>locksupport.park(</code><code>this</code><code>);</code>
<code> </code><code>if</code> <code>((interruptmode = checkinterruptwhilewaiting(node)) !=</code><code>0</code><code>)</code>
<code> </code><code>break</code><code>;</code>
<code> </code><code>if</code> <code>(acquirequeued(node, savedstate) && interruptmode != throw_ie)</code>
<code> </code><code>interruptmode = reinterrupt;</code>
<code> </code><code>if</code> <code>(node.nextwaiter !=</code><code>null</code><code>)</code><code>// clean up if cancelled</code>
<code> </code><code>unlinkcancelledwaiters();</code>
<code> </code><code>if</code> <code>(interruptmode !=</code><code>0</code><code>)</code>
<code>reportinterruptafterwait(interruptmode);</code>
继续进入源码,发现调用setblocker先保存下将要阻塞的线程,然后调用unsafe.park阻塞当前线程。
<code>public</code> <code>static</code> <code>void</code> <code>park(object blocker) {</code>
<code> </code><code>thread t = thread.currentthread();</code>
<code> </code><code>setblocker(t, blocker);</code>
<code> </code><code>unsafe.park(</code><code>false</code><code>, 0l);</code>
<code> </code><code>setblocker(t,</code><code>null</code><code>);</code>
unsafe.park是个native方法,代码如下:
<code>public</code> <code>native</code> <code>void</code> <code>park(</code><code>boolean</code> <code>isabsolute,</code><code>long</code> <code>time);</code>
park这个方法会阻塞当前线程,只有以下四种情况中的一种发生时,该方法才会返回。
与park对应的unpark执行或已经执行时。注意:已经执行是指unpark先执行,然后再执行的park。
线程被中断时。
如果参数中的time不是零,等待了指定的毫秒数时。
发生异常现象时。这些异常事先无法确定。
我们继续看一下jvm是如何实现park方法的,park在不同的操作系统使用不同的方式实现,在linux下是使用的是系统方法pthread_cond_wait实现。实现代码在jvm源码路径src/os/linux/vm/os_linux.cpp里的 os::platformevent::park方法,代码如下:
<code>void</code> <code>os::platformevent::park() {</code>
<code> </code><code>int</code> <code>v ;</code>
<code> </code><code>for</code> <code>(;;) {</code>
<code> </code><code>v = _event ;</code>
<code> </code><code>if</code> <code>(atomic::cmpxchg (v-1, &_event, v) == v)</code><code>break</code> <code>;</code>
<code> </code><code>}</code>
<code> </code><code>guarantee (v >= 0,</code><code>"invariant"</code><code>) ;</code>
<code> </code><code>if</code> <code>(v == 0) {</code>
<code> </code><code>// do this the hard way by blocking ...</code>
<code> </code><code>int</code> <code>status = pthread_mutex_lock(_mutex);</code>
<code> </code><code>assert_status(status == 0, status,</code><code>"mutex_lock"</code><code>);</code>
<code> </code><code>guarantee (_nparked == 0,</code><code>"invariant"</code><code>) ;</code>
<code> </code><code>++ _nparked ;</code>
<code> </code><code>while</code> <code>(_event < 0) {</code>
<code> </code><code>status = pthread_cond_wait(_cond, _mutex);</code>
<code> </code><code>// for some reason, under 2.7 lwp_cond_wait() may return etime ...</code>
<code> </code><code>// treat this the same as if the wait was interrupted</code>
<code> </code><code>if</code> <code>(status == etime) { status = eintr; }</code>
<code> </code><code>assert_status(status == 0 || status == eintr, status,</code><code>"cond_wait"</code><code>);</code>
<code> </code><code>-- _nparked ;</code>
<code> </code><code>// in theory we could move the st of 0 into _event past the unlock(),</code>
<code> </code><code>// but then we'd need a membar after the st.</code>
<code> </code><code>_event = 0 ;</code>
<code> </code><code>status = pthread_mutex_unlock(_mutex);</code>
<code> </code><code>assert_status(status == 0, status,</code><code>"mutex_unlock"</code><code>);</code>
<code> </code><code>guarantee (_event >= 0,</code><code>"invariant"</code><code>) ;</code>
<code> </code><code>}</code>
pthread_cond_wait是一个多线程的条件变量函数,cond是condition的缩写,字面意思可以理解为线程在等待一个条件发生,这个条件是一个全局变量。这个方法接收两个参数,一个共享变量_cond,一个互斥量_mutex。而unpark方法在linux下是使用pthread_cond_signal实现的。park 在windows下则是使用waitforsingleobject实现的。
当队列满时,生产者往阻塞队列里插入一个元素,生产者线程会进入waiting (parking)状态。我们可以使用jstack dump阻塞的生产者线程看到这点:
<code>"main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]</code>
<code> </code><code>java.lang.thread.state: waiting (parking)</code>
<code> </code><code>at sun.misc.unsafe.park(native method)</code>
<code> </code><code>- parking to wait for <0x0000000140559fe8> (a java.util.concurrent.locks.abstractqueuedsynchronizer$conditionobject)</code>
<code> </code><code>at java.util.concurrent.locks.locksupport.park(locksupport.java:186)</code>
<code> </code><code>at java.util.concurrent.locks.abstractqueuedsynchronizer$conditionobject.await(abstractqueuedsynchronizer.java:2043)</code>
<code> </code><code>at java.util.concurrent.arrayblockingqueue.put(arrayblockingqueue.java:324)</code>
<code> </code><code>at blockingqueue.arrayblockingqueuetest.main(arrayblockingqueuetest.java:11)</code>