天天看點

ACE的接受器(Acceptor)和連接配接器(Connector):連接配接建立模式

/

連接配接器模式設計用于降低連接配接建立與連接配接建立後所執行的服務之間的耦合。例如,在

WWW

浏覽器中,所執行的服務或“實際工作”是解析和顯示客戶浏覽器接收到的

HTML

頁面。連接配接建立是次要的,可能通過

BSD socket

或其他一些等價的

IPC

機制來完成。使用這些模式允許程式員專注于“實際工作”,而最少限度地去關心怎樣在伺服器和客戶之間建立連接配接。而另外一方面,程式員也可以獨立于他所編寫的、或将要編寫的服務例程,去調諧連接配接建立的政策。

  因為該模式降低了服務和連接配接建立方法之間的耦合,非常容易改動其中一個,而不影響另外一個,進而也就可以複用以前編寫的連接配接建立機制和服務例程的代碼。在同樣的例子中,使用這些模式的浏覽器程式員一開始可以構造他的系統、使用特定的連接配接建立機制來運作它和測試它;然後,如果先前的連接配接機制被證明不适合他所構造的系統,他可以決定說他希望将底層連接配接機制改變為多線程的(或許使用線程池政策)。因為此模式提供了嚴格的去耦合,隻需要極少的努力就可以實作這樣的變動。

IPC_SAP的章節(特别是接受器和連接配接器部分)。此外,你還可能需要參考線程和線程管理部分。

 

7.1 接受器模式

    接受器通常被用在你本來會使用 BSD accept() 系統調用的地方。接受器模式也适用于同樣的環境,但就如我們将看到的,它提供了更多的功能。在 ACE 中,接收器模式借助名為 ACE_Acceptor 的“工廠”( Factory )實作。工廠(通常)是用于對助手對象的執行個體化過程進行抽象的類。在面向對象設計中,複雜的類常常會将特定功能委托給助手類。複雜的類對助手的建立和委托必須很靈活。這種靈活性是在工廠的幫助下獲得的。工廠允許一個對象通過改變它所委托的對象來改變它的底層政策,而工廠提供給應用的接口卻是一樣的,這樣,可能根本就無需對客戶代碼進行改動(有關工廠的更多資訊,請閱讀“設計模式”中的參考文獻)。

 

 

圖7-1 工廠模式示意圖

       ACE_Acceptor 工廠允許應用開發者改變“助手”對象,以用于:

  • 被動連接配接建立
  • 連接配接建立後的處理

   同樣地, ACE_Connector 工廠允許應用開發者改變“助手”對象,以用于:

 

  • 主動連接配接建立
  • 連接配接建立後的處理

    下面的讨論同時适用于接受器和連接配接器,是以作者将隻讨論接受器,而連接配接器同樣具有相應的參數。

         ACE_Acceptor 被實作為模闆容器,通過兩個類作為實參來進行執行個體化。第一個參數實作特定的服務(類型為 ACE_Event_Handler。因為它被用于處理 I/O 事件,是以必須來自事件處理類層次),應用在建立連接配接後執行該服務;第二個參數是“具體的”接受器(可以是在 IPC_SAP 一章中讨論的各種變種)。

ACE_Acceptor工廠和底層所用的具體接受器是非常不同的。具體接受器可完全獨立于ACE_Acceptor工廠使用,而無需涉及我們在這裡讨論的接受器模式(獨立使用接受器已在IPC_SAP一章中讨論和示範)。ACE_Acceptor工廠内在于接受器模式,并且不能在沒有底層具體接受器的情況下使用。ACE_Acceptor使用底層的具體接受器來建立連接配接。如我們已看到的,有若幹ACE的類可被用作ACE_Acceptor工廠模闆的第二個參數(也就是,具體接受器類)。但是服務處理類必須由應用開發者來實作,而且其類型必須是ACE_Event_Handler。ACE_Acceptor工廠可以這樣來執行個體化:

typedef ACE_Acceptor<My_Service_Handler,ACE_SOCK_ACCEPTOR> MyAcceptor;

    這裡,名為 My_Service_Handler 的事件處理器和具體接受器 ACE_SOCK_ACCEPTOR 被傳給 MyAcceptor 。 ACE_SOCK_ACCEPTOR 是基于 BSD socket 流家族的 TCP 接受器(各種可傳給接受器工廠的不同類型的接受器,見表 7-1 和 IPC 一章)。請再次注意,在使用接受器模式時,我們總是處理兩個接受器:名為 ACE_Acceptor 的工廠接受器,和 ACE 中的某種具體接受器,比如 ACE_SOCK_ACCEPTOR (你可以建立自定義的具體接受器來取代 ACE_SOCK_ACCEPTOR ,但你将很可能無需改變 ACE_Acceptor 工廠類中的任何東西)。

重要提示

:ACE_SOCK_ACCEPTOR實際上是一個宏,其定義為:

#define ACE_SOCK_ACCEPTOR ACE_SOCK_Acceptor, ACE_INET_Addr

我們認為這個宏的使用是必要的,因為在類中的typedefs在某些平台上無法工作。如果不是這樣的話,ACE_Acceptor就可以這樣來執行個體化:

typedef ACE_Acceptor<My_Service_Handler,ACE_SOCK_Acceptor>MyAcceptor;

在表7-1中對宏進行了說明。

7.1.1 元件

    如上面的讨論所說明的,在接受器模式中有三個主要的參與類:

 

    具體接受器

​:它含有建立連接配接的特定政策,連接配接與底層的傳輸協定機制系在一起。下面是在ACE中的各種具體接受器的例子:ACE_SOCK_ACCEPTOR(使用TCP來建立連接配接)、ACE_LSOCK_ACCEPTOR(使用UNIX域socket來建立連接配接),等等。​

  • 具體服務處理器

​:由應用開發者編寫,它的open()方法在連接配接建立後被自動回調。接受器構架假定服務處理類的類型是ACE_Event_Handler,這是ACE定義的接口類(該類已在反應堆一章中詳細讨論過)。另一個特别為接受器和連接配接器模式的服務處理而建立的類是ACE_Svc_Handler。該類不僅基于ACE_Event_Handler接口(這是使用反應堆所必需的),同時還基于在ASX流構架中使用的ACE_Task類。ACE_Task類提供的功能有:建立分離的線程、使用消息隊列來存儲到來的資料消息、并發地處理它們,以及其他一些有用的功能。如果與接受器模式一起使用的具體服務處理器派生自ACE_Svc_Handler、而不是ACE_Event_Handler,它就可以獲得這些額外的功能。對ACE_Svc_Handler中的額外功能的使用,在這一章的進階課程裡詳細讨論。在下面的讨論中,我們将使用ACE_Svc_Handler作為我們的事件處理器。在簡單的ACE_Event_Handler和ACE_Svc_Handler類之間的重要差別是,後者擁有一個底層通信流元件。這個流在ACE_Svc_Handler模闆被執行個體化的時候設定。而在使用ACE_Event_Handler的情況下,我們必須自己增加I/O通信端點(也就是,流對象),作為事件處理器的私有資料成員。因而,在這樣的情況下,應用開發者應該将他的服務處理器建立為ACE_Svc_Handler類的子類,并首先實作将被構架自動回調的open()方法。此外,因為ACE_Svc_Handler是一個模闆,通信流元件和鎖定機制是作為模闆參數被傳入的。​

  • 反應堆

​:與ACE_Acceptor協同使用。如我們将看到的,在執行個體化接受器後,我們啟動反應堆的事件處理循環。反應堆,如先前所解釋的,是一個事件分派類;而在此情況下,它被接受器用于将連接配接建立事件分派到适當的服務處理例程。

7.1.2 用法

    通過觀察一個簡單的例子,可以進一步了解接受器。這個例子是一個簡單的應用,它使用接受器接受連接配接,随後回調服務例程。當服務例程被回調時,它就讓使用者知道連接配接已經被成功地建立。

例7-1

#include ”ace/Reactor.h”

#include ”ace/Svc_Handler.h”

#include ”ace/Acceptor.h”

#include ”ace/Synch.h”

#include ”ace/SOCK_Acceptor.h”

 

//Create a Service Handler whose open() method will be called back

//automatically. This

 class MUST derive from ACE_Svc_Handler which is an

//interface and as can be seen is a

 template container class itself. The

//first parameter to ACE_Svc_Handler is the

 underlying stream that it

//may use for communication. Since we are using TCP sockets the

 stream

//is ACE_SOCK_STREAM. The second is the internal synchronization

//mechanism it

 could use. Since we have a single threaded application we

//pass it a “null” lock which

 will do nothing.

class My_Svc_Handler:

public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_NULL_SYNCH>

