天天看點

sync.Map源碼分析

原文連結

golang 1.6之後,并發地讀寫會直接panic:

fatal error: concurrent map read and map write
package main
func main() {
    m := make(map[int]int)
    go func() {
        for {
            _ = m[1]
        }
    }()
    go func() {
        for {
            m[2] = 2
        }
    }()
    select {}
}
           

是以需要支援對map的并發讀寫時候,部落客使用兩種方法:

1、 第三方類庫 concurrent-map。

2、 map加上sync.RWMutex來保障線程(goroutine)安全的。

golang 1.9之後,go 在sync包下引入了并發安全的map,也為部落客提供了第三種方法。本文重點也在此,為了時效性,本文基于golang 1.10源碼進行分析。

sync.Map

結構體

Map

type Map struct {
    mu Mutex    //互斥鎖,用于鎖定dirty map

    read atomic.Value //優先讀map,支援原子操作,注釋中有readOnly不是說read是隻讀,而是它的結構體。read實際上有寫的操作

    dirty map[interface{}]*entry // dirty是一個目前最新的map,允許讀寫

    misses int // 主要記錄read讀取不到資料加鎖讀取read map以及dirty map的次數,當misses等于dirty的長度時,會将dirty複制到read
}
           

readOnly

readOnly 主要用于存儲,通過原子操作存儲在Map.read中元素。

type readOnly struct {
    m       map[interface{}]*entry
    amended bool // 如果資料在dirty中但沒有在read中,該值為true,作為修改辨別
}
           

entry

type entry struct {
    // nil: 表示為被删除,調用Delete()可以将read map中的元素置為nil
    // expunged: 也是表示被删除,但是該鍵隻在read而沒有在dirty中,這種情況出現在将read複制到dirty中,即複制的過程會先将nil标記為expunged,然後不将其複制到dirty
    //  其他: 表示存着真正的資料
    p unsafe.Pointer // *interface{}
}
           

原理

如果你接觸過大Java,那你一定對CocurrentHashMap利用鎖分段技術增加了鎖的數目,進而使争奪同一把鎖的線程的數目得到控制的原理記憶深刻。

那麼Golang的sync.Map是否也是使用了相同的原理呢?sync.Map的原理很簡單,使用了空間換時間政策,通過備援的兩個資料結構(read、dirty),實作加鎖對性能的影響。 通過引入兩個map将讀寫分離到不同的map,其中read map提供并發讀和已存元素原子寫,而dirty map則負責讀寫。 這樣read map就可以在不加鎖的情況下進行并發讀取,當read map中沒有讀取到值時,再加鎖進行後續讀取,并累加未命中數,當未命中數大于等于dirty map長度,将dirty map上升為read map。從之前的結構體的定義可以發現,雖然引入了兩個map,但是底層資料存儲的是指針,指向的是同一份值。

開始時sync.Map寫入資料

X=1
Y=2
Z=3
           

dirty map主要接受寫請求,read map沒有資料,此時read map與dirty map資料如下圖。

sync.Map源碼分析

讀取資料的時候從read map中讀取,此時read map并沒有資料,miss記錄從read map讀取失敗的次數,當misses>=len(dirty map)時,将dirty map直接更新為read map,這裡直接對dirty map進行位址拷貝并且dirty map被清空,misses置為0。此時read map與dirty map資料如下圖。

sync.Map源碼分析

現在有需求對Z元素進行修改Z=4,sync.Map會直接修改read map的元素。

sync.Map源碼分析

新加元素K=5,新加的元素就需要操作dirty map了,如果misses達到閥值後dirty map直接更新為read map并且dirty map為空map(read的amended==false),則dirty map需要從read map複制資料。

sync.Map源碼分析

更新後的效果如下。

sync.Map源碼分析

如果需要删除Z,需要分幾種情況:

一種read map存在該元素且read的amended==false:直接将read中的元素置為nil。

sync.Map源碼分析

另一種為元素剛剛寫入dirty map且未更新為read map:直接調用golang内置函數delete删除dirty map的元素;

sync.Map源碼分析

還有一種是read map和dirty map同時存在該元素:将read map中的元素置為nil,因為read map和dirty map 使用的均為元素位址,是以均被置為nil。

sync.Map源碼分析

優化點

1、空間換時間。通過備援的兩個資料結構(read、dirty),實作加鎖對性能的影響。

2、使用隻讀資料(read),避免讀寫沖突。

3、動态調整,miss次數多了之後,将dirty資料提升為read。

4、double-checking(雙重檢測)。

5、延遲删除。 删除一個鍵值隻是打标記,隻有在提升dirty的時候才清理删除的資料。

6、優先從read讀取、更新、删除,因為對read的讀取不需要鎖。

