天天看点

Java创建线程的几种方式一、继承Thread类二、实现Runnable接口三、通过Callable和Future创建线程四、通过线程池创建线程

一、继承Thread类

Thread类本质上实现了

Runnable

接口的一个实例,通过继承Thread类并复写run方法启动新线程。调用start方法是告诉CPU此线程已经准备就绪可以执行,进而系统有时间就会来执行其run方法。而直接调用run方法,则不是异步执行,而是等同于调用函数般按顺序同步执行,这就失去了多线程的意义了。

public class ThreadDemo1 extends Thread {

    public ThreadDemo1(String name) {
        // 设置当前线程的名字
        this.setName(name);
    }

    @Override
    public void run() {
        System.out.println("当前运行的线程名为: " + Thread.currentThread().getName());
    }

    public static void main(String[] args) throws Exception {
        // 注意这里,要调用start方法才能启动线程,不能调用run方法
        new ThreadDemo1("MyThread1").start();
        new ThreadDemo1("MyThread2").start();
    }

}
           

二、实现Runnable接口

实现

Runnable

接口并实现其中的run方法,然后通过构造Thread实例,传入Runnable实现类,然后调用Thread的start方法开启一个新线程。继承Thread类只能单继承,而实现

Runnable

接口则可以实现多继承。

public class ThreadDemo2 implements Runnable {

    @Override
    public void run() {
        System.out.println("当前运行的线程名为: " + Thread.currentThread().getName());
    }

    public static void main(String[] args) throws Exception {
        ThreadDemo2 runnable = new ThreadDemo2();
        new Thread(runnable, "MyThread1").start();
        new Thread(runnable, "MyThread2").start();
    }

}
           

三、通过Callable和Future创建线程

前两种方法创建的线程无法获取子线程的返回值,run方法也不可以抛出异常,这时可以用Callable接口解决。

Callable

并不是

Runnable

的子接口,是个全新的接口,它的示例不能通过传入Thread构造。

在Java1.5的版本开始提供了

Future

接口来代表

Callable

接口里call()方法的返回值,并为Future接口提供了一个实现类

FutureTask

。该实现类不仅实现了Future接口,还实现了Runnable接口,所以可以直接传给Thread构造函数。

模拟场景:

同时启动五个线程并传入数值1到5作为标识,如果传入的数值是奇数执行3秒,偶数执行8秒,每个线程都有一个返回结果。

现在需要输出每个线程的返回结果,如果线程执行时间超过5秒则终止该线程。

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.concurrent.*;


public class ThreadDemo3 {
    static Logger logger = LogManager.getLogger(ThreadTest.class);

    static class Task implements Callable {
        private int num;

        public Task (int num) {
            this.num = num;
        }

        @Override
        public Object call() throws Exception {
            logger.info(num+"号线程启动");
            if((num&1)==1){
                Thread.sleep(3000);
            }else{
                Thread.sleep(8000);
            }
            return num+"号线程返回的结果";
        }
    }

    public static void main(String[] args) throws InterruptedException {

        ExecutorService exec = Executors.newCachedThreadPool();
        ArrayList<Future<String>> resultList = new ArrayList<>();

        for(int i=1;i<6;i++){
            Callable<String> call = new Task(i);
            Future<String> future = exec.submit(call);
            resultList.add(future);
        }

        for(Future<String> future:resultList){
            try {
                String rt = future.get(5, TimeUnit.SECONDS);
                logger.info(rt);
            } catch (TimeoutException e){
                future.cancel(true); //对查询数据类操作,无法停止当前线程
                logger.error("线程超时中止了");
            } catch (ExecutionException ex){
                ex.printStackTrace();
            }
        }

        exec.shutdown();
        logger.info("主线程结束");
    }
    
}
           

输出结果:

2021-07-30 08:23:29: 2号线程启动

2021-07-30 08:23:29: 1号线程启动

2021-07-30 08:23:29: 5号线程启动

2021-07-30 08:23:29: 4号线程启动

2021-07-30 08:23:29: 3号线程启动

2021-07-30 08:23:32: 1号线程返回的结果

