天天看點

【值得收藏】你想知道的并發都在這裡【傳統并發】與【Go并發】

并發操作總結

參考書籍及網站

《計算機作業系統原理分析(第二版)》

《Go程式設計語言》

《Go并發程式設計實戰》

https://medium.com/rungo/anatomy-of-channels-in-go-concurrency-in-go-1ec336086adb

由于本人能力有限,
    如果本文有地方錯誤或是表達不清楚,歡迎在評論區評論或是私信我!
    我們一起交流,一起進步!
    Coding Change World!      

目錄

(一). 基礎知識

1. 知識儲備

1.1 并發主要思想

1.2 并發概念

1.3 程序

1.3.1 定義

1.3.2 特征

1.4 線程

1.4.1 定義

1.4.2 特征

(二). 基于記憶體共享的并發(傳統并發)

2. 制約關系

2.1 間接制約與互斥關系

簡介

流程

2.2 直接制約與同步關系

3.同步機制

3.1 互斥關系與加鎖機制

3.1.1 臨界區管理準則

3.1.2 加鎖機制原理

3.1.3 加鎖例子

3.1.4 加鎖機制分析

3.2 互斥關系與信号量機制

3.2.1 信号量機制原理

3.2.2 信号量分析

3.2.3 實作例子及分析

4. 例題

4.1 讀寫操作模型

4.2 PC問題

(三). 基于通道通信的并發

5 .通道簡介

5.1 聲明

5.1 讀寫

6 .通道詳解

6.1 例子

6.2 死鎖

6.3 關閉通道

6.4 緩沖區

6.5 通道的長度和容量

6.6 單向通道

6.7 Select

6.7.1 原理

6.7.2 default case 塊

6.7.3 空 select

6.8 Deadlock

6.9 nil通道

7. 多協程協同工作

8. WaitGroup

8.1 簡介

8.2 工作池

9. Mutex

(四). 結語

【值得收藏】你想知道的并發都在這裡【傳統并發】與【Go并發】

使多個任務可以在同一時間段内執行以便能夠得到結果。

在多道程式涉及環境下,處理器在開始執行一道程式的第一條指令後,在這道程式完成之前,處理器可以開始執行下一道程式,同樣地,更多其他的程式也可以開始運作。

也就是說,處理器在執行一道程式的兩條指令之間,可以執行其他程式的指令。

一道程式在一個資料集上的一次執行過程,稱為程序

為了實作并發執行,分析、解決并發執行中出現的問題,作業系統引入了一個概念,即程序,來深入揭示程式的運作規律和動态變化。

動态性:每個程序都有一個生命周期,具有一個從建立、運作到消亡的過程。程序是動态的,而程式是靜态的。程式可以以紙質或電子存儲媒體等形式存在,如果程式員沒有修改,程式還可以長期儲存;程序是程式在處理器上的運作過程,是動态變化的,具有從産生到消亡的過程。

從程序的動态性又可分為以下幾個部分:

運作狀态 :處理器目前執行的指令正是該程序對應的程式代碼,程序正占用CPU運作,或是說CPU配置設定給程序。

就緒狀态 :對于目前不再運作狀态的程序,如果把CPU配置設定給它,他就可以立即運作。

阻塞狀态 :對于目前不在運作狀态的一個程序,即使把處理器配置設定給它,他也不能運作。

并發性:多個程式可以并發執行。一個程序被建立後,在他消亡之前,其他的程序也可以被建立。這樣,宏觀上有多個程序同時在運作中,但是對于單處理器,任意時刻隻能運作一個程序的程式代碼。

獨立性:程序是作業系統配置設定資源的基本機關,一個程序的程式和資料隻能由該程序本身通路,程序位址空間是私有的。

結構性:作業系統經過概括、抽象後,定義了一個相對固定的格式即資料結構,用于表示一個程序,這個資料結構就是程序控制塊PCB。

異步性:多個程序并發執行時,每一個程序的運作過程不可預知,是以,它何時完成也無法準确預知,這就要求作業系統必須做到,在一個程式運作完成之前,随時可以建立一個或更多新的程序,這就是程序的異步性。

把程序細化為若幹個可以獨立運作的實體,每一個實體稱為一個線程。

實作程序内部的并發執行,提高并行程度。

減少處理器切換的開銷

簡化程序通信通信

這種并發就是我們在計算機作業系統中學習到的并發,也是傳統的并發。

并發程序的制約關系分為間接制約和直接制約,分别對應程序互斥關系和同步關系。

程序同步是對程序并發執行的協調,通過程序同步機制保證程式的可再現性。

兩個或是多個程序共享一種資源時,當一個程序在通路或使用該資源時,制約其他程序的通路或使用,否則,就可能造成執行結果的錯誤。

把并發程序之間的這種制約關系稱為間接制約關系,也就是程序通過第三方即共享的資源,暫時限制其他程序的運作,間接制約關系時由資源共享引起的。

臨界資源與間接制約

一次隻能讓一個程序使用的資源稱為臨界資源。

常見的臨界資源有列印機,存儲單元,堆棧等。

間接制約關系就是一組并發程序在共享某種臨界資源時存在的一種制約關系。

如下圖所示

【值得收藏】你想知道的并發都在這裡【傳統并發】與【Go并發】

臨界區與互斥關系

臨界區是指程序對應的程式中通路臨界資源的一段代碼。

- 稱一個程序要進入臨界區,是指該程序已經執行臨界區的第一條指令/語句;
- 稱一個程序離開或退出臨界區,是指該程序已經執行了臨界區的最後一條指令/語句;
- 稱一個程序在臨界區内執行,是指該程序已經開始執行臨界區的第一條指令但還沒有離開這個臨界區。      

作業系統對一組程序的間接制約關系的控制,轉為實作這組程序的互斥關系。

