天天看點

golang channel 用法轉的

一、Golang并發基礎理論

Actor模型廣義上講與CSP模型很相似。但兩種模型就提供的原語而言,又有一些根本上的不同之處:

    – CSP模型處理過程是匿名的,而Actor模型中的Actor則具有身份辨別。

    – CSP模型的消息傳遞在收發消息程序間包含了一個交會點,即發送方隻能在接收方準備好接收消息時才能發送消息。相反,actor模型中的消息傳遞是異步 的,即消息的發送和接收無需在同一時間進行,發送方可以在接收方準備好接收消息前将消息發送出去。這兩種方案可以認為是彼此對偶的。在某種意義下,基于交 會點的系統可以通過構造帶緩沖的通信的方式來模拟異步消息系統。而異步系統可以通過構造帶消息/應答協定的方式來同步發送方和接收方來模拟交會點似的通信 方式。

    – CSP使用顯式的Channel用于消息傳遞,而Actor模型則将消息發送給命名的目的Actor。這兩種方法可以被認為是對偶的。某種意義下,程序可 以從一個實際上擁有身份辨別的channel接收消息,而通過将actors構造成類Channel的行為模式也可以打破actors之間的名字耦合。

二、Go Channel基本操作文法

Go Channel的基本操作文法如下:

c := make(chan bool) //建立一個無緩沖的bool型Channel


c <- x        //向一個Channel發送一個值

<- c          //從一個Channel中接收一個值

x = <- c      //從Channel c接收一個值并将其存儲到x中

x, ok = <- c  //從Channel接收一個值,如果channel關閉了或沒有資料,那麼ok将被置為false

不帶緩沖的Channel兼具通信和同步兩種特性,頗受青睐。

三、Channel用作信号(Signal)的場景

1、等待一個事件(Event)

等待一個事件,有時候通過close一個Channel就足夠了。例如:

//testwaitevent1.go

package main

import "fmt"

func main() {

        fmt.Println("Begin doing something!")

        c := make(chan bool)

        go func() {

                fmt.Println("Doing something…")

                close(c)

        }()

        <-c

        fmt.Println("Done!")

}

這裡main goroutine通過"<-c"來等待sub goroutine中的“完成事件”,sub goroutine通過close channel促發這一事件。當然也可以通過向Channel寫入一個bool值的方式來作為事件通知。main goroutine在channel c上沒有任何資料可讀的情況下會阻塞等待。

關于輸出結果:

我們可以很容易判斷出上面程式的輸出結果:

Begin doing something!

Doing something…

Done!

如果将close(c)換成c<-true,則根據《Go memory model》中的定義:A receive from an unbuffered channel happens before the send on that channel completes.

"<-c"要先于"c<-true"完成,但也不影響日志的輸出順序,輸出結果仍為上面三行。

2、協同多個Goroutines

同上,close channel還可以用于協同多個Goroutines,比如下面這個例子,我們建立了100個Worker Goroutine,這些Goroutine在被建立出來後都阻塞在"<-start"上,直到我們在main goroutine中給出開工的信号:"close(start)",這些goroutines才開始真正的并發運作起來。

//testwaitevent2.go

