天天看点

任务取消(Cancellation)

作者:doug lea 译者:丁一

当某个线程中的活动执行失败或想改变运行意图,也许就有必要或想要在其它线程中取消这个线程的活动,而不管这个线程正在做什么。取消会给运行中的线程带来一些无法预料的失败情况。取消操作异步特性相关的设计技巧,让人想起了因系统崩溃和连接断开任何时候都有可能失败的分布式系统的那些技巧。并发程序还要确保多线程共享的对象的状态一致性。

在大多数多线程程序中,取消任务(cancellation)是普遍存在的,常见于:

几乎所有与gui中取消按钮相关的活动。

多媒体演示(如动画循环)中的正常终止活动。

线程中生成的结果不再需要。例如使用多个线程搜索数据库,只要某个线程返回了结果,其它的都可以取消掉。

由于一组活动中的一或多个遇到意外错误或异常导致整组活动无法继续。

脚注:在并发编程中两个l的cancellation最常见。译者注:英语”取消”有两种写法cancelation和cancellation

<a>中断(interruption)</a>

实现取消任务的最佳技术是使用线程的中断状态,这个状态由thread.interrupt设置,可被thread.isinterrupted检测到,通过thread.interrupted清除,有时候抛出interruptedexception异常来响应。

脚注:jdk1.0不支持中断机制。版本间政策与机制(policies and mechanisms)的更新说明对任务取消支持中的不规则行为做出了说明

线程中断起着请求取消活动的作用。但无法阻止有人将其用作它途,而用来作取消操作是预期的用途。基于中断的任务取消依赖于取消者和被取消者间的一个协议以确保跨多线程使用的对象在被取消线程终止的时候不被损坏。大部分(理想情况下是所有的)java.*包中的类都遵守这个协议。

几乎在所有的情况下,取消一个与线程有关系的活动都应当终止对应的线程。但中断机制不会强制线程立马终止。这就给任何被中断的线程一个在终止前做些清理操作的机会,但也给代码强加了及时检查中断状态以及采取合适操作的职责。

延迟甚至忽略任务取消的请求给写出良好响应性且非常健壮的代码提供了途径。因为不会直接将线程中断掉,所以很难或不可能撤销的动作的前面可以作为一个安全点,然后在此安全点检查中断状态。响应中断大部分可选的方式在§3.1.1中有讨论:

继续执行(忽略或清除了中断)可能适用于那些不打算终止的线程;例如,那些对于程序基本功能不可或缺的数据库管理服务。一旦遇到中断,可中止这些特殊的任务,然后允许线程继续执行其它任务。然而,即使在这里,将中断的线程替换成一个处于初始状态的新启动的线程会更易于管理。

突然终止(比如抛出错误)一般适用于提供独立服务、除了run方法中finally子句外无需其它清理操作的线程。但是,当线程执行的服务被其它线程依赖时(见§4.3),就应当以某种形式通知这些依赖的线程或设置状态指示。(异常本身不会自动在线程间传播)

线程中使用的对象被其它线程依赖时必须使用回滚或前滚技术。

在某种程度上,可以通过决定多久用thread.currentthread().isinterrupted()来检查中断状态以控制代码对中断的响应灵敏性。中断状态检查不需要太频繁以免影响程序效率。例如,如果需要取消的活动包含大约10000条指令,每10000条指令做一次取消检查,那么从取消请求到关闭平均会耗费15000条指令。只要活动继续运行没有什么实际的危害,这个数量级可以满足大部分应用的需要。通常,这个理由可以让你将中断检测代码仅放到既方便检测又是重要的程序点。在性能关键型应用中,也许值得构建一个分析模型或收集经验值来准确地决定响应性与吞吐量间的最佳权衡(参见§4.4.1.7)。

object.wait、thread.join、thread.sleep以及它们衍生出的方法都会自动检测中断。这些方法一旦中断就会抛出interruptedexception来中止,然后让线程苏醒并执行与活动取消相关的代码。

按照惯例,应当在抛出interruptedexception时清除中断状态。有时候有必要这样做来支持一些清理工作,但这也可能是错误与混乱之源。当处理完interruptedexception后想要传播中断状态,必须要么重新抛出捕获的interruptedexception,要么通过thread.currentthread().interrupt()重新设置中断状态。如果你的代码调用了其它未正确维持中断状态的代码(例如,忽略interruptedexception又不重设状态),可以能通过维持一个字段来规避问题,这个字段用于保存活动取消的标识,在调用interrupt的时候设置该字段,从有问题的调用中返回时检查该字段。