{

//the open method which will be called back automatically after the

//connection has been

 established.

public:

int open(void*)

{

cout<<”Connection established”<<endl;

}

};

 

// Create the acceptor as described above.

typedef ACE_Acceptor<My_Svc_Handler,ACE_SOCK_ACCEPTOR> MyAcceptor;

 

int main(int argc, char* argv[])

{

//create the address on which we wish to connect. The constructor takes

//the port

 number on which to listen and will automatically take the

//host’s IP address as the IP

 Address for the addr object 

ACE_INET_Addr addr(PORT_NUM);

 

//instantiate the appropriate acceptor object with the address on which

//we wish to

 accept and the Reactor instance we want to use. In this

//case we just use the global

 ACE_Reactor singleton. (Read more about

//the reactor in the previous chapter)

MyAcceptor acceptor(addr, ACE_Reactor::instance());

 

while(1)

// Start the reactor’s event loop

ACE_Reactor::instance()->handle_events();

}

    在上面的例子中,我們首先建立我們希望在其上接受連接配接的端點位址。因為我們決定使用TCP/IP作為底層的連接配接協定,我們建立一個ACE_INET_Addr來作為我們的端點,并将我們所要偵聽的端口号傳給它。我們将這個位址和反應堆單體的執行個體傳給我們要在此之後進行執行個體化的接受器。這個接受器在被執行個體化後,将自動接受任何在PORT_NUM端口上的連接配接請求,并且在為這樣的請求建立連接配接之後回調My_Svc_Handler的open()方法。注意當我們執行個體化ACE_Acceptor工廠時,我們傳給它的是我們想要使用的具體接受器(也就是ACE_SOCK_ACCEPTOR)和具體服務處理器(也就是My_Svc_Handler)。

  現在讓我們嘗試一下更為有趣的事情。在下一個例子中,我們将在連接配接請求到達、服務處理器被回調之後,将服務處理器登記回反應堆。現在,如果新建立的連接配接上有任何資料到達,我們的服務處理例程

handle_input()方法都會被自動回調。因而在此例中,我們在同時使用反應堆和接受器的特性:

例7-2

#include ”ace/Reactor.h”

#include ”ace/Svc_Handler.h”

#include ”ace/Acceptor.h”

#include ”ace/Synch.h”

#include ”ace/SOCK_Acceptor.h”

#define PORT_NUM 10101

#define DATA_SIZE 12

//forward declaration

class My_Svc_Handler;

//Create the Acceptor class

typedef ACE_Acceptor<My_Svc_Handler,ACE_SOCK_ACCEPTOR>

MyAcceptor;

//Create a service handler similar to as seen in example 1. Except this

//time include the handle_input() method which will be called back

//automatically by the reactor when new data arrives on the newly

//established connection

class My_Svc_Handler:

public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_NULL_SYNCH>

{

public:

My_Svc_Handler()

{

data= new char[DATA_SIZE];

}

int open(void*)

{

cout<<”Connection established”<<endl;

 

//Register the service handler with the reactor

ACE_Reactor::instance()->register_handler(this,

ACE_Event_Handler::READ_MASK);

 

return 0;

}

int handle_input(ACE_HANDLE)

{

//After using the peer() method of ACE_Svc_Handler to obtain a

//reference to the underlying stream of the service handler class

//we call recv_n() on it to read the data which has been received.

//This data is stored in the data array and then printed out

peer().recv_n(data,DATA_SIZE);

ACE_OS::printf(”<< %s\n”,data);

 

//keep yourself registered with the reactor

return 0;

}

private:

char* data;

};

int main(int argc, char* argv[])

{

ACE_INET_Addr addr(PORT_NUM);

 

//create the acceptor

MyAcceptor acceptor(addr, //address to accept on

ACE_Reactor::instance()); //the reactor to use

 

while(1)

//Start the reactor’s event loop

ACE_Reactor::instance()->handle_events();

}

    這個例子和前一例子的唯一差別是我們在服務處理器的open()方法中将服務處理器登記到反應堆上。是以,我們必須編寫handle_input()方法;當資料在連接配接上到達時,這個方法會被反應堆回調。在此例中我們隻是将我們接收到的資料列印到螢幕上。ACE_Svc_Handler類的peer()方法傳回對底層的對端流的引用。我們使用底層流包裝類的recv_n()方法來擷取連接配接上接收到的資料。

  該模式真正的威力在于,底層的連接配接建立機制完全封裝在具體接受器中,進而可以很容易地改變。在下一個例子裡,我們改變底層的連接配接建立機制,讓它使用

UNIX域socket、而不是TCP socket。這個例子(下劃線标明少量變動)如下所示:

例7-3

class My_Svc_Handler:

public ACE_Svc_Handler <ACE_LSOCK_STREAM,ACE_NULL_SYNCH>{

public:

int open(void*)

{

cout<<”Connection established”<<endl;

ACE_Reactor::instance()

->register_handler(this,ACE_Event_Handler::READ_MASK);

}

int handle_input(ACE_HANDLE)

{

char* data= new char[DATA_SIZE];

peer().recv_n(data,DATA_SIZE);

ACE_OS::printf(”<< %s\n”,data);

return 0;

}

};

typedef ACE_Acceptor<My_Svc_Handler,ACE_LSOCK_ACCEPTOR> MyAcceptor;

 

int main(int argc, char* argv[])

{

ACE_UNIX_Addr addr(”/tmp/addr.ace”);

MyAcceptor acceptor(address, ACE_Reactor::instance());

 

while(1) /* Start the reactor’s event loop */

ACE_Reactor::instance()->handle_events();

}

7-2和例7-3不同的地方标注了下劃線。正如我們所說過的,兩個程式間的不同非常之少,但是它們使用的連接配接建立範式卻極不相同。ACE中可用的連接配接建立機制在表7-1中列出:

接受器類型

所用位址

所用流

具體接受器

TCP

流接受器

ACE_INET_Addr ACE_SOCK_STREAM ACE_SOCK_ACCEPTOR

UNIX

域本地流socket接受器

ACE_UNIX_Addr ACE_LSOCK_STREAM ACE_LSOCK_ACCEPTOR
管道作為底層通信機制 ACE_SPIPE_Addr ACE_SPIPE_STREAM ACE_SPIPE_ACCEPTOR

 

7-1 ACE中的連接配接建立機制

 

7.2

連接配接器

​ 

  連接配接器與接受器非常類似。它也是一個工廠,但卻是用于主動地連接配接遠端主機。在連接配接建立後,它将自動回調适當的服務處理對象的

open()方法。連接配接器通常被用在你本來會使用BSD connect()調用的地方。在ACE中,連接配接器,就如同接受器,被實作為名為ACE_Connector的模闆容器類。如先前所提到的,它需要兩個參數,第一個是事件處理器類,它在連接配接建立時被調用;第二個是“具體的”連接配接器類。

  你必須注意,底層的具體連接配接器和

ACE_Connector工廠是非常不一樣的。ACE_Connector工廠使用底層的具體連接配接器來建立連接配接。随後ACE_Connector工廠使用适當的事件或服務處理例程(通過模闆參數傳入)來在具體的連接配接器建立起連接配接之後處理新連接配接。如我們在IPC一章中看到的,沒有ACE_Connector工廠,也可以使用這個具體的連接配接器。但是,沒有具體的連接配接器類,就會無法使用ACE_Connector工廠(因為要由具體的連接配接器類來實際處理連接配接建立)。

  下面是對

ACE_Connector類進行執行個體化的一個例子:

 

typedef ACE_Connector<My_Svc_Handler,ACE_SOCK_CONNECTOR> MyConnector;

 

這個例子中的第二個參數是具體連接配接器類

ACE_SOCK_CONNECTOR。連接配接器和接受器模式一樣,在内部使用反應堆來在連接配接建立後回調服務處理器的open()方法。我們可以複用我們為前面的接受器例子所寫的服務處理例程。

  一個使用連接配接器的例子可以進一步說明這一點:

 

7-4

typedef ACE_Connector<My_Svc_Handler,ACE_SOCK_CONNECTOR> MyConnector;

 

int main(int argc, char * argv[])

{

ACE_INET_Addr addr(PORT_NO,HOSTNAME);

My_Svc_Handler * handler= new My_Svc_Handler;

 

//Create the connector

MyConnector connector;

 

//Connects to remote machine

if(connector.connect(handler,addr)==-1)

ACE_ERROR(LM_ERROR,”%P|%t, %p”,”Connection failed”);

 

//Registers with the Reactor

while(1)

ACE_Reactor::instance()->handle_events();

}

 

  在上面的例子中,

HOSTNAME和PORT_NO是我們希望主動連接配接到的機器和端口。在執行個體化連接配接器之後,我們調用它的連接配接方法,将服務例程(會在連接配接完全建立後被回調),以及我們希望連接配接到的位址傳遞給它。

 

7.2.1 同時使用接受器和連接配接器

​ 

  一般而言,接受器和連接配接器模式會在一起使用。在客戶

/伺服器應用中,伺服器通常含有接受器,而客戶含有連接配接器。但是,在特定的應用中,可能需要同時使用接受器和連接配接器。下面給出這樣的應用的一個例子:一條消息被反複發送給對端機器,而與此同時也從遠端接受另一消息。因為兩種功能必須同時執行,簡單的解決方案就是分别在不同的線程裡發送和接收消息。

  這個例子同時包含接受器和連接配接器。使用者可以在指令行上給出參數,告訴應用它想要扮演伺服器還是客戶角色。随後應用就相應地調用

main_accept()或main_connect()。

 

​ 

7-5

#include ”ace/Reactor.h”

#include ”ace/Svc_Handler.h”

#include ”ace/Acceptor.h”

#include ”ace/Synch.h”

#include ”ace/SOCK_Acceptor.h”

#include ”ace/Thread.h”

 

//Add our own Reactor singleton

typedef ACE_Singleton<ACE_Reactor,ACE_Null_Mutex> Reactor;

 

//Create an Acceptor

typedef ACE_Acceptor<MyServiceHandler,ACE_SOCK_ACCEPTOR> Acceptor;

 

//Create a Connector

typedef ACE_Connector<MyServiceHandler,ACE_SOCK_CONNECTOR> Connector;

 

class MyServiceHandler:

public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_NULL_SYNCH>

{

public:

//Used by the two threads “globally” to determine their peer stream

static ACE_SOCK_Stream* Peer;

 

//Thread ID used to identify the threads

ACE_thread_t t_id;

 

int open(void*)

{

cout<<”Acceptor: received new connection”<<endl;

 

//Register with the reactor to remember this handle

Reactor::instance()

->register_handler(this,ACE_Event_Handler::READ_MASK);

 

//Determine the peer stream and record it globally

MyServiceHandler::Peer=&peer();

 

//Spawn new thread to send string every second

ACE_Thread::spawn((ACE_THR_FUNC)send_data,0,THR_NEW_LWP,&t_id);

 

//keep the service handler registered by returning 0 to the

//reactor

return 0;

}

 

static void* send_data(void*)

{

while(1)

{

cout<<”>>Hello World”<<endl;

Peer->send_n(”Hello World”,sizeof(”Hello World”));

 

//Go to sleep for a second before sending again

ACE_OS::sleep(1);

}

 

return 0;

}

 

int handle_input(ACE_HANDLE)

{

char* data= new char[12];

 

//Check if peer aborted the connection

if(Peer.recv_n(data,12)==0)

{

cout<<”Peer probably aborted connection”);

ACE_Thread::cancel(t_id); //kill sending thread ..

return -1; //de-register from the Reactor.

}

 

//Show what you got..

cout<<”<< %s\n”,data”<<endl;

 

//keep yourself registered

return 0;

}

};

 