直接制約關系則時由任務協作引起的。幾個程序共同協作完成一項任務,因任務性質的要求,這些程序的執行順序由嚴格的固定,隻有按實作規定的順序依次執行,任務才能得到正确的處理,否則,就可能造成錯誤結果。把并發程序之間的這種制約關系稱為直接制約關系,也就是一個程序的執行狀況直接決定了另一個或幾個程序可否執行。

單向依賴關系

對于程序A和B,如果處理器在執行過程A中某條指令之前,要求先執行程序B的一條指令;在程序B指定的指令沒有執行之前,程序A的對應指令不能執行,這時稱程序A依賴于程序B。

【值得收藏】你想知道的并發都在這裡【傳統并發】與【Go并發】

互相依賴關系

如果程序A依賴于程序B,同時程序B也依賴于程序A,則稱A和B具有互相依賴關系。

【值得收藏】你想知道的并發都在這裡【傳統并發】與【Go并發】

同步關系

在一組并發程序中,如果每個程序至少同組中另一個程序存在單向或互相依賴關系,則稱這組程序具有同步關系,簡稱同步程序。

空閑讓進:在一個程序要求進入臨界區執行時,如果程序在相關臨界區内執行,則允許其進入臨界區運作。

忙則等待:當有一個程序在臨界區内執行時,要求進入相關臨界區執行的其他任何程序都要等待。

有限等待:對于要求進入臨界區執行的程序,之多經過有限時間的等待之後,應有機會進入臨界區執行,不能讓其進行無期限等待下去。是為了系統的公平性,也為了防止饑餓。

讓權等待:當程序離開臨界區時,即在臨界區内執行的程序在執行完臨界區的最後一條指令後,應把處理器讓給下一個程序執行。讓權等待與程序排程密切相關。

鎖變量key

對于一組相關臨界區定義的第一個變量稱為鎖變量,鎖變量取值0和1
并規定key=0時,鎖時開着的,臨界資源目前時空的,此時允許程序進入對應的臨界區執行;
key=1表示對應的鎖時關的,臨界資源目前是忙的狀态,此時禁止程序進入對應的臨界區。      
  1. 加鎖操作

檢查程序是否可以進入臨界區執行。在臨界區的第一條指令之前,加入一個枷鎖的奧做,以實作程序要進入臨界區執行時的檢查。

lock(key){
    while(key==1); //循環測試語句
    key=1; //設定語句
}      

隻有得到鎖的程序才允許進入臨界區執行,沒有得到鎖的程序要等待。

3. 解鎖操作

unlock(key){
    key = 0
}      

加鎖的控制方法

初始變量 key = 0;

...
lock (key);
臨界區
unlock(key);
...
      

程序PA()與程序PB()

PA(){
    int x;
    lock(key); 
    x = count;
    x = x + 1;
    count = x;
    unlock(key);
}

PB(){
    int y;
    lock(key);
    y = count;
    y = y - 1;
    count = y;
    unlock(key)
}      

1.由于一開始的key=0,是以在PA的lock的循環中,PA的key = 1,獲得了鎖,進入臨界區。

2.如果處理器要去處理B的操作,則會因為PB中的lock中的循環一直在轉,是以一直在阻塞,無法執行後續的步驟。

3. 隻有當PA的key釋放之後,使得B獲得了鎖才能進入臨界區。

是以在單單這種加鎖的條件下是不能實作互斥關系的。

需要借助硬體操作,以x86為例子,利用彙編指令xchg實作lock(key)

tsl :
    mov ax,1
    xchg ax,key
    cmp ax,o
    jne tsl

//對于多處理器系統,通常提供指令字首lock,利用指令字首lock封鎖總線實作指令的執行的互斥。
tsl:
    mov ax,1
lock xchg ax,key
    cmp ax,0
    jne tsl
      

普通的加鎖機制不能實作互斥關系,借助硬體的加鎖機制可以實作程序的互斥關系。

存在“忙等待”的狀況,浪費處理器時間。當一個B程序等待的時候,A程序完成之後,處理器可能會調用C程序,D程序,不執行B程序,因為并發的随機性,是以B可能永遠都無法執行。

存在“饑餓”現象。忙等待的狀況的不斷出現會造饑餓現象,或是餓死現象,可見加鎖機制不滿足臨界區管理準則。

多個鎖變量的加鎖操作可能造成程序死鎖。

信号量機制不僅可以實作程序互斥關系,還可以實作程序的同步關系

信号量

一種信号量對應一個整形變量value,一個等待隊列bq,同時還可以對應其他的控制資訊。

struct semaphore{
    int value;  //信号量的整形變量
    PCB *bp;    //信号量對應的等待隊列
}      
  • p操作

    s是一個信号量

p(s){
    s.value = s.value - 1;
    if(s.value<0) {
        blocked(s);  
        //這個是阻塞原語,把目前調用p(s)操作的程序設定為阻塞狀态并加入到信号量s對應的等待隊列bq中
    }
}      

v操作

v(s){
    s.value = s.value + 1;
    if(s.value <= 0) {
        wakeup(s);  
        // 這裡的wakeup(s)是喚醒原語,從信号量s對應的等待隊列bq中喚醒一個程序,也就是按照一定政策從等待隊列bq中選擇一個程序,将其轉化就緒狀态。
    }
}      

p、v操作也是一個原語操作。

當s.value>=1時,程序調用p(s)操作後,不會造成程序阻塞。

當s.value<=0時,程序調用p(s)操作後,會造成程序阻塞,系統會把處理器配置設定給下一個程序運作,而不像加鎖機制中的“忙等待”,阻塞程序就會被加入到等待隊列中,當執行目前程序後,再通過一定的算法(先進先出,先進後出等等),從等待隊列中選出執行的程序。激活程序。

定義一個信号量s,初始值為 1

信号量的控制描述如下

...
p(s);
臨界區;
v(s);
...      

改進上述加鎖機制中的PA() PB()操作

PA(){
    int x;
    p(s);
    x = count;
    x = x + 1;
    count = x;
    v(s);
}

