先说一下异步方法的使用场景。根本目的肯定还是为了节省时间。
1、子流程不影响主流程。例如记录日志,发送短信验证码等,主流程不关心这些子流程的结果,或者说默认这些子流程的结果一定正常——如果不正常则启动自己的补偿机制。
2、需要子流程返回结果,但各个子流程相互不依赖。例如已知用户ID,用户的经纬度,通过三个HTTP请求,分别获取用户详细信息、用户订单列表、经纬度所在城市。完全没必要串行。
一、简单开启异步
关键注解:@EnableAsync、@Async
开启CGLIB代理:
spring.aop.proxy-target-class=true
@EnableAsync默认是关闭的,使用JDK的代理,与CGLIB代理的区别我想大家应该都懂,不多说了。
@[email protected] class SpringBootDemoApplication { public static void main(String[] args) { SpringApplication.run(SpringBootDemoApplication.class, args); }}
@Servicepublic class TaskService implements ITaskService { //接口省略 @Async @Override public void testAsync1() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("异步任务执行完毕"); }}
Controller调用:
@RequestMapping("/async")public DefaultResult async() throws InterruptedException, ExecutionException { taskService.testAsync1(); UserResult ur = new UserResult(); ur.setId(111); ur.setUserName("测试异步"); DefaultResult result = new DefaultResult(); result.setData(ur); System.out.println("Controller执行完毕"); return result;}
结果就不放了,页面会很快展示JSON串,后台日志“Controller执行完毕”一定在“异步任务执行完毕”后面。
二、返回结果的异步
@[email protected] Future asyncGetResult1() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("返回结果A"); return new AsyncResult("结果A");}@[email protected] Future asyncGetResult2() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("返回结果B"); return new AsyncResult("结果B"); }
Controller调用:
@RequestMapping("/async")public DefaultResult async() throws InterruptedException, ExecutionException { taskService.testAsync1(); Future r1=taskService.asyncGetResult1(); Future r2=taskService.asyncGetResult2(); UserResult ur = new UserResult(); ur.setId(111); ur.setUserName(r1.get()+":"+r2.get()); DefaultResult result = new DefaultResult(); result.setData(ur); System.out.println("Controller执行完毕"); return result;}
结果如图:
可以看到日志打印的是乱序的,如果testAsync1()设置的休眠时间长一点,肯定会在“Controller执行完毕”后打印日志。
三、自定义线程池
大家可以看@EnableAsync源代码的注释,Spring 会去找实现了TaskExecutor的线程池定义Bean、或者名为“taskExecutor”的Executor类。如果都没有,则使用SimpleAsyncTaskExecutor——这个线程池,叫它线程池都对不起线程池的“池”字。每过来一个请求,就创建一个线程,请求完毕就销毁,我们用这玩意儿干嘛!
所以还是要自己配置一下。
配置文件:
#短信线程池配置sms.threadpool.corePoolSize=5sms.threadpool.maxPoolSize=10sms.threadpool.keepAliveSeconds=60sms.threadpool.queueCapacity=15
配置类:
public class GlobalAsyncCfg { /** * 核心线程数 */ private Integer corePoolSize = 5; /** * 最大线程数 */ private Integer maxPoolSize = 10; /** * 空闲线程存活时间 */ private Integer keepAliveSeconds = 60; /** * 等待队列长度 */ private Integer queueCapacity = 15; //省略Get、Set}
短信线程池配置类:
@[email protected](prefix="sms.threadpool")public class SmsThreadPoolCfg extends GlobalAsyncCfg {}
Executor配置类:
@Configurationpublic class GlobalExecutorCfg { private SmsThreadPoolCfg smsThreadPoolCfg; public GlobalExecutorCfg(SmsThreadPoolCfg smsThreadPoolCfg) { super(); this.smsThreadPoolCfg = smsThreadPoolCfg; } @Bean(name = "smsExecutor") public Executor getSmsExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(smsThreadPoolCfg.getCorePoolSize()); executor.setMaxPoolSize(smsThreadPoolCfg.getMaxPoolSize()); executor.setKeepAliveSeconds(smsThreadPoolCfg.getKeepAliveSeconds()); executor.setQueueCapacity(smsThreadPoolCfg.getQueueCapacity()); executor.setThreadNamePrefix("SmsExecutor-"); return executor; }}
接着修改我们的异步方法:
@Async("smsExecutor")@Overridepublic void testAsync1() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("异步任务执行完毕");}
其他两个方法代码省略。
执行结果日志:
大家注意输出的“SmsExecutor-1”等字样。
四、配置多个线程池
第三节我的代码都带上了“sms”的字样,为的就是本节内容。
假设我们有一个请求,需要异步调用以下接口:
1、通过HTTP请求,获取用户信息。
2、通过Kafka发送消息给消息队列,供别的模块处理相关流程。
3、向DB插入用户访问日志。
4、发送短信验证码。
如果这些异步调用共用一个线程池的话,如果其中一个方法比较耗时,那就很快占满线程池,导致其他执行很快的方法无法执行。
所以我们建议不同的业务,使用不同的线程池。
配置文件:
#短信线程池配置sms.threadpool.corePoolSize=5sms.threadpool.maxPoolSize=10sms.threadpool.keepAliveSeconds=60sms.threadpool.queueCapacity=15#日志线程池配置log.threadpool.corePoolSize=5log.threadpool.maxPoolSize=10log.threadpool.keepAliveSeconds=60log.threadpool.queueCapacity=15
增加日志线程池配置类:
@[email protected](prefix = "log.threadpool")public class LogThreadPoolCfg extends GlobalAsyncCfg {}
修改Executor配置类:
@Configurationpublic class GlobalExecutorCfg { private SmsThreadPoolCfg smsThreadPoolCfg; private LogThreadPoolCfg logThreadPoolCfg; public GlobalExecutorCfg(SmsThreadPoolCfg smsThreadPoolCfg, LogThreadPoolCfg logThreadPoolCfg) { super(); this.smsThreadPoolCfg = smsThreadPoolCfg; this.logThreadPoolCfg = logThreadPoolCfg; } @Bean(name = "smsExecutor") public Executor getSmsExecutor() { return this.buildExecutor(smsThreadPoolCfg, "SmsExecutor-"); } @Bean(name = "logExecutor") public Executor getLogExecutor() { return this.buildExecutor(logThreadPoolCfg, "LogExecutor-"); } private Executor buildExecutor(GlobalAsyncCfg cfg, String prefix) { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(cfg.getCorePoolSize()); executor.setMaxPoolSize(cfg.getMaxPoolSize()); executor.setKeepAliveSeconds(cfg.getKeepAliveSeconds()); executor.setQueueCapacity(cfg.getQueueCapacity()); executor.setThreadNamePrefix(prefix); return executor; }}
修改异步方法:
@Async("logExecutor")@Overridepublic void testAsync1() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } logger.info("异步任务执行完毕");}
结果:
注意日志里面的logExecutor和SmsExecutor,这样不同业务使用不同线程池,一旦出现问题既方便排查,也做了业务隔离。