天天看点

线程池-Executor框架什么是Executor?ThreadPoolExecutor和ScheduledThreadPoolExecutorCompletionService接口ExecutorCompletionService类

文章目录

  • 什么是Executor?
    • Executor框架的结构
    • Executor框架主要成员简介
    • Executor框架主要成员结构图
    • Executor框架使用结构图
    • Executor框架的两级调度模型
  • ThreadPoolExecutor和ScheduledThreadPoolExecutor
  • CompletionService接口
  • ExecutorCompletionService类

什么是Executor?

Java的线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。

Executor框架的结构

Executor框架主要由3大部分组成如下:

1、任务

包括被执行任务需要实现的接口:Runnable接口或Callable接口。

2、任务的执行

包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。

3、异步计算的结果。包括接口Future和实现Future的FutureTask类。

Executor框架主要成员简介

  • Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开。只有一个execute方法,

    void execute(Runnable command)

  • ExecutorService继承于Executor接口, 他提供了更为丰富的线程实现方法。

    ExecutorService有三种状态:

    运行、关闭、终止

    创建后便进入

    行状态

    当调用了shutdown()方法时,便进入了

    关闭

    状态,此时意味着ExecutorService不再接受新的任务,但是他还是会执行已经提交的任务,

    当所有已经提交了的任务执行完后,便达到

    终止

    状态。

    如果不调用shutdown方法,

    ExecutorService

    方法会一直运行下去,系统一般不会主动关闭。
  • ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
  • ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
  • Future接口和实现Future接口的FutureTask类,代表

    异步计算

    的结果。
  • Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor执行。

Executor框架主要成员结构图

Executor框架使用结构图

线程池-Executor框架什么是Executor?ThreadPoolExecutor和ScheduledThreadPoolExecutorCompletionService接口ExecutorCompletionService类

1、主线程首先要创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象(Executors.callable(Runnable task)或Executors.callable(Runnable task,Object resule))。

2、然后可以把Runnable对象直接交给 ExecutorService执行(ExecutorService.execute(Runnable command));或者也可以把Runnable对象或Callable对象 提交给ExecutorService执行(Executor-Service.submit(Runnable task)或ExecutorService.submit(Callabletask))。

3、如果执行ExecutorService.submit(…),ExecutorService将返回一个实现Future接口的对象(返回的是FutureTask对象)。由于FutureTask实现了Runnable,程序员也可以创建FutureTask,然后直接交给ExecutorService执行。

4、最后,主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

Executor框架的两级调度模型

在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。

在上层,Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器Executor框架将这些任务映射为固定数量的线程;

在底层,操作系统内核将这些线程映射到硬件处理器上。

这种两级调度模型的示意图如图如下:

应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。

线程池-Executor框架什么是Executor?ThreadPoolExecutor和ScheduledThreadPoolExecutorCompletionService接口ExecutorCompletionService类

ThreadPoolExecutor和ScheduledThreadPoolExecutor

ThreadPoolExecutor

ScheduledThreadPoolExecutor

CompletionService接口

线程池-Executor框架什么是Executor?ThreadPoolExecutor和ScheduledThreadPoolExecutorCompletionService接口ExecutorCompletionService类

CompletionService与ExecutorService类似,都可以用来执行线程池的任务

ExecutorService继承了Executor接口,而CompletionService没有继承Executor接口。why?

主要是Executor的特性决定的,Executor框架不能完全保证任务执行的异步性,那就是如果需要实现任务task的异步性,只要为每个task创建一个线程就实现了任务的异步性。代码往往包含

new Thread(task).start()

。这种方式的问题在于,它没有限制可创建线程的数量(在ExecutorService可以限制),不过,这样最大的问题是在高并发的情况下,不断创建线程异步执行任务将会极大增大线程创建的开销、造成极大的资源消耗和影响系统的稳定性。

CompletionService想要完全的异步,就不能受到Executor的影响。

一般情况下,如果需要判断任务是否完成,思路是得到Future列表的每个Future,然后反复调用其get方法,并将timeout参数设为0,从而通过轮询的方式判断任务是否完成。

为了更精确实现任务的异步执行,可以使用CompletionService。

接口内的几个方法:

用于向服务中提交有返回结果的任务,并返回Future对象

用户向服务中提交有返回值的任务去执行,并返回Future对象

从服务中返回并移除一个已经完成的任务,如果获取不到,会一致阻塞到有返回值为止。此方法会响应线程中断。

从服务中返回并移除一个已经完成的任务,如果内部没有已经完成的任务,则返回空,此方法会立即响应。