//Global stream identifier used by both threads

ACE_SOCK_Stream * MyServiceHandler::Peer=0;

 

void main_accept()

{

ACE_INET_Addr addr(PORT_NO);

Acceptor myacceptor(addr,Reactor::instance());

while(1)

Reactor::instance()->handle_events();

 

return 0;

}

 

void main_connect()

{

ACE_INET_Addr addr(PORT_NO,HOSTNAME);

Connector myconnector;

myconnector.connect(my_svc_handler,addr);

while(1)

Reactor::instance()->handle_events();

}

 

int main(int argc, char* argv[])

{

// Use ACE_Get_Opt to parse and obtain arguments and then call the

// appropriate function for accept or connect.

...

}

這個簡單的例子示範怎樣聯合使用接受器和連接配接模式來生成服務處理例程,該例程與底層的網絡連接配接建立方法是完全分離的。通過改變相應的設定具體連接配接器和接受器的模闆參數,可以很容易地改用任何其他的底層網絡連接配接建立協定。

 

7.3

進階課程

​ 

  下面的部分更為詳細地解釋接受器和連接配接器模式實際上是如何工作的。如果你想要調諧服務處理和連接配接建立政策(其中包括調諧底層具體連接配接器将要使用的服務處理例程的建立和并發政策,以及連接配接建立政策),對該模式的進一步了解就是必要的。此外,還有一部分内容解釋怎樣使用通過

ACE_Svc_Handler類自動獲得的進階特性。最後,我們說明怎樣與接受器和連接配接器模式一起使用簡單的輕量級ACE_Event_Handler。

 

7.3.1 ACE_SVC_HANDLER類

​ 

  如上面所提到的,

ACE_Svc_Handler類基于ACE_Task(它是ASX流構架的一部分)和ACE_Event_Handler接口類。因而ACE_Svc_Handler既是任務,又是事件處理器。這裡我們将簡要介紹ACE_Task和ACE_Svc_Handler的功能。

 

7.3.1.1 ACE_Task

​ 

  ACE_Task

被設計為與ASX流構架一起使用;ASX基于UNIX系統V中的流機制。在設計上ASX與Larry Peterson建構的X-kernel協定工具非常類似。

  ASX

的基本概念是:到來的消息會被配置設定給由若幹子產品(module)組成的流。每個子產品在到來的消息上執行某種固定操作,然後把它傳遞給下一個子產品作進一步處理,直到它到達流的末端為止。子產品中的實際處理由任務來完成。每個子產品通常有兩個任務,一個用于處理到來的消息,一個用于處理外出的消息。在構造協定棧時,這種結構是非常有用的。因為每個子產品都有固定的簡單接口,所建立的子產品可以很容易地在不同的應用間複用。例如,設想一個應用,它處理來自資料鍊路層的消息。程式員會構造若幹子產品,每個子產品分别處理不同層次的協定。因而,他會構造一個單獨的子產品,進行網絡層處理;另一個進行傳輸層處理;還有一個進行表示層處理。在構造這些子產品之後,它們可以(在ASX的幫助下)被“串”成一個流來使用。如果後來建立了一個新的(也許是更好的)傳輸子產品,就可以在不對程式産生任何影響的情況下、在流中替換先前的傳輸子產品。注意子產品就像是容納任務的容器。這些任務是實際的處理元件。一個子產品可能需要兩個任務,如同在上面的例子中;也可能隻需要一個任務。如你可能會猜到的,ACE_Task是子產品中被稱為任務的處理元件的實作。

 

7.3.1.2

任務通信的體系結構

​ 

  每個

ACE_Task都有一個内部的消息隊列,用以與其他任務、子產品或是外部世界通信。如果一個ACE_Task想要發送一條消息給另一個任務,它就将此消息放入目的任務的消息隊列中。一旦目的任務收到此消息,它就會立即對它進行處理。

  所有

ACE_Task都可以作為0個或多個線程來運作。消息可以由多個線程放入ACE_Task的消息隊列,或是從中取出,程式員無需擔心破壞任何資料結構。因而任務可被用作由多個協作線程組成的系統的基礎建構元件。各個線程控制都可封裝在ACE_Task中,與其他任務通過發送消息到它們的消息隊列來進行互動。

  這種體系結構的唯一問題是,任務隻能通過消息隊列與在同一程序内的其他任務互相通信。

ACE_Svc_Handler解決了這一問題,它同時繼承自ACE_Task和ACE_Event_Handler,并且增加了一個私有資料流。這種結合使得ACE_Svc_Handler對象能夠用作這樣的任務:它能夠處理事件、并與遠地主機的任務間發送和接收資料。

  ACE_Task

被實作為模闆容器,它通過鎖定機制來進行執行個體化。該鎖用于保證内部的消息隊列在多線程環境中的完整性。如先前所提到的,ACE_Svc_Handler模闆容器不僅需要鎖定機制,還需要用于與遠地任務通信的底層資料流來作為參數。

7.3.1.3 建立ACE_Svc_Handler

  ACE_Svc_Handler

模闆通過鎖定機制和底層流來執行個體化,以建立所需的服務處理器。如果應用隻是單線程的,就不需要使用鎖,可以用ACE_NULL_SYNCH來将其執行個體化。但是,如果我們想要在多線程應用中使用這個模闆,可以這樣來進行執行個體化:

 

class MySvcHandler:

public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_MT_SYNCH>

{

...

}

 

7.3.1.4

在服務處理器中建立多個線程

​ 

  在上面的例

