天天看點

多程序程式的架構模闆1      功能說明2      Job和任務,分發和彙總3      流程和接口4      核心設計(系統實作的功能)5      使用說明6      性能調節7      示例

so sorry 的,這個文檔的主要内容都是繪圖的,貼上來圖都沒了但是圖裡面的文字卻還在,看起來莫名其妙的。

多程序伺服器架構

張博(Zb++) 2012

多程序伺服器架構... 1

張博(Zb++) 2012. 1

1       功能說明... 2

2       Job和任務,分發和彙總... 3

3       流程和接口... 4

4       核心設計(系統實作的功能)... 4

4.1             JOB資料... 4

4.2             任務資料狀态... 5

4.3             子程序狀态... 5

4.4             子程序控制指令... 6

4.5             子程序休眠/喚醒機制... 6

4.6             信号丢失... 7

4.7             子程序循環... 7

4.8             分發(主要)程序循環... 8

4.9             分發(主要)程序處理一個job. 9

4.10          高低水(自動程序數控制)... 10

5       使用說明... 10

5.1             編寫使用者資料結構... 10

5.1.1         共享記憶體資料結構限制... 10

5.1.2         共享記憶體免用鎖指導... 11

5.1.3         Job資訊(模闆參數T_JOBINFO)... 12

5.1.4         任務資訊(模闆參數T_TASKINFO)... 12

5.1.5         使用者服務資料結構(模闆參數T_SERVERINFO)... 12

5.2             實作三個接口... 12

5.2.1         接口規範... 12

5.2.2         IDistributeTask接口... 12

5.2.3         IProcessTask接口... 13

5.2.4         IRollup接口... 13

5.3             使用CMultiProcessServer. 13

5.3.1         定義... 13

5.3.2         構造函數... 14

5.3.3         獲得使用者服務資料區... 14

5.3.4         運作... 14

5.3.5         控制... 14

5.3.5.1     控制說明... 14

5.3.5.2     檢視資料... 15

5.3.5.3     暫停... 15

5.3.5.4     繼續... 15

5.3.5.5     停止... 15

5.3.5.6     運作模式... 15

5.3.5.7     子程序coredump是否退出... 15

5.3.5.8     自動睡眠控制... 15

5.3.5.9     修改允許的最大處理程序數... 16

6       性能調節... 16

6.1             與性能有關的因素... 16

6.2             最佳情形... 17

6.3             性能調節的原則... 18

6.4             性能調節參數和因素... 18

7       示例... 18

7.1             三個使用者資料結構... 18

7.2             三個接口... 18

7.3             運作和控制代碼... 20

1      功能說明

本系統将多程序并發處理程式的架構模闆化,客戶代碼僅需要定義業務處理代碼即可實作可靠的并發處理。

任務分發程序
任務彙總程序
處理
處理
處理
處理
處理
處理
處理
共享記憶體,包含N個任務隊列,每個隊列由一個處理程序處理,分發程序和彙總程序處理所有隊列

一般業務的并發處理通常為如下模式:

逐行讀取檔案
處理
輸出結果
處理
處理
接收請求
處理
處理
處理
檔案處理
網絡服務

2      Job和任務,分發和彙總

本系統中使用Job來代表一項工作,例如對一個檔案的處理,使用任務來代表可以并發的一個處理,例如對每一條記錄的操作。

分發代表并發處理之前的操作,彙總代表并發處理之後的操作,例如分開可以是打開檔案讀取每一行,而彙總就是輸出檔案。

3      流程和接口

初始化共享記憶體,建立彙總程序
彙總程序被建立
Run

分發程序

IDistributeTask

彙總程序

IRollupTask

OpenJob
DistributeTask
isJobEnd
CloseJob_Distribute
完成/需彙總

處理程序,每個隊列對應一個,不同隊列的程序之間沒有交叉

IProcessTask

建立/需處理
空閑/已彙總
T_TASKINFO

第一個

OpenJob_Rollup

