此文被筆者收錄在系列文章 架構師必備(系列) 中
java中沒有提供任何機制,來安全是強迫線程停止手頭的工作,Thread.stop和Thread.suspend方法存在嚴重的缺陷,不能使用。但每個Thread提供了Interruption中斷,一種協作機制來協調線程間的操作和控制。這是JAVA中推薦的方式。程式不應該立即停止,應該采用中斷這種協作機制來處理,正确的做法是:先清除目前程序中的工作,再終止。正常有四種方法:
- 正常結束;
- 設定一個标志位,由外部線程來控制,原理是設定一個volatile變量,使線程池不在建立新線程達到平滑關閉的效果;适合一直在運作的長時間任務;
- 阻塞線程:用interrupt()方法會馬上抛出異常,捕獲到這個線程後break跳出強制關閉;
- 未阻塞線程:用interrupt()方法設定中斷标志位,然後在循環時用isInterrupted()來判斷中斷标志位,其實和自定義标志位一樣原理;
一、任務取消
當外部代碼能在活動自然完成之前,把它更改為完成狀态,被稱為取消。取消的原因很多種可能,比如:使用者請求、限時活動、應用程式設計如此、錯誤、關閉。
java中沒有一種絕對安全停止線程的方法,隻能選擇互相協作的機制,通過協作,使任務和代碼遵循一個統一的協定,用來請求取消。一個可取消的任務必須有取消政策,這個政策是一套程式,規定了不同任務或機制間的協作,保證資料的統一。
@ThreadSafe
public class PrimeGenerator implements Runnable {
private static ExecutorService exec = Executors.newCachedThreadPool();
@GuardedBy("this")
private final List<BigInteger> primes = new ArrayList<BigInteger>();
private volatile boolean cancelled;
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true;
}
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
//讓上一個程式在1秒後停止,但這并不是嚴格的一秒,可能存在誤差。
static List<BigInteger> aSecondOfPrimes() throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
exec.execute(generator);
try {
SECONDS.sleep(1);
} finally {
generator.cancel();
}
return generator.get();//這是一個自定義的非阻塞的方法
}
}
中斷
線程中斷是一個協作機制,一個線程給另一個線程發送信号,通知它在友善或可能的情況下停止正在做的工作,去做其他事情 。但實際上,使用中斷來處理取消之外的任何事情都是不明智的,中斷通常是實作取消最明知的選擇,一般在取消方法中設定中斷狀态。
每個線程都有一個boolean的中斷狀态,Thread包含一些中斷線程的方法:interrupt方法中斷目标線程,isInterrupted傳回目标線程的中斷狀态,interrupted用于清除目前線程的中斷狀态,這是清除中斷狀态唯一的方法。中斷一般不能與可阻塞的函數一起使用。
靜态的interrupted方法應該小心使用,它會清除并發線程的中斷狀态,如果傳回了true,必須進行處理,如果想掩蓋這個中斷,可以抛出 InterruptedException 異常或者再次調用interrupt來儲存中斷狀态。
調用interrupt并不意味着必須停止目标線程正在進行的工作,它僅僅傳遞了請求中斷的資訊,意味着完成目前任務,保證資料結構的統一,然後在下一周期結束。有一些方法對這樣的請求很重視,比如wait,sleep(阻塞方式),當它們接到中斷請求時會抛出一個異常,或者進入時中斷狀态就已經被設定了。是以這兩個方法盡量不要用。
下例中在兩處用到了檢測中斷技術,因為put是個阻塞操作,是以在之前檢測總比在之後檢測性能更好。前提是此時沒有消費者線程或是put是個很耗時的操作,像這種對中斷狀态進行顯式的檢測會對調用可中斷的阻塞方法時很有用外,因為通常我們不能得到期望的響應。
public class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
//這個例子可以很好的處理阻塞操作的中斷問題,通過中斷。
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
/* Allow thread to exit */
}
}
public void cancel() {
interrupt();
}
}
中斷政策
正如需要為任務制定取消政策一樣,也應該定制線程中斷政策。一個中斷政策決定線程如何應對中斷請求--當發現中斷請求時,它會做什麼。
區分任務和線程對中斷的反應是很重要的,任務不會在自己擁有的線程中執行,它們借用屬于服務的線程,比如線程池,如果代碼并不是線程的所有者就應該小心地儲存中斷狀态(如果你給主人打掃房間,主人不在的這段時間你不能把收到的郵件全丢掉,應該收起來待主人回來再處理)。這就是為什麼大多數可阻塞的庫函數,僅僅抛出InterruptedException做為中斷的響應,這也是最合理的政策。
在理中斷時應該儲存中斷狀态,也不能簡單的是把InterruptedException傳遞給調用者,應該在它之後恢複中斷的狀态:Thread.currentThread().interrupt();當檢查到中斷請求時,任務不需要放棄所有的事情,可以選擇推遲直到更合适的時機。這需要記得它已經被請求過中斷了,完成目前正在進行的任務,再抛出中斷異常或指明中斷,這種技術可以保證資料結構不被破壞。
響應中斷
有兩種處理InterruptedException的實用政策:傳遞異常和恢複中斷狀态,使你的方法也成為可中斷的阻塞方法,或者儲存中斷狀态,上層調用者代碼能對其進行處理。
如果不想或不能傳遞 InterruptedException 異常,需要另一種方式儲存中斷請求,因為大多數代碼并不知道它們在哪個線程中運作,并再次調用interrupt來恢複中斷狀态,而不應該掩蓋 InterruptedException 異常,如果你的代碼沒有相應的處理程式,就不應該在catch中捕獲這個異常。過早設定中斷可能會引起無限循環。
不可取消的任務在任務退出前儲存中斷
public Task getNextTask(BlockingQueue<Task> queue) {
boolean interrupted = false;
try {
while (true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
// fall through and retry
}
}
} finally {
if (interrupted)
Thread.currentThread().interrupt();
}
}
在中斷線程之前,應該了解它的中斷政策,且不要在外部線程中安排中斷。如果要在一個專門的線程中中斷任務,這裡用到了jion方法,這個方法有不足之處,它如果傳一個逾時參數,無法确定是由于異常還是因為逾時退出的狀态。
public class TimedRun2 {
private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);
public static void timedRun(final Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
class RethrowableTask implements Runnable {
private volatile Throwable t;
public void run() {
try {
r.run();
} catch (Throwable t) {
this.t = t;
}
}
void rethrow() {
if (t != null)
throw launderThrowable(t);
}
}
RethrowableTask task = new RethrowableTask();
final Thread taskThread = new Thread(task);
taskThread.start();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
taskThread.join(unit.toMillis(timeout));
task.rethrow();
}
}
通過Future取消
Future有一個cancel方法,它需要一個boolean參數,它的傳回值表示取消嘗試是否成功(這僅僅告訴你它是否能接收中斷,而不是任務是否檢測并處理了中斷)。
如果為true并且任務正在一線程中運作,那麼這個線程是應該中斷的。如果是false并且任務還沒啟動的話,那這個任務永遠不會啟動了。除非知道線程的中斷政策,否則不應該中斷線程,這個例子中cancel何時設定true和false需要考慮。
但任務執行線程是由标準的Executor實作建立的,它實作了一個中斷政策,使得任務可以通過中斷被取消,這時cancel是安全的。通過Future來中斷任務并不影響線程池中其它的線程。在一個專門的線程中中斷任務。通過Future來取消任務。
public static void timedRun(Runnable r,
long timeout, TimeUnit unit)
throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// 下面任務會被取消
} catch (ExecutionException e) {
// task中抛出的異常,重抛出
throw launderThrowable(e.getCause());
} finally {
// 如果任務已經取消,是無害的
task.cancel(true); // interrupt if running,如果為false表示如果還沒有啟動的話,不要運作這個任務,用于那些不進行中斷的任務。
}
}
處理不可中斷阻塞
很多可阻塞的庫方法通過提前傳回和抛出InterruptedException來實作對中斷的響應,這使得建構可以響應取消的任務更加容易。但有些阻塞方法或阻塞機制并不響應中斷。但可以通過與中斷類似的手段,來確定可以停止這些線程,前提是我們需要清楚地知道線程為什麼會阻塞。
下例展現了一項用來封裝非标準取消的技術,為了友善終止一個使用者的連接配接或關閉伺服器。重寫了interrupt方法。
public class ReaderThread extends Thread {
private static final int BUFSZ = 512;
private final Socket socket;
private final InputStream in;
public ReaderThread(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}
public void interrupt() {
try {
socket.close();
} catch (IOException ignored) {
} finally {
super.interrupt();
}
}
public void run() {
try {
byte[] buf = new byte[BUFSZ];
while (true) {
int count = in.read(buf);
if (count < 0)
break;
else if (count > 0)
processBuffer(buf, count);
}
} catch (IOException e) { /* Allow thread to exit */
}
}
public void processBuffer(byte[] buf, int count) {
}
}
用newTaskFor鈎子方法封裝非标準取消
在上個例子中,可以使用newTaskFor鈎子函數來改進用來封裝非标準取消的方法。這是java 6 中添加到ThreadPoolExecutor的新特性,當送出一個Callable給ExecutorService時,submit傳回一個Future,可以用Future來取消任務。newTaskFor鈎子是一個工廠方法,建立Future來代表任務,它傳回一個RunnableFuture接口,它擴充了Future和Runnable由FutureTask來實作。
自定義的任務Future可以重寫canel方法,實作日志或收集取消的統計資訊。也可以通過重寫Thread.interrupt()實作上面的非标準取消功能。
public interface CancellableTask<T> extends Callable<T> {
void cancel();
RunnableFuture<T> newTask();
}
public class CancellingExecutor extends ThreadPoolExecutor{
protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable){
if (callable instanceof CancellableTask)
return ((CancellableTask<T>) callable).newTask();
else
return super.newTaskFor(callable);
}
}
public abstract class SocketUsingTask <T> implements CancellableTask<T> {
@GuardedBy("this") private Socket socket;
protected synchronized void setSocket(Socket s) {
socket = s;
}
public synchronized void cancel() {
try {
if (socket != null)
socket.close();
} catch (IOException ignored) {
}
}
public RunnableFuture<T> newTask() {
return new FutureTask<T>(this) {
public boolean cancel(boolean mayInterruptIfRunning) {
try {
SocketUsingTask.this.cancel();
} finally {
return super.cancel(mayInterruptIfRunning);
}
}
};
}
}
如果SocketUsingTask通過自身Future被取消,執行線程會被中斷,這提高了任務對取消的響應性,這樣做在保證響應取消的同時,不僅可以安全地調用可中斷方法,還可以調用阻塞中的Socket I/O方法。
二、停止基于線程的服務
由于java不提供退出線程慣用的方法,是以需要自行編碼結束。實踐指出,我們不應該操控某個線程--中斷它、改變它的優先級等等,除非你擁有這個線程。 線程通過一個Thread對象表示,線程的所有權也是不能被傳遞的,但線程可以被自由的共享。
一般的應用程式會有三個部分組成:應用程式擁有服務,服務擁有工作線程,但應用程式并不擁有工作線程,是以應用程式如果想控制線程隻能通過服務來處理。就像線程池擁有工作者線程一樣。服務比如ExecutorService應該提供生命周期方法來關閉它自己并關閉它擁有的線程。
//這是一個多生産者,單消費者設計,日志資訊通過BlockingQueue移交給日志線程
public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
@GuardedBy("this") private boolean isShutdown;
@GuardedBy("this") private int reservations;
public LogService(Writer writer) {
this.queue = new LinkedBlockingQueue<String>();
this.loggerThread = new LoggerThread();
this.writer = new PrintWriter(writer);
}
public void start() {
loggerThread.start();
}
public void stop() {
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException {
synchronized (this) {
if (isShutdown)
throw new IllegalStateException(/*...*/);
++reservations;
}
queue.put(msg);
}
//内部類
private class LoggerThread extends Thread {
public void run() {
try {
while (true) {
try {
synchronized (LogService.this) {
if (isShutdown && reservations == 0)
break;
}
String msg = queue.take();
synchronized (LogService.this) {
--reservations;
}
writer.println(msg);
} catch (InterruptedException e) { /* retry */
}
}
} finally {
writer.close();
}
}
}
}
另一種更進階的方法。複雜的程式可能把會ExecutorService封裝在一個更高層級的服務中,通過增加連結,把所有權鍊從應用程式擴充到服務,再到線程,每一個鍊上的成員管理它所擁有的服務或線程的生命周期。
public class LogService1 {
private final ExecutorService exec =
public void start(){}
public void stop() throws InterruptedException{
exec.shutdown();
exec.awaitTermination(timeout, unit);
writer.close();
}//LogService委托給ExecutorService執行,LogService管理自己的生命周期
public void log(String msg){
try{
exec.execute(new WriteTask(msg));
}catch(Exception e){}
}
}
緻命藥丸
另一種保證生産--消費服務關閉的方式是使用poison pill:一個可識别的對象,置于隊列中,意味着“當你得到它時或得到一定數量時,停止一切工作”。這種方式适合在生産--消費數量已知的情況下使用。不過在生産--消費者數量較大時很難處理,緻命藥丸隻在無限隊列中工作時,才是可靠的。
//生産者線程
class CrawlerThread extends Thread {
public void run() {
try {
crawl(root);
} catch (InterruptedException e) { /* fall through */
} finally {
while (true) {
try {
queue.put(POISON);
break;
} catch (InterruptedException e1) { /* retry */
}
}
}
}
private void crawl(File root) throws InterruptedException {
File[] entries = root.listFiles(fileFilter);
if (entries != null) {
for (File entry : entries) {
if (entry.isDirectory())
crawl(entry);
else if (!alreadyIndexed(entry))
queue.put(entry);
}
}
}
}
//消費者線程
class IndexerThread extends Thread {
public void run() {
try {
while (true) {
File file = queue.take();
if (file == POISON)
break;
else
indexFile(file);
}
} catch (InterruptedException consumed) {
}
}
public void indexFile(File file) {
/*...*/
};
}
public class IndexingService {
private static final int CAPACITY = 1000;
private static final File POISON = new File("");
private final IndexerThread consumer = new IndexerThread();
private final CrawlerThread producer = new CrawlerThread();
private final BlockingQueue<File> queue;
private final FileFilter fileFilter;
private final File root;
public IndexingService(File root, final FileFilter fileFilter) {
this.root = root;
this.queue = new LinkedBlockingQueue<File>(CAPACITY);
this.fileFilter = new FileFilter() {
public boolean accept(File f) {
return f.isDirectory() || fileFilter.accept(f);
}
};
}
private boolean alreadyIndexed(File f) {
return false;
}
public void start() {
producer.start();
consumer.start();
}
public void stop() {
producer.interrupt();
}
public void awaitTermination() throws InterruptedException {
consumer.join();
}
}
隻執行一次的服務
如果一個方法需要處理一批任務,并在所有任務結束前不會傳回,那麼可以通過私有的Executor來簡化服務的生命周期管理,其中Executor的壽命限定在該方法中(通常會用到invokeAll和invokeAny方法):向每個主機送出任務,在這之後,當所有檢查郵件的任務完成後,會關閉Executor,并等待結束。
public boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
//為了從内部Runnable通路hasNewMail标志,它必須是final類型的,才能避免被更改
final AtomicBoolean hasNewMail = new AtomicBoolean(false);
try {
for (final String host : hosts)
exec.execute(new Runnable() {
public void run() {
if (checkMail(host))
hasNewMail.set(true);
}
});
} finally {
exec.shutdown();
exec.awaitTermination(timeout, unit);
}
return hasNewMail.get();
}
TrackingExecutor任務跟蹤
但這個方法會強制中斷正在運作的任務,也就是無法區分哪些任務正在執行中,哪些執行完了,必須自己通過設定檢查點來區分,如果不處理可能會造成資料的不一緻性。
但可以通過擴充AbstractExecutorService來區分取消和中止的任務。 如TrackingExecutor可以識别那些已經開始,但沒有正常結束的任務。任務必須在傳回時儲存線程的中斷狀态。TrackingExecutorService例子說明了為後續執行來儲存未完成的任務。
public class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService exec;
private final Set<Runnable> tasksCancelledAtShutdown =
Collections.synchronizedSet(new HashSet<Runnable>());
public TrackingExecutor(ExecutorService exec) {
this.exec = exec;
}
//傳回被取消(已經開始,但沒有正常結束)的任務清單
public List<Runnable> getCancelledTasks() {
if (!exec.isTerminated())
throw new IllegalStateException(/*...*/);
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
}
public void execute(final Runnable runnable) {
exec.execute(new Runnable() {
public void run() {
try {
runnable.run();
} finally {
if (isShutdown()
&& Thread.currentThread().isInterrupted())
tasksCancelledAtShutdown.add(runnable);
}
}
});
}
}
public abstract class WebCrawler {
private volatile TrackingExecutor exec;
@GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet<URL>();
private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();
private static final long TIMEOUT = 500;
private static final TimeUnit UNIT = MILLISECONDS;
public WebCrawler(URL startUrl) {
urlsToCrawl.add(startUrl);
}
public synchronized void start() {
exec = new TrackingExecutor(Executors.newCachedThreadPool());
for (URL url : urlsToCrawl) submitCrawlTask(url);
urlsToCrawl.clear();
}
public synchronized void stop() throws InterruptedException {
try {
saveUncrawled(exec.shutdownNow());
if (exec.awaitTermination(TIMEOUT, UNIT))
saveUncrawled(exec.getCancelledTasks());
} finally {
exec = null;
}
}
protected abstract List<URL> processPage(URL url);
private void saveUncrawled(List<Runnable> uncrawled) {
for (Runnable task : uncrawled)
urlsToCrawl.add(((CrawlTask) task).getPage());
}
private void submitCrawlTask(URL u) {
exec.execute(new CrawlTask(u));
}
private class CrawlTask implements Runnable {
private final URL url;
CrawlTask(URL url) {
this.url = url;
}
private int count = 1;
boolean alreadyCrawled() {
return seen.putIfAbsent(url, true) != null;
}
void markUncrawled() {
seen.remove(url);
System.out.printf("marking %s uncrawled%n", url);
}
public void run() {
for (URL link : processPage(url)) {
if (Thread.currentThread().isInterrupted())
return;
submitCrawlTask(link);
}
}
public URL getPage() {
return url;
}
}
}
三、處理反常的線程終止
導緻線程dead的主要原因是RuntimeException,因為這種異常錯誤是不可修複的。
下面的例子闡述了如何線上程池内部建構一個工作者線程,如果任務抛出了一個未檢查的異常,它将允許線程終結,但是會首先通知架構,線程已經終結。然後,架構可能會用新的線程取代這個工作線程,也可能不這麼做,因為線程池也許正在關閉,抑或目前已有足夠多的線程,能夠滿足需要了。ThreadPoolExecutor和Swing使用這項技術來確定那些不能正常運轉的任務不會影響到後續任務的執行。
需查異常的處理
典型線程池的工作者線程的建構
public void run(){
Throwable thrown = null;
try{
while(!isInterrupted())
runTask(getTaskFromWorkQueue());
}catch(Throwable e){
thrown = e;
}finally{
threadExited(this, thrown);
}
}
不需查異常的處理
上面講到了不需查異常的處理,線程的API同樣提供了UncaughtExceptionHandler工具,便于監測到線程因不需查異常引起的dead。這兩個方案互為補充,可以有效防止線程的洩漏問題。
當一個線程因為不需查異常退出時,JVM會把這個事件報告給應用程式自定義的Handler。如果handler不存在,預設會用System.err列印資訊。至于hander如何處理取決于應用程式對服務品質的要求了,一般會直接寫入日志。
public class UEHLogger implements Thread.UncaughtExceptionHandler {
public void uncaughtException(Thread t, Throwable e) {
Logger logger = Logger.getAnonymousLogger();
logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);
}
}
為了給線程設定UncaughtExceptionHandler需要向ThreadPoolExecutor的構造函數中提供一個ThreadFactory(隻有線程的所有者能夠改變其UncaughtExceptionHandler)。隻有通過execute送出的任務,才能将它抛出的異常送給Handler,而通過submit送出的任務,抛出的任何異常,都會認為是任務傳回狀态的一部分。如果一個由submit送出的任務以異常作為終結,這個異常會被Future.get重抛出。包裝在ExecutionException中。
在一個長時間運作的應用程式中,所有的線程都要給未捕獲異常設定一個處理器,這個處理器至少要将異常資訊記入日志中。
四、JVM關閉
JVM即可以正常關閉,也可以通過System.exit或是Crtl-C來強制關閉。但JVM也可以通過Runtime.halt或者“殺死”JVM的作業系統程序被強行關閉。`
關閉鈎子
在正常的關閉中,JVM首先啟動所有已注冊的shutdown hook,shutodwn hook是使用Runtime.addShutdownHook(new Thread())注冊的尚未開始的線程,JVM不能保證shutodwn hook的開始順序。當所有shutodwn hook結束的時候,如果runFinalizerOnExit為true,可以選擇運作finalizer,之後停止。JVM不會嘗試停止或中斷任何關閉時仍在運作中的應用程式線程,它們在JVM最終終止時被強制退出。如果shutdown hook或finalizer沒有完成,那麼正常的關閉程序會“挂起”并且JVM必須強制關閉,這時JVM隻會運作強制關閉程式,其它的線程根本不管,包括shutodwn hook。
shutodwn hook應該是線程安全的。shutodwn hook可以用于服務或應用程式的清理,比如del臨時檔案。并且shutodwn hook全部是并發執行的。關閉日志檔案可能引起其他需要使用日志服務的shutodwn hook麻煩,是以shutodwn hook不應該依賴于可能被應用程式或其他shutodwn hook關閉的服務,所有的服務應使用唯一的shutodwn hook。確定關閉的動作在單線程上順序發生。也就是說一般的應用程式隻會注冊一人shutdown Hook。
注冊shutodwn hook來停止日志服務
public void start(){
Runtime.getRuntime().addShutdownHook(new Thread(){
public void run(){
LogService.this.stop();
}
});
}
精靈線程
有時需要建立一個線程,執行一些輔助工作,但這不希望這個線程的存在阻礙JVM的關閉,這裡就需要用到daemon thread。線程分為:普通線程和精靈線程。JVM啟動的時候建立所有的線程,除了主線程外,其他的都是精靈線程(比如GC)。