PB(){
    int y;
    p(s);
    y = count;
    y = y - 1;
    count = y;
    v(s);
}      

處理器再執行PA()程序的程序的時候,會執行p(s)操作,由于s.value = 1,是以執行完之後,s.value=0,是不會阻塞的。

此時如果處理器要處理PB()操作,由于s.value = 0 是以執行完之後,s.value<0,将B程序阻塞,并将B程序放入等待隊列bq中。

當處理器處理完A的操作的時候,就會在等待隊列中喚醒一個程式,是否下一個程式是B就看是什麼算法了。

第一題

實作Read、Move、Write并發執行

semaphore s1 = 1,s2 = 0 , s3 =1 ,s4 = 0;
Read(){
    從原磁盤讀取一個檔案;
    p(s1);  //執行從檔案讀到buff1的操作
    檔案資料存入緩沖區buff1
    v(s2);  //激活讀取buff1的操作,使得進入就緒狀态
}

Move(){
    p(s2);   //執行從buff1中讀取檔案的操作
    從緩沖區buff1取檔案資料
    v(s1);  //執行完了從檔案讀到buff1的讀操作
    p(s3);  //執行從buff1讀取的檔案寫入buff2的寫操作
    将檔案資料存入buff2中
    v(s4);  //激活從buff2寫入磁盤的寫操作
}

Write(){
    p(s4);   //執行從buff2寫入磁盤的寫操作
    把buff2中的資料存入目标磁盤的檔案中;
    v(s3);  //執行完從buff1讀取的檔案寫入buff2的寫操作
}

main(){
    cobegin
    {
        repeat Read();
        repeat Move();
        repeat Write();
    }
}
      

第二題

實作生産者/消費者模型(PC問題)

semaphore mutex = 1, empty = k , full = 0;
Producer(){
    生産一個物品
    p(empty)
    p(mutex);
    物品存入緩沖區buf[]的某個單元格
    v(mutex);
    p(full);
}

Consumer(){
    p(full);
    p(mutex);
    從緩沖區buf[]的某個單元格取物品
    v(mutex);
    v(empty);
    消費
    
}
      

隻有生産者生産出,消費者才能進行消費,不然消費者會處于阻塞狀态。

【值得收藏】你想知道的并發都在這裡【傳統并發】與【Go并發】

在go社群有這樣一句話

不要通過共享記憶體來通信,而是通過通信來共享記憶體。

go官方是建議使用管道通信的方式來進行并發。

通道 是用于協程間交流的通信載體。嚴格地來說,通道就是資料傳輸的管道,資料通過這根管道被 “傳入” 或被 “讀出”。 是以協程可以發送資料到通道中,而另一個協程可以從該通道中讀取資料。

在這裡就要引入一個新名詞:協程

将線程再細分為多個協程,比如說是一條流水線上的多人協作。那麼就可以減少各個線程内部的等待時間。

Go 提供一個 chan 關鍵詞去建立一個通道。一個通道隻能傳入一種類型的資料,其他的資料類型不允許被傳輸。

【值得收藏】你想知道的并發都在這裡【傳統并發】與【Go并發】

将線程再分成更細的協程,使得中間等待時候更少,提高效率!

【值得收藏】你想知道的并發都在這裡【傳統并發】與【Go并發】
package main

import "fmt"

func main(){
    var channel chan int //聲明了一個可以傳入 int 類型資料的通道 channel 。
    fmt.Println(channel)  
    //程式會列印nil, 因為通道的 0 值是 nil。
}      

一個 nil 通道是沒有用的。你不能向它傳遞資料或者讀取資料。

是以,我們必須使用 make 函數器建立一個可以使用的通道。

package main

import "fmt"

func main(){
    channel := make(chan int) 
    //聲明了一個可以傳入 int 類型資料的通道 channel 。
    fmt.Println(channel)  
    //程式會列印channel的位址。 0xc0000180c0
}
      

它是一個指針記憶體位址。通道變量預設是一個指針。多數情況下,當你想要和一個協程溝通的時候,你可以給函數或者方法傳遞一個通道作為參數。當從協程接收到通道參數後,你不需要再對其進行解引用就可以從通道接收或者發送資料。

Go 語言提供一個非常簡潔的左箭頭文法 <- 去從通道讀寫資料。

有變量接受管道值

channel <- data      

上面的代碼意味着我們想要把 data 資料推入到通道 channel 中,注意看箭頭的指向。

它表明是從 data資料 to到 通道 channel。

是以我們可以當作我們正在把 data 推入到通道 channel。

無變量接受管道值

<- data      

這個語句不會把資料傳輸給任何變量,但是仍然是一個有效的語句。

上面的通道操作預設是阻塞的。

在以前的課程中,我們知道可以使用 time.Sleep 去阻塞一個通道。通道操作本質上是阻塞的。當一些資料被寫入通道,對應的協程将阻塞直到有其他的協程可以從此通道接收資料。

通道操作會通知排程器去排程其他的協程,這就是為什麼程式不會一直阻塞在一個協程。通道的這些特性在不同的協程溝通的時候非常有用,它避免了我們使用鎖或者一些 hack 手段去達到阻塞協程的目的。

package main

import "fmt"

func Rush(c chan string) {
    fmt.Println("Hello "+ <-c + "!")
    // 聲明一個函數 greet, 這個函數的參數 c 是一個 string 類型的通道。
    // 在這個函數中,我們從通道 c 中接收資料并列印到控制台上。
}


func main(){
    fmt.Println("Main Start") 
    // main 函數的第一個語句是列印 main start 到控制台。
    channel := make(chan string)
    // 在 main 函數中使用 make 函數建立一個 string 類型的通道指派給 ‘ channel ’ 變量
    go Rush(channel)        
    // 把 channel 通道傳遞給 greet 函數并用 go 關鍵詞以協程方式運作它。
    // 此時,程式有兩個協程并且正在排程運作的是 main goroutine 主函數 
    channel <- "DEMO"       
    // 給通道 channel 傳入一個資料 DEMO.
    // 此時主線程将阻塞直到有協程接收這個資料. Go 的排程器開始排程 greet 協程接收通道 channel 的資料 
    fmt.Println("Main Stop")   
    // 然後主線程激活并且執行後面的語句,列印 main stopped
}
/*
Main Start
Hello DEMO!
Main Stop
*/      
【值得收藏】你想知道的并發都在這裡【傳統并發】與【Go并發】

