天天看點

架構方案:優化分布式延遲任務觸達時效性

作者:小傅哥

作者:小傅哥

部落格:https://bugstack.cn - 包含: Java 基礎,面經手冊,Netty4.x,手寫Spring,用Java實作JVM,重學Java設計模式,SpringBoot中間件開發,IDEA插件開發,DDD系統架構項目開發,位元組碼程式設計...

沉澱、分享、成長,讓自己和他人都能有所收獲!

一、前言

不卷了,能用就行!

哈哈哈,說好的不卷了,能湊活用就行了。但每次接到新需求時都手癢,想結合着上一次的架構設計和落地經驗,在這一次需求上在疊代更新,或者找到完全颠覆之前的更優方案。卷完代碼的那一刻總是神清氣爽

其實大部分喜歡寫代碼的一類純粹碼農,都是比較卷的,就比如一個需求在實作上是能用大概是P5、如果這個做出來的功能不隻是能用還非常好用是P6、除了好用還凝練共性需求開發成通用的元件服務是P7。每一個成長過來的碼農,都是在造輪子的路上一次次驗證自己的想法和加以實踐,絕對不是一篇篇的八股文就能累出來一個進階的技術大牛。

二、延遲任務場景

什麼是延遲任務?

當我們的實際業務需求場景中,有一些活動開始前的狀态變更、訂單結算後的T+1對賬、貸款單息費的産生,都是需要使用到延遲任務來進行觸達。實際的操作一般會有 Quartz、Schedule 來對你的庫表資料進行定時掃描和處理,當條件滿足後做資料狀态的變更或者産生新的資料插入到表中。

這樣一個簡單的需求就是延遲任務最初需求,如果需求前期内容較少、使用方不多,可能在實際開發中就隻是一個單台機器直接對着表一頓輪訓就完事了。但随着業務需求的發展和功能的複雜度提升,往往回報到研發設計和實作,就不那麼簡單了,比如:你需要保障盡可能低延遲完成較大規模的資料量掃描處理,否則就像貸款單息費的産生,已經到了第二天使用者還沒看到自己的息費資訊或者是還款後的重新對賬,可能就這個時候就要産生客訴了。

那麼,類似這樣的場景該如何設計呢?

三、延遲任務設計

通常的任務中心處理流程主要,主要是由定時任務掃描任務庫表,把即将達到逾時時間的任務資訊掃描到處理隊列(記憶體/MQ消息),再由業務系統進行處理任務,處理完成後更新庫表中的任務狀态。

架構方案:優化分布式延遲任務觸達時效性

高延時任務排程

問題:

  1. 海量資料規模較大的任務清單資料,在分庫分表下該需要快速掃描。
  2. 任務掃描服務與業務邏輯處理,耦合在一起,不具有通用性和複用性。
  3. 細分任務體系有些是需要低延遲處理的,不能等待過長時間。

1. 任務表方式

除了一些較小的狀态變更場景,例如在各自業務的庫表中,就包含了一個狀态字段,這個字段一方面有程式邏輯處理變更的狀态,也有到達指定到期時間後由任務服務自動變更處理的操作,一般這類功能,直接設計到自己的庫表中即可。

那麼還有一些較大也較為頻繁使用的場景,如果都是在每個系統的各自所需的N多個表中,都添加這樣的字段進行維護,就顯得非常備援了,也不那麼易于維護。是以針對這樣的場景就很适合做一個通用的任務延時系統,各業務系統把需要被延時執行的動作送出到延時系統中,再有延時系統在指定時間下進行回調,回調的動作可以是接口或者MQ消息進行觸達。例如可以設計這樣一個任務排程表:

架構方案:優化分布式延遲任務觸達時效性

任務排程庫表設計

  1. 抽取的任務排程表,主要是拿到什麼任務,在什麼時間發起動作,具體的動作處理仍交給業務工程處理。
  2. 大批量的各自業務的任務進行集中處理,則需要設計一個分庫分表,滿足于後續業務體量的增長。
  3. 門牌号設計,針對一張表的掃描,如果資料量較大,又不希望隻是一個任務掃描一個表,可以多個任務掃描一個表,加到掃描的體量。這個時候就需要一個門牌号來隔離不同任務掃描的範圍,避免掃描出重複的任務資料。

