天天看点

.NET(C#) Internals: 以一个数组填充的例子初步了解.NET 4.0中的并行(二)

1、parallel class

1.1、for方法

1.2、foreach方法

1.3、invoke方法

2、并发控制疑问?

2.1、使用lock锁

2.2、使用plinq——用asparallel

2.3、使用plinq——用parallelenumerable

2.4、使用interlocked操作

2.5、使用parallel.for的有thread-local变量重载函数

性能比较

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

<code>using</code> <code>system.threading.tasks;  </code>

<code>class</code> <code>test</code>

<code>{</code>

<code>    </code><code>static</code> <code>int</code> <code>n = 1000;</code>

<code>    </code><code>static</code> <code>void</code> <code>testmethod()</code>

<code>    </code><code>{</code>

<code>        </code><code>// using a named method.</code>

<code>        </code><code>parallel.for(0, n, method2);</code>

<code>        </code><code>// using an anonymous method.</code>

<code>        </code><code>parallel.for(0, n,</code><code>delegate</code><code>(</code><code>int</code> <code>i)</code>

<code>        </code><code>{</code>

<code>            </code><code>// do work.</code>

<code>        </code><code>});</code>

<code>        </code><code>// using a lambda expression.</code>

<code>        </code><code>parallel.for(0, n, i =&gt;</code>

<code>    </code><code>}</code>

<code>    </code><code>static</code> <code>void</code> <code>method2(</code><code>int</code> <code>i)</code>

<code>        </code><code>// do work.</code>

<code>}</code>

上面这个例子简单易懂,上篇我们就是用的parallel.for,这里就不解释了。其实parallel类的方法主要分为下面三类:

for方法

foreach方法

invoke方法

在里面执行的for循环可能并行地运行,它有12个重载。这12个重载中int32参数和int64参数的方法各为6个,下面以int32为例列出:

toexclusive)。另外body有两个local状态变量用于同一线程的迭代之间共享。localinit委托将在每个线程参与循环执行时调用,

并返回这些线程初始的local状态。这些初始状态被传递给body,当它在每个线程上第一次调用时。然后,接下来body调用返回一个可能的修改状态值

且传递给下一次body调用。最终,最后一次在每个线程上的body调用返回的一个状态值传递给localfinally委托。每个线程执行在自己的

loacl 状态上执行最后一个动作时,localfinally委托将被调用。这个委托可能在多个线程上并发执行,因此,你必须同步访问任何共享变量。

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

<code>using</code> <code>system;</code>

<code>using</code> <code>system.collections.generic;</code>

<code>using</code> <code>system.linq;</code>

<code>using</code> <code>system.text;</code>

<code>using</code> <code>system.threading;</code>

<code>using</code> <code>system.threading.tasks;</code>

<code>namespace</code> <code>consoleapplication2</code>

<code>    </code><code>class</code> <code>program</code>

<code>        </code><code>// demonstrated features:</code>

<code>        </code><code>//        cancellationtokensource</code>

<code>        </code><code>//         parallel.for()</code>

<code>        </code><code>//        paralleloptions</code>

<code>        </code><code>//        parallelloopresult</code>

<code>        </code><code>// expected results:</code>

<code>        </code><code>//         an iteration for each argument value (0, 1, 2, 3, 4, 5, 6, 7, 8, 9) is executed.</code>

<code>        </code><code>//        the order of execution of the iterations is undefined.</code>

<code>        </code><code>//        the iteration when i=2 cancels the loop.</code>

<code>        </code><code>//        some iterations may bail out or not start at all; because they are temporally executed in unpredictable order,</code>

<code>        </code><code>//          it is impossible to say which will start/complete and which won't.</code>

<code>        </code><code>//        at the end, an operationcancelledexception is surfaced.</code>

<code>        </code><code>// documentation:</code>

<code>        </code><code>static</code> <code>void</code> <code>main(</code><code>string</code><code>[] args)</code>

<code>            </code><code>cancellationtokensource cancellationsource =</code><code>new</code> <code>cancellationtokensource();</code>

<code>            </code><code>paralleloptions options =</code><code>new</code> <code>paralleloptions();</code>

<code>            </code><code>options.cancellationtoken = cancellationsource.token;</code>

<code>            </code><code>try</code>

<code>            </code><code>{</code>

<code>                </code><code>parallelloopresult loopresult = parallel.for(</code>

<code>                    </code><code>0,</code>

<code>                    </code><code>10,</code>

<code>                    </code><code>options,</code>

<code>                    </code><code>(i, loopstate) =&gt;</code>

<code>                    </code><code>{</code>

<code>                        </code><code>console.writeline(</code><code>"start thread={0}, i={1}"</code><code>, thread.currentthread.managedthreadid, i);</code>

<code>                        </code><code>// simulate a cancellation of the loop when i=2</code>

<code>                        </code><code>if</code> <code>(i == 2)</code>

<code>                        </code><code>{</code>

<code>                            </code><code>cancellationsource.cancel();</code>

<code>                        </code><code>}</code>

<code>                        </code><code>// simulates a long execution</code>

<code>                        </code><code>for</code> <code>(</code><code>int</code> <code>j = 0; j &lt; 10; j++)</code>

<code>                            </code><code>thread.sleep(1 * 200);</code>

<code>                            </code><code>// check to see whether or not to continue</code>

<code>                            </code><code>if</code> <code>(loopstate.shouldexitcurrentiteration)</code><code>return</code><code>;</code>

<code>                        </code><code>console.writeline(</code><code>"finish thread={0}, i={1}"</code><code>, thread.currentthread.managedthreadid, i);</code>

<code>                    </code><code>}</code>

<code>                </code><code>);</code>

<code>                </code><code>if</code> <code>(loopresult.iscompleted)</code>

<code>                </code><code>{</code>

<code>                    </code><code>console.writeline(</code><code>"all iterations completed successfully. this was not expected."</code><code>);</code>

<code>                </code><code>}</code>

<code>            </code><code>}</code>

<code>                </code><code>// no exception is expected in this example, but if one is still thrown from a task,</code>

<code>                </code><code>// it will be wrapped in aggregateexception and propagated to the main thread.</code>

<code>            </code><code>catch</code> <code>(aggregateexception e)</code>

<code>                </code><code>console.writeline(</code><code>"parallel.for has thrown an aggregateexception. this was not expected.\n{0}"</code><code>, e);</code>

<code>                </code><code>// catching the cancellation exception</code>

<code>            </code><code>catch</code> <code>(operationcanceledexception e)</code>

<code>                </code><code>console.writeline(</code><code>"an iteration has triggered a cancellation. this was expected.\n{0}"</code><code>, e.tostring());</code>

<code>        </code><code>}</code>

