1、parallel class
1.1、for方法
1.2、foreach方法
1.3、invoke方法
2、并發控制疑問?
2.1、使用lock鎖
2.2、使用plinq——用asparallel
2.3、使用plinq——用parallelenumerable
2.4、使用interlocked操作
2.5、使用parallel.for的有thread-local變量重載函數
性能比較
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<code>using</code> <code>system.threading.tasks; </code>
<code>class</code> <code>test</code>
<code>{</code>
<code> </code><code>static</code> <code>int</code> <code>n = 1000;</code>
<code> </code><code>static</code> <code>void</code> <code>testmethod()</code>
<code> </code><code>{</code>
<code> </code><code>// using a named method.</code>
<code> </code><code>parallel.for(0, n, method2);</code>
<code> </code><code>// using an anonymous method.</code>
<code> </code><code>parallel.for(0, n,</code><code>delegate</code><code>(</code><code>int</code> <code>i)</code>
<code> </code><code>{</code>
<code> </code><code>// do work.</code>
<code> </code><code>});</code>
<code> </code><code>// using a lambda expression.</code>
<code> </code><code>parallel.for(0, n, i =></code>
<code> </code><code>}</code>
<code> </code><code>static</code> <code>void</code> <code>method2(</code><code>int</code> <code>i)</code>
<code> </code><code>// do work.</code>
<code>}</code>
上面這個例子簡單易懂,上篇我們就是用的parallel.for,這裡就不解釋了。其實parallel類的方法主要分為下面三類:
for方法
foreach方法
invoke方法
在裡面執行的for循環可能并行地運作,它有12個重載。這12個重載中int32參數和int64參數的方法各為6個,下面以int32為例列出:
toexclusive)。另外body有兩個local狀态變量用于同一線程的疊代之間共享。localinit委托将在每個線程參與循環執行時調用,
并傳回這些線程初始的local狀态。這些初始狀态被傳遞給body,當它在每個線程上第一次調用時。然後,接下來body調用傳回一個可能的修改狀态值
且傳遞給下一次body調用。最終,最後一次在每個線程上的body調用傳回的一個狀态值傳遞給localfinally委托。每個線程執行在自己的
loacl 狀态上執行最後一個動作時,localfinally委托将被調用。這個委托可能在多個線程上并發執行,是以,你必須同步通路任何共享變量。
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
<code>using</code> <code>system;</code>
<code>using</code> <code>system.collections.generic;</code>
<code>using</code> <code>system.linq;</code>
<code>using</code> <code>system.text;</code>
<code>using</code> <code>system.threading;</code>
<code>using</code> <code>system.threading.tasks;</code>
<code>namespace</code> <code>consoleapplication2</code>
<code> </code><code>class</code> <code>program</code>
<code> </code><code>// demonstrated features:</code>
<code> </code><code>// cancellationtokensource</code>
<code> </code><code>// parallel.for()</code>
<code> </code><code>// paralleloptions</code>
<code> </code><code>// parallelloopresult</code>
<code> </code><code>// expected results:</code>
<code> </code><code>// an iteration for each argument value (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) is executed.</code>
<code> </code><code>// the order of execution of the iterations is undefined.</code>
<code> </code><code>// the iteration when i=2 cancels the loop.</code>
<code> </code><code>// some iterations may bail out or not start at all; because they are temporally executed in unpredictable order,</code>
<code> </code><code>// it is impossible to say which will start/complete and which won't.</code>
<code> </code><code>// at the end, an operationcancelledexception is surfaced.</code>
<code> </code><code>// documentation:</code>
<code> </code><code>static</code> <code>void</code> <code>main(</code><code>string</code><code>[] args)</code>
<code> </code><code>cancellationtokensource cancellationsource =</code><code>new</code> <code>cancellationtokensource();</code>
<code> </code><code>paralleloptions options =</code><code>new</code> <code>paralleloptions();</code>
<code> </code><code>options.cancellationtoken = cancellationsource.token;</code>
<code> </code><code>try</code>
<code> </code><code>{</code>
<code> </code><code>parallelloopresult loopresult = parallel.for(</code>
<code> </code><code>0,</code>
<code> </code><code>10,</code>
<code> </code><code>options,</code>
<code> </code><code>(i, loopstate) =></code>
<code> </code><code>{</code>
<code> </code><code>console.writeline(</code><code>"start thread={0}, i={1}"</code><code>, thread.currentthread.managedthreadid, i);</code>
<code> </code><code>// simulate a cancellation of the loop when i=2</code>
<code> </code><code>if</code> <code>(i == 2)</code>
<code> </code><code>{</code>
<code> </code><code>cancellationsource.cancel();</code>
<code> </code><code>}</code>
<code> </code><code>// simulates a long execution</code>
<code> </code><code>for</code> <code>(</code><code>int</code> <code>j = 0; j < 10; j++)</code>
<code> </code><code>thread.sleep(1 * 200);</code>
<code> </code><code>// check to see whether or not to continue</code>
<code> </code><code>if</code> <code>(loopstate.shouldexitcurrentiteration)</code><code>return</code><code>;</code>
<code> </code><code>console.writeline(</code><code>"finish thread={0}, i={1}"</code><code>, thread.currentthread.managedthreadid, i);</code>
<code> </code><code>}</code>
<code> </code><code>);</code>
<code> </code><code>if</code> <code>(loopresult.iscompleted)</code>
<code> </code><code>{</code>
<code> </code><code>console.writeline(</code><code>"all iterations completed successfully. this was not expected."</code><code>);</code>
<code> </code><code>}</code>
<code> </code><code>}</code>
<code> </code><code>// no exception is expected in this example, but if one is still thrown from a task,</code>
<code> </code><code>// it will be wrapped in aggregateexception and propagated to the main thread.</code>
<code> </code><code>catch</code> <code>(aggregateexception e)</code>
<code> </code><code>console.writeline(</code><code>"parallel.for has thrown an aggregateexception. this was not expected.\n{0}"</code><code>, e);</code>
<code> </code><code>// catching the cancellation exception</code>
<code> </code><code>catch</code> <code>(operationcanceledexception e)</code>
<code> </code><code>console.writeline(</code><code>"an iteration has triggered a cancellation. this was expected.\n{0}"</code><code>, e.tostring());</code>
<code> </code><code>}</code>
提供的每個動作可能并行地執行,它有2個重載。
例如下面代碼執行了三個操作(來自msdn):
<code> </code><code>static</code> <code>void</code> <code>main()</code>
<code> </code><code>parallel.invoke(</code>
<code> </code><code>basicaction, </code><code>// param #0 - static method</code>
<code> </code><code>() => </code><code>// param #1 - lambda expression</code>
<code> </code><code>console.writeline(</code><code>"method=beta, thread={0}"</code><code>, thread.currentthread.managedthreadid);</code>
<code> </code><code>},</code>
<code> </code><code>delegate</code><code>() </code><code>// param #2 - in-line delegate</code>
<code> </code><code>console.writeline(</code><code>"method=gamma, thread={0}"</code><code>, thread.currentthread.managedthreadid);</code>
<code> </code><code>// no exception is expected in this example, but if one is still thrown from a task,</code>
<code> </code><code>// it will be wrapped in aggregateexception and propagated to the main thread.</code>
<code> </code><code>console.writeline(</code><code>"an action has thrown an exception. this was unexpected.\n{0}"</code><code>, e.innerexception.tostring());</code>
<code> </code><code>static</code> <code>void</code> <code>basicaction()</code>
<code> </code><code>console.writeline(</code><code>"method=alpha, thread={0}"</code><code>, thread.currentthread.managedthreadid);</code>
有人提出以下疑問:“如果for裡面的東西,對于順序敏感的話,會不會有問題。并行處理的話,說到底應該是多線程。如果需要lock住什麼東西的話,應該怎麼做呢?例如這個例子不是對數組填充,是對檔案操作呢?對某個資源操作呢?”
<code> </code><code>{ </code>
<code> </code><code>int</code> <code>loops=0;</code>
<code> </code><code>while</code> <code>(loops <= 100)</code>
<code> </code><code>long</code> <code>sum = 0; </code>
<code> </code><code>parallel.for(1, 1001,</code><code>delegate</code><code>(</code><code>long</code> <code>i)</code>
<code> </code><code>sum += i;</code>
<code> </code><code>});</code>
<code> </code><code>system.console.writeline(sum);</code>
<code> </code><code>loops++;</code>
在上述代碼中,為了校驗正确性我進行了重複做了100次,得出如下結果:
我們知道500500才是正确的答案,這說明parallel.for不能保證對sum正确的并發執行,對此我們應該加上适當的控制,并借機來回答上面提出的如何加鎖的問題。下面有幾種方案可以解決這個問題:
這個我就不多解釋了,直接上代碼:
<code> </code><code>int</code> <code>loops = 0;</code>
<code> </code><code>object</code> <code>moniter =</code><code>new</code> <code>object</code><code>();</code>
<code> </code><code>long</code> <code>sum = 0;</code>
<code> </code><code>lock</code> <code>(moniter) { sum += i; }</code>
我們加上lock鎖之後就會得出正确的結果。
關于plinq,以後将會介紹到,這裡不會詳細介紹,感興趣的自行查閱資料。代碼如下:
<code> </code><code>long</code> <code>sum = 0; </code>
<code> </code><code>sum = enumerable.range(0, 1001).asparallel().sum();</code>
運作可以得到正确的結果。
這個也不多說,直接上代碼,因為關于plinq将在以後詳細介紹,感興趣的自行查閱資料。
<code> </code><code>sum = parallelenumerable.range(0, 1001).sum();</code>
運作同樣可以得到正确結果。
代碼如下:
<code> </code><code>interlocked.add(</code><code>ref</code> <code>sum, i);</code>
運作可以得到正确結果。
這個方法已經在1.2中介紹,這裡直接上代碼,代碼如下:
<code> </code><code>int</code> <code>sum = 0;</code>
<code> </code><code>parallel.for(0, 1001, () => 0, (i, state,subtotal) =></code>
<code> </code><code>subtotal += i;</code>
<code> </code><code>return</code> <code>subtotal;</code>
<code> </code><code>},</code>
<code> </code><code>partial => interlocked.add(</code><code>ref</code> <code>sum, partial));</code>
運作可得正确結果。
上面的解決方案那個比較好呢?請大家各抒己見!關于這個我已經測試了一下。
ps:感覺寫這篇的時候很累,思緒也很亂,不知大家對這篇還滿意(⊙_⊙)?有什麼地方需要改進,或者說不易了解,或者哪個地方錯了!