當通道讀寫資料時,所在協程會阻塞并且排程控制權會轉移到其他未阻塞的協程。

如果目前協程正在從一個沒有任何值的通道中讀取資料,那麼目前協程會阻塞并且等待其他協程往此通道寫入值。

是以,讀操作将被阻塞。類似的,如果你發送資料到一個通道,它将阻塞目前協程直到有其他協程從通道中讀取資料。此時寫操作将阻塞 。

下面是一個主線程在進行通道操作的時候造成死鎖的例子

package main

import "fmt"

func main() {
    fmt.Println("main start")
    // main 函數的第一個語句是列印 main start 到控制台。
    channel := make(chan string)
    // 在 main 函數中使用 make 函數建立一個 string 類型的通道指派給 ‘ channel ’ 變量
    channel <- "GoLang"
    // 給通道 channel 傳入一個資料 DEMO.
    // 此時主線程将阻塞直到有協程接收這個資料. Go 的排程器開始排程協程接收通道 channel 的資料
    // 但是由于沒有協程接受,沒有協程是可被排程的。所有協程都進入休眠狀态,即是主程式阻塞了。
    fmt.Println("main stop")
}

/*
報錯
main start
fatal error: all goroutines are asleep - deadlock!  //所有協程都進入休眠狀态,死鎖

goroutine 1 [chan send]:
main.main()
*/      

package main

import "fmt"

func RushChan(c chan string) {
    <- c
    fmt.Println("1")
    <- c
    fmt.Println("2")
}

func main() {
    fmt.Println("main start")
    c := make(chan string, 1)
    go RushChan(c)
    c <- "Demo1"
    close(c)
    /*
    不能向一個關了的channel發資訊
    main start
    panic: send on closed channel
    */
    c <- "Demo2"
    //close(c)
    /*
    close 放這裡的話可以
    main start
    1
    2
    Main Stop
    */
    fmt.Println("Main Stop")
}      

第一個操作 c <- "Demo2" 将阻塞協程直到有其他協程從此通道中讀取資料,是以 greet 會被排程器排程執行。

第一個操作 <-c 是非阻塞的 因為現在通道c有資料可讀。

第二個操作 <-c将被阻塞因為通道c已經沒資料可讀.

此時main協程将被激活并且程式執行close(c)關閉通道操作。

c := make(chan Type, n)      

當緩沖區參數不是 0 的時候。協程将不會阻塞除非緩沖區被填滿。

當緩沖區滿了之後,想要再往緩沖區發送資料隻有等到有其他協程從緩沖區接收資料, 此時的發送協程是阻塞的。

有一點需要注意, 讀緩沖區的操作是渴望式讀取,意味着一旦讀操作開始它将讀取緩沖區所有資料,直到緩沖區為空。

原理上來說讀操作的協程将不會阻塞直到緩沖區為空。

package main

import "fmt"

func RushChan(c chan string) {
    for {
        val ,_ := <-c
        fmt.Println(val)
    }
}

func main() {
    fmt.Println("Main Start")
    c := make(chan string, 1)
    go RushChan(c)
    c <- "Demo1" //結果1
    //c <- "Demo2"  //結果2
    fmt.Println("Main Stop")
}
/*
結果1:
Main Start
Main Stop
*/

/*
結果2:
Main Start
Join
Mike
Main Stop
*/
      

由于這是一個緩沖的通道,當我隻有c <- Demo1的時候,這裡面隻是滿了,但是是不會阻塞的。是以子協程接受到了這個資料Demo1,但是由于是非阻塞,是以主線程沒有被阻塞,并沒有等子協程完成就結束了,結果1就是這樣出現了。

當加多一個c <- Demo2 的時候,這時就要等緩沖區空了,也就是等有協程把Demo1讀取,是以就會導緻主線程阻塞,此時的結果就是結果2了。

package main

import "fmt"

func RushChan(c chan string) {
    for {
        val ,_ := <-c
        fmt.Println(val)
    }
}

func main() {
    c := make(chan int,3)
    c <- 1
    c <- 2
    c <- 3
    close(c)
    for elem := range c {
        fmt.Println(elem)
    }
}      

這裡雖然關閉了通道,但是其實資料不僅在通道裡面,資料還在緩沖區中的,我們依然可以讀取到這個資料。

和切片類似,一個緩沖通道也有長度和容量。

通道的長度是其内部緩沖隊列未讀的資料量,而通道的容量是緩沖區可最大盛放的資料量。

我們可以使用 len 函數去計算通道的長度,使用 cap 函數去獲得通道的容量。和切片用法神似

package main

import "fmt"

func RushChan(c chan string) {
    for {
        val ,_ := <-c
        fmt.Println(val)
    }
}

func main() {
    c := make(chan int,3)
    c <- 1
    c <- 2
    fmt.Println("長度: ",len(c))
    fmt.Println(<-c)
    fmt.Println("長度: ",len(c))
    fmt.Println(<-c)
    fmt.Println("長度: ",len(c))
    fmt.Println("容量: ",cap(c))
}
/*
結果:

長度:  2
1
長度:  1
2
長度:  0
容量:  3

*/      

這個 c 通道容量為 3,但隻盛放了 2 個資料。Go 就不用去阻塞主線程去排程其他協程。

你也可以在主線程中去讀取這些資料,因為雖然通道沒有放滿,也不會阻止你去從通道讀取資料。

目前為止,我們已經學習到可以雙向傳遞資料的通道,或者說,我們可以對通道做讀操作和寫操作。但是事實上我們也可以建立單向通道。比如隻讀通道隻允許讀操作,隻寫通道隻允許寫操作。

