天天看点

如何优雅地关闭Go channelThe Channel Closing Principle打破channel closing principle的解决方案保持channel closing principle的优雅方案结论

几天前,我写了一篇文章来说明golang中channel的使用规范。在reddit和HN,那篇文章收到了很多赞同,但是我也收到了下面几个关于Go channel设计和规范的批评:

● 在不能更改channel状态的情况下,没有简单普遍的方式来检查channel是否已经关闭了

● 关闭已经关闭的channel会导致panic,所以在closer(关闭者)不知道channel是否已经关闭的情况下去关闭channel是很危险的

● 发送值到已经关闭的channel会导致panic,所以如果sender(发送者)在不知道channel是否已经关闭的情况下去向channel发送值是很危险的

那些批评看起来都很有道理(实际上并没有)。是的,没有一个内置函数可以检查一个channel是否已经关闭。如果你能确定不会向channel发送任何值,那么也确实需要一个简单的方法来检查channel是否已经关闭:

1package main

2

3import "fmt"

4

5type T int

6

7func IsClosed(ch <-chan T) bool {

8 select {

9 case <-ch:

10 return true

11 default:

12 }

13

14 return false

15}

16

17func main() {

18 c := make(chan T)

19 fmt.Println(IsClosed(c)) // false

20 close(c)

21 fmt.Println(IsClosed(c)) // true

22}

上面已经提到了,没有一种适用的方式来检查channel是否已经关闭了。但是,就算有一个简单的

closed(chan T) bool

函数来检查channel是否已经关闭,它的用处还是很有限的,就像内置的

len

函数用来检查缓冲channel中元素数量一样。原因就在于,已经检查过的channel的状态有可能在调用了类似的方法返回之后就修改了,因此返回来的值已经不能够反映刚才检查的channel的当前状态了。

尽管在调用

closed(ch)

返回

true

的情况下停止向channel发送值是可以的,但是如果调用

closed(ch)

false

,那么关闭channel或者继续向channel发送值就不安全了(会panic)。

The Channel Closing Principle

在使用Go channel的时候,一个适用的原则是不要从接收端关闭channel,也不要关闭有多个并发发送者的channel。换句话说,如果sender(发送者)只是唯一的sender或者是channel最后一个活跃的sender,那么你应该在sender的goroutine关闭channel,从而通知receiver(s)(接收者们)已经没有值可以读了。维持这条原则将保证永远不会发生向一个已经关闭的channel发送值或者关闭一个已经关闭的channel。

