天天看點

多核時代 .NET Framework 4 中的并行程式設計8---任務的同步

在并行程式設計過程中,多個任務同時執行時,就會涉及到任務的同步問題..Net為我們提供很多解決任務同步的類和方法.下面在具體介紹.當然,這些類和方法也适用于處理多線程(Thread)程式設計的同步問題.

1. Barrier

Barrier類是.Net4中新增加一個類, 它使多個任務能夠采用并行方式依據某種算法在多個階段中協同工作。例如:

           BankAccount[] accounts = new BankAccount[5];

            for (int i = 0; i < accounts.Length; i++)

            {

                accounts[i] = new BankAccount();

            }

            int totalBalance = 0;

            System.Threading.Barrier barrier = newSystem.Threading.Barrier(5, (myBarrier) =>

                totalBalance = 0;

                foreach (var account in accounts)

                {

                    totalBalance += account.Balance;

                }

                Console.WriteLine("Total balance:{0}", totalBalance);

            });

            Task[] tasks = new Task[5];

            for (int i = 0; i < tasks.Length; i++)

                tasks[i] = new Task((indata) =>

                    BankAccount account = (BankAccount)indata;

                    Random rnd = new Random();

                    for (int j = 0; j < 1000; j++)

                    {

                        account.Balance += rnd.Next(1, 100);

                    }

                    Console.WriteLine("Task {0},phase {1} ended",Task.CurrentId, barrier.CurrentPhaseNumber);

                    barrier.SignalAndWait();

                    account.Balance -= (totalBalance - account.Balance) / 10;

                }, accounts[i]);

            foreach (var t in tasks)

                t.Start();

            Task.WaitAll(tasks);

            Console.WriteLine("Press enter to finish");

            Console.ReadLine();

通過上面的代碼,我們可以知道:

初始化一個Barrier執行個體,我們可以使用Barrier(Int32, Action<Barrier>)來執行個體,它的第一個參數表示參數同步線程的數量(也就是将有多少個線程參與進來), Action則表示所有參與者線程到達一個階段的屏障之後,就會執行Action委托中的代碼.如果沒有Action委托,則可以傳遞null,此時就是Barrier(Int32)構造函數了.當參與的線程到達某個階段時就可以調用Barrier的SignalAndWait();方法, 發出參與者已達到 Barrier 的信号,并等待所有其他參與者也達到屏障.

通俗的講,Barrier的原理就好比110米欄,劉翔和羅伯斯比賽,當劉翔先跨過第1個欄時就會調用SignalAndWait()方法,告訴大家我已經過了第1個攔,然後等待羅伯斯也跨過第1個欄,當所有人都過了第1個欄,就會調用初始化Barrier時的是Action委托.接着,後續都是這麼一個過程.在比賽過程中,如果有其他人加入,則可以調用AddParticipant()方法,如果中途有人退出比賽了,則可以調用RemoveParticpant()退出.

2. CountdownEvent