7-5中,我們使用ACE_Thread包裝類和它的靜态方法spawn(),建立了單獨的線程來發送資料給遠地對端。但是,在我們完成此工作時,我們必須定義使用C++ static修飾符的檔案範圍内的靜态send_data()方法。結果當然就是,我們無法通路我們執行個體化的實際對象的任何資料成員。換句話說,我們被迫使send_data()成員函數成為class-wide的函數,而這并不是我們所想要的。這樣做的唯一原因是,ACE_Thread::spawn()隻能使用靜态成員函數來作為它所建立的線程的入口。另一個有害的副作用是到對端流的引用也必須成為靜态的。簡而言之,這不是編寫這些代碼的最好方式。

  ACE_Task

提供了更好的機制來避免發生這樣的問題。每個ACE_Task都有activate()方法,可用于為ACE_Task建立線程。所建立的線程的入口是非靜态成員函數svc()。因為svc()是非靜态函數,它可以調用任何對象執行個體專有的資料或成員函數。ACE對程式員隐藏了該機制的所有實作細節。activate()方法有着非常多的用途,它允許程式員建立多個線程,所有這些線程都使用svc()方法作為它們的入口。還可以設定線程優先級、句柄、名字,等等。activate()方法的原型是:

 

// = Active object activation method.

virtual int activate (long flags = THR_NEW_LWP,

int n_threads = 1,

int force_active = 0,

long priority = ACE_DEFAULT_THREAD_PRIORITY,

int grp_id = -1,

ACE_Task_Base *task = 0,

ACE_hthread_t thread_handles[] = 0,

void *stack[] = 0,

size_t stack_size[] = 0,

ACE_thread_t thread_names[] = 0);

  第一個參數

flags描述将要建立的線程所希望具有的屬性。線上程一章裡有較長的描述。可用的标志有:

 

THR_CANCEL_DISABLE, THR_CANCEL_ENABLE, THR_CANCEL_DEFERRED,

THR_CANCEL_ASYNCHRONOUS, THR_BOUND, THR_NEW_LWP, THR_DETACHED,

THR_SUSPENDED, THR_DAEMON, THR_JOINABLE, THR_SCHED_FIFO,

THR_SCHED_RR, THR_SCHED_DEFAULT

 

  第二個參數

n_threads指定要建立的線程的數目。第三個參數force_active用于指定是否應該建立新線程,即使activate()方法已在先前被調用過、因而任務或服務處理器已經在運作多個線程。如果此參數被設為false(0),且如果activate()是再次被調用,該方法就會設定失敗代碼,而不會生成更多的線程。

  第四個參數用于設定運作線程的優先級。預設情況下,或優先級被設為

ACE_DEFAULT_THREAD_PRIORITY,方法會使用給定的排程政策(在flags中指定,例如,THR_SCHED_DEFAULT)的“适當”優先級。這個值是動态計算的,并且是在給定政策的最低和最高優先級之間。如果顯式地給定一個值,這個值就會被使用。注意實際的優先級值極大地依賴于實作,最好不要直接使用。線上程一章中,可讀到更多有關線程優先級的内容。

  還可以傳入将要建立的線程的線程句柄、線程名和棧空間,以線上程建立過程中使用。如果它們被設定為

NULL,它們就不會被使用。但是如果要使用activate建立多個線程,就必須傳入線程的名字或句柄,才能有效地對它們進行使用。

  下面的例子可以幫助你進一步了解

activate方法的使用:

 

7-6

#include ”ace/Reactor.h”

#include ”ace/Svc_Handler.h”

#include ”ace/Acceptor.h”

#include ”ace/Synch.h”

#include ”ace/SOCK_Acceptor.h”

 

class MyServiceHandler; //forward declaration

typedef ACE_Singleton<ACE_Reactor,ACE_Null_Mutex> Reactor;

typedef ACE_Acceptor<MyServiceHandler,ACE_SOCK_ACCEPTOR> Acceptor;

 

class MyServiceHandler:

public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_MT_SYNCH>

{

// The two thread names are kept here

ACE_thread_t thread_names[2];

 

public:

int open(void*)

{

ACE_DEBUG((LM_DEBUG, ”Acceptor: received new connection \n”));

 

//Register with the reactor to remember this handler..

Reactor::instance()

->register_handler(this,ACE_Event_Handler::READ_MASK);

ACE_DEBUG((LM_DEBUG,”Acceptor: ThreadID:(%t) open\n”));

 

//Create two new threads to create and send messages to the

//remote machine.

activate(THR_NEW_LWP,

2, //2 new threads

0, //force active false, if already created don’t try again.

ACE_DEFAULT_THREAD_PRIORITY,//Use default thread priority

-1,

this,//Which ACE_Task object to create? In this case this one.

0,// don’t care about thread handles used

0,// don’t care about where stacks are created

0,//don’t care about stack sizes

thread_names); // keep identifiers in thread_names

 

//keep the service handler registered with the acceptor.

return 0;

}

 

void send_message1(void)

{

//Send message type 1

ACE_DEBUG((LM_DEBUG,”(%t)Sending message::>>”));

 

//Send the data to the remote peer

ACE_DEBUG((LM_DEBUG,”Sent message1”));

peer().send_n(”Message1”,LENGTH_MSG_1);

} //end send_message1

 

int send_message2(void)

{

//Send message type 1

ACE_DEBUG((LM_DEBUG,”(%t)Sending message::>>”));

 

//Send the data to the remote peer

ACE_DEBUG((LM_DEBUG,”Sent Message2”));

peer().send_n(”Message2”,LENGTH_MSG_2);

}//end send_message_2

 

int svc(void)

{

ACE_DEBUG( (LM_DEBUG,”(%t) Svc thread \n”));

if(ACE_Thread::self()== thread_names[0])

while(1) send_message1(); //send message 1s forever

else

while(1) send_message2(); //send message 2s forever

 

return 0; // keep the com, piler happy.

}

 

int handle_input(ACE_HANDLE)

{

ACE_DEBUG((LM_DEBUG,”(%t) handle_input ::”));

char* data= new char[13];

 

//Check if peer aborted the connection

if(peer().recv_n(data,12)==0)

{

printf(”Peer probably aborted connection”);

return -1; //de-register from the Reactor.

}

 

//Show what you got..

ACE_OS::printf(”<< %s\n”,data);

 

//keep yourself registered

return 0;

}

};

 

int main(int argc, char* argv[])

{

ACE_INET_Addr addr(10101);

ACE_DEBUG((LM_DEBUG,”Thread: (%t) main”));

 

//Prepare to accept connections

Acceptor myacceptor(addr,Reactor::instance());

 

// wait for something to happen.

while(1)

Reactor::instance()->handle_events();

 

return 0;

}

在此例中,服務處理器在它的

open()方法中被登記到反應堆上,随後程式調用activate()來建立2個線程。線程的名字被記錄下來,以便在它們調用svc()例程時,我們可以将它們差別開。每個線程發送一條不同類型的消息給遠地對端。注意在此例中,線程的建立是完全透明的。此外,因為入口是普通的非靜态成員函數,它不需要進行“醜陋的”改動來記住資料成員,比如說對端流。無論何時隻要我們需要,我們就可以簡單地調用成員函數peer()來擷取底層的流。

7.3.1.5使用服務處理器中的消息隊列機制

  如前面所提到的,

ACE_Svc_Handler類擁有内建的消息隊列。這個消息隊列被用作在ACE_Svc_Handler和外部世界之間的主要通信接口。其他任務想要發給該服務處理器的消息被放入它的消息隊列中。這些消息會在單獨的線程裡(通過調用activate()方法建立)處理。随後另一個線程就可以把處理過的消息通過網絡發送給另外的遠地目的地(很可能是另外的ACE_Svc_Handler)。

  如先前所提到的,在這種多線程情況下,

ACE_Svc_Handler會自動地使用鎖來確定消息隊列的完整性。所用的鎖即通過執行個體化ACE_Svc_Handler模闆類建立具體服務處理器時所傳遞的鎖。之所用通過這樣的方式來傳遞鎖,是因為這樣程式員就可以對他的應用進行“調諧”。不同平台上的不同鎖定機制有着不同程度的開銷。如果需要,程式員可以建立他自己的優化的、遵從ACE的鎖接口定義的鎖,并将其用于服務處理器。這是程式員通過使用ACE可獲得的靈活性的又一範例。重要的是程式員必須意識到,在此服務處理例程中的額外線程将帶來顯著的鎖定開銷。為将此開銷降至最低,程式員必須仔細地設計他的程式,確定使這樣的開銷最小化。特别地,上面描述的例子有可能導緻過度的開銷,在大多數情況下可能并不實用。

  ACE_Task