單向通道也可以使用 make 函數建立,不過需要額外加一個箭頭文法。

roc := make(<-chan int)
soc := make(chan<- int)      

在上面的程式中, roc 是一個隻讀通道,<- 在 chan 關鍵詞前面。 soc is 隻寫通道,<- 在 chan 關鍵詞後面。 他們也算不同的資料類型。

但是單向通道有什麼作用呢 ?

使用單向通道可以 提高程式的類型安全性, 使得程式不容易出錯。

但是假如你在一個協程中隻需要讀操作某通道,但是在主線程中卻需要讀寫操作這個通道該怎麼辦呢?

幸運的是 Go 提供了一個簡單的文法去把雙向通道轉化為單向通道。

package main

import "fmt"

func greet(roc <-chan string) {
    fmt.Println("Hello " + <-roc ,"!")
}

func main() {
    fmt.Println("Main Start")
    c := make(chan string)
    go greet(c)
    c <- "Demo"
    fmt.Println("Main Stop")
}
/*
結果
Main Start
Hello Demo !
Main Stop
*/      

我們修改 greet 協程函數,把參數 c 類型從雙向通道改成單向接收通道。

現在我們隻能從通道中讀取資料,通道上的任何寫入操作将會發生錯誤:

“invalid operation: roc <- “Temp” (send to receive-only type <-chan string)”.

【值得收藏】你想知道的并發都在這裡【傳統并發】與【Go并發】

select 和 switch 很像,它不需要輸入參數,并且僅僅被使用在通道操作上。

Select 語句被用來執行多個通道操作的一個和其附帶的 case 塊代碼。

讓我們來看下面的例子,讨論下其執行原理

package main

import (
    "fmt"
    "time"
)

var start time.Time

func init() {
    start = time.Now()
}

func service1(c chan string) {
    time.Sleep(3 * time.Second)
    c <- "Hello from service 1"
}

func service2(c chan string) {
    time.Sleep(5 * time.Second)
    c <- "Hello from service 2"
}

func main() {
    fmt.Println("main start", time.Since(start))

    chan1 := make(chan string)
    chan2 := make(chan string)

    go service1(chan1)
    go service2(chan2)

    select {
    case res := <-chan1:
        fmt.Println("Response form service 1", res, time.Since(start))
    case res := <-chan2:
        fmt.Println("Response form service 2", res, time.Since(start))
    }

    fmt.Println("main stop ",time.Since(start))
}

/*
結果:
main start 0s
Response form service 1 Hello from service 1 3.0018445s
main stop  3.0019815s
*/      

從上面的程式來看,我們知道 select 語句和 switch 很像,不同點是用通道讀寫操作代替了布爾操作。通道将被阻塞,除非它有預設的 default 塊 (之後将介紹)。一旦某個 case 條件執行,它将不阻塞。

是以一個 case 條件什麼時候執行呢 ?

如果所有的 case 語句(通道操作)被阻塞,那麼 select 語句将阻塞直到這些 case 條件的一個不阻塞(通道操作),case 塊執行。

如果有多個 case 塊(通道操作)都沒有阻塞,那麼運作時将随機選擇一個不阻塞的 case 塊立即執行。

為了示範上面的程式,我們開啟兩個協程并傳入對應的通道變量。然後我們寫一個帶有兩個 case 操作的 select 語句。 一個 case 操作從 chan1 讀資料,另外一個從 chan2 讀資料。這兩個通道都是無緩沖的 , 讀操作将被阻塞 。是以 select 語句将阻塞。是以 select 将等待,直到有 case 語句不阻塞。

當程式執行到select語句後,主線程将阻塞并開始排程 service1 和service2協程。 service1 休眠 3 秒 後未阻塞的把資料寫入通道 chan1 與其類似,service2等待 5 秒 後未阻塞的把資料寫入通道chan2

因為 service1 比 service2 早一步執行完畢,case 1 将首先排程執行,其他的 cases 塊 (這裡指 case 2) 将被忽略。 一旦 case 塊執行完畢, main 線程将開始繼續執行。

是以并沒有輸出case2的結果

上述程式真實模拟了一個數百萬請求的伺服器負載均衡的例子,它從多個有效服務中傳回其中一個響應。

使用協程,通道和 select 語句,我們可以向多個伺服器請求資料并擷取其中最快響應的那個。

為了模拟上面哪個 case 塊率先傳回資料,我們可以直接去掉 Sleep 函數調用。

package main

import (
    "fmt"
    "time"
)

var start time.Time

func init() {
    start = time.Now()
}

func service1(c chan string) {
    c <- "Hello from service 1"
}

func service2(c chan string) {
    c <- "Hello from service 2"
}

func main() {
    fmt.Println("main start", time.Since(start))

    chan1 := make(chan string)
    chan2 := make(chan string)

    go service1(chan1)
    go service2(chan2)

    select {
    case res := <-chan1:
        fmt.Println("Response form service 1", res, time.Since(start))
    case res := <-chan2:
        fmt.Println("Response form service 2", res, time.Since(start))
    }

    fmt.Println("main stop ",time.Since(start))
}      

結果一:

main start 0s

Response form service 1 Hello from service 1 539.3µs

main stop 539.3µs

結果二:

Response form service 2 Hello from service 2 0s

main stop 0s

結果一共有2!個不同的結果

為了證明當所有 case 塊都是非阻塞的時候,golang 會随機選擇一個代碼塊執行列印 response,我們使用緩沖通道來改造程式。

package main

import (
    "fmt"
    "time"
)

var start time.Time

func init() {
    start = time.Now()
}

func service1(c chan string) {
    c <- "Hello from service 1"
}

func service2(c chan string) {
    c <- "Hello from service 2"
}