2021-07-30 08:23:37: 2号线程返回的结果

2021-07-30 08:23:37: 3号线程返回的结果

2021-07-30 08:23:37: 4号线程返回的结果

2021-07-30 08:23:37: 5号线程返回的结果

2021-07-30 08:23:37: 主线程结束

结果说明:

根据需求启动的五个线程,2号和4号为偶数并且执行时间超过5秒,应该终止,但是结果却正常输出了。

其原因是get方法会阻塞住调用线程,在阻塞的同时,其他线程任务还在运行,1号线程阻塞了3秒,这时再去循环获取2号线程的结果时,2号和4号在后台实际已经执行了3秒,还需要5秒就可以返回结果,而设定超时的时间刚好是5秒,并没有超时,所以结果全部输出了。异步的效果在我们使用get拿结果时,会变得无效。

为了解决上述的问题,Java1.8新加入的一个实现类

CompletableFuture

,实现了

Future<T>

,

CompletionStage<T>

两个接口,针对Future完成事件,不阻塞等待,在这段时间内,线程任务能正常继续往下执行。CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。

使用 CompletableFuture.supplyAsync 改写上述需求代码:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.concurrent.*;


public class ThreadDemo4 {
    static Logger logger = LogManager.getLogger(ThreadTest.class);

    static class Task implements Callable {
		//do something...
    }

    public static void main(String[] args) throws InterruptedException {

        ExecutorService exec = Executors.newCachedThreadPool();
        ArrayList<CompletableFuture<String>> resultList = new ArrayList<>();

        for(int i=1;i<6;i++){
            int finalI = i;
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                String result =null;
                Task task = new Task(finalI);
                Future<String> ft = exec.submit(task);
                try {
                    result = ft.get(5, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    ft.cancel(true); //对查询数据类操作,无法停止当前线程
                    return finalI+"号线程超时中止了";
                } catch (ExecutionException | InterruptedException e) {
                    e.printStackTrace();
                }
                return result;
            });
            resultList.add(future);
        }

        for(CompletableFuture<String> future:resultList){
            String ret = null;
            try {
                ret = future.get();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            logger.info(ret);
        }

        exec.shutdown();
        logger.info("主线程结束");
    }

}
           

输出结果:

2021-07-30 08:44:09: 5号线程启动

2021-07-30 08:44:09: 4号线程启动

2021-07-30 08:44:09: 2号线程启动

2021-07-30 08:44:09: 1号线程启动

2021-07-30 08:44:09: 3号线程启动

2021-07-30 08:44:12: 1号线程返回的结果

2021-07-30 08:44:14: 2号线程超时中止了

2021-07-30 08:44:14: 3号线程返回的结果

2021-07-30 08:44:14: 4号线程超时中止了

2021-07-30 08:44:14: 5号线程返回的结果

2021-07-30 08:44:14: 主线程结束

结果说明:

输出达到了需求的效果,2号和4号线程超时终止,其他线程正常输出。其原因是将原先调用future.get阻塞的部分,放到CompletableFuture.supplyAsync方法中,异步执行,这样做弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。

四、通过线程池创建线程

对比创建new Thread对象,线程池可以更好的统一管理,减少对象的创建/销毁/开销,每个工作线程都可以被重复利用。根据实际需求可有效的控制并发数,提高系统资源使用率。

Executors 工具类

Java通过Executors工具类提供工厂方法来创建不同类型的线程池:

创建方式 说明
newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newSingleThreadExecutor 创建一个定长线程池,支持定时及周期性任务执行。创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。

Executors

创建线程池的方法:

// 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量
ExecutorService pool = Executors.newCachedThreadPool();
// 创建固定大小的线程池
ExecutorService pool = Executors.newFixedThreadPool();
// 创建单个线程池。 线程池中只有一个线程
ExecutorService pool = Executors.newSingleThreadExecutor();

// 创建固定大小的线程,可以延迟或定时的执行任务
ScheduledExecutorService pool = Executors.newScheduledThreadPool();
           