,進而是ACE_Svc_Handler(因為服務處理器也是一種任務),具有若幹可用于對底層隊列進行設定、操作、入隊和出隊操作的方法。這裡我們将隻讨論這些方法中的一部分。因為在服務處理器中(通過使用msg_queue()方法)可以擷取指向消息隊列自身的指針,是以也可以直接調用底層隊列(也就是,ACE_Message_Queue)的所有公共方法。(有關消息隊列提供的所有方法的更多細節,請參見後面的“消息隊列”一章。)

  如上面所提到的,服務處理器的底層消息隊列是

ACE_Message_Queue的執行個體,它是由服務處理器自動建立的。在大多數情況下,沒有必要調用ACE_Message_Queue的底層方法,因為在ACE_Svc_Handler類中已對它們的大多數進行了包裝。ACE_Message_Queue是用于使ACE_Message_Block進隊或出隊的隊列。每個ACE_Message_Block都含有指向“引用計數”(reference-counted)的ACE_Data_Block的指針,ACE_Data_Block依次又指向存儲在塊中的實際資料(見“消息隊列”一章)。這使得ACE_Message_Block可以很容易地進行資料共享。

  ACE_Message_Block

的主要作用是進行高效資料操作,而不帶來許多拷貝開銷。每個消息塊都有一個讀指針和寫指針。無論何時我們從塊中讀取時,讀指針會在資料塊中向前增長。類似地,當我們向塊中寫的時候,寫指針也會向前移動,這很像在流類型系統中的情況。可以通過ACE_Message_Block的構造器向它傳遞配置設定器,以用于配置設定記憶體(有關Allocator的更多資訊,參見“記憶體管理”一章)。例如,可以使用ACE_Cached_Allocation_Strategy,它預先配置設定記憶體并從記憶體池中傳回指針,而不是在需要的時候才從堆中配置設定記憶體。這樣的功能在需要可預測的性能時十分有用,比如在實時系統中。

  下面的例子示範怎樣使用消息隊列的一些功能:

 

7-7

#include ”ace/Reactor.h”

#include ”ace/Svc_Handler.h”

#include ”ace/Acceptor.h”

#include ”ace/Synch.h”

#include ”ace/SOCK_Acceptor.h”

#include ”ace/Thread.h”

#define NETWORK_SPEED 3

class MyServiceHandler; //forward declaration

typedef ACE_Singleton<ACE_Reactor,ACE_Null_Mutex> Reactor;

typedef ACE_Acceptor<MyServiceHandler,ACE_SOCK_ACCEPTOR> Acceptor;

 

class MyServiceHandler:

public ACE_Svc_Handler<ACE_SOCK_STREAM,ACE_MT_SYNCH>{

// The message sender and creator threads are handled here.

ACE_thread_t thread_names[2];

 

public:

int open(void*)

{

ACE_DEBUG((LM_DEBUG, ”Acceptor: received new connection \n”));

 

//Register with the reactor to remember this handler..

Reactor::instance()

->register_handler(this,ACE_Event_Handler::READ_MASK);

ACE_DEBUG((LM_DEBUG,”Acceptor: ThreadID:(%t) open\n”));

 

//Create two new threads to create and send messages to the

//remote machine.

activate(THR_NEW_LWP,

2, //2 new threads

0,

ACE_DEFAULT_THREAD_PRIORITY,

-1,

this,

0,

0,

0,

thread_names); // identifiers in thread_handles

 

//keep the service handler registered with the acceptor.

return 0;

}

 

void send_message(void)

{

//Dequeue the message and send it off

ACE_DEBUG((LM_DEBUG,”(%t)Sending message::>>”));

 

//dequeue the message from the message queue

ACE_Message_Block *mb;

ACE_ASSERT(this->getq(mb)!=-1);

int length=mb->length();

char *data =mb->rd_ptr();

 

//Send the data to the remote peer

ACE_DEBUG((LM_DEBUG,”%s \n”,data,length));

peer().send_n(data,length);

 

//Simulate very SLOW network.

ACE_OS::sleep(NETWORK_SPEED);

 

//release the message block

mb->release();

} //end send_message

 

int construct_message(void)

{

// A very fast message creation algorithm

// would lead to the need for queuing messages..

// here. These messages are created and then sent

// using the SLOW send_message() routine which is

// running in a different thread so that the message

//construction thread isn’t blocked.

ACE_DEBUG((LM_DEBUG,”(%t)Constructing message::>> ”));

 

// Create a new message to send

ACE_Message_Block *mb;

char *data=”Hello Connector”;

ACE_NEW_RETURN (mb,ACE_Message_Block (16,//Message 16 bytes long

ACE_Message_Block::MB_DATA,//Set header to data

0,//No continuations.

data//The data we want to send

), 0);

mb->wr_ptr(16); //Set the write pointer.

 

// Enqueue the message into the message queue

// we COULD have done a timed wait for enqueuing in case

// someone else holds the lock to the queue so it doesn’t block

//forever..

ACE_ASSERT(this->putq(mb)!=-1);

ACE_DEBUG((LM_DEBUG,”Enqueued msg successfully\n”));

}

 

int svc(void)

{

ACE_DEBUG( (LM_DEBUG,”(%t) Svc thread \n”));

 

//call the message creator thread

if(ACE_Thread::self()== thread_names[0])

while(1) construct_message(); //create messages forever

else

while(1) send_message(); //send messages forever

 

return 0; // keep the compiler happy.

}

 

int handle_input(ACE_HANDLE)

{

ACE_DEBUG((LM_DEBUG,”(%t) handle_input ::”));

char* data= new char[13];

 

//Check if peer aborted the connection

if(peer().recv_n(data,12)==0)

{

printf(”Peer probably aborted connection”);

return -1; //de-register from the Reactor.

}

 

//Show what you got..

ACE_OS::printf(”<< %s\n”,data);

 

//keep yourself registered

return 0;

}

};

 

int main(int argc, char* argv[])

{

ACE_INET_Addr addr(10101);

ACE_DEBUG((LM_DEBUG,”Thread: (%t) main”));

 

//Prepare to accept connections

Acceptor myacceptor(addr,Reactor::instance());

 

// wait for something to happen.

while(1)

Reactor::instance()->handle_events();

 

return 0;

}

這個例子示範怎樣使用

putq()和getq()方法來在隊列中放入或取出消息塊。它還示範怎樣建立消息塊,随後設定它的寫指針,并根據它的讀指針進行讀取。注意消息塊中的實際資料的起始位置由消息塊的讀指針訓示。消息塊的length()成員函數傳回在消息塊中存儲的底層資料的長度,其中不包括ACE_Message_Block中用于管理目的的部分。另外,我們也顯示了怎樣使用release()方法來釋放消息塊(mb)。

  要了解更多關于如何使用消息塊、資料塊或是消息隊列的資訊,請閱讀此教程中有關“消息隊列”、

ASX架構和其他相關的部分。

 

7.4

接受器和連接配接器模式工作原理

  接受器和連接配接器工廠(也就是

ACE_Connector和ACE_Acceptor)有着非常類似的運作結構。它們的工作可大緻劃分為三個階段:

 

  • 端點或連接配接初始化階段
  • 服務初始化階段
  • 服務處理階段

 

7.4.1 端點或連接配接初始化階段

  在使用接受器的情況下,應用級程式員可以調用

ACE_Acceptor工廠的open()方法,或是它的預設構造器(它實際上會調用open()方法),來開始被動偵聽連接配接。當接受器工廠的open()方法被調用時,如果反應堆單體還沒有被執行個體化,open()方法就首先對其進行執行個體化。随後它調用底層具體接受器的open()方法。于是具體接受器會完成必要的初始化來偵聽連接配接。例如,在使用ACE_SOCK_Acceptor的情況中,它打開socket,将其綁定到使用者想要在其上偵聽新連接配接的端口和位址上。在綁定端口後,它将會發出偵聽調用。open方法随後将接受器工廠登記到反應堆。因而在接收到任何到來的連接配接請求時,反應堆會自動回調接受器工廠的handle_input()方法。注意正是因為這一原因,接受器工廠才從ACE_Event_Handler類層次派生;這樣它才可以響應ACCEPT事件,并被反應堆自動回調。

  在使用連接配接器的情況中,應用程式員調用連接配接器工廠的

connect()方法或connect_n()方法來發起到對端的連接配接。除了其他一些選項,這兩個方法的參數包括我們想要連接配接到的遠地位址,以及我們是想要同步還是異步地完成連接配接。我們可以同步或異步地發起NUMBER_CONN個連接配接:

 

//Synchronous

OurConnector.connect_n(NUMBER_CONN,ArrayofMySvcHandlers,Remote_Addr,0,

ACE_Synch_Options::synch);

 

