前言
通常情況下,SpringMVC接收到請求後會将請求具體分發給單個線程進行處理。如果請求進行中涉及到比較耗時的操作,為了能更快地将響應傳回給使用者,那麼就需要将耗時的業務操作交由别的線程進行異步處理,而SpringBoot已經為我們提供了這樣的實作。
@Async注解
建立一個AsyncController,給需要異步執行的方法加上@Async注解,代碼如下:
kotlin複制代碼package geek.springboot.application.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RequestMapping("/async/")
@RestController
public class AsyncController {
@GetMapping
public void processGet() {
log.info("start process get");
this.doSomeThing();
}
// 給需要異步執行的方法加上@Async
@Async
public void doSomeThing() {
log.info("do some thing");
}
}
需要開啟SpringBoot異步任務執行功能,還需要加上@EnableAsync注解,在SpringApplication啟動類加上,代碼如下:
typescript複制代碼package geek.springboot.application;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@EnableAsync // 開啟異步任務執行功能
@Slf4j
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
@Async無效的原因
啟動後Get /async/ ,控制台輸出如下:
可以看到執行doSomeThing()的線程和處理Get請求的線程是同一個,都是nio-8080-exec-1,那這裡之是以@Async注解無效,這是因為Spring是給掃描到的每個Bean都建立代理對象(@Component、@Service、@Controller...),而隻有這些代理對象才做了方法增強,調用代理對象的方法才能實作我們期望的行為。比如@Async、@Transactional...
是以@Async無效的問題在于AsyncController中,this.doSomeThing();調用的不是代理對象的doSomeThing(),而是目前對象this的doSomeThing().
明白了問題的根源,那麼建立一個AsyncService,代碼如下:
java複制代碼package geek.springboot.application.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class AsyncService {
// 給需要異步執行的方法加上@Async
@Async
public void doSomeThing() {
// 為了示範線程池的效果,這裡模拟耗時操作
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("do some thing");
}
}
将AsyncController稍作調整
kotlin複制代碼package geek.springboot.application.controller;
import geek.springboot.application.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RequestMapping("/async/")
@RestController
public class AsyncController {
@Autowired
private AsyncService asyncService;
@GetMapping
public void processGet() {
log.info("start process get");
// 調用代理對象上@Async修飾的方法
this.asyncService.doSomeThing();
}
}
Get /asysnc/ ,輸出如下:
可以看到doSomeThing()和processGet()不是同一個線程,異步處理成功。
@Async無效的其他原因
- @Async修飾的方法不能是private的,最好是public,可以保證别的Bean通路該方法的權限,也保證Spring方法增強成功。
- @Async修飾的方法不能是static的。
- @Async修飾的方法傳回值隻能是void或者Future 。
- 沒有加上@EnableAsync注解,或者添加@Async的類沒有被Spring掃描到。
- 調用對象中@Async修飾的方法時,該對象一定要是被Spring托管的,如果是自行new出來的,因為不受Spring托管,并沒有做方法增強。
自定義異步線程池
@Async本質,其實就是SpringBoot預設給我們封裝好了一個線程池,所有Spring代理對象的@Async修飾方法,都會被扔到線程池中執行,進而不影響Http請求線程處理邏輯。而自定義的方式有如下幾種:
application.yml
最簡單的,通過application.yml自定義線程池
yaml複制代碼spring:
task:
execution:
# 核心線程數1,最大線程數2,任務隊列容量為0,除核心線程外的線程空閑時存活時間60秒
pool:
core-size: 1
max-size: 2
queue-capacity: 0
keep-alive: "60s"
短時間内多次Get /async/ ,輸出如下:
說明線程池配置生效,每一次doSomeThing()執行,都是由線程池中的線程輪流處理。
還可能出現以下錯誤,原因是當時線程池全部線程都在執行doSomeThing(),同時因為任務隊列容量為0,是以再有新的doSomeThing()任務,直接被拒絕
less複制代碼2023-07-28 05:24:14.496 ERROR 39436 --- [nio-8080-exec-6] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@1a67c1cf[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 2]] did not accept task: org.springframework.aop.interceptor.AsyncExecutionInterceptor$Lambda$665/842994983@196a33ce] with root cause
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@17adcf4b rejected from java.util.concurrent.ThreadPoolExecutor@1a67c1cf[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 2]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063) ~[na:1.8.0_241]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830) ~[na:1.8.0_241]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379) ~[na:1.8.0_241]
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134) ~[na:1.8.0_241]
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.submit(ThreadPoolTaskExecutor.java:388) ~[spring-context-5.3.29.jar:5.3.29]
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:292) ~[spring-aop-5.3.29.jar:5.3.29]
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129) ~[spring-aop-5.3.29.jar:5.3.29]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.3.29.jar:5.3.29]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-5.3.29.jar:5.3.29]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708) ~[spring-aop-5.3.29.jar:5.3.29]
at geek.springboot.application.service.AsyncService$EnhancerBySpringCGLIB$ffc452ec.doSomeThing(<generated>) ~[classes/:na]
at geek.springboot.application.controller.AsyncController.processGet(AsyncController.java:22) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_241]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_241]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_241]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_241]
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205) ~[spring-web-5.3.29.jar:5.3.29]
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:150) ~[spring-web-5.3.29.jar:5.3.29]
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:117) ~[spring-webmvc-5.3.29.jar:5.3.29]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:895) ~[spring-webmvc-5.3.29.jar:5.3.29]
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808) ~[spring-webmvc-5.3.29.jar:5.3.29]
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.3.29.jar:5.3.29]
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1072) ~[spring-webmvc-5.3.29.jar:5.3.29]
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:965) ~[spring-webmvc-5.3.29.jar:5.3.29]
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006) ~[spring-webmvc-5.3.29.jar:5.3.29]
at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:898) ~[spring-webmvc-5.3.29.jar:5.3.29]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:529) ~[tomcat-embed-core-9.0.78.jar:4.0.FR]
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883) ~[spring-webmvc-5.3.29.jar:5.3.29]
at javax.servlet.http.HttpServlet.service(HttpServlet.java:623) ~[tomcat-embed-core-9.0.78.jar:4.0.FR]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:209) ~[tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:153) ~[tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51) ~[tomcat-embed-websocket-9.0.78.jar:9.0.78]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:178) ~[tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:153) ~[tomcat-embed-core-9.0.78.jar:9.0.78]
at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:100) ~[spring-web-5.3.29.jar:5.3.29]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.29.jar:5.3.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:178) ~[tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:153) ~[tomcat-embed-core-9.0.78.jar:9.0.78]
at org.springframework.web.filter.FormContentFilter.doFilterInternal(FormContentFilter.java:93) ~[spring-web-5.3.29.jar:5.3.29]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.29.jar:5.3.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:178) ~[tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:153) ~[tomcat-embed-core-9.0.78.jar:9.0.78]
at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:201) ~[spring-web-5.3.29.jar:5.3.29]
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:117) ~[spring-web-5.3.29.jar:5.3.29]
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:178) ~[tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:153) ~[tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:167) ~[tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:90) [tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:481) [tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:130) [tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:93) [tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:74) [tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343) [tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:390) [tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:63) [tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:926) [tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1791) [tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52) [tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.tomcat.util.threads.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1191) [tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.tomcat.util.threads.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:659) [tomcat-embed-core-9.0.78.jar:9.0.78]
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-9.0.78.jar:9.0.78]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_241]
ThreadPoolTaskExecutor
自定義ThreadPoolTaskExecutor,并作為Spring中的Bean存在,代碼如下:
java複制代碼package geek.springboot.application.configuration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 任務執行Config
*
* @author Bruse
*/
@Slf4j
@Configuration
public class TaskExecutorConfig {
private final AtomicInteger threadSeq = new AtomicInteger(0);
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心線程數
executor.setCorePoolSize(2);
// 最大線程數
executor.setMaxPoolSize(2);
// 空閑線程存活時間,機關:秒
executor.setKeepAliveSeconds(60);
// 任務隊列容量
executor.setQueueCapacity(0);
// 線程名稱字首,如果設定了線程池工廠自定義線程,那麼設定該參數無效
executor.setThreadNamePrefix("async task_");
// 自定義線程工廠,可以在這裡自定義線程池中線程的一些參數
executor.setThreadFactory(r -> {
Thread thread = new Thread(r);
thread.setName("async task_" + threadSeq.getAndIncrement());
thread.setUncaughtExceptionHandler((t, e) -> log.error("線程抛出未捕獲異常 {} {}", t.getName(), e.getMessage()));
return thread;
});
// 任務拒絕政策,實作RejectedExecutionHandler接口,自定義處理邏輯
executor.setRejectedExecutionHandler((r, executor1) -> log.error("任務被拒絕"));
// 或者使用ThreadPoolExecutor現有的拒絕政策
// executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
return executor;
}
}
再次短時間内多次請求,輸出如下:
可以看到設定到拒絕政策,線程名稱等符合預期。
TaskExecutorBuilder
除了上面執行個體化ThreadPoolTaskExecutor的方式,也可以通過TaskExecutorBuilder自定義線程池,使用TaskExecutorBuilder最終也是建構出ThreadPoolTaskExecutor,代碼如下:
scss複制代碼@Bean
public ThreadPoolTaskExecutor taskExecutor() {
TaskExecutorBuilder builder = new TaskExecutorBuilder()
.corePoolSize(2)
.maxPoolSize(2)
.threadNamePrefix("async thread_")
.queueCapacity(0)
.keepAlive(Duration.ofSeconds(60))
.taskDecorator(runnable -> {
// 裝飾者模式,這裡可以線上程執行任務前,做一些額外操作
log.info("任務将要執行");
return runnable;
});
return builder.build();
}
輸出如下:
或者直接定義一個TaskExecutorBuilder Bean,那麼SpringBoot建立的ThreadPoolTaskExecutor将會使用該builder進行建構,代碼如下:
scss複制代碼@Bean
public TaskExecutorBuilder executorBuilder() {
return new TaskExecutorBuilder()
.corePoolSize(2)
.maxPoolSize(2)
.threadNamePrefix("async thread_")
.queueCapacity(0)
.keepAlive(Duration.ofSeconds(60))
.taskDecorator(runnable -> {
// 裝飾者模式,這裡可以線上程執行任務前,做一些額外操作
log.info("任務将要執行");
return runnable;
});
}
定時任務
@Scheduled
SpringBoot提供了簡單的定時任務實作,隻需要使用@EnableScheduling和@Scheduled這兩個注解,建立一個SyncTask,代碼如下:
kotlin複制代碼package geek.springboot.application.task;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
* 定時同步
*
* @author Bruse
*/
@Slf4j
@EnableScheduling // 開啟定時任務
@Component
public class SyncTask {
// 在要執行的方法上加上@Scheduled注解,這裡表示每5秒執行一次sync()
@Scheduled(cron = "0/5 * * * * ?")
public void sync() {
log.info("do sync task ...");
}
}
啟動後控制台輸出如下:
application.yml
定時任務線程池也可以通過application.yml自定義,示範代碼如下:
yaml複制代碼spring:
task:
scheduling:
thread-name-prefix: "scheduling-"
pool:
size: 2
再次重新開機,控制台輸出如下,可以看到定時任務線程池的線程名稱字首已生效:
ThreadPoolTaskScheduler
SpringBoot定時任務執行底層實作依賴于ThreadPoolTaskScheduler,我們也可以自行建立ThreadPoolTaskScheduler并注冊到Spring中來進行自定義,代碼如下:
scss複制代碼@Bean
public ThreadPoolTaskScheduler taskScheduler() {
return new TaskSchedulerBuilder()
// 線程名稱字首
.threadNamePrefix("scheduling-")
// 線程數
.poolSize(1)
.build();
}
TaskSchedulerBuilder
也可以自行建立TaskSchedulerBuilder,并注冊到Spring中,那麼SpringBoot在建立定時任務線程池時,則會按照該builder來進行線程池的建立,代碼如下:
typescript複制代碼@Bean
public TaskSchedulerBuilder schedulerBuilder() {
return new TaskSchedulerBuilder()
// 線程名稱字首
.threadNamePrefix("schedule-")
// 線程數
.poolSize(1);
}
注意
ThreadPoolTaskScheduler預設線程數隻有1,是以如果執行的任務過于耗時,那麼則不能執行任務時都非常準時,SyncTask稍作如下調整:
typescript複制代碼@Scheduled(cron = "0/5 * * * * ?")
public void sync() {
// 模拟耗時操作
try {
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("do sync task ...");
}
在不修改線程數的情況下,期望sync()每5秒執行一次,實際sync()執行耗時6秒,那麼輸出如下:
可以看到sync()裡每次log列印的時間間隔都大于5秒。
那麼是不是調整線程數就可以了呢?答案是否定的。就算把線程數調整為2,重新開機後輸出可以看到,sync()每次列印的時間間隔還是大于5秒。
因為sync()這個需要定時執行的任務,已經和ThreadPoolTaskScheduler中的某個線程綁定了,sync()将由這個線程一直負責處理。
調整線程數的意義在于如果有多個定時任務需要處理的時候,需要将不同的定時任務交由更多的線程進行處理。
比如線上程數預設為1的情況下,建立另一個需要定時調用的方法doSomeThing():
typescript複制代碼@Scheduled(cron = "0/5 * * * * ?")
public void doSomeThing() {
try {
TimeUnit.SECONDS.sleep(6);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("do some thing ...");
}
啟動後輸出如下,可以看到系統中已有的定時任務都交由單個線程處理:
那這個時候我們把線程數調整為2,重新開機後輸出如下:
這次2個定時任務分别交由2個線程進行處理,互不幹擾了。
作者:LBruse
連結:https://juejin.cn/post/7260690057660006461