查找位置建立處理程序

Process(多個)

由任務隊列對應的程序循環處理

Rollup

最後一個完成

CloseJob_Rollup

4      核心設計(系統實作的功能)

4.1    JOB資料

在用/空閑
是否分發完成
分發數
彙總數
使用者資料,傳遞給接口的每個函數

4.2    任務資料狀态

空閑
建立
完成
分發程序
處理程序
彙總程序

4.3    子程序狀态

未用
建立
運作
暫停
異常
退出
分發程序建立處理和彙總程序
分發程序或監控程式發現異常并設定
子程序标記自己為運作
Sleep睡眠,SIGCONT喚醒,逾時蘇醒則再次睡眠
子程序标記自己為退出

4.4    子程序控制指令

運作
暫停
退出
暫停指令
無指令/清除指令
退出指令

指令設計為狀态性質,即指令可以一直保持,子程序根據指令适當處理。當指令處于暫停狀态時子程序仍然會因為sleep時間到而喚醒,但會根據暫停指令再次進入sleep,目前設計喚醒後仍然會檢查是否有任務需要處理,邏輯上暫停中不會分發任務,是以不會有任務需要處理。

4.5    子程序休眠/喚醒機制

發送信号時間
子程序sleep
子程序sleep結束,清除信号發送時間
主程序發送CONT信号,設定發送信号時間
信号發送時間不是同一秒
中斷sleep
通過該标志避免快速發送大量信号給程序

4.6    信号丢失

不用鎖的互斥機制是不可靠的,可能存在這種情形:檢測到子程序處于暫停中,子程序蘇醒,主程序發送信号,信号丢失,子程序再次進入睡眠,主程序認為信号已經發出而不再發送信号。

本系統采用比較小的睡眠時間來減少信号丢失造成的延遲。

4.7    子程序循環

指令處理首先确認目前指令序列号與循環前一緻,這樣可以保證發出指令前的所有任務都已經被處理
獲得指令
處理/彙總
指令處理->
指令序列号與循環前一緻?
退出指令:exit(0)
暫停指令:sleep(N)
空指令:sleep(1)

4.8    分發(主要)程序循環

是否已經處于暫停中
檢查子程序是否異常
無指令/正常運作
處理一個job
停止所有子程序
退出
暫停指令
初次處理設定子程序指令
Sleep(1)
退出指令
停止所有子程序
退出
出錯
任何其它指令清除此标志

主要程序正常處理時job完成後不設休眠,如果需要降低無job時循環檢查的CPU占用率,應該在接口函數OpenJob中适當處理(例如使用sleep或帶逾時的select)。

4.9    分發(主要)程序處理一個job

OpenJob
清除子程序指令
喚醒子程序
isJobEnd
查找一個空閑task位置
查找、等待、處理程序異常、高低水控制
DistributeTask
Task狀态置為NEW
建立或喚醒子程序和彙總程序
CloseJob

4.10       高低水(自動程序數控制)

程序0  有任務,運作

程序1  有任務,運作

程序2  。。。

程序3

程序4

程序5

程序6

程序7  無任務,睡眠

程序8  無任務,睡眠

程序9  無任務,睡眠

得不到任務的程序自動進入睡眠-蘇醒-睡眠循環,隻占用很少的CPU

5      使用說明

5.1    編寫使用者資料結構

5.1.1  共享記憶體資料結構限制

存放于共享記憶體的資料不能包含任何指向私有記憶體的指針,任何指針類型都不能使用,以及任何包含動态資料配置設定的類,比如STL類string。除非專門為共享記憶體設計,否則隻能用基本資料類型(int long bool char [])。Zbstd::sstring(2個s開頭)是專門為共享記憶體設計的string替代類。

本系統使用的任何結構必須包含如下函數,用于把資料輸出為字元串:

string& toString(string & ret)const

傳回值應為參數str的引用(這樣設計的目的是為了避免動态對象)。

例如:

string& toString(string & ret)const