//Asynchronous

OurConnector.connect_n(NUMBER_CONN,ArrayofMySvcHandlers,Remote_Addr,0,

ACE_Synch_Options::asynch);

 

如果連接配接請求是異步的,

ACE_Connector會在反應堆上登記自己,等待連接配接被建立(ACE_Connector也派生自ACE_Event_Handler類層次)。一旦連接配接被建立,反應堆将随即自動回調連接配接器。但如果連接配接請求是同步的,connect()調用将會阻塞,直到連接配接被建立、或是逾時到期為止。逾時值可通過改變特定的ACE_Synch_Options來指定。詳情請參見參考手冊。

 

7.4.2 接受器的服務初始化階段

  在有連接配接請求在指定的位址和端口上到來時,反應堆自動回調

ACE_Acceptor工廠的handle_input()方法。

  該方法是一個“模闆方法”(

Template Method)。模闆方法用于定義一個算法的若幹步驟的順序,并允許改變特定步驟的執行。這種變動是通過允許子類定義這些方法的實作來完成的。(有關模闆方法的更多資訊見“設計模式”參考指南)。

  在我們的這個案例中,模闆方法将算法定義如下:

 

  • make_svc_handler()

:建立服務處理器。

  • accept_svc_handler()

:将連接配接接受進前一步驟建立的服務處理器。

  • activate_svc_handler()

:啟動這個新服務處理器。

 

  這些方法都可以被重新編寫,進而靈活地決定這些操作怎樣來實際執行。

  這樣,

handle_input()将首先調用make_svc_handler()方法,建立适當類型的服務處理器(如我們在上面的例子中所看到的那樣,服務處理器的類型由應用程式員在ACE_Acceptor模闆被執行個體化時傳入)。在預設情況下,make_svc_handler()方法隻是執行個體化恰當的服務處理器。但是,make_svc_handler()是一個“橋接”(bridge)方法,可被重載以提供更多複雜功能。(橋接是一種設計模式,它使類層次的接口與實作去耦合。參閱“設計模式”參考文獻)。例如,服務處理器可建立為程序級或線程級的單體,或者從庫中動态連結,從磁盤中加載,甚或通過更複雜的方式建立,如從資料庫中查找并擷取服務處理器,并将它裝入記憶體。

  在服務處理器被建立後,

handle_input()方法調用accept_svc_handler()。該方法将連接配接“接受進”服務處理器;預設方式是調用底層具體接受器的accept()方法。在ACE_SOCK_Acceptor被用作具體接受器的情況下,它調用BSD accept()例程來建立連接配接(“接受”連接配接)。在連接配接建立後,連接配接句柄在服務處理器中被自動設定(接受“進”服務處理器);這個服務處理器是先前通過調用make_svc_handler()建立的。該方法也可被重載,以提供更複雜的功能。例如,不是實際建立新連接配接,而是“回收利用”舊連接配接。在我們示範各種不同的接受和連接配接政策時,将更為詳盡地讨論這一點。

 

7.4.3 連接配接器的服務初始化階段

  應用發出的

connect()方法與接受器工廠中的handle_input()相類似,也就是,它是一個“模闆方法”。

  在我們的這個案例中,模闆方法

connect()定義下面一些可被重定義的步驟:

 

  • make_svc_handler()

:建立服務處理器。

  • connect_svc_handler()

:将連接配接接受進前一步驟建立的服務處理器。

  • activate_svc_handler()

:啟動這個新服務處理器。

 

  每一方法都可以被重新編寫,進而靈活地決定這些操作怎樣來實際執行。

  這樣,在應用發出

connect()調用後,連接配接器工廠通過調用make_svc_handler()來執行個體化恰當的服務處理器,一如在接受器的案例中所做的那樣。其預設行為隻是執行個體化适當的類,并且也可以通過與接受器完全相同的方式重載。進行這樣的重載的原因可以與上面提到的原因非常類似。

  在服務處理器被建立後,

connect()調用确定連接配接是要成為異步的還是同步的。如果是異步的,在繼續下一步驟之前,它将自己登記到反應堆,随後調用connect_svc_handler()方法。該方法的預設行為是調用底層具體連接配接器的connect()方法。在使用ACE_SOCK_Connector的情況下,這意味着将适當的選項設定為阻塞或非阻塞式I/O,然後發出BSD connect()調用。如果連接配接被指定為同步的,connect()調用将會阻塞、直到連接配接完全建立。在這種情況下,在連接配接建立後,它将在服務處理器中設定句柄,以與它現在連接配接到的對端通信(該句柄即是通過在服務處理器中調用peer()方法獲得的在流中存儲的句柄,見上面的例子)。在服務處理器中設定句柄後,連接配接器模式将進行到最後階段:服務處理。

  如果連接配接被指定為異步的,在向底層的具體連接配接器發出非阻塞式

connect()調用後,對connect_svc_handler()的調用将立即傳回。在使用ACE_SOCK_Connector的情況中,這意味着發出非阻塞式BSD connect()調用。在連接配接稍後被實際建立時,反應堆将回調ACE_Connector工廠的handle_output()方法,該方法在通過make_svc_handler()方法建立的服務處理器中設定新句柄。然後工廠将進行到下一階段:服務處理。

  與

accept_svc_handler()情況一樣,connect_svc_handler()是一個“橋接”方法,可進行重載以提供變化的功能。

 

7.4.4 服務處理

  一旦服務處理器被建立、連接配接被建立,以及句柄在服務處理器中被設定,

ACE_Acceptor的handle_input()方法(或者在使用ACE_Connector的情況下,是handle_output()或connect_svc_handler())将調用activate_svc_handler()方法。該方法将随即啟用服務處理器。其預設行為是調用作為服務處理器的入口的open()方法。如我們在上面的例子中所看到的,在服務處理器開始執行時,open()方法是第一個被調用的方法。是在open()方法中,我們調用activate()方法來建立多個線程控制;并在反應堆上登記服務處理器,這樣當新的資料在連接配接上到達時,它會被自動回調。該方法也是一個“橋接”方法,可被重載以提供更為複雜的功能。特别地,這個重載的方法可以提供更為複雜的并發政策,比如,在另一不同的程序中運作服務處理器。

 

7.5

調諧接受器和連接配接器政策

  如上面所提到的,因為使用了可以重載的橋接方法,很容易對接受器和連接配接器進行調諧。橋接方法允許調諧:

 

  • 服務處理器的建立政策:

​通過重載接受器或連接配接器的make_svc_handler()方法來實作。例如,這可以意味着複用已有的服務處理器,或使用某種複雜的方法來擷取服務處理器,如上面所讨論的那樣。​

  • 連接配接政策:

​連接配接建立政策可通過重載connect_svc_handler()或accept_svc_handler()方法來改變。​

  • 服務處理器的并發政策:

​服務處理器的并發政策可通過重載activate_svc_handler()方法來改變。例如,服務處理器可以在另外的程序中建立。

 

  如上所示,調諧是通過重載

ACE_Acceptor或ACE_Connector類的橋接方法來完成的。ACE的設計使得程式員很容易完成這樣的重載和調諧。

 

7.5.1 ACE_Strategy_Connector和ACE_Strategy_Acceptor類

  為了友善上面所提到的對接受器和連接配接器模式的調諧方法,

ACE提供了兩種特殊的“可調諧”接受器和連接配接器工廠,那就是ACE_Strategy_Acceptor和ACE_Strategy_Connector。它們和ACE_Acceptor與ACE_Connector非常類似,同時還使用了“政策”模式。

  政策模式被用于使算法行為與類的接口去耦合。其基本概念是允許一個類(稱為

Context Class,上下文類)的底層算法獨立于使用該類的客戶進行變動。這是通過具體政策類的幫助來完成的。具體政策類封裝執行操作的算法或方法。這些具體政策類随後被上下文類用于執行各種操作(上下文類将“工作”委托給具體政策類)。因為上下文類不直接執行任何操作,當需要改變功能時,無需對它進行修改。對上下文類所做的唯一修改是使用另一個具體政策類來執行改變了的操作。(要閱讀有關政策模式的更多資訊,參見“設計模式”的附錄)。

  在

ACE中,ACE_Strategy_Connector和ACE_Strategy_Acceptor使用若幹具體政策類來改變算法,以建立服務處理器,建立連接配接,以及為服務處理器設定并發方法。如你可能已經猜到的一樣,ACE_Strategy_Connector和ACE_Strategy_Acceptor利用了上面提到的橋接方法所提供的可調諧性。

 

7.5.1.1

使用政策接受器和連接配接器

  在

