整理者:鄭昀@ultrapower
日期:2005-04-13
從william kennedy那裡整理過來的,不同之處在于他自己定義了一個overlapped,而我們這裡直接使用
system.threading.nativeoverlapped:。
附一段我以前的win32下的iocp文檔,如果您了解iocp也可以直接跳過看後面的c#測試示範:
我們采用的是i/o complete port(以下簡稱iocp)處理機制。
簡單的講,當服務應用程式初始化時,它應該先建立一個i/o cp。我們在請求到來後,将得到的資料打包用postqueuedcompletionstatus發送到iocp中。這時需要建立一些個線程(7個線程/每cpu,再多就沒有意義了)來處理發送到iocp端口的消息。實作步驟大緻如下:
1 先在主線程中調用createiocompletionport建立iocp。
createiocompletionport的前三個參數隻在把裝置同complete port相關聯時才有用。
此時我們隻需傳遞invalid_handle_value,null和0即可。
第四個參數告訴端口同時能運作的最多線程數,這裡設定為0,表示預設為目前計算機的cpu數目。
2 我們的threadfun線程函數執行一些初始化之後,将進入一個循環,該循環會在服務程序終止時才結束。
在循環中,調用getqueuedcompletionstatus,這樣就把目前線程的id放入一個等待線程隊列中,i/o cp核心對象就總能知道哪個線程在等待處理完成的i/o請求。
如果在idle_thread_timeout規定的時間内i/o cp上還沒有出現一個completion packet,則轉入下一次循環。在這裡我們設定的idle_thread_timeout為1秒。
當端口的i/o完成隊列中出現一項時,完成端口就喚醒等待線程隊列中的這個線程,該線程将得到完成的i/o項中的資訊: 傳輸的位元組數、完成鍵和overlapped結構的位址。
在我們的程式中可以用智能指針或者bstr或者int來接受這個overlapped結構的位址的值,進而得到消息;然後在這個線程中處理消息。
getqueuedcompletionstatus的第一個參數hcompletionport指出了要監視哪一個端口,這裡我們傳送先前從createiocompletionport傳回的端口句柄。
需要注意的是:
第一, 線程池的數目是有限制的,和cpu數目有關系。
第二, iocp是一種較為完美的睡眠/喚醒 線程機制;線程目前沒有任務要處理時,就進入睡眠狀态,進而不占用cpu資源,直到被核心喚醒;
第三, 最近一次剛執行完的線程,下次任務來的時候還會喚醒它;是以有可能比較少被調用的線程以後被調用的幾率也少。
測試代碼:
using system;
using system.threading; // included for the thread.sleep call
using continuum.threading;
using system.runtime.interopservices;
namespace iocpdemo
{
//=============================================================================
/// <summary> sample class for the threading class </summary>
public class utilthreadingsample
{
//*****************************************************************************
/// <summary> test method </summary>
static void main()
{
// create the mssql iocp thread pool
iocpthreadpool pthreadpool = new iocpthreadpool(0, 10, 20, new iocpthreadpool.user_function(iocpthreadfunction));
//for(int i =1;i<10000;i++)
{
pthreadpool.postevent(1234);
}
thread.sleep(100);
pthreadpool.dispose();
}
//********************************************************************
/// <summary> function to be called by the iocp thread pool. called when
/// a command is posted for processing by the socketmanager </summary>
/// <param name="ivalue"> the value provided by the thread posting the event </param>
static public void iocpthreadfunction(int ivalue)
try
console.writeline("value: {0}", ivalue.tostring());
thread.sleep(3000);
catch (exception pexception)
console.writeline(pexception.message);
}
}
類代碼:
using system.threading;
namespace iocpthreading
[structlayout(layoutkind.sequential, charset=charset.auto)]
public sealed class iocpthreadpool
[dllimport("kernel32", charset=charset.auto)]
private unsafe static extern uint32 createiocompletionport(uint32 hfile, uint32 hexistingcompletionport, uint32* puicompletionkey, uint32 uinumberofconcurrentthreads);
private unsafe static extern boolean closehandle(uint32 hobject);
private unsafe static extern boolean postqueuedcompletionstatus(uint32 hcompletionport, uint32 uisizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped* poverlapped);
private unsafe static extern boolean getqueuedcompletionstatus(uint32 hcompletionport, uint32* psizeofargument, uint32* puiuserarg, system.threading.nativeoverlapped** ppoverlapped, uint32 uimilliseconds);
private const uint32 invalid_handle_value = 0xffffffff;
private const uint32 inifinite = 0xffffffff;
private const int32 shutdown_iocpthread = 0x7fffffff;
public delegate void user_function(int ivalue);
private uint32 m_hhandle;
private uint32 gethandle { get { return m_hhandle; } set { m_hhandle = value; } }
private int32 m_uimaxconcurrency;
private int32 getmaxconcurrency { get { return m_uimaxconcurrency; } set { m_uimaxconcurrency = value; } }
private int32 m_iminthreadsinpool;
private int32 getminthreadsinpool { get { return m_iminthreadsinpool; } set { m_iminthreadsinpool = value; } }
private int32 m_imaxthreadsinpool;
private int32 getmaxthreadsinpool { get { return m_imaxthreadsinpool; } set { m_imaxthreadsinpool = value; } }
private object m_pcriticalsection;
private object getcriticalsection { get { return m_pcriticalsection; } set { m_pcriticalsection = value; } }
private user_function m_pfnuserfunction;
private user_function getuserfunction { get { return m_pfnuserfunction; } set { m_pfnuserfunction = value; } }
private boolean m_bdisposeflag;
/// <summary> simtype: flag to indicate if the class is disposing </summary>
private boolean isdisposed { get { return m_bdisposeflag; } set { m_bdisposeflag = value; } }
private int32 m_icurthreadsinpool;
/// <summary> simtype: the current number of threads in the thread pool </summary>
public int32 getcurthreadsinpool { get { return m_icurthreadsinpool; } set { m_icurthreadsinpool = value; } }
/// <summary> simtype: increment current number of threads in the thread pool </summary>
private int32 inccurthreadsinpool() { return interlocked.increment(ref m_icurthreadsinpool); }
/// <summary> simtype: decrement current number of threads in the thread pool </summary>
private int32 deccurthreadsinpool() { return interlocked.decrement(ref m_icurthreadsinpool); }
private int32 m_iactthreadsinpool;
/// <summary> simtype: the current number of active threads in the thread pool </summary>
public int32 getactthreadsinpool { get { return m_iactthreadsinpool; } set { m_iactthreadsinpool = value; } }
/// <summary> simtype: increment current number of active threads in the thread pool </summary>
private int32 incactthreadsinpool() { return interlocked.increment(ref m_iactthreadsinpool); }
/// <summary> simtype: decrement current number of active threads in the thread pool </summary>
private int32 decactthreadsinpool() { return interlocked.decrement(ref m_iactthreadsinpool); }
private int32 m_icurworkinpool;
/// <summary> simtype: the current number of work posted in the thread pool </summary>
public int32 getcurworkinpool { get { return m_icurworkinpool; } set { m_icurworkinpool = value; } }
/// <summary> simtype: increment current number of work posted in the thread pool </summary>
private int32 inccurworkinpool() { return interlocked.increment(ref m_icurworkinpool); }
/// <summary> simtype: decrement current number of work posted in the thread pool </summary>
private int32 deccurworkinpool() { return interlocked.decrement(ref m_icurworkinpool); }
public iocpthreadpool(int32 imaxconcurrency, int32 iminthreadsinpool, int32 imaxthreadsinpool, user_function pfnuserfunction)
// set initial class state
getmaxconcurrency = imaxconcurrency;
getminthreadsinpool = iminthreadsinpool;
getmaxthreadsinpool = imaxthreadsinpool;
getuserfunction = pfnuserfunction;
// init the thread counters
getcurthreadsinpool = 0;
getactthreadsinpool = 0;
getcurworkinpool = 0;
// initialize the monitor object
getcriticalsection = new object();
// set the disposing flag to false
isdisposed = false;
unsafe
{
// create an io completion port for thread pool use
gethandle = createiocompletionport(invalid_handle_value, 0, null, (uint32) getmaxconcurrency);
}
// test to make sure the io completion port was created
if (gethandle == 0)
throw new exception("unable to create io completion port");
// allocate and start the minimum number of threads specified
int32 istartingcount = getcurthreadsinpool;
threadstart tsthread = new threadstart(iocpfunction);
for (int32 ithread = 0; ithread < getminthreadsinpool; ++ithread)
// create a thread and start it
thread ththread = new thread(tsthread);
ththread.name = "iocp " + ththread.gethashcode();
ththread.start();
// increment the thread pool count
inccurthreadsinpool();
catch
throw new exception("unhandled exception");
~iocpthreadpool()
if (!isdisposed)
dispose();
public void dispose()
// flag that we are disposing this object
isdisposed = true;
// get the current number of threads in the pool
int32 icurthreadsinpool = getcurthreadsinpool;
// shutdown all thread in the pool
for (int32 ithread = 0; ithread < icurthreadsinpool; ++ithread)
unsafe
{
bool bret = postqueuedcompletionstatus(gethandle, 4, (uint32*) shutdown_iocpthread, null);
}
// wait here until all the threads are gone
while (getcurthreadsinpool != 0) thread.sleep(100);
// close the iocp handle
closehandle(gethandle);
private void iocpfunction()
uint32 uinumberofbytes;
int32 ivalue;
while (true)
system.threading.nativeoverlapped* pov;
// wait for an event
getqueuedcompletionstatus(gethandle, &uinumberofbytes, (uint32*) &ivalue, &pov, inifinite);
// decrement the number of events in queue
deccurworkinpool();
// was this thread told to shutdown
if (ivalue == shutdown_iocpthread)
break;
// increment the number of active threads
incactthreadsinpool();
try
// call the user function
getuserfunction(ivalue);
catch(exception ex)
throw ex;
// get a lock
monitor.enter(getcriticalsection);
// if we have less than max threads currently in the pool
if (getcurthreadsinpool < getmaxthreadsinpool)
{
// should we add a new thread to the pool
if (getactthreadsinpool == getcurthreadsinpool)
{
if (isdisposed == false)
{
// create a thread and start it
threadstart tsthread = new threadstart(iocpfunction);
thread ththread = new thread(tsthread);
ththread.name = "iocp " + ththread.gethashcode();
ththread.start();
// increment the thread pool count
inccurthreadsinpool();
}
}
}
catch
// relase the lock
monitor.exit(getcriticalsection);
decactthreadsinpool();
catch(exception ex)
string str=ex.message;
// decrement the thread pool count
deccurthreadsinpool();
//public void postevent(int32 ivalue
public void postevent(int ivalue)
// only add work if we are not disposing
if (isdisposed == false)
// post an event into the iocp thread pool
postqueuedcompletionstatus(gethandle, 4, (uint32*) ivalue, null);
// increment the number of item of work
inccurworkinpool();
// release the lock
catch (exception e)
throw e;
}
public void postevent()
postqueuedcompletionstatus(gethandle, 0, null, null);