天天看點

akka mysql_面對使用Scala Slick MySQL Akka Stream的問題

問題陳述:我們将

MySQL DB表中特定子產品的使用者的所有傳入請求參數作為一行添加(這是一個巨大的資料).現在,我們想要設計一個程序,該程序将從該表中讀取每條記錄,并通過調用第三方API獲得有關該使用者請求的更多資訊,之後它将把這個傳回的元資訊放在另一個表中.

目前的嘗試:

我正在使用Scala Slick來做到這一點.由于要讀取的資料很大,我想一次一行地讀取該表并進行處理.我嘗試使用光滑的akka​​流,但是我得到’java.util.concurrent.RejectedExecutionException’

以下是我嘗試過的粗略邏輯,

implicit val system = ActorSystem("Example")

import system.dispatcher

implicit val materializer = ActorMaterializer()

val future = db.stream(SomeQuery.result)

Source.fromPublisher(future).map(row => {

id = dataEnrichmentAPI.process(row)

}).runForeach(id => println("Processed row : "+ id))

dataEnrichmentAPI.process:此函數進行第三方REST調用,并執行一些資料庫查詢以擷取所需資料.這個資料庫查詢是使用’db.run’方法完成的,它也會一直等到它完成(使用Await)

例如.,

def process(row: RequestRecord): Int = {

// SomeQuery2 = Check if data is already there in DB

val retId: Seq[Int] = Await.result(db.run(SomeQuery2.result), Duration.Inf)

if(retId.isEmpty){

val metaData = RestCall()

// SomeQuery3 = Store this metaData in DB

Await.result(db.run(SomeQuery3.result), Duration.Inf)

return metaData.id;

}else{

// SomeQuery4 = Get meta data id

return Await.result(db.run(SomeQuery4.result), Duration.Inf)

}

}

我正在使用阻止調用DB的這個異常.我不認為我是否可以擺脫它,因為後續流程需要傳回值才能繼續.

“阻止呼叫”是否是此異常背後的原因?

解決此類問題的最佳做法是什麼?

謝謝.