幾天前,我寫了一篇文章來說明golang中channel的使用規範。在reddit和HN,那篇文章收到了很多贊同,但是我也收到了下面幾個關于Go channel設計和規範的批評:
● 在不能更改channel狀态的情況下,沒有簡單普遍的方式來檢查channel是否已經關閉了
● 關閉已經關閉的channel會導緻panic,是以在closer(關閉者)不知道channel是否已經關閉的情況下去關閉channel是很危險的
● 發送值到已經關閉的channel會導緻panic,是以如果sender(發送者)在不知道channel是否已經關閉的情況下去向channel發送值是很危險的
那些批評看起來都很有道理(實際上并沒有)。是的,沒有一個内置函數可以檢查一個channel是否已經關閉。如果你能确定不會向channel發送任何值,那麼也确實需要一個簡單的方法來檢查channel是否已經關閉:
1package main
2
3import "fmt"
4
5type T int
6
7func IsClosed(ch <-chan T) bool {
8 select {
9 case <-ch:
10 return true
11 default:
12 }
13
14 return false
15}
16
17func main() {
18 c := make(chan T)
19 fmt.Println(IsClosed(c)) // false
20 close(c)
21 fmt.Println(IsClosed(c)) // true
22}
上面已經提到了,沒有一種适用的方式來檢查channel是否已經關閉了。但是,就算有一個簡單的
closed(chan T) bool
函數來檢查channel是否已經關閉,它的用處還是很有限的,就像内置的
len
函數用來檢查緩沖channel中元素數量一樣。原因就在于,已經檢查過的channel的狀态有可能在調用了類似的方法傳回之後就修改了,是以傳回來的值已經不能夠反映剛才檢查的channel的目前狀态了。
盡管在調用
closed(ch)
傳回
true
的情況下停止向channel發送值是可以的,但是如果調用
closed(ch)
false
,那麼關閉channel或者繼續向channel發送值就不安全了(會panic)。
The Channel Closing Principle
在使用Go channel的時候,一個适用的原則是不要從接收端關閉channel,也不要關閉有多個并發發送者的channel。換句話說,如果sender(發送者)隻是唯一的sender或者是channel最後一個活躍的sender,那麼你應該在sender的goroutine關閉channel,進而通知receiver(s)(接收者們)已經沒有值可以讀了。維持這條原則将保證永遠不會發生向一個已經關閉的channel發送值或者關閉一個已經關閉的channel。
(下面,我們将會稱上面的原則為channel closing principle
打破channel closing principle的解決方案
如果你因為某種原因從接收端(receiver side)關閉channel或者在多個發送者中的一個關閉channel,那麼你應該使用列在Golang panic/recover Use Cases的函數來安全地發送值到channel中(假設channel的元素類型是T)
1func SafeSend(ch chan T, value T) (closed bool) {
2 defer func() {
3 if recover() != nil {
4 // the return result can be altered
5 // in a defer function call
6 closed = true
7 }
8 }()
9
10 ch <- value // panic if ch is closed
11 return false // <=> closed = false; return
12}
如果channel
ch
沒有被關閉的話,那麼這個函數的性能将和
ch <- value
接近。對于channel關閉的時候,
SafeSend
函數隻會在每個sender goroutine中調用一次,是以程式不會有太大的性能損失。
同樣的想法也可以用在從多個goroutine關閉channel中:
1func SafeClose(ch chan T) (justClosed bool) {
2 defer func() {
3 if recover() != nil {
4 justClosed = false
5 }
6 }()
7
8 // assume ch != nil here.
9 close(ch) // panic if ch is closed
10 return true
11}
很多人喜歡用
sync.Once
來關閉channel:
1type MyChannel struct {
2 C chan T
3 once sync.Once
4}
5
6func NewMyChannel() *MyChannel {
7 return &MyChannel{C: make(chan T)}
8}
9
10func (mc *MyChannel) SafeClose() {
11 mc.once.Do(func(){
12 close(mc.C)
13 })
14}
當然了,我們也可以用
sync.Mutex
來避免多次關閉channel:
1type MyChannel struct {
2 C chan T
3 closed bool
4 mutex sync.Mutex
5}
6
7func NewMyChannel() *MyChannel {
8 return &MyChannel{C: make(chan T)}
9}
10
11func (mc *MyChannel) SafeClose() {
12 mc.mutex.Lock()
13 if !mc.closed {
14 close(mc.C)
15 mc.closed = true
16 }
17 mc.mutex.Unlock()
18}
19
20func (mc *MyChannel) IsClosed() bool {
21 mc.mutex.Lock()
22 defer mc.mutex.Unlock()
23 return mc.closed
24}
我們應該要了解為什麼Go不支援内置
SafeSend
和
SafeClose
函數,原因就在于并不推薦從接收端或者多個并發發送端關閉channel。Golang甚至禁止關閉隻接收(receive-only)的channel。
保持channel closing principle的優雅方案
上面的
SaveSend
函數有一個缺點是,在select語句的
case
關鍵字後不能作為發送操作被調用(譯者注:類似于
case SafeSend(ch, t):
)。另外一個缺點是,很多人,包括我自己都覺得上面通過使用
panic
/
recover
sync
包的方案不夠優雅。針對各種場景,下面介紹不用使用
panic
recover
sync
包,純粹是利用channel的解決方案。
(在下面的例子總,
sync.WaitGroup
隻是用來讓例子完整的。它的使用在實踐中不一定一直都有用)
● M個receivers,一個sender,sender通過關閉data channel說“不再發送”
這是最簡單的場景了,就隻是當sender不想再發送的時候讓sender關閉data 來關閉channel:
1package main
2
3import (
4 "time"
5 "math/rand"
6 "sync"
7 "log"
8)
9
10func main() {
11 rand.Seed(time.Now().UnixNano())
12 log.SetFlags(0)
13
14 // ...
15 const MaxRandomNumber = 100000
16 const NumReceivers = 100
17
18 wgReceivers := sync.WaitGroup{}
19 wgReceivers.Add(NumReceivers)
20
21 // ...
22 dataCh := make(chan int, 100)
23
24 // the sender
25 go func() {
26 for {
27 if value := rand.Intn(MaxRandomNumber); value == 0 {
28 // the only sender can close the channel safely.
29 close(dataCh)
30 return
31 } else {
32 dataCh <- value
33 }
34 }
35 }()
36
37 // receivers
38 for i := 0; i < NumReceivers; i++ {
39 go func() {
40 defer wgReceivers.Done()
41
42 // receive values until dataCh is closed and
43 // the value buffer queue of dataCh is empty.
44 for value := range dataCh {
45 log.Println(value)
46 }
47 }()
48 }
49
50 wgReceivers.Wait()
51}
● 一個receiver,N個sender,receiver通過關閉一個額外的signal channel說“請停止發送”
這種場景比上一個要複雜一點。我們不能讓receiver關閉data channel,因為這麼做将會打破channel closing principle。但是我們可以讓receiver關閉一個額外的signal channel來通知sender停止發送值:
1package main
2
3import (
4 "time"
5 "math/rand"
6 "sync"
7 "log"
8)
9
10func main() {
11 rand.Seed(time.Now().UnixNano())
12 log.SetFlags(0)
13
14 // ...
15 const MaxRandomNumber = 100000
16 const NumSenders = 1000
17
18 wgReceivers := sync.WaitGroup{}
19 wgReceivers.Add(1)
20
21 // ...
22 dataCh := make(chan int, 100)
23 stopCh := make(chan struct{})
24 // stopCh is an additional signal channel.
25 // Its sender is the receiver of channel dataCh.
26 // Its reveivers are the senders of channel dataCh.
27
28 // senders
29 for i := 0; i < NumSenders; i++ {
30 go func() {
31 for {
32 value := rand.Intn(MaxRandomNumber)
33
34 select {
35 case <- stopCh:
36 return
37 case dataCh <- value:
38 }
39 }
40 }()
41 }
42
43 // the receiver
44 go func() {
45 defer wgReceivers.Done()
46
47 for value := range dataCh {
48 if value == MaxRandomNumber-1 {
49 // the receiver of the dataCh channel is
50 // also the sender of the stopCh cahnnel.
51 // It is safe to close the stop channel here.
52 close(stopCh)
53 return
54 }
55
56 log.Println(value)
57 }
58 }()
59
60 // ...
61 wgReceivers.Wait()
62}
正如注釋說的,對于額外的signal channel來說,它的sender是data channel的receiver。這個額外的signal channel被它唯一的sender關閉,遵守了channel closing principle。
● M個receiver,N個sender,它們當中任意一個通過通知一個moderator(仲裁者)關閉額外的signal channel來說“讓我們結束遊戲吧”
這是最複雜的場景了。我們不能讓任意的receivers和senders關閉data channel,也不能讓任何一個receivers通過關閉一個額外的signal channel來通知所有的senders和receivers退出遊戲。這麼做的話會打破channel closing principle。但是,我們可以引入一個moderator來關閉一個額外的signal channel。這個例子的一個技巧是怎麼通知moderator去關閉額外的signal channel:
1package main
2
3import (
4 "time"
5 "math/rand"
6 "sync"
7 "log"
8 "strconv"
9)
10
11func main() {
12 rand.Seed(time.Now().UnixNano())
13 log.SetFlags(0)
14
15 // ...
16 const MaxRandomNumber = 100000
17 const NumReceivers = 10
18 const NumSenders = 1000
19
20 wgReceivers := sync.WaitGroup{}
21 wgReceivers.Add(NumReceivers)
22
23 // ...
24 dataCh := make(chan int, 100)
25 stopCh := make(chan struct{})
26 // stopCh is an additional signal channel.
27 // Its sender is the moderator goroutine shown below.
28 // Its reveivers are all senders and receivers of dataCh.
29 toStop := make(chan string, 1)
30 // the channel toStop is used to notify the moderator
31 // to close the additional signal channel (stopCh).
32 // Its senders are any senders and receivers of dataCh.
33 // Its reveiver is the moderator goroutine shown below.
34
35 var stoppedBy string
36
37 // moderator
38 go func() {
39 stoppedBy = <- toStop // part of the trick used to notify the moderator
40 // to close the additional signal channel.
41 close(stopCh)
42 }()
43
44 // senders
45 for i := 0; i < NumSenders; i++ {
46 go func(id string) {
47 for {
48 value := rand.Intn(MaxRandomNumber)
49 if value == 0 {
50 // here, a trick is used to notify the moderator
51 // to close the additional signal channel.
52 select {
53 case toStop <- "sender#" + id:
54 default:
55 }
56 return
57 }
58
59 // the first select here is to try to exit the
60 // goroutine as early as possible.
61 select {
62 case <- stopCh:
63 return
64 default:
65 }
66
67 select {
68 case <- stopCh:
69 return
70 case dataCh <- value:
71 }
72 }
73 }(strconv.Itoa(i))
74 }
75
76 // receivers
77 for i := 0; i < NumReceivers; i++ {
78 go func(id string) {
79 defer wgReceivers.Done()
80
81 for {
82 // same as senders, the first select here is to
83 // try to exit the goroutine as early as possible.
84 select {
85 case <- stopCh:
86 return
87 default:
88 }
89
90 select {
91 case <- stopCh:
92 return
93 case value := <-dataCh:
94 if value == MaxRandomNumber-1 {
95 // the same trick is used to notify the moderator
96 // to close the additional signal channel.
97 select {
98 case toStop <- "receiver#" + id:
99 default:
100 }
101 return
102 }
103
104 log.Println(value)
105 }
106 }
107 }(strconv.Itoa(i))
108 }
109
110 // ...
111 wgReceivers.Wait()
112 log.Println("stopped by", stoppedBy)
113}
在這個例子中,仍然遵守着channel closing principle。
請注意channel
toStop
的緩沖大小是1.這是為了避免當mederator goroutine 準備好之前第一個通知就已經發送了,導緻丢失。
● 更多的場景?
很多的場景變體是基于上面三種的。舉個例子,一個基于最複雜情況的變體可能要求receivers讀取buffer channel中剩下所有的值。這應該很容易處理,所有這篇文章也就不提了。
盡管上面三種場景不能覆寫所有Go channel的使用場景,但它們是最基礎的,實踐中的大多數場景都可以分類到那三種中。
結論
這裡沒有一種場景要求你去打破channel closing principle。如果你遇到了這種場景,請思考一下你的設計并重寫你的代碼。
用Go程式設計就像在創作藝術。
原文釋出時間為:2018-09-9
本文作者:天唯
本文來自雲栖社群合作夥伴“
Golang語言社群”,了解相關資訊可以關注“
”。