天天看點

ElasticMQ 0.7.0:長輪詢,使用Akka和Spray的非阻塞實作

ElasticMQ 0.7.0: Long Polling, Non-Blocking Implementation Using Akka and Spray

原文作者:Adam Warski

原文位址:https://dzone.com/articles/elasticmq-070-long-polling-non

譯者微網誌:@從流域到海域

譯者部落格:blog.csdn.net/solo95

ElasticMQ 0.7.0:長輪詢,使用Akka和Spray的非阻塞實作

ElasticMQ 0.7.0,一個附帶基于actor的Scala的消息隊列系統剛剛釋出。

這是一次重大的重寫(即版本更新),更新之後将在核心使用Akka actors 并在REST層使用Spray。到目前為止,隻有核心和SQS子產品被重寫, 日志( journaling),SQL後端和副本(replication)子產品的重寫尚未完成。

主要的用戶端改進是:

  • 支援長輪詢,這是SQS前一段時間的補充
  • 更簡單的獨立伺服器 - 隻需下載下傳一個jar包

使用長時間的輪詢的過程中,當收到消息時,可以指定一個額外的的

MessageWaitTime

屬性。如果隊列中沒有消息,,ElasticMQ将等待

MessageWaitTime

幾秒鐘直到消息到達,而不是用空響應完成請求。這有助于減少帶寬的使用(不需要非常頻繁地進行請求),進而提高系統整體性能(發送後立即收到消息)并降低SQS成本。

獨立的伺服器現在是一個單一的jar包。要運作本地記憶體SQS實作(例如,測試使用SQS的應用程式),隻需要下載下傳jar檔案并運作:

java -jar elasticmq-server-0.7.0.jar           

複制

這将在

http://localhost:9324

啟動伺服器。當然,接口和端口都是可配置的,詳情請參閱自述檔案。像以前一樣,您也可以使用任何基于JVM的語言來運作嵌入式伺服器。

實作說明

出于好奇,下面是對ElasticMQ如何實作的簡短描述,包括核心系統,REST層,Akka資料流使用和長輪詢實作。所有的代碼都可以在GitHub上找到。

如前所述,ElasticMQ現在使用Akka和Spray來實作,并且不包含任何阻塞調用。一切都是異步的。

核心

核心系統是基于角色的。有一個主角色(main actor)(QueueManagerActor),它知道系統中目前建立了哪些隊列,并提供了建立和删除隊列的可能性。

為了與actor溝通,使用了類型化問答模式。例如,要查找一個隊列(一個隊列也是一個actor),就會定義一個消息:

case class LookupQueue(queueName:String)extends Replyable [Option [ActorRef]]           

複制

用法如下所示:

import org.elasticmq.actor.reply._
val lookupFuture: Future[Option[ActorRef]] = queueManagerActor ? LookupQueue("q2")           

複制

如前所述,每個隊列都是一個actor,并且已經封裝了隊列狀态。我們可以使用簡單的可變資料結構,而不需要任何線程同步,因為角色模型(actor model)為我們處理了這個問題。有一些消息可以發送給queue-actor,例如:

case class SendMessage(message: NewMessageData)   extends Replyable[MessageData]
case class ReceiveMessages(visibilityTimeout: VisibilityTimeout, count: Int, 
           waitForMessages: Option[Duration])     extends Replyable[List[MessageData]]
case class GetQueueStatistics(deliveryTime: Long) extends Replyable[QueueStatistics]           

複制

Rest層

SQS查詢/ REST層是使用Spray來實作的,這是一個基于Akka的輕量級REST/HTTP工具包。

除了基于角色的非阻塞IO實作外,Spray還提供了強大的路由庫

spray-routing

。它包含一些内置的指令,用于在請求方法(get / post等)上進行比對,提取表單參數中的查詢參數或比對請求路徑。但它也可以讓你使用簡單的指令組合來定義你自己的指令。一個典型的ElasticMQ route示例如下所示:

val listQueuesDirective = 
  action("ListQueues") {
    rootPath {
      anyParam("QueueNamePrefix"?) { prefixOption =>
        // logic
      }
    }
  }           

複制

action

"Action"

URL的body參數中比對指定的action名稱并接受/拒絕請求的地方,

rootPath

會比對出空路徑(...)。Spray有一個很好的教程,如果你有興趣,我建議你看看這篇教程。

如何使用路由中的隊列角色(queue actors)來完成HTTP請求?

關于Spray的

RequestContext

好處是,它所做的隻是将一個執行個體傳遞給你的路由,不需要任何回複。完全放棄請求或使用某個value完成該請求僅僅取決于它的路由。該請求也可以在另一個線程中完成 - 或者,例如,在未來某個線程運作完成時。這正是ElasticMQ所做的。在這裡使用

map

flatMap

for-comprehensions

(這是一個針對

map

/

flatMap

更好的文法)是非常友善的,例如(省略了一些内容):

// Looking up the queue and deleting it are going to be called in sequence,
// but asynchronously, as ? returns a Future
for {
   queueActor <- queueManagerActor ? LookupQueue(queueName)
   _ <- queueActor ? DeleteMessage(DeliveryReceipt(receipt))
} {
   requestContext.complete(200, "message deleted")
}           

複制

有時,當流程更複雜時,ElasticMQ會使用Akka Dataflow,這需要啟用continuations插件。還有一個類似的項目,使用宏,Scala Async,但這個仍處于早期開發階段。

使用Akka Dataflow,您可以編寫使用

Future

們的代碼,就好像編寫正常的序列化代碼一樣。CPS插件會将其轉換為在需要時使用回調。這是一個來自CreateQueueDirectives的例子:

(序列化代碼sequential code,也有翻譯成順序代碼的,即按順序執行的代碼,過程中不存在多線程異步操作,譯者注)

flow {
  val queueActorOption = (queueManagerActor ? LookupQueue(newQueueData.name)).apply()
  queueActorOption match {
    case None => {
      val createResult = (queueManagerActor ? CreateQueue(newQueueData)).apply()
      createResult match {
        case Left(e) => throw new SQSException("Queue already created: " + e.message)
        case Right(_) => newQueueData
      }
    }
    case Some(queueActor) => {
      (queueActor ? GetQueueData()).apply()
    }
  }
}           

複制

這裡的重要部分是

flow

代碼塊,它界定了轉換的範圍,以及調用

Future

提取future内容的

apply()

。這看起來像完全正常的序列化代碼,但是在執行時,因為第一次

Future

是第一次使用将會異步運作。

長輪詢

由于所有的代碼都是異步和非阻塞的,實作長輪詢非常容易。請注意,從一個隊列接收消息時,我們得到一個

Future[List[MessageData]]

。為了發出響應已完成這個future,HTTP請求也将會以适當的響應來完成。然而,這個future幾乎可以立即完成(例如正常情況下),比如在10秒之後 - 代碼所需的支援沒有變化。唯一要做的就是延遲完成future,直到指定的時間過去或新的消息到達。

實作在QueueActorWaitForMessagesOps中。當接收到消息的請求到達時,隊列中沒有任何内容産生,而是立即回複(即向發送者actor發送空清單),我們将儲存原始請求的引用和發送方actor在map中。使用Akka排程程式,我們還計劃在指定的時間超過之後發回空清單并删除條目。

當新消息到達時,我們隻需從map上等待一個請求,然後嘗試去完成它。同樣,所有同步和并發問題都由Akka和actor模型來處理。

請測試新版本,如果您有任何回報,請讓我們知曉!

Adam