2. 低延遲方式

低延遲處理方案,是在任務表方式的基礎上,新增加的時間把控處理。它可以把即将到期的前一段時間的任務,放置到 Redis 叢集隊裡中,在消費的時候再從隊列中 pop 出來,這樣可以更快的接近任務的處理時效,避免因為掃庫間隔較大延遲任務執行。

架構方案:優化分布式延遲任務觸達時效性

任務處理流程

  • 在接收業務系統送出進來的延遲任務時,按照執行時間的長短放置到任務庫或者也同步到 Redis 叢集中,一些執行時間較晚的任務則可以先放到任務庫,再通過掃描的方式添加到逾時任務執行隊列中。
  • 那麼關于這塊的設計核心在于 Redis 隊列的使用,以及為了保證消費的可靠性需要引入二階段消費、注冊 ZK 注冊中心至少保證一次消費的處理。本文重點主要放在 Redis 隊列的設計,其他更多的邏輯處理,可以按照業務需求進行擴充和完善

Redis 消費隊列

架構方案:優化分布式延遲任務觸達時效性

Redis 消費隊列

  • 按照消息體計算對應資料所屬的槽位 index = CRC32 & 7
  • StoreQueue 采用 Slot 按照 SlotKey = #{topic}_#{index} 和 Sorted Set 的資料結構按執行任務分數排序,存放任務執行資訊。定時消息将時間戳作為分數,消費時每次彈出分數小于目前時間戳的一個消息
  • 為了保障每條消息至少可消費一次,消費者不是直接 pop 有序集合中的元素,而是将元素從 StoreQueue 移動到 PrepareQueue 并傳回消息給消費者。消費成功後再從 PrepareQueue 從删除,如果消費失敗則從PreapreQueue 重新移動到 StoreQueue,這樣二階段消費的方式進行處理。
  • 參考文檔:2021 阿裡技術人的百寶黑皮書PDF文,低延遲的逾時中心實作方式

簡單案例

@Test
public void test_delay_queue() throws InterruptedException {
    RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue("TASK");
    RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
    new Thread(() -> {
        try {
            while (true){
                Object take = blockingQueue.take();
                System.out.println(take);
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();
    int i = 0;
    while (true){
        delayedQueue.offerAsync("測試" + ++i, 100L, TimeUnit.MILLISECONDS);
        Thread.sleep(1000L);
    }
}
           

測試資料

2022-02-13  WARN 204760 --- [      Finalizer] i.l.c.resource.DefaultClientResources    : io.lettuce.core.resource.DefaultClientResources was not shut down properly, shutdown() was not called before it's garbage-collected. Call shutdown() or shutdown(long,long,TimeUnit) 
測試1
測試2
測試3
測試4
測試5

Process finished with exit code -1
           
  • 源碼:https://git***.com/fuzhengwei/TimeOutCenter
  • 描述:使用 redisson 中的 DelayedQueue 作為消息隊列,寫入後等待消費時間進行 POP 消費。

四、總結

  • 排程任務的使用在實際的場景中非常頻繁,例如我們經常使用 xxl-job,也有一些大廠自研的分布式任務排程元件,這些可能原本都是很小很簡單的功能,但經過抽象、整合、提煉,變成了一個個核心通用的中間件服務。
  • 當我們在考慮使用任務排程的時候,無論哪種方式的設計和實作,都需要考慮這個功能使用時候的以為疊代和維護性,如果僅僅是一個非常小的場景,又沒多少人使用的話,那麼在自己機器上折騰就可以。過渡的設計和使用有時候也會把研發資源代入泥潭
  • 其實各項技術的知識點,都像是一個個工具,刀槍棍棒斧钺鈎,那能怎麼結合各自的特點,把這些兵器用起來,才是一個程式員不斷成長的過程。如果你希望了解更多此類有深度的技術内容,可以加入 Lottery 分布式抽獎秒殺系統 學習更有價值的更抗用的實戰手段。

繼續閱讀