天天看點

akka mysql_Akka系列(七):Actor持久化之Akka persistence

這次把這部分内容提到現在寫,是因為這段時間開發的項目剛好在這一塊遇到了一些難點,是以準備把經驗分享給大家,我們在使用Akka時,會經常遇到一些存儲Actor内部狀态的場景,在系統正常運作的情況下,我們不需要擔心什麼,但是當系統出錯,比如Actor錯誤需要重新開機,或者記憶體溢出,亦或者整個系統崩潰,如果我們不采取一定的方案的話,在系統重新開機時Actor的狀态就會丢失,這會導緻我們丢失一些關鍵的資料,造成系統資料不一緻的問題。Akka作為一款成熟的生産環境應用,為我們提供了相應的解決方案就是Akka persistence。

為什麼需要持久化的Actor?

萬變不離其宗,資料的一緻性是永恒的主題,一個性能再好的系統,不能保證資料的正确,也稱不上是一個好的系統,一個系統在運作的時候難免會出錯,如何保證系統在出錯後能正确的恢複資料,不讓資料出現混亂是一個難題。使用Actor模型的時候,我們會有這麼一個想法,就是能不對資料庫操作就盡量不對資料庫操作(這裡我們假定我們的資料庫是安全,可靠的,能保證資料的正确性和一緻性,比如使用國内某雲的雲資料庫),一方面如果大量的資料操作會使資料庫面臨的巨大的壓力,導緻崩潰,另一方面即使資料庫能處理的過來,比如一些count,update的大表操作也會消耗很多的時間,遠沒有記憶體中直接操作來的快,大大影響性能。但是又有人說幾人記憶體操作這麼快,為什麼不把資料都放記憶體中呢?答案顯而易見,當出現機器當機,或者記憶體溢出等問題時,資料很有可能就丢失了導緻無法恢複。在這種背景下,我們是不是有一種比較好的解決方案,既能滿足需求又能用最小的性能消耗,答案就是上面我們的說的Akka persistence。

Akka persistence的核心架構

在具體深入Akka persistence之前,我們可以先了解一下它的核心設計理念,其實簡單來說,我們可以利用一些thing來恢複Actor的狀态,這裡的thing可以是日志、資料庫中的資料,亦或者是檔案,是以說它的本質非常容易了解,在Actor處理的時候我們會儲存一些資料,Actor在恢複的時候能根據這些資料恢複其自身的狀态。

是以Akka persistence 有以下幾個關鍵部分組成:

PersistentActor:任何一個需要持久化的Actor都必須繼承它,并必須定義或者實作其中的三個關鍵屬性:

def persistenceId = "example" //作為持久化Actor的唯一表示,用于持久化或者查詢時使用

def receiveCommand: Receive = ??? //Actor正常運作時處理處理消息邏輯,可在這部分内容裡持久化自己想要的消息

def receiveRecover: Receive = ??? //Actor重新開機恢複是執行的邏輯

相比普通的Actor,除receiveCommand相似以外,還必須實作另外兩個屬性。

另外在持久化Actor中還有另外兩個關鍵的的概念就是Journal和Snapshot,前者用于持久化事件,後者用于儲存Actor的快照,兩者在Actor恢複狀态的時候都起到了至關重要的作用。

Akka persistence的demo實戰

這裡我首先會用一個demo讓大家能對Akka persistence的使用有一定了解的,并能大緻明白它的工作原理,後面再繼續講解一些實戰可能會遇到的問題。

假定現在有這麼一個場景,現在假設有一個1w元的大紅包,瞬間可能會很多人同時來搶,每個人搶的金額也可能不一樣,場景很簡單,實作方式也有很多種,但前提是保證資料的正确性,比如最普通的使用資料庫保證,但對這方面有所了解的同學都知道這并不是一個很好的方案,因為需要鎖,并需要大量的資料庫操作,導緻性能不高,那麼我們是否可以用Actor來實作這個需求麼?答案是當然可以。

我們首先來定義一個抽獎指令,

case class LotteryCmd(

userId: Long, // 參與使用者Id

username: String, //參與使用者名

email: String // 參與使用者郵箱

)

然後我們實作一個抽獎Actor,并繼承PersistentActor作出相應的實作:

case class LuckyEvent( //抽獎成功事件

userId: Long,

luckyMoney: Int

)

case class FailureEvent( //抽獎失敗事件

userId: Long,

reason: String

)

case class Lottery(

totalAmount: Int, //紅包總金額

remainAmount: Int //剩餘紅包金額

) {

def update(luckyMoney: Int) = {

copy(

remainAmount = remainAmount - luckyMoney

)

}

}

class LotteryActor(initState: Lottery) extends PersistentActor with ActorLogging{

override def persistenceId: String = "lottery-actor-1"

var state = initState //初始化Actor的狀态

override def receiveRecover: Receive = {

case event: LuckyEvent =>

updateState(event) //恢複Actor時根據持久化的事件恢複Actor狀态

case SnapshotOffer(_, snapshot: Lottery) =>

log.info(s"Recover actor state from snapshot and the snapshot is ${snapshot}")

state = snapshot //利用快照恢複Actor的狀态

case RecoveryCompleted => log.info("the actor recover completed")

}

def updateState(le: LuckyEvent) =

state = state.update(le.luckyMoney) //更新自身狀态

override def receiveCommand: Receive = {

case lc: LotteryCmd =>

doLottery(lc) match { //進行抽獎,并得到抽獎結果,根據結果做出不同的處理

case le: LuckyEvent => //抽到随機紅包

persist(le) { event =>

updateState(event)

increaseEvtCountAndSnapshot()

sender() ! event

}

case fe: FailureEvent => //紅包已經抽完

sender() ! fe

}

case "saveSnapshot" => // 接收存儲快照指令執行存儲快照操作

saveSnapshot(state)

case SaveSnapshotSuccess(metadata) => ??? //你可以在快照存儲成功後做一些操作,比如删除之前的快照等

}

private def increaseEvtCountAndSnapshot() = {

val snapShotInterval = 5

if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) { //當有持久化5個事件後我們便存儲一次目前Actor狀态的快照

self ! "saveSnapshot"

}

}

def doLottery(lc: LotteryCmd) = { //抽獎邏輯具體實作

if (state.remainAmount > 0) {

val luckyMoney = scala.util.Random.nextInt(state.remainAmount) + 1

LuckyEvent(lc.userId, luckyMoney)

}

else {

FailureEvent(lc.userId, "下次早點來,紅包已被抽完咯!")

}

}

}

程式很簡單,關鍵位置我也給了注釋,相信大家對Actor有所了解的話很容易了解,當然要是有些疑惑,可以看看我之前寫的文章,下面我們就對剛才寫的抽紅包Actor進行測試:

object PersistenceTest extends App {

val lottery = Lottery(10000,10000)

val system = ActorSystem("example-05")

val lotteryActor = system.actorOf(Props(new LotteryActor(lottery)), "LotteryActor-1") //建立抽獎Actor

val pool: ExecutorService = Executors.newFixedThreadPool(10)

val r = (1 to 100).map(i =>

new LotteryRun(lotteryActor, LotteryCmd(i.toLong,"godpan","[email protected]")) //建立100個抽獎請求

)

r.map(pool.execute(_)) //使用線程池來發起抽獎請求,模拟同時多人參加

Thread.sleep(5000)

pool.shutdown()

system.terminate()

}

class LotteryRun(lotteryActor: ActorRef, lotteryCmd: LotteryCmd) extends Runnable { //抽獎請求

implicit val timeout = Timeout(3.seconds)

def run: Unit = {

for {

fut

} yield fut match { //根據不同僚件顯示不同的抽獎結果

case le: LuckyEvent => println(s"恭喜使用者${le.userId}抽到了${le.luckyMoney}元紅包")

case fe: FailureEvent => println(fe.reason)

case _ => println("系統錯誤,請重新抽取")

}

}

}

運作程式,我們可能看到以下的結果:

akka mysql_Akka系列(七):Actor持久化之Akka persistence

下面我會把persistence actor在整個運作過程的步驟給出,幫助大家了解它的原理:

1.初始化Persistence Actor

1.1若是第一次初始化,則與正常的Actor的初始化一緻。

1.2若是重新開機恢複Actor,這根據Actor之前持久的資料恢複。