(下面,我们将会称上面的原则为channel closing principle

打破channel closing principle的解决方案

如果你因为某种原因从接收端(receiver side)关闭channel或者在多个发送者中的一个关闭channel,那么你应该使用列在Golang panic/recover Use Cases的函数来安全地发送值到channel中(假设channel的元素类型是T)

1func SafeSend(ch chan T, value T) (closed bool) {

2 defer func() {

3 if recover() != nil {

4 // the return result can be altered

5 // in a defer function call

6 closed = true

7 }

8 }()

9

10 ch <- value // panic if ch is closed

11 return false // <=> closed = false; return

12}

如果channel

ch

没有被关闭的话,那么这个函数的性能将和

ch <- value

接近。对于channel关闭的时候,

SafeSend

函数只会在每个sender goroutine中调用一次,因此程序不会有太大的性能损失。

同样的想法也可以用在从多个goroutine关闭channel中:

1func SafeClose(ch chan T) (justClosed bool) {

2 defer func() {

3 if recover() != nil {

4 justClosed = false

5 }

6 }()

7

8 // assume ch != nil here.

9 close(ch) // panic if ch is closed

10 return true

11}

很多人喜欢用

sync.Once

来关闭channel:

1type MyChannel struct {

2 C chan T

3 once sync.Once

4}

5

6func NewMyChannel() *MyChannel {

7 return &MyChannel{C: make(chan T)}

8}

9

10func (mc *MyChannel) SafeClose() {

11 mc.once.Do(func(){

12 close(mc.C)

13 })

14}

当然了,我们也可以用

sync.Mutex

来避免多次关闭channel:

1type MyChannel struct {

2 C chan T

3 closed bool

4 mutex sync.Mutex

5}

6

7func NewMyChannel() *MyChannel {

8 return &MyChannel{C: make(chan T)}

9}

10

11func (mc *MyChannel) SafeClose() {

12 mc.mutex.Lock()

13 if !mc.closed {

14 close(mc.C)

15 mc.closed = true

16 }

17 mc.mutex.Unlock()

18}

19

20func (mc *MyChannel) IsClosed() bool {

21 mc.mutex.Lock()

22 defer mc.mutex.Unlock()

23 return mc.closed

24}

我们应该要理解为什么Go不支持内置

SafeSend

SafeClose

函数,原因就在于并不推荐从接收端或者多个并发发送端关闭channel。Golang甚至禁止关闭只接收(receive-only)的channel。

保持channel closing principle的优雅方案

上面的

SaveSend

函数有一个缺点是,在select语句的

case

关键字后不能作为发送操作被调用(译者注:类似于

case SafeSend(ch, t):

)。另外一个缺点是,很多人,包括我自己都觉得上面通过使用

panic

/

recover

sync

包的方案不够优雅。针对各种场景,下面介绍不用使用

panic

recover

sync

包,纯粹是利用channel的解决方案。

(在下面的例子总,

sync.WaitGroup

只是用来让例子完整的。它的使用在实践中不一定一直都有用)

● M个receivers,一个sender,sender通过关闭data channel说“不再发送”

这是最简单的场景了,就只是当sender不想再发送的时候让sender关闭data 来关闭channel:

1package main

2

3import (

4 "time"

5 "math/rand"

6 "sync"

7 "log"

8)

9

10func main() {

11 rand.Seed(time.Now().UnixNano())

12 log.SetFlags(0)

13

14 // ...

15 const MaxRandomNumber = 100000

16 const NumReceivers = 100

17

18 wgReceivers := sync.WaitGroup{}

19 wgReceivers.Add(NumReceivers)

20

21 // ...

22 dataCh := make(chan int, 100)

23

24 // the sender

25 go func() {

26 for {

27 if value := rand.Intn(MaxRandomNumber); value == 0 {

28 // the only sender can close the channel safely.

29 close(dataCh)

30 return

31 } else {

32 dataCh <- value

33 }

34 }

35 }()

36

37 // receivers

38 for i := 0; i < NumReceivers; i++ {

39 go func() {

40 defer wgReceivers.Done()

41

42 // receive values until dataCh is closed and

43 // the value buffer queue of dataCh is empty.

44 for value := range dataCh {

45 log.Println(value)

46 }

47 }()

48 }

49

50 wgReceivers.Wait()

51}

● 一个receiver,N个sender,receiver通过关闭一个额外的signal channel说“请停止发送”

这种场景比上一个要复杂一点。我们不能让receiver关闭data channel,因为这么做将会打破channel closing principle。但是我们可以让receiver关闭一个额外的signal channel来通知sender停止发送值:

1package main

2

3import (

4 "time"

5 "math/rand"

6 "sync"

7 "log"

8)

9

10func main() {

11 rand.Seed(time.Now().UnixNano())

12 log.SetFlags(0)

13

14 // ...

15 const MaxRandomNumber = 100000

16 const NumSenders = 1000

17

18 wgReceivers := sync.WaitGroup{}

19 wgReceivers.Add(1)

20

21 // ...

22 dataCh := make(chan int, 100)

23 stopCh := make(chan struct{})

24 // stopCh is an additional signal channel.

25 // Its sender is the receiver of channel dataCh.

26 // Its reveivers are the senders of channel dataCh.

27

28 // senders

29 for i := 0; i < NumSenders; i++ {

30 go func() {

31 for {

32 value := rand.Intn(MaxRandomNumber)

33

34 select {

35 case <- stopCh:

36 return

37 case dataCh <- value:

38 }

39 }

40 }()

41 }

42

43 // the receiver

44 go func() {

45 defer wgReceivers.Done()

46

47 for value := range dataCh {

48 if value == MaxRandomNumber-1 {

49 // the receiver of the dataCh channel is

50 // also the sender of the stopCh cahnnel.

51 // It is safe to close the stop channel here.

52 close(stopCh)

53 return

54 }

55

56 log.Println(value)

57 }

58 }()

59

60 // ...

61 wgReceivers.Wait()

62}

正如注释说的,对于额外的signal channel来说,它的sender是data channel的receiver。这个额外的signal channel被它唯一的sender关闭,遵守了channel closing principle。

● M个receiver,N个sender,它们当中任意一个通过通知一个moderator(仲裁者)关闭额外的signal channel来说“让我们结束游戏吧”

这是最复杂的场景了。我们不能让任意的receivers和senders关闭data channel,也不能让任何一个receivers通过关闭一个额外的signal channel来通知所有的senders和receivers退出游戏。这么做的话会打破channel closing principle。但是,我们可以引入一个moderator来关闭一个额外的signal channel。这个例子的一个技巧是怎么通知moderator去关闭额外的signal channel:

1package main

2

3import (

4 "time"

5 "math/rand"

6 "sync"

7 "log"

8 "strconv"

9)

10

11func main() {

12 rand.Seed(time.Now().UnixNano())

13 log.SetFlags(0)

14

15 // ...

16 const MaxRandomNumber = 100000

17 const NumReceivers = 10

18 const NumSenders = 1000

19

20 wgReceivers := sync.WaitGroup{}

21 wgReceivers.Add(NumReceivers)

22

23 // ...

24 dataCh := make(chan int, 100)

25 stopCh := make(chan struct{})

26 // stopCh is an additional signal channel.

27 // Its sender is the moderator goroutine shown below.

28 // Its reveivers are all senders and receivers of dataCh.

29 toStop := make(chan string, 1)

30 // the channel toStop is used to notify the moderator

31 // to close the additional signal channel (stopCh).

32 // Its senders are any senders and receivers of dataCh.

33 // Its reveiver is the moderator goroutine shown below.

34

35 var stoppedBy string

36

37 // moderator

38 go func() {

39 stoppedBy = <- toStop // part of the trick used to notify the moderator

40 // to close the additional signal channel.

41 close(stopCh)

42 }()

43

44 // senders

45 for i := 0; i < NumSenders; i++ {

46 go func(id string) {

47 for {

48 value := rand.Intn(MaxRandomNumber)

49 if value == 0 {

50 // here, a trick is used to notify the moderator

51 // to close the additional signal channel.

52 select {

53 case toStop <- "sender#" + id:

54 default:

55 }

56 return

57 }

58

59 // the first select here is to try to exit the

60 // goroutine as early as possible.

61 select {

62 case <- stopCh:

63 return

64 default:

65 }

66

67 select {

68 case <- stopCh:

69 return

70 case dataCh <- value:

71 }

72 }

73 }(strconv.Itoa(i))

74 }

75

76 // receivers

77 for i := 0; i < NumReceivers; i++ {

78 go func(id string) {

79 defer wgReceivers.Done()

80

81 for {

82 // same as senders, the first select here is to

83 // try to exit the goroutine as early as possible.

84 select {

85 case <- stopCh:

86 return

87 default:

88 }

89

90 select {

91 case <- stopCh:

92 return

93 case value := <-dataCh:

94 if value == MaxRandomNumber-1 {

95 // the same trick is used to notify the moderator

96 // to close the additional signal channel.

97 select {

98 case toStop <- "receiver#" + id:

99 default:

100 }

101 return

102 }

103

104 log.Println(value)

105 }

106 }

107 }(strconv.Itoa(i))

108 }

109

110 // ...

111 wgReceivers.Wait()

112 log.Println("stopped by", stoppedBy)

113}

在这个例子中,仍然遵守着channel closing principle。

请注意channel

toStop

的缓冲大小是1.这是为了避免当mederator goroutine 准备好之前第一个通知就已经发送了,导致丢失。

● 更多的场景?

很多的场景变体是基于上面三种的。举个例子,一个基于最复杂情况的变体可能要求receivers读取buffer channel中剩下所有的值。这应该很容易处理,所有这篇文章也就不提了。

尽管上面三种场景不能覆盖所有Go channel的使用场景,但它们是最基础的,实践中的大多数场景都可以分类到那三种中。

结论

这里没有一种场景要求你去打破channel closing principle。如果你遇到了这种场景,请思考一下你的设计并重写你的代码。

用Go编程就像在创作艺术。

原文发布时间为:2018-09-9

本文作者:天唯

本文来自云栖社区合作伙伴“

Golang语言社区

”,了解相关信息可以关注“

”。