一、继承Thread类
Thread类本质上实现了
Runnable
接口的一个实例,通过继承Thread类并复写run方法启动新线程。调用start方法是告诉CPU此线程已经准备就绪可以执行,进而系统有时间就会来执行其run方法。而直接调用run方法,则不是异步执行,而是等同于调用函数般按顺序同步执行,这就失去了多线程的意义了。
public class ThreadDemo1 extends Thread {
public ThreadDemo1(String name) {
// 设置当前线程的名字
this.setName(name);
}
@Override
public void run() {
System.out.println("当前运行的线程名为: " + Thread.currentThread().getName());
}
public static void main(String[] args) throws Exception {
// 注意这里,要调用start方法才能启动线程,不能调用run方法
new ThreadDemo1("MyThread1").start();
new ThreadDemo1("MyThread2").start();
}
}
二、实现Runnable接口
实现
Runnable
接口并实现其中的run方法,然后通过构造Thread实例,传入Runnable实现类,然后调用Thread的start方法开启一个新线程。继承Thread类只能单继承,而实现
Runnable
接口则可以实现多继承。
public class ThreadDemo2 implements Runnable {
@Override
public void run() {
System.out.println("当前运行的线程名为: " + Thread.currentThread().getName());
}
public static void main(String[] args) throws Exception {
ThreadDemo2 runnable = new ThreadDemo2();
new Thread(runnable, "MyThread1").start();
new Thread(runnable, "MyThread2").start();
}
}
三、通过Callable和Future创建线程
前两种方法创建的线程无法获取子线程的返回值,run方法也不可以抛出异常,这时可以用Callable接口解决。
Callable
并不是
Runnable
的子接口,是个全新的接口,它的示例不能通过传入Thread构造。
在Java1.5的版本开始提供了
Future
接口来代表
Callable
接口里call()方法的返回值,并为Future接口提供了一个实现类
FutureTask
。该实现类不仅实现了Future接口,还实现了Runnable接口,所以可以直接传给Thread构造函数。
模拟场景:
同时启动五个线程并传入数值1到5作为标识,如果传入的数值是奇数执行3秒,偶数执行8秒,每个线程都有一个返回结果。
现在需要输出每个线程的返回结果,如果线程执行时间超过5秒则终止该线程。
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.concurrent.*;
public class ThreadDemo3 {
static Logger logger = LogManager.getLogger(ThreadTest.class);
static class Task implements Callable {
private int num;
public Task (int num) {
this.num = num;
}
@Override
public Object call() throws Exception {
logger.info(num+"号线程启动");
if((num&1)==1){
Thread.sleep(3000);
}else{
Thread.sleep(8000);
}
return num+"号线程返回的结果";
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<Future<String>> resultList = new ArrayList<>();
for(int i=1;i<6;i++){
Callable<String> call = new Task(i);
Future<String> future = exec.submit(call);
resultList.add(future);
}
for(Future<String> future:resultList){
try {
String rt = future.get(5, TimeUnit.SECONDS);
logger.info(rt);
} catch (TimeoutException e){
future.cancel(true); //对查询数据类操作,无法停止当前线程
logger.error("线程超时中止了");
} catch (ExecutionException ex){
ex.printStackTrace();
}
}
exec.shutdown();
logger.info("主线程结束");
}
}
输出结果:
2021-07-30 08:23:29: 2号线程启动
2021-07-30 08:23:29: 1号线程启动
2021-07-30 08:23:29: 5号线程启动
2021-07-30 08:23:29: 4号线程启动
2021-07-30 08:23:29: 3号线程启动
2021-07-30 08:23:32: 1号线程返回的结果
2021-07-30 08:23:37: 2号线程返回的结果
2021-07-30 08:23:37: 3号线程返回的结果
2021-07-30 08:23:37: 4号线程返回的结果
2021-07-30 08:23:37: 5号线程返回的结果
2021-07-30 08:23:37: 主线程结束
结果说明:
根据需求启动的五个线程,2号和4号为偶数并且执行时间超过5秒,应该终止,但是结果却正常输出了。
其原因是get方法会阻塞住调用线程,在阻塞的同时,其他线程任务还在运行,1号线程阻塞了3秒,这时再去循环获取2号线程的结果时,2号和4号在后台实际已经执行了3秒,还需要5秒就可以返回结果,而设定超时的时间刚好是5秒,并没有超时,所以结果全部输出了。异步的效果在我们使用get拿结果时,会变得无效。
为了解决上述的问题,Java1.8新加入的一个实现类
CompletableFuture
,实现了
Future<T>
,
CompletionStage<T>
两个接口,针对Future完成事件,不阻塞等待,在这段时间内,线程任务能正常继续往下执行。CompletableFuture能够将回调放到与任务不同的线程中执行,也能将回调作为继续执行的同步函数,在与任务相同的线程中执行。
使用 CompletableFuture.supplyAsync 改写上述需求代码:
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.concurrent.*;
public class ThreadDemo4 {
static Logger logger = LogManager.getLogger(ThreadTest.class);
static class Task implements Callable {
//do something...
}
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
ArrayList<CompletableFuture<String>> resultList = new ArrayList<>();
for(int i=1;i<6;i++){
int finalI = i;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
String result =null;
Task task = new Task(finalI);
Future<String> ft = exec.submit(task);
try {
result = ft.get(5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
ft.cancel(true); //对查询数据类操作,无法停止当前线程
return finalI+"号线程超时中止了";
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
return result;
});
resultList.add(future);
}
for(CompletableFuture<String> future:resultList){
String ret = null;
try {
ret = future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
logger.info(ret);
}
exec.shutdown();
logger.info("主线程结束");
}
}
输出结果:
2021-07-30 08:44:09: 5号线程启动
2021-07-30 08:44:09: 4号线程启动
2021-07-30 08:44:09: 2号线程启动
2021-07-30 08:44:09: 1号线程启动
2021-07-30 08:44:09: 3号线程启动
2021-07-30 08:44:12: 1号线程返回的结果
2021-07-30 08:44:14: 2号线程超时中止了
2021-07-30 08:44:14: 3号线程返回的结果
2021-07-30 08:44:14: 4号线程超时中止了
2021-07-30 08:44:14: 5号线程返回的结果
2021-07-30 08:44:14: 主线程结束
结果说明:
输出达到了需求的效果,2号和4号线程超时终止,其他线程正常输出。其原因是将原先调用future.get阻塞的部分,放到CompletableFuture.supplyAsync方法中,异步执行,这样做弥补了Future模式的缺点。在异步的任务完成后,需要用其结果继续操作时,无需等待。
四、通过线程池创建线程
对比创建new Thread对象,线程池可以更好的统一管理,减少对象的创建/销毁/开销,每个工作线程都可以被重复利用。根据实际需求可有效的控制并发数,提高系统资源使用率。
Executors 工具类
Java通过Executors工具类提供工厂方法来创建不同类型的线程池:
创建方式 | 说明 |
---|---|
newCachedThreadPool | 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。 |
newFixedThreadPool | 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。 |
newSingleThreadExecutor | 创建一个定长线程池,支持定时及周期性任务执行。创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。 |
newScheduledThreadPool | 创建一个定长线程池,支持定时及周期性任务执行。 |
Executors
创建线程池的方法:
// 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量
ExecutorService pool = Executors.newCachedThreadPool();
// 创建固定大小的线程池
ExecutorService pool = Executors.newFixedThreadPool();
// 创建单个线程池。 线程池中只有一个线程
ExecutorService pool = Executors.newSingleThreadExecutor();
// 创建固定大小的线程,可以延迟或定时的执行任务
ScheduledExecutorService pool = Executors.newScheduledThreadPool();
Executor, ExecutorService 都是接口,ExecutorService继承于Executor,Executors是工具类,他提供了对ThreadPoolExecutor的封装。
ExecutorService 接口提供了返回 Future 对象,终止,关闭线程池等方法,当调用
shutdown()
时,线程池会停止接受新的任务,但会完成正在 pending 中的任务。文中的 ThreadDemo3 和 ThreadDemo4 示例也是使用的该方法创建线程。
newCachedThreadPool 源码
/**
@param corePoolSize=0:无限制线程数
@param maximumPoolSize=Integer.MAX_VALUE:最大线程数为Interger的最大值
@param keepAliveTime=60:线程空闲60s后自动结束
@param TimeUnit=TimeUnit.SECONDS:单位秒
@param workQueue=SynchronousQueue:同步队列,一进一出,避免队列里缓冲数据,入队出队必须同时传递
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
newFixedThreadPool 源码
/**
@param corePoolSize=nThreads:int类型传参,线程池大小
@param maximumPoolSize=nThreads:线程全为核心线程,与corePoolSize相等
@param keepAliveTime=0:该参数默认对核心线程无效
@param TimeUnit=TimeUnit.MILLISECONDS:
@param workQueue=LinkedBlockingQueue:无界阻塞队列,队列最大值为Integer.MAX_VALUE,如果任务提交速度大于执行速度,回造成大量阻塞,最后导致内存溢出
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor 源码
/**
@param corePoolSize=1:固定单个线程
@param maximumPoolSize=1:固定单个线程
@param keepAliveTime=0:该参数默认对核心线程无效
@param TimeUnit=TimeUnit.MILLISECONDS:
@param workQueue=LinkedBlockingQueue:无界阻塞队列,队列最大值为Integer.MAX_VALUE
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newScheduledThreadPool 源码
/**
说明:newScheduledThreadPool调用的是ScheduledThreadPoolExecutor的构造方法,而ScheduledThreadPoolExecutor继承了ThreadPoolExecutor
@param corePoolSize=int corePoolSize:线程池大小
@param maximumPoolSize=Integer.MAX_VALUE:整形的最大值
@param workQueue=DelayedWorkQueue:定制的优先级队列,基于堆数据结构的实现
*/
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
ThreadPoolExecutor 对象
当ExecutorService 线程池中的线程均处于工作状态,并且线程数已达线程池允许的最大线程数时,就会采取指定的饱和策略来处理新提交的任务。
总共有四种策略:
-
AbortPolicy
(终止执行):默认策略。
Executor会抛出一个RejectedExecutionException运行异常到调用者线程来完成终止。
-
CallerRunsPolicy
(调用者线程来运行任务):
这种策略会由调用execute方法的线程自身来执行任务,它提供了一个简单的反馈机制并能降低新任务的提交频率。
-
(丢弃策略):不处理,直接丢弃提交的任务。DiscardPolicy
-
(丢弃队列里最近的一个任务):DiscardOldestPolicy
如果
Executor
还未
shutdown()
的话,则丢弃工作队列的最近的一个任务,然后执行当前任务。
如果使用
Executors
的工厂方法创建的线程池,那么饱和策略都是采用默认的
AbortPolicy
,所以如果我们想当线程池已满时,使用调用者的线程来运行任务,就要自己创建线程池,指定想要的饱和策略,而不是使用
Executors
。
所以我们可以根据需要创建
ThreadPoolExecutor
( ExecutorService 接口的实现类 ) 对象,自定义一些参数,而不是调用
Executors
的工厂方法创建。
/**
@param corePoolSize:线程池大小
@param maximumPoolSize:线程池中的最大线程数量,根据workQueue任务队列的类型,决定线程池会开辟的最大线程数量
@param keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁
@param unit:keepAliveTime的单位
@param workQueue:任务队列,被添加到线程池中,但尚未被执行的任务
@param threadFactory:线程工厂,用于创建线程,一般用默认即可
@param handler:拒绝策略;
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
//Example
ExecutorService exec = new ThreadPoolExecutor(coreSize, maxCoreSize, aliveTime, timeUnit, queue);