有两种情况线程会保持休眠而无法检测中断状态或接收interruptedexception:在同步块中和在io中阻塞时。线程在等待同步方法或同步块的锁时不会对中断有响应。但是,如§2.5中讨论的,当需要大幅降低在活动取消期间被卡在锁等待中的几率,可以使用lock工具类。使用lock类的代码阻塞仅是为了访问锁对象本身,而不是这些锁所保护的代码。这些阻塞的耗时天生就很短(尽管时间不能严格保证)。

<a>io和资源撤销(io and resource revocation)</a>

一些io支持类(尤其是java.net.socket及其相关类)提供了在读操作阻塞的时候能够超时的可选途径,在这种情况下就可以在超时后检测中断。java.io中的其它类采用了另一种方式——一种特殊形式的资源撤销。如果某个线程在一个io对象s(如inputstream)上执行s.close(),那么任何其它尝试使用s的线程将收到一个ioexception。io关闭会影响所有使用关闭了的io对象的线程,会导致io对象不可用。如有必要,可以创建一个新io对象来替代关闭了的io对象。

这与其它资源撤销的用途密切相关(如为了安全目的)。该策略也会保护应用免让共享的io对象因其它使用了此io对象的线程被取消而自动变得不可用。大部分java.io中的类不会也不能在出现io异常时清除失败状态。例如,如果在streamtokenizer或objectinputstream操作中间出现了一个底层io异常,没有一个实用的恢复动作能继续保持预期的保障。所以,作为一种策略,jvm不会自动中断io操作。

这给代码强加了额外的职责来处理取消事件。若一个线程正在执行io操作,如果在此io操作期间试图取消该io操作,必须意识到io对象正在使用且关闭该io对象是你想要的行为。如果能接受这种情况,就可以通过关闭io对象和中断线程来完成活动取消。例如:

<a href="http://ifeve.com/cancellation/#viewsource">查看源代码</a>

<code>01</code>

