天天看點

Springboot 指定重發的次數和延遲時間,定時異步執行 重發任務

以前寫過一篇基于Springboot使用retry架構進行實作重試業務場景的文章

前言:

重試這個需求場景在我們做項目的時候非常常見,實作這個場景的方法也是非常多,

定期輪詢

ScheduledExecutorService 周期性線程池

消息隊列

redis有序集合

Quartz,job等定時任務架構

Timer

delayQueue

等等,我們該篇介紹的是 異步Async+延遲隊列delayQueue 。

進入正題:

一個簡單的重試需求場景

我們服務端是個中間平台,

使用者調用我們服務端下單成功,我們需要通知第三方平台發貨。

但是這個通知發貨有可能通知失敗,我們允許最大失敗次數是N次;

也就是說除了第一次通知發出後,我們需要進行額外的N次發貨通知;

而且後面額外進行的N次發貨通知是有延遲時間的, 每個之間的間隔都是動态設定的;

期間隻要有一次通知成功了,那麼我們就不再重新發送通知;

如果通知沒發成功,就會根據我們設定的N次以及延遲時間,繼續發送通知。

先建立一個異步線程池的配置類(如果你還不了解springboot使用異步線程的,可以先去看看我這篇文章,AsyncThreadConfig.class:

ps: 這裡用的是spring提供的線程池

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;


@Configuration
@ComponentScan("com.jc.mytest.async.service")
@EnableAsync
public class AsyncThreadConfig  {
    /**
     * 執行需要依賴線程池,這裡就來配置一個線程池
     * @return
     */

    // 當池子大小小于corePoolSize,就建立線程,并處理請求
    // 當池子大小等于corePoolSize,把請求放入workQueue(QueueCapacity)中,池子裡的空閑線程就去workQueue中取任務并處理
    // 當workQueue放不下任務時,就建立線程入池,并處理請求,如果池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理
    // 當池子的線程數大于corePoolSize時,多餘的線程會等待keepAliveTime長時間,如果無請求可處理就自行銷毀

    @Bean("getExecutor")
    public Executor getExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //設定核心線程數
        executor.setCorePoolSize(10);
        //設定最大線程數
        executor.setMaxPoolSize(100);
        //線程池所使用的緩沖隊列
        executor.setQueueCapacity(250);
        //設定線程名
        executor.setThreadNamePrefix("JcTest-Async");
        //設定多餘線程等待的時間,機關:秒
        //executor.setKeepAliveSeconds();
        // 初始化線程
        executor.initialize();
        return executor;
    }
}      

然後是異步執行方法的service,TestAsyncService.class:

import java.io.IOException;

/**
 * @Author : JCccc
 * @CreateTime : 2020/4/16
 * @Description :
 **/
public interface TestAsyncService {


    String testNotice(int[] taskDelayMill) throws InterruptedException, IOException;
}      

對應的實作類impl,TestAsyncServiceImpl.class:

import com.jc.mytest.async.service.TestAsyncService;
import com.jc.mytest.util.DelayElement;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;

/**
 * @Author : JCccc
 * @CreateTime : 2020/4/16
 * @Description :
 **/

@Service
public class TestAsyncServiceImpl implements TestAsyncService {


    @Async("getExecutor")
    @Override
    public String testNotice(int[] taskDelayMill) throws InterruptedException, IOException {

        System.out.println(Thread.currentThread().getName() + "   -------正在異步執行任務------" + new Date());

        DelayQueue delayQueue = new DelayQueue();

        //數組的length大小就是額外需要發送的通知數
        int taskSum=taskDelayMill.length;

        //将每一次發送通知的間隔時間都對應建立一個延遲設定類,放入延遲隊列delayQueue裡
        for (int i=0;i<taskSum;i++){
            delayQueue.put(new DelayElement(taskDelayMill[i]));
        }

        System.out.println("開始時間:" +  DateFormat.getDateTimeInstance().format(new Date()));
        while (!delayQueue.isEmpty()){
            // 執行延遲任務
            System.out.println("現在執行延遲任務,調用業務接口");

            //模拟調用API,通知發貨,得到發貨結果 成功或失敗

            String result = getNoticeResult();

            System.out.println("通知發貨的結果是:"+result);
            if (!result.equals("success")){

                System.out.println("任務執行中:"+delayQueue.take());
            }else {

                break;
            }
        }
        //查詢訂單結果

        System.out.println("通知任務不需要再發,訂單結果已經确定");

        System.out.println("結束時間:" +  DateFormat.getDateTimeInstance().format(new Date()));




        return "success";
    }


    //模拟發貨通知的結果
    public String getNoticeResult() throws IOException {


        //模拟調用通知發貨API接口,擷取傳回結果
        String[] strs={"success", "-error-", "--error--","-error--"};

        return  RandomStr(strs);

    }

    //随機傳回字元串數組中的字元串
    public static String RandomStr(String[] strs){
        int random_index = (int) (Math.random()*strs.length);
        return strs[random_index];
    }


}      

延遲隊列需要的參數類,DelayElement.class:

import java.text.DateFormat;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @Author : JCccc
 * @CreateTime : 2020/4/17
 * @Description :
 **/
public class DelayElement  implements Delayed {
    // 延遲截止時間(單面:毫秒)
    long delayTime = System.currentTimeMillis();
    public DelayElement(long delayTime) {
        this.delayTime = (this.delayTime + delayTime);
    }
    @Override
    // 擷取剩餘時間
    public long getDelay(TimeUnit unit) {
        return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
    @Override
    // 隊列裡元素的排序依據
    public int compareTo(Delayed o) {
        if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
            return 1;
        } else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
            return -1;
        } else {
            return 0;
        }
    }
    @Override
    public String toString() {
        return DateFormat.getDateTimeInstance().format(new Date(delayTime));
    }
}      

最後寫個小接口來觸發一下這個場景,TestController.class:

/**
 * @Author : JCccc
 * @CreateTime : 2020/4/8
 * @Description :
 **/

@RestController
public class TestController {


    @Autowired
    TestAsyncService testAsyncService;

    @GetMapping("/testAsyncNotice")
    public void testAsyncNotice() throws Exception {
        System.out.println("發貨通知調用開始!");

         int[] taskArrays = new int[]{2000, 5000, 10000};
        testAsyncService.testNotice(taskArrays);

        System.out.println("已經開始通知,異步執行通知");

    }

}      

整個流程實作簡單介紹

可以看到一直傳遞的接收參數是一個數組 taskArrays,

數組的元素就是每個通知任務發出的延遲時間, 可以看到我弄得是 2000,5000,10000 ;

那就是額外發3次,

結合我們的impl代碼,

先判斷隊列裡面的任務還有沒有,有的話就回去執行。

第一次是延遲2秒發一次, 然後調用發貨通知接口,得到傳回狀态;

如果是success,那麼就是通知發貨成功,可以直接結束;

如果不是success,我們繼續調用 delayQueue.take() ,直到隊列裡面的任務都被執行完畢,也就是3次都發完。

測試效果

三次發送通知都是得到失敗的結果

Springboot 指定重發的次數和延遲時間,定時異步執行 重發任務

第二次發送通知得到成功的結果

Springboot 指定重發的次數和延遲時間,定時異步執行 重發任務

好了,該篇簡單的應用介紹就到此。

最後,大家可以深入一下這個延遲隊列,它不是個簡單貨,可以看看裡面的實作代碼哦(重入鎖ReentrantLock,阻塞和通知的Condition等)