The map is a data structure used constantly in Go projects: it is simple to use and well suited when the amount of data is not large. How is it implemented? Let's start from a test program. We want to analyze map creation, insertion, lookup, and deletion, so the test program must cover all of these operations:
//Test.go
package main

import (
	"fmt"
)

func main() {
	testmap()
}

func testmap() {
	fmt.Printf("testmap start\n")
	var test1 map[int64]int
	test1 = make(map[int64]int, 10)
	test1[3] = 3
	v, ok := test1[3]
	fmt.Printf("test1 kv: %v %v\n", v, ok)
	for k, v := range test1 {
		fmt.Printf("test1 kv: %v %v\n", k, v)
	}
	test2 := make(map[int64]int, 500)
	test2[400] = 400
	v2, ok2 := test2[400]
	fmt.Printf("test2 400: %v %v\n", v2, ok2)
	delete(test2, 400)
	fmt.Printf("testmap end\n")
}
Main data structures
const (
// Maximum number of key/value pairs a bucket can hold.
bucketCntBits = 3
bucketCnt = 1 << bucketCntBits
// Maximum average load of a bucket that triggers growth is 6.5.
// Represent as loadFactorNum/loadFactDen, to allow integer math.
loadFactorNum = 13
loadFactorDen = 2
// Maximum key or value size to keep inline (instead of mallocing per element).
// Must fit in a uint8.
// Fast versions cannot handle big values - the cutoff size for
// fast versions in ../../cmd/internal/gc/walk.go must be at most this value.
maxKeySize = 128
maxValueSize = 128
// data offset should be the size of the bmap struct, but needs to be
// aligned correctly. For amd64p32 this means 64-bit alignment
// even though pointers are 32 bit.
dataOffset = unsafe.Offsetof(struct {
b bmap
v int64
}{}.v)
// Possible tophash values. We reserve a few possibilities for special marks.
// Each bucket (including its overflow buckets, if any) will have either all or none of its
// entries in the evacuated* states (except during the evacuate() method, which only happens
// during map writes and thus no one else can observe the map during that time).
empty = 0 // cell is empty
evacuatedEmpty = 1 // cell is empty, bucket is evacuated.
evacuatedX = 2 // key/value is valid. Entry has been evacuated to first half of larger table.
evacuatedY = 3 // same as above, but evacuated to second half of larger table.
minTopHash = 4 // minimum tophash for a normal filled cell.
// flags
iterator = 1 // there may be an iterator using buckets
oldIterator = 2 // there may be an iterator using oldbuckets
hashWriting = 4 // a goroutine is writing to the map
sameSizeGrow = 8 // the current map growth is to a new map of the same size
// sentinel bucket ID for iterator checks
noCheck = 1<<(8*sys.PtrSize) - 1
)
// A header for a Go map.
type hmap struct {
// Note: the format of the Hmap is encoded in ../../cmd/internal/gc/reflect.go and
// ../reflect/type.go. Don't change this structure without also changing that code!
count int // # live cells == size of map. Must be first (used by len() builtin)
flags uint8
B uint8 // the number of hash buckets is 2^B
noverflow uint16 // approximate number of overflow buckets; see incrnoverflow for details
hash0 uint32 // hash seed
buckets unsafe.Pointer // array of 2^B Buckets. may be nil if count==0.
oldbuckets unsafe.Pointer // previous bucket array of half the size, non-nil only when growing
nevacuate uintptr // progress counter for evacuation (buckets less than this have been evacuated)
extra *mapextra // optional fields
}
// mapextra holds fields that are not present on all maps.
type mapextra struct {
overflow *[]*bmap
oldoverflow *[]*bmap
// nextOverflow holds a pointer to a free overflow bucket.
nextOverflow *bmap
}
// A bucket for a Go map.
type bmap struct {
tophash [bucketCnt]uint8
}
The meaning of each field in these structures is explained in detail in many articles, so here we only reinforce the key points. Creating a map means creating an hmap; the key/value pairs are stored in hmap.buckets, and bmap is the concrete structure of a bucket. Looking at the definitions above, a bmap contains nothing but a tophash[8] array, so where do the keys and values live? This works like the hidden-pointer trick in C: when the buckets are created, a buffer of 144 bytes (the exact value depends on the machine and the key/value types) is allocated and cast to a bmap pointer. The bucket size is defined as:
size := bucketSize*(1+ktyp.size+etyp.size) + overflowPad + ptrSize
As you can see, the keys and values are accounted for inside the bucket itself.
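As a sanity check, the 144 bytes mentioned above can be derived from the layout. This is a minimal sketch, assuming a 64-bit platform and map[int64]int (bucketSizeInt64Int is an illustrative helper, not runtime code):

```go
package main

import "fmt"

// Sketch of the bucket layout for map[int64]int on a 64-bit platform:
// 8 tophash bytes, then the 8 keys, then the 8 values, with an overflow
// pointer appended at the end of the buffer.
func bucketSizeInt64Int() int {
	const (
		bucketCnt = 8 // key/value pairs per bucket
		keySize   = 8 // int64
		elemSize  = 8 // int
		ptrSize   = 8 // the trailing overflow pointer
	)
	return bucketCnt + bucketCnt*keySize + bucketCnt*elemSize + ptrSize
}

func main() {
	fmt.Println(bucketSizeInt64Int()) // 8 + 64 + 64 + 8 = 144 bytes
}
```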
A lookup first hashes the key, then uses B to determine which bucket to inspect. Within that bucket, the high 8 bits of the hash are compared against each tophash; on a match the full keys are compared, and if they are equal the key has been found. Otherwise the search follows the overflow pointer to the next bucket and keeps comparing tophash values.
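The lookup steps above can be sketched like this (bucketIndex and tophash here are illustrative re-implementations, not the runtime's; the minTopHash adjustment mirrors the constants listed earlier):

```go
package main

import "fmt"

// The low B bits of the hash choose the bucket; the high 8 bits become the
// tophash used for the quick in-bucket comparison before full keys are
// compared.
func bucketIndex(hash uint64, B uint8) uint64 {
	return hash & (1<<B - 1)
}

func tophash(hash uint64) uint8 {
	top := uint8(hash >> 56)
	if top < 4 { // minTopHash: values 0..3 are reserved for marker states
		top += 4
	}
	return top
}

func main() {
	hash := uint64(0xA1B2C3D4E5F60789)
	fmt.Println(bucketIndex(hash, 3)) // low 3 bits of ...89: 1
	fmt.Println(tophash(hash))        // high byte 0xA1 = 161
}
```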
The overall structure diagram makes the relationships between these fields clear:
![](https://img.laitimes.com/img/__Qf2AjLwojIjJCLyojI0JCLiAzNfRHLGZkRGZkRfJ3bs92YsYTMfVmepNHL9UleOd3a61UeZR1Y14kMMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZuBnLwkTNxMDNwATMxETNwAjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.png)
Locating a key by high bits and low bits:
Back to the test code. Compiling it to assembly makes it easy to see which runtime functions these map operations call, so we generate the assembly with: go tool compile -N -l -S test.go. Part of the output:
$ go tool compile -N -l -S test.go
"".main STEXT size=48 args=0x0 locals=0x8
0x0000 00000 (test.go:10) TEXT "".main(SB), $8-0
0x0000 00000 (test.go:10) MOVQ (TLS), CX
0x0009 00009 (test.go:10) CMPQ SP, 16(CX)
0x000d 00013 (test.go:10) JLS 41
0x000f 00015 (test.go:10) SUBQ $8, SP
0x0013 00019 (test.go:10) MOVQ BP, (SP)
0x0017 00023 (test.go:10) LEAQ (SP), BP
0x001b 00027 (test.go:10) FUNCDATA $0, gclocals•33cdeccccebe80329f1fdbee7f5874cb(SB)
0x001b 00027 (test.go:10) FUNCDATA $1, gclocals•33cdeccccebe80329f1fdbee7f5874cb(SB)
0x001b 00027 (test.go:11) PCDATA $0, $0
0x001b 00027 (test.go:11) CALL "".testmap(SB)
0x0020 00032 (test.go:12) MOVQ (SP), BP
0x0024 00036 (test.go:12) ADDQ $8, SP
0x0028 00040 (test.go:12) RET
0x0029 00041 (test.go:12) NOP
0x0029 00041 (test.go:10) PCDATA $0, $-1
0x0029 00041 (test.go:10) CALL runtime.morestack_noctxt(SB)
0x002e 00046 (test.go:10) JMP 0
0x0000 64 48 8b 0c 25 00 00 00 00 48 3b 61 10 76 1a 48 dH..%....H;a.v.H
0x0010 83 ec 08 48 89 2c 24 48 8d 2c 24 e8 00 00 00 00 ...H.,$H.,$.....
0x0020 48 8b 2c 24 48 83 c4 08 c3 e8 00 00 00 00 eb d0 H.,$H...........
rel 5+4 t=16 TLS+0
rel 28+4 t=8 "".testmap+0
rel 42+4 t=8 runtime.morestack_noctxt+0
Map initialization
First, the initialization call corresponding to the line make(map[int64]int, 10):
0x0089 00137 (test.go:17) LEAQ type.map[int64]int(SB), AX
0x0090 00144 (test.go:17) MOVQ AX, (SP)
0x0094 00148 (test.go:17) MOVQ $10, 8(SP)
0x009d 00157 (test.go:17) LEAQ ""..autotmp_23+520(SP), AX
0x00a5 00165 (test.go:17) MOVQ AX, 16(SP)
0x00aa 00170 (test.go:17) PCDATA $0, $1
0x00aa 00170 (test.go:17) CALL runtime.makemap(SB)
The assembly shows that it calls runtime.makemap:
func makemap(t *maptype, hint int, h *hmap) *hmap {
// The size of hmap should be 48 bytes on 64 bit
// and 28 bytes on 32 bit platforms.
if debug.gctrace > 0 {
print("makemap: hint=", hint, "\n")
}
if sz := unsafe.Sizeof(hmap{}); sz != 8+5*sys.PtrSize {
println("runtime: sizeof(hmap) =", sz, ", t.hmap.size =", t.hmap.size)
throw("bad hmap size")
}
if hint < 0 || hint > int(maxSliceCap(t.bucket.size)) {
hint = 0
}
// initialize Hmap
if h == nil {
h = (*hmap)(newobject(t.hmap))
}
// generate the hash seed
h.hash0 = fastrand()
if debug.gctrace > 0 {
print("makemap: hash0=", h.hash0, "\n")
}
// find size parameter which will hold the requested # of elements
B := uint8(0)
// pick a B suitable for the element count the caller asked for
for overLoadFactor(hint, B) {
B++
}
h.B = B
if debug.gctrace > 0 {
print("makemap: B=", B, "\n")
}
if h.B != 0 {
var nextOverflow *bmap
// if an initial size was given, allocate the buckets now
h.buckets, nextOverflow = makeBucketArray(t, h.B)
if nextOverflow != nil {
h.extra = new(mapextra)
h.extra.nextOverflow = nextOverflow
}
}
return h
}
Note how B is computed: overLoadFactor(hint, B) is just one line:
return hint > bucketCnt && uintptr(hint) > loadFactorNum*(bucketShift(B)/loadFactorDen)
That is, B must satisfy hint <= (2^B) * 6.5, and the smallest such B is chosen.
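A small sketch of that selection, under the constants above (pickB is a hypothetical name; the runtime loops over overLoadFactor inside makemap exactly like this):

```go
package main

import "fmt"

// pickB is an illustrative stand-in for the loop in makemap: the smallest B
// such that hint fits under the 6.5 load factor (hints of 8 or fewer fit in
// a single bucket, so B stays 0).
func pickB(hint int) uint8 {
	B := uint8(0)
	for hint > 8 && uint64(hint) > 13*(uint64(1)<<B)/2 {
		B++
	}
	return B
}

func main() {
	fmt.Println(pickB(10))  // 10 > 6.5*2^0, but 10 <= 6.5*2^1 -> B = 1
	fmt.Println(pickB(500)) // 500 > 6.5*64 = 416, 500 <= 832  -> B = 7
}
```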
The hash array is allocated in the makeBucketArray function.
func makeBucketArray(t *maptype, b uint8) (buckets unsafe.Pointer, nextOverflow *bmap) {
// base is the expected number of buckets, i.e. the real size of the hash array, 2^B
base := bucketShift(b)
nbuckets := base
// For small b, overflow buckets are unlikely.
// Avoid the overhead of the calculation.
if b >= 4 {
// Add on the estimated number of overflow buckets
// required to insert the median number of elements
// used with this value of b.
nbuckets += bucketShift(b - 4)
sz := t.bucket.size * nbuckets
up := roundupsize(sz)
if up != sz {
nbuckets = up / t.bucket.size
}
}
buckets = newarray(t.bucket, int(nbuckets))
if base != nbuckets {
nextOverflow = (*bmap)(add(buckets, base*uintptr(t.bucketsize)))
last := (*bmap)(add(buckets, (nbuckets-1)*uintptr(t.bucketsize)))
last.setoverflow(t, (*bmap)(buckets))
}
return buckets, nextOverflow
}
If B >= 4, some extra buckets are allocated up front and the surplus is recorded in nextOverflow, so that when an overflow bucket is later needed it can be taken straight from nextOverflow.
As shown in the figure below:
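The preallocation can be sketched as follows (preallocated is an illustrative helper; the real code additionally rounds the total up to a malloc size class via roundupsize):

```go
package main

import "fmt"

// For B >= 4, makeBucketArray asks for 2^(B-4) spare overflow buckets in
// addition to the 2^B base buckets, so early overflows need no allocation.
func preallocated(B uint8) (base, nbuckets uint64) {
	base = 1 << B
	nbuckets = base
	if B >= 4 {
		nbuckets += 1 << (B - 4)
	}
	return
}

func main() {
	fmt.Println(preallocated(3)) // 8 8: small tables get no spare buckets
	fmt.Println(preallocated(5)) // 32 34: two spare overflow buckets
}
```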
Map insertion and update
0x00bc 00188 (test.go:18) LEAQ type.map[int64]int(SB), AX
0x00c3 00195 (test.go:18) MOVQ AX, (SP)
0x00c7 00199 (test.go:18) MOVQ "".test1+168(SP), AX
0x00cf 00207 (test.go:18) MOVQ AX, 8(SP)
0x00d4 00212 (test.go:18) MOVQ $3, 16(SP)
0x00dd 00221 (test.go:18) PCDATA $0, $2
0x00dd 00221 (test.go:18) CALL runtime.mapassign(SB)
0x00e2 00226 (test.go:18) MOVQ 24(SP), AX
0x00e7 00231 (test.go:18) MOVQ AX, ""..autotmp_24+224(SP)
0x00ef 00239 (test.go:18) TESTB AL, (AX)
0x00f1 00241 (test.go:18) MOVQ $3, (AX)
Insertion calls the mapassign function. It may also be mapassign_fast64: these are a family of functions, and it depends on which one the compiler selects.
func mapassign(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
if h == nil {
panic(plainError("assignment to entry in nil map"))
}
if raceenabled {
callerpc := getcallerpc()
pc := funcPC(mapassign)
racewritepc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled {
msanread(key, t.key.size)
}
// Concurrent-write detection: a detected concurrent write panics here.
// Note: this detection is best-effort and happens at runtime; it does not
// catch every race. When a map must be written concurrently, use sync.Map
// or guard the plain map with a lock instead.
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
alg := t.key.alg
// compute the hash of the key
hash := alg.hash(key, uintptr(h.hash0))
// set the hashWriting flag; it is cleared only after the key is written into the buckets
h.flags |= hashWriting
// the map itself must not be nil, but the hash array may start out nil and is initialized here
if h.buckets == nil {
h.buckets = newobject(t.bucket) // newarray(t.bucket, 1)
}
again:
// the low-order bits of the hash select the index into the hash array
bucket := hash & bucketMask(h.B)
if h.growing() {
// incremental grow/shrink work; covered separately below
growWork(t, h, bucket)
}
// locate the concrete bucket from the index
b := (*bmap)(unsafe.Pointer(uintptr(h.buckets) + bucket*uintptr(t.bucketsize)))
top := tophash(hash) // the high 8 bits of the hash locate the entry within the bucket
var inserti *uint8 // insertion position for the tophash
var insertk unsafe.Pointer // insertion position for the key
var val unsafe.Pointer // insertion position for the value
for {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
if b.tophash[i] == empty && inserti == nil {
// found an empty cell: record the candidate tophash/key/value positions,
// but keep scanning the chain, since the same key may appear further on
inserti = &b.tophash[i]
insertk = add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
}
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
if t.indirectkey {
k = *((*unsafe.Pointer)(k))
}
if !alg.equal(key, k) {
continue
}
// reaching here means a matching key exists in the map; update the key/value
if t.needkeyupdate {
typedmemmove(t.key, k, key)
}
val = add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
goto done
}
ovf := b.overflow(t)
if ovf == nil {
break
}
b = ovf
}
// decide whether to grow here; covered separately below
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}
// inserti == nil means step 1 found no free cell and the whole chain is full; hang a new overflow bucket on it
if inserti == nil {
// allocate a new overflow bucket, preferring the preallocated ones; once those run out, allocate fresh memory
newb := h.newoverflow(t, b)
inserti = &newb.tophash[0]
insertk = add(unsafe.Pointer(newb), dataOffset)
val = add(insertk, bucketCnt*uintptr(t.keysize))
}
// when the key or value type exceeds a size threshold (128 bytes), the bucket stores only a pointer; allocate the object and take its pointer here
if t.indirectkey {
kmem := newobject(t.key)
*(*unsafe.Pointer)(insertk) = kmem
insertk = kmem
}
if t.indirectvalue {
vmem := newobject(t.elem)
*(*unsafe.Pointer)(val) = vmem
}
typedmemmove(t.key, insertk, key) // copy the key into its slot in the bucket
*inserti = top // store the tophash, the high 8 bits of the hash
h.count++ // a new key/value pair was inserted
done:
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
h.flags &^= hashWriting // clear the hashWriting flag
if t.indirectvalue {
val = *((*unsafe.Pointer)(val))
}
// return a pointer to where the value can be stored; note: the value has not been written yet
return val
}
The function finally returns the address where the value is to be stored; jumping back to the assembly, we can see the value being written to that address.
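Since mapassign throws on the hashWriting flag, concurrent writers crash the program ("concurrent map writes") rather than silently corrupt the table. A minimal sketch of the safe alternative mentioned in the comments, sync.Map (parallelSquares is a made-up helper):

```go
package main

import (
	"fmt"
	"sync"
)

// parallelSquares stores i -> i*i for 0..n-1 from n goroutines at once.
// sync.Map tolerates this; a plain map here would trip the runtime's
// concurrent-write check.
func parallelSquares(n int) map[int]int {
	var sm sync.Map
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sm.Store(i, i*i) // safe for concurrent use, unlike a plain map
		}(i)
	}
	wg.Wait()
	// copy into a plain map for single-goroutine use
	out := make(map[int]int, n)
	sm.Range(func(k, v interface{}) bool {
		out[k.(int)] = v.(int)
		return true
	})
	return out
}

func main() {
	fmt.Println(parallelSquares(4)[3]) // 9
}
```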
Map growth and shrinking
Growth was mentioned above. The trigger conditions for growing/shrinking:
(1) the map is not currently in the growing state
(2) grow: the element count exceeds (2^B) * 6.5, where 2^B is the size of the hash array, not counting overflow buckets
(3) shrink (same-size grow): the number of overflow buckets noverflow reaches the hash array size, capped at 32768 (1<<15).
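A sketch of the two triggers using the constants from this source (overLoad and tooManyOverflow are illustrative stand-ins for overLoadFactor and tooManyOverflowBuckets):

```go
package main

import "fmt"

// Load-factor growth doubles the table; too many overflow buckets triggers
// a same-size grow that rewrites the table compactly. bucketCnt = 8, load
// factor = 13/2 = 6.5.
func overLoad(count int, B uint8) bool {
	return count > 8 && uint64(count) > 13*(uint64(1)<<B)/2
}

func tooManyOverflow(noverflow uint16, B uint8) bool {
	if B > 15 {
		B = 15 // the threshold is capped at 2^15 = 32768
	}
	return noverflow >= uint16(1)<<B
}

func main() {
	fmt.Println(overLoad(417, 6))       // 417 > 6.5*64 = 416: double the table
	fmt.Println(overLoad(416, 6))       // at exactly 416, no growth yet
	fmt.Println(tooManyOverflow(64, 6)) // 64 >= 2^6: same-size grow
}
```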
Growing and shrinking share one function, hashGrow():
func hashGrow(t *maptype, h *hmap) {
// If we've hit the load factor, get bigger.
// Otherwise, there are too many overflow buckets,
// so keep the same number of buckets and "grow" laterally.
bigger := uint8(1)
if !overLoadFactor(h.count+1, h.B) {
bigger = 0
h.flags |= sameSizeGrow
}
oldbuckets := h.buckets
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger)
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// commit the grow (atomic wrt gc)
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0
if h.extra != nil && h.extra.overflow != nil {
// Promote current overflow buckets to the old generation.
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}
// the actual copying of the hash table data is done incrementally
// by growWork() and evacuate().
}
At the start of hashGrow(), the grow condition is checked; if it holds, this is a grow, otherwise the shrink condition must have triggered. The rest of the logic is shared; the only difference is the capacity change through hmap.B: a grow does B+1, doubling the hash table, while a shrink keeps the table size unchanged.
- h.oldbuckets: points to the old hash array, i.e. the current h.buckets
- h.buckets: points to the newly allocated hash array
At this point the work of triggering is done; what remains is migrating the elements into the new table. A full migration in one go would clearly occupy the map for quite a while (maps do not support concurrent access), so migration is performed asynchronously and incrementally.
Both the insert and delete paths contain the snippet below, so each insert or delete operation performs one round of migration work.
Each insert or delete calls growWork to migrate 0-2 hash buckets (possibly zero, if the two candidate buckets were already migrated earlier). Migration works on whole hash buckets, including each bucket's overflow chain. Deleted cells (marked empty) are discarded (this is the key to shrinking), and migrated buckets are marked evacuatedX/evacuatedY.
func growWork(t *maptype, h *hmap, bucket uintptr) {
// make sure we evacuate the oldbucket corresponding
// to the bucket we're about to use
evacuate(t, h, bucket&h.oldbucketmask())
// evacuate one more oldbucket to make progress on growing
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}
The function that actually migrates buckets is evacuate:
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
newbit := h.noldbuckets() // the old bucket count; when doubling, each old bucket spreads into two new buckets, and newbit is the index offset of the second one
if !evacuated(b) {
var xy [2]evacDst
x := &xy[0]
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.v = add(x.k, bucketCnt*uintptr(t.keysize))
if !h.sameSizeGrow() {
// Only calculate y pointers if we're growing bigger.
// Otherwise GC can see bad pointers.
y := &xy[1]
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.v = add(y.k, bucketCnt*uintptr(t.keysize))
}
for ; b != nil; b = b.overflow(t) {
k := add(unsafe.Pointer(b), dataOffset)
v := add(k, bucketCnt*uintptr(t.keysize))
for i := 0; i < bucketCnt; i, k, v = i+1, add(k, uintptr(t.keysize)), add(v, uintptr(t.valuesize)) {
top := b.tophash[i]
if top == empty {
b.tophash[i] = evacuatedEmpty
continue
}
if top < minTopHash {
throw("bad map state")
}
k2 := k
if t.indirectkey {
k2 = *((*unsafe.Pointer)(k2))
}
var useY uint8
if !h.sameSizeGrow() {
hash := t.key.alg.hash(k2, uintptr(h.hash0))
if h.flags&iterator != 0 && !t.reflexivekey && !t.key.alg.equal(k2, k2) {
useY = top & 1
top = tophash(hash)
} else {
if hash&newbit != 0 {
useY = 1
}
}
}
if evacuatedX+1 != evacuatedY {
throw("bad evacuatedN")
}
b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY
dst := &xy[useY] // evacuation destination
if dst.i == bucketCnt {
dst.b = h.newoverflow(t, dst.b)
dst.i = 0
dst.k = add(unsafe.Pointer(dst.b), dataOffset)
dst.v = add(dst.k, bucketCnt*uintptr(t.keysize))
}
dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
if t.indirectkey {
*(*unsafe.Pointer)(dst.k) = k2 // copy pointer
} else {
typedmemmove(t.key, dst.k, k) // copy value
}
if t.indirectvalue {
*(*unsafe.Pointer)(dst.v) = *(*unsafe.Pointer)(v)
} else {
typedmemmove(t.elem, dst.v, v)
}
dst.i++
// These updates might push these pointers past the end of the
// key or value arrays. That's ok, as we have the overflow pointer
// at the end of the bucket to protect against pointing past the
// end of the bucket.
dst.k = add(dst.k, uintptr(t.keysize))
dst.v = add(dst.v, uintptr(t.valuesize))
}
}
// Unlink the overflow buckets & clear key/value to help GC.
if h.flags&oldIterator == 0 && t.bucket.kind&kindNoPointers == 0 {
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
// Preserve b.tophash because the evacuation
// state is maintained there.
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
memclrHasPointers(ptr, n)
}
}
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
}
The evacuate function serves two purposes:
1. When growing, spread an old bucket across two new buckets (one at the same index as the old bucket, the other offset from it by the old bucket count). As shown in the figure below:
2. When shrinking, repack the keys/values of the overflow chain into compact storage.
Shrinking only addresses the case of too many overflow buckets; the hash array keeps its size when shrinking is triggered, so the space occupied by the hash array only ever grows. In other words, even if every element of a map that has grown very large is deleted one by one, the memory held by the hash table is not released.
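One practical consequence: to actually reclaim the memory of a map that grew large and was then emptied, copy the survivors into a fresh map (shrink is a hypothetical helper, not a runtime facility):

```go
package main

import "fmt"

// Because the bucket array never shrinks, the idiomatic way to reclaim the
// memory of a map that grew large and was then mostly emptied is to copy
// the surviving entries into a fresh map and drop the old one.
func shrink(m map[int64]int) map[int64]int {
	n := make(map[int64]int, len(m))
	for k, v := range m {
		n[k] = v
	}
	return n
}

func main() {
	m := make(map[int64]int)
	for i := int64(0); i < 100000; i++ {
		m[i] = int(i)
	}
	for i := int64(0); i < 99999; i++ {
		delete(m, i) // the bucket array stays at its grown size
	}
	m = shrink(m) // the old, oversized buckets become garbage for the GC
	fmt.Println(len(m)) // 1
}
```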
Map deletion
0x06f6 01782 (test.go:28) LEAQ type.map[int64]int(SB), AX
0x06fd 01789 (test.go:28) MOVQ AX, (SP)
0x0701 01793 (test.go:28) MOVQ "".test2+160(SP), AX
0x0709 01801 (test.go:28) MOVQ AX, 8(SP)
0x070e 01806 (test.go:28) MOVQ $400, 16(SP)
0x0717 01815 (test.go:28) PCDATA $0, $8
0x0717 01815 (test.go:28) CALL runtime.mapdelete(SB)
Deletion is similar to insertion: the early steps check parameters and state and locate the key/value, and then the corresponding memory is cleared. Rather than walking through it all, here are the key points:
(1) deletion also sets the hashWriting flag
(2) when the key/value is large, the hash table stores a pointer, and deletion becomes a soft delete: the pointer is set to nil and the data is left to the GC. This is internal to the map; callers never notice, since they only ever receive copies
(3) whether key/value are value types or pointer types, deletion only affects the hash table; data the caller has already retrieved is unaffected. In particular, a pointer previously obtained from the map remains usable
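Point (3) is easy to demonstrate: a value (here, a pointer) read out of the map before the delete keeps working afterwards:

```go
package main

import "fmt"

// A value read from a map is a copy; deleting the entry afterwards clears
// only the hash table's slot, not the data already handed out.
func main() {
	m := map[string]*int{}
	x := 42
	m["a"] = &x
	p := m["a"]    // a copy of the stored pointer
	delete(m, "a") // inside the table, the slot's pointer is set to nil
	fmt.Println(*p)     // 42: our copy of the pointer is still valid
	fmt.Println(len(m)) // 0
}
```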
func mapdelete(t *maptype, h *hmap, key unsafe.Pointer) {
if raceenabled && h != nil {
callerpc := getcallerpc()
pc := funcPC(mapdelete)
racewritepc(unsafe.Pointer(h), callerpc, pc)
raceReadObjectPC(t.key, key, callerpc, pc)
}
if msanenabled && h != nil {
msanread(key, t.key.size)
}
if h == nil || h.count == 0 {
return
}
if h.flags&hashWriting != 0 {
throw("concurrent map writes")
}
alg := t.key.alg
hash := alg.hash(key, uintptr(h.hash0))
// Set hashWriting after calling alg.hash, since alg.hash may panic,
// in which case we have not actually done a write (delete).
h.flags |= hashWriting
bucket := hash & bucketMask(h.B)
if h.growing() {
growWork(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
top := tophash(hash)
search:
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
if b.tophash[i] != top {
continue
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
k2 := k
if t.indirectkey {
k2 = *((*unsafe.Pointer)(k2))
}
if !alg.equal(key, k2) {
continue
}
// Only clear key if there are pointers in it.
if t.indirectkey {
*(*unsafe.Pointer)(k) = nil
} else if t.key.kind&kindNoPointers == 0 {
memclrHasPointers(k, t.key.size)
}
// Only clear value if there are pointers in it.
if t.indirectvalue || t.elem.kind&kindNoPointers == 0 {
v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
if t.indirectvalue {
*(*unsafe.Pointer)(v) = nil
} else {
memclrHasPointers(v, t.elem.size)
}
}
b.tophash[i] = empty
h.count--
break search
}
}
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
h.flags &^= hashWriting
}
With the lookup and insertion background above, deletion is simple: it sets the tophash corresponding to the key to empty; the memory is not released.
Map iteration
First, a few flags:
iterator = 1 // there may be an iterator using buckets
oldIterator = 2 // there may be an iterator using oldbuckets
hashWriting = 4 // a goroutine is writing to the map
sameSizeGrow = 8 // same-size grow (the overflow-bucket trigger)
In principle, iterating over a map is simple: walk all the buckets and the overflow buckets chained behind them, then walk all the keys in each bucket. Each bucket holds 8 keys; extract each key and value, and the iteration is done.
Reality is not that simple. Remember the growth process described earlier? It is not an atomic operation: it moves at most 2 buckets at a time, so once a grow is triggered, the map stays in an intermediate state for a long while, with some buckets already moved to their new home and others still in the old place.
So an iteration that happens during a grow has to traverse both old and new buckets, and that is where the difficulty lies.
First, the iterator usage in the assembly output:
0x0297 00663 (test.go:21) MOVQ "".test1+168(SP), AX
0x029f 00671 (test.go:21) MOVQ AX, ""..autotmp_13+232(SP)
0x02a7 00679 (test.go:21) LEAQ ""..autotmp_14+568(SP), DI
0x02af 00687 (test.go:21) XORPS X0, X0
0x02b2 00690 (test.go:21) LEAQ -32(DI), DI
0x02b6 00694 (test.go:21) DUFFZERO $273
0x02c9 00713 (test.go:21) LEAQ type.map[int64]int(SB), AX
0x02d0 00720 (test.go:21) MOVQ AX, (SP)
0x02d4 00724 (test.go:21) MOVQ ""..autotmp_13+232(SP), AX
0x02dc 00732 (test.go:21) MOVQ AX, 8(SP)
0x02e1 00737 (test.go:21) LEAQ ""..autotmp_14+568(SP), AX
0x02e9 00745 (test.go:21) MOVQ AX, 16(SP)
0x02ee 00750 (test.go:21) PCDATA $0, $5
0x02ee 00750 (test.go:21) CALL runtime.mapiterinit(SB)
0x02f3 00755 (test.go:21) JMP 757
0x02f5 00757 (test.go:21) MOVQ ""..autotmp_14+568(SP), AX
0x02fd 00765 (test.go:21) TESTQ AX, AX
0x0300 00768 (test.go:21) JNE 775
0x0302 00770 (test.go:21) JMP 1226
0x0307 00775 (test.go:21) MOVQ ""..autotmp_14+576(SP), AX
0x030f 00783 (test.go:21) TESTB AL, (AX)
0x0311 00785 (test.go:21) MOVQ (AX), AX
0x0314 00788 (test.go:21) MOVQ AX, ""..autotmp_29+112(SP)
0x0319 00793 (test.go:21) MOVQ ""..autotmp_14+568(SP), AX
0x0321 00801 (test.go:21) TESTB AL, (AX)
0x0323 00803 (test.go:21) MOVQ (AX), AX
0x0326 00806 (test.go:21) MOVQ AX, "".k+96(SP)
0x032b 00811 (test.go:21) MOVQ ""..autotmp_29+112(SP), AX
0x0330 00816 (test.go:21) MOVQ AX, "".v+88(SP)
Flow of iterator usage; its state lives in the hiter structure:
type hiter struct {
// pointer to the current key
key unsafe.Pointer
// pointer to the current value
value unsafe.Pointer
// map type, including key size and so on
t *maptype
// map header
h *hmap
// the bucket array pointed to at init time
buckets unsafe.Pointer
// the bmap currently being iterated
bptr *bmap
overflow [2]*[]*bmap
oldoverflow *[]*bmap
// the bucket number where iteration started
startBucket uintptr
// the key number within a bucket where iteration starts (each bucket has 8 cells)
offset uint8
// whether iteration has wrapped around to the start
wrapped bool
// snapshot of B
B uint8
// index of the current key
i uint8
// the current bucket
bucket uintptr
// bucket that must be rechecked because of growing
checkBucket uintptr
}
mapiterinit just initializes the fields of the hiter struct. Because of the random starting position, even iterating over a hard-coded map yields a different order each time.
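That randomness is easy to observe with range; when a stable order is needed, collect and sort the keys first:

```go
package main

import (
	"fmt"
	"sort"
)

// sortedKeys collects a map's keys and sorts them, giving a deterministic
// iteration order despite mapiterinit's random start bucket and offset.
func sortedKeys(m map[int64]int) []int64 {
	keys := make([]int64, 0, len(m))
	for k := range m { // this order differs from run to run
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
	return keys
}

func main() {
	m := map[int64]int{3: 30, 1: 10, 2: 20}
	for _, k := range sortedKeys(m) {
		fmt.Println(k, m[k]) // always 1 10, then 2 20, then 3 30
	}
}
```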
func mapiterinit(t *maptype, h *hmap, it *hiter) {
if raceenabled && h != nil {
callerpc := getcallerpc()
racereadpc(unsafe.Pointer(h), callerpc, funcPC(mapiterinit))
}
if h == nil || h.count == 0 {
return
}
if unsafe.Sizeof(hiter{})/sys.PtrSize != 12 {
throw("hash_iter size incorrect") // see ../../cmd/internal/gc/reflect.go
}
it.t = t
it.h = h
// grab snapshot of bucket state
it.B = h.B
it.buckets = h.buckets
if t.bucket.kind&kindNoPointers != 0 {
h.createOverflow()
it.overflow = h.extra.overflow
it.oldoverflow = h.extra.oldoverflow
}
// generate a random number r
r := uintptr(fastrand())
if h.B > 31-bucketCntBits {
r += uintptr(fastrand()) << 31
}
// which bucket to start iterating from
it.startBucket = r & bucketMask(h.B)
// which key within the bucket to start iterating from
it.offset = uint8(r >> h.B & (bucketCnt - 1))
// iterator state
it.bucket = it.startBucket
// Remember we have an iterator.
// Can run concurrently with another mapiterinit().
if old := h.flags; old&(iterator|oldIterator) != iterator|oldIterator {
atomic.Or8(&h.flags, iterator|oldIterator)
}
mapiternext(it)
}
mapiternext then starts iterating from key number it.offset of bucket it.startBucket, extracting keys and values until it comes back around to the starting bucket, which completes the iteration.
The source is fairly easy to follow, especially after the annotated snippets earlier. Below, the whole iteration process is walked through graphically, which should make it clear.
func mapiternext(it *hiter) {
h := it.h
if raceenabled {
callerpc := getcallerpc()
racereadpc(unsafe.Pointer(h), callerpc, funcPC(mapiternext))
}
if h.flags&hashWriting != 0 {
throw("concurrent map iteration and map write")
}
t := it.t
bucket := it.bucket
b := it.bptr
i := it.i
checkBucket := it.checkBucket
alg := t.key.alg
next:
if b == nil {
if bucket == it.startBucket && it.wrapped {
// end of iteration
it.key = nil
it.value = nil
return
}
if h.growing() && it.B == h.B {
oldbucket := bucket & it.h.oldbucketmask()
b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
if !evacuated(b) {
checkBucket = bucket
} else {
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
} else {
b = (*bmap)(add(it.buckets, bucket*uintptr(t.bucketsize)))
checkBucket = noCheck
}
bucket++
if bucket == bucketShift(it.B) {
bucket = 0
it.wrapped = true
}
i = 0
}
for ; i < bucketCnt; i++ {
offi := (i + it.offset) & (bucketCnt - 1)
if b.tophash[offi] == empty || b.tophash[offi] == evacuatedEmpty {
continue
}
k := add(unsafe.Pointer(b), dataOffset+uintptr(offi)*uintptr(t.keysize))
if t.indirectkey {
k = *((*unsafe.Pointer)(k))
}
v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+uintptr(offi)*uintptr(t.valuesize))
if checkBucket != noCheck && !h.sameSizeGrow() {
if t.reflexivekey || alg.equal(k, k) {
hash := alg.hash(k, uintptr(h.hash0))
if hash&bucketMask(it.B) != checkBucket {
continue
}
} else {
if checkBucket>>(it.B-1) != uintptr(b.tophash[offi]&1) {
continue
}
}
}
if (b.tophash[offi] != evacuatedX && b.tophash[offi] != evacuatedY) ||
!(t.reflexivekey || alg.equal(k, k)) {
it.key = k
if t.indirectvalue {
v = *((*unsafe.Pointer)(v))
}
it.value = v
} else {
rk, rv := mapaccessK(t, h, k)
if rk == nil {
continue // key has been deleted
}
it.key = rk
it.value = rv
}
it.bucket = bucket
if it.bptr != b { // avoid unnecessary write barrier; see issue 14921
it.bptr = b
}
it.i = i + 1
it.checkBucket = checkBucket
return
}
b = b.overflow(t)
i = 0
goto next
}
Suppose we have the map shown below. Initially B = 1 with two buckets; later a grow is triggered (don't worry about the exact trigger here, it's just a setup) and B becomes 2. The contents of bucket 1 have been migrated to the new table, bucket 1 splitting into new buckets 1 and 3; bucket 0 has not been migrated yet. The old buckets hang off the *oldbuckets pointer, and the new ones off the *buckets pointer.
Now we iterate over this map. Suppose that after initialization, startBucket = 3 and offset = 0. The starting point is then key 0 of bucket 3, and the bucket order is: 3 -> 0 -> 1 -> 2.
Since new bucket 3 corresponds to old bucket 1, we first check whether old bucket 1 has already been migrated. The check is:
func evacuated(b *bmap) bool {
h := b.tophash[0]
return h > empty && h < minTopHash
}
If b.tophash[0] is within the marker range, i.e. inside (0,4), the bucket has already been migrated.
empty = 0
evacuatedEmpty = 1
evacuatedX = 2
evacuatedY = 3
minTopHash = 4
In this example old bucket 1 has already been migrated, so its tophash[0] is in (0,4) and only the new bucket 3 needs to be iterated.
Walking the keys of new bucket 3, the first non-empty key found is key4:
After new bucket 3, iteration arrives at new bucket 0. New bucket 0 corresponds to old bucket 0, and a check shows old bucket 0 has not been migrated, so iterating new bucket 0 becomes iterating old bucket 0. Does that mean we take all the keys in old bucket 0?
Not quite. Recall that old bucket 0 splits into two buckets after migration: new bucket 0 and new bucket 2. And right now we are iterating only new bucket 0 (iteration always follows the *buckets pointer, i.e. the so-called new buckets). So we only take the keys of old bucket 0 that would be assigned to new bucket 0 after the split.
Thus key1, whose lowbits == 00, enters the iteration result set:
key4—>key1
As before, iteration continues with new bucket 1. Old bucket 1 has already been migrated, so only the elements currently in new bucket 1 are visited. The result set becomes:
key4—>key1—>key3
Next comes new bucket 2, which comes from old bucket 0, so we take the keys in old bucket 0 that would split into new bucket 2, i.e. key2 with lowbits == 01.
With that, the iteration result set becomes:
key4—>key1—>key3—>key2
Finally, when iteration comes back around to new bucket 3, every bucket has been visited and the whole iteration is complete.
Incidentally, keys such as NaN are handled similarly: the core question is still which bucket the key falls into after the split, except only the lowest bit of its tophash is consulted. If that bit is 0 the key goes to the X part; if it is 1, the Y part. That decides whether the key is extracted into the result set.
The heart of map iteration is understanding that during a doubling grow an old bucket splits into 2 new buckets. Iteration proceeds in new-bucket order, and on meeting an old bucket that has not yet been migrated, it must pick out of the old bucket exactly the keys that will later move into the current new bucket.
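The X/Y split can be condensed into one rule (newIndex is an illustrative helper, not a runtime function): with newbit equal to the old bucket count, hash&newbit == 0 keeps the key at its old index (X), otherwise it moves to oldIndex+newbit (Y):

```go
package main

import "fmt"

// During a doubling grow, newbit is the old bucket count, and the single
// extra hash bit it exposes decides a key's destination: the X half keeps
// the old index, the Y half is offset by newbit.
func newIndex(hash, oldIndex, newbit uint64) uint64 {
	if hash&newbit != 0 {
		return oldIndex + newbit // Y half
	}
	return oldIndex // X half
}

func main() {
	// Growing from B=1 (2 buckets, newbit=2) to B=2 (4 buckets):
	fmt.Println(newIndex(0b101, 1, 2)) // bit 2 unset -> stays in bucket 1
	fmt.Println(newIndex(0b111, 1, 2)) // bit 2 set   -> moves to bucket 3
}
```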