天天看點

golang筆記11--go語言并發程式設計子產品 channel

golang筆記11--go語言并發程式設計子產品 channel

  • ​​1 介紹​​
  • ​​2 Channel​​
  • ​​2.1 channel​​
  • ​​2.2 使用Channel等待任務結束​​
  • ​​2.3 使用Channel進行樹的周遊​​
  • ​​2.4 用select進行排程​​
  • ​​2.5 傳統同步機制​​
  • ​​3 注意事項​​
  • ​​4 說明​​

1 介紹

本文繼上文 ​​golang筆記10–go語言并發程式設計子產品 goroutine​​​, 進一步了解 go語言的并發程式設計子產品 --channel,以及相應注意事項。

具體包括 : channel、使用Channel等待任務結束、使用Channel進行樹的周遊、用select進行排程、傳統同步機制 等内容。

2 Channel

2.1 channel

channel 是go語言在語言級别提供的 goroutine 間的通信方式。我們可以使用channel 在兩個或者多個goroutine 之間傳遞消息;channel 是程序内的通信方式,是以通過channel 傳遞對象的過程和調用函數時的參數傳遞行為比較一緻,比如也可以傳遞指針等。

channel 是類型相關的,也就是說一個 channel 隻能傳遞一種類型的值,這個類型需要在申明 channel 時指定。

channel 常見操作包括 申明 chan,定義 chan,将資料寫入 chan,從 chan 中讀資料;

申明一個channel: var ch chan int;

定義一個channel: ch := make(chan int)

資料寫入channel: ch <- 32

從channel讀資料: value <-ch

package main

import (
    "fmt"
    "time"
)

func worker(id int, c chan int) {
    for {
        fmt.Printf("Worker %d received %d\n", id, <-c)
    }
}

func chanDemo() {
    var channels [10]chan int
    for i := 0; i < 10; i++ {
        channels[i] = make(chan int)
        go worker(i, channels[i])
    }
    for i := 0; i < 10; i++ {
        channels[i] <- 'a' + i
    }
    time.Sleep(time.Second) // 防止 main 函數過早退出
}

// chan<- int 表明傳回的chan隻能用來收資料
func createWorker(id int) chan<- int {

    c := make(chan int)
    go func() {
        for {
            fmt.Printf("Worker %d received %d\n", id, <-c)
        }
    }()
    return c
}
func chanDemo2() {
    var channels [10]chan<- int
    for i := 0; i < 10; i++ {
        channels[i] = createWorker(i)
    }
    for i := 0; i < 10; i++ {
        channels[i] <- 'a' + i
    }
    for i := 0; i < 10; i++ {
        channels[i] <- 'A' + i
    }
    time.Sleep(time.Second) // 防止 main 函數過早退出
}

func bufferedChannel() {
    c := make(chan int, 3) // 設定緩沖區為3
    go worker(0, c)
    c <- 'a'
    c <- 'b'
    c <- 'c'
    c <- 'd'
    time.Sleep(time.Second)
}

func workerClose(id int, c chan int) {
    //for {
    //  n, ok := <-c
    //  if !ok {
    //      break
    //  }
    //  fmt.Printf("Worker %d received %d\n", id, n)
    //}
    for n := range c {
        fmt.Printf("Worker %d received %d\n", id, n)
    }
}
func channelClose() {
    c := make(chan int, 3) // 設定緩沖區為3
    go workerClose(0, c)   // 在workerClose中檢測是否有資料,沒資料就退出,若不檢測就會導緻不停的收到 0 值
    c <- 'a'
    c <- 'b'
    c <- 'c'
    c <- 'd'
    close(c)
    time.Sleep(time.Second)
}

func main() {
    fmt.Println("channel as first-class citizen")
    chanDemo()
    chanDemo2()
    fmt.Println("buffered channel")
    bufferedChannel()
    fmt.Println("channel close and range")
    channelClose()
}      