ACE中已有若幹具體的政策類可用于“調諧”政策接受器和連接配接器。當類被執行個體化時,它們作為參數被傳入政策接受器或連接配接器。表7-2顯示了可用于調諧政策接受器和連接配接器類的一些類。

需要修改

具體政策類

描述

建立政策

(

重定義make_svc_handler())

ACE_NOOP_Creation_Strategy 這個具體政策并不執行個體化服務處理器,而隻是一個空操作。
ACE_Singleton_Strategy 保證服務處理器被建立為單體。也就是,所有連接配接将有效地使用同一個服務處理例程。
ACE_DLL_Strategy 通過從動态連結庫中動态連結服務處理器來對它進行建立。

連接配接政策

(

重定義connect_svc_handler())

ACE_Cached_Connect_Strategy 檢查是否有已經連接配接到特定的遠地位址的服務處理器沒有在被使用。如果有這樣一個服務處理器,就對它進行複用。

并發政策

(

重定義activate_svc_handler())

ACE_NOOP_Concurrency_Strategy

一個“無為”(

do-nothing)的并發政策。它甚至不調用服務處理器的open()方法。

ACE_Process_Strategy

在另外的程序中建立服務處理器,并調用它的

open()方法。

ACE_Reactive_Strategy

先在反應堆上登記服務處理器,然後調用它的

open()方法。

ACE_Thread_Strategy

先調用服務處理器的

open()方法,然後調用它的activate()方法,以讓另外的線程來啟動服務處理器的svc()方法。

 

         表

7-2 用于調諧政策接受器和連接配接器類的類

 

  下面的例子示範政策接受器和連接配接器類的使用。

 

7-8

#include ”ace/Reactor.h”

#include ”ace/Svc_Handler.h”

#include ”ace/Acceptor.h”

#include ”ace/Synch.h”

#include ”ace/SOCK_Acceptor.h”

#define PORT_NUM 10101

#define DATA_SIZE 12

//forward declaration

class My_Svc_Handler;

//instantiate a strategy acceptor

typedef ACE_Strategy_Acceptor<My_Svc_Handler,ACE_SOCK_ACCEPTOR> MyAcceptor;

//instantiate a concurrency strategy

typedef ACE_Process_Strategy<My_Svc_Handler> Concurrency_Strategy;

 

// Define the Service Handler

class My_Svc_Handler:

public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_NULL_SYNCH>

{

private:

char* data;

public:

My_Svc_Handler()

{

data= new char[DATA_SIZE];

}

 

My_Svc_Handler(ACE_Thread_Manager* tm)

{

data= new char[DATA_SIZE];

}

 

int open(void*)

{

cout<<”Connection established”<<endl;

 

//Register with the reactor

ACE_Event_Handler::READ_MASK);

 

return 0;

}

 

int handle_input(ACE_HANDLE)

{

peer().recv_n(data,DATA_SIZE);

ACE_OS::printf(”<< %s\n”,data);

 

// keep yourself registered with the reactor

return 0;

}

};

 

int main(int argc, char* argv[])

{

ACE_INET_Addr addr(PORT_NUM);

 

//Concurrency Strategy

Concurrency_Strategy my_con_strat;

 

//Instantiate the acceptor

MyAcceptor acceptor(addr, //address to accept on

ACE_Reactor::instance(), //the reactor to use

0,

// don’t care about creation strategy

0, // don’t care about connection estb. strategy

&my_con_strat

); // use our new process concurrency strategy

 

while(1) /* Start the reactor’s event loop */

ACE_Reactor::instance()->handle_events();

}

這個例子基于上面的例

7-2。唯一的不同是它使用了ACE_Strategy_Acceptor,而不是使用ACE_Acceptor;并且它還使用ACE_Process_Strategy作為服務處理器的并發政策。這種并發政策保證一旦連接配接建立後,服務處理器在單獨的程序中被執行個體化。如果在特定服務上的負載變得過于繁重,使用ACE_Process_Strategy可能是一個好主意。但是,在大多數情況下,使用ACE_Process_Strategy會過于昂貴,而ACE_Thread_Strategy可能是更好的選擇。

 

7.5.1.2

使用ACE_Cached_Connect_Strategy進行連接配接緩存

 

  在許多應用中,客戶會連接配接到伺服器,然後重新連接配接到同一伺服器若幹次;每次都要建立連接配接,執行某些工作,然後挂斷連接配接(比如像在

Web客戶中所做的那樣)。不用說,這樣做是非常低效而昂貴的,因為連接配接建立和挂斷是非常昂貴的操作。在這樣的情況下,連接配接者可以采用一種更好的政策:“記住”老連接配接并保持它,直到确定客戶不會再重建立立連接配接為止。ACE_Cached_Connect_Strategy就提供了這樣一種緩存政策。這個政策對象被ACE_Strategy_Connector用于提供基于緩存的連接配接建立。如果一個連接配接已經存在,ACE_Strategy_Connector将會複用它,而不是建立新的連接配接。

  當客戶試圖重建立立連接配接到先前已經連接配接的伺服器時,

ACE_Cached_Connect_Strategy確定對老的連接配接和服務處理器進行複用,而不是建立新的連接配接和服務處理器。因而,實際上,ACE_Cached_Connect_Strategy不僅管理連接配接建立政策,它還管理服務處理器建立政策。因為在此例中,使用者不想建立新的服務處理器,我們将ACE_Null_Creation_Strategy傳遞給ACE_Strategy_Connector。如果連接配接先前沒有建立過,ACE_Cached_Connect_Strategy将自動使用内部的建立政策來執行個體化适當的服務處理器,它是在這個模闆類被執行個體化時傳入的。這個政策可被設定為使用者想要使用的任何一種政策。除此而外,也可以将ACE_Cached_Connect_Strategy自己在其構造器中使用的建立、并發和recycling政策傳給它。下面的例子示範這些概念:

 

7-9

#include ”ace/Reactor.h”

#include ”ace/Svc_Handler.h”

#include ”ace/Connector.h”

#include ”ace/Synch.h”

#include ”ace/SOCK_Connector.h”

#include ”ace/INET_Addr.h”

 

#define PORT_NUM 10101

#define DATA_SIZE 16

 

//forward declaration

class My_Svc_Handler;

//Function prototype

static void make_connections(void *arg);

 

// Template specializations for the hashing function for the

// hash_map which is used by the cache. The cache is used internally by the

// Cached Connection Strategy . Here we use ACE_Hash_Addr

// as our external identifier. This utility class has already

// overloaded the == operator and the hash() method. (The

// hashing function). The hash() method delegates the work to

// hash_i() and we use the IP address and port to get a

// a unique integer hash value.

size_t ACE_Hash_Addr<ACE_INET_Addr>::hash_i (const ACE_INET_Addr &addr) const

{

return addr.get_ip_address () + addr.get_port_number ();

}

 

//instantiate a strategy acceptor

typedef ACE_Strategy_Connector<My_Svc_Handler,ACE_SOCK_CONNECTOR>

STRATEGY_CONNECTOR;

 

//Instantiate the Creation Strategy

typedef ACE_NOOP_Creation_Strategy<My_Svc_Handler>

NULL_CREATION_STRATEGY;

//Instantiate the Concurrency Strategy

typedef ACE_NOOP_Concurrency_Strategy<My_Svc_Handler>

NULL_CONCURRENCY_STRATEGY;

//Instantiate the Connection Strategy

typedef ACE_Cached_Connect_Strategy<My_Svc_Handler,

ACE_SOCK_CONNECTOR,

ACE_SYNCH_RW_MUTEX>

CACHED_CONNECT_STRATEGY;

 

class My_Svc_Handler:

public ACE_Svc_Handler <ACE_SOCK_STREAM,ACE_MT_SYNCH>

{

private:

char* data;

public:

My_Svc_Handler()

{

data= new char[DATA_SIZE];

}

 

My_Svc_Handler(ACE_Thread_Manager* tm)

{

data= new char[DATA_SIZE];

}

 

//Called before the service handler is recycled..

int

 recycle (void *a=0)/P>

{

ACE_DEBUG ((LM_DEBUG,

”(%P|%t) recycling Svc_Handler %d with handle %d\n”,

this, this->peer ().get_handle ()));

return 0;

}

 

int open(void*)

{

ACE_DEBUG((LM_DEBUG,”(%t)Connection established \n”));

 

//Register the service handler with the reactor

ACE_Reactor::instance()

->register_handler(this,ACE_Event_Handler::READ_MASK);

 

activate(THR_NEW_LWP|THR_DETACHED);

return 0;

}

 

int handle_input(ACE_HANDLE)

{

ACE_DEBUG((LM_DEBUG,”Got input in thread: (%t) \n”));

peer().recv_n(data,DATA_SIZE);

ACE_DEBUG((LM_DEBUG,”<< %s\n”,data));

 

//keep yourself registered with the reactor

return 0;

}

 

int svc(void)

{

//send a few messages and then mark connection as idle so that it can

// be recycled

 later.

ACE_DEBUG((LM_DEBUG,”Started the service routine \n”));

for(int i=0;i<3;i++)

{

ACE_DEBUG((LM_DEBUG,”(%t)>>Hello World\n”));

ACE_OS::fflush(stdout);

peer().send_n(”Hello World”,sizeof(”Hello World”));

}

 

//Mark the service handler as being idle now and let the

//other threads reuse this connection

this->idle(1);

 

//Wait for the thread to die

this->thr_mgr()->wait();

 

return 0;

}

};

 

