問題背景
與資料庫或者存儲系統互動是所有應用軟體都必不可少的功能之一,akka開發的系統也不例外。但akka特殊的地方在于,會盡可能的将所有的功能都設計成異步的,以避免Actor阻塞,然而無法避免IO這類的阻塞操作。我們往往會把IO消息發送給單獨的Actor進行處理,避免業務主邏輯受到阻塞。
在處理IO消息時,有兩種模式:批量和單條。批量是指一次性處理多個消息,這樣可以減少與存儲系統的互動,提高吞吐量,适合處理大量消息;單條是指一次隻處理一條消息,與存儲系統互動次數增多,但可以盡快的處理目前消息,這在消息比較少時非常有用。
但系統往往是複雜的,待處理的消息的分布并不集中,業務繁忙時,短時間内消息很多,此時批量處理可以增加吞吐量;業務閑暇時,消息零零散散,需要盡可能快的處理消息。一個優秀的系統需要能夠識别并合适的處理這兩種消息速率,用akka開發系統時,也需要擁有這種能力。
問題假設
記得以前數學老師講課時,最喜歡也是最經常說的兩個字就是“解、設”,就是在解決問題之前,總是會做一些假設。那麼我們也做一些假設,以簡化解決問題的難度,但這并不影響我們對原有問題的了解。現假設如下:
有一個actor接收其他actor發過來的消息,把它存入資料庫:
1、資料量比較少時。資料單條處理,盡量快速的入庫。
2、資料量比較大時。資料需要批量處理,比如調用jdbc的batch操作,以提高吞吐量。
3、需要能夠在不同消息速率之間自由切換。
基于以上的背景,這個actor該如何設計比較好呢?
解決思路
解決該問題有以下幾點因素需要考慮:
- 如何計算消息速率。
- 如何判斷消息速率過高或過低
- 批量、單條模式之間如何切換
- 基于akka解決問題(畢竟作者遇到這個問題就是在用akka開發軟體的過程中)
計算消息速率
上面問題的關鍵點之一是如何判斷目前的消息速率過高或過快,而計算速率的重要參數是時間,而在分布式場景下時間是一個不可忽視的因素。各個節點之間的時間有時無法做到完全一緻,作者所在的公司就是這樣。
計算時,時間有兩種選擇方式:
1、選擇消息本身的時間;
2、選擇處理消息時,目前系統時間。
兩種選擇方式各有優劣。第一種比較準确,畢竟計算速率的對象是消息,用消息的時間也最為準确,但這要求所有節點的時間保持同步,而且消息本身必須有一個時間字段;第二種準确度稍微差一點,畢竟收到消息與實際處理該消息會有一定的延時,可以處理任意類型的消息。
為了簡化并解決問題,作者選擇了消息本身的時間作為計算參數,所有的消息都有時間字段。
切換計算模式
我們已經計算出了消息速率,那麼是否就可以直接跟設定的門檻值進行對比,判斷目前的處理模式(批量或單條)了呢?這還不一定,要根據實際情況作出判斷。消息速率的計算有兩種計算方式:實時、固定速率。其中“固定速率”也有兩種方式:固定消息個數、固定處理間隔。
計算的方法各有千秋。如果實時計算消息速率,可以及時的切換批量或單條模式,但在速率不穩定的情況下,會造成“抖動”的情況,即頻繁的在兩種模式之間進行切換,很可能造成批量處理的消息數量過少,降低吞吐量;固定速率計算,則可以緩和這種“抖動”,當然也就不能及時的切換批量或單條模式。
同樣,為了簡化問題,并考慮遇到問題的實際情況,作者選擇用“固定速率”計算消息速率,計算方法如下:
1、 剛開始處于單條模式,儲存目前時間(或第一條消息的時間)為StartTime
2、 單條模式下處理消息,并儲存消息的目前時間問EndTime
3、
計算目前的處理消息的個數,如果達到一個批量門檻值,則計算此次批量時間跨度,即EndTime-StartTime。如果時間跨度大于批量時間的門檻值,即此次批量處理的消息比較少,繼續處于單條模式;如果時間跨度小于門檻值,則表示在短時間内,收到了大量的消息,則切換為批量模式,計數器清零。
4、 批量模式開始時,儲存第一條消息的時間為StartTime
5、 批量模式處理消息,并儲存消息的目前時間問EndTime
6、 計算目前的處理消息的個數,如果達到一個批量門檻值,則傳回“批量送出”消息,該消息作為特殊消息,提示處理程式送出目前批量。
7、 通過外部actor或者外部系統,給目前actor發送“批量心跳”消息,該消息主要為了彌補消息尾端的空白。即消息個數少于一個批量時,能夠及時處理目前剩餘消息。
8、 收到“批量心跳”消息時,檢查目前消息處理個數,如果小于一個批量門檻值,則表示目前消息速率過低,退出批量模式;如果大于一個批量門檻值,則計數器清零,保持批量模式。無論此時進入哪個模式,都會發送“批量送出”消息,以盡快送出目前批量。
demo代碼
為了保持通用,此處設計了一個抽象類,封裝了部分邏輯
object AutoSpeedActor{
final case class BatchMessage(sourceMessage:Option[Any],lastMessageTime:Long, commit:Boolean = false)
private[actor] trait InternalMessage{
def systemTimestamp:Long
}
private[actor] final case class EnterBatch(systemTimestamp:Long) extends InternalMessage
private[actor] final case class LeaveBatch(systemTimestamp:Long) extends InternalMessage
private[actor] final case class BatchInterval(systemTimestamp:Long) extends InternalMessage
}
/**
*
* @param batchNumber 自适應時批量的數量
* @param batchInterval 自适應時批量的時間間隔
*/
abstract class AutoSpeedActor(batchNumber:Long,batchInterval:Duration,startTime:Long) extends Actor with ActorLogging {
/**
* 時間守衛。
* 用來在批量模式下,及時送出目前剩餘批量消息
*/
private var timerGuard: ActorRef = _
/**
* 目前批量開始時間
*/
private var batchStartTimestamp:Long = startTime
/**
* 目前批量結束時間
*/
private var batchEndTimestamp:Long = batchStartTimestamp
/**
* 批量計數器
*/
private var batchCounter:Long = 0
/**
* 擷取目前消息的時間戳
* @param msg 目前消息
* @return 目前消息的時間戳
*/
def getMessageTimestamp(msg: Any):Long
/**
* 判斷目前消息是否自動駕駛,
* @param msg 目前消息
* @return true則對此類型的消息自動調整速率
*/
def isAutoDriveMessage(msg:Any):Boolean
/**
* 判斷目前是否為内部消息
* @param msg 目前消息
* @return true表示目前消息為内部消息
*/
private def isIntervalMessage(msg:Any):Boolean = msg.isInstanceOf[AutoSpeedActor.InternalMessage]
override def preStart(): Unit = {
super.preStart()
timerGuard = context.actorOf(Props.create(classOf[AutoSpeedActorGuard],batchInterval),self.path.name+"timerGuard")
}
override def postStop(): Unit = {
super.postStop()
context.stop(timerGuard)
}
/**
* 消息攔截器,初始化為單條模式
*/
private var messageIntercept: (Any) => Any = singleProcess
/**
* 批量模式下,封裝目前消息
* @param currentMsg 目前消息
* @return 封裝後的批量消息
*/
private def batchProcess(currentMsg:Any):Any = currentMsg match {
case AutoSpeedActor.BatchInterval(systemTimestamp) =>
log.debug(s"Receive AutoSpeedActor.BatchInterval message at $systemTimestamp ")
if( batchCounter < batchNumber ){
timerGuard ! AutoSpeedActor.LeaveBatch(System.currentTimeMillis())
messageIntercept = singleProcess
}
// 收到逾時時間時,目前消息過少,則退出批量模式
batchCounter = 0
batchStartTimestamp = batchEndTimestamp
AutoSpeedActor.BatchMessage(None,batchEndTimestamp,commit = true)
case _ =>
batchEndTimestamp = getMessageTimestamp(currentMsg)
val commit = batchCounter % batchNumber == 0
AutoSpeedActor.BatchMessage(Some(currentMsg),batchEndTimestamp,commit)
}
/**
* 單條模式下,封裝目前消息
* @param currentMsg 目前消息
* @return 封裝後的消息
*/
private def singleProcess(currentMsg:Any):Any = {
batchEndTimestamp = getMessageTimestamp(currentMsg)
if(batchCounter == batchNumber){
batchCounter = 0
log.debug(s"Reach an batch which from $batchStartTimestamp to $batchEndTimestamp")
// 在一個批量内,時間跨度大于設定的批量門檻值,則表示接收的消息比較慢
if (batchEndTimestamp - batchStartTimestamp > batchInterval.toMillis ){
batchStartTimestamp = batchEndTimestamp
}else{
// 在一個批量内,時間跨度小于設定的批量門檻值,則表示接收的消息比較快,進入批量模式
timerGuard ! AutoSpeedActor.EnterBatch(System.currentTimeMillis())
messageIntercept = batchProcess
}
}
currentMsg
}
override def aroundReceive(receive: Receive, msg: Any): Unit = {
val interceptedMessage = if(isAutoDriveMessage(msg) || isIntervalMessage(msg)){
batchCounter += 1
messageIntercept(msg)
}else{
msg
}
super.aroundReceive(receive, interceptedMessage)
}
}
private[actor] class AutoSpeedActorGuard(timeout:Duration) extends Actor with ActorLogging{
private var batchMode = false
private implicit val executionContextExecutor: ExecutionContextExecutor = context.dispatcher
override def receive: Receive = {
case AutoSpeedActor.EnterBatch(systemTimestamp) =>
log.debug(s"Enter batch mode at $systemTimestamp")
batchMode = true
context.system.scheduler.scheduleOnce(FiniteDuration(timeout._1,timeout._2),self,AutoSpeedActor.BatchInterval(systemTimestamp))
case AutoSpeedActor.LeaveBatch(systemTimestamp) =>
log.debug(s"Leave batch mode at $systemTimestamp")
batchMode = false
case evt: AutoSpeedActor.BatchInterval =>
log.debug(s"Receive an AutoSpeedActor.BatchInterval message $evt ,batchMode = $batchMode")
if(batchMode){
context.system.scheduler.scheduleOnce(FiniteDuration(timeout._1,timeout._2),self,AutoSpeedActor.BatchInterval(System.currentTimeMillis()))
context.parent ! evt
}
}
}
TODO
demo代碼還是有點簡單,後期需要進一步的優化,例如該actor隻能對某一類消息進行自動速率調整,無法适應多個不同類型消息的AutoDrive,歡迎大家進行讨論,提出各種優化方案。