ElasticMQ 0.7.0: Long Polling, Non-Blocking Implementation Using Akka and Spray
原文作者:Adam Warski
原文位址:https://dzone.com/articles/elasticmq-070-long-polling-non
譯者微網誌:@從流域到海域
譯者部落格:blog.csdn.net/solo95
ElasticMQ 0.7.0:長輪詢,使用Akka和Spray的非阻塞實作
ElasticMQ 0.7.0,一個附帶基于actor的Scala的消息隊列系統剛剛釋出。
這是一次重大的重寫(即版本更新),更新之後将在核心使用Akka actors 并在REST層使用Spray。到目前為止,隻有核心和SQS子產品被重寫, 日志( journaling),SQL後端和副本(replication)子產品的重寫尚未完成。
主要的用戶端改進是:
- 支援長輪詢,這是SQS前一段時間的補充
- 更簡單的獨立伺服器 - 隻需下載下傳一個jar包
使用長時間的輪詢的過程中,當收到消息時,可以指定一個額外的的
MessageWaitTime
屬性。如果隊列中沒有消息,,ElasticMQ将等待
MessageWaitTime
幾秒鐘直到消息到達,而不是用空響應完成請求。這有助于減少帶寬的使用(不需要非常頻繁地進行請求),進而提高系統整體性能(發送後立即收到消息)并降低SQS成本。
獨立的伺服器現在是一個單一的jar包。要運作本地記憶體SQS實作(例如,測試使用SQS的應用程式),隻需要下載下傳jar檔案并運作:
java -jar elasticmq-server-0.7.0.jar
複制
這将在
http://localhost:9324
啟動伺服器。當然,接口和端口都是可配置的,詳情請參閱自述檔案。像以前一樣,您也可以使用任何基于JVM的語言來運作嵌入式伺服器。
實作說明
出于好奇,下面是對ElasticMQ如何實作的簡短描述,包括核心系統,REST層,Akka資料流使用和長輪詢實作。所有的代碼都可以在GitHub上找到。
如前所述,ElasticMQ現在使用Akka和Spray來實作,并且不包含任何阻塞調用。一切都是異步的。
核心
核心系統是基于角色的。有一個主角色(main actor)(QueueManagerActor),它知道系統中目前建立了哪些隊列,并提供了建立和删除隊列的可能性。
為了與actor溝通,使用了類型化問答模式。例如,要查找一個隊列(一個隊列也是一個actor),就會定義一個消息:
case class LookupQueue(queueName:String)extends Replyable [Option [ActorRef]]
複制
用法如下所示:
import org.elasticmq.actor.reply._
val lookupFuture: Future[Option[ActorRef]] = queueManagerActor ? LookupQueue("q2")
複制
如前所述,每個隊列都是一個actor,并且已經封裝了隊列狀态。我們可以使用簡單的可變資料結構,而不需要任何線程同步,因為角色模型(actor model)為我們處理了這個問題。有一些消息可以發送給queue-actor,例如:
case class SendMessage(message: NewMessageData) extends Replyable[MessageData]
case class ReceiveMessages(visibilityTimeout: VisibilityTimeout, count: Int,
waitForMessages: Option[Duration]) extends Replyable[List[MessageData]]
case class GetQueueStatistics(deliveryTime: Long) extends Replyable[QueueStatistics]
複制
Rest層
SQS查詢/ REST層是使用Spray來實作的,這是一個基于Akka的輕量級REST/HTTP工具包。
除了基于角色的非阻塞IO實作外,Spray還提供了強大的路由庫
spray-routing
。它包含一些内置的指令,用于在請求方法(get / post等)上進行比對,提取表單參數中的查詢參數或比對請求路徑。但它也可以讓你使用簡單的指令組合來定義你自己的指令。一個典型的ElasticMQ route示例如下所示:
val listQueuesDirective =
action("ListQueues") {
rootPath {
anyParam("QueueNamePrefix"?) { prefixOption =>
// logic
}
}
}
複制
在
action
到
"Action"
URL的body參數中比對指定的action名稱并接受/拒絕請求的地方,
rootPath
會比對出空路徑(...)。Spray有一個很好的教程,如果你有興趣,我建議你看看這篇教程。
如何使用路由中的隊列角色(queue actors)來完成HTTP請求?
關于Spray的
RequestContext
好處是,它所做的隻是将一個執行個體傳遞給你的路由,不需要任何回複。完全放棄請求或使用某個value完成該請求僅僅取決于它的路由。該請求也可以在另一個線程中完成 - 或者,例如,在未來某個線程運作完成時。這正是ElasticMQ所做的。在這裡使用
map
,
flatMap
和
for-comprehensions
(這是一個針對
map
/
flatMap
更好的文法)是非常友善的,例如(省略了一些内容):
// Looking up the queue and deleting it are going to be called in sequence,
// but asynchronously, as ? returns a Future
for {
queueActor <- queueManagerActor ? LookupQueue(queueName)
_ <- queueActor ? DeleteMessage(DeliveryReceipt(receipt))
} {
requestContext.complete(200, "message deleted")
}
複制
有時,當流程更複雜時,ElasticMQ會使用Akka Dataflow,這需要啟用continuations插件。還有一個類似的項目,使用宏,Scala Async,但這個仍處于早期開發階段。
使用Akka Dataflow,您可以編寫使用
Future
們的代碼,就好像編寫正常的序列化代碼一樣。CPS插件會将其轉換為在需要時使用回調。這是一個來自CreateQueueDirectives的例子:
(序列化代碼sequential code,也有翻譯成順序代碼的,即按順序執行的代碼,過程中不存在多線程異步操作,譯者注)
flow {
val queueActorOption = (queueManagerActor ? LookupQueue(newQueueData.name)).apply()
queueActorOption match {
case None => {
val createResult = (queueManagerActor ? CreateQueue(newQueueData)).apply()
createResult match {
case Left(e) => throw new SQSException("Queue already created: " + e.message)
case Right(_) => newQueueData
}
}
case Some(queueActor) => {
(queueActor ? GetQueueData()).apply()
}
}
}
複制
這裡的重要部分是
flow
代碼塊,它界定了轉換的範圍,以及調用
Future
提取future内容的
apply()
。這看起來像完全正常的序列化代碼,但是在執行時,因為第一次
Future
是第一次使用将會異步運作。
長輪詢
由于所有的代碼都是異步和非阻塞的,實作長輪詢非常容易。請注意,從一個隊列接收消息時,我們得到一個
Future[List[MessageData]]
。為了發出響應已完成這個future,HTTP請求也将會以适當的響應來完成。然而,這個future幾乎可以立即完成(例如正常情況下),比如在10秒之後 - 代碼所需的支援沒有變化。唯一要做的就是延遲完成future,直到指定的時間過去或新的消息到達。
實作在QueueActorWaitForMessagesOps中。當接收到消息的請求到達時,隊列中沒有任何内容産生,而是立即回複(即向發送者actor發送空清單),我們将儲存原始請求的引用和發送方actor在map中。使用Akka排程程式,我們還計劃在指定的時間超過之後發回空清單并删除條目。
當新消息到達時,我們隻需從map上等待一個請求,然後嘗試去完成它。同樣,所有同步和并發問題都由Akka和actor模型來處理。
請測試新版本,如果您有任何回報,請讓我們知曉!
Adam