天天看点

Java 并发编程之任务取消 (三)通过Future来取消任务处理不可中断的阻塞

通过Future来取消任务

前面的栗子都是直接使用runnable来执行本身,所以如果要取消任务的话只能使用wait join sleep与Interrupt来组合取消任务。

其实Future早已经提供这样的功能 ,它是ExecutorService.submit返回的结果,实现了Cancel方法。该方法带有一个Boolean类型的参数mayinterruptIfRunning,表示取消操作是否成功 。(这只是表示任务是否能够接收中断,而不是表示任务是否能检测并处理中断)。如果mayinterruptIfRunning为true并且任务当前正在某个线程中运行,那么 这个线程能被中断,如果 这个参数为false,那么意味着若任务还没有启动,就不要运行它。这种方法应用于那些 不处理中断的任务中。

我们深入FutureTask的源码内去看它的取消方法

public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
           

也是运用了线程的interrupt方法来实现的

下面是使用Future来实现任务取消的栗子

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Init {
	private static final ScheduledExecutorService cancelExec = Executors
			.newScheduledThreadPool(1);

 

	public void timedrun(final Runnable r, long timeout, TimeUnit unit)
			throws InterruptedException {
		Future<?> task = cancelExec.submit(r);
		try {
			task.get(timeout, unit);
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (TimeoutException e) {
			// TODO Auto-generated catch block
			throw launderThrowable(e.getCause());

		} finally {
			task.cancel(true);
		}
	}

	public static RuntimeException launderThrowable(Throwable t) {

		if (t instanceof RuntimeException)
			return (RuntimeException) t;

		else if (t instanceof Error)
			throw (Error) t;

		else
			throw new IllegalStateException("Not unchecked", t);

	}

	public static void main(String[] args) {
		Init init = new Init();
		Runnable run = new Runnable() {

			@Override
			public void run() {
				// TODO Auto-generated method stub
				int i = 0;
				for (int j = 0; j < 1000000000; j++) {
					i++;
					if (i % 20000000 == 0) {
						System.out.println(i);
					}
				}
			}
		};
		try {
			init.timedrun(run, 1500, TimeUnit.MILLISECONDS);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}
           

上面栗子给出了一个好的编程习惯,取消那些不再需要结果的任务。

结果:

200000000

400000000

Exception in thread "main" java.lang.IllegalStateException: Not unchecked

    at Init.launderThrowable(Init.java:40)

    at Init.timedrun(Init.java:24)

    at Init.main(Init.java:61)

600000000

800000000

1000000000

处理不可中断的阻塞

在java库中,许多可阻塞的方法都是通过提前返回或者抛出InterruptedException来响应中断请求的,从而使开发人员更容易构建出响应取消请求的任务。如果一个线程由于执行同步的Socket I/O或者等待获得内置锁而阻塞,那么中断请求只能设置线程的中断状态,除此之外没有其他任何作用。

Java.io包中的同步Socket I/O 最常见的阻塞I/O形式就是对套接字进行读取和写入。虽然inputStream和outputStream的read和write都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read或write等 方法而被阻塞的线程抛出一个socketException

Java io包中的同步 I/O 当中断一个正在interruptibleChannel上等待的线程时,将抛出ClosedByInterruptException并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出ClosedByinterruptException)当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程抛出AsynchronousCloseException,大多数标准 的Channel都实现了InterruptibleChannel

Selector的异步 I/O如果一个线程在调用Selctor.select方法在(Java.nio.channels中)时阻塞了,那么 调用Close或者wakeup方法会使线程抛出ClosedSelectorException并提前返回

获取某个锁 如果 一个线程由于等待某个内置锁而阻塞那么将无法响应中断,因为线程认为它肯定会获得锁,所以将不会理会中断请求,但是在Lock类中提供了lockInterruptibly方法,该 方法允许在等待一个锁的同时仍能响应中断。

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;

public class ReaderThread extends Thread {
	private final Socket socket;
	private final InputStream in;

	public ReaderThread(Socket socket, InputStream in) {
		super();
		this.socket = socket;
		this.in = in;
	}

	@Override
	public void interrupt() {
		// TODO Auto-generated method stub
		try {
			socket.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
		} finally {
			super.interrupt();
		}

	}

}
           

ReaderThread改写了interrupt方法使其既能处理标准的中断,也能关闭底层的套接字,因此无论ReaderThread线程是在read方法 中阻塞还是在某个可中断的方法中阻塞,都可以被 中断并停止执行当前的工作。