
.NET Parallel Programming (Part 1): TPL Data Parallelism

Preface

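Many personal computers and workstations have multiple CPU cores that can run multiple threads at the same time. To take advantage of this hardware, you can parallelize code so that work is distributed across multiple processors.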


Use Cases

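  • Batch file upload
Upload individual files in parallel. A single file can also be split into several chunks that are uploaded separately to speed up the transfer.
  • Batch data processing
For example, millions of records can be split into many independent parts, processed in parallel, and then aggregated at the end.
  • Data push
Likewise, the data is split up first and then pushed in parallel.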
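To make the batch-processing case concrete, here is a minimal sketch written as a top-level C# program (C# 9 or later). The data set and the batch size of 10,000 are hypothetical, and summing stands in for whatever per-record work a real job would do. Each batch writes only its own slot in batchResults, so no locking is needed before the final aggregation.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical data set: one million records, represented here as plain numbers.
int[] records = Enumerable.Range(1, 1_000_000).ToArray();

const int BatchSize = 10_000; // assumed batch size; tune for real workloads
int batchCount = (records.Length + BatchSize - 1) / BatchSize;

// One result slot per batch, so each iteration writes only to its own element.
long[] batchResults = new long[batchCount];

Parallel.For(0, batchCount, batch =>
{
    int start = batch * BatchSize;
    int end = Math.Min(start + BatchSize, records.Length);
    long partial = 0;
    for (int i = start; i < end; i++)
    {
        partial += records[i]; // stand-in for the real per-record computation
    }
    batchResults[batch] = partial;
});

// Aggregate the independent partial results once all batches are done.
long total = batchResults.Sum();
Console.WriteLine($"{batchCount} batches, total = {total}");
```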

Task Parallel Library: Data Parallelism

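If a loop does only a small amount of work per iteration, or does not run for many iterations, the overhead of parallelization can make the code run more slowly. Before using parallelism, you should have a basic understanding of threading concepts such as locks, deadlocks, and race conditions.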
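The race condition mentioned above is easy to reproduce. The sketch below is a top-level C# program, and its counter names are illustrative. It increments a shared counter from a Parallel.For loop twice. The first pass uses a plain `+=`, which can lose updates. The second uses Interlocked.Increment, which cannot.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

const int N = 1_000_000;

// Unsafe: "racy += 1" is a read-modify-write, so two threads can read the same value
// and one update is lost. The result is often below N and can differ between runs.
long racy = 0;
Parallel.For(0, N, _ => { racy += 1; });

// Safe: Interlocked.Increment performs the read-modify-write as one atomic operation.
long safe = 0;
Parallel.For(0, N, _ => { Interlocked.Increment(ref safe); });

Console.WriteLine($"racy = {racy}, safe = {safe}");
```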

Parallel.For

using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical class name: the demo methods are assumed to live in one class that owns the shared stopwatch.
public class DataParallelDemo
{
    private readonly Stopwatch stopwatch = new Stopwatch();

    /// <summary>
    /// Sequential loop
    /// </summary>
    public void FormalDirRun()
    {
        long totalSize = 0;
        var dir = @"E:\LearnWall\orleans"; // or args[1]
        string[] files = Directory.GetFiles(dir);
        stopwatch.Restart();
        for (var i = 0; i < files.Length; i++)
        {
            FileInfo fi = new FileInfo(files[i]);
            long size = fi.Length;
            totalSize += size; // only one thread updates totalSize here, so Interlocked is not needed
        }
        stopwatch.Stop();
        Console.WriteLine($"FormalDirRun------{files.Length} files, {totalSize} bytes,time:{stopwatch.ElapsedMilliseconds},Dir:{dir}");
    }

    /// <summary>
    /// Parallel loop
    /// </summary>
    public void ParallelForDirRun()
    {
        long totalSize = 0;
        var dir = @"E:\LearnWall\orleans"; // or args[1]
        string[] files = Directory.GetFiles(dir);
        stopwatch.Restart();
        Parallel.For(0, files.Length, index =>
        {
            FileInfo fi = new FileInfo(files[index]);
            long size = fi.Length;
            // Several threads update totalSize concurrently, so the addition must be atomic.
            Interlocked.Add(ref totalSize, size);
        });
        stopwatch.Stop();
        Console.WriteLine($"ParallelForDirRun-{files.Length} files, {totalSize} bytes,time:{stopwatch.ElapsedMilliseconds},Dir:{dir}");
    }
}
        
           

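The comparison below shows that when the loop body runs very quickly, the parallel version actually takes longer. This is covered in more detail later.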

FormalDirRun------20 files, 255618 bytes,time:0,Dir:E:\LearnWall\orleans
ParallelForDirRun-20 files, 255618 bytes,time:6,Dir:E:\LearnWall\orleans
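The timings above depend on the author's E:\LearnWall\orleans folder. The self-contained variant below uses a throwaway folder under the system temp path with 20 files of known size (100 to 2,000 bytes). It checks that both loops report the same total. Note the difference between the two loops: the sequential one can use a plain `+=`, while the parallel one needs Interlocked.Add because several threads update the same variable.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Assumption: a throwaway folder under the temp path replaces E:\LearnWall\orleans.
string dir = Path.Combine(Path.GetTempPath(), "parallel-demo-" + Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(dir);
for (int i = 1; i <= 20; i++)
{
    // File i holds i * 100 bytes, so the expected total is 100 * (1 + 2 + ... + 20) = 21,000.
    File.WriteAllBytes(Path.Combine(dir, $"file{i}.bin"), new byte[i * 100]);
}

string[] files = Directory.GetFiles(dir);

// Sequential: only one thread touches the total, so a plain += is enough.
long sequentialTotal = 0;
foreach (string f in files)
{
    sequentialTotal += new FileInfo(f).Length;
}

// Parallel: several threads update the total, so the addition must be atomic.
long parallelTotal = 0;
Parallel.For(0, files.Length, index =>
{
    Interlocked.Add(ref parallelTotal, new FileInfo(files[index]).Length);
});

Console.WriteLine($"{files.Length} files, sequential {sequentialTotal} bytes, parallel {parallelTotal} bytes");

Directory.Delete(dir, recursive: true); // clean up the temporary folder
```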
           

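Next, add a delay such as Thread.Sleep to each loop body. Thread.Sleep only blocks a thread and does no real work, so this is not a great example, but it is enough to demonstrate the effect.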

Thread.Sleep(1000);
           

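With each iteration blocking for one second, the sequential loop takes about 20 s and the parallel loop about 3 s, so the parallel loop is roughly 6.7 times faster (20,011 ms vs. 3,007 ms). The parallel loop presumably takes about 3 s rather than 1 s because only a limited number of thread-pool threads run the 20 iterations at the same time.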

FormalDirRun------20 files, 255618 bytes,time:20011,Dir:E:\LearnWall\orleans
ParallelForDirRun-20 files, 255618 bytes,time:3007,Dir:E:\LearnWall\orleans
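The Thread.Sleep experiment can also be reproduced without any files. This sketch assumes 8 items and a 50 ms simulated delay per item. The timings it prints depend on the core count and on how quickly the thread pool adds threads, so only the item counts are fixed.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

const int Items = 8;     // assumed number of work items
const int DelayMs = 50;  // assumed blocking delay per item

// Sequential: the delays add up, roughly Items * DelayMs in total.
int sequentialDone = 0;
var sw = Stopwatch.StartNew();
for (int i = 0; i < Items; i++)
{
    Thread.Sleep(DelayMs);
    sequentialDone++;
}
long sequentialMs = sw.ElapsedMilliseconds;

// Parallel: blocked iterations overlap on different threads.
int parallelDone = 0;
sw.Restart();
Parallel.For(0, Items, _ =>
{
    Thread.Sleep(DelayMs);
    Interlocked.Increment(ref parallelDone);
});
long parallelMs = sw.ElapsedMilliseconds;

Console.WriteLine($"sequential: {sequentialMs} ms, parallel: {parallelMs} ms");
```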
           

Matrix and Stopwatch Example
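The article does not include code for this example, so here is a minimal sketch of the idea under stated assumptions. It multiplies two hypothetical 200×200 matrices of small random integers row by row, once sequentially and once with Parallel.For. It times both runs with Stopwatch and confirms that the results match. Each row of the result is written by exactly one iteration, so no synchronization is needed.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

const int N = 200; // assumed matrix size
var rand = new Random(42);
double[,] a = new double[N, N], b = new double[N, N];
for (int i = 0; i < N; i++)
{
    for (int j = 0; j < N; j++)
    {
        a[i, j] = rand.Next(10);
        b[i, j] = rand.Next(10);
    }
}

var sw = Stopwatch.StartNew();
double[,] seq = Multiply(a, b, parallel: false);
Console.WriteLine($"Sequential: {sw.ElapsedMilliseconds} ms");

sw.Restart();
double[,] par = Multiply(a, b, parallel: true);
Console.WriteLine($"Parallel:   {sw.ElapsedMilliseconds} ms");

bool same = true;
for (int i = 0; i < N; i++)
    for (int j = 0; j < N; j++)
        same &= seq[i, j] == par[i, j];
Console.WriteLine($"Results match: {same}");

// Computes x * y; each result row is produced by exactly one (possibly parallel) iteration.
static double[,] Multiply(double[,] x, double[,] y, bool parallel)
{
    int rows = x.GetLength(0), cols = y.GetLength(1), inner = x.GetLength(1);
    var result = new double[rows, cols];

    void ComputeRow(int r)
    {
        for (int c = 0; c < cols; c++)
        {
            double sum = 0;
            for (int t = 0; t < inner; t++)
                sum += x[r, t] * y[t, c];
            result[r, c] = sum;
        }
    }

    if (parallel)
        Parallel.For(0, rows, r => ComputeRow(r));
    else
        for (int r = 0; r < rows; r++) ComputeRow(r);

    return result;
}
```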

Parallel.ForEach

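To get the most out of parallelism, minimize access to shared resources inside the loop body, such as Console.Write or file logging. Console output is used here only to show the effect.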

public void ParallelForEachDirRun()
{
    long totalSize = 0;
    var dir = @"E:\LearnWall\orleans"; // or args[1]
    String[] files = Directory.GetFiles(dir);
    stopwatch.Restart();
    Parallel.ForEach(files, (current) =>
    {
        FileInfo fi = new FileInfo(current);
        long size = fi.Length;
        Interlocked.Add(ref totalSize, size);
        Console.WriteLine($"name:{fi.Name}");
    });
    stopwatch.Stop();
    Console.WriteLine($"ParallelForEachDirRun-{files.Length} files, {totalSize} bytes,Time:{stopwatch.ElapsedMilliseconds}");
}
           
name:.gitignore
name:build.sh
.
.
.
name:TestAll.cmd
ParallelForEachDirRun-20 files, 255618 bytes,Time:17
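Following the advice above, you can move the Console.WriteLine call out of the loop body. In this sketch the file names are hypothetical stand-ins. The ForEach overload that also passes the element index (Action<TSource, ParallelLoopState, long>) lets each iteration write to its own array slot. All output then happens on one thread after the loop, in the original order.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-ins for the names returned by Directory.GetFiles.
string[] names = Enumerable.Range(1, 20).Select(i => $"file{i}.txt").ToArray();

// One slot per element: each iteration writes only messages[index], so nothing is shared.
string[] messages = new string[names.Length];
Parallel.ForEach(names, (name, loopState, index) =>
{
    messages[index] = $"name:{name}";
});

// All console output happens after the loop, on a single thread, in source order.
foreach (string message in messages)
{
    Console.WriteLine(message);
}
```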
           

Thread-Local Variables in Parallel.For

public void ParallelForForThreadLocalVariables()
{
    int[] nums = Enumerable.Range(0, 1000000).ToArray();
    long total = 0;

    // Use type parameter to make subtotal a long, not an int
    Parallel.For<long>(0, nums.Length,
        () => 0,                      // localInit: each thread-local subtotal starts at 0
        (j, loop, subtotal) =>        // body: updates and returns the thread-local subtotal
        {
            subtotal += nums[j];
            return subtotal;
        },
        (x) => Interlocked.Add(ref total, x) // localFinally: merges each subtotal into total
    );

    Console.WriteLine("The total is {0:N0}", total);
    Console.WriteLine("Press any key to exit");
    Console.ReadKey();
}
           

The result:

The total is 499,999,500,000

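The first two parameters of every For method specify the starting and ending iteration values. In this overload, the third parameter is where the local state is initialized. In this context, local state means a variable whose lifetime extends from just before the first iteration of the loop on the current thread to just after the last iteration.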

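The type of the third parameter is Func<TResult>, where TResult is the type of the variable that stores the thread-local state. That type is defined by the generic type argument supplied when calling the generic For<TLocal>(Int32, Int32, Func<TLocal>, Func<Int32, ParallelLoopState, TLocal, TLocal>, Action<TLocal>) method, which in this case is Int64. The type argument tells the compiler the type of the temporary variable that stores the thread-local state. In this example, the expression () => 0 (or Function() 0 in Visual Basic) initializes the thread-local variable to zero. If the generic type argument is a reference type or a user-defined value type, the expression looks like this: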

() => new MyClass()  
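Here is a minimal sketch of the reference-type case as a top-level C# program. It uses a List<int> as the thread-local state to collect even numbers. The names evens, gate, initCalls, and finallyCalls are illustrative. The counters show that localInit and localFinally run once per worker, not once per iteration. The lock is needed only in localFinally, where the per-worker lists are merged into the shared result.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int[] nums = Enumerable.Range(0, 1_000_000).ToArray();
var evens = new List<int>();   // shared result, touched only in localFinally
var gate = new object();       // guards evens while per-worker lists are merged
int initCalls = 0, finallyCalls = 0;

Parallel.For<List<int>>(0, nums.Length,
    // localInit: creates a private list for each worker
    () =>
    {
        Interlocked.Increment(ref initCalls);
        return new List<int>();
    },
    // body: adds to the private list without locking and hands it to the next iteration
    (i, loopState, local) =>
    {
        if (nums[i] % 2 == 0) local.Add(nums[i]);
        return local;
    },
    // localFinally: runs once per private list after its last iteration; merging needs the lock
    local =>
    {
        Interlocked.Increment(ref finallyCalls);
        lock (gate) evens.AddRange(local);
    });

Console.WriteLine($"{evens.Count} even numbers; localInit ran {initCalls} times, localFinally {finallyCalls} times");
```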
           

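This part is rather verbose. In short:

  • The first two parameters are the start and end values.
  • The third initializes the thread-local value, whose type is the generic type argument of For.
  • The fourth is the loop body, which receives the current thread-local value and returns the updated one.
  • The fifth (localFinally) runs once for each thread-local value after its last iteration. This is where the partial results are combined (here with Interlocked.Add).

Honestly, I did not fully understand this part myself. The .NET Framework source is below; I have not checked the .NET Core version: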

public static ParallelLoopResult For<TLocal>(
    int fromInclusive, int toExclusive,
    Func<TLocal> localInit,
    Func<int, ParallelLoopState, TLocal, TLocal> body,
    Action<TLocal> localFinally)
{
    if (body == null)
    {
        throw new ArgumentNullException("body");
    }
    if (localInit == null)
    {
        throw new ArgumentNullException("localInit");
    }
    if (localFinally == null)
    {
        throw new ArgumentNullException("localFinally");
    }

    return ForWorker(
        fromInclusive, toExclusive, s_defaultParallelOptions,
        null, null, body, localInit, localFinally);
}

/// <summary>
/// ...
/// </summary>
/// <typeparam name="TLocal">The type of the thread-local data.</typeparam>
/// <param name="fromInclusive">The start index of the loop (inclusive).</param>
/// <param name="toExclusive">The end index of the loop (exclusive).</param>
/// <param name="parallelOptions">Options that configure the loop.</param>
/// <param name="body">The loop body.</param>
/// <param name="bodyWithState">The loop body overload that takes a ParallelLoopState.</param>
/// <param name="bodyWithLocal">The loop body overload that takes thread-local state.</param>
/// <param name="localInit">A selector function that returns new thread-local state.</param>
/// <param name="localFinally">A cleanup function for the thread-local state.</param>
/// <remarks>Only one of the body parameters may be supplied (they are mutually exclusive).</remarks>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult"/> structure.</returns>
private static ParallelLoopResult ForWorker<TLocal>(
    int fromInclusive, int toExclusive,
    ParallelOptions parallelOptions,
    Action<int> body,
    Action<int, ParallelLoopState> bodyWithState,
    Func<int, ParallelLoopState, TLocal, TLocal> bodyWithLocal,
    Func<TLocal> localInit, Action<TLocal> localFinally)
{
    // ... (implementation omitted)
}
           

Thread-Local Variables in Parallel.ForEach

/// <summary>
/// Sums an array with Parallel.ForEach, keeping a running subtotal per partition.
/// </summary>
public void ParallelForEachThreadLocalVariables()
{
    int[] nums = Enumerable.Range(0, 1000000).ToArray();
    long total = 0;

    // First type parameter is the type of the source elements
    // Second type parameter is the type of the thread-local variable (partition subtotal)
    Parallel.ForEach<int, long>(nums, // source collection
                                () => 0, // method to initialize the local variable
                                (j, loop, subtotal) => // method invoked by the loop on each iteration
                                {
                                    subtotal += j; // modify local variable
                                    return subtotal; // value to be passed to next iteration
                                },
                                // Method to be executed when each partition has completed.
                                // finalResult is the final value of subtotal for a particular partition.
                                (finalResult) => Interlocked.Add(ref total, finalResult)
                                );

    Console.WriteLine("The total from Parallel.ForEach is {0:N0}", total);
}
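Interlocked.Add works for merging subtotals, but a maximum cannot be merged that way. This sketch uses a hypothetical array of seeded random integers. It finds the maximum with a running maximum per partition, then merges the partition results under a lock in localFinally.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical data: pseudo-random values from a fixed seed so runs are repeatable.
var rand = new Random(123);
int[] values = Enumerable.Range(0, 1_000_000).Select(_ => rand.Next()).ToArray();

int globalMax = int.MinValue;
var gate = new object();

Parallel.ForEach<int, int>(values,
    () => int.MinValue,                                  // each partition starts from the smallest int
    (v, loopState, localMax) => Math.Max(localMax, v),   // body: touches no shared state
    localMax =>
    {
        // Combine the partition maxima one at a time under the lock.
        lock (gate)
        {
            if (localMax > globalMax) globalMax = localMax;
        }
    });

Console.WriteLine($"Max = {globalMax}");
```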
           

The .NET Framework source of the basic ForEach overload is shown below:

/// <summary>
/// Executes a for each operation on an <see cref="T:System.Collections.IEnumerable{TSource}"/> 
/// in which iterations may run in parallel.
/// </summary>
/// <typeparam name="TSource">The type of the data in the source.</typeparam>
/// <param name="source">An enumerable data source.</param>
/// <param name="body">The delegate that is invoked once per iteration.</param>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="source"/> 
/// argument is null.</exception>
/// <exception cref="T:System.ArgumentNullException">The exception that is thrown when the <paramref name="body"/> 
/// argument is null.</exception>
/// <exception cref="T:System.AggregateException">The exception that is thrown to contain an exception
/// thrown from one of the specified delegates.</exception>
/// <returns>A <see cref="T:System.Threading.Tasks.ParallelLoopResult">ParallelLoopResult</see> structure
/// that contains information on what portion of the loop completed.</returns>
/// <remarks>
/// The <paramref name="body"/> delegate is invoked once for each element in the <paramref name="source"/> 
/// enumerable.  It is provided with the current element as a parameter.
/// </remarks>
public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (body == null)
    {
        throw new ArgumentNullException("body");
    }

    return ForEachWorker<TSource, object>(
        source, s_defaultParallelOptions, body, null, null, null, null, null, null);
}
           

Canceling Parallel.ForEach or Parallel.For

Get a token from a CancellationTokenSource:

CancellationTokenSource cts = new CancellationTokenSource();

Store the token in the ParallelOptions.CancellationToken property:

ParallelOptions po = new ParallelOptions();

po.CancellationToken = cts.Token;

Pass this ParallelOptions instance to Parallel.For or Parallel.ForEach so that the token can cancel the loop.

The code:

int[] nums = Enumerable.Range(0, 10000000).ToArray();
CancellationTokenSource cts = new CancellationTokenSource();

// Use ParallelOptions instance to store the CancellationToken
ParallelOptions po = new ParallelOptions();
po.CancellationToken = cts.Token;
po.MaxDegreeOfParallelism = System.Environment.ProcessorCount;
Console.WriteLine("Press any key to start. Press 'c' to cancel.");
Console.ReadKey();

// Run a task so that we can cancel from another thread.
Task.Factory.StartNew(() =>
{
    var s = Console.ReadKey().KeyChar;
    if (s == 'c')
        cts.Cancel();
    Console.WriteLine("press any key to exit111");
});

try
{
    Parallel.ForEach(nums, po, (num) =>
    {
        double d = Math.Sqrt(num);
        Console.WriteLine("{0} on {1}", d, Thread.CurrentThread.ManagedThreadId);
        po.CancellationToken.ThrowIfCancellationRequested();
    });
}
catch (OperationCanceledException e)
{
    Console.WriteLine(e.Message);
}
finally
{
    cts.Dispose();
}

Console.ReadKey();
           

The output is below. Pressing c cancels the parallel loop.

1937.41838537782 on 7
2739.95711645274 on 8
2501.40660429287 on 9
2958.47798707376 on 10
.
.
.
press any key to exit111
The operation was canceled.
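The example above needs a keyboard. For a non-interactive check, this sketch cancels the token from inside the loop once 1,000 items have been processed. That trigger is an assumption and not part of the original demo. Parallel.ForEach then stops starting new iterations and throws OperationCanceledException, so far fewer items are processed than the total.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int[] nums = Enumerable.Range(0, 1_000_000).ToArray();
using var cts = new CancellationTokenSource();
var po = new ParallelOptions { CancellationToken = cts.Token };

int processed = 0;
bool canceled = false;

try
{
    Parallel.ForEach(nums, po, num =>
    {
        // Assumed trigger instead of a key press: cancel after 1,000 items.
        if (Interlocked.Increment(ref processed) == 1_000)
        {
            cts.Cancel();
        }
    });
}
catch (OperationCanceledException)
{
    canceled = true;
}

Console.WriteLine($"canceled: {canceled}, processed {processed} of {nums.Length} items");
```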
           

Handling Exceptions Thrown in the Loop Body

The example method collects the exceptions in a ConcurrentQueue. After the loop finishes, it throws them together as a single AggregateException.

var exceptions = new ConcurrentQueue<Exception>();

exceptions.Enqueue(e);

The caller then uses AggregateException.Flatten to get at the individual exceptions.

This gives me a good pattern for handling exceptions in the future.
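Flatten matters when AggregateExceptions are nested. This can happen when exceptions from one parallel operation are wrapped again by another. The sketch below builds a hypothetical nested AggregateException by hand and shows that Flatten pulls all non-aggregate exceptions up to a single level.

```csharp
using System;

// Hypothetical nesting: the second inner exception is itself an AggregateException.
var nested = new AggregateException(
    new ArgumentException("a"),
    new AggregateException(
        new InvalidOperationException("b"),
        new ArgumentException("c")));

Console.WriteLine($"Before Flatten: {nested.InnerExceptions.Count} inner exceptions");

// Flatten returns a new AggregateException whose inner exceptions contain no AggregateExceptions.
AggregateException flat = nested.Flatten();
Console.WriteLine($"After Flatten: {flat.InnerExceptions.Count} inner exceptions");
foreach (Exception ex in flat.InnerExceptions)
{
    Console.WriteLine($"{ex.GetType().Name}: {ex.Message}");
}
```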

/// <summary>
/// Catches exceptions thrown inside the parallel loop body
/// </summary>
public void HandleExceptionParallelLoop()
{
    // Create some random data to process in parallel.
    // There is a good probability this data will cause some exceptions to be thrown.
    byte[] data = new byte[5000];
    Random r = new Random();
    r.NextBytes(data);

    try
    {
        ProcessDataInParallel(data);
    }
    catch (AggregateException ae)
    {
        var ignoredExceptions = new List<Exception>();
        // This is where you can choose which exceptions to handle.
        foreach (var ex in ae.Flatten().InnerExceptions)
        {
            if (ex is ArgumentException)
                Console.WriteLine(ex.Message);
            else
                ignoredExceptions.Add(ex);
        }
        if (ignoredExceptions.Count > 0) throw new AggregateException(ignoredExceptions);
    }

    Console.WriteLine("Press any key to exit.");
    Console.ReadKey();
}

private void ProcessDataInParallel(byte[] data)
{
    // Use ConcurrentQueue to enable safe enqueueing from multiple threads.
    var exceptions = new ConcurrentQueue<Exception>();

    // Execute the complete loop and capture all exceptions.
    Parallel.ForEach(data, d =>
    {
        try
        {
            // Cause a few exceptions, but not too many.
            if (d < 3)
                throw new ArgumentException($"Value is {d}. Value must be greater than or equal to 3.");
            else
                Console.Write(d + " ");
        }
        // Store the exception and continue with the loop.
        catch (Exception e)
        {
            exceptions.Enqueue(e);
        }
    });
    Console.WriteLine();

    // Throw the exceptions here after the loop completes.
    if (exceptions.Count > 0) throw new AggregateException(exceptions);
}
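For contrast, here is what happens without the try/catch inside the loop body. This sketch treats every multiple of 1,000 as a hypothetical invalid value. Parallel.ForEach wraps the thrown exceptions in an AggregateException and stops starting new iterations after the first failure, so some items are never processed. Collecting exceptions in a ConcurrentQueue, as in the example above, is what lets the loop run to completion.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int[] data = Enumerable.Range(0, 100_000).ToArray();
int processed = 0;
bool failed = false;
int exceptionCount = 0;
bool allArgumentExceptions = false;

try
{
    // No try/catch inside the body this time.
    Parallel.ForEach(data, d =>
    {
        // Hypothetical rule: multiples of 1,000 are invalid.
        if (d % 1_000 == 0)
            throw new ArgumentException($"Value {d} is invalid.");
        Interlocked.Increment(ref processed);
    });
}
catch (AggregateException ae)
{
    // The loop stopped early; ae holds the exception(s) thrown before it did.
    failed = true;
    exceptionCount = ae.InnerExceptions.Count;
    allArgumentExceptions = ae.InnerExceptions.All(ex => ex is ArgumentException);
}

Console.WriteLine($"failed: {failed}, exceptions: {exceptionCount}, processed {processed} of {data.Length}");
```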
           

Speeding Up Small Loop Bodies

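When a Parallel.For loop has a very fast body, it might run more slowly than the equivalent sequential loop. The slowdown comes from the overhead of partitioning the data and the cost of invoking a delegate on every loop iteration. To address this, the Partitioner class provides the Partitioner.Create method. It lets you supply a sequential loop as the delegate body, so that the delegate is invoked only once per partition instead of once per iteration.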

var rangePartitioner = Partitioner.Create(0, source.Length);
/// <summary>
/// Speeds up a small loop body by processing a whole range per delegate call
/// </summary>
public void SpeedUpMicroParallelBody()
{
    // Source must be array or IList.
    var source = Enumerable.Range(0, 100000).ToArray();

    // Partition the entire source array.
    var rangePartitioner = Partitioner.Create(0, source.Length);

    double[] results = new double[source.Length];

    // Loop over the partitions in parallel.
    Parallel.ForEach(rangePartitioner, (range, loopState) =>
    {
        // Loop over each range element without a delegate invocation.
        for (int i = range.Item1; i < range.Item2; i++)
        {
            results[i] = source[i] * Math.PI;
        }
    });

    Console.WriteLine("Operation complete. Print results? y/n");
    char input = Console.ReadKey().KeyChar;
    if (input == 'y' || input == 'Y')
    {
        foreach (double d in results)
        {
            Console.Write("{0} ", d);
        }
    }
}
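Partitioner.Create also has an overload that takes an explicit range size. The sketch below uses an assumed range size of 10,000 for the same 100,000-element workload, which yields exactly 10 ranges. The counter confirms that the delegate runs once per range, not once per element. Without the rangeSize argument, Partitioner.Create chooses a default range size.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var source = Enumerable.Range(0, 100_000).ToArray();
double[] results = new double[source.Length];

// Assumed range size of 10,000: ranges [0, 10000), [10000, 20000), ..., [90000, 100000).
var rangePartitioner = Partitioner.Create(0, source.Length, 10_000);

int rangeCount = 0;
Parallel.ForEach(rangePartitioner, range =>
{
    Interlocked.Increment(ref rangeCount); // runs once per range, not once per element
    for (int i = range.Item1; i < range.Item2; i++)
    {
        results[i] = source[i] * Math.PI;
    }
});

Console.WriteLine($"{rangeCount} ranges processed");
```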
           

Source Code

CsharpFanDemo

Summary

Following the steps in Microsoft's official documentation, this article walked through the first part, data parallelism:

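  • Parallel.For and Parallel.ForEach for running loops in parallel.
  • Thread-local variables with Parallel.For and Parallel.ForEach.
  • Cancellation through ParallelOptions.CancellationToken.
  • Exception handling: collecting exceptions from the loop body in a ConcurrentQueue and rethrowing them to the caller.
  • Speeding up small loop bodies with Partitioner.Create.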

Thanks for reading!