尝试在指定的时间内从服务中返回并移除一个已经完成的任务,等待的时间超时还是没有获取到已完成的任务,则返回空。此方法会响应线程中断

ExecutorCompletionService类

ExecutorCompletionService类是CompletionService接口的具体实现。

线程池-Executor框架什么是Executor?ThreadPoolExecutor和ScheduledThreadPoolExecutorCompletionService接口ExecutorCompletionService类

说一下其内部原理,ExecutorCompletionService创建的时候会传入一个线程池,调用submit方法传入需要执行的任务,任务由内部的线程池来处理;ExecutorCompletionService内部有个阻塞队列,任意一个任务完成之后,会将任务的执行结果(Future类型)放入阻塞队列中,然后其他线程可以调用它take、poll方法从这个阻塞队列中获取一个已经完成的任务,**获取任务返回结果的顺序和任务执行完成的先后顺序一致,所以先完成的任务会先返回。**能最大限度的提升性能。

看一下构造方法:

public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
           

构造方法需要传入一个Executor对象,这个对象表示任务执行器,所有传入的任务会被这个执行器执行。

completionQueue是用来存储任务结果的阻塞队列,默认用采用的是LinkedBlockingQueue,也支持开发自己设置。通过submit传入需要执行的任务,任务执行完成之后,会放入completionQueue中。

在实现上,ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。当计算完成时,调用FutureTask的done方法。当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。

QueueingFuture 源码:

private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
           

demo:

public class Demo14 {
    static class GoodsModel {
        //商品名称
        String name;
        //购物开始时间
        long startime;
        //送到的时间
        long endtime;

        public GoodsModel(String name, long startime, long endtime) {
            this.name = name;
            this.startime = startime;
            this.endtime = endtime;
        }

        @Override
        public String toString() {
            return name + ",下单时间[" + this.startime + "," + endtime + "],耗时:" + (this.endtime - this.startime);
        }
    }

    /**
     * 将商品搬上楼
     *
     * @param goodsModel
     * @throws InterruptedException
     */
    static void moveUp(GoodsModel goodsModel) throws InterruptedException {
        //休眠5秒,模拟搬上楼耗时
        TimeUnit.SECONDS.sleep(5);
        System.out.println("将商品搬上楼,商品信息:" + goodsModel);
    }

    /**
     * 模拟下单
     *
     * @param name     商品名称
     * @param costTime 耗时
     * @return
     */
    static Callable<GoodsModel> buyGoods(String name, long costTime) {
        return () -> {
            long startTime = System.currentTimeMillis();
            System.out.println(startTime + "购买" + name + "下单!");
            //模拟送货耗时
            try {
                TimeUnit.SECONDS.sleep(costTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long endTime = System.currentTimeMillis();
            System.out.println(endTime + name + "送到了!");
            return new GoodsModel(name, startTime, endTime);
        };
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        long st = System.currentTimeMillis();
        System.out.println(st + "开始购物!");
        ExecutorService executor = Executors.newFixedThreadPool(5);

        //创建ExecutorCompletionService对象
        ExecutorCompletionService<GoodsModel> executorCompletionService = new ExecutorCompletionService<>(executor);
        //异步下单购买冰箱
        executorCompletionService.submit(buyGoods("冰箱", 5));
        //异步下单购买洗衣机
        executorCompletionService.submit(buyGoods("洗衣机", 2));
        executor.shutdown();

        //购买商品的数量
        int goodsCount = 2;
        for (int i = 0; i < goodsCount; i++) {
            //可以获取到最先到的商品
            GoodsModel goodsModel = executorCompletionService.take().get();
            //将最先到的商品送上楼
            moveUp(goodsModel);
        }

        long et = System.currentTimeMillis();
        System.out.println(et + "货物已送到家里咯,哈哈哈!");
        System.out.println("总耗时:" + (et - st));
    }
}
           

输出:

1564653208284开始购物!
1564653208349购买冰箱下单!
1564653208349购买洗衣机下单!
1564653210349洗衣机送到了!
1564653213350冰箱送到了!
将商品搬上楼,商品信息:洗衣机,下单时间[1564653208349,1564653210349],耗时:2000
将商品搬上楼,商品信息:冰箱,下单时间[1564653208349,1564653213350],耗时:5001
1564653220350货物已送到家里咯,哈哈哈!
总耗时:12066
           

洗衣机先到的,然后被送上楼了,冰箱后到被送上楼,先到先上楼=先执行先返回。不用程序员操心。