天天看點

如何優雅地關閉Go channelThe Channel Closing Principle打破channel closing principle的解決方案保持channel closing principle的優雅方案結論

幾天前,我寫了一篇文章來說明golang中channel的使用規範。在reddit和HN,那篇文章收到了很多贊同,但是我也收到了下面幾個關于Go channel設計和規範的批評:

● 在不能更改channel狀态的情況下,沒有簡單普遍的方式來檢查channel是否已經關閉了

● 關閉已經關閉的channel會導緻panic,是以在closer(關閉者)不知道channel是否已經關閉的情況下去關閉channel是很危險的

● 發送值到已經關閉的channel會導緻panic,是以如果sender(發送者)在不知道channel是否已經關閉的情況下去向channel發送值是很危險的

那些批評看起來都很有道理(實際上并沒有)。是的,沒有一個内置函數可以檢查一個channel是否已經關閉。如果你能确定不會向channel發送任何值,那麼也确實需要一個簡單的方法來檢查channel是否已經關閉:

1package main

2

3import "fmt"

4

5type T int

6

7func IsClosed(ch <-chan T) bool {

8 select {

9 case <-ch:

10 return true

11 default:

12 }

13

14 return false

15}

16

17func main() {

18 c := make(chan T)

19 fmt.Println(IsClosed(c)) // false

20 close(c)

21 fmt.Println(IsClosed(c)) // true

22}

上面已經提到了,沒有一種适用的方式來檢查channel是否已經關閉了。但是,就算有一個簡單的

closed(chan T) bool

函數來檢查channel是否已經關閉,它的用處還是很有限的,就像内置的

len

函數用來檢查緩沖channel中元素數量一樣。原因就在于,已經檢查過的channel的狀态有可能在調用了類似的方法傳回之後就修改了,是以傳回來的值已經不能夠反映剛才檢查的channel的目前狀态了。

盡管在調用

closed(ch)

傳回

true

的情況下停止向channel發送值是可以的,但是如果調用

closed(ch)

false

,那麼關閉channel或者繼續向channel發送值就不安全了(會panic)。

The Channel Closing Principle

在使用Go channel的時候,一個适用的原則是不要從接收端關閉channel,也不要關閉有多個并發發送者的channel。換句話說,如果sender(發送者)隻是唯一的sender或者是channel最後一個活躍的sender,那麼你應該在sender的goroutine關閉channel,進而通知receiver(s)(接收者們)已經沒有值可以讀了。維持這條原則将保證永遠不會發生向一個已經關閉的channel發送值或者關閉一個已經關閉的channel。

