天天看點

《GO并發程式設計實戰》—— Concurrent Map

我們在本章前面的部分中對go語言提供的各種傳統同步工具和方法進行了逐一的介紹。在本節,我們将運用它們來構造一個并發安全的字典(map)類型。

我們已經知道,go語言提供的字典類型并不是并發安全的。是以,我們需要使用一些同步方法對它進行擴充。這看起來并不困難。我們隻要使用讀寫鎖将針對一個字典類型值的讀操作和寫操作保護起來就可以了。确實,讀寫鎖應該是我們首先想到的同步工具。不過,我們還不能确定隻使用它是否就足夠了。不管怎樣,讓我們先來編寫并發安全的字典類型的第一個版本。

我們先來确定并發安全的字典類型的行為。還記得嗎?依然,這需要聲明一個接口類型。我們在第4章帶領讀者編寫過orderedmap接口類型及其實作類型。我們可以借鑒orderedmap接口類型的聲明并編寫出需要在這裡聲明的接口類型concurrentmap。實際上,concurrentmap接口類型的方法集合應該是orderedmap接口類型的方法集合的一個子集。我們隻需從orderedmap中去除那些代表有序map特有行為的方法聲明即可。既然是這樣,我何不從這兩個自定義的字典接口類型中抽出一個公共接口呢?

這個公共的字典接口類型可以是這樣的:

<code>01</code>

<code>// 泛化的map的接口類型</code>

<code>02</code>

<code>03</code>

<code>type genericmap </code><code>interface</code> <code>{</code>

<code>04</code>

<code>05</code>

<code>// 擷取給定鍵值對應的元素值。若沒有對應元素值則傳回nil。</code>

<code>06</code>

<code>07</code>

<code>get(key </code><code>interface</code><code>{}) </code><code>interface</code><code>{}</code>

<code>08</code>

<code>09</code>

<code>// 添加鍵值對,并傳回與給定鍵值對應的舊的元素值。若沒有舊元素值則傳回(nil, true)。</code>

<code>10</code>

<code>11</code>

<code>put(key </code><code>interface</code><code>{}, elem </code><code>interface</code><code>{}) (</code><code>interface</code><code>{}, bool)</code>

<code>12</code>

<code>13</code>

<code>// 删除與給定鍵值對應的鍵值對,并傳回舊的元素值。若沒有舊元素值則傳回nil。</code>

<code>14</code>

<code>15</code>

<code>remove(key </code><code>interface</code><code>{}) </code><code>interface</code><code>{}</code>

<code>16</code>

<code>17</code>

<code>// 清除所有的鍵值對。</code>

<code>18</code>

<code>19</code>

<code>clear()</code>

<code>20</code>

<code>21</code>

<code>// 擷取鍵值對的數量。</code>

<code>22</code>

<code>23</code>

<code>len() </code><code>int</code>

<code>24</code>

<code>25</code>

<code>// 判斷是否包含給定的鍵值。</code>

<code>26</code>

<code>27</code>

<code>contains(key </code><code>interface</code><code>{}) bool</code>

<code>28</code>

<code>29</code>

<code>// 擷取已排序的鍵值所組成的切片值。</code>

<code>30</code>

<code>31</code>

<code>keys() []</code><code>interface</code><code>{}</code>

<code>32</code>

<code>33</code>

<code>// 擷取已排序的元素值所組成的切片值。</code>

<code>34</code>

<code>35</code>

<code>elems() []</code><code>interface</code><code>{}</code>

<code>36</code>

<code>37</code>

<code>// 擷取已包含的鍵值對所組成的字典值。</code>

<code>38</code>

<code>39</code>

<code>tomap() map[</code><code>interface</code><code>{}]</code><code>interface</code><code>{}</code>

<code>40</code>

<code>41</code>

<code>// 擷取鍵的類型。</code>

<code>42</code>

<code>43</code>

<code>keytype() reflect.type</code>

<code>44</code>

<code>45</code>

<code>// 擷取元素的類型。</code>

<code>46</code>

<code>47</code>

<code>elemtype() reflect.type</code>

<code>48</code>

<code>49</code>

<code>}</code>