表示在計數變為零時處于有信号狀态的同步基元。其原理是通過一個計數,來監視目前參與者的數量,當某個任務完成後,可以調用CountdownEvent的 Signal()方法表示發出信号,通知任務完成并減少計數.此外,可以調用CountdownEvent 的Wait()方法則阻塞線程直到設定了 CountdownEvent 為止(就是計數為0,也就是所有的參與者都完成了).代碼如下:

            CountdownEvent cd = new CountdownEvent(5);

            Random rnd = new Random();

            Task[] tasks = new Task[6];

                tasks[i] = new Task(() =>

                    Thread.Sleep(rnd.Next(500, 1000));

                    Console.WriteLine("Task {0} signalling event",Task.CurrentId);

                    cd.Signal();

                });

            tasks[5] = new Task(() =>

                cd.Wait();

                Console.WriteLine("Event has been set");

            foreach (Task t in tasks)

3. ManualResetEventSlim

它是 ManualResetEvent 的簡化版本,輕量級的實作. 其Reset方法将事件狀态設定為非終止狀态,進而導緻線程受阻。 其Set 方法将事件狀态設定為有信号,進而允許一個或多個等待該事件的線程繼續。 其 Wait()方法 阻止目前線程,直到設定了目前 ManualResetEventSlim 為止。代碼如下:

ManualResetEventSlim manualResetEvent = newManualResetEventSlim();

            CancellationTokenSource tokenSource = newCancellationTokenSource();

            Task waitingTask = Task.Factory.StartNew(() =>

                while (true)

                    manualResetEvent.Wait(tokenSource.Token);

                    Console.WriteLine("Waiting task active");

            }, tokenSource.Token);

            Task signallingTask = Task.Factory.StartNew(() =>

                Random rnd = new Random();

                while (!tokenSource.Token.IsCancellationRequested)

                {

                    tokenSource.Token.WaitHandle.WaitOne(rnd.Next(500, 2000));

                    manualResetEvent.Set();

                    Console.WriteLine("Event Set");

                    manualResetEvent.Reset();

                    Console.WriteLine("Event ReSet");

            Console.WriteLine("Press enter to cancel tasks");

            tokenSource.Cancel();

            try

                Task.WaitAll(waitingTask, signallingTask);

            catch (AggregateException ex)

                ex.Flatten().Handle((inner) =>

                    Console.WriteLine("Exception is {0}", inner.Message);

                    return true;

在 .NET Framework 4 版中,當等待時間預計非常短時,并且當事件不會跨越程序邊界時,可使用 System.Threading.ManualResetEventSlim 類以獲得更好的性能。當等待事件變為已發出信号狀态的過程中,ManualResetEventSlim 短時間内會使用繁忙旋轉。 當等待時間很短時,旋轉的開銷相對于使用等待句柄來進行等待的開銷會少很多。但是,如果事件在某個時間段内沒有變為已發出信号狀态,則 ManualResetEventSlim 會采用正常的事件處理等待。

4. AutoResetEvent

它和ManualResetEventSlim類似,AutoResetEvent 類表示一個本地等待處理事件,在釋放了單個等待線程以後,該事件會在終止時自動重置。代碼如下:

   var arEvent = new AutoResetEvent(false);

            for (int i = 0; i < 3; i++)

                Task.Factory.StartNew(() =>

                    while (!tokenSource.Token.IsCancellationRequested)

                        arEvent.WaitOne();

                        Console.WriteLine("Task {0} released",Task.CurrentId);

                }, tokenSource.Token);

                    tokenSource.Token.WaitHandle.WaitOne(500);

                    arEvent.Set();

                    Console.WriteLine("Event set");

5. SemaphoreSlim

對可同時通路資源或資源池的線程數加以限制的 Semaphore 的輕量級實作。其原理就是在初始化時指定同時通路資源的數量,當某個任務執行完畢之後,調用Release()方法釋放資源,當某個任務需要執行時,則它可以調用Wait()方法,阻塞目前線程直到它擷取到資源為止。需要注意的時,這裡的資源不是指什麼資料庫連接配接,記憶體什麼的,這裡的資源可以了解成隻是一個協調任務同步的标記或者信号而已。代碼如下:

 var semaphore = new SemaphoreSlim(2);

            for (int i = 0; i < 10; i++)

                    while (true)

                        semaphore.Wait(tokenSource.Token);

                    }

                    semaphore.Release(2);

                    Console.WriteLine("Semaphore released");

                tokenSource.Token.ThrowIfCancellationRequested();

            Console.ReadLine();

可以看到,.Net 4中,提供了更多的同步方法。極大友善了,我們對任務或線程同步的處理。我們可以根據需要,選擇自己合适的、熟悉的方法進行對同步處理。

    本文轉自風車車  部落格園部落格,原文連結:http://www.cnblogs.com/xray2005/archive/2011/09/04/2166831.html,如需轉載請自行聯系原作者

繼續閱讀