1.2.1從快照恢複,可快速恢複Actor,但并非每次持久化事件都會儲存快照,在快照完整的情況下,Actor優先從快照恢複自身狀态。

1.2.2從事件(日志,資料庫記錄等)恢複,通過重放持久化事件恢複Actor狀态,比較關鍵。

2.接收指令進行處理,轉化為需要持久化的事件(持久化的事件盡量隻包含關鍵性的資料)使用Persistence Actor的持久化方法進行持久化(上述例子中的persist,後面我會講一下批量持久化),并處理持久化成功後的邏輯處理,比如修改Actor狀态,向外部Actor發送消息等。

3.若是我們需要存儲快照,那麼可以主動指定存儲快照的頻率,比如持久化事件100次我們就存儲一次快照,這個頻率應該要考慮實際的業務場景,在存儲快照成功後我們也可以執行一些操作。

總的來說Persistence Actor運作時的大緻操作就是以上這些,當然它是r如何持久化事件,恢複時的機制是怎麼樣的等有興趣的可以看一下Akka源碼。

使用Akka persistence的相關配置

首先我們必須加載相應的依賴包,在bulid.sbt中加入以下依賴:

libraryDependencies ++= Seq(

"com.typesafe.akka" %% "akka-actor" % "2.4.16", //Akka actor 核心依賴

"com.typesafe.akka" %% "akka-persistence" % "2.4.16", //Akka persistence 依賴

"org.iq80.leveldb" % "leveldb" % "0.7", //leveldb java版本依賴

"org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8", //leveldb java版本依賴

"com.twitter" %% "chill-akka" % "0.8.0" //事件序列化依賴

)

另外我們還需在application.conf加入以下配置:

akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"

akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"

akka.persistence.journal.leveldb.dir = "log/journal"

akka.persistence.snapshot-store.local.dir = "log/snapshots"

# DO NOT USE THIS IN PRODUCTION !!!

# See also https://github.com/typesafehub/activator/issues/287

akka.persistence.journal.leveldb.native = false //因為我們本地并沒有安裝leveldb,是以這個屬性置為false,但是生産環境并不推薦使用

akka.actor.serializers {

kryo = "com.twitter.chill.akka.AkkaSerializer"

}

akka.actor.serialization-bindings {

"scala.Product" = kryo

"akka.persistence.PersistentRepr" = kryo

}

至此為止我們整個Akka persistence demo已經搭建好了,可以正常運作了,有興趣的同學可以下載下傳源碼。源碼連結

Akka persistence進階

1.持久化插件

有同學可能會問,我對leveldb不是很熟悉亦或者覺得單機存儲并不是安全,有沒有支援分布式資料存儲的插件呢,比如某爸的雲資料庫?答案當然是有咯,良心的我當然是幫你們都找好咯。

1.akka-persistence-sql-async: 支援MySQL和PostgreSQL,另外使用了全異步的資料庫驅動,提供異步非阻塞的API,我司用的就是它的變種版,6的飛起。項目位址

2.akka-persistence-cassandra: 官方推薦的插件,使用寫性能very very very fast的cassandra資料庫,是幾個插件中比較流行的一個,另外它還支援persistence query。項目位址

3.akka-persistence-redis: redis應該也很符合Akka persistence的場景,熟悉redis的同學可以使用看看。項目位址

4.akka-persistence-jdbc: 怎麼能少了jdbc呢?不然怎麼對的起java爸爸呢,支援scala和java哦。項目位址

相應的插件的具體使用可以看該項目的具體介紹使用,我看了下相對來說都是比較容易的。

2.批量持久化

上面說到我司用的是akka-persistence-sql-async插件,是以我們是将事件和快照持久化到資料庫的,一開始我也是像上面demo一樣,每次事件都會持久化到資料庫,但是後來在性能測試的時候,因為本身業務場景對資料庫的壓力也比較大,在當資料庫到達每秒1000+的讀寫量後,另外說明一下使用的是某雲資料庫,性能中配以上,發現每次持久化的時間将近要15ms,這樣換算一下的話Actor每秒隻能處理60~70個需要持久化的事件,而實際業務場景要求Actor必須在3秒内傳回處理結果,這種情況下導緻大量消息處理逾時得不到回報,另外還有大量的消息得不到處理,導緻系統錯誤暴增,使用者體驗下降,既然我們發現了問題,那麼我們能不能進行優化呢?事實上當然是可以,既然單個插入慢,那麼我們能不能批量插入呢,Akka persistence為我們提供了persistAll方法,下面我就對上面的demo進行一下改造,讓其變成批量持久化:

class LotteryActorN(initState: Lottery) extends PersistentActor with ActorLogging{

override def persistenceId: String = "lottery-actor-2"

var state = initState //初始化Actor的狀态

override def receiveRecover: Receive = {

case event: LuckyEvent =>

updateState(event) //恢複Actor時根據持久化的事件恢複Actor狀态

case SnapshotOffer(_, snapshot: Lottery) =>

log.info(s"Recover actor state from snapshot and the snapshot is ${snapshot}")

state = snapshot //利用快照恢複Actor的狀态

case RecoveryCompleted => log.info("the actor recover completed")

}

def updateState(le: LuckyEvent) =

state = state.update(le.luckyMoney) //更新自身狀态

var lotteryQueue : ArrayBuffer[(LotteryCmd, ActorRef)] = ArrayBuffer()

context.system.scheduler //定時器,定時觸發抽獎邏輯

.schedule(

0.milliseconds,

100.milliseconds,

new Runnable {

def run = {

self ! "doLottery"

}

}

)

override def receiveCommand: Receive = {

case lc: LotteryCmd =>

lotteryQueue = lotteryQueue :+ (lc, sender()) //參與資訊加入抽獎隊列

println(s"the lotteryQueue size is ${lotteryQueue.size}")

if (lotteryQueue.size > 5) //當參與人數有5個時觸發抽獎

joinN(lotteryQueue)

case "doLottery" =>

if (lotteryQueue.size > 0)

joinN(lotteryQueue)

case "saveSnapshot" => // 接收存儲快照指令執行存儲快照操作

saveSnapshot(state)

case SaveSnapshotSuccess(metadata) => ??? //你可以在快照存儲成功後做一些操作,比如删除之前的快照等

}

private def joinN(lotteryQueue: ArrayBuffer[(LotteryCmd, ActorRef)]) = { //批量處理抽獎結果

val rs = doLotteryN(lotteryQueue)

val success = rs.collect { //得到其中中獎的相應資訊

case (event: LuckyEvent, ref: ActorRef) =>

event -> ref

}.toMap

val failure = rs.collect { //得到其中未中獎的相應資訊

case (event: FailureEvent, ref: ActorRef) => event -> ref

}

persistAll(success.keys.toIndexedSeq) { //批量持久化中獎使用者事件

case event => println(event)

updateState(event)

increaseEvtCountAndSnapshot()

success(event) ! event

}

failure.foreach {

case (event, ref) => ref ! event

}

this.lotteryQueue.clear() //清空參與隊列

}

private def increaseEvtCountAndSnapshot() = {

val snapShotInterval = 5

if (lastSequenceNr % snapShotInterval == 0 && lastSequenceNr != 0) { //當有持久化5個事件後我們便存儲一次目前Actor狀态的快照

self ! "saveSnapshot"

}

}

private def doLotteryN(lotteryQueue: ArrayBuffer[(LotteryCmd, ActorRef)]) = { //抽獎邏輯具體實作

var remainAmount = state.remainAmount

lotteryQueue.map(lq =>

if (remainAmount > 0) {

val luckyMoney = scala.util.Random.nextInt(remainAmount) + 1

remainAmount = remainAmount - luckyMoney

(LuckyEvent(lq._1.userId, luckyMoney),lq._2)

}

else {

(FailureEvent(lq._1.userId, "下次早點來,紅包已被抽完咯!"),lq._2)

}

)

}

}

這是改造後的參與Actor,實作了批量持久的功能,當然這裡為了給發送者傳回消息,處理邏輯稍微複雜了一點,不過真實場景可能會更複雜,相關源碼也在剛才的項目上。

3.Persistence Query

另外Akka Persistence還提供了Query接口,用于需要查詢持久化事件的需求,這部分内容可能要根據實際業務場景考慮是否需要應用,我就不展開講了,另外我也寫了一個小demo在項目中,想要嘗試的同學也可以試試。