<code>class</code> <code>cancellablereader {                       </code><code>// incomplete</code>

<code>02</code>

<code>    </code><code>private</code> <code>thread readerthread;</code><code>// only one at a time supported</code>

<code>03</code>

<code>    </code><code>private</code> <code>fileinputstream datafile;</code>

<code>04</code>

<code>05</code>

<code>    </code><code>public</code> <code>synchronized</code> <code>void</code> <code>startreaderthread()</code>

<code>06</code>

<code>        </code><code>throws</code> <code>illegalstateexception, filenotfoundexception {</code>

<code>07</code>

<code>        </code><code>if</code> <code>(readerthread !=</code><code>null</code><code>)</code><code>throw</code> <code>new</code> <code>illegalstateexception();</code>

<code>08</code>

<code>            </code><code>datafile =</code><code>new</code> <code>fileinputstream(</code><code>"data"</code><code>);</code>

<code>09</code>

<code>            </code><code>readerthread =</code><code>new</code> <code>thread(</code><code>new</code> <code>runnable() {</code>

<code>10</code>

<code>                </code><code>public</code> <code>void</code> <code>run() { doread(); }</code>

<code>11</code>

<code>            </code><code>});</code>

<code>12</code>

<code>        </code><code>readerthread.start();</code>

<code>13</code>

<code>    </code><code>}</code>

<code>14</code>

<code>15</code>

<code>    </code><code>protected</code> <code>synchronized</code> <code>void</code> <code>closefile() {</code><code>// utility method</code>

<code>16</code>

<code>        </code><code>if</code> <code>(datafile !=</code><code>null</code><code>) {</code>

<code>17</code>

<code>            </code><code>try</code> <code>{ datafile.close(); }</code>

<code>18</code>

<code>            </code><code>catch</code> <code>(ioexception ignore) {}</code>

<code>19</code>

<code>            </code><code>datafile =</code><code>null</code><code>;</code>

<code>20</code>

<code>        </code><code>}</code>

<code>21</code>

<code>22</code>

<code>23</code>

<code>    </code><code>protected</code> <code>void</code> <code>doread() {</code>

<code>24</code>

<code>        </code><code>try</code> <code>{</code>

<code>25</code>

<code>            </code><code>while</code> <code>(!thread.interrupted()) {</code>

<code>26</code>

<code>                </code><code>try</code> <code>{</code>

<code>27</code>

<code>                    </code><code>int</code> <code>c = datafile.read();</code>

<code>28</code>

<code>                    </code><code>if</code> <code>(c == -</code><code>1</code><code>)</code><code>break</code><code>;</code>

<code>29</code>

<code>                    </code><code>else</code> <code>process(c);</code>

<code>30</code>

<code>                </code><code>}</code><code>catch</code> <code>(ioexception ex) {</code>

<code>31</code>

<code>                    </code><code>break</code><code>;</code><code>// perhaps first do other cleanup</code>

<code>32</code>

<code>                </code><code>}</code>

<code>33</code>

<code>            </code><code>}</code>

<code>34</code>

<code>        </code><code>}</code><code>finally</code> <code>{</code>

<code>35</code>

<code>            </code><code>closefile();</code>

<code>36</code>

<code>            </code><code>synchronized</code><code>(</code><code>this</code><code>) { readerthread =</code><code>null</code><code>; }</code>

<code>37</code>

<code>38</code>

<code>39</code>

<code>40</code>

<code>    </code><code>public</code> <code>synchronized</code> <code>void</code> <code>cancelreaderthread() {</code>

<code>41</code>

<code>        </code><code>if</code> <code>(readerthread !=</code><code>null</code><code>) readerthread.interrupt();</code>

<code>42</code>

<code>43</code>

<code>44</code>

<code>}</code>

很多其它取消io的场景源于需要中断那些等待输入而输入却不会或不能及时到来的线程。大部分基于套接字的流,可以通过设置套接字的超时参数来处理。其它的,可以依赖inputstream.available,然后手写自己的带时间限制的轮询循环来避免超时之后还阻塞在io中(见§4.1.5)。这种设计可以使用一种类似于§3.1.1.5中描述的有时间限制的退避重试协议。例如:

<code>class</code> <code>readerwithtimeout {               </code><code>// generic code sketch</code>

<code>    </code><code>// ...</code>

<code>    </code><code>void</code> <code>attemptread(inputstream stream,</code><code>long</code> <code>timeout)</code><code>throws</code><code>... {</code>

<code>        </code><code>long</code> <code>starttime = system.currenttimemillis();</code>

<code>            </code><code>for</code> <code>(;;) {</code>

<code>                </code><code>if</code> <code>(stream.available() &gt;</code><code>0</code><code>) {</code>

<code>                    </code><code>int</code> <code>c = stream.read();</code>

<code>                    </code><code>if</code> <code>(c != -</code><code>1</code><code>) process(c);</code>

<code>                    </code><code>else</code> <code>break</code><code>;</code><code>// eof</code>

<code>                </code><code>}</code><code>else</code> <code>{</code>

<code>                    </code><code>try</code> <code>{</code>

<code>                        </code><code>thread.sleep(</code><code>100</code><code>);</code><code>// arbitrary fixed back-off time</code>

<code>                    </code><code>}</code><code>catch</code> <code>(interruptedexception ie) {</code>

<code>                        </code><code>/* ... quietly wrap up and return ... */</code>

<code>                    </code><code>}</code>

<code>                    </code><code>long now = system.currenttimemillis();</code>

<code>                    </code><code>if (now - starttime &gt;= timeout) {</code>

<code>                        </code><code>/* ... fail ...*/</code>

<code>        </code><code>} catch (ioexception ex) { /* ... fail ... */</code> <code>}</code>

脚注:有些jdk发布版本也支持interruptedioexception,但只是部分实现了且仅限于某些平台。在本文撰写之时,未来版本打算停止对其支持,部分原因是由于io对象不可用会引起不良后果。但既然interruptedioexception定义为ioexception的一个子类,这种设计的工作方式与包含interruptedioexception支持的版本上描述的相似,尽管存在额外的不确定性:中断可能抛出interruptedioexception或interruptedexception。捕获interruptedioexception然后将其作为一个interruptedexception重新抛出能部分解决该问题。

<a>异步终止(asynchronous termination)</a>

stop方法起初包含在thread类中,但是已经不推荐使用了。thread.stop会导致不管线程正在做什么就突然抛出一个threaddeath异常。(与interrupt类似,stop不会中止锁等待或io等待。但与interrupt不同的是,它不严格保证会中止wait,sleep或join)

这会是个非常危险的操作。因为thread.stop产生异步信号,某些操作由于程序安全和对象一致性必须回滚或前滚,而活动正在执行这些操作或代码段时可能被终止掉。看下面例子:

<code>class</code> <code>c {                                        </code><code>// fragments</code>

<code>    </code><code>private</code> <code>int</code> <code>v; </code><code>// invariant: v &gt;= 0</code>

<code>    </code><code>synchronized</code> <code>void</code> <code>f() {</code>

<code>        </code><code>v = -</code><code>1</code>  <code>;  </code><code>// temporarily set to illegal value as flag</code>

<code>        </code><code>compute(); </code><code>// possible stop point (*)</code>

<code>        </code><code>v =</code><code>1</code><code>;     </code><code>// set to legal value</code>

<code>    </code><code>synchronized</code> <code>void</code> <code>g() {</code>

<code>        </code><code>while</code> <code>(v !=</code><code>0</code><code>) {</code>

<code>            </code><code>--v;</code>

<code>            </code><code>something();</code>

如果thread.stop碰巧导致(*)行终止,对象就被破坏了:线程一终止,对象将保持在不一致状态,因为变量v被设了一个非法的值。其它线程在该对象上的任何调用会执行不想要的或危险的操作。例如,这里g方法中的循环将自旋2*integer.max_value次。stop让回滚或前滚恢复技术的使用变得极其困难。乍一看,这个问题看起来不太严重 —— 毕竟,调用compute抛出的任何未捕获异常都会破坏状态。但是,thread.stop的后果更隐蔽,因为在可能忽略了threaddeath异常(由thread.stop抛出)而仍传播取消请求的方法中你什么也做不了。而且,除非在每行代码后都放一个catch(threaddeath),否则就没办法准确恢复当前对象的状态,所以可能碰到未检测到的破坏。相比之下,通常可以将代码写的健壮些,不用大费周章就能消除或处理其它类型的运行时异常。

换而言之,禁用thread.stop不是为了修复它有缺陷的逻辑,而是纠正对其功能的错误认识。不可能允许所有方法的每条字节码都能出现取消操作导致的异常(底层操作系统代码开发者非常熟悉这个事实。即使程序非常短,很小的异步取消安全的例程也会是个艰巨的任务。)

注意,任意正在执行的方法可以捕获并忽略由stop导致的threaddeath异常。这样的话,stop就和interrupt一样不能保证线程会被终止,这更危险。任何stop的使用都暗含着开发者评估过试图突然终止某个活动带来的潜在危害比不这样做的潜在危害更大。

<a>资源控制(resource control)</a>

活动取消可能出现在可装载和执行外部代码的任一系统的设计中。试图取消未遵守标准约定的代码面临着难题。外部代码也许完全忽略了中断,甚至是捕获threaddeath异常后将其丢弃,在这种情况下调用thread.interrupt和thread.stop将不会有什么效果。

你无法精确控制外来代码的行为及其耗时。但能够且应当使用标准的安全措施来限制不良后果。一种方式是创建和使用一个securitymanager,当某个线程运行的时间太长,就拒绝所有对受检资源的请求。(细节内容超出本书范围,参考推荐读物。)这种形式的资源拒绝同§3.1.2.2中讨论的资源撤销策略一起能够阻止外部代码执行任一与其它应当继续执行的线程竞争资源。副作用就是这些措施经常最终会导致线程因异常而挂掉。

此外,可以调用某个线程的setpriority(thread.min_priority)将cpu资源的竞争降到最小。可以用一个securitymanager来阻止该线程将优先级提高。

<a>多步取消(multiphase cancellation)</a>

有时候,即使取消的是普通的代码,损害也比通常的更大。为应付这种可能性,可以建立一个通用的多步取消功能,尽可能尝试以破坏性最小的方式来取消任务,如果稍候还没有终止,再使用一种破坏性较大的方式。

在大多数操作系统进程级,多步取消是一种常见的模式。例如,它用在unix关闭期间,先尝试使用kill -1终止任务,若有必要随后再使用kill -9.大多数win系统中的任务管理器也使用了类似的策略。

这里有个简单版本的示例。(thread.join使用方面的更多细节参见§4.3.2.)

<code>class</code> <code>terminator {</code>

<code>    </code><code>// try to kill; return true if known to be dead</code>

<code>    </code><code>static</code> <code>boolean</code> <code>terminate(thread t,</code><code>long</code> <code>maxwaittodie) {</code>

<code>        </code> 

<code>        </code><code>if</code> <code>(!t.isalive())</code><code>return</code> <code>true</code><code>; </code><code>// already dead</code>

<code>        </code><code>// phase 1 -- graceful cancellation</code>

<code>        </code><code>t.interrupt();      </code>

<code>        </code><code>try</code> <code>{ t.join(maxwaittodie); }</code>

<code>        </code><code>catch</code><code>(interruptedexception e){}</code><code>//  ignore</code>

<code>        </code><code>if</code> <code>(!t.isalive())</code><code>return</code> <code>true</code><code>; </code><code>// success</code>

<code>        </code><code>// phase 2 -- trap all security checks</code>

<code>        </code><code>thesecuritymgr.denyallchecksfor(t);</code><code>// a made-up method</code>

<code>        </code><code>catch</code><code>(interruptedexception ex) {}</code>

<code>        </code><code>if</code> <code>(!t.isalive())</code><code>return</code> <code>true</code><code>;</code>

<code>        </code><code>// phase 3 -- minimize damage</code>

<code>        </code><code>t.setpriority(thread.min_priority);</code>

<code>        </code><code>return</code> <code>false</code><code>;</code>

注意这里的terminate方法本身忽略了中断。这表明取消操作所做的这种策略选择一旦开始就必须继续。取消正在执行的取消操作,会给处理已经开始的与终止相关的清理带来另外一些问题。

因不同jvm实现中thread.isalive的行为不尽相同(参见§1.1.2),当join因线程结束返回后,在线程完全死掉之前isalive还有可能返回true。