{

         Ret=””;

         Ret+=…

         Ret+=…

         Return ret;

}

為了避免位址對齊問題,建議保證每個結構至少包含一個long。為了保險起見,可以在結構最開始定義一個無意義的long。

例如:

Struct A

{

         Long ____;

         Int a;

         …

}

5.1.2  共享記憶體免用鎖指導

鑒于共享記憶體是多程序共享的,使用鎖有時候是不可避免的。但是鎖會帶來很大的系統資源消耗,并且濫用鎖會失去多程序的意義,是以應盡可能避免需要使用互斥鎖的情形。為了調式和監控的需要,應保證任何時候顯示共享記憶體資料都是安全的。是以應該避免設計動态化的“靈活”的資料結構,盡量使用簡單數組,通過索引位置通路,不排序(否則很難避免讀寫鎖的使用)。

在不使用鎖的情況下對共享記憶體做連續判斷可能會發生沖突,例如下面的情形:

If(1==state ||2==state)

該代碼是不可靠的,原因是如果同時另外一程序正在把state從2改為1則可能判斷結果出錯:

程序1 讀到state為2,不符合1==state

                    程序2修改state為1

程序1讀到state為1,不符合2==state

于是傳回false,但實際上應該傳回true

避免此問題應提前儲存要判斷的資料:

Long tmp=state;

If(1==tmp || 2==tmp)

很顯然,隻能對一個簡單資料類型這樣做,對多個資料無論如何不能避免上面的情形。在某些巧妙設計下,可以通過檢查判斷前後資料是否發生改變來确定剛才的判斷是否有效。

5.1.3  Job資訊(模闆參數T_JOBINFO)

每個job的資料結構,作為分發、處理和彙總的參數。

該結構存放于共享記憶體,需要遵循共享記憶體資料結構限制。

5.1.4  任務資訊(模闆參數T_TASKINFO)

每個任務的資料結構,放在任務隊列中,作為分發、處理和彙總的參數。

該結構存放于共享記憶體,需要遵循共享記憶體資料結構限制。

5.1.5  使用者服務資料結構(模闆參數T_SERVERINFO)

使用者定義的資料,完全由使用者使用,可以用于存放各程序公共資料。該結構通過CMultiProcessServer的getServerInfo獲得。

該結構存放于共享記憶體,需要遵循共享記憶體資料結構限制。

5.2    實作三個接口

5.2.1  接口規範

接口傳回值說明:一般均定義為 bool fun(...,ret)的形式,函數傳回值表達是否執行出錯(系統錯誤,不再繼續運作),ret表達處理結果。

5.2.2  IDistributeTask接口

該接口用于分發程序。

         template<typename T_JOBINFO,typenameT_TASKINFO>

         classIDistributeTask

         {

         public:

                   //打開job,若成功打開ret=true,否則沒有job可做

                   virtualbool OpenJob(T_JOBINFO * pJobInfo,bool * ret)=0;

                   //分發一個task,放入pTaskInfo,如果發生資料性錯誤ret=false,仍繼續處理

                   virtualbool DistributeTask(T_JOBINFO *pJobInfo,T_TASKINFO * pTaskInfo,bool * ret)=0;

                   //是否job結束

                   virtualbool isJobEnd(T_JOBINFO * pJobInfo,bool * ret)=0;

                   //結束job

                   virtualbool CloseJob_Distribute(T_JOBINFO *pJobInfo)=0;

         };

5.2.3  IProcessTask接口

該接口用于任務處理程序。

         template<typename T_JOBINFO,typenameT_TASKINFO>

         classIProcessTask

         {

         public:

                   virtualbool Process(T_JOBINFO * pJobInfo,T_TASKINFO *pTaskInfo,bool * ret)=0;

         };

5.2.4  IRollup接口

