天天看点

使用Goroutine和channel实现并发的多个例子

  • 在现实场景中，因为业务需求，可能需要实现函数级别（function level）的并发，并且需要控制发送者或接收者的数量。所以我稍稍总结了发送者和接收者数量不同时的几个具体示例，而不是简单地使用随机数之类的方式来控制并发的停止。

一、一个发送者和多个接收者的例子

import (
	"fmt"
	"strings"
	"sync"
)

// Example topology: a single sender feeding M receivers.

// NumReceivers1 is the number of receiver goroutines started by main.
const NumReceivers1 = 20

var (
	// wg is the WaitGroup on which every receiver calls Done when it returns.
	wg sync.WaitGroup

	// strMessages holds the payloads that are pushed onto the channel.
	strMessages = []string{"aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh", "iii", "jjj", "kkk"}
)

// sender writes every element of msg to ch, in order, and then closes ch so
// that receivers ranging over the channel terminate.
//
// The channel is closed even when msg is empty. Closing only after sending
// the last element would leave ch open for an empty slice and block every
// receiver forever.
func sender(msg []string, ch chan string) {
	// The sender is the only writer, so it owns the close.
	defer close(ch)

	for _, v := range msg {
		ch <- v
	}
}

// receiver consumes ch until it is closed, printing each message in upper
// case, and signals wg when it returns.
func receiver(ch chan string) {
	defer wg.Done()

	for {
		value, open := <-ch
		if !open {
			return
		}
		upper := strings.ToUpper(value) // process messages
		fmt.Println(upper)
	}
}

func main() {
	//rand.Seed(time.Now().Unix())

	ch := make(chan string, 20)

	go sender(strMessages, ch)

	wg.Add(NumReceivers1)
	for i := 0; i < NumReceivers1; i++ {
		go receiver(ch)
	}
	wg.Wait()
}
           

二、多个发送者和一个接收者的例子

import (
	"context"
	"fmt"
	"sync"
)

//const maxGoroutineNum = 10

var (
	// wgSender counts the sender goroutines that are still running.
	wgSender sync.WaitGroup

	// wgReceiver counts the receiver that is still running.
	wgReceiver sync.WaitGroup
)

type (
	// Person is the message payload passed from senders to the receiver.
	Person struct {
		Name string
		Job  string
	}

	// ProcessMessage is a function that takes a context and a *Person and
	// returns a *Person.
	ProcessMessage func(ctx context.Context, originMsg *Person) (resMsg *Person)
)

// main runs the many-senders / one-receiver example: each Person is processed
// by its own sender goroutine, a single receiver collects the results, and the
// resulting Job fields are printed.
//
// The senders run concurrently, so the order of the collected results is not
// fixed.
func main() {
	ctx := context.Background()
	firstPerson := &Person{Name: "John", Job: "worker"}
	secondPerson := &Person{Name: "Tom", Job: "engineer"}
	originalMsg := []*Person{firstPerson, secondPerson}
	ch := make(chan *Person, len(originalMsg))

	StartSend(ctx, originalMsg, ch, process)

	// StartReceive calls wgReceiver.Done itself, so the counter has to be
	// raised before it runs.
	wgReceiver.Add(1)

	// The result is handed back over a channel rather than by assigning to a
	// variable shared with this goroutine. StartReceive's deferred Done fires
	// before its return value could be stored, so wgReceiver.Wait alone does
	// not guarantee the result is visible here.
	resCh := make(chan []*Person, 1)
	go func() {
		resCh <- StartReceive(ch)
	}()

	// Close the channel only after every sender has finished writing to it.
	wgSender.Wait()
	close(ch)

	resList := <-resCh
	wgReceiver.Wait()
	fmt.Println(resList[0].Job, resList[1].Job)
}

// StartSend launches one Sender goroutine per element of originalMsgs and
// registers each of them on wgSender. It returns without waiting for the
// goroutines to finish.
func StartSend(ctx context.Context, originalMsgs []*Person, ch chan *Person, process ProcessMessage) {
	// Register every sender up front, before any of them can call Done.
	wgSender.Add(len(originalMsgs))

	for i := range originalMsgs {
		go Sender(ctx, originalMsgs[i], ch, process)
	}
}

// StartReceive collects everything Receiver reads from ch and returns it.
// It calls wgReceiver.Done when it returns, so the caller must have called
// wgReceiver.Add(1) beforehand.
//
// Note that Done runs as part of returning, i.e. before the caller has had a
// chance to store the returned slice anywhere.
func StartReceive(ch chan *Person) (resList []*Person) {
	defer wgReceiver.Done()

	return Receiver(ch)
}

