最近看了一篇文章,用go處理每分鐘達百萬條的資料請求
原文位址:
http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/ 翻譯位址: https://www.jianshu.com/p/21de03ac682c這裡作者為處理高峰期高并發的資料請求,用了3個版本的處理方式,下面是自己的一些了解:
第一種方式很簡單,就是用go的協程處理請求,來一條請求開一個協程處理,由于每個請求是一個資料上傳
任務,有一定的耗時和資源消耗,當高峰期請求突然增多達到每分鐘百萬條的時候,不可避免的造成了攜程爆炸,系統崩潰。
第二種方式用channel做了一個任務隊列,來一條請求加到任務隊列裡,簡單的生産者消費者模式,但這種方式治标不治本,高峰期的時候,任務隊列長度一樣
會爆炸,造成記憶體不夠用,是以也不太合适。
第三種方式就有點意思了,這裡貼一下完整的代碼,幫助了解。
package main
import (
"fmt"
"time"
"runtime"
)
var (
//最大任務隊列數
MaxWorker = 10
//有效載荷
type Payload struct {
Num int
Test string
}
//待執行的工作
type Job struct {
Payload Payload
//任務隊列channal
var JobQueue chan Job
//執行任務的工作者單元
type Worker struct {
WorkerPool chan chan Job //工作者池--每個元素是一個工作者的私有任務channal
JobChannel chan Job //每個工作者單元包含一個任務管道 用于擷取任務
quit chan bool //退出信号
no int //編号
// 停止信号
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
//排程中心
type Dispatcher struct {
//工作者池(任務隊列池)
WorkerPool chan chan Job
//工作者單元數量
MaxWorkers int
//建立排程中心
func NewDispatcher(maxWorkers int) *Dispatcher {
//建立工作者池,存放待處理的任務隊列,maxWorkers為最大任務隊列數
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
//建立一個新工作者單元
func NewWorker(workerPool chan chan Job, no int) Worker {
fmt.Println("建立一個新工作者單元")
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool),
no: no,
//循環 監聽任務和結束信号
func (w Worker) Start() {
//啟動協程,監聽任務
for {
// register the current worker into the worker queue.
//工作者放回工作者池
w.WorkerPool <- w.JobChannel
//fmt.Println("w.WorkerPool <- w.JobChannel", w)
select {
case job := <-w.JobChannel:
//fmt.Println("job := <-w.JobChannel")
// 收到任務,執行列印任務
fmt.Println(job.Payload.Test)
//執行任務需要1秒時間
time.Sleep(500 * time.Microsecond)
case <-w.quit:
// 收到退出信号,停止監聽,結束該協程
return
}
}
//排程,任務分發
func (d *Dispatcher) dispatch() {
for {
select {
//從任務隊列中擷取任務
case job := <-JobQueue:
//fmt.Println("job := <-JobQueue:")
go func(job Job) {
//等待空閑worker (任務多的時候會阻塞這裡)
//從(10個)工作者池中擷取一個任務隊列channel,
jobChannel := <-d.WorkerPool
//fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
// 将任務放到上述woker的私有任務channal中,jobChannel是一個無緩沖信道,每次隻能放一個任務
jobChannel <- job
//fmt.Println("jobChannel <- job")
}(job)
}
//工作者池的初始化,注意Run為Dispatcher結構體指針的方法,是以此方法内對Dispathcer的修改在方法外也可見
func (d *Dispatcher) Run() {
// starting n number of workers
//建立10個工作者單元
for i := 1; i < d.MaxWorkers+1; i++ {
worker := NewWorker(d.WorkerPool, i)
worker.Start()
go d.dispatch()
//建立任務并放入任務隊列
func addQueue() {
for i := 0; i < 1000000; i++ {
time.Sleep(10*time.Microsecond)
fmt.Println("目前請求數:",i)
// 建立一個任務
payLoad := Payload{Num: 1, Test:"this is Test string"}
work := Job{Payload: payLoad}
// 任務放入任務隊列channal
fmt.Println("新任務入隊列!")
JobQueue <- work
//fmt.Println("隊列總長度:",cap(JobQueue))
//fmt.Println("隊列未消費任務數量:",len(JobQueue))
//fmt.Println("JobQueue <- work")
fmt.Println("目前協程數:", runtime.NumGoroutine())
//time.Sleep(1 * time.Second)
func main() {
//建立任務隊列,每個任務隊列中可以放10個任務
JobQueue = make(chan Job, 10)
fmt.Println("成功建立任務隊列!")
//建立任務分發器
dispatcher := NewDispatcher(MaxWorker)
fmt.Println("成功建立任務分發器!")
dispatcher.Run()
//time.Sleep(1 * time.Second)
go addQueue()
time.Sleep(1000 * time.Second)
這種方式簡單來說就是用一個兩級channel作為一個任務隊列池,隊列池中存放多個任務隊列,來一個任務後加入其中的一個任務隊列,每個任務隊列開一個協程處理
資料上傳任務,基本過程如下圖所示:
這種方式很好的解決了每個任務處理都要開協稱協程造成攜程太多的問題,因為總共隻有10個協程在處理work,并且工作池中隻有10個任務隊列,美個任務隊列隻存放一個任務,是以也不存在隊列長度爆炸的問題,但是這種方式也有問題,就是當工作池中沒有任務隊列空閑的時候,新來的任務隻能開一個協程等待空閑的任務隊列,如果通路高峰期過長或者任務處理很耗時,會造成等待任務隊列的協程急劇增長,最後也會造成系統記憶體崩潰。這種方式的好處在于等待任務隊列的協程都是輕量級協程,每個協程占用的記憶體資源很少,是以如果隻是處理高峰期每分鐘百萬條的資料請求,是可以的,相當于高峰期把每個任務緩存在輕量級協程裡了,過了高峰期,輕量級協程就會減少,不會造成協程爆炸導緻記憶體崩潰。但是如果每分鐘百萬條資料請求是常态,這種方式肯定也會造成記憶體崩潰。
解決的方式有兩種:
1.資料庫級别的緩存作為任務隊列,硬碟比記憶體更大更便宜,但是這種方式的延遲會很大
2.就是加機器了做負載均衡了,沒有什麼事是加一台機器解決不了的,如果有,那就加兩台。