該接口用于彙總程序。

         template<typename T_JOBINFO,typenameT_TASKINFO>

         classIRollupTask

         {

         public:

                   //打開job

                   virtualbool OpenJob_Rollup(T_JOBINFO * pJobInfo)=0;

                   virtualbool Rollup(T_JOBINFO * pJobInfo,T_TASKINFO *pTaskInfo,bool * ret)=0;

                   //結束job

                   virtualbool CloseJob_Rollup(T_JOBINFO * pJobInfo)=0;

         };

5.3    使用CMultiProcessServer

5.3.1  定義

         template<typename T_JOBINFO,typenameT_SERVERINFO,typename T_TASKINFO,long MAX_JOB,long MAX_PROCESS,long MAX_PROCESS_TASK>

         classCMultiProcessServer

第一第二第三個參數是JOB資訊資料類型、使用者服務資料類型和任務資訊資料類型

MAX_JOB是最大同時打開的job數,因為分發和處理是不同步的。

MAX_PROCESS定義最大處理程序數,也就是任務隊列數。

MAX_PROCESS_TASK定義每個隊列的任務個數。

5.3.2  構造函數

                   CMultiProcessServer(IDistributeTask<T_TASKINFO> * d,IProcessTask<T_TASKINFO > * p,IRollupTask<T_TASKINFO > *r,char * shmbuf)}

前三個參數為三個接口,第四個參數為共享記憶體存儲區指針,該指針有使用者提供,必須指向共享記憶體,并且大小不得小于“static long CMultiProcessServer::calcBufSize()”,該靜态成員根據模闆參數計算所需的共享記憶體存儲區大小(該存儲區包含架構使用的資料、任務隊列資料、使用者服務資料)。這樣設計的目的是使用者項目可能提供了專門的共享記憶體管理。

5.3.3  獲得使用者服務資料區

調用getServerInfo()既可以獲得使用者服務資料區指針。應該在運作服務之前調用,一旦進入運作,控制便不會再回到使用者代碼,除非服務結束。

5.3.4  運作

void clear()

int run()

調用clear()初始化系統,清空共享記憶體,預設參數。

設定初始參數,設定方式見《控制》章節。

調用run()啟動服務。根據運作參數設定在沒有Job可做或發生系統錯誤或收到退出指令時傳回。

5.3.5  控制

5.3.5.1 控制說明

控制必須由獨立的程序進行,控制程序具有和執行程序(指運作服務的程序,即調用run的程序)相同的初始化方式,但不調用clear和run。

暫停或停止隻在Job結束後處理并且等待所有任務彙總完成。如果需要随時暫停,可以設計為每個Job一個任務。

5.3.5.2 檢視資料

string &toString(string & ret)const

具體資料的解讀參考性能調節章節。

5.3.5.3 暫停

void SetCommondPause()

5.3.5.4 繼續

void SetCommondContiue()

5.3.5.5 停止

void SetCommondExit()

5.3.5.6 運作模式

void SetExitIfNoJob(boolexitIfNoJob)

是否在沒有job時退出。

5.3.5.7 子程序coredump是否退出

void SetExitIfProcessCoredump(bool exitIfProcessCoredump)

設定為true則在處理程序coredump時全部退出,否則自動重起。

彙總程序coredump總是全部退出。

分發程序courdump總是全部退出。

5.3.5.8自動睡眠控制

void SetNotAutoSleep(bool)

處理程序和彙總程序無任務可做以及分發失敗時程序是否通過睡眠來放棄CPU時間,具體效果參考性能調節章節。

5.3.5.9 修改允許的最大處理程序數

void SetMaxProcess(longmax_process)

1-MAX_PROCESS,超出此範圍不修改或設定為MAX_PROCESS(如果目前未設定)。

6      性能調節

6.1    與性能有關的因素

多程序服務的性能是一個多種因素互相作用的結果。

程序數增加,吞吐量上升
磁盤尋道時間增加
CPU程序排程和記憶體交換上升
程序數增加導緻CPU程序排程和和記憶體交換上升
CPU排程繁忙反過來抑制吞吐量
磁盤繁忙抑制吞吐量
不停循環增加CPU占用率
CPU無效繁忙導緻實際使用率下降
睡眠減少CPU占用率
睡眠放棄時間片導緻吞吐量降低