func main() {
    fmt.Println("main start", time.Since(start))

    chan1 := make(chan string,2)
    chan2 := make(chan string,2)


    chan1 <- "Value 1"
    chan1 <- "Value 2"

    chan2 <- "Value 1"
    chan2 <- "Value 2"

    select {
    case res := <-chan1:
        fmt.Println("Response form service 1", res, time.Since(start))
    case res := <-chan2:
        fmt.Println("Response form service 2", res, time.Since(start))
    }

    fmt.Println("main stop ",time.Since(start))
}
      

上述的程式的結果是有不同的

Response form service 1 Value 1 496.2µs

main stop 496.2µs

Response form service 2 Value 1 0s

在上面的程式中,兩個通道在其緩沖區中都有兩個值。因為我們向容量為 2 的緩沖區通道分别發送了兩個值,是以這些通道發送操作不會阻塞并且會執行下面的 select 塊。 select 塊中的所有 case 操作都不會阻塞,因為每個通道中都有兩個值,而我們的 case 操作隻需要取出其中一個值。是以,go 運作時會随機選擇一個 case 操作并執行其中的代碼。

像 switch 一樣, select 語句也有 default case 塊。default case 塊 是非阻塞的,不僅如此, default case 塊可以使 select 語句永不阻塞,這意味着, 任何通道的 發送 和 接收 操作 (不管是緩沖或者非緩沖) 都不會阻塞目前線程。

如果有 case塊的通道操作是非阻塞,那麼 select會執行其case 塊。如果沒有那麼 select将預設執行 default塊.

package main

import (
    "fmt"
    "time"
)

var start time.Time

func init() {
    start = time.Now()
}

func service1(c chan string) {
    c <- "Hello from service 1"
}

func service2(c chan string) {
    c <- "Hello from service 2"
}

func main() {
    fmt.Println("main start", time.Since(start))

    chan1 := make(chan string)
    chan2 := make(chan string)

    go service1(chan1)
    go service2(chan2)

    select {
    case res := <-chan1:
        fmt.Println("Response form service 1", res, time.Since(start))
    case res := <-chan2:
        fmt.Println("Response form service 2", res, time.Since(start))
    default:
        fmt.Println("No Response received",time.Since(start))
    }

    fmt.Println("main stop ",time.Since(start))
}

/*
結果:
main start 0s
No Response received 0s
main stop  0s
*/      

在上面的程式中,因為通道是非緩沖的,case 塊的通道操作都是阻塞的,所有 default 塊将被執行。

如果上面的 select 語句沒有 default 塊,select 将阻塞,沒有 response 會被列印出來,知道通道變成非阻塞。

如果帶有 default, select 将是非阻塞的,排程器将不會從主線程轉而排程其他協程。

但是我們可以使用 time.Sleep 改變這一點。 通過這種方式,主線程将把排程權轉移到其他協程,在其他協程執行完畢後,排程權從新回到主線程手裡。

當主線程重新執行的時候,通道裡面已經有值了,case 操作将不會阻塞。

package main

import (
    "fmt"
    "time"
)

var start time.Time

func init() {
    start = time.Now()
}

func service1(c chan string) {
    fmt.Println("service1 start")
    c <- "Hello from service 1"
}

func service2(c chan string) {
    fmt.Println("service2 start")
    c <- "Hello from service 2"
}

func main() {
    fmt.Println("main start", time.Since(start))

    chan1 := make(chan string)
    chan2 := make(chan string)

    go service1(chan1)
    go service2(chan2)

    time.Sleep(3*time.Second)

    select {
    case res := <-chan1:
        fmt.Println("Response form service 1", res, time.Since(start))
    case res := <-chan2:
        fmt.Println("Response form service 2", res, time.Since(start))
    default:
        fmt.Println("No Response received",time.Since(start))
    }

    fmt.Println("main stop ",time.Since(start))
}
/*
結果不唯一。
main start 0s
service2 start
service1 start
Response form service 1 Hello from service 1 3.0006729s
main stop  3.0006729s
*/
      

和 for{} 這樣的空循環很像,空 select{} 文法也是有效的。但是有一點必須要說明。

我們知道 select 将被阻塞除非有 case 塊沒有阻塞。因為 select{} 沒有 case 非阻塞語句,主線程将阻塞并可能會導緻死鎖。

package main

import "fmt"

func service() {
    fmt.Println("Hello from service")
}

func main() {
    fmt.Println("main started")
    go service()
    select {}
    fmt.Println("main stop")
}
/*
結果
main started
Hello from service
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select (no cases)]:
*/      
【值得收藏】你想知道的并發都在這裡【傳統并發】與【Go并發】

在上面的程式中我們知道 select 将阻塞 main 線程,排程器将會排程 service 這個協程。在 service 執行完畢後,排程器會再去排程其他可用的協程,但是此時已經沒有可用的協程,主線程也正在阻塞,是以最後的結果就是發生死鎖.

default 塊在通道操作阻塞的時候是非常有用的,他可以避免死鎖。 同時由于 default塊的非阻塞特性,Go 可以避免在其他協程阻塞的時候去排程其他協程,進而避免死鎖。

通道的發送操作也類似,, default 可以在其他協程不能被排程的時候被執行,進而避免死鎖。

寫兩個協程,一個用來計算數字的平方,另一個用來計算數字的立方。

package main

import "fmt"

func square(c chan int) {
    fmt.Println("[square] reading")
    num := <-c
    c <- num * num
}

func cube(c chan int) {
    fmt.Println("[cube] reading")
    num := <-c
    c <- num * num * num
}

func main() {
    fmt.Println("[main] main started")
    squareChan := make(chan int)
    cubeChan := make(chan int)
    go square(squareChan)
    go cube(cubeChan)

    testNum := 3

    fmt.Println("[main] send testNum to squareChan")
    squareChan <- testNum
    fmt.Println("[main] resuming")


    fmt.Println("[main] send testNum to cubeChane")
    cubeChan <- testNum
    fmt.Println("[main] resuming")


    fmt.Println("[main] reading from channels")
    squareVal,cubeVal := <-squareChan, <-cubeChan
    sum := squareVal + cubeVal

    fmt.Println("[main] sum of square and cube of",testNum," is",sum)
    fmt.Println("[main] main stop")
}