2.2 使用Channel等待任務結束

本案例中使用 2 種方式實作等待任務結束,第一種為自定義 done chan bool, 第二種使用go 内置的 sync.WaitGroup 的 wait() 來實作。

package main**加粗樣式**

import (
    "fmt"
    "sync"
)

func doWork(id int, c chan int, done chan bool) {
    for {
        fmt.Printf("Worker %d received %c\n", id, <-c)
        go func() { // 讓其并行發done, 防止 chanDemo() 中大寫字母被卡死
            done <- true
        }()

    }
}

type worker struct {
    in   chan int
    done chan bool
}

// chan<- int 表明傳回的chan隻能用來收資料
func createWorker(id int) worker {
    w := worker{
        in:   make(chan int),
        done: make(chan bool),
    }
    go doWork(id, w.in, w.done)
    return w
}
func chanDemo() {
    var workers [10]worker
    for i := range workers {
        workers[i] = createWorker(i)
    }
    for i, worker := range workers {
        worker.in <- 'a' + i
    }
    for i, worker := range workers {
        worker.in <- 'A' + i
    }
    for _, worker := range workers {
        <-worker.done
        <-worker.done
    }
}

func doWorkV2(id int, c chan int, w workerV2) {
    for {
        fmt.Printf("Worker %d received %c\n", id, <-c)
        go func() { // 讓其并行發done, 防止 chanDemo() 中大寫字母被卡死
            w.done()
        }()

    }
}

type workerV2 struct {
    in   chan int
    done func()
}

func createWorkerV2(id int, wg *sync.WaitGroup) workerV2 {
    w := workerV2{
        in: make(chan int),
        done: func() {
            wg.Done()
        },
    }
    go doWorkV2(id, w.in, w)
    return w
}

func chanDemoV2() {
    var wg sync.WaitGroup
    var workers [10]workerV2
    for i := range workers {
        workers[i] = createWorkerV2(i, &wg)
    }
    wg.Add(20)
    for i, worker := range workers {
        worker.in <- 'a' + i
    }
    for i, worker := range workers {
        worker.in <- 'A' + i
    }
    wg.Wait()
}

func main() {
    fmt.Println("chapter11.2, channel as first-class citizen")
    chanDemo()
    fmt.Println("sync.WaitGroup wait()")
    chanDemoV2()
}
輸出:
chapter11.2, channel as first-class citizen
Worker 0 received a
......
Worker 9 received J
sync.WaitGroup wait()
Worker 0 received a
......
Worker 8      

2.3 使用Channel進行樹的周遊

本案例中使用channel 來周遊樹,并計算樹節點的最大值,具體案例如下:

vim node.go
package tree

import (
    "fmt"
)

type Node struct {
    Value       int
    Left, Right *Node
}

func (node Node) Print() {
    fmt.Print(node.Value, " ")
}

func (node *Node) SetValue(value int) {
    if node == nil {
        fmt.Println("Setting value to nil node, ignored")
        return
    }
    node.Value = value
}

func CreateNode(value int) *Node {
    return &Node{Value: value}
}

func (node *Node) Traverse() {
    if node == nil {
        return
    }
    node.Left.Traverse()
    node.Print()
    node.Right.Traverse()
}

func (node *Node) TraverseV2() {
    node.TraverseFunc(func(nd *Node) {
        nd.Print()
    })
    fmt.Println()
}

func (node *Node) TraverseFunc(f func(*Node)) {
    if node == nil {
        return
    }
    node.Left.TraverseFunc(f)
    f(node)
    node.Right.TraverseFunc(f)
}

func (node *Node) TraverseWithChannel() chan *Node {
    out := make(chan *Node)
    go func() {
        node.TraverseFunc(func(node *Node) {
            out <- node
        })
        close(out)
    }()
    return out
}

vim main.go
package main

import (
    "fmt"
    "learngo/chapter11/11.3/tree"
)