6.2    最佳情形

最大吞吐量情形下分發程序、處理程序、彙總程序全部滿負荷運轉,無等待時間(沒有無效循環或自動睡眠)。從技術名額上看就是分發程序分發失敗為0、處理程序和彙總程序空循環為0(空循環為0自然睡眠也為0),處理程序至少有一個CPU占用率100%,分發程序或彙總程序CPU占用率100%。

僅僅分發失敗和空循環為0但CPU占用率低并不能達到最大吞吐量,CPU100%但存在分發失敗或空循環也并非最大吞吐量。

6.3    性能調節的原則

鑒于性能是多個因素互相抑制的結果,單純考慮單一因素不能達到最佳狀态,平衡狀态也并非一定是最佳狀态,而且存在多個平衡狀态的可能,是以需要嘗試各種參數組合來找到最佳狀态。

6.4    性能調節參數和因素

MAX_JOB 同時打開的job數 考慮業務需要,足夠即可
MAX_PROCESS 最大處理程序數 足夠大,保持最後一個處理程序非滿負荷,即既有明顯偏少的處理任務數,如果禁用自動睡眠多餘的程序會占用過多CPU
MAX_PROCESS_TASK 每程序最大任務數 至少為3,因為任務有三種狀态,加大此參數可以減少分發失敗
分發失敗時的狀态 檢視toString的資料 任務NEW比DONE多說明處理程序處理能力不足,可以增加MAX_PROCESS參數。若DONE多說明彙總程序處理能不足,考慮将更多的業務放在處理程序而不是彙總程序
自動睡眠 分發失敗或空循環放棄CPU 禁用自動睡眠可以提高吞吐量,但是最佳狀态是平衡态,無需自動睡眠。不追求極限吞吐量時應使用自動睡眠避免過度占用CPU

7      示例

7.1    三個使用者資料結構

均使用樣闆資料結構CDemoData,該結構含有一個成員long n。服務資料結構在示例代碼中并未使用。

7.2    三個接口

示例代碼直接使用了三個接口的測試功能,每個job含有8個task,task生成時放入一個序列整數(*100),處理時+1,彙總時+10,彙總後的資料後兩位應該是“11”。

接口處理的參數pJobInfo和pTaskInfo均為CDemoData *。

         //分發接口

         template<typename T_JOBINFO,typenameT_TASKINFO>

         classIDistributeTask

         {

         public:

                   //打開job,若成功打開ret=true,否則沒有job可做

                   virtualbool OpenJob(T_JOBINFO * pJobInfo,bool * ret)

                   {

                            static long n=0;

                            pJobInfo->n=(++n)*1000;

                            *ret=true;

                            return true;

                   }

                   //分發一個task,放入pTaskInfo,如果發生資料性錯誤ret=false,仍繼續處理

                   virtualbool DistributeTask(T_JOBINFO *pJobInfo,T_TASKINFO * pTaskInfo,bool * ret)

                   {

                            static long n=0;

                            pTaskInfo->n=(++n)*100;

                            *ret=true;

                            sleep(1);

                            return true;

                   }

                   //是否job結束

                   virtualbool isJobEnd(T_JOBINFO * pJobInfo,bool * ret)

                   {

                            static long n=0;

                            if(n>8)

                            {

                                     *ret=true;

                                     n=0;

                            }

                            else

                            {

                                     *ret=false;

                                     ++n;

                            }

                            return true;

                   }

                   //結束job

                   virtualbool CloseJob_Distribute(T_JOBINFO * pJobInfo)

                   {

                            pJobInfo->n+=1;

                            return true;

                   }

         };

         //處理接口

         template<typename T_JOBINFO,typenameT_TASKINFO>

         classIProcessTask

         {

         public:

                   virtualbool Process(T_JOBINFO * pJobInfo,T_TASKINFO *pTaskInfo,bool * ret)

                   {

                            for(longi=0;i<10000;++i)

                            {

                            }

                            pTaskInfo->n+=1;

                            *ret=true;

                            return true;

                   }

         };

         //彙總接口

         template<typename T_JOBINFO,typenameT_TASKINFO>

         classIRollupTask

         {

         public:

                   //打開job

                   virtualbool OpenJob_Rollup(T_JOBINFO * pJobInfo)

                   {

                            pJobInfo->n+=10;

                            return true;

                   }

                   virtualbool Rollup(T_JOBINFO * pJobInfo,T_TASKINFO *pTaskInfo,bool * ret)

                   {

                            pTaskInfo->n+=10;

                            *ret=true;

                            return true;

                   }

                   //結束job

                   virtualbool CloseJob_Rollup(T_JOBINFO * pJobInfo)

                   {

                            pJobInfo->n+=100;

                            return true;

                   }

         };

