以前寫過一篇基于Springboot使用retry架構進行實作重試業務場景的文章
前言:
重試這個需求場景在我們做項目的時候非常常見,實作這個場景的方法也是非常多,
定期輪詢
ScheduledExecutorService 周期性線程池
消息隊列
redis有序集合
Quartz,job等定時任務架構
Timer
delayQueue
等等,我們該篇介紹的是 異步Async+延遲隊列delayQueue 。
進入正題:
一個簡單的重試需求場景
我們服務端是個中間平台,
使用者調用我們服務端下單成功,我們需要通知第三方平台發貨。
但是這個通知發貨有可能通知失敗,我們允許最大失敗次數是N次;
也就是說除了第一次通知發出後,我們需要進行額外的N次發貨通知;
而且後面額外進行的N次發貨通知是有延遲時間的, 每個之間的間隔都是動态設定的;
期間隻要有一次通知成功了,那麼我們就不再重新發送通知;
如果通知沒發成功,就會根據我們設定的N次以及延遲時間,繼續發送通知。
先建立一個異步線程池的配置類(如果你還不了解springboot使用異步線程的,可以先去看看我這篇文章,AsyncThreadConfig.class:
ps: 這裡用的是spring提供的線程池
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@ComponentScan("com.jc.mytest.async.service")
@EnableAsync
public class AsyncThreadConfig {
/**
* 執行需要依賴線程池,這裡就來配置一個線程池
* @return
*/
// 當池子大小小于corePoolSize,就建立線程,并處理請求
// 當池子大小等于corePoolSize,把請求放入workQueue(QueueCapacity)中,池子裡的空閑線程就去workQueue中取任務并處理
// 當workQueue放不下任務時,就建立線程入池,并處理請求,如果池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理
// 當池子的線程數大于corePoolSize時,多餘的線程會等待keepAliveTime長時間,如果無請求可處理就自行銷毀
@Bean("getExecutor")
public Executor getExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//設定核心線程數
executor.setCorePoolSize(10);
//設定最大線程數
executor.setMaxPoolSize(100);
//線程池所使用的緩沖隊列
executor.setQueueCapacity(250);
//設定線程名
executor.setThreadNamePrefix("JcTest-Async");
//設定多餘線程等待的時間,機關:秒
//executor.setKeepAliveSeconds();
// 初始化線程
executor.initialize();
return executor;
}
}
然後是異步執行方法的service,TestAsyncService.class:
import java.io.IOException;
/**
* @Author : JCccc
* @CreateTime : 2020/4/16
* @Description :
**/
public interface TestAsyncService {
String testNotice(int[] taskDelayMill) throws InterruptedException, IOException;
}
對應的實作類impl,TestAsyncServiceImpl.class:
import com.jc.mytest.async.service.TestAsyncService;
import com.jc.mytest.util.DelayElement;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
/**
* @Author : JCccc
* @CreateTime : 2020/4/16
* @Description :
**/
@Service
public class TestAsyncServiceImpl implements TestAsyncService {
@Async("getExecutor")
@Override
public String testNotice(int[] taskDelayMill) throws InterruptedException, IOException {
System.out.println(Thread.currentThread().getName() + " -------正在異步執行任務------" + new Date());
DelayQueue delayQueue = new DelayQueue();
//數組的length大小就是額外需要發送的通知數
int taskSum=taskDelayMill.length;
//将每一次發送通知的間隔時間都對應建立一個延遲設定類,放入延遲隊列delayQueue裡
for (int i=0;i<taskSum;i++){
delayQueue.put(new DelayElement(taskDelayMill[i]));
}
System.out.println("開始時間:" + DateFormat.getDateTimeInstance().format(new Date()));
while (!delayQueue.isEmpty()){
// 執行延遲任務
System.out.println("現在執行延遲任務,調用業務接口");
//模拟調用API,通知發貨,得到發貨結果 成功或失敗
String result = getNoticeResult();
System.out.println("通知發貨的結果是:"+result);
if (!result.equals("success")){
System.out.println("任務執行中:"+delayQueue.take());
}else {
break;
}
}
//查詢訂單結果
System.out.println("通知任務不需要再發,訂單結果已經确定");
System.out.println("結束時間:" + DateFormat.getDateTimeInstance().format(new Date()));
return "success";
}
//模拟發貨通知的結果
public String getNoticeResult() throws IOException {
//模拟調用通知發貨API接口,擷取傳回結果
String[] strs={"success", "-error-", "--error--","-error--"};
return RandomStr(strs);
}
//随機傳回字元串數組中的字元串
public static String RandomStr(String[] strs){
int random_index = (int) (Math.random()*strs.length);
return strs[random_index];
}
}
延遲隊列需要的參數類,DelayElement.class:
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @Author : JCccc
* @CreateTime : 2020/4/17
* @Description :
**/
public class DelayElement implements Delayed {
// 延遲截止時間(單面:毫秒)
long delayTime = System.currentTimeMillis();
public DelayElement(long delayTime) {
this.delayTime = (this.delayTime + delayTime);
}
@Override
// 擷取剩餘時間
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
// 隊列裡元素的排序依據
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
return 1;
} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
return -1;
} else {
return 0;
}
}
@Override
public String toString() {
return DateFormat.getDateTimeInstance().format(new Date(delayTime));
}
}
最後寫個小接口來觸發一下這個場景,TestController.class:
/**
* @Author : JCccc
* @CreateTime : 2020/4/8
* @Description :
**/
@RestController
public class TestController {
@Autowired
TestAsyncService testAsyncService;
@GetMapping("/testAsyncNotice")
public void testAsyncNotice() throws Exception {
System.out.println("發貨通知調用開始!");
int[] taskArrays = new int[]{2000, 5000, 10000};
testAsyncService.testNotice(taskArrays);
System.out.println("已經開始通知,異步執行通知");
}
}
整個流程實作簡單介紹
可以看到一直傳遞的接收參數是一個數組 taskArrays,
數組的元素就是每個通知任務發出的延遲時間, 可以看到我弄得是 2000,5000,10000 ;
那就是額外發3次,
結合我們的impl代碼,
先判斷隊列裡面的任務還有沒有,有的話就回去執行。
第一次是延遲2秒發一次, 然後調用發貨通知接口,得到傳回狀态;
如果是success,那麼就是通知發貨成功,可以直接結束;
如果不是success,我們繼續調用 delayQueue.take() ,直到隊列裡面的任務都被執行完畢,也就是3次都發完。
測試效果
三次發送通知都是得到失敗的結果
第二次發送通知得到成功的結果
好了,該篇簡單的應用介紹就到此。
最後,大家可以深入一下這個延遲隊列,它不是個簡單貨,可以看看裡面的實作代碼哦(重入鎖ReentrantLock,阻塞和通知的Condition等)