天天看點

scala系列--并發01

  最近研究了一些scala并發的知識,總結一下。

一.簡介

即時響應性是一項決定任何應用程式成敗的關鍵因素。有兩種方式來提高即時響應性:1.多線程,并行運作多個任務。2.有政策的計算,惰性運作任務。

二.惰性求值

1.短路控制,scala不會向前看,是以用到lazy。

注意:多個變量綁定,後續調用,順序不可預知。

2.惰性集合,建立臨時視圖,調用的時候立即求值。

3.無限流轉換有限流

有限序列

無限流-》Stream

三.并行集合

順序集合

順序集合上的方法行為:它們為它們的集合中的每個元素順序地執行它們的操作。

并行集合

,Scala 都擁有其并行版本。例如,ParArray 是Array 對應的并行版本,同樣的,ParHashMap、ParHashSet 和ParVector 分别對應于HashMap、HashSet 和Vector。我們可以使用par()和seq()方法來在順序集合及其并行版本之間進行互相轉換。

四.Actor

簡介:

 Acotr模式是一種并發模型與另一種模型共享記憶體完全相反,Actor模型share nothing。

 所有線程(或程序)通過消息傳遞方式進行合作,這些線程(或程序)稱為Actor,共享記憶體更适合單機多核的并發程式設計。

特點:

  • 保證互斥通路的活動對象。
  • 一個Actor将隻會處理一條消息。Actor模型具有與生俱來的線程安全性。
  • 多個Actor并發地運作,同時處理多條消息。
  • Actor是異步。
  • 不會阻塞調用者。
  • 不用顯示建立一個線程池,顯示排程任務。
  • 線程并不和Actor綁定--一個線程池服務于多個Actor。
  • java建立共享可變變量,并使用同步原語來提供線程安全性。使用JDK 庫很難在短時間内做到正确的并發。

注意:

  • 更多依賴無狀态Actor,少用有狀态Actor。
  • 確定Actor之間傳遞消息是不可變對象(case,String,Int等)。保證我們不會無意間修改共享狀态。
  • 盡量避免使用ask().雙向通信不是一個好主意。“發送并忘記”(!)模型好得多。

示例:

統計字元串的次數。

import akka.actor._
import scala.collection.mutable
/**
  * 接收消息
  */
class MessageActor extends Actor{
  val messagesCount: mutable.Map[String, Int] =mutable.Map()
  override def receive: Receive = {
    case Play(role) =>
      val currentCount = messagesCount.getOrElse(role, 0)
      messagesCount.update(role, currentCount + 1)
      println(s"Playing $role")

    case ReportCount(role) =>
      sender ! messagesCount.getOrElse(role,0)

  }
}
case class Play(role: String)
case class ReportCount(role: String)           
import akka.actor._
import akka.util.Timeout
import akka.pattern.Patterns
import scala.concurrent.Await
import scala.concurrent.duration._

/**
  * Acotr模式是一種并發模型與另一種模型共享記憶體完全相反,Actor模型share nothing。
  * 所有線程(或程序)通過消息傳遞方式進行合作,這些線程(或程序)稱為Actor,共享記憶體更适合單機多核的并發程式設計。
  */
object  UseActor extends App{
  val system = ActorSystem("sample")
  val depp = system.actorOf(Props[MessageActor])
  val hanks = system.actorOf(Props[MessageActor])
   /**
    *tell屬于發了就完,什麼都不管的類型。
    *
    */
  depp ! Play("Depp1")
  hanks ! Play("Hanks1")

  depp ! Play("Depp1")
  depp ! Play("Depp2")


  println("Sent roles to play ")
  /**
    * 詢問(ask ?)模式,因為發送一條消息并等待響應可能會導緻潛在的活鎖
    * 消息可能永遠不會到達,設定逾時時間
    */
  implicit val timeout: Timeout = Timeout(2.seconds)
  val depp1 = Patterns.ask(depp, ReportCount("Depp1"), timeout)
  val depp2 = Patterns.ask(depp, ReportCount("Depp2"), timeout)
  val hanks1 = Patterns.ask(hanks, ReportCount("Hanks1"), timeout)

  val depp1Count = Await.result(depp1, timeout.duration)
  val depp2Count = Await.result(depp2, timeout.duration)
  val hanks1Count = Await.result(hanks1, timeout.duration)

  println(s"Depp played Depp1 $depp1Count time(s)")
  println(s"Depp played Depp2 $depp2Count time(s)")
  println(s"Hanks played Hanks1 $hanks1Count time(s)")

  val terminateFuture = system.terminate()
  Await.ready(terminateFuture, Duration.Inf)
}
           

結果:

Playing Depp1

Playing Hanks1

Sent roles to play 

Playing Depp1

Playing Depp2

Depp played Depp1 2 time(s)

Depp played Depp2 1 time(s)

Hanks played Hanks1 1 time(s)