(下面,我們将會稱上面的原則為channel closing principle

打破channel closing principle的解決方案

如果你因為某種原因從接收端(receiver side)關閉channel或者在多個發送者中的一個關閉channel,那麼你應該使用列在Golang panic/recover Use Cases的函數來安全地發送值到channel中(假設channel的元素類型是T)

1func SafeSend(ch chan T, value T) (closed bool) {

2 defer func() {

3 if recover() != nil {

4 // the return result can be altered

5 // in a defer function call

6 closed = true

7 }

8 }()

9

10 ch <- value // panic if ch is closed

11 return false // <=> closed = false; return

12}

如果channel

ch

沒有被關閉的話,那麼這個函數的性能将和

ch <- value

接近。對于channel關閉的時候,

SafeSend

函數隻會在每個sender goroutine中調用一次,是以程式不會有太大的性能損失。

同樣的想法也可以用在從多個goroutine關閉channel中:

1func SafeClose(ch chan T) (justClosed bool) {

2 defer func() {

3 if recover() != nil {

4 justClosed = false

5 }

6 }()

7

8 // assume ch != nil here.

9 close(ch) // panic if ch is closed

10 return true

11}

很多人喜歡用

sync.Once

來關閉channel:

1type MyChannel struct {

2 C chan T

3 once sync.Once

4}

5

6func NewMyChannel() *MyChannel {

7 return &MyChannel{C: make(chan T)}

8}

9

10func (mc *MyChannel) SafeClose() {

11 mc.once.Do(func(){

12 close(mc.C)

13 })

14}

當然了,我們也可以用

sync.Mutex

來避免多次關閉channel:

1type MyChannel struct {

2 C chan T

3 closed bool

4 mutex sync.Mutex

5}

6

7func NewMyChannel() *MyChannel {

8 return &MyChannel{C: make(chan T)}

9}

10

11func (mc *MyChannel) SafeClose() {

12 mc.mutex.Lock()

13 if !mc.closed {

14 close(mc.C)

15 mc.closed = true

16 }

17 mc.mutex.Unlock()

18}

19

20func (mc *MyChannel) IsClosed() bool {

21 mc.mutex.Lock()

22 defer mc.mutex.Unlock()

23 return mc.closed

24}

我們應該要了解為什麼Go不支援内置

SafeSend

SafeClose

函數,原因就在于并不推薦從接收端或者多個并發發送端關閉channel。Golang甚至禁止關閉隻接收(receive-only)的channel。

保持channel closing principle的優雅方案

上面的

SaveSend

函數有一個缺點是,在select語句的

case

關鍵字後不能作為發送操作被調用(譯者注:類似于

case SafeSend(ch, t):

)。另外一個缺點是,很多人,包括我自己都覺得上面通過使用

panic

/

recover

sync

包的方案不夠優雅。針對各種場景,下面介紹不用使用

panic

recover

sync

包,純粹是利用channel的解決方案。

(在下面的例子總,

sync.WaitGroup

隻是用來讓例子完整的。它的使用在實踐中不一定一直都有用)

● M個receivers,一個sender,sender通過關閉data channel說“不再發送”

這是最簡單的場景了,就隻是當sender不想再發送的時候讓sender關閉data 來關閉channel:

1package main

2

3import (

4 "time"

5 "math/rand"

6 "sync"

7 "log"

8)

9

10func main() {

11 rand.Seed(time.Now().UnixNano())

12 log.SetFlags(0)

13

14 // ...

15 const MaxRandomNumber = 100000

16 const NumReceivers = 100

17

18 wgReceivers := sync.WaitGroup{}

19 wgReceivers.Add(NumReceivers)

20

21 // ...

22 dataCh := make(chan int, 100)

23

24 // the sender

25 go func() {

26 for {

27 if value := rand.Intn(MaxRandomNumber); value == 0 {

28 // the only sender can close the channel safely.

29 close(dataCh)

30 return

31 } else {

32 dataCh <- value

33 }

34 }

35 }()

36

37 // receivers

38 for i := 0; i < NumReceivers; i++ {

39 go func() {

40 defer wgReceivers.Done()

41

42 // receive values until dataCh is closed and

43 // the value buffer queue of dataCh is empty.

44 for value := range dataCh {

45 log.Println(value)

46 }

47 }()

48 }

49

50 wgReceivers.Wait()

51}

● 一個receiver,N個sender,receiver通過關閉一個額外的signal channel說“請停止發送”

這種場景比上一個要複雜一點。我們不能讓receiver關閉data channel,因為這麼做将會打破channel closing principle。但是我們可以讓receiver關閉一個額外的signal channel來通知sender停止發送值:

1package main

2

3import (

4 "time"

5 "math/rand"

6 "sync"

7 "log"

8)

9

10func main() {

11 rand.Seed(time.Now().UnixNano())

12 log.SetFlags(0)

13

14 // ...

15 const MaxRandomNumber = 100000

16 const NumSenders = 1000

17

18 wgReceivers := sync.WaitGroup{}

19 wgReceivers.Add(1)

20

21 // ...

22 dataCh := make(chan int, 100)

23 stopCh := make(chan struct{})

24 // stopCh is an additional signal channel.

25 // Its sender is the receiver of channel dataCh.

26 // Its reveivers are the senders of channel dataCh.

27

28 // senders

29 for i := 0; i < NumSenders; i++ {

30 go func() {

31 for {

32 value := rand.Intn(MaxRandomNumber)

33

34 select {

35 case <- stopCh:

36 return

37 case dataCh <- value:

38 }

39 }

40 }()

41 }

42

43 // the receiver

44 go func() {

45 defer wgReceivers.Done()

46

47 for value := range dataCh {

48 if value == MaxRandomNumber-1 {

49 // the receiver of the dataCh channel is

50 // also the sender of the stopCh cahnnel.

51 // It is safe to close the stop channel here.

52 close(stopCh)

53 return

54 }

55

56 log.Println(value)

57 }

58 }()

59

60 // ...

61 wgReceivers.Wait()

62}

正如注釋說的,對于額外的signal channel來說,它的sender是data channel的receiver。這個額外的signal channel被它唯一的sender關閉,遵守了channel closing principle。

● M個receiver,N個sender,它們當中任意一個通過通知一個moderator(仲裁者)關閉額外的signal channel來說“讓我們結束遊戲吧”

這是最複雜的場景了。我們不能讓任意的receivers和senders關閉data channel,也不能讓任何一個receivers通過關閉一個額外的signal channel來通知所有的senders和receivers退出遊戲。這麼做的話會打破channel closing principle。但是,我們可以引入一個moderator來關閉一個額外的signal channel。這個例子的一個技巧是怎麼通知moderator去關閉額外的signal channel:

1package main

2

3import (

4 "time"

5 "math/rand"

6 "sync"

7 "log"

8 "strconv"

9)

10

11func main() {

12 rand.Seed(time.Now().UnixNano())

13 log.SetFlags(0)

14

15 // ...

16 const MaxRandomNumber = 100000

17 const NumReceivers = 10

18 const NumSenders = 1000

19

20 wgReceivers := sync.WaitGroup{}

21 wgReceivers.Add(NumReceivers)

22

23 // ...

24 dataCh := make(chan int, 100)

25 stopCh := make(chan struct{})

26 // stopCh is an additional signal channel.

27 // Its sender is the moderator goroutine shown below.

28 // Its reveivers are all senders and receivers of dataCh.

29 toStop := make(chan string, 1)

30 // the channel toStop is used to notify the moderator

31 // to close the additional signal channel (stopCh).

32 // Its senders are any senders and receivers of dataCh.

33 // Its reveiver is the moderator goroutine shown below.

34

35 var stoppedBy string

36

37 // moderator

38 go func() {

39 stoppedBy = <- toStop // part of the trick used to notify the moderator

40 // to close the additional signal channel.

41 close(stopCh)

42 }()

43

44 // senders

45 for i := 0; i < NumSenders; i++ {

46 go func(id string) {

47 for {

48 value := rand.Intn(MaxRandomNumber)

49 if value == 0 {

50 // here, a trick is used to notify the moderator

51 // to close the additional signal channel.

52 select {

53 case toStop <- "sender#" + id:

54 default:

55 }

56 return

57 }

58

59 // the first select here is to try to exit the

60 // goroutine as early as possible.

61 select {

62 case <- stopCh:

63 return

64 default:

65 }

66

67 select {

68 case <- stopCh:

69 return

70 case dataCh <- value:

71 }

72 }

73 }(strconv.Itoa(i))

74 }

75

76 // receivers

77 for i := 0; i < NumReceivers; i++ {

78 go func(id string) {

79 defer wgReceivers.Done()

80

81 for {

82 // same as senders, the first select here is to

83 // try to exit the goroutine as early as possible.

84 select {

85 case <- stopCh:

86 return

87 default:

88 }

89

90 select {

91 case <- stopCh:

92 return

93 case value := <-dataCh:

94 if value == MaxRandomNumber-1 {

95 // the same trick is used to notify the moderator

96 // to close the additional signal channel.

97 select {

98 case toStop <- "receiver#" + id:

99 default:

100 }

101 return

102 }

103

104 log.Println(value)

105 }

106 }

107 }(strconv.Itoa(i))

108 }

109

110 // ...

111 wgReceivers.Wait()

112 log.Println("stopped by", stoppedBy)

113}

在這個例子中,仍然遵守着channel closing principle。

請注意channel

toStop

的緩沖大小是1.這是為了避免當mederator goroutine 準備好之前第一個通知就已經發送了,導緻丢失。

● 更多的場景?

很多的場景變體是基于上面三種的。舉個例子,一個基于最複雜情況的變體可能要求receivers讀取buffer channel中剩下所有的值。這應該很容易處理,所有這篇文章也就不提了。

盡管上面三種場景不能覆寫所有Go channel的使用場景,但它們是最基礎的,實踐中的大多數場景都可以分類到那三種中。

結論

這裡沒有一種場景要求你去打破channel closing principle。如果你遇到了這種場景,請思考一下你的設計并重寫你的代碼。

用Go程式設計就像在創作藝術。

原文釋出時間為:2018-09-9

本文作者:天唯

本文來自雲栖社群合作夥伴“

Golang語言社群

”,了解相關資訊可以關注“

”。