先說一下異步方法的使用場景。根本目的肯定還是為了節省時間。
1、子流程不影響主流程。例如記錄日志,發送短信驗證碼等,主流程不關心這些子流程的結果,或者說預設這些子流程的結果一定正常——如果不正常則啟動自己的補償機制。
2、需要子流程傳回結果,但各個子流程互相不依賴。例如已知使用者ID,使用者的經緯度,通過三個HTTP請求,分别擷取使用者詳細資訊、使用者訂單清單、經緯度所在城市。完全沒必要串行。
一、簡單開啟異步
關鍵注解:@EnableAsync、@Async
開啟CGLIB代理:
spring.aop.proxy-target-class=true
@EnableAsync預設是關閉的,使用JDK的代理,與CGLIB代理的差別我想大家應該都懂,不多說了。
@[email protected] class SpringBootDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringBootDemoApplication.class, args); }}
@Servicepublic class TaskService implements ITaskService { //接口省略 @Async @Override public void testAsync1() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("異步任務執行完畢"); }}
Controller調用:
@RequestMapping("/async")public DefaultResult async() throws InterruptedException, ExecutionException { taskService.testAsync1(); UserResult ur = new UserResult(); ur.setId(111); ur.setUserName("測試異步"); DefaultResult result = new DefaultResult(); result.setData(ur); System.out.println("Controller執行完畢"); return result;}
結果就不放了,頁面會很快展示JSON串,背景日志“Controller執行完畢”一定在“異步任務執行完畢”後面。
二、傳回結果的異步
@[email protected] Future asyncGetResult1() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("傳回結果A"); return new AsyncResult("結果A");}@[email protected] Future asyncGetResult2() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("傳回結果B"); return new AsyncResult("結果B"); }
Controller調用:
@RequestMapping("/async")public DefaultResult async() throws InterruptedException, ExecutionException { taskService.testAsync1(); Future r1=taskService.asyncGetResult1(); Future r2=taskService.asyncGetResult2(); UserResult ur = new UserResult(); ur.setId(111); ur.setUserName(r1.get()+":"+r2.get()); DefaultResult result = new DefaultResult(); result.setData(ur); System.out.println("Controller執行完畢"); return result;}
結果如圖:
可以看到日志列印的是亂序的,如果testAsync1()設定的休眠時間長一點,肯定會在“Controller執行完畢”後列印日志。
三、自定義線程池
大家可以看@EnableAsync源代碼的注釋,Spring 會去找實作了TaskExecutor的線程池定義Bean、或者名為“taskExecutor”的Executor類。如果都沒有,則使用SimpleAsyncTaskExecutor——這個線程池,叫它線程池都對不起線程池的“池”字。每過來一個請求,就建立一個線程,請求完畢就銷毀,我們用這玩意兒幹嘛!
是以還是要自己配置一下。
配置檔案:
#短信線程池配置sms.threadpool.corePoolSize=5sms.threadpool.maxPoolSize=10sms.threadpool.keepAliveSeconds=60sms.threadpool.queueCapacity=15
配置類:
public class GlobalAsyncCfg { /** * 核心線程數 */ private Integer corePoolSize = 5; /** * 最大線程數 */ private Integer maxPoolSize = 10; /** * 空閑線程存活時間 */ private Integer keepAliveSeconds = 60; /** * 等待隊列長度 */ private Integer queueCapacity = 15; //省略Get、Set}
短信線程池配置類:
@[email protected](prefix="sms.threadpool")public class SmsThreadPoolCfg extends GlobalAsyncCfg {}
Executor配置類:
@Configurationpublic class GlobalExecutorCfg { private SmsThreadPoolCfg smsThreadPoolCfg; public GlobalExecutorCfg(SmsThreadPoolCfg smsThreadPoolCfg) { super(); this.smsThreadPoolCfg = smsThreadPoolCfg; } @Bean(name = "smsExecutor") public Executor getSmsExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(smsThreadPoolCfg.getCorePoolSize()); executor.setMaxPoolSize(smsThreadPoolCfg.getMaxPoolSize()); executor.setKeepAliveSeconds(smsThreadPoolCfg.getKeepAliveSeconds()); executor.setQueueCapacity(smsThreadPoolCfg.getQueueCapacity()); executor.setThreadNamePrefix("SmsExecutor-"); return executor; }}
接着修改我們的異步方法:
@Async("smsExecutor")@Overridepublic void testAsync1() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("異步任務執行完畢");}
其他兩個方法代碼省略。
執行結果日志:
大家注意輸出的“SmsExecutor-1”等字樣。
四、配置多個線程池
第三節我的代碼都帶上了“sms”的字樣,為的就是本節内容。
假設我們有一個請求,需要異步調用以下接口:
1、通過HTTP請求,擷取使用者資訊。
2、通過Kafka發送消息給消息隊列,供别的子產品處理相關流程。
3、向DB插入使用者通路日志。
4、發送短信驗證碼。
如果這些異步調用共用一個線程池的話,如果其中一個方法比較耗時,那就很快占滿線程池,導緻其他執行很快的方法無法執行。
是以我們建議不同的業務,使用不同的線程池。
配置檔案:
#短信線程池配置sms.threadpool.corePoolSize=5sms.threadpool.maxPoolSize=10sms.threadpool.keepAliveSeconds=60sms.threadpool.queueCapacity=15#日志線程池配置log.threadpool.corePoolSize=5log.threadpool.maxPoolSize=10log.threadpool.keepAliveSeconds=60log.threadpool.queueCapacity=15
增加日志線程池配置類:
@[email protected](prefix = "log.threadpool")public class LogThreadPoolCfg extends GlobalAsyncCfg {}
修改Executor配置類:
@Configurationpublic class GlobalExecutorCfg { private SmsThreadPoolCfg smsThreadPoolCfg; private LogThreadPoolCfg logThreadPoolCfg; public GlobalExecutorCfg(SmsThreadPoolCfg smsThreadPoolCfg, LogThreadPoolCfg logThreadPoolCfg) { super(); this.smsThreadPoolCfg = smsThreadPoolCfg; this.logThreadPoolCfg = logThreadPoolCfg; } @Bean(name = "smsExecutor") public Executor getSmsExecutor() { return this.buildExecutor(smsThreadPoolCfg, "SmsExecutor-"); } @Bean(name = "logExecutor") public Executor getLogExecutor() { return this.buildExecutor(logThreadPoolCfg, "LogExecutor-"); } private Executor buildExecutor(GlobalAsyncCfg cfg, String prefix) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(cfg.getCorePoolSize()); executor.setMaxPoolSize(cfg.getMaxPoolSize()); executor.setKeepAliveSeconds(cfg.getKeepAliveSeconds()); executor.setQueueCapacity(cfg.getQueueCapacity()); executor.setThreadNamePrefix(prefix); return executor; }}
修改異步方法:
@Async("logExecutor")@Overridepublic void testAsync1() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("異步任務執行完畢");}
結果:
注意日志裡面的logExecutor和SmsExecutor,這樣不同業務使用不同線程池,一旦出現問題既友善排查,也做了業務隔離。