7.3    運作和控制代碼

本代碼直接建立了共享記憶體,使用了固定的key,在實際使用中應該避免使用固定的key。運作服務的函數每次都嘗試建立共享記憶體,在正式代碼中這樣做是不合适的。應該将共享記憶體管理作為獨立的功能來使用。

兩個函數一個運作一個控制。

注意關于定義和構造部分必須保持一緻,正式代碼應該将定義和構造編寫入一個類中以免無意中造成差異。

typedefzbstd::CMultiProcessServer<CDemoData,CDemoData,CDemoData,10L,5L,3L >MPS_T;

key_t key=0x10000;

zbstd::IDistributeTask<CDemoData,CDemoData> distribute;

zbstd::IProcessTask<CDemoData,CDemoData> process;

zbstd::IRollupTask<CDemoData,CDemoData> rollup;

int test_CMultiProcessServer(int argc,char ** argv)

{

         thelog<<"多程序伺服器測試"<<endi;

         intshmid=zbstd::CShmMan::CreateSHMByKey(key,MPS_T::calcBufSise());

         //if(shmid<=0)return__LINE__;

         char *pshm=zbstd::CShmMan::ConnectByKey(key,false);

         if(NULL==pshm)

         {

                   zbstd::CShmMan::Destory(shmid);

                   return__LINE__;

         }

         string str;

         MPS_T mps(&demoMPS,&demoMPS,&demoMPS,pshm);

         mps.clear();

         mps.SetMaxProcess(5);

         mps.SetExitIfNoJob(true);

         mps.SetNotAutoSleep(false);

         mps.SetExitIfProcessCoredump(false);

         thelog<<endl<<mps.toString(str)<<endi;

         mps.run();

         thelog<<endl<<mps.toString(str)<<endi;

         zbstd::CShmMan::Disconnect(pshm);

         //zbstd::CShmMan::Destory(shmid);

         return0;

}

int test_CMultiProcessServer_view(int argc,char **argv)

{

         thelog<<"多程序伺服器檢視"<<endi;

         char *pshm=zbstd::CShmMan::ConnectByKey(key,false);

         string str;

         MPS_T mps(&distribute,&process,&rollup,pshm);

         while(true)

         {

                   string cmd=UIInput("p=pause s=stop c=continue a=auto_sleepna=not_auto_sleep sp=set_max_process other=show","");

                   thelog<<endl<<mps.toString(str)<<endi;

                   if("p"==cmd)mps.SetCommondPause();

                   elseif("s"==cmd)mps.SetCommondExit();

                   elseif("c"==cmd)mps.SetCommondContiue();

                   elseif("a"==cmd)mps.SetNotAutoSleep(false);

                   elseif("na"==cmd)mps.SetNotAutoSleep(true);

                   elseif("sp"==cmd)mps.SetMaxProcess(atol(UIInput("intput 1-MAX:","").c_str()));

                   else;

         }

         zbstd::CShmMan::Disconnect(pshm);

         return0;

}

繼續閱讀