文章目录
- 什么是Executor?
-
- Executor框架的结构
- Executor框架主要成员简介
- Executor框架主要成员结构图
- Executor框架使用结构图
- Executor框架的两级调度模型
- ThreadPoolExecutor和ScheduledThreadPoolExecutor
- CompletionService接口
- ExecutorCompletionService类
什么是Executor?
Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。
Executor框架的结构
Executor框架主要由3大部分组成如下:
1、任务
包括被执行任务需要实现的接口:Runnable接口或Callable接口。
2、任务的执行
包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。
3、异步计算的结果。包括接口Future和实现Future的FutureTask类。
Executor框架主要成员简介
- Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开。只有一个execute方法,
。void execute(Runnable command)
-
ExecutorService继承于Executor接口, 他提供了更为丰富的线程实现方法。
ExecutorService有三种状态:
运行、关闭、终止
。
创建后便进入
运
行状态
当调用了shutdown()方法时,便进入了
关闭
状态,此时意味着ExecutorService不再接受新的任务,但是他还是会执行已经提交的任务,
当所有已经提交了的任务执行完后,便达到
终止
状态。
如果不调用shutdown方法,
方法会一直运行下去,系统一般不会主动关闭。ExecutorService
- ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
- ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
- Future接口和实现Future接口的FutureTask类,代表
的结果。异步计算
- Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。
Executor框架主要成员结构图
Executor框架使用结构图
1、主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object resule))。
2、然后可以把Runnable对象直接交给 ExecutorService执行(ExecutorService.execute(Runnable command));或者也可以把Runnable对象或Callable对象 提交给ExecutorService执行(Executor-Service.submit(Runnable task)或ExecutorService.submit(Callabletask))。
3、如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象(返回的是FutureTask对象)。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。
4、最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
Executor框架的两级调度模型
在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。
在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器Executor框架将这些任务映射为固定数量的线程;
在底层,操作系统内核将这些线程映射到硬件处理器上。
这种两级调度模型的示意图如图如下:
应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
ThreadPoolExecutor和ScheduledThreadPoolExecutor
ThreadPoolExecutor
ScheduledThreadPoolExecutor
CompletionService接口
CompletionService与ExecutorService类似,都可以用来执行线程池的任务
ExecutorService继承了Executor接口,而CompletionService没有继承Executor接口。why?
主要是Executor的特性决定的,Executor框架不能完全保证任务执行的异步性,那就是如果需要实现任务task的异步性,只要为每个task创建一个线程就实现了任务的异步性。代码往往包含
new Thread(task).start()
。这种方式的问题在于,它没有限制可创建线程的数量(在ExecutorService可以限制),不过,这样最大的问题是在高并发的情况下,不断创建线程异步执行任务将会极大增大线程创建的开销、造成极大的资源消耗和影响系统的稳定性。
CompletionService想要完全的异步,就不能受到Executor的影响。
一般情况下,如果需要判断任务是否完成,思路是得到Future列表的每个Future,然后反复调用其get方法,并将timeout参数设为0,从而通过轮询的方式判断任务是否完成。
为了更精确实现任务的异步执行,可以使用CompletionService。
接口内的几个方法:
用于向服务中提交有返回结果的任务,并返回Future对象
用户向服务中提交有返回值的任务去执行,并返回Future对象
从服务中返回并移除一个已经完成的任务,如果获取不到,会一致阻塞到有返回值为止。此方法会响应线程中断。
从服务中返回并移除一个已经完成的任务,如果内部没有已经完成的任务,则返回空,此方法会立即响应。
尝试在指定的时间内从服务中返回并移除一个已经完成的任务,等待的时间超时还是没有获取到已完成的任务,则返回空。此方法会响应线程中断
ExecutorCompletionService类
ExecutorCompletionService类是CompletionService接口的具体实现。
说一下其内部原理,ExecutorCompletionService创建的时候会传入一个线程池,调用submit方法传入需要执行的任务,任务由内部的线程池来处理;ExecutorCompletionService内部有个阻塞队列,任意一个任务完成之后,会将任务的执行结果(Future类型)放入阻塞队列中,然后其他线程可以调用它take、poll方法从这个阻塞队列中获取一个已经完成的任务,**获取任务返回结果的顺序和任务执行完成的先后顺序一致,所以先完成的任务会先返回。**能最大限度的提升性能。
看一下构造方法:
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
构造方法需要传入一个Executor对象,这个对象表示任务执行器,所有传入的任务会被这个执行器执行。
completionQueue是用来存储任务结果的阻塞队列,默认用采用的是LinkedBlockingQueue,也支持开发自己设置。通过submit传入需要执行的任务,任务执行完成之后,会放入completionQueue中。
在实现上,ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。当计算完成时,调用FutureTask的done方法。当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。
QueueingFuture 源码:
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
demo:
public class Demo14 {
static class GoodsModel {
//商品名称
String name;
//购物开始时间
long startime;
//送到的时间
long endtime;
public GoodsModel(String name, long startime, long endtime) {
this.name = name;
this.startime = startime;
this.endtime = endtime;
}
@Override
public String toString() {
return name + ",下单时间[" + this.startime + "," + endtime + "],耗时:" + (this.endtime - this.startime);
}
}
/**
* 将商品搬上楼
*
* @param goodsModel
* @throws InterruptedException
*/
static void moveUp(GoodsModel goodsModel) throws InterruptedException {
//休眠5秒,模拟搬上楼耗时
TimeUnit.SECONDS.sleep(5);
System.out.println("将商品搬上楼,商品信息:" + goodsModel);
}
/**
* 模拟下单
*
* @param name 商品名称
* @param costTime 耗时
* @return
*/
static Callable<GoodsModel> buyGoods(String name, long costTime) {
return () -> {
long startTime = System.currentTimeMillis();
System.out.println(startTime + "购买" + name + "下单!");
//模拟送货耗时
try {
TimeUnit.SECONDS.sleep(costTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
long endTime = System.currentTimeMillis();
System.out.println(endTime + name + "送到了!");
return new GoodsModel(name, startTime, endTime);
};
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
long st = System.currentTimeMillis();
System.out.println(st + "开始购物!");
ExecutorService executor = Executors.newFixedThreadPool(5);
//创建ExecutorCompletionService对象
ExecutorCompletionService<GoodsModel> executorCompletionService = new ExecutorCompletionService<>(executor);
//异步下单购买冰箱
executorCompletionService.submit(buyGoods("冰箱", 5));
//异步下单购买洗衣机
executorCompletionService.submit(buyGoods("洗衣机", 2));
executor.shutdown();
//购买商品的数量
int goodsCount = 2;
for (int i = 0; i < goodsCount; i++) {
//可以获取到最先到的商品
GoodsModel goodsModel = executorCompletionService.take().get();
//将最先到的商品送上楼
moveUp(goodsModel);
}
long et = System.currentTimeMillis();
System.out.println(et + "货物已送到家里咯,哈哈哈!");
System.out.println("总耗时:" + (et - st));
}
}
输出:
1564653208284开始购物!
1564653208349购买冰箱下单!
1564653208349购买洗衣机下单!
1564653210349洗衣机送到了!
1564653213350冰箱送到了!
将商品搬上楼,商品信息:洗衣机,下单时间[1564653208349,1564653210349],耗时:2000
将商品搬上楼,商品信息:冰箱,下单时间[1564653208349,1564653213350],耗时:5001
1564653220350货物已送到家里咯,哈哈哈!
总耗时:12066
洗衣机先到的,然后被送上楼了,冰箱后到被送上楼,先到先上楼=先执行先返回。不用程序员操心。