提供的每个动作可能并行地执行,它有2个重载。

例如下面代码执行了三个操作(来自msdn):

<code>        </code><code>static</code> <code>void</code> <code>main()</code>

<code>                </code><code>parallel.invoke(</code>

<code>                    </code><code>basicaction,   </code><code>// param #0 - static method</code>

<code>                    </code><code>() =&gt;           </code><code>// param #1 - lambda expression</code>

<code>                        </code><code>console.writeline(</code><code>"method=beta, thread={0}"</code><code>, thread.currentthread.managedthreadid);</code>

<code>                    </code><code>},</code>

<code>                    </code><code>delegate</code><code>()       </code><code>// param #2 - in-line delegate</code>

<code>                        </code><code>console.writeline(</code><code>"method=gamma, thread={0}"</code><code>, thread.currentthread.managedthreadid);</code>

<code>            </code><code>// no exception is expected in this example, but if one is still thrown from a task,</code>

<code>            </code><code>// it will be wrapped in aggregateexception and propagated to the main thread.</code>

<code>                </code><code>console.writeline(</code><code>"an action has thrown an exception. this was unexpected.\n{0}"</code><code>, e.innerexception.tostring());</code>

<code>        </code><code>static</code> <code>void</code> <code>basicaction()</code>

<code>            </code><code>console.writeline(</code><code>"method=alpha, thread={0}"</code><code>, thread.currentthread.managedthreadid);</code>

有人提出以下疑问:“如果for里面的东西,对于顺序敏感的话,会不会有问题。并行处理的话,说到底应该是多线程。如果需要lock住什么东西的话,应该怎么做呢?例如这个例子不是对数组填充,是对文件操作呢?对某个资源操作呢?”

<code>    </code><code>{       </code>

<code>            </code><code>int</code> <code>loops=0;</code>

<code>            </code><code>while</code> <code>(loops &lt;= 100)</code>

<code>                </code><code>long</code> <code>sum = 0;               </code>

<code>                </code><code>parallel.for(1, 1001,</code><code>delegate</code><code>(</code><code>long</code> <code>i)</code>

<code>                    </code><code>sum += i;</code>

<code>                </code><code>});</code>

<code>                </code><code>system.console.writeline(sum);</code>

<code>                </code><code>loops++;</code>

在上述代码中,为了校验正确性我进行了重复做了100次,得出如下结果:

.NET(C#) Internals: 以一个数组填充的例子初步了解.NET 4.0中的并行(二)

我们知道500500才是正确的答案,这说明parallel.for不能保证对sum正确的并发执行,对此我们应该加上适当的控制,并借机来回答上面提出的如何加锁的问题。下面有几种方案可以解决这个问题:

这个我就不多解释了,直接上代码:

<code>            </code><code>int</code> <code>loops = 0;</code>

<code>            </code><code>object</code> <code>moniter =</code><code>new</code> <code>object</code><code>();</code>

<code>                </code><code>long</code> <code>sum = 0;</code>

<code>                    </code><code>lock</code> <code>(moniter) { sum += i; }</code>

我们加上lock锁之后就会得出正确的结果。

关于plinq,以后将会介绍到,这里不会详细介绍,感兴趣的自行查阅资料。代码如下:

<code>                </code><code>long</code> <code>sum = 0;    </code>

<code>                </code><code>sum = enumerable.range(0, 1001).asparallel().sum();</code>

运行可以得到正确的结果。

这个也不多说,直接上代码,因为关于plinq将在以后详细介绍,感兴趣的自行查阅资料。

<code>                </code><code>sum = parallelenumerable.range(0, 1001).sum();</code>

运行同样可以得到正确结果。

代码如下:

<code>                    </code><code>interlocked.add(</code><code>ref</code> <code>sum, i);</code>

运行可以得到正确结果。

这个方法已经在1.2中介绍,这里直接上代码,代码如下:

<code>                </code><code>int</code> <code>sum = 0;</code>

<code>                </code><code>parallel.for(0, 1001, () =&gt; 0, (i, state,subtotal) =&gt;</code>

<code>                    </code><code>subtotal += i;</code>

<code>                    </code><code>return</code> <code>subtotal;</code>

<code>                </code><code>},</code>

<code>                </code><code>partial =&gt; interlocked.add(</code><code>ref</code> <code>sum, partial));</code>

运行可得正确结果。

上面的解决方案那个比较好呢?请大家各抒己见!关于这个我已经测试了一下。

ps:感觉写这篇的时候很累,思绪也很乱,不知大家对这篇还满意(⊙_⊙)?有什么地方需要改进,或者说不易理解,或者哪个地方错了!