原文: ThreadPool類(線程池)
CLR線程池并不會在CLR初始化時立即建立線程,而是在應用程式要建立線程來運作任務時,線程池才初始化一個線程。
線程池初始化時是沒有線程的,線程池裡的線程的初始化與其他線程一樣,但是在完成任務以後,該線程不會自行銷毀,而是以挂起的狀态傳回到線程池。直到應用程式再次向線程池送出請求時,線程池裡挂起的線程就會再度激活執行任務。
這樣既節省了建立線程所造成的性能損耗,也可以讓多個任務反複重用同一線程,進而在應用程式生存期内節約大量開銷。
通過CLR線程池所建立的線程總是預設為背景線程,優先級數為ThreadPriority.Normal。
CLR線程池分為工作者線程(workerThreads)與I/O線程(completionPortThreads)兩種:
- 工作者線程是主要用作管理CLR内部對象的運作,通常用于計算密集的任務。
- I/O(Input/Output)線程主要用于與外部系統互動資訊,如輸入輸出,CPU僅需在任務開始的時候,将任務的參數傳遞給裝置,然後啟動硬體裝置即可。等任務完成的時候,CPU收到一個通知,一般來說是一個硬體的中斷信号,此時CPU繼續後繼的處理工作。在處理過程中,CPU是不必完全參與處理過程的,如果正在運作的線程不交出CPU的控制權,那麼線程也隻能處于等待狀态,即使作業系統将目前的CPU排程給其他線程,此時線程所占用的空間還是被占用,而并沒有CPU處理這個線程,可能出現線程資源浪費的問題。如果這是一個網絡服務程式,每一個網絡連接配接都使用一個線程管理,可能出現大量線程都在等待網絡通信,随着網絡連接配接的不斷增加,處于等待狀态的線程将會很消耗盡所有的記憶體資源。可以考慮使用線程池解決這個問題。
線程池的最大值一般預設為1000、2000。當大于此數目的請求時,将保持排隊狀态,直到線程池裡有線程可用。
使用CLR線程池的工作者線程一般有兩種方式:
- 通過ThreadPool.QueueUserWorkItem()方法;
- 通過委托;
要注意,不論是通過ThreadPool.QueueUserWorkItem()還是委托,調用的都是線程池裡的線程。
通過以下兩個方法可以讀取和設定CLR線程池中工作者線程與I/O線程的最大線程數。
- ThreadPool.GetMax(out in workerThreads,out int completionPortThreads);
- ThreadPool.SetMax(int workerThreads,int completionPortThreads);
若想測試線程池中有多少線程正在投入使用,可以通過ThreadPool.GetAvailableThreads(out in workThreads,out int conoletionPortThreads)方法。
方法 | 說明 |
GetAvailableThreads | 剩餘空閑線程數 |
GetMaxThreads | 最多可用線程數,所有大于此數目的請求将保持排隊狀态,直到線程池線程變為可用 |
GetMinThreads | 檢索線程池在新請求預測中維護的空閑線程數 |
QueueUserWorkItem | 啟動線程池裡得一個線程(隊列的方式,如線程池暫時沒空閑線程,則進入隊列排隊) |
SetMaxThreads | 設定線程池中的最大線程數 |
SetMinThreads | 設定線程池最少需要保留的線程數 |
我們可以使用線程池來解決上面的大部分問題,跟使用單個線程相比,使用線程池有如下優點:
1、縮短應用程式的響應時間。因為線上程池中有線程的線程處于等待配置設定任務狀态(隻要沒有超過線程池的最大上限),無需建立線程。
2、不必管理和維護生存周期短暫的線程,不用在建立時為其配置設定資源,在其執行完任務之後釋放資源。
3、線程池會根據目前系統特點對池内的線程進行優化處理。
總之使用線程池的作用就是減少建立和銷毀線程的系統開銷。在.NET中有一個線程的類ThreadPool,它提供了線程池的管理。
ThreadPool是一個靜态類,它沒有構造函數,對外提供的函數也全部是靜态的。其中有一個QueueUserWorkItem方法,它有兩種重載形式,如下:
public static bool QueueUserWorkItem(WaitCallback callBack):将方法排入隊列以便執行。此方法在有線程池線程變得可用時執行。
public static bool QueueUserWorkItem(WaitCallback callBack,Object state):将方法排入隊列以便執行,并指定包含該方法所用資料的對象。此方法在有線程池線程變得可用時執行。
QueueUserWorkItem方法中使用的的WaitCallback參數表示一個delegate,它的聲明如下:
public delegate void WaitCallback(Object state)
如果需要傳遞任務資訊可以利用WaitCallback中的state參數,類似于ParameterizedThreadStart委托。
下面是一個ThreadPool的例子,代碼如下:
using System;
using System.Collections;
using System.ComponentModel;
using System.Diagnostics;
using System.Threading;
namespace ConsoleApp1
{
class ThreadPoolDemo
{
public ThreadPoolDemo()
{
}
public void Work()
{
ThreadPool.QueueUserWorkItem(new WaitCallback(CountProcess));
ThreadPool.QueueUserWorkItem(new WaitCallback(GetEnvironmentVariables));
}
/// <summary>
/// 統計目前正在運作的系統程序資訊
/// </summary>
/// <param name="state"></param>
private void CountProcess(object state)
{
Process[] processes = Process.GetProcesses();
foreach (Process p in processes)
{
try
{
Console.WriteLine("程序資訊:Id:{0},ProcessName:{1},StartTime:{2}", p.Id, p.ProcessName, p.StartTime);
}
catch (Win32Exception e)
{
Console.WriteLine("ProcessName:{0}", p.ProcessName);
}
finally
{
}
}
Console.WriteLine("擷取程序資訊完畢。");
}
/// <summary>
/// 擷取目前機器系統變量設定
/// </summary>
/// <param name="state"></param>
public void GetEnvironmentVariables(object state)
{
IDictionary list = System.Environment.GetEnvironmentVariables();
foreach (DictionaryEntry item in list)
{
Console.WriteLine("系統變量資訊:key={0},value={1}", item.Key, item.Value);
}
Console.WriteLine("擷取系統變量資訊完畢。");
}
}
}
ThreadPoolDemo
using System;
using System.Threading;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
ThreadPoolDemo tpd1 = new ThreadPoolDemo();
tpd1.Work();
Thread.Sleep(5000);
Console.WriteLine("OK");
Console.ReadLine();
}
}
}
Program
利用ThreadPool調用工作線程和IO線程的範例
using System;
using System.Collections;
using System.IO;
using System.Text;
using System.Threading;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
// 設定線程池中處于活動的線程的最大數目
// 設定線程池中工作者線程數量為1000,I/O線程數量為1000
ThreadPool.SetMaxThreads(1000, 1000);
Console.WriteLine("Main Thread: queue an asynchronous method");
PrintMessage("Main Thread Start");
// 把工作項添加到隊列中,此時線程池會用工作者線程去執行回調方法
ThreadPool.QueueUserWorkItem(asyncMethod);
asyncWriteFile();
Console.Read();
}
// 方法必須比對WaitCallback委托
private static void asyncMethod(object state)
{
Thread.Sleep(1000);
PrintMessage("Asynchoronous Method");
Console.WriteLine("Asynchoronous thread has worked ");
}
#region 異步讀取檔案子產品
private static void asyncReadFile()
{
byte[] byteData = new byte[1024];
FileStream stream = new FileStream(@"D:\123.txt", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true);
//把FileStream對象,byte[]對象,長度等有關資料綁定到FileDate對象中,以附帶屬性方式送到回調函數
Hashtable ht = new Hashtable();
ht.Add("Length", (int)stream.Length);
ht.Add("Stream", stream);
ht.Add("ByteData", byteData);
//啟動異步讀取,倒數第二個參數是指定回調函數,倒數第一個參數是傳入回調函數中的參數
stream.BeginRead(byteData, 0, (int)ht["Length"], new AsyncCallback(Completed), ht);
PrintMessage("asyncReadFile Method");
}
//實際參數就是回調函數
static void Completed(IAsyncResult result)
{
Thread.Sleep(2000);
PrintMessage("asyncReadFile Completed Method");
//參數result實際上就是Hashtable對象,以FileStream.EndRead完成異步讀取
Hashtable ht = (Hashtable)result.AsyncState;
FileStream stream = (FileStream)ht["Stream"];
int length = stream.EndRead(result);
stream.Close();
string str = Encoding.UTF8.GetString(ht["ByteData"] as byte[]);
Console.WriteLine(str);
stream.Close();
}
#endregion
#region 異步寫入檔案子產品
//異步寫入子產品
private static void asyncWriteFile()
{
//檔案名 檔案建立方式 檔案權限 檔案程序共享 緩沖區大小為1024 是否啟動異步I/O線程為true
FileStream stream = new FileStream(@"D:\123.txt", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite, 1024, true);
//這裡要注意,如果寫入的字元串很小,則.Net會使用輔助線程寫,因為這樣比較快
byte[] bytes = Encoding.UTF8.GetBytes("你在他鄉還好嗎?");
//異步寫入開始,倒數第二個參數指定回調函數,最後一個參數将自身傳到回調函數裡,用于結束異步線程
stream.BeginWrite(bytes, 0, (int)bytes.Length, new AsyncCallback(Callback), stream);
PrintMessage("AsyncWriteFile Method");
}
static void Callback(IAsyncResult result)
{
//顯示線程池現狀
Thread.Sleep(2000);
PrintMessage("AsyncWriteFile Callback Method");
//通過result.AsyncState再強制轉換為FileStream就能夠擷取FileStream對象,用于結束異步寫入
FileStream stream = (FileStream)result.AsyncState;
stream.EndWrite(result);
stream.Flush();
stream.Close();
asyncReadFile();
}
#endregion
// 列印線程池資訊
private static void PrintMessage(String data)
{
int workthreadnumber;
int iothreadnumber;
// 獲得線程池中可用的線程,把獲得的可用工作者線程數量賦給workthreadnumber變量
// 獲得的可用I/O線程數量給iothreadnumber變量
ThreadPool.GetAvailableThreads(out workthreadnumber, out iothreadnumber);
Console.WriteLine("{0}\n CurrentThreadId is {1}\n CurrentThread is background :{2}\n WorkerThreadNumber is:{3}\n IOThreadNumbers is: {4}\n",
data,
Thread.CurrentThread.ManagedThreadId,
Thread.CurrentThread.IsBackground.ToString(),
workthreadnumber.ToString(),
iothreadnumber.ToString());
}
}
}
線程池中放入異步操作
using System;
using System.Threading;
namespace ConsoleApp1
{
class Program
{
private static void AsyncOperation(object state)
{
Console.WriteLine("Operation state: {0}", state ?? "(null)");
Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(TimeSpan.FromSeconds(2));
}
static void Main(string[] args)
{
const int x = 1;
const int y = 2;
const string lambdaState = "lambda state 2";
ThreadPool.QueueUserWorkItem(AsyncOperation);
Thread.Sleep(TimeSpan.FromSeconds(1));
ThreadPool.QueueUserWorkItem(AsyncOperation, "async state");
Thread.Sleep(TimeSpan.FromSeconds(1));
ThreadPool.QueueUserWorkItem(state => {
Console.WriteLine("Operation state: {0}", state);
Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(TimeSpan.FromSeconds(2));
}, "lambda state");
ThreadPool.QueueUserWorkItem(_ =>
{
Console.WriteLine("Operation state: {0}, {1}", x + y, lambdaState);
Console.WriteLine("Worker thread id: {0}", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(TimeSpan.FromSeconds(2));
}, "lambda state");
Thread.Sleep(TimeSpan.FromSeconds(2));
}
}
}
線程池同步操作
using System;
using System.Threading;
namespace ConsoleApp1
{
class ThreadPoolDemo
{
static object lockobj = new object();
static int Count = 0;
ManualResetEvent manualEvent;
public ThreadPoolDemo(ManualResetEvent manualEvent)
{
this.manualEvent = manualEvent;
}
public void DisplayNumber(object a)
{
lock (lockobj)
{
Count++;
Console.WriteLine("目前運算結果:{0},Count={1},目前子線程id:{2} 的狀态:{3}", a, Count, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ThreadState);
}
//Console.WriteLine("目前運算結果:{0}", a);
//Console.WriteLine("目前運算結果:{0},目前子線程id:{1} 的狀态:{2}", a,Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ThreadState);
//這裡是方法執行時間的模拟,如果注釋該行代碼,就能看出線程池的功能了
Thread.Sleep(2000);
//Console.WriteLine("目前運算結果:{0},Count={1},目前子線程id:{2} 的狀态:{3}", a, Count, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ThreadState);
//這裡是釋放共享鎖,讓其他線程進入
manualEvent.Set();
}
}
}
using System;
using System.Diagnostics;
using System.Threading;
namespace ConsoleApp1
{
class Program
{
//設定任務數量
static int count = 10;
static void Main(string[] args)
{
//讓線程池執行5個任務是以也為每個任務加上這個對象保持同步
ManualResetEvent[] events = new ManualResetEvent[count];
Console.WriteLine("目前主線程id:{0}", Thread.CurrentThread.ManagedThreadId);
Stopwatch sw = new Stopwatch();
sw.Start();
NoThreadPool(count);
sw.Stop();
Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds);
sw.Reset();
sw.Start();
//循環每個任務
for (int i = 0; i < count; i++)
{
//執行個體化同步工具
events[i] = new ManualResetEvent(false);
//Test在這裡就是任務類,将同步工具的引用傳入能保證共享區内每次隻有一個線程進入
ThreadPoolDemo tst = new ThreadPoolDemo(events[i]);
//Thread.Sleep(200);
//将任務放入線程池中,讓線程池中的線程執行該任務
ThreadPool.QueueUserWorkItem(tst.DisplayNumber, i);
}
//注意這裡,設定WaitAll是為了阻塞調用線程(主線程),讓其餘線程先執行完畢,
//其中每個任務完成後調用其set()方法(收到信号),當所有
//的任務都收到信号後,執行完畢,将控制權再次交回調用線程(這裡的主線程)
ManualResetEvent.WaitAll(events);
sw.Stop();
Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds);
//Console.WriteLine("所有任務做完!");
Console.ReadKey();
}
static void NoThreadPool(int count)
{
for (int i = 0; i < count; i++)
{
Thread.Sleep(2000);
Console.WriteLine("目前運算結果:{0},Count={1},目前子線程id:{2} 的狀态:{3}", i, i + 1, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.ThreadState);
}
}
}
}
線程池中的取消操作
using System;
using System.Threading;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
ThreadPool.SetMaxThreads(1000, 1000);
Console.WriteLine("Main thread run");
PrintMessage("Start");
Run();
Console.ReadKey();
}
private static void Run()
{
CancellationTokenSource cts = new CancellationTokenSource();
// 這裡用Lambda表達式的方式和使用委托的效果一樣的,隻是用了Lambda後可以少定義一個方法。
// 這在這裡就是讓大家明白怎麼lambda表達式如何由委托轉變的
////ThreadPool.QueueUserWorkItem(o => Count(cts.Token, 1000));
ThreadPool.QueueUserWorkItem(callback, cts.Token);
Console.WriteLine("Press Enter key to cancel the operation\n");
Console.ReadLine();
// 傳達取消請求
cts.Cancel();
Console.ReadLine();
}
private static void callback(object state)
{
Thread.Sleep(1000);
PrintMessage("Asynchoronous Method Start");
CancellationToken token = (CancellationToken)state;
Count(token, 1000);
}
// 執行的操作,當受到取消請求時停止數數
private static void Count(CancellationToken token, int countto)
{
for (int i = 0; i < countto; i++)
{
if (token.IsCancellationRequested)
{
Console.WriteLine("Count is canceled");
break;
}
Console.WriteLine(i);
Thread.Sleep(300);
}
Console.WriteLine("Cout has done");
}
// 列印線程池資訊
private static void PrintMessage(String data)
{
int workthreadnumber;
int iothreadnumber;
// 獲得線程池中可用的線程,把獲得的可用工作者線程數量賦給workthreadnumber變量
// 獲得的可用I/O線程數量給iothreadnumber變量
ThreadPool.GetAvailableThreads(out workthreadnumber, out iothreadnumber);
Console.WriteLine("{0}\n CurrentThreadId is {1}\n CurrentThread is background :{2}\n WorkerThreadNumber is:{3}\n IOThreadNumbers is: {4}\n",
data,
Thread.CurrentThread.ManagedThreadId,
Thread.CurrentThread.IsBackground.ToString(),
workthreadnumber.ToString(),
iothreadnumber.ToString());
}
}
}
Thread與ThreadPool的一個性能比較
using System;
using System.Diagnostics;
using System.Threading;
namespace ConsoleApp1
{
class Program
{
static void Main(string[] args)
{
const int numberOfOperations = 300;
var sw = new Stopwatch();
sw.Start();
UseThreads(numberOfOperations);
sw.Stop();
Console.WriteLine("Execution time using threads: {0}", sw.ElapsedMilliseconds);
sw.Reset();
sw.Start();
UseThreadPool(numberOfOperations);
sw.Stop();
Console.WriteLine("Execution time using threadPool: {0}", sw.ElapsedMilliseconds);
}
static void UseThreads(int numberOfOperations)
{
using (var countdown = new CountdownEvent(numberOfOperations))
{
Console.WriteLine("Scheduling work by creating threads");
for (int i = 0; i < numberOfOperations; i++)
{
var thread = new Thread(() => {
Console.Write("{0},", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(TimeSpan.FromSeconds(0.1));
countdown.Signal();
});
thread.Start();
}
countdown.Wait();
Console.WriteLine();
}
}
static void UseThreadPool(int numberOfOperations)
{
using (var countdown = new CountdownEvent(numberOfOperations))
{
Console.WriteLine("Starting work on a threadpool");
for (int i = 0; i < numberOfOperations; i++)
{
ThreadPool.QueueUserWorkItem(_ => {
Console.Write("{0},", Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(TimeSpan.FromSeconds(0.1));
countdown.Signal();
});
}
countdown.Wait();
Console.WriteLine();
}
}
}
}