Spring @Async實作原理
前面我們已經詳細介紹了Spring架構所提供的用于實作異步方法調用的@Async注解。
該注解可以被标注在方法上,以便異步地執行該方法,也就是說請求者在發起請求之後能夠立即傳回,而方法的實際執行過程将送出給Spring的任務執行器TaskExecutor進行執行。異步程式設計模型是一個相對複雜的話題,很多開發人員隻會簡單地使用@Async注解,而不知道其實作原理,這樣就可能導緻錯誤地使用該注解。
本節将通過分析Spring的源碼來深入了解@Async注解背後的實作原理,避免誤用該注解。
基于代理的異步執行模型案例分析
Spring作為一款內建性的開發架構,代碼結構複雜、冗長。對于普通開發人員而言,想要深入了解架構所提供的某項特定功能是有一定難度的。是以,在了解@Async注解的底層實作原理時,并不推薦直接從架構的源碼進行切入,而是建議另辟蹊徑,先嘗試讨論如何設計和實作一個簡單的異步執行模型。讓我們先從如代碼清單10-8所示的示例代碼開始講起。
代碼清單10-8 DemoService接口及其實作類代碼
//定義業務接口
public interface DemoService {
public void perform() throws InterruptedException;
}
//實作業務接口
public class DemoServiceImpl implements DemoService {
@Override
public void perform() throws InterruptedException {
System.out.println(String.format("Request Thread:%s",
Thread.currentThread().getName())); }
}
//執行業務對象執行個體化以及方法調用
DemoService demoService = new DemoServiceImpl();
demoService.perform();
這段代碼非常簡單,我們隻是定義并建立了一個DemoService執行個體對象,并調用了它的perform()方法。顯然,這樣的執行過程肯定是同步的。
現在,需求來了,我們希望DemoService的perform()方法以異步的方式進行執行。請注意,這種異步執行的實作過程對開發人員而言應該是無感覺的,也就是說開發人員在使用這個方法時不需要手動去完成建立線程、送出任務到Executor以及擷取異步處理結果等過程。
1. 異步執行模型整體架構設計
在開發過程中,想要把複雜度隐藏在方法調用的背後,有一個常見且強大的工具,這就是代理機制。在第3章中,我們分析了如何通過代理機制實作AOP。我們知道代理機制的核心作用就是對方法的執行過程進行攔截,并添加各種定制化的執行邏輯。基于代理機制的這一作用,我們就可以對DemoService的perform()方法進行代理,進而為該方法的執行過程添加異步效果,具體設計思路如圖10-2所示。
圖10-2 基于代理機制的異步執行設計
圖10-2所展示的基本思想在于:當代理機制對perform()這一目标方法進行攔截時,并不是直接執行該方法中的業務邏輯,而是通過AsyncExecutor啟動一個異步任務,并在該異步任務中完成具體業務邏輯的執行。
2. 內建JDK動态代理機制
接下來,我們将基于JDK的動态代理機制來示範如何實作圖10-2中的執行效果。我們知道想要使用JDK的動态代理機制,需要實作InvocationHandler接口,為此可以定義一個DynamicProxy類來實作這個接口,如代碼清單10-9所示。
代碼清單10-9 DynamicProxy類代碼
public class DynamicProxy implements InvocationHandler, AsyncProxy {
//被代理的對象
private final Object target;
public DynamicProxy(Object target) {
this.target = target;
} @Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
//送出到Executor并傳回結果
return ThreadPoolBasedAsyncExecutor.submit(target, method,
args);
}
}
可以看到,在這裡的invoke()方法中,我們通過ThreadPoolBasedAsyncExecutor的submit()方法把傳入的對象送出到這個異步執行器中進行執行。從命名上看,我們不難明白這是一個基于線程池的異步執行器。
3. 異步執行器
ThreadPoolBasedAsyncExecutor中submit()方法實作過程如代碼清單10-10所示。
代碼清單10-10 ThreadPoolBasedAsyncExecutor類代碼
public class ThreadPoolBasedAsyncExecutor extends ThreadPoolExecutor
implements AsyncExecutor {
public static <T> AsyncResult<T> submit(Object target, Method
method, Object[] objects) {
//通過線程池送出
Future future = executorService.submit(new Runnable() {
@Override
public void run() {
try {
method.invoke(target, objects);
} catch () …
}
}
}); FutureBasedAsyncResult<T> asyncResult = new
FutureBasedAsyncResult<>();
asyncResult.setFuture(future);
return asyncResult;
}
}
可以看到,ThreadPoolBasedAsyncExecutor繼承了JDK中的ThreadPoolExecutor,并通過JDK中的ExecutorService的submit()方法啟動了異步線程,該異步線程負責對傳入的方法進行執行,并最終傳回一個Future對象。然後我們基于Future對象建構了一個代表異步執行結果的FutureBasedAsyncResult。這個FutureBasedAsyncResult的定位就像Spring@Async注解提供的AsyncResult對象一樣。
接下來就是擷取代理對象的實作過程,這個過程比較固定化,如代碼清單10-11所示。
代碼清單10-11 基于JDK代理擷取代理對象代碼
InvocationHandler handler = new DynamicProxy(target);
Object result =
Proxy.newProxyInstance(handler.getClass().getClassLoader(),
target.getClass().getInterfaces(), handler);
一旦擷取這個代理對象,我們就可以使用如代碼清單10-12所示的代碼來執行異步操作了。
代碼清單10-12 基于代理對象執行異步操作代碼
public static void main(String[] args) throws InterruptedException,
ExecutionException {
System.out.println(String.format("Main Thread:%s",Thread.currentThread().getName()));
DemoService demoService = new DemoServiceImpl();
AsyncProxy dynamicProxy = new DynamicProxy(demoService);
DemoService target = (DemoService)dynamicProxy.proxy();
target.perform();
}
運作以上代碼,我們可以在控制台中看到如代碼清單10-13所示的日志資訊。
代碼清單10-13 基于代理對象執行異步記錄檔
Main Thread: main
Request Thread: pool-1-thread-1
顯然,現在的perform()方法已經運作在獨立的線程中,進而實作了異步執行效果。
本案例的完整代碼可以在GitHub上擷取:
https://github.com/tianminzheng/spring-boot
examples/tree/main/ProxyBasedAsyncSample。
上述實作方法雖然比較簡單,但已經包含了一個異步執行模型的基本結構。事實上,Spring @Async注解也采用了同樣的設計理念。接下來,就讓我們基于已有的知識體系來深入分析Spring Async的實作原理。
Spring @Async注解原理分析
一方面,@Async注解的運作過程依賴于Spring中對Bean生命周期的處理;
另一方面,我們也需要充分利用基于代理的攔截器機制來實作異步操作。是以,本節接下來的内容就分别從這兩個次元出發讨論Spring @Async注解的運作原理。但在此之前,我們還是有必要對@EnableAsync注解展開一些講解,因為它是整個異步執行機制的入口。
1. @EnableAsync注解與配置入口
要分析@Async注解,我們首先需要讨論@EnableAsync注解,該注解的定義如代碼清單10-14所示。
代碼清單10-14 @EnableAsync注解定義代碼
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {
//定義啟動異步執行的類型:方法級别還是類級别
Class<? extends Annotation> annotation() default Annotation.class;
//是否啟用CGLIB代理,預設為false,即使用JDK代理
boolean proxyTargetClass() default false;
//切面通知模式,預設使用代理模式
AdviceMode mode() default AdviceMode.PROXY;
//BeanPostProcessor執行的順序
int order() default Ordered.LOWEST_PRECEDENCE;
}
在這個注解中,我們看到了代理相關的配置項,可以初步推斷出Spring中異步執行模式和代理機制之間應該存在密切的關聯關系。至于這種關聯關系是如何建立的,我們繼續往下看。針對類似@EnableAsync這樣的注解,要想了解它的實作原理,我們一般都是從它的@Import(AsyncConfigurationSelector.class)語句進行切入,進而導入AsyncConfigurationSelector配置類。AsyncConfigurationSelector類如代碼清單10-15所示。
代碼清單10-15 AsyncConfigurationSelector類代碼
public class AsyncConfigurationSelector extends
AdviceModeImportSelector<EnableAsync> {
private static final String
ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";
@Override
@Nullable
public String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {
ProxyAsyncConfiguration.class.getName() };
case ASPECTJ:
return new String[] {
ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME };
default:
return null;
}
}
}
這裡指定了兩種不同的代理機制,我們關注預設的基于代理的配置類ProxyAsync-Configuration。這個配置類的名稱翻譯過來就是代理異步配置,從命名中也可以猜測該類所應該具備的功能。ProxyAsyncConfiguration配置類的具體實作如代碼清單10-16所示。
代碼清單10-16 ProxyAsyncConfiguration配置類代碼
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends
AbstractAsyncConfiguration {
@Bean(name =
TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
//建立一個後處理元件AsyncAnnotationBeanPostProcessor
AsyncAnnotationBeanPostProcessor bpp = new
AsyncAnnotationBeanPostProcessor();
…
//設定執行器
if (this.executor != null) {
bpp.setExecutor(this.executor);
}
//設定異常處理器
if (this.exceptionHandler != null) {
bpp.setExceptionHandler(this.exceptionHandler);
}
//設定代理目标類
bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"
));
//設定AsyncAnnotationBeanPostProcessor的順序
bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
return bpp;
}
}
可以看到對于ProxyAsyncConfiguration而言,該代碼主要建構了一個AsyncAnnotation-BeanPostProcessor并填充了@EnableAsync注解中的相關配置項。
2. AsyncAnnotationBeanPostProcessor
在建構AsyncAnnotationBeanPostProcessor過程中,我們從ProxyAsyncConfiguration的父類AbstractAsyncConfiguration中擷取了執行器executor、異常處理器exceptionHandler等用于處理異步請求的基礎元件。AsyncAnnotationBeanPostProcessor類的類層關系如圖10-3所示。
圖10-3 AsyncAnnotationBeanPostProcessor的類層關系
圖10-3中的類層結構比較複雜,我們需要結合Spring中的Bean後置處理器BeanPost-Processor來進行了解。在Spring中,BeanPostProcessor的作用是在完成Bean執行個體化和依賴注入之後,在顯式調用初始化方法的前後添加我們自己的邏輯,可以認為這裡是添加代理機制的絕佳位置。在類的繼承關系上,AsyncAnnotationBeanPostProcessor間接實作了BeanFactoryAware接口來注入BeanFactory,是以存在如代碼清單10-17所示的setBean-Factory()方法。
代碼清單10-17 setBeanFactory()方法代碼
@Override
public void setBeanFactory(BeanFactory beanFactory) {
super.setBeanFactory(beanFactory);
//建立異步注解Advisor AsyncAnnotationAdvisor advisor = new
AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);
if (this.asyncAnnotationType != null) {
advisor.setAsyncAnnotationType(this.asyncAnnotationType);
}
advisor.setBeanFactory(beanFactory);
this.advisor = advisor;
}
我們重點關注這裡的AsyncAnnotationAdvisor,其構造函數如代碼清單10-18所示。
代碼清單10-18 AsyncAnnotationAdvisor構造函數代碼
public AsyncAnnotationAdvisor(@Nullable Executor executor, @Nullable
AsyncUncaughtExceptionHandler exceptionHandler) {
//處理@Async注解
asyncAnnotationTypes.add(Async.class);
...
//建構Advice
this.advice = buildAdvice(executor, this.exceptionHandler);
//建構Pointcut
this.pointcut = buildPointcut(asyncAnnotationTypes);
}
終于,我們看到了熟悉的@Async注解。AsyncAnnotationAdvisor使用executor和exception-Handler來建立Advisor,并根據@Async注解标注的位置建構Pointcut。@Async注解的定義如代碼清單10-19所示。
代碼清單10-19 @Async注解定義代碼
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)@Documented
public @interface Async {
String value() default "";
}
@Async注解可以被标注在類或方法上,用于實作方法的異步執行。當被标注在類上時,表明類中的所有方法都由指定的異步執行器進行執行。這裡的異步執行器可以是Java并發包中的Executor元件,也可以是Spring架構自帶的TaskExecutor。我們會在後面具體讨論Spring TaskExecutor元件。
接下來,我們看看Advice是如何被建構的,buildAdvice()方法的定義如代碼清單10-20所示。
代碼清單10-20 buildAdvice()方法代碼
protected Advice buildAdvice(@Nullable Executor executor,
AsyncUncaughtExceptionHandler exceptionHandler) {
return new AnnotationAsyncExecutionInterceptor(executor,
exceptionHandler);
}
這裡我們就看到了用于嵌入代理邏輯的攔截器元件AnnotationAsyncExecutionInterceptor。
3. AnnotationAsyncExecutionInterceptor
對這個攔截器元件,我們檢視AnnotationAsyncExecutionInterceptor源碼,可以發現它的類層關系如圖10-4所示。
可以看到AnnotationAsyncExecutionInterceptor間接實作了MethodInterceptor接口。而通過第3章中對AOP的相關學習,我們知道MethodInterceptor是AOP中Pointcut的處理器,最終被調用的是invoke()方法。如代碼清單10-21所示的就是AnnotationAsyncExecuti-onInterceptor中invoke()方法的源碼(對部分代碼做了裁剪)。
圖10-4 AnnotationAsyncExecutionInterceptor類層結構
代碼清單10-21 AnnotationAsyncExecutionInterceptor中invoke()方法代碼
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws
Throwable {
//查找指定的Executor
AsyncTaskExecutor executor =
determineAsyncExecutor(userDeclaredMethod);
Callable<Object> task = () -> {
try {
//執行具體的方法調用 Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {//處理異常
handleError(ex.getCause(), userDeclaredMethod,
invocation.getArguments());
}
catch (Throwable ex) {//處理異常
handleError(ex, userDeclaredMethod,
invocation.getArguments());
}
return null;
};
//送出給線程池處理
return doSubmit(task, executor,
invocation.getMethod().getReturnType());
}
大家看到這段代碼,是不是覺得有點眼熟?我們在上面介紹異步執行模型的設計方法時,所給出的代碼結構與上述代碼非常相似,都是針對方法調用建構一個Task,然後再把這個Task送出到Executor中進行執行。這裡的doSubmit()方法如代碼清單10-22所示。
代碼清單10-22 doSubmit()方法代碼
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor
executor, Class<?> returnType) {
//傳回值為CompletableFuture的場景
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) { throw new CompletionException(ex);
}
}, executor);
}
//傳回值為ListenableFuture的場景
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor)
executor).submitListenable(task);
}
//傳回值為Future的場景
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
//預設是沒有傳回值的場景
else {
executor.submit(task);
return null;
}
}
可以看到針對方法調用的傳回類型,分别建構了三種支援異步的傳回值,即CompletableFuture、ListenableFuture以及普通Future。同時,這裡也支援沒有傳回值的應用場景。
至此,整個@Async注解的解析、執行以及傳回的流程都介紹完畢了,我們可以基于圖10-5對這一過程進行總結。你也可以把這張圖和上面介紹的異步執行模型的設計方法結合在一起進行對比學習。
圖10-5 Spring @Async注解執行流程
本文給大家講解的内容是并發程式設計:解析SpringAsync并發程式設計,Spring@Async實作原理
- 下文給大家講解的是并發程式設計:解析SpringAsync并發程式設計,Spring@Async異步處理