天天看点

Golang 异步对象序列化Map并发冲突和解决方法冲突过程冲突解决一个冲突实例的源码冲突错误日志前半部分

冲突过程

异步对象序列化,对象里面如果有map,并且使用json.marshal to string,或者其他方式对map 执行了read、write操作。

而业务逻辑同步写对象map值,那么,就会触发: fatal error: concurrent map read and map write

本质原因在于:两个goroute对同一个map执行了读写并发,而golang map默认是不支持并发操作、没有加锁

冲突解决

(1)在主业务逻辑里面,提前把对象转为string,丢给异步任务去执行(不局限写log,可以是其他对map的操作)

(2)map 的读写加锁,而读写加锁对性能的影响,以及死锁等又带来新的麻烦。

关于优化并发map的可以参考这个分析,比较全面:

https://misfra.me/optimizing-concurrent-map-access-in-go/

(3)主、异步任务交互参数改为string,或者对象的深clone,也就是完整的copy对象值,重造对象,而不是直接引用指针。

一个冲突实例的源码

读写map的行,都加以注释说明了。

补充说明:

(1)json.marshal 一个对象,如果对象里面有map,那么会触发对map的read操作

(2)seelog 在配置里面配置的是异步log,也就是写log的时候,加入异步队列,而代码里面也是传入对象的,从而这个对象是异步序列化,也就是异步map read操作

package main

import (
    "encoding/json"
    "fmt"
    seelog "github.com/cihub/seelog"
    "sync"
    "time"
)

//https://github.com/cihub/seelog/wiki/Logger-types
/* file seelog-main.xml
<seelog>  
    <outputs formatid="main">  
        <buffered size="10" flushperiod="1000">  
            <rollingfile type="date" filename="gologs/main.log" datepattern="2006.01.02" maxrolls="30"/>  
        </buffered>  
    </outputs>  
    <formats>  
        <format id="main" format="%Msg%n"/>  
    </formats>  
</seelog>  
*/

func main() {
    var logger, _ = seelog.LoggerFromConfigAsFile("conf/seelog-main.xml")
    seelog.ReplaceLogger(logger)
    defer seelog.Flush()
    seelog.Info("需要输入的日志")

    for i := 0; i < 40; i++ {
        //i := 0
        go workerSimlutor(i)
    }
    fmt.Printf("40 worker simlutor running....")
    time.Sleep(time.Duration(3) * 60 * time.Second)
    fmt.Printf("40 worker simlutor finish....")
}

func workerSimlutor(i int) {
    bmap := make(map[string]string)
    name := fmt.Sprintf("text_%d", i)

    ei := EventInfo{
        Name:           name,
        BusinessKey:    name,
        BusinessParams: bmap,
        lockBizParams:  &sync.RWMutex{},
    }

    for id := 0; id < 100; id++ {
        //strOut := ei.String() // 这里提前主动string,那么异步log 里面不会有map的read操作。
        seelog.Infof("i:%d,ei:%v", i, ei)// 这里是对象传入log,log里面 异步,异步marshal 对象,触发对对象里面map的read操作
    }

    for id := 0; id < 100; id++ {
        ei.SetVariable("num", fmt.Sprintf("%d", i)) // 这里更新map,触发write操作,
    }
}

type EventInfo struct {
    // 注册的 handler 的名称
    Name           string            `json:"name"`
    BusinessKey    string            `json:"businessKey"`
    BusinessParams map[string]string `json:"businessParams"`
    lockBizParams  *sync.RWMutex     `json:"-"`
}

func (ei *EventInfo) String() string {
    //ei.lockBizParams.Lock()
    //defer ei.lockBizParams.Unlock()  // 这里加锁能解决问题,不过复杂场景下,会导致死锁
    if b, err := json.Marshal(ei); err != nil {
        return fmt.Sprintf("%s", ei)
    } else {
        return fmt.Sprintf("%s", b)
    }
}

func (ei *EventInfo) SetVariable(key, val string) {
    ei.BusinessParams[key] = val
}

func (ei *EventInfo) GetVariable(key string) string {
    val, ok := ei.BusinessParams[key]
    if !ok {
        return ""
    }
    return val
}           

冲突错误日志前半部分

40 worker simlutor running....fatal error: concurrent map read and map write

goroutine 8 [running]:

runtime.throw(0x64f48e, 0x21)

D:/AmiddleStability/Go1.7/go/src/runtime/panic.go:566 +0x9c fp=0xc0420276f0 sp=0xc0420276d0           

runtime.mapaccess2(0x6079e0, 0xc042054f90, 0xc04213a540, 0xc0421608a0, 0xc04213a540)

D:/AmiddleStability/Go1.7/go/src/runtime/hashmap.go:340 +0x250 fp=0xc042027738 sp=0xc0420276f0           

reflect.mapaccess(0x6079e0, 0xc042054f90, 0xc04213a540, 0xc042054f90)

D:/AmiddleStability/Go1.7/go/src/runtime/hashmap.go:1008 +0x46 fp=0xc042027770 sp=0xc042027738           

reflect.Value.MapIndex(0x6079e0, 0xc042139f40, 0x95, 0x5f7080, 0xc04213a540, 0x98, 0x8, 0x200, 0xc042027870)

D:/AmiddleStability/Go1.7/go/src/reflect/value.go:1040 +0x12f fp=0xc0420277f8 sp=0xc042027770           

fmt.(*pp).printValue(0xc04213c000, 0x6079e0, 0xc042139f40, 0x95, 0x76, 0x1)

