天天看點

自動調整速率的Actor設計模式

問題背景

與資料庫或者存儲系統互動是所有應用軟體都必不可少的功能之一,akka開發的系統也不例外。但akka特殊的地方在于,會盡可能的将所有的功能都設計成異步的,以避免Actor阻塞,然而無法避免IO這類的阻塞操作。我們往往會把IO消息發送給單獨的Actor進行處理,避免業務主邏輯受到阻塞。

在處理IO消息時,有兩種模式:批量和單條。批量是指一次性處理多個消息,這樣可以減少與存儲系統的互動,提高吞吐量,适合處理大量消息;單條是指一次隻處理一條消息,與存儲系統互動次數增多,但可以盡快的處理目前消息,這在消息比較少時非常有用。

但系統往往是複雜的,待處理的消息的分布并不集中,業務繁忙時,短時間内消息很多,此時批量處理可以增加吞吐量;業務閑暇時,消息零零散散,需要盡可能快的處理消息。一個優秀的系統需要能夠識别并合适的處理這兩種消息速率,用akka開發系統時,也需要擁有這種能力。

問題假設

記得以前數學老師講課時,最喜歡也是最經常說的兩個字就是“解、設”,就是在解決問題之前,總是會做一些假設。那麼我們也做一些假設,以簡化解決問題的難度,但這并不影響我們對原有問題的了解。現假設如下:

有一個actor接收其他actor發過來的消息,把它存入資料庫:

1、資料量比較少時。資料單條處理,盡量快速的入庫。

2、資料量比較大時。資料需要批量處理,比如調用jdbc的batch操作,以提高吞吐量。

3、需要能夠在不同消息速率之間自由切換。

基于以上的背景,這個actor該如何設計比較好呢?

解決思路

解決該問題有以下幾點因素需要考慮:

  1. 如何計算消息速率。
  2. 如何判斷消息速率過高或過低
  3. 批量、單條模式之間如何切換
  4. 基于akka解決問題(畢竟作者遇到這個問題就是在用akka開發軟體的過程中)

計算消息速率

上面問題的關鍵點之一是如何判斷目前的消息速率過高或過快,而計算速率的重要參數是時間,而在分布式場景下時間是一個不可忽視的因素。各個節點之間的時間有時無法做到完全一緻,作者所在的公司就是這樣。

計算時,時間有兩種選擇方式:

1、選擇消息本身的時間;

2、選擇處理消息時,目前系統時間。

兩種選擇方式各有優劣。第一種比較準确,畢竟計算速率的對象是消息,用消息的時間也最為準确,但這要求所有節點的時間保持同步,而且消息本身必須有一個時間字段;第二種準确度稍微差一點,畢竟收到消息與實際處理該消息會有一定的延時,可以處理任意類型的消息。

為了簡化并解決問題,作者選擇了消息本身的時間作為計算參數,所有的消息都有時間字段。

切換計算模式

我們已經計算出了消息速率,那麼是否就可以直接跟設定的門檻值進行對比,判斷目前的處理模式(批量或單條)了呢?這還不一定,要根據實際情況作出判斷。消息速率的計算有兩種計算方式:實時、固定速率。其中“固定速率”也有兩種方式:固定消息個數、固定處理間隔。

計算的方法各有千秋。如果實時計算消息速率,可以及時的切換批量或單條模式,但在速率不穩定的情況下,會造成“抖動”的情況,即頻繁的在兩種模式之間進行切換,很可能造成批量處理的消息數量過少,降低吞吐量;固定速率計算,則可以緩和這種“抖動”,當然也就不能及時的切換批量或單條模式。

同樣,為了簡化問題,并考慮遇到問題的實際情況,作者選擇用“固定速率”計算消息速率,計算方法如下:

1、 剛開始處于單條模式,儲存目前時間(或第一條消息的時間)為StartTime

2、 單條模式下處理消息,并儲存消息的目前時間問EndTime

3、

計算目前的處理消息的個數,如果達到一個批量門檻值,則計算此次批量時間跨度,即EndTime-StartTime。如果時間跨度大于批量時間的門檻值,即此次批量處理的消息比較少,繼續處于單條模式;如果時間跨度小于門檻值,則表示在短時間内,收到了大量的消息,則切換為批量模式,計數器清零。

4、 批量模式開始時,儲存第一條消息的時間為StartTime

5、 批量模式處理消息,并儲存消息的目前時間問EndTime

6、 計算目前的處理消息的個數,如果達到一個批量門檻值,則傳回“批量送出”消息,該消息作為特殊消息,提示處理程式送出目前批量。

7、 通過外部actor或者外部系統,給目前actor發送“批量心跳”消息,該消息主要為了彌補消息尾端的空白。即消息個數少于一個批量時,能夠及時處理目前剩餘消息。

8、 收到“批量心跳”消息時,檢查目前消息處理個數,如果小于一個批量門檻值,則表示目前消息速率過低,退出批量模式;如果大于一個批量門檻值,則計數器清零,保持批量模式。無論此時進入哪個模式,都會發送“批量送出”消息,以盡快送出目前批量。

demo代碼

為了保持通用,此處設計了一個抽象類,封裝了部分邏輯

object AutoSpeedActor{
  final case class BatchMessage(sourceMessage:Option[Any],lastMessageTime:Long, commit:Boolean = false)
  private[actor] trait InternalMessage{
    def systemTimestamp:Long
  }
  private[actor] final case class EnterBatch(systemTimestamp:Long) extends InternalMessage
  private[actor] final case class LeaveBatch(systemTimestamp:Long) extends InternalMessage
  private[actor] final case class BatchInterval(systemTimestamp:Long) extends InternalMessage
}
/**
  *
  * @param batchNumber 自适應時批量的數量
  * @param batchInterval 自适應時批量的時間間隔
  */