// Sender applies process to originalMsg, sends the result on ch, and calls
// wgSender.Done when it returns.
func Sender(ctx context.Context, originalMsg *Person, ch chan *Person, process ProcessMessage) {
	defer wgSender.Done()

	result := process(ctx, originalMsg)
	ch <- result
}

// Receiver reads from ch until it is closed and returns the received values
// in arrival order. The result is nil when the channel is closed without
// having delivered anything.
//
// It blocks while the channel is empty instead of polling it through a
// select with an empty default branch, which would keep a CPU core spinning
// for as long as the senders are still working.
func Receiver(ch chan *Person) (eleList []*Person) {
	for ele := range ch {
		eleList = append(eleList, ele)
	}
	return eleList
}

// process prefixes originMsg.Job with "good " and returns the same pointer it
// was given; the input is modified in place. ctx is not used.
func process(ctx context.Context, originMsg *Person) (resMsg *Person) {
	resMsg = originMsg
	resMsg.Job = "good " + resMsg.Job
	return resMsg
}
           

三、多个发送者和多个接收者的例子

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"
)

// ProcessMsg is a function that takes a context and an arbitrary message
// value and returns an arbitrary result value.
type ProcessMsg func(ctx context.Context, originMsg interface{}) (resMsg interface{})

// wg3 is the WaitGroup on which every receiver calls Done when it returns.
var wg3 sync.WaitGroup

// senders3 writes every element of msg to ch in order.
//
// When i is -1 it additionally sleeps for one millisecond and then closes
// stopCh. For any other value of i, stopCh is left untouched.
func senders3(i int, msg []string, stopCh chan struct{}, ch chan string) {
	for idx := 0; idx < len(msg); idx++ {
		ch <- msg[idx]
	}

	if i != -1 {
		return
	}

	// Only the sender tagged -1 triggers the stop signal, after a short pause.
	time.Sleep(time.Millisecond)
	close(stopCh)
}

// receivers3 reads messages from ch, passes each one through process and
// prints the result. It calls wg3.Done when it returns.
//
// It returns when either
//   - stopCh is closed, after first handling every message that is still
//     buffered in ch at that moment, or
//   - ch itself is closed.
//
// The select has no default branch, so the goroutine blocks while idle
// instead of spinning. Draining on stop matters because select chooses at
// random among ready cases: without it, a closed stopCh could win while
// messages are still queued and those messages would be dropped.
func receivers3(ctx context.Context, stopCh chan struct{}, ch chan string, process ProcessMsg) {
	defer wg3.Done()

	handle := func(value string) {
		resMsg := process(ctx, value)
		fmt.Println(resMsg)
	}

	for {
		select {
		case <-stopCh:
			// Stop requested: handle what is already queued, without
			// blocking for anything new.
			for {
				select {
				case value, ok := <-ch:
					if !ok {
						return
					}
					handle(value)
				default:
					return
				}
			}

		case value, ok := <-ch:
			if !ok {
				// A closed channel would otherwise yield "" forever.
				return
			}
			handle(value)
		}
	}
}

// processMsg returns originMsg converted to upper case. originMsg must hold
// a string; any other dynamic type makes the type assertion panic. ctx is not
// used.
func processMsg(ctx context.Context, originMsg interface{}) (resMsg interface{}) {
	return strings.ToUpper(originMsg.(string))
}

// main runs the many-senders / many-receivers example: one sender goroutine
// per message batch, the same number of receiver goroutines, and a stop
// channel that tells the receivers when to quit.
//
// Shutdown is driven from here rather than by a timed sleep inside one of the
// senders: main waits until every sender has returned and the buffer is
// empty, and only then closes stopCh. That way no message can be sent after,
// or left behind by, the stop signal.
func main() {
	ctx := context.Background()

	ch := make(chan string, 100)
	stopCh := make(chan struct{})

	msg := [][]string{{"abc", "def", "ghi"}, {"123", "234", "345"}, {"a1a", "b2b", "c3c"}}

	length := len(msg)

	// Senders are tracked with their own WaitGroup. Their ids are always
	// non-negative, so none of them takes the "-1 closes stopCh" path in
	// senders3; main is the sole owner of that close.
	var sendWG sync.WaitGroup
	sendWG.Add(length)
	for i := 0; i < length; i++ {
		go func(id int, batch []string) {
			defer sendWG.Done()
			senders3(id, batch, stopCh, ch)
		}(i, msg[i])
	}

	wg3.Add(length)
	for i := 0; i < length; i++ {
		go receivers3(ctx, stopCh, ch, processMsg)
	}

	sendWG.Wait()

	// All messages are now in ch or already taken by a receiver. Wait for
	// the buffer to empty so that closing stopCh cannot strand queued
	// messages, regardless of how a receiver prioritises the stop signal.
	for len(ch) > 0 {
		time.Sleep(time.Millisecond)
	}
	close(stopCh)

	wg3.Wait()
}