方法源碼分析

Load

Load傳回存儲在映射中的鍵值,如果沒有值,則傳回nil。ok結果訓示是否在映射中找到值。

func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
    // 第一次檢測元素是否存在
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        // 為dirty map 加鎖
        m.mu.Lock()
        // 第二次檢測元素是否存在,主要防止在加鎖的過程中,dirty map轉換成read map,進而導緻讀取不到資料
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            // 從dirty map中擷取是為了應對read map中不存在的新元素
            e, ok = m.dirty[key]
            // 不論元素是否存在,均需要記錄miss數,以便dirty map更新為read map
            m.missLocked()
        }
        // 解鎖
        m.mu.Unlock()
    }
    // 元素不存在直接傳回
    if !ok {
        return nil, false
    }
    return e.load()
}           

dirty map更新為read map

func (m *Map) missLocked() {
    // misses自增1
    m.misses++
    // 判斷dirty map是否可以更新為read map
    if m.misses < len(m.dirty) {
        return
    }
    // dirty map更新為read map
    m.read.Store(readOnly{m: m.dirty})
    // dirty map 清空
    m.dirty = nil
    // misses重置為0
    m.misses = 0
}
           

元素取值

func (e *entry) load() (value interface{}, ok bool) {
    p := atomic.LoadPointer(&e.p)
    // 元素不存在或者被删除,則直接傳回
    if p == nil || p == expunged {
        return nil, false
    }
    return *(*interface{})(p), true
}
           

read map主要用于讀取,每次Load都先從read讀取,當read中不存在且amended為true,就從dirty讀取資料 。無論dirty map中是否存在該元素,都會執行missLocked函數,該函數将misses+1,當m.misses >= len(m.dirty)時,便會将dirty複制到read,此時再将dirty置為nil,misses=0。

storage

設定Key=>Value。

func (m *Map) Store(key, value interface{}) {
    // 如果read存在這個鍵,并且這個entry沒有被标記删除,嘗試直接寫入,寫入成功,則結束
    // 第一次檢測
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok && e.tryStore(&value) {
        return
    }
    // dirty map鎖
    m.mu.Lock()
    // 第二次檢測
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        // unexpungelocc確定元素沒有被标記為删除
        // 判斷元素被辨別為删除
        if e.unexpungeLocked() {
            // 這個元素之前被删除了,這意味着有一個非nil的dirty,這個元素不在裡面.
            m.dirty[key] = e
        }
        // 更新read map 元素值
        e.storeLocked(&value)
    } else if e, ok := m.dirty[key]; ok {
        // 此時read map沒有該元素,但是dirty map有該元素,并需修改dirty map元素值為最新值
        e.storeLocked(&value)
    } else {
        // read.amended==false,說明dirty map為空,需要将read map 複制一份到dirty map
        if !read.amended {
            m.dirtyLocked()
            // 設定read.amended==true,說明dirty map有資料
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        // 設定元素進入dirty map,此時dirty map擁有read map和最新設定的元素
        m.dirty[key] = newEntry(value)
    }
    // 解鎖,有人認為鎖的範圍有點大,假設read map資料很大,那麼執行m.dirtyLocked()會耗費花時間較多,完全可以在操作dirty map時才加鎖,這樣的想法是不對的,因為m.dirtyLocked()中有寫入操作
    m.mu.Unlock()
}           

嘗試存儲元素。

func (e *entry) tryStore(i *interface{}) bool {
    // 擷取對應Key的元素,判斷是否辨別為删除
    p := atomic.LoadPointer(&e.p)
    if p == expunged {
        return false
    }
    for {
        // cas嘗試寫入新元素值
        if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
            return true
        }
        // 判斷是否辨別為删除
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
    }
}           

unexpungelocc確定元素沒有被标記為删除。如果這個元素之前被删除了,它必須在未解鎖前被添加到dirty map上。

