天天看點

c# 多線程排隊隊列實作的源碼

using System;

using System.Threading;

using System.Collections;

using System.Collections.Generic;

// 将線程同步事件封裝在此類中,

// 以便于将這些事件傳遞給 Consumer 和

// Producer 類。

public class SyncEvents

{

    public SyncEvents()

    {

        // AutoResetEvent 用于“新項”事件,因為

        // 我們希望每當使用者線程響應此事件時,

        // 此事件就會自動重置。

        _newItemEvent = new AutoResetEvent(false);

        // ManualResetEvent 用于“退出”事件,因為

        // 我們希望發出此事件的信号時有多個線程響應。

        // 如果使用 AutoResetEvent,事件

        // 對象将在單個線程作出響應之後恢複為

        // 未發信号的狀态,而其他線程将

        // 無法終止。

        _exitThreadEvent = new ManualResetEvent(false);

        // 這兩個事件也放在一個 WaitHandle 數組中,以便

        // 使用者線程可以使用 WaitAny 方法

        // 阻塞這兩個事件。

        _eventArray = new WaitHandle[2];

        _eventArray[0] = _newItemEvent;

        _eventArray[1] = _exitThreadEvent;

    }

    // 公共屬性允許對事件進行安全通路。

    public EventWaitHandle ExitThreadEvent

    {

        get { return _exitThreadEvent; }

    }

    public EventWaitHandle NewItemEvent

    {

        get { return _newItemEvent; }

    }

    public WaitHandle[] EventArray

    {

        get { return _eventArray; }

    }

    private EventWaitHandle _newItemEvent;

    private EventWaitHandle _exitThreadEvent;

    private WaitHandle[] _eventArray;

}

// Producer 類(使用一個輔助線程)

// 将項異步添加到隊列中,共添加 20 個項。

public class Producer

{

    public Producer(Queue<int> q, SyncEvents e)

    {

        _queue = q;

        _syncEvents = e;

    }

    public void ThreadRun()

    {

        int count = 0;

        Random r = new Random();

        while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))

        {

            lock (((ICollection)_queue).SyncRoot)

            {

                while (_queue.Count < 20)

                {

                    _queue.Enqueue(r.Next(0, 100));

                    _syncEvents.NewItemEvent.Set();

                    count++;

                }

            }

        }

        Console.WriteLine("Producer thread: produced {0} items", count);

    }

    private Queue<int> _queue;

    private SyncEvents _syncEvents;

}

// Consumer 類通過自己的輔助線程使用隊列

// 中的項。Producer 類使用 NewItemEvent

// 将新項通知 Consumer 類。

public class Consumer

{

    public Consumer(Queue<int> q, SyncEvents e)

    {

        _queue = q;

        _syncEvents = e;

    }

    public void ThreadRun()

    {

        int count = 0;

        while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)

        {

            lock (((ICollection)_queue).SyncRoot)

            {

                int item = _queue.Dequeue();

            }

            count++;

        }

        Console.WriteLine("Consumer Thread: consumed {0} items", count);

    }

    private Queue<int> _queue;

    private SyncEvents _syncEvents;

}

public class ThreadSyncSample

{

    private static void ShowQueueContents(Queue<int> q)

    {

        // 對集合進行枚舉本來就不是線程安全的,

        // 是以在整個枚舉過程中鎖定集合以防止

        // 使用者和制造者線程修改内容

        // 是絕對必要的。(此方法僅由

        // 主線程調用。)

        lock (((ICollection)q).SyncRoot)

        {

            foreach (int i in q)

            {

                Console.Write("{0} ", i);

            }

        }

        Console.WriteLine();

    }

    static void Main()

    {

        // 配置結構,該結構包含線程同步

        // 所需的事件資訊。

        SyncEvents syncEvents = new SyncEvents();

        // 泛型隊列集合用于存儲要制造和使用的

        // 項。此例中使用的是“int”。

        Queue<int> queue = new Queue<int>();

        // 建立對象,一個用于制造項,一個用于

        // 使用項。将隊列和線程同步事件傳遞給

        // 這兩個對象。

        Console.WriteLine("Configuring worker threads...");

        Producer producer = new Producer(queue, syncEvents);

        Consumer consumer = new Consumer(queue, syncEvents);

        // 為制造者對象和使用者對象建立線程

        // 對象。此步驟并不建立或啟動

        // 實際線程。

        Thread producerThread = new Thread(producer.ThreadRun);

        Thread consumerThread = new Thread(consumer.ThreadRun);

        // 建立和啟動兩個線程。

        Console.WriteLine("Launching producer and consumer threads...");       

        producerThread.Start();

        consumerThread.Start();

        // 為制造者線程和使用者線程設定 10 秒的運作時間。

        // 使用主線程(執行此方法的線程)

        // 每隔 2.5 秒顯示一次隊列内容。

        for (int i = 0; i < 4; i++)

        {

            Thread.Sleep(2500);

            ShowQueueContents(queue);

        }

        // 向使用者線程和制造者線程發出終止信号。

        // 這兩個線程都會響應,由于 ExitThreadEvent 是

        // 手動重置的事件,是以除非顯式重置,否則将保持“設定”。

        Console.WriteLine("Signaling threads to terminate...");

        syncEvents.ExitThreadEvent.Set();

        // 使用 Join 阻塞主線程,首先阻塞到制造者線程

        // 終止,然後阻塞到使用者線程終止。

        Console.WriteLine("main thread waiting for threads to finish...");

        producerThread.Join();

        consumerThread.Join();

    }

}

namespace WindowsFormsApplication1

{

    public partial class Form3 : Form

    {

        public Form3()

        {

            InitializeComponent();

        }

        public delegate void Delegate1();

        public delegate void Delegate2(DataTable dt);

        public void buttonFind_Click(object sender, EventArgs e)

        {

            Delegate1 d1 = new Delegate1(Find);

            d1.BeginInvoke(new AsyncCallback(AsyncCallback1), d1);

        }

        public void AsyncCallback1(IAsyncResult iAsyncResult)

        {

            Delegate1 d1 = (Delegate1)iAsyncResult.AsyncState;

            d1.EndInvoke(iAsyncResult);

        }

        public void Find()

        {

            DataTable dt = new DataTable();

            dt.Columns.Add("name", typeof(string));

            dt.Columns.Add("age", typeof(int));

            AddRow(dt, "張三", 19);

            AddRow(dt, "張三", 19);

            AddRow(dt, "李四", 18);

            this.Invoke(new Delegate2(Bind2), new object[] { dt });

        }

        public void AddRow(DataTable dt, string name, int age)

        {

            DataRow dr = dt.NewRow();

            dr["name"] = name;

            dr["age"] = age;

            dt.Rows.Add(dr);

        }

        public void Bind2(DataTable dt)

        {

            this.dataGridView1.DataSource = dt;

        }

    }

}

繼續閱讀