/*
結果:
[main] main started
[main] send testNum to squareChan
[cube] reading
[square] reading
[main] resuming
[main] send testNum to cubeChane
[main] resuming
[main] reading from channels
[main] sum of square and cube of 3  is 36
[main] main stop

*/      

流程:

建立兩個函數 square 和 cube 作為協程運作。

兩個函數都有一個 int 類型通道參數c,從 c 中讀取資料到變量num,最後把計算的資料再寫入到通道 c 中。

在主線程中使用 make函數建立兩個 int類型通道 squareChan and cubeChan

然後分别運作square和cube 協程。

因為排程權還在主線程,是以執行testNumb 指派為 3。

然後我們把資料放入通道 squareChan 。主線程将阻塞直到通道的資料被讀取。 一旦通道的資料被讀取,主線程将繼續執行。

在主線程中我們試圖從這兩個通道中讀取資料,此時線程可能阻塞直到有資料寫入到通道。這裡我們使用:=文法來接收多個通道的值。

一旦這些協程把資料寫入到通道,主線程将阻塞。

當資料被寫入通道中,主線程将繼續執行,最後我們計算出數字的總和并列印到控制台。

有一種業務場景是你需要知道所有的協程是否已執行完成他們的任務。這個和隻需要随機選擇一個條件為true 的 select 不同,他需要你滿足所有的條件都是 true 才可以激活主線程繼續執行。 這裡的條件指的是非阻塞的通道操作。

WaitGroup 是一個帶着計數器的結構體,這個計數器可以追蹤到有多少協程建立,有多少協程完成了其工作。當計數器為 0 的時候說明所有協程都完成了其工作。

package main

import (
    "fmt"
    "sync"
    "time"
)

func service(wg *sync.WaitGroup, instance int) {
    time.Sleep(2 * time.Second)
    fmt.Println("Service called on instance",instance)
    wg.Done() //協程數-1
}

func main() {
    fmt.Println("main started")
    var wg sync.WaitGroup
    for i:=1;i<= 3; i++{
        wg.Add(1)
        go service(&wg,i)
    }
    wg.Wait()//阻塞
    fmt.Println("main stop")
}

/*
結果:(結果是不唯一的,一共有3!次可能的結果)
main started
Service called on instance 2
Service called on instance 1
Service called on instance 3
main stop
*/      

在上面的程式中,我們建立了一個sync.WaitGroup 類型的空結構體 (帶着 0 值字段) wg 。 WaitGroup 結構體有一些像 noCopy, state1 和 sema 這樣的内部字段。 這個結構體也有三個公開方法: Add, Wait 和 Done.

Add 方法的參數是一個變量名叫 delta 的int 類型參數,主要用來内部計數。 内部計數器預設值為 0. 它用于記錄多少個協程在運作。

當 WaitGroup建立後,計數器值為 0,我們可以通過給 Add方法傳 int類型值來增加它的數量。 記住, 當協程建立後,計數器的值不會自動遞增 ,是以需要我們手動遞增它。

Wait 方法用來阻塞目前協程。一旦計數器為 0, 協程将恢複運作。 是以,我們需要一個方法去降低計數器的值。

Done 方法可以降低計數器的值。他不接受任何參數,是以,它每執行一次計數器就減 1。

上面的例子中,我們在建立 wg 變量後,運作了三次 for 循環,每次運作的時候我們建立一個協程并給計數器加 1。

這意味着現在我們有三個協程在等待運作并且 WaitGroup 的計數器值為 3。注意我們傳給協程函數的是一個指針,這是因為一旦在協程内部工作完成後,我們需要通過調用Done方法去降低計數器的值。

如果 wg 通過值複制方式傳過去, 因為傳遞的是一個拷貝,主線程中的 wg将不會得到修改。

在 for 循環執行完成後,我們通過調用 wg.Wait()去阻塞目前主線程,并把排程權讓給其他協程,直到計數器值為 0 之後,主線程才會被再次排程。

我們在另外三個協程中通過Done方法把計數器值降為 0,此時主線程将再次被排程并開始執行之後的代碼。

顧名思義,一個工作池并發執行某項工作的協程集合。 在上面,我們已經用到的多個協程執行一個任務,但是他們并沒有執行特定的工作,隻是 sleep 了一下。 如果你向協程中傳一個通道,他們可以去完成一些工作,變成一個工作池。

是以工作池其實就是維護了多個工作協程,這些協程的功能是可以收到任務,執行任務并傳回結果。他們完成任務後我們就可以收到結果。這些協程使用相同的通道來達到自己的目的。

package main
import (
    "fmt"
    "time"
)
func sqrWorker(tasks <-chan int, results chan <-int, instance int) {
    for num := range tasks {
        time.Sleep(time.Millisecond) //阻塞
        fmt.Printf("[worker %v ] Sending result by worker %v \n",instance,instance)
        results <- num*num
    }
}
func main() {
    fmt.Println("main started")
    tasks := make(chan int,10)
    results := make(chan int,10)
    for i:=0;i<3;i++{
        go sqrWorker(tasks,results,i)
    }
    for i := 0; i < 5; i++ {
        tasks <- i*2
    }
    fmt.Println("[main] write 5 tasks")
    close(tasks)
    for i := 0; i < 5; i++ {
        result := <-results
        fmt.Println("[main] Result" , i , ":", result)
    }
    fmt.Println("main stop")
}
/*
//結果之一
[main] write 5 tasks
[worker 0 ] Sending result by worker 0 
[worker 1 ] Sending result by worker 1 
[worker 2 ] Sending result by worker 2 
[main] Result 0 : 4
[main] Result 1 : 16
[main] Result 2 : 0
[worker 1 ] Sending result by worker 1 
[main] Result 3 : 64
[worker 0 ] Sending result by worker 0 
[main] Result 4 : 36
main stop
*/      

