(本文原作于2006.03.15,第一次修正于2006.06.06,修正后适用于ESFramework V0.3+)
分布式系统的构建一般有两种模式,一是基于消息(如Tcp,http等),一是基于方法调用(如RPC、WebService、Remoting)。深入想一想,它们其实是一回事。如果你了解过.NET的Proxy,那么你会发现,方法调用和消息请求/回复实际上是可以相互转换的,.NET的Proxy的实现,就是在堆栈帧和消息之间相互转换的过程。关于这方面的详细论述可以参见《.Net本质论》一书。
我觉得IServerAgent是我在开发ESFramework期间非常满意的一个想法,相信大家也会对它感兴趣的。因为它使得使用基于消息请求/回复的交互就像方法调用一样简单。
客户端与服务器之间的所有通信都可经过IServerAgent,包括要转发的P2P消息。它的主要目的是:
(1)屏蔽客户端与服务端之间的通信协议(Tcp/Udp),ITcpServerAgent、IUdpServerAgent
(2)可将异步的消息请求/回复转化为同步的方法调用。
ESFramework主要支持基于Tcp或Udp的C/S系统,所以客户端和服务端之间是通过消息进行交互的。如果仅仅是客户端发出请求、服务器给出服务这种情况很容易处理,但是如果服务端有主动发消息给客户端的情况,事情就会变得稍微复杂。通常,客户端会有一个专门的接收线程来负责从网络接收数据,然后把接收的消息交给对应的处理器处理,或者,这个接收到的消息是个服务端给出的回复,那么这个回复就应该交给发出请求的请求者,但是对应的请求者在哪里了?这种回复消息与请求消息的匹配是比较繁琐的,特别是在上述服务端可以主动给客户端发送消息的情况下。为了简化这个过程,IServerAgent出现了,它用于客户端,像它的名字一样,可以把它当作服务器。IServerAgent的主要目的就是将消息请求/回复转换成方法调用,就像该接口定义的一样:
public interface IServerAgent
{
/// <summary>
/// 如果超时仍然没有回复,则抛出超时异常
/// 如果dataPriority != DataPriority.CanBeDiscarded ,则checkRespond只能为false
/// </summary>
NetMessage CommitRequest(NetMessage requestMsg ,DataPriority dataPriority , bool checkRespond);
}
public enum DataPriority
High ,//紧急命令
Common ,//如普通消息,如聊天消息
Low ,//如文件传输
CanBeDiscarded //如视频数据、音频数据
首先解释一下参数dataPriority的意义,dataPriority参数仅仅对Tcp协议起作用,当有多个请求要同时发送时,它决定了发送的优先级。CanBeDiscarded表明这个消息在网络繁忙时可以被抛弃,比如即时通讯的音频数据、视频数据等。关于这个数据发送的优先级机制的实现是ITcpAutoSender,这个组件会在后文中介绍。
CommitRequest方法提交一个请求消息该给服务器,并返回一个回复消息给请求者。这就是一个方法调用!!!其间隐藏了通过网络将消息发送给服务器并从服务器获取结果的中间细节。这是怎么做到的?思路其实很简单,只是描述起来有些复杂。主要要解决两个问题,一是如何将请求消息与对应的回复匹配起来,二是CommitRequest从哪里找到匹配的回复。
对于第一个问题,相信大家还记得IMessageHeader定义中有个CorrelationID属性,正如其名,这是一个随机数,每生成一个新的请求消息,就会产生一个随机数赋值给CorrelationID属性,由于随机数重复的可能性很小,所以可以把它当作是唯一的。这样一个随机数就唯一的标志了一个请求,当服务端收到这个请求后,就处理这个请求,并把回复消息的消息头中的CorrelationID属性设为与对应的请求消息的CorrelationID一样的值,这样,客户端收到回复消息后,就可以和对应的请求消息一一对应起来了。
对于第二个问题的解释,就需要涉及到ESFramework中支持客户端开发的其它两个组件:EsbPassiveDataDealer和IResponseManager。EsbPassiveDataDealer是客户端用户处理所有接收到的消息的处理器,而IResponseManager组件用于暂存所有的来自服务端的回复。对于每个接收到的消息,EsbPassiveDataDealer判断其是否为回复,如果是,则将其交给IResponseManager暂存。IResponseManager为暂存的每个回复都设置的生存期TTL,如果回复在IResponseManager中的时间超过了这个TTL,则会被删除。
你也许已经想到第二个问题的解决方法了。是的,CommitRequest方法将请求发送到网络之后,就定时从IResponseManager中寻找CorrelationID为请求消息头的CorrelationID值的回复消息,如果找到,就返回它,否则就等待循环,直至超时抛出TimeoutException异常。下面给出IResponseManager的接口定义:
public interface IResponseManager
void Initialize() ;
void PushResponse(NetMessage response) ;
NetMessage PopRespose(int correlationID ,int serviceKey) ; //立即返回
NetMessage PickupResponse(int serviceKey ,int corelationID) ;//在TimeoutSec时间内不断的PopRespose
/// ResponseTTL 如果一个回复在管理器中存在的时间超过ResponseTTL,则会被删除。如果ResponseTTL为0,则表示不进行生存期管理
/// </summary>
int ResponseTTL{set ;} //s
/// 如果在TimeoutSec内,仍然接收不到期望的回复,则抛出异常。取0时,表示不设置超时
/// </summary>
int TimeoutSec{set ; }
IServerAgent的具体实现包括TcpServerAgent和UdpServerAgent,分别支持Tcp协议和Udp协议的客户端开发。从它们的接口定义中可以看到它们都借助于IServerAgentHelper实现自己。
public interface IServerAgentHelper
IEsbLogger EsbLogger{set ; get ;}
IContractHelper ContractHelper{set ; get ;}
INetMessageHook NetMessageHook {set ; get ;}
IPassiveHelper PassiveHelper {set ; get ;}
IResponseManager ResponseManager{set ;get ;}
ISingleMessageDealer SingleMessageDealer{set ; get ;}
IMessageDispatcher ConstructDispatcher() ;
public IMessageDispatcher ConstructDispatcher()
{
//NakeDispatcher
EsbPassiveDataDealer dealer = new EsbPassiveDataDealer(this.responseManager ,this.passiveHelper ,this.singleMessageDealer) ;
EsbPassiveDealerFactory factory = new EsbPassiveDealerFactory(dealer) ;
NakeDispatcher nakeDispatcher = new NakeDispatcher() ;
nakeDispatcher.ContractHelper = this.contractHelper ;
nakeDispatcher.DataDealerFactory= factory ;
//MessageDispatcher
IMessageDispatcher messageDispatcher = new MessageDispatcher() ;
messageDispatcher.ContractHelper = this.contractHelper ;
messageDispatcher.NetMessageHook = this.netMessageHook ;
messageDispatcher.NakeDispatcher = nakeDispatcher ;
return messageDispatcher ;
}
在IServerAgent的基础之上,我们就可以从一个新的角度来设计客户端的结构的,那就是采用和功能服务器一样的插件方式。在ESFramework的支持下,我们的应用开发变得非常简洁和简单,所要做的主要内容就是开发服务端的“业务功能插件”和对应的客户端的“PassiveAddin”(客户端插件)。如果我们的应用已经发布投入使用,而此时用户要求添加一项新的业务,那将是非常简单的事情,那就是开发一个实现了新业务的功能插件动态加载到功能服务器中、再开发一个对应的客户端插件动态加载到客户端中,这样就可以了。服务器不用重编译、甚至不用停止服务;客户端也不用重编译、甚至不用停止使用。一切都是在运行中动态完成的。
这是如何做到的?请关注本系列文章。