func worker(start chan bool, index int) {

        <-start

        fmt.Println("This is Worker:", index)

        start := make(chan bool)

        for i := 1; i <= 100; i++ {

                go worker(start, i)

        }

        close(start)

        select {} //deadlock we expected

3、Select

【select的基本操作】

select是Go語言特有的操作,使用select我們可以同時在多個channel上進行發送/接收操作。下面是select的基本操作。

select {

case x := <- somechan:

    // … 使用x進行一些操作

case y, ok := <- someOtherchan:

    // … 使用y進行一些操作,

    // 檢查ok值判斷someOtherchan是否已經關閉

case outputChan <- z:

    // … z值被成功發送到Channel上時

default:

    // … 上面case均無法通信時,執行此分支

【慣用法:for/select】

我們在使用select時很少隻是對其進行一次evaluation,我們常常将其與for {}結合在一起使用,并選擇适當時機從for{}中退出。

for {

        select {

        case x := <- somechan:

            // … 使用x進行一些操作

        case y, ok := <- someOtherchan:

            // … 使用y進行一些操作,

            // 檢查ok值判斷someOtherchan是否已經關閉

        case outputChan <- z:

            // … z值被成功發送到Channel上時

        default:

            // … 上面case均無法通信時,執行此分支

【終結workers】

下面是一個常見的終結sub worker goroutines的方法,每個worker goroutine通過select監視一個die channel來及時擷取main goroutine的退出通知。

//testterminateworker1.go

import (

    "fmt"

    "time"

)

func worker(die chan bool, index int) {

    fmt.Println("Begin: This is Worker:", index)

    for {

        select {

        //case xx:

            //做事的分支

        case <-die:

            fmt.Println("Done: This is Worker:", index)

            return

        }

    }

    die := make(chan bool)

    for i := 1; i <= 100; i++ {

        go worker(die, i)

    time.Sleep(time.Second * 5)

    close(die)

    select {} //deadlock we expected

【終結驗證】

有時候終結一個worker後,main goroutine想确認worker routine是否真正退出了,可采用下面這種方法:

//testterminateworker2.go

    //"time"

func worker(die chan bool) {

    fmt.Println("Begin: This is Worker")

        //做事的分支

            fmt.Println("Done: This is Worker")

            die <- true

    go worker(die)

    die <- true

    <-die

    fmt.Println("Worker goroutine has been terminated")

【關閉的Channel永遠不會阻塞】

下面示範在一個已經關閉了的channel上讀寫的結果:

//testoperateonclosedchannel.go

        cb := make(chan bool)

        close(cb)

        x := <-cb

        fmt.Printf("%#v\n", x)

        x, ok := <-cb

        fmt.Printf("%#v %#v\n", x, ok)

        ci := make(chan int)

        close(ci)

        y := <-ci

        fmt.Printf("%#v\n", y)

        cb <- true

$go run testoperateonclosedchannel.go

false

false false

panic: runtime error: send on closed channel

可以看到在一個已經close的unbuffered channel上執行讀操作,回傳回channel對應類型的零值,比如bool型channel傳回false,int型channel傳回0。但向close的channel寫則會觸發panic。不過無論讀寫都不會導緻阻塞。

【關閉帶緩存的channel】

将unbuffered channel換成buffered channel會怎樣?我們看下面例子:

//testclosedbufferedchannel.go

        c := make(chan int, 3)

        c <- 15

        c <- 34

        c <- 65

        close(c)

        fmt.Printf("%d\n", <-c)

        c <- 1

$go run testclosedbufferedchannel.go

15

34

65

可以看出帶緩沖的channel略有不同。盡管已經close了,但我們依舊可以從中讀出關閉前寫入的3個值。第四次讀取時,則會傳回該channel類型的零值。向這類channel寫入操作也會觸發panic。

【range】

Golang中的range常常和channel并肩作戰,它被用來從channel中讀取所有值。下面是一個簡單的執行個體:

//testrange.go

func generator(strings chan string) {

        strings <- "Five hour's New York jet lag"

        strings <- "and Cayce Pollard wakes in Camden Town"

        strings <- "to the dire and ever-decreasing circles"

        strings <- "of disrupted circadian rhythm."

        close(strings)

        strings := make(chan string)

        go generator(strings)

        for s := range strings {

                fmt.Printf("%s\n", s)

        fmt.Printf("\n")

四、隐藏狀态

下面通過一個例子來示範一下channel如何用來隐藏狀态:

1、例子:唯一的ID服務

//testuniqueid.go

func newUniqueIDService() <-chan string {

        id := make(chan string)

                var counter int64 = 0

                for {

                        id <- fmt.Sprintf("%x", counter)

                        counter += 1

                }

        return id

        id := newUniqueIDService()

        for i := 0; i < 10; i++ {

                fmt.Println(<-id)

$ go run testuniqueid.go

1

2

3

4

5

6

7

8

9

newUniqueIDService通過一個channel與main goroutine關聯,main goroutine無需知道uniqueid實作的細節以及目前狀态,隻需通過channel獲得最新id即可。

五、預設情況

我想這裡John Graham-Cumming主要是想告訴我們select的default分支的實踐用法。

1、select  for non-blocking receive

idle:= make(chan []byte, 5) //用一個帶緩沖的channel構造一個簡單的隊列

case b = <-idle:
 //嘗試從idle隊列中讀取

    …

default:  //隊列空,配置設定一個新的buffer

        makes += 1

        b = make([]byte, size)

2、select for non-blocking send

idle:= make(chan []byte, 5) //用一個帶緩沖的channel構造一個簡單的隊列

case idle <- b: //嘗試向隊列中插入一個buffer

        //…

default: //隊列滿?

六、Nil Channels

1、nil channels阻塞

對一個沒有初始化的channel進行讀寫操作都将發生阻塞,例子如下:

        var c chan int

        <-c

$go run testnilchannel.go

fatal error: all goroutines are asleep – deadlock!

2、nil channel在select中很有用

看下面這個例子:

//testnilchannel_bad.go

import "time"

        var c1, c2 chan int = make(chan int), make(chan int)

                time.Sleep(time.Second * 5)

                c1 <- 5

                close(c1)

                time.Sleep(time.Second * 7)

                c2 <- 7

                close(c2)

        for {

                select {

                case x := <-c1:

                        fmt.Println(x)

                case x := <-c2:

        fmt.Println("over")

我們原本期望程式交替輸出5和7兩個數字,但實際的輸出結果卻是:

… … 0死循環

再仔細分析代碼,原來select每次按case順序evaluate:

    – 前5s,select一直阻塞;

    – 第5s,c1傳回一個5後被close了,“case x := <-c1”這個分支傳回,select輸出5,并重新select

    – 下一輪select又從“case x := <-c1”這個分支開始evaluate,由于c1被close,按照前面的知識,close的channel不會阻塞,我們會讀出這個 channel對應類型的零值,這裡就是0;select再次輸出0;這時即便c2有值傳回,程式也不會走到c2這個分支

    – 依次類推,程式無限循環的輸出0

我們利用nil channel來改進這個程式,以實作我們的意圖,代碼如下:

//testnilchannel.go

                case x, ok := <-c1:

                        if !ok {

                                c1 = nil

                        } else {

                                fmt.Println(x)

                        }

                case x, ok := <-c2:

                                c2 = nil

                if c1 == nil && c2 == nil {

                        break

over

可以看出:通過将已經關閉的channel置為nil,下次select将會阻塞在該channel上,使得select繼續下面的分支evaluation。

七、Timers

1、逾時機制Timeout

帶逾時機制的select是正常的tip,下面是示例代碼,實作30s的逾時select:

func worker(start chan bool) {

        timeout := time.After(30 * time.Second)

                     // … do some stuff

                case <- timeout:

                    return

2、心跳HeartBeart

與timeout實作類似,下面是一個簡單的心跳select實作:

        heartbeat := time.Tick(30 * time.Second)

                case <- heartbeat:

                    //… do heartbeat stuff