abstract class AutoSpeedActor(batchNumber:Long,batchInterval:Duration,startTime:Long) extends Actor with ActorLogging {
  /**
    * 時間守衛。
    * 用來在批量模式下,及時送出目前剩餘批量消息
    */
  private var timerGuard: ActorRef = _
  /**
    * 目前批量開始時間
    */
  private var batchStartTimestamp:Long = startTime
  /**
    * 目前批量結束時間
    */
  private var batchEndTimestamp:Long = batchStartTimestamp
  /**
    * 批量計數器
    */
  private var batchCounter:Long = 0
  /**
    * 擷取目前消息的時間戳
    * @param msg 目前消息
    * @return 目前消息的時間戳
    */
  def getMessageTimestamp(msg: Any):Long
  /**
    * 判斷目前消息是否自動駕駛,
    * @param msg 目前消息
    * @return true則對此類型的消息自動調整速率
    */
  def isAutoDriveMessage(msg:Any):Boolean
  /**
    * 判斷目前是否為内部消息
    * @param msg 目前消息
    * @return true表示目前消息為内部消息
    */
  private def isIntervalMessage(msg:Any):Boolean = msg.isInstanceOf[AutoSpeedActor.InternalMessage]
  override def preStart(): Unit = {
    super.preStart()
    timerGuard = context.actorOf(Props.create(classOf[AutoSpeedActorGuard],batchInterval),self.path.name+"timerGuard")
  }
  override def postStop(): Unit = {
    super.postStop()
    context.stop(timerGuard)
  }
  /**
    * 消息攔截器,初始化為單條模式
    */
  private var messageIntercept: (Any) => Any = singleProcess
  /**
    * 批量模式下,封裝目前消息
    * @param currentMsg 目前消息
    * @return 封裝後的批量消息
    */
  private def batchProcess(currentMsg:Any):Any = currentMsg match {
      case AutoSpeedActor.BatchInterval(systemTimestamp) =>
        log.debug(s"Receive AutoSpeedActor.BatchInterval message at $systemTimestamp ")
        if( batchCounter < batchNumber ){
          timerGuard ! AutoSpeedActor.LeaveBatch(System.currentTimeMillis())
          messageIntercept = singleProcess
        }
        // 收到逾時時間時,目前消息過少,則退出批量模式
        batchCounter = 0
        batchStartTimestamp = batchEndTimestamp
        AutoSpeedActor.BatchMessage(None,batchEndTimestamp,commit = true)
      case _ =>
        batchEndTimestamp = getMessageTimestamp(currentMsg)
        val commit = batchCounter % batchNumber == 0
        AutoSpeedActor.BatchMessage(Some(currentMsg),batchEndTimestamp,commit)
  }
  /**
    * 單條模式下,封裝目前消息
    * @param currentMsg 目前消息
    * @return 封裝後的消息
    */
  private def singleProcess(currentMsg:Any):Any = {
    batchEndTimestamp = getMessageTimestamp(currentMsg)
    if(batchCounter == batchNumber){
      batchCounter = 0
      log.debug(s"Reach an batch which from $batchStartTimestamp to $batchEndTimestamp")
      // 在一個批量内,時間跨度大于設定的批量門檻值,則表示接收的消息比較慢
      if (batchEndTimestamp - batchStartTimestamp > batchInterval.toMillis ){
        batchStartTimestamp = batchEndTimestamp
      }else{
        // 在一個批量内,時間跨度小于設定的批量門檻值,則表示接收的消息比較快,進入批量模式
        timerGuard ! AutoSpeedActor.EnterBatch(System.currentTimeMillis())
        messageIntercept = batchProcess
      }
    }
    currentMsg
  }
  override def aroundReceive(receive: Receive, msg: Any): Unit = {
    val interceptedMessage = if(isAutoDriveMessage(msg) || isIntervalMessage(msg)){
      batchCounter += 1
      messageIntercept(msg)
    }else{
      msg
    }
    super.aroundReceive(receive, interceptedMessage)
  }
}
private[actor] class AutoSpeedActorGuard(timeout:Duration) extends Actor with ActorLogging{
  private var batchMode = false
  private implicit val executionContextExecutor: ExecutionContextExecutor = context.dispatcher
  override def receive: Receive = {
    case AutoSpeedActor.EnterBatch(systemTimestamp) =>
      log.debug(s"Enter batch mode at $systemTimestamp")
      batchMode = true
      context.system.scheduler.scheduleOnce(FiniteDuration(timeout._1,timeout._2),self,AutoSpeedActor.BatchInterval(systemTimestamp))
    case AutoSpeedActor.LeaveBatch(systemTimestamp) =>
      log.debug(s"Leave batch mode at $systemTimestamp")
      batchMode = false
    case evt: AutoSpeedActor.BatchInterval =>
      log.debug(s"Receive an AutoSpeedActor.BatchInterval message $evt ,batchMode = $batchMode")
      if(batchMode){
        context.system.scheduler.scheduleOnce(FiniteDuration(timeout._1,timeout._2),self,AutoSpeedActor.BatchInterval(System.currentTimeMillis()))
        context.parent ! evt
      }
  }
}
      

TODO

demo代碼還是有點簡單,後期需要進一步的優化,例如該actor隻能對某一類消息進行自動速率調整,無法适應多個不同類型消息的AutoDrive,歡迎大家進行讨論,提出各種優化方案。