Executor, ExecutorService 都是接口,ExecutorService继承于Executor,Executors是工具类,他提供了对ThreadPoolExecutor的封装。

ExecutorService 接口提供了返回 Future 对象,终止,关闭线程池等方法,当调用

shutdown()

时,线程池会停止接受新的任务,但会完成正在 pending 中的任务。文中的 ThreadDemo3 和 ThreadDemo4 示例也是使用的该方法创建线程。

newCachedThreadPool 源码

/**
@param corePoolSize=0:无限制线程数
@param maximumPoolSize=Integer.MAX_VALUE:最大线程数为Interger的最大值
@param keepAliveTime=60:线程空闲60s后自动结束
@param TimeUnit=TimeUnit.SECONDS:单位秒
@param workQueue=SynchronousQueue:同步队列,一进一出,避免队列里缓冲数据,入队出队必须同时传递
*/

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
           

newFixedThreadPool 源码

/**
@param corePoolSize=nThreads:int类型传参,线程池大小
@param maximumPoolSize=nThreads:线程全为核心线程,与corePoolSize相等
@param keepAliveTime=0:该参数默认对核心线程无效
@param TimeUnit=TimeUnit.MILLISECONDS:
@param workQueue=LinkedBlockingQueue:无界阻塞队列,队列最大值为Integer.MAX_VALUE,如果任务提交速度大于执行速度,回造成大量阻塞,最后导致内存溢出
*/

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
           

newSingleThreadExecutor 源码

/**
@param corePoolSize=1:固定单个线程
@param maximumPoolSize=1:固定单个线程
@param keepAliveTime=0:该参数默认对核心线程无效
@param TimeUnit=TimeUnit.MILLISECONDS:
@param workQueue=LinkedBlockingQueue:无界阻塞队列,队列最大值为Integer.MAX_VALUE
*/

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
           

newScheduledThreadPool 源码

/**
说明:newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor

@param corePoolSize=int corePoolSize:线程池大小
@param maximumPoolSize=Integer.MAX_VALUE:整形的最大值
@param workQueue=DelayedWorkQueue:定制的优先级队列,基于堆数据结构的实现
*/

public static ScheduledExecutorService newScheduledThreadPool(
    int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory);
}

           

ThreadPoolExecutor 对象

当ExecutorService 线程池中的线程均处于工作状态,并且线程数已达线程池允许的最大线程数时,就会采取指定的饱和策略来处理新提交的任务。

总共有四种策略:

  1. AbortPolicy

    (终止执行):默认策略。

    Executor会抛出一个RejectedExecutionException运行异常到调用者线程来完成终止。

  2. CallerRunsPolicy

    (调用者线程来运行任务):

    这种策略会由调用execute方法的线程自身来执行任务,它提供了一个简单的反馈机制并能降低新任务的提交频率。

  3. DiscardPolicy

    (丢弃策略):不处理,直接丢弃提交的任务。
  4. DiscardOldestPolicy

    (丢弃队列里最近的一个任务):

如果

Executor

还未

shutdown()

的话,则丢弃工作队列的最近的一个任务,然后执行当前任务。

如果使用

Executors

的工厂方法创建的线程池,那么饱和策略都是采用默认的

AbortPolicy

,所以如果我们想当线程池已满时,使用调用者的线程来运行任务,就要自己创建线程池,指定想要的饱和策略,而不是使用

Executors

所以我们可以根据需要创建

ThreadPoolExecutor

( ExecutorService 接口的实现类 ) 对象,自定义一些参数,而不是调用

Executors

的工厂方法创建。

/**
@param corePoolSize:线程池大小
@param maximumPoolSize:线程池中的最大线程数量,根据workQueue任务队列的类型,决定线程池会开辟的最大线程数量
@param keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁
@param unit:keepAliveTime的单位
@param workQueue:任务队列,被添加到线程池中,但尚未被执行的任务
@param threadFactory:线程工厂,用于创建线程,一般用默认即可
@param handler:拒绝策略;
*/

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

//Example
ExecutorService exec = new ThreadPoolExecutor(coreSize, maxCoreSize, aliveTime, timeUnit, queue);