然後,我們把這個名為genericmap的字典接口類型嵌入到orderedmap接口類型中,并去掉後者中的已在前者内聲明的那些方法。修改後的orderedmap接口類型如下:

<code>// 有序的map的接口類型。</code>

<code>type orderedmap </code><code>interface</code> <code>{</code>

<code>genericmap </code><code>// 泛化的map接口</code>

<code>// 擷取第一個鍵值。若無任何鍵值對則傳回nil。</code>

<code>firstkey() </code><code>interface</code><code>{}</code>

<code>// 擷取最後一個鍵值。若無任何鍵值對則傳回nil。</code>

<code>lastkey() </code><code>interface</code><code>{}</code>

<code>// 擷取由小于鍵值tokey的鍵值所對應的鍵值對組成的orderedmap類型值。</code>

<code>headmap(tokey </code><code>interface</code><code>{}) orderedmap</code>

<code>// 擷取由小于鍵值tokey且大于等于鍵值fromkey的鍵值所對應的鍵值對組成的orderedmap類型值。</code>

<code>submap(fromkey </code><code>interface</code><code>{}, tokey </code><code>interface</code><code>{}) orderedmap</code>

<code>// 擷取由大于等于鍵值fromkey的鍵值所對應的鍵值對組成的orderedmap類型值。</code>

<code>tailmap(fromkey </code><code>interface</code><code>{}) orderedmap</code>

我們要記得在修改完成後立即使用go test指令重新運作相關的功能測試,并以此確定這樣的重構沒有破壞任何現有的功能。

有了genericmap接口類型之後,我們的concurrentmap接口類型的聲明就相當簡單了。由于後者沒有任何特殊的行為,是以我們隻要簡單地将前者嵌入到後者的聲明中即可,就像這樣:

<code>1</code>

<code>type concurrentmap </code><code>interface</code> <code>{</code>

<code>2</code>

<code>3</code>

<code>genericmap</code>

<code>4</code>

<code>5</code>

下面我們來編寫該接口類型的實作類型。我們依然使用一個結構體類型來充當,并把它命名為myconcurrentmap。myconcurrentmap類型的基本結構如下:

<code>type myconcurrentmap struct {</code>

<code>m       map[</code><code>interface</code><code>{}]</code><code>interface</code><code>{}</code>

<code>keytype reflect.type</code>

<code>elemtype reflect.type</code>

<code>rwmutex sync.rwmutex</code>

有了編寫myorderedmap類型(還記得嗎?它的指針類型是orderedmap的實作類型)的經驗,寫出myconcurrentmap類型的基本結構也是一件比較容易的事情。可以看到,在基本需要之外,我們隻為myconcurrentmap類型加入了一個代表了讀寫鎖的rwmutex字段。此外,我們需要為myconcurrentmap類型添加的那些指針方法的實作代碼實際上也可以以myorderedmap類型中的相應方法為藍本。不過,在實作前者的過程中要注意合理運用同步方法以保證它們的并發安全性。下面,我們就開始編寫它們。

首先,我們來看put、remove和clear這幾個方法。它們都屬于寫操作,都會改變myconcurrentmap類型的m字段的值。

方法put的功能是向myconcurrentmap類型值添加一個鍵值對。那麼,我們在這個操作的前後一定要分别鎖定和解鎖rwmutex的寫鎖。put方法的實作如下:

<code>func (cmap *myconcurrentmap) put(key </code><code>interface</code><code>{}, elem </code><code>interface</code><code>{}) (</code><code>interface</code><code>{}, bool) {</code>

<code>if</code> <code>!cmap.isacceptablepair(key, elem) {</code>

<code>return</code> <code>nil, </code><code>false</code>

<code>cmap.rwmutex.lock()</code>

<code>defer cmap.rwmutex.unlock()</code>

<code>oldelem := cmap.m[key]</code>

<code>cmap.m[key] = elem</code>

<code>return</code> <code>oldelem, </code><code>true</code>

該實作中的isacceptablepair方法的功能是檢查參數值key和elem是否均不為nil且它們的類型是否均與目前值允許的鍵類型和元素類型一緻。在通過該檢查之後,我們就需要對rwmutex進行鎖定了。相應的,我們使用defer語句來保證對它的及時解鎖。與此類似,我們在remove和clear方法的實作中也應該加入相同的操作。

與這些代表着寫操作的方法相對應的,是代表讀操作的方法。在concurrentmap接口類型中,此類方法有get、len、contains、keys、elems和tomap。我們需要分别在這些方法的實作中加入對rwmutex的讀鎖的鎖定和解鎖操作。以get方法為例,我們應該這樣來實作它:

<code>func (cmap *myconcurrentmap) get(key </code><code>interface</code><code>{}) </code><code>interface</code><code>{} {</code>

<code>cmap.rwmutex.rlock()</code>

<code>defer cmap.rwmutex.runlock()</code>

<code>6</code>

<code>7</code>

<code>return</code> <code>cmap.m[key]</code>

<code>8</code>

<code>9</code>

這裡有兩點需要特别注意。

我們在使用寫鎖的時候,要注意方法間的調用關系。比如,一個代表寫操作的方法中調用了另一個代表寫操作的方法。顯然,我們在這兩個方法中都會用到讀寫鎖中的寫鎖。但如果使用不當,我們就會使前者被永遠鎖住。當然,對于代表寫操作的方法調用代表讀操作的方法的這種情況來說,也會是這樣。請看下面的示例:

<code>func (cmap *myconcurrentmap) remove(key </code><code>interface</code><code>{}) </code><code>interface</code><code>{} { cmap.rwmutex.lock() defer cmap.rwmutex.unlock() oldelem := cmap.get() delete(cmap.m, key) </code><code>return</code> <code>oldelem }</code>

可以看到,我們在remove方法中調用了get方法。并且,在這個調用之前,我們已經鎖定了rwmutex的寫鎖。然而,由前面的展示可知,我們在get方法的開始處對rwmutex的讀鎖進行了鎖定。由于這兩個鎖定操作之間的互斥性,是以我們一旦調用這個remove方法就會使目前goroutine永遠陷入阻塞。更嚴重的是,在這之後,其他goroutine在調用該*myconcurrentmap類型值的一些方法(涉及到其中的rwmutex字段的讀鎖或寫鎖)的時候也會立即被阻塞住。

我們應該避免這種情況的方式。這裡有兩種解決方案。第一種解決方案是,把remove方法中的oldelem := cmap.get()語句與在它前面的那兩條語句的位置互換,即變為:

<code>oldelem := cmap.get() cmap.rwmutex.lock() defer cmap.rwmutex.unlock()</code>

這樣可以保證在解鎖讀鎖之後才會去鎖定寫鎖。相比之下,第二種解決方案更加徹底一些,即:消除掉方法間的調用。也就是說,我們需要把oldelem := cmap.get()語句替換掉。在get方法中,展現其功能的語句是oldelem := cmap.m[key]。是以,我們把後者作為前者的替代品。若如此,那麼我們必須保證該語句出現在對寫鎖的鎖定操作之後。這樣,我們才能依然確定其在鎖的保護之下。實際上,通過這樣的修改,我們更新了remove方法中的被用來保護從m字段中擷取對應元素值的這一操作的鎖(由讀鎖更新至寫鎖)。

對于rwmutex字段的讀鎖來說,雖然鎖定它的操作之間不是互斥的,但是這些操作與相應的寫鎖的鎖定操作之間卻是互斥的。我們在上一條注意事項中已經說明了這一點。是以,為了最小化對寫操作的性能的影響,我們應該在鎖定讀鎖之後盡快的對其進行解鎖。也就是說,我們要在相關的方法中盡量減少持有讀鎖的時間。這需要我們綜合的考量。

依據前面的示例和注意事項說明,讀者可以試着實作remove、clear、len、contains、keys、elems和tomap方法。它們實作起來并不困難。注意,我們想讓*myconcurrentmap類型成為concurrentmap接口類型的實作類型。是以,這些方法都必須是myconcurrentmap類型的指針方法。這包括馬上要提及的那兩個方法。

方法keytype和elemtype的實作極其簡單。我們可以直接分别傳回myconcurrentmap類型的keytype字段和elemtype字段的值。這兩個字段的值應該是在myconcurrentmap類型值的使用方初始化它的時候給出的。

按照慣例,我們理應提供一個可以友善的建立和初始化并發安全的字典值的函數。我們把它命名為newconcurrentmap,其實作如下:

<code>func newconcurrentmap(keytype, elemtype reflect.type) concurrentmap {</code>

<code>return</code> <code>&amp;amp;amp;myconcurrentmap{</code>

<code>keytype: keytype,</code>

<code>elemtype: elemtype,</code>

<code>m:       make(map[</code><code>interface</code><code>{}]</code><code>interface</code><code>{})}</code>

這個函數并沒有什麼特别之處。由于myconcurrentmap類型的rwmutex字段并不需要額外的初始化,是以它并沒有出現在該函數中的那個複合字面量中。此外,為了遵循面向接口程式設計的原則,我們把該函數的結果的類型聲明為了concurrentmap,而不是它的實作類型*myconcurrentmap。如果将來我們編寫出了另一個concurrentmap接口類型的實作類型,那麼就應該考慮調整該函數的名稱。比如變更為newdefaultconcurrentmap,或者其他。

待讀者把還未實作的*myconcurrentmap類型的那幾個方法都補全之後(可以利用newconcurrentmap函數來檢驗這個類型是否是一個合格的concurrentmap接口的實作類型),我們就開始一起為該類型編寫功能和性能測試了。

參照我們之前為*myorderedmap類型編寫的功能測試,我們可以很快的照貓畫虎的建立出*myconcurrentmap類型的功能測試函數。這些函數和本小節前面講到的所有代碼都被放到了goc2p項目的basic/map1代碼包中。其中,接口類型concurrentmap的聲明和myconcurrentmap類型的基本結構及其所有的指針方法均在庫源碼檔案cmap.go中。是以,我們應該把對應的測試代碼放到cmap_test.go檔案中。

既然有了很好的參照,作者并不想再贅述*myconcurrentmap類型的功能測試函數了。我希望讀者能夠先獨立的編寫出來并通過go test指令的檢驗,然後再去與cmap_test.go檔案中的代碼對照。

另外,在myconcurrentmap類型及其指針方法的實作中,我們多處用到了讀寫鎖和反射api(聲明在reflect代碼包中的那些公開的程式實體)。它們執行的都是可能會對程式性能造成一定影響的操作。是以,針對*myconcurrentmap類型的性能測試(或稱基準測試)是很有必要的。這樣我們才能知道它的值在性能上到底與官方的字典類型有怎樣的差别。

我們在測試源碼檔案cmap_test.go檔案中聲明兩個基準測試函數——benchmarkconcurrentmap和benchmarkmap。顧名思義,這兩個函數是分别被用來測試*myconcurrentmap類型和go語言官方的字典類型的值的性能的。

在benchmarkconcurrentmap函數中,我們執行這樣一個流程。

(1) 初始化一個*myconcurrentmap類型的值,同時設定鍵類型和元素類型均為int32類型。

(2) 執行疊代次數預先給定(即該函數的*testing.b類型的參數b的字段n的值)的循環。在單次疊代中,我們向字典類型值添加一個鍵值對,然後再試圖從該值中擷取與目前鍵值對應的元素值。

(3) 列印出一行提示資訊,包含該值的鍵類型、元素類型以及長度等内容。

下面是該函數的實作:

<code>func benchmarkconcurrentmap(b *testing.b) {</code>

<code>keytype := reflect.typeof(int32(</code><code>2</code><code>))</code>

<code>elemtype := keytype</code>

<code>cmap := newconcurrentmap(keytype, elemtype)</code>

<code>var key, elem int32</code>

<code>fmt.printf(&amp;quot;n=%d.\n&amp;quot;, b.n)</code>

<code>b.resettimer()</code>

<code>for</code> <code>i := </code><code>0</code><code>; i &amp;amp;lt; b.n; i++ {</code>

<code>b.stoptimer()</code>

<code>seed := int32(i)</code>

<code>key = seed</code>

<code>elem = seed &amp;amp;lt;&amp;amp;lt; </code><code>10</code>

<code>b.starttimer()</code>

<code>cmap.put(key, elem)</code>

<code>_ = cmap.get(key)</code>

<code>b.setbytes(</code><code>8</code><code>)</code>

<code>ml := cmap.len()</code>

<code>maptype := fmt.sprintf(&amp;quot;concurrentmap&amp;amp;lt;%s, %s&amp;amp;gt;&amp;quot;,</code>

<code>keytype.kind().string(), elemtype.kind().string())</code>

<code>b.logf(&amp;quot;the length of % value is %d.\n&amp;quot;, maptype, ml)</code>

<code>50</code>

<code>51</code>

在這段代碼中,我們用到了參數b的幾個方法。我們在第5章講基準測試的時候說明過它們的功用。這裡再簡單回顧一下。b.resettimer方法的功能是将針對該函數的本次執行的計時器歸零。而b.starttimer方法和b.stoptimer方法的功能則分别是啟動和停止這個計時器。在該函數體中,我們使用這三個方法忽略掉一些無關緊要的語句的執行時間。更具體的講,我們隻對for語句的for子句及其代碼塊中的cmap.put(key, elem)語句和_ = cmap.get(key)語句,以及ml := cmap.len()語句的執行時間進行計時。注意,隻要它們的耗時不超過1秒或由go test指令的benchtime标記給定的時間,那麼測試運作程式就會嘗試着多次執行該函數并在每次執行前增加b.n的值。是以,我們去掉無關語句的執行耗時也意味着會讓benchmarkconcurrentmap函數被執行更多次。

除此之外,我們還用到了b.setbytes方法。它的作用是記錄在單次操作中被處理的位元組的數量。在這裡,我們每次記錄一個鍵值對所用的位元組數量。由于鍵和元素的類型都是int32類型的,是以它們共會用掉8個位元組。

在編寫完成benchmarkconcurrentmap函數之後,我們便可以如法炮制針對go官方的字典類型的基準測試函數benchmarkmap了。請注意,為了公平起見,我們在初始化這個字典類型值的時候也要把它的鍵類型和元素類型都設定為interface{},就像這樣:

<code>imap := make(map[</code><code>interface</code><code>{}]</code><code>interface</code><code>{})</code>

但是,在為其添加鍵值對的時候要讓鍵和元素值的類型均為int32類型。

在一切準備妥當之後,我們在相應目錄下使用指令

go test -bench=”.” -run=”^$” -benchtime=1s -v

運作goc2p項目的basic/map1代碼包中的基準測試。

稍等片刻,标準輸出上會出現如下内容:

<code>pass</code>

<code>benchmarkconcurrentmap n=</code><code>1</code><code>.</code>

<code>n=</code><code>100</code><code>.</code>

<code>n=</code><code>10000</code><code>.</code>

<code>n=</code><code>1000000</code><code>.</code>

<code>1000000</code>             <code>1612</code> <code>ns/op           </code><code>4.96</code> <code>mb/s</code>

<code>--- bench: benchmarkconcurrentmap</code>

<code>cmap_test.go:</code><code>240</code><code>: the length of concurrentmap&amp;amp;lt;int32, int32&amp;amp;gt;alue is </code><code>1</code><code>.</code>

<code>cmap_test.go:</code><code>240</code><code>: the length of concurrentmap&amp;amp;lt;int32, int32&amp;amp;gt;alue is </code><code>100</code><code>.</code>

<code>cmap_test.go:</code><code>240</code><code>: the length of concurrentmap&amp;amp;lt;int32, int32&amp;amp;gt;alue is </code><code>10000</code><code>.</code>

<code>cmap_test.go:</code><code>240</code><code>: the length of concurrentmap&amp;amp;lt;int32, int32&amp;amp;gt;alue is </code><code>1000000</code><code>.</code>

<code>benchmarkmap   n=</code><code>1</code><code>.</code>

<code>n=</code><code>2000000</code><code>.</code>

<code>2000000</code>               <code>856</code> <code>ns/op           </code><code>9.35</code> <code>mb/s</code>

<code>--- bench: benchmarkmap</code>

<code>cmap_test.go:</code><code>268</code><code>: the length of map&amp;amp;lt;int32, int32&amp;amp;gt; value is </code><code>1</code><code>.</code>

<code>cmap_test.go:</code><code>268</code><code>: the length of map&amp;amp;lt;int32, int32&amp;amp;gt; value is </code><code>100</code><code>.</code>

<code>cmap_test.go:</code><code>268</code><code>: the length of map&amp;amp;lt;int32, int32&amp;amp;gt; value is </code><code>10000</code><code>.</code>

<code>cmap_test.go:</code><code>268</code><code>: the length of map&amp;amp;lt;int32, int32&amp;amp;gt; value is </code><code>1000000</code><code>.</code>

<code>cmap_test.go:</code><code>268</code><code>: the length of map&amp;amp;lt;int32, int32&amp;amp;gt; value is </code><code>2000000</code><code>.</code>

<code>ok     basic/map1     </code><code>258</code><code>.327s</code>

我們看到,測試運作程式執行benchmarkconcurrentmap函數的次數是4,而執行benchmarkmap函數的次數是5。這從以“n=”為起始的輸出内容和測試日志的行數上都可以看得出來。由我們前面提到的測試運作程式多次執行基準測試函數的前提條件已經可知,go語言提供的字典類型的值的性能要比我們自行擴充的并發安全的*myconcurrentmap類型的值的性能好。具體的性能差距可以參看測試輸出中的那兩行代表了測試細節的内容,即:

前者代表針對*myconcurrentmap類型值的測試細節。測試運作程式在1秒鐘之内最多可以執行相關操作(包括添加鍵值對、根據鍵值擷取元素值和擷取字典類型值的長度)的次數為一百萬,平均每次執行的耗時為1612納秒。并且,根據我們在benchmarkconcurrentmap函數中的設定,它每秒可以處理4.86兆位元組的資料。

另一方面,go語言方法的字典類型的值的測試細節是這樣的:測試運作程式在1秒鐘之内最多可以執行相關操作的次數為兩百萬,平均每次執行的耗時為856納秒,根據benchmarkmap函數中的設定,它每秒可以處理9.35兆位元組的資料。

從上述測試細節可以看出,前者在性能上要比後者差,且差距将近一倍。這樣的差距幾乎都是由*myconcurrentmap類型及其方法中使用的讀寫鎖造成的。

由此,我們也印證了,同步工具在為程式的并發安全提供支援的同時也會對其性能造成了不可忽視的損耗。這也使我們認識到:在使用同步工具的時候應該仔細斟酌并盡量平衡各個方面的名額,以使其無論是在功能上還是在性能上都能達到我們的要求。

順便提一句,go語言未對自定義泛型提供支援,以至于我們在編寫此類擴充的時候并不是那麼友善。有時候,我們不得不使用反射api。但是,衆所周知,它們對程式性能的負面影響也是不可小觑的。是以,我們應該盡量減少對它們的使用。

繼續閱讀