func (e *entry) unexpungeLocked() (wasExpunged bool) {
    return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
從read map複制到dirty map。

func (m *Map) dirtyLocked() {
    if m.dirty != nil {
        return
    }

    read, _ := m.read.Load().(readOnly)
    m.dirty = make(map[interface{}]*entry, len(read.m))
    for k, e := range read.m {
        // 如果标記為nil或者expunged,則不複制到dirty map
        if !e.tryExpungeLocked() {
            m.dirty[k] = e
        }
    }
}           

LoadOrStore

如果對應的元素存在,則傳回該元素的值,如果不存在,則将元素寫入到sync.Map。如果已加載值,則加載結果為true;如果已存儲,則為false。

func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
    // 不加鎖的情況下讀取read map
    // 第一次檢測
    read, _ := m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        // 如果元素存在(是否辨別為删除由tryLoadOrStore執行處理),嘗試擷取該元素已存在的值或者将元素寫入
        actual, loaded, ok := e.tryLoadOrStore(value)
        if ok {
            return actual, loaded
        }
    }

    m.mu.Lock()
    // 第二次檢測
    // 以下邏輯參看Store
    read, _ = m.read.Load().(readOnly)
    if e, ok := read.m[key]; ok {
        if e.unexpungeLocked() {
            m.dirty[key] = e
        }
        actual, loaded, _ = e.tryLoadOrStore(value)
    } else if e, ok := m.dirty[key]; ok {
        actual, loaded, _ = e.tryLoadOrStore(value)
        m.missLocked()
    } else {
        if !read.amended {
            m.dirtyLocked()
            m.read.Store(readOnly{m: read.m, amended: true})
        }
        m.dirty[key] = newEntry(value)
        actual, loaded = value, false
    }
    m.mu.Unlock()

    return actual, loaded
}           

如果沒有删除元素,tryLoadOrStore将自動加載或存儲一個值。如果删除元素,tryLoadOrStore保持條目不變并傳回ok= false。

func (e *entry) tryLoadOrStore(i interface{}) (actual interface{}, loaded, ok bool) {
    p := atomic.LoadPointer(&e.p)
    // 元素辨別删除,直接傳回
    if p == expunged {
        return nil, false, false
    }
    // 存在該元素真實值,則直接傳回原來的元素值
    if p != nil {
        return *(*interface{})(p), true, true
    }

    // 如果p為nil(此處的nil,并是不是指元素的值為nil,而是atomic.LoadPointer(&e.p)為nil,元素的nil在unsafe.Pointer是有值的),則更新該元素值
    ic := i
    for {
        if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
            return i, false, true
        }
        p = atomic.LoadPointer(&e.p)
        if p == expunged {
            return nil, false, false
        }
        if p != nil {
            return *(*interface{})(p), true, true
        }
    }
}           

Delete

删除元素,采用延遲删除,當read map存在元素時,将元素置為nil,隻有在提升dirty的時候才清理删除的數,延遲删除可以避免後續擷取删除的元素時候需要加鎖。當read map不存在元素時,直接删除dirty map中的元素

func (m *Map) Delete(key interface{}) {
    // 第一次檢測
    read, _ := m.read.Load().(readOnly)
    e, ok := read.m[key]
    if !ok && read.amended {
        m.mu.Lock()
        // 第二次檢測
        read, _ = m.read.Load().(readOnly)
        e, ok = read.m[key]
        if !ok && read.amended {
            // 不論dirty map是否存在該元素,都會執行删除
            delete(m.dirty, key)
        }
        m.mu.Unlock()
    }
    if ok {
        // 如果在read中,則将其标記為删除(nil)
        e.delete()
    }
}           

元素值置為nil

func (e *entry) delete() (hadValue bool) {
    for {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return false
        }
        if atomic.CompareAndSwapPointer(&e.p, p, nil) {
            return true
        }
    }
}
           

Range

周遊擷取sync.Map中所有的元素,使用的為快照方式,是以不一定是準确的。

func (m *Map) Range(f func(key, value interface{}) bool) {
    // 第一檢測
    read, _ := m.read.Load().(readOnly)
    // read.amended=true,說明dirty map包含所有有效的元素(含新加,不含被删除的),使用dirty map
    if read.amended {
        // 第二檢測
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        if read.amended {
            // 使用dirty map并且更新為read map
            read = readOnly{m: m.dirty}
            m.read.Store(read)
            m.dirty = nil
            m.misses = 0
        }
        m.mu.Unlock()
    }
    // 一貫原則,使用read map作為讀
    for k, e := range read.m {
        v, ok := e.load()
        // 被删除的不計入
        if !ok {
            continue
        }
        // 函數傳回false,終止
        if !f(k, v) {
            break
        }
    }
}           

總結

經過了上面的分析可以得到,sync.Map并不适合同時存在大量讀寫的場景,大量的寫會導緻read map讀取不到資料進而加鎖進行進一步讀取,同時dirty map不斷更新為read map。 進而導緻整體性能較低,特别是針對cache場景.針對append-only以及大量讀,少量寫場景使用sync.Map則相對比較合适。

sync.Map沒有提供擷取元素個數的Len()方法,不過可以通過Range()實作。

func Len(sm sync.Map) int {
    lengh := 0
    f := func(key, value interface{}) bool {
        lengh++
        return true
    }
    one:=lengh
    lengh=0
    sm.Range(f)
    if one != lengh {
        one = lengh
        lengh=0
        sm.Range(f)
        if one <lengh {
            return lengh
        }
        
    }
    return one
}