天天看點

【C++11 并發程式設計教程 - Part 3 : 鎖的進階與條件變量(bill譯)】

C++11 并發程式設計教程 - Part 3 : 鎖的進階與條件變量

注:文中凡遇通用的術語及行話,均不予以翻譯。譯文有不當之處還望悉心指正。

上一篇文章中我們學習了如何使用互斥量來解決一些線程同步問題。這一講我們将進一步讨論互斥量的話題,并向大家介紹 C++11 并發庫中的另一種同步機制 —— 條件變量。

遞歸鎖

考慮下面這個簡單類:

1

2

3

4

5

6

7

8

9

10

11

12

13

<code>struct</code> <code>Complex {</code>

<code>    </code><code>std::mutex mutex;</code>

<code>    </code><code>int</code> <code>i;</code>

<code>    </code><code>Complex() : i(0) {}</code>

<code>    </code><code>void</code> <code>mul(</code><code>int</code> <code>x){</code>

<code>        </code><code>std::lock_guard&lt;std::mutex&gt; lock(mutex);</code>

<code>        </code><code>i *= x;</code>

<code>    </code><code>}</code>

<code>    </code><code>void</code> <code>div</code><code>(</code><code>int</code> <code>x){</code>

<code>        </code><code>i /= x;</code>

<code>};</code>

現在你想添加一個操作以便無誤地一并執行上述兩項操作,于是你添加了一個函數:

<code>void</code> <code>both(</code><code>int</code> <code>x, </code><code>int</code> <code>y){</code>

<code>    </code><code>std::lock_guard&lt;std::mutex&gt; lock(mutex);</code>

<code>    </code><code>mul(x);</code>

<code>    </code><code>div</code><code>(y);</code>

<code>}</code>

讓我們來測試這個函數:

<code>int</code> <code>main(){</code>

<code>    </code><code>Complex complex;</code>

<code>    </code><code>complex.both(32, 23);</code>

<code>    </code><code>return</code> <code>0;</code>

如果你運作上述測試,你會發現這個程式将永遠不會結束。原因很簡單,在 both() 函數中,線程将申請鎖,然後調用mul() 函數,在這個函數[譯注:指 mul() ]中,線程将再次申請該鎖,但該鎖已經被鎖住了。這是死鎖的一種情況。預設情況下,一個線程不能重複申請同一個互斥量上的鎖。

這裡有一個簡單的解決辦法:std::recursive_mutex 。這個互斥量能夠被同一個線程重複上鎖,下面就是 Complex 結構體的正确實作:

14

15

16

17

18

<code>    </code><code>std::recursive_mutex mutex;</code>

<code>        </code><code>std::lock_guard&lt;std::recursive_mutex&gt; lock(mutex);</code>

<code>    </code><code>void</code> <code>both(</code><code>int</code> <code>x, </code><code>int</code> <code>y){</code>

<code>        </code><code>mul(x);</code>

<code>        </code><code>div</code><code>(y);</code>

這樣一來,程式就能正常的結束了。

計時鎖

有些時候,你并不想某個線程永無止境地去等待某個互斥量上的鎖。譬如說你的線程希望在等待某個鎖的時候做點其他的事情。為了達到這一目的,标準庫提供了一套解決方案:std::timed_mutex 和 std::recursive_timed_mutex (如果你的鎖需要具備遞歸性的話)。他們具備與 std::mutex 相同的函數:lock() 和 unlock(),同時還提供了兩個新的函數:try_lock_for() 和 try_lock_until() 。

第一個函數,也是最有用的一個,它允許你設定一個逾時參數,一旦逾時,就算目前還沒有獲得鎖,函數也會自動傳回。該函數在獲得鎖之後傳回 true,否則 false。下面我們來看一個簡單示例:

19

20

21

22

23

24

<code>std::timed_mutex mutex;</code>

<code>void</code> <code>work(){</code>

<code>    </code><code>std::chrono::milliseconds timeout(100);</code>

<code>    </code><code>while</code><code>(</code><code>true</code><code>){</code>

<code>        </code><code>if</code><code>(mutex.try_lock_for(timeout)){</code>

<code>            </code><code>std::cout &lt;&lt; std::this_thread::get_id() &lt;&lt; </code><code>": do work with the mutex"</code> <code>&lt;&lt; std::endl;</code>

<code>            </code><code>std::chrono::milliseconds sleepDuration(250);</code>

<code>            </code><code>std::this_thread::sleep_for(sleepDuration);</code>

<code>            </code><code>mutex.unlock();</code>

<code>        </code><code>} </code><code>else</code> <code>{</code>

<code>            </code><code>std::cout &lt;&lt; std::this_thread::get_id() &lt;&lt; </code><code>": do work without mutex"</code> <code>&lt;&lt; std::endl;</code>

<code>            </code><code>std::chrono::milliseconds sleepDuration(100);</code>

<code>        </code><code>}</code>

<code>    </code><code>std::</code><code>thread</code> <code>t1(work);</code>

<code>    </code><code>std::</code><code>thread</code> <code>t2(work);</code>

<code>    </code><code>t1.join();</code>

<code>    </code><code>t2.join();</code>

(這個示例在實踐中是毫無用處的)

值得注意的是示例中時間間隔聲明:std::chrono::milliseconds 。它同樣是 C++11 的新特性。你可以得到多種時間機關:納秒、微妙、毫秒、秒、分以及小時。我們使用上述某個時間機關以設定 try_lock_for() 函數的逾時參數。我們同樣可以使用它們并通過 std::this_thread::sleep_for() 函數來設定線程的睡眠時間。示例中剩下的代碼就沒什麼令人激動的了,隻是一些使得結果可見的列印語句。注意:這段示例永遠不會結束,你需要自己把他 kill 掉。

Call Once

有時候你希望某個函數在多線程環境中隻被執行一次。譬如一個由兩部分組成的函數,第一部分隻能被執行一次,而第二部分則在該函數每次被調用時都應該被執行。我們可以使用 std::call_once 函數輕而易舉地實作這一功能。下面是針對這一機制的示例:

<code>std::once_flag flag;</code>

<code>void</code> <code>do_something(){</code>

<code>    </code><code>std::call_once(flag, [](){std::cout &lt;&lt; </code><code>"Called once"</code> <code>&lt;&lt; std::endl;});</code>

<code>    </code><code>std::cout &lt;&lt; </code><code>"Called each time"</code> <code>&lt;&lt; std::endl;</code>

<code>    </code><code>std::</code><code>thread</code> <code>t1(do_something);</code>

<code>    </code><code>std::</code><code>thread</code> <code>t2(do_something);</code>

<code>    </code><code>std::</code><code>thread</code> <code>t3(do_something);</code>

<code>    </code><code>std::</code><code>thread</code> <code>t4(do_something);</code>

<code>    </code><code>t3.join();</code>

<code>    </code><code>t4.join();</code>

每一個 std::call_once 函數都有一個 std::once_flag 變量與之比對。在上例中我使用了 Lambda 表達式[譯注:此處意譯]來作為隻被執行一次的代碼,而使用函數指針以及 std::function 對象也同樣可行。

條件變量

一個非常好的例子就是有界緩沖區。它是一個環形緩沖,擁有确定的容量、起始位置以及結束位置。下面就是使用條件變量實作的一個有界緩沖區。

25

26

27

28

29

30

31

32

33

<code>struct</code> <code>BoundedBuffer {</code>

<code>    </code><code>int</code><code>* buffer;</code>

<code>    </code><code>int</code> <code>capacity;</code>

<code>    </code><code>int</code> <code>front;</code>

<code>    </code><code>int</code> <code>rear;</code>

<code>    </code><code>int</code> <code>count;</code>

<code>    </code><code>std::mutex lock;</code>

<code>    </code><code>std::condition_variable not_full;</code>

<code>    </code><code>std::condition_variable not_empty;</code>

<code>    </code><code>BoundedBuffer(</code><code>int</code> <code>capacity) : capacity(capacity), front(0), rear(0), count(0) {</code>

<code>        </code><code>buffer = </code><code>new</code> <code>int</code><code>[capacity];</code>

<code>    </code><code>~BoundedBuffer(){</code>

<code>        </code><code>delete</code><code>[] buffer;</code>

<code>    </code><code>void</code> <code>deposit(</code><code>int</code> <code>data){</code>

<code>        </code><code>std::unique_lock&lt;std::mutex&gt; l(lock);</code>

<code>        </code><code>not_full.wait(l, [&amp;count, &amp;capacity](){</code><code>return</code> <code>count != capacity; });</code>

<code>        </code><code>buffer[rear] = data;</code>

<code>        </code><code>rear = (rear + 1) % capacity;</code>

<code>        </code><code>++count;</code>

<code>        </code><code>not_empty.notify_one();</code>

<code>    </code><code>int</code> <code>fetch(){</code>

<code>        </code><code>not_empty.wait(l, [&amp;count](){</code><code>return</code> <code>count != 0; });</code>

<code>        </code><code>int</code> <code>result = buffer[front];</code>

<code>        </code><code>front = (front + 1) % capacity;</code>

<code>        </code><code>--count;</code>

<code>        </code><code>not_full.notify_one();</code>

<code>        </code><code>return</code> <code>result;</code>

類中互斥量由 std::unique_lock 接管,它是用于管理鎖的 Wrapper,是使用條件變量的必要條件。我們使用 notify_one() 函數喚醒等待在條件變量上的某個線程。而函數 wait() 就有些特别了,其第一個參數是我們的std::unique_lock,而第二個參數是一個斷言。要想持續等待的話,這個斷言就必須傳回 false,這就有點像 while(!predicate()) { cv.wait(l); } 的形式。上例剩下的部分就沒什麼好說的了。

我們可以使用上例的緩沖區解決“多消費者/多生産者”問題。這是一個非常普遍的同步問題,許多線程(消費者)在等待由其他一些線程(生産者)生産的資料。下面就是一個使用這個緩沖區的例子:

<code>void</code> <code>consumer(</code><code>int</code> <code>id, BoundedBuffer&amp; buffer){</code>

<code>    </code><code>for</code><code>(</code><code>int</code> <code>i = 0; i &lt; 50; ++i){</code>

<code>        </code><code>int</code> <code>value = buffer.fetch();</code>

<code>        </code><code>std::cout &lt;&lt; </code><code>"Consumer "</code> <code>&lt;&lt; id &lt;&lt; </code><code>" fetched "</code> <code>&lt;&lt; value &lt;&lt; std::endl;</code>

<code>        </code><code>std::this_thread::sleep_for(std::chrono::milliseconds(250));</code>

<code>void</code> <code>producer(</code><code>int</code> <code>id, BoundedBuffer&amp; buffer){</code>

<code>    </code><code>for</code><code>(</code><code>int</code> <code>i = 0; i &lt; 75; ++i){</code>

<code>        </code><code>buffer.deposit(i);</code>

<code>        </code><code>std::cout &lt;&lt; </code><code>"Produced "</code> <code>&lt;&lt; id &lt;&lt; </code><code>" produced "</code> <code>&lt;&lt; i &lt;&lt; std::endl;</code>

<code>        </code><code>std::this_thread::sleep_for(std::chrono::milliseconds(100));</code>

<code>    </code><code>BoundedBuffer buffer(200);</code>

<code>    </code><code>std::</code><code>thread</code> <code>c1(consumer, 0, std::ref(buffer));</code>

<code>    </code><code>std::</code><code>thread</code> <code>c2(consumer, 1, std::ref(buffer));</code>

<code>    </code><code>std::</code><code>thread</code> <code>c3(consumer, 2, std::ref(buffer));</code>

<code>    </code><code>std::</code><code>thread</code> <code>p1(producer, 0, std::ref(buffer));</code>

<code>    </code><code>std::</code><code>thread</code> <code>p2(producer, 1, std::ref(buffer));</code>

<code>    </code><code>c1.join();</code>

<code>    </code><code>c2.join();</code>

<code>    </code><code>c3.join();</code>

<code>    </code><code>p1.join();</code>

<code>    </code><code>p2.join();</code>

三個消費者線程和兩個生産者線程被建立後就不斷地對緩沖區進行查詢。值得關注的是例子中使用 std::ref 來傳遞緩沖區的引用,以免造成對緩沖區的拷貝。

總結

這一節我們講到了許多東西,首先,我們看到如何使用遞歸鎖實作某個線程對同一鎖的多次加鎖。接下來知道了如何在加鎖時設定一個逾時屬性。然後我們學習了一種調用某個函數有且隻有一次的方法。最後我們使用條件變量解決了“多生産者/多消費者”同步問題。

     本文轉自Bill_Hoo 51CTO部落格,原文連結:http://blog.51cto.com/billhoo/1296334,如需轉載請自行聯系原作者

繼續閱讀