D:/AmiddleStability/Go1.7/go/src/fmt/print.go:738 +0x1216 fp=0xc0420279e8 sp=0xc0420277f8           

fmt.(*pp).printValue(0xc04213c000, 0x626640, 0xc042139f20, 0x99, 0x76, 0x0)

D:/AmiddleStability/Go1.7/go/src/fmt/print.go:764 +0x23a2 fp=0xc042027bd8 sp=0xc0420279e8           

fmt.(*pp).printArg(0xc04213c000, 0x626640, 0xc042139f20, 0x76)

D:/AmiddleStability/Go1.7/go/src/fmt/print.go:668 +0x1fc fp=0xc042027cd0 sp=0xc042027bd8           

fmt.(*pp).doPrintf(0xc04213c000, 0x6487a5, 0xa, 0xc04213e620, 0x2, 0x2)

D:/AmiddleStability/Go1.7/go/src/fmt/print.go:985 +0x1244 fp=0xc042027db8 sp=0xc042027cd0           

fmt.Sprintf(0x6487a5, 0xa, 0xc04213e620, 0x2, 0x2, 0xc042027e58, 0x0)

D:/AmiddleStability/Go1.7/go/src/fmt/print.go:196 +0x71 fp=0xc042027e10 sp=0xc042027db8           

github.com/cihub/seelog.(*logFormattedMessage).String(0xc042139f50, 0x664c02, 0x73b100)

D:/AmiddleStability/Go1.8/gopath/src/github.com/cihub/seelog/logger.go:369 +0x59 fp=0xc042027e58 sp=0xc042027e10           

github.com/cihub/seelog.(*commonLogger).processLogMsg(0xc0420963f0, 0x61ef02, 0x736d00, 0xc042139f50, 0x73b100, 0xc042162a10)

D:/AmiddleStability/Go1.8/gopath/src/github.com/cihub/seelog/logger.go:312 +0xa3 fp=0xc042027ea0 sp=0xc042027e58           

github.com/cihub/seelog.(*asyncLogger).processQueueElement(0xc0420963f0)

D:/AmiddleStability/Go1.8/gopath/src/github.com/cihub/seelog/behavior_asynclogger.go:115 +0x11f fp=0xc042027f40 sp=0xc042027ea0           

github.com/cihub/seelog.(*asyncLoopLogger).processItem(0xc0420963f0, 0x0)

D:/AmiddleStability/Go1.8/gopath/src/github.com/cihub/seelog/behavior_asynclooplogger.go:57 +0xf6 fp=0xc042027f68 sp=0xc042027f40           

github.com/cihub/seelog.(*asyncLoopLogger).processQueue(0xc0420963f0)

D:/AmiddleStability/Go1.8/gopath/src/github.com/cihub/seelog/behavior_asynclooplogger.go:63 +0x4b fp=0xc042027f88 sp=0xc042027f68           

runtime.goexit()

D:/AmiddleStability/Go1.7/go/src/runtime/asm_amd64.s:2086 +0x1 fp=0xc042027f90 sp=0xc042027f88           

created by github.com/cihub/seelog.NewAsyncLoopLogger

D:/AmiddleStability/Go1.8/gopath/src/github.com/cihub/seelog/behavior_asynclooplogger.go:40 +0xa6
           

goroutine 1 [sleep]:

time.Sleep(0x29e8d60800)

D:/AmiddleStability/Go1.7/go/src/runtime/time.go:59 +0xef           

main.main()

D:/AmiddleStability/godemo/test/rolllogseelog_concurrent_map.go:24 +0x18f
           

goroutine 5 [semacquire]:

sync.runtime_notifyListWait(0xc04200abd0, 0x0)

D:/AmiddleStability/Go1.7/go/src/runtime/sema.go:267 +0x130           

sync.(*Cond).Wait(0xc04200abc0)

D:/AmiddleStability/Go1.7/go/src/sync/cond.go:57 +0x87           

github.com/cihub/seelog.(*asyncLoopLogger).processItem(0xc0420961b0, 0x0)

D:/AmiddleStability/Go1.8/gopath/src/github.com/cihub/seelog/behavior_asynclooplogger.go:50 +0xba           

github.com/cihub/seelog.(*asyncLoopLogger).processQueue(0xc0420961b0)

D:/AmiddleStability/Go1.8/gopath/src/github.com/cihub/seelog/behavior_asynclooplogger.go:63 +0x4b           
D:/AmiddleStability/Go1.8/gopath/src/github.com/cihub/seelog/behavior_asynclooplogger.go:40 +0xa6
           

goroutine 6 [semacquire]:

sync.runtime_notifyListWait(0xc04200add0, 0x0)

D:/AmiddleStability/Go1.7/go/src/runtime/sema.go:267 +0x130           

sync.(*Cond).Wait(0xc04200adc0)

D:/AmiddleStability/Go1.7/go/src/sync/cond.go:57 +0x87           

github.com/cihub/seelog.(*asyncLoopLogger).processItem(0xc0420962d0, 0x0)

D:/AmiddleStability/Go1.8/gopath/src/github.com/cihub/seelog/behavior_asynclooplogger.go:50 +0xba           

github.com/cihub/seelog.(*asyncLoopLogger).processQueue(0xc0420962d0)

D:/AmiddleStability/Go1.8/gopath/src/github.com/cihub/seelog/behavior_asynclooplogger.go:63 +0x4b           
D:/AmiddleStability/Go1.8/gopath/src/github.com/cihub/seelog/behavior_asynclooplogger.go:40 +0xa6