sqrWorker 是一個帶有 tasks 通道,results 通道 和 id 三個參數的協程函數。這個協程函數的任務是把從 tasks 通道接收到的數字的平方發送到 results通道。

在主函數中,我們建立了兩個帶緩沖區,容量為 10 的通道tasks and result。是以在緩沖區被充滿之前,任何操作都是非阻塞的。是以有時候設定一個大點的緩沖區是個好辦法。

然後我們循環建立多個 sqrWorker 協程,并傳入 tasks 通道, results 通道 和 id 三個參數,用來傳遞和擷取協程執行前後的資料。

接着我們向 tasks 非阻塞通道放入 5 個任務資料。

因為我們已經向任務通道放入的資料,是以我們可以關閉它,雖然這個操作不是必須的,但是如果以後運作中出現錯誤的話可以防止通道 range 帶來的死鎖問題。

然後我們開啟循環 5 次從 results 通道接收資料,因為目前通道緩沖區沒有資料,是以通道讀取操作造成主線程阻塞,排程器将排程工作池的協程,直到有資料添加到 results通道。

目前我們有 3 個work 協程在工作,我們使用了 sleep 操作來模拟阻塞操作,是以排程器在某一個阻塞的時候會去調用其他的 work 協程,當某個 work 協程 sleep 完成後會把計算數字的平方的結果資料放入 results 緩沖無阻塞通道。

當 3 個協程依次交替把 task 通道的任務都完成後,for range 循環将完成,并且因為之前我們已經關閉了任務通道,是以協程也不會發生死鎖。排程器将繼續傳回排程主線程。

有時候所有的工作協程可能都在阻塞,此時排程器将去排程主線程,直到 results 通道再次為空。

當所有 work 協程都完成任務退出後,主線程将繼續拿到排程權并列印 results 通道剩下的資料,繼續之後代碼的執行。

互斥是 Go 中一個簡單的概念。在我解釋它之前,先要明白什麼是競态條件。 goroutines 都有自己的獨立的調用棧,是以他們之間不分享任何資料。但是有一種情況是資料存放在堆上,并且被多個 goroutines 使用。 多個 goroutines 試圖去操作一個記憶體區域的資料會造成意想不到的後果.

package main

import (
    "fmt"
    "sync"
)

var i int

func worker(wg *sync.WaitGroup) {
    i = i+1
    wg.Done()
}


func main() {
    fmt.Println("main started")
    var wg sync.WaitGroup
    for i:=0;i<1000;i++{
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    fmt.Println("main stop",i)
}
/*
結果是不同的!!
main started
main stop 985
*/      

i = i + 1 這個計算有 3 步

(1) 得到 i 的值

(2) 給 i 的值加 1

(3) 更新 i 的值

這裡發生很多事情,因為go是協程,這三步裡面不一定都是同時順序執行的。有可能A是順利執行,使得i=2,但是B是讀取的是A沒更新的之前的i也就是1,是以就是結果會小于等于1000的,

除非一個協程阻塞,否則其他協程是沒有機會獲得排程的。那麼 i = i + 1 也沒有阻塞,為什麼 Go 的排程器會去排程其他協程呢?

在任何情況下,都不應該依賴 Go 的排程算法,而應該實作自己的邏輯來同步不同的 goroutine.

實作方法之一就是使用我們上面提到的互斥鎖。互斥鎖是一個程式設計概念,它保證了在同一時間隻能有一個線程或者協程去操作同一個資料。當一個協程想要操作資料的時候,必須擷取該資料的一個鎖,操作完成後必須釋放鎖,如果沒有擷取到該資料的鎖,那麼就不能操作這個資料。

在 Go 中,互斥資料結構 ( map) 由 sync 包提供。在 Go 中,多協程去操作一個值都可能會引起競态條件。我們需要在操作資料之前使用 mutex.Lock() 去鎖定它,一旦我們完成操作,比如上面提到的 i = i + 1, 我們就可以使用 mutext.Unlock() 方法解鎖。

如果在鎖定的時候,有一個協程想要讀寫 i 的值,那麼此協程将阻塞 直到前面的協程完成操作并解鎖資料。是以在某一時刻有且僅有一個協程可以操作資料,進而避免競态條件。記住,任何鎖之間的變量在解鎖之前對于其他協程都不是可用的。

讓我們使用互斥鎖修改上面的例子

package main

import (
    "fmt"
    "sync"
)

var i int

func worker(wg *sync.WaitGroup,m *sync.Mutex) {
    m.Lock()
    i = i+1
    m.Unlock()
    wg.Done()
}


func main() {
    fmt.Println("main started")
    var wg sync.WaitGroup
    var m sync.Mutex
    for i:=0;i<1000;i++{
        wg.Add(1)
        go worker(&wg,&m)
    }
    wg.Wait()
    fmt.Println("main stop",i)
}
/*結果
main started
main stop 1000
*/      

在上面的程式中,我們建立了一個互斥鎖變量 m,并把它的指針傳遞給所有已建立的協程。

在協程内部,當我們要開始操作 i變量的時候,我們先通過 m.Lock()獲得鎖,操作完成後我們使用 m.Unlock()釋放鎖。

互斥鎖可以幫助我們解決競态條件。 但首要規則是避免 goroutine 之間共享資源。

是以官方建議不要共享記憶體并發,而是通過管道通信的方式并發。

這篇部落格前部分傳統并發内容是書上學的知識

後部分go并發知識是參考作者summar的go并發以及書上的知識點,非常感謝作者的翻譯工作,使得我能更好的了解go的channel并發機制!連結點這裡channel

随着業務的不斷擴大,并發能更好的發揮伺服器的性能。這篇文章我寫了很久,因為我自己一開始也不太懂并發。但是在梳理的過程也算有了眉目。

我是凡一,如果你對go語言感興趣,歡迎關注我。

你的關注就是我最大的動力

繼續閱讀