ACE_INET_Addr *addr;

 

int main(int argc, char* argv[])

{

addr= new ACE_INET_Addr(PORT_NUM,argv[1]);

 

//Creation Strategy

NULL_CREATION_STRATEGY creation_strategy;

 

//Concurrency Strategy

NULL_CONCURRENCY_STRATEGY concurrency_strategy;

 

//Connection Strategy

CACHED_CONNECT_STRATEGY caching_connect_strategy;

 

//instantiate the connector

STRATEGY_CONNECTOR connector(

ACE_Reactor::instance(), //the reactor to use

&creation_strategy,

&caching_connect_strategy,

&concurrency_strategy);

 

//Use the thread manager to spawn a single thread

//to connect multiple times passing it the address

//of the strategy connector

if(ACE_Thread_Manager::instance()->spawn(

(ACE_THR_FUNC) make_connections,

(void *) &connector,

THR_NEW_LWP) == -1)

 

ACE_ERROR ((LM_ERROR, ”(%P|%t) %p\n%a”, ”client thread spawn failed”));

 

while(1) /* Start the reactor’s event loop */

ACE_Reactor::instance()->handle_events();

}

 

//Connection establishment function, tries to establish connections

//to the same server again and re-uses the connections from the cache

void make_connections(void *arg)

{

ACE_DEBUG((LM_DEBUG,”(%t)Prepared to connect \n”));

STRATEGY_CONNECTOR *connector= (STRATEGY_CONNECTOR*) arg;

for (int i = 0; i < 10; i++)

{

My_Svc_Handler *svc_handler = 0;

 

// Perform a blocking connect to the server using the Strategy

// Connector with a connection caching strategy. Since we are

// connecting to the same <server_addr> these calls will return the

// same dynamically allocated <Svc_Handler> for each <connect> call.

if (connector->connect (svc_handler, *addr) == -1)

{

ACE_ERROR ((LM_ERROR, ”(%P|%t) %p\n”, ”connection failed\n”));

return;

}

 

// Rest for a few seconds so that the connection has been freed up

ACE_OS::sleep (5);

}

}

在上面的例子中,緩存式連接配接政策被用于緩存連接配接。要使用這一政策,需要一點額外的工作:定義

ACE_Cached_Connect_Strategy在内部使用的哈希映射管理器的hash()方法。這個hash()方法用于對服務處理器和ACE_Cached_Connect_Strategy内部使用的連接配接進行哈希運算,放入緩存映射中。它簡單地使用IP位址和端口号的總和作為哈希函數,這也許并不是很好的哈希函數。

  這個例子比至今為止我們所列舉的例子都要複雜一點,是以有理由多進行一點讨論。

  我們為

ACE_Strategy_Acceptor使用空操作并發和建立政策。使用空操作建立政策是必須的。如上面所解釋的,如果沒有使用ACE_NOOP_Creation_Strategy,ACE_Cached_Connection_Strategy将會産生斷言失敗。但是,在使用ACE_Cached_Connect_Strategy時,任何并發政策都可以和政策接受器一起使用。如上面所提到的,ACE_Cached_Connect_Strategys所用的底層建立政策可以由使用者來設定。還可以設定recycling政策。這是在執行個體化caching_connect_strategy時,通過将所需的建立和recycling政策的對象傳給它的構造器來完成的。在這裡我們沒有這樣做,而是使用了預設的建立和recycling政策。

  在适當地設定連接配接器後,我們使用

Thread_Manager來派生新線程,且将make_connections()方法作為線程的入口。該方法使用我們的新的政策連接配接器來連接配接到遠地站點。在連接配接建立後,該線程休眠5秒鐘,然後使用我們的緩存式連接配接器來重新建立同樣的連接配接。于是該線程應該在連接配接器緩存中找到這個連接配接并複用它。

  和平常一樣,一旦連接配接建立後,反應堆回調我們的服務處理器(

My_Svc_Handler)。随後My_Svc_Handler的open()方法通過調用My_Svc_Handler的activate()方法來使它成為主動對象。svc()方法随後發送三條消息給遠地主機,并通過調用服務處理器的idle()方法将該連接配接标記為空閑。注意this->thr_mrg_wait()要求線程管理器等待所有線上程管理器中的線程終止。如果你不要求線程管理器等待其它線程,根據在ACE中設定的語義,一旦ACE_Task(在此例中是ACE_Task類型的ACE_Svc_Handler)中的線程終止,ACE_Task對象(在此例中是ACE_My_Svc_Handler)就會被自動删除。如果發生了這種情況,在Cache_Connect_Strategy查找先前緩存的連接配接時,它就不會如我們期望的那樣找到My_Svc_Handler,因為它已經被删除掉了。

  在

My_Svc_Handler中還重載了ACE_Svc_Handler中的recycle()方法。當有舊連接配接被ACE_Cache_Connect_Strategy找到時,這個方法就會被自動回調,這樣服務處理器就可以在此方法中完成回收利用所特有的操作。在我們的例子中,我們隻是列印出在緩存中找到的處理器的this指針的位址。在程式運作時,每次連接配接建立後所使用的句柄的位址是相同的,進而說明緩存工作正常。

 

7.6

通過接受器和連接配接器模式使用簡單事件處理器

  有時, ,使用重量級的

ACE_Svc_Handler作為接受器和連接配接器的處理器不僅沒有必要,而且會導緻代碼臃腫。在這樣情況下,使用者可以使用較輕的ACE_Event_Handler方法來作為反應堆在連接配接一旦建立時所回調的類。要采用這種方法,程式員需要重載get_handle()方法,并包含将要用于事件處理器的具體底層流。下面的例子有助于示範這些變動。這裡我們還編寫了新的peer()方法,它傳回底層流的引用(reference),就像在ACE_Svc_Handler類中所做的那樣。

 

7-10

#include ”ace/Reactor.h”

#include ”ace/Svc_Handler.h”

#include ”ace/Acceptor.h”

#include ”ace/Synch.h”

#include ”ace/SOCK_Acceptor.h”

 

#define PORT_NUM 10101

#define DATA_SIZE 12

 

//forward declaration

class My_Event_Handler;

 

//Create the Acceptor class

typedef ACE_Acceptor<My_Event_Handler,ACE_SOCK_ACCEPTOR>

MyAcceptor;

 

//Create an event handler similar to as seen in example 2. We have to

//overload the get_handle() method and write the peer() method. We also

//provide the data member peer_ as the underlying stream which is

//used.

class My_Event_Handler: public ACE_Event_Handler

{

private:

char* data;

 

//Add a new attribute for the underlying stream which will be used by

//the Event Handler

ACE_SOCK_Stream peer_;

 

public:

My_Event_Handler()

{

data= new char[DATA_SIZE];

}

 

int open(void*)

{

cout<<”Connection established”<<endl;

 

//Register the event handler with the reactor

ACE_Reactor::instance()->register_handler(this,

ACE_Event_Handler::READ_MASK);

 

return 0;

}

 

int handle_input(ACE_HANDLE)

{

// After using the peer() method of our ACE_Event_Handler to obtain a

//reference to the underlying stream of the service handler class we

//call recv_n() on it to read the data which has been received. This

//data is stored in the data array and then printed out

peer().recv_n(data,DATA_SIZE);

ACE_OS::printf(”<< %s\n”,data);

 

// keep yourself registered with the reactor

return 0;

}

 

// new method which returns the handle to the reactor when it

//asks for it.

ACE_HANDLE get_handle(void) const

{

return this->peer_.get_handle();

}

 

//new method which returns a reference to the peer stream

ACE_SOCK_Stream &peer(void) const

{

return (ACE_SOCK_Stream &) this->peer_;

}

};

 

int main(int argc, char* argv[])

{

ACE_INET_Addr addr(PORT_NUM);

 

//create the acceptor

MyAcceptor acceptor(addr, //address to accept on

ACE_Reactor::instance()); //the reactor to use