func main() {
    fmt.Println("this is chapter 11.3")
    var root tree.Node
    root = tree.Node{Value: 3}
    root.Left = &tree.Node{}
    root.Right = &tree.Node{Value: 5}
    root.Right.Left = new(tree.Node)
    root.Left.Right = tree.CreateNode(2)
    root.Right.Left.SetValue(4)
    root.Traverse()
    fmt.Println("\nthis func traverseV2")
    root.TraverseV2()

    nodeCount := 0
    root.TraverseFunc(func(node *tree.Node) {
        nodeCount++
    })
    fmt.Println("Node count:", nodeCount)

    c := root.TraverseWithChannel()
    maxNode := 0
    for node := range c {
        if node.Value > maxNode {
            maxNode = node.Value
        }
    }
    fmt.Println("Max node value:", maxNode)
}
輸出:
this is chapter 11.3
0 2 3 4 5 
this func traverseV2
0 2 3 4 5 
Node count: 5
Max node value: 5      

2.4 用select進行排程

Go 語言中的 select 也能夠讓 Goroutine 同時等待多個 Channel 可讀或者可寫,在多個檔案或者 Channel狀态改變之前,select 會一直阻塞目前線程或者 Goroutine。

select 是與 switch 相似的控制結構,與 switch 不同的是,select 中雖然也有多個 case,但是這些 case 中的表達式必須都是 Channel 的收發操作。

本案例使用 select 進行排程,其包括了多個channel 的收發操作:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func generator() chan int {
    out := make(chan int)
    go func() {
        i := 0
        for {
            time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
            out <- i
            i++
        }
    }()
    return out
}

func worker(id int, c chan int) {
    for n := range c {
        time.Sleep(time.Second)
        fmt.Printf("Worker %d received %d\n", id, n)
    }
}

func createWorker(id int) chan<- int {
    c := make(chan int)
    go worker(id, c)
    return c
}

func main() {
    var c1, c2 = generator(), generator()
    var worker = createWorker(0)
    var values []int
    tm := time.After(10 * time.Second)
    tick := time.Tick(time.Second)
    for {
        var activeWorker chan<- int
        var activeValue int
        if len(values) > 0 {
            activeWorker = worker
            activeValue = values[0]
        }
        select {
        case n := <-c1:
            values = append(values, n)
        case n := <-c2:
            values = append(values, n)
        case activeWorker <- activeValue:
            values = values[1:]
        case <-time.After(800 * time.Millisecond):
            fmt.Println("timeout")
        case <-tick:
            fmt.Println(
                "queue len =", len(values))
        case <-tm:
            fmt.Println("bye")
            return
        }
    }
}
輸出:
queue len = 3
Worker 0 received 0
queue len = 5
Worker 0 received 0
queue len = 5
Worker 0 received 1
queue len = 10
Worker 0 received 1
queue len = 10
Worker 0 received 2
queue len = 11
Worker 0 received 2
queue len = 12
Worker 0 received 3
queue len = 13
Worker 0 received 3
queue len = 14
Worker 0 received 4      

2.5 傳統同步機制

package main

import (
    "fmt"
    "sync"
    "time"
)

type atomicInt struct {
    value int
    lock  sync.Mutex
}

func (a *atomicInt) increment() {
    func() {
        a.lock.Lock()
        defer a.lock.Unlock()
        a.value++
    }()
}
func (a *atomicInt) get() int {
    a.lock.Lock()
    defer a.lock.Unlock()
    return a.value
}

func main() {
    fmt.Println("this chapter 11.5")
    var a atomicInt
    a.increment()
    go func() {
        a.increment()
    }()
    time.Sleep(time.Millisecond)
    fmt.Println(a.get())
}
輸出:
this chapter 11.5
2      

3 注意事項

4 說明

  1. 軟體環境

    go版本:go1.15.8

    作業系統:Ubuntu 20.04 Desktop

    Idea:2020.01.04