一、TeamTalk服务器端以下部署程序:
db_proxy_server、file_server、http_msg_server、login_server、msfs、msg_server、push_server、router_server
各个服务程序的作用描述如下:
- LoginServer (C++): 负载均衡服务器,分配一个负载小的MsgServer给客户端使用
- MsgServer (C++): 消息服务器,提供客户端大部分信令处理功能,包括私人聊天、群组聊天等
- RouteServer (C++): 路由服务器,为登录在不同MsgServer的用户提供消息转发功能
- FileServer (C++): 文件服务器,提供客户端之间得文件传输服务,支持在线以及离线文件传输
- MsfsServer (C++): 图片存储服务器,提供头像,图片传输中的图片存储服务
- DBProxy (C++): 数据库代理服务器,提供mysql以及redis的访问服务,屏蔽其他服务器与mysql与redis的直接交互
- HttpMsgServer(C++) :对外接口服务器,提供对外接口功能。(目前只是框架)
- PushServer(C++): 消息推送服务器,提供IOS系统消息推送。(IOS消息推送必须走apns)
注意:上图中并没有push_server和http_push_server。如果你不调试ios版本的客户端,可以暂且不启动push_server,另外http_push_server也可以暂不启动。
启动顺序:
一般来说,前端的服务会依赖后端的服务,所以一般先启动后端服务,再启动前端服务。建议按以下顺序启动服务:
1、启动db_proxy。
2、启动route_server,file_server,msfs
3、启动login_server
4、启动msg_server
各个服务的端口号
服务 | 端口 |
login_server | 8080/8008 |
msg_server | 8000 |
db_proxy_server | 10600 |
route_server | 8200 |
http_msg_server | 8400 |
file_server | 8600/8601 |
服务网络通信框架介绍:
上面介绍的每一个服务都使用了相同的网络通信框架,该通信框架可以单独拿出来做为一个通用的网络通信框架。该网络框架是在一个循环里面不断地检测IO事件,然后对检测到的事件进行处理。流程如下:
1. 使用IO复用技术(linux和windows平台用select、mac平台用kevent)分离网络IO。
2. 对分离出来的网络IO进行操作,分为socket句柄可读、可写和出错三种情况。
当然再加上定时器事件,即检测一个定时器事件列表,如果有定时器到期,则执行该定时器事件。
整个框架的伪码大致如下(即主线程不断事件循环(Reactor模式)):
while (running)
{
//处理定时器事件
_CheckTimer(); //1. 遍历定时器队列,检测是否有定时器事件到期,有则执行定时器的回调函数
//处理其他事件
_CheckLoop(); //2. 遍历其他任务队列,检测是否有其他任务需要执行,有,执行之
//IO multiplexing
int n = select(socket集合, ...); //3. 检测socket集合,分离可读、可写和异常事件
//事件处理
if (某些socket可读)
{
pSocket->OnRead(); //4. 处理socket可读事件
}
if (某些socket可写)
{
pSocket->OnWrite(); //5. 处理socket可写事件
}
if (某些socket出错)
{
pSocket->OnClose(); //6. 处理socket异常事件
}
}
二、具体实现
(1)处理定时器事件的代码如下:
void CEventDispatch::_CheckTimer()
{
uint64_t curr_tick = get_tick_count();
list<TimerItem*>::iterator it;
for (it = m_timer_list.begin(); it != m_timer_list.end(); )
{
TimerItem* pItem = *it;
it++; //iterator maybe deleted in the callback, so we should increment it before callback
if (curr_tick >= pItem->next_tick)
{
pItem->next_tick += pItem->interval;
pItem->callback(pItem->user_data, NETLIB_MSG_TIMER, 0, NULL);
}
}
}
即遍历一个定时器列表,将定时器对象与当前时间(curr_tick)做比较,如果当前时间已经大于或等于定时器设置的时间,则表明定时器时间已经到了,执行定时器对象对应的回调函数。
(2)事件处理
再来看看OnRead、OnWrite和OnClose这三个函数。在TeamTalk源码中每一个socket连接被封装成一个CBaseSocket对象,该对象是一个使用引用计数的类的子类,通过这种方法来实现生存期自动管理。
void CBaseSocket::OnRead()
{
if (m_state == SOCKET_STATE_LISTENING)
{
_AcceptNewSocket();
}
else
{
u_long avail = 0;
if ( (ioctlsocket(m_socket, FIONREAD, &avail) == SOCKET_ERROR) || (avail == 0) )
{
m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
}
else
{
m_callback(m_callback_data, NETLIB_MSG_READ, (net_handle_t)m_socket, NULL);
}
}
}
OnRead()方法根据状态标识m_state确定一个socket是侦听的socket还是普通与客户端连接的socket,如果是侦听sokcet则接收客户端的连接;如果是与客户端连接的socket,则先检测socket上有多少字节可读,如果没有字节可读或者检测字节数时出错,则关闭socket,反之调用设置的回调函数。
void CBaseSocket::OnWrite()
{
#if ((defined _WIN32) || (defined __APPLE__))
CEventDispatch::Instance()->RemoveEvent(m_socket, SOCKET_WRITE);
#endif
if (m_state == SOCKET_STATE_CONNECTING)
{
int error = 0;
socklen_t len = sizeof(error);
#ifdef _WIN32
getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len);
#else
getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (void*)&error, &len);
#endif
if (error) {
m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
} else {
m_state = SOCKET_STATE_CONNECTED;
m_callback(m_callback_data, NETLIB_MSG_CONFIRM, (net_handle_t)m_socket, NULL);
}
}
else
{
m_callback(m_callback_data, NETLIB_MSG_WRITE, (net_handle_t)m_socket, NULL);
}
}
OnWrite()函数则根据m_state标识检测socket是否是尝试连接的socket(connect函数中的socket),用于判断socket是否已经连接成功,反之则是与客户端保持连接的socket,调用预先设置的回调函数。
依托的底层实现:
1、每个服务程序都使用一个stl hash_map来管理所有的socket,键是socket句柄,值是CBaseSocket对象指针:
typedef hash_map<net_handle_t, CBaseSocket*> SocketMap;
SocketMap g_socket_map;
每一个socket连接被封装成一个CBaseSocket对象,该对象是一个使用引用计数的类(CRefObject)的子类,通过这种方法来实现生存期自动管理。
//a wrap for non-block socket class
class CBaseSocket : public CRefObject
所以在删除或者新增socket时(实际上目的是在事件驱动器删除或者新增socket事件),实际上就是从这个hash_map中删除或者向这个hash_map中增加对象。多线程操作,需要一个锁来进行保护:
void CEventDispatch::AddEvent(SOCKET fd, uint8_t socket_event)
{
CAutoLock func_lock(&m_lock);
if ((socket_event & SOCKET_READ) != 0)
{
FD_SET(fd, &m_read_set);
}
if ((socket_event & SOCKET_WRITE) != 0)
{
FD_SET(fd, &m_write_set);
}
if ((socket_event & SOCKET_EXCEP) != 0)
{
FD_SET(fd, &m_excep_set);
}
}
代码CAutoLock func_lock(&m_lock);即保护该hash_map的锁对象。
2、而管理以上功能的是一个单例类CEventDispatch,所以不难猜出CEventDispatch提供的接口:
class CEventDispatch
{
public:
virtual ~CEventDispatch();
void AddEvent(SOCKET fd, uint8_t socket_event);
void RemoveEvent(SOCKET fd, uint8_t socket_event);
void AddTimer(callback_t callback, void* user_data, uint64_t interval);
void RemoveTimer(callback_t callback, void* user_data);
void AddLoop(callback_t callback, void* user_data);
void StartDispatch(uint32_t wait_timeout = 100);
void StopDispatch();
bool isRunning() {return running;}
static CEventDispatch* Instance();
protected:
CEventDispatch();
private:
void _CheckTimer();
void _CheckLoop();
typedef struct {
callback_t callback;
void* user_data;
uint64_t interval;
uint64_t next_tick;
} TimerItem;
private:
#ifdef _WIN32
fd_set m_read_set;
fd_set m_write_set;
fd_set m_excep_set;
#elif __APPLE__
int m_kqfd;
#else
int m_epfd;
#endif
CLock m_lock;
list<TimerItem*> m_timer_list;
list<TimerItem*> m_loop_list;
static CEventDispatch* m_pEventDispatch;
bool running;
};
其中StartDispatch()和StopDispatcher()分别用于启动和停止整个循环流程。一般在程序初始化的时候StartDispatch(),在程序退出时StopDispatcher()。
之所以设计CEventDispatch为单例类,是由于每个服务器只有一个事件循环(多线程而单Reactor)。此外设计成单例类的好处是在某个位置需要使用CEventDispatch时只需调用CEventDispatch->Instance()即可获得单例指针。
三、知识点分析
1、引用计数与对象生存期管理。(测试)使用引用计数管理对象生命期
(1)在TeamTalk源码中每一个socket连接被封装成一个CBaseSocket对象,该对象是一个使用引用计数的基类的子类,通过这种方法来(手工)实现生存期自动管理。(理论上,引用计数机制要求程序员小心跟踪每个持有对象指针的变量,在它们走出作用域时,手工调用上述release_reference(),不能有任何一次遗漏,否则将引起内存泄漏Memory Leak。另外,为阻止程序员误将该类型的对象分配在栈(Stack)上,最好将其构造函数申明成protected。此外,在多线程环境下,还需要考虑引用计数的线程安全性。)
(2)引用计数如何管理对象生存期
C++中基类的析构函数为什么要用virtual虚析构函数:
C++中基类采用virtual虚析构函数是为了防止内存泄漏。具体地说,如果派生类中申请了内存空间,并在其析构函数中对这些内存空间进行释放。假设基类中采用的是非虚析构函数,当删除基类指针指向的派生类对象时就不会触发动态绑定,因而只会调用基类的析构函数,而不会调用派生类的析构函数。那么在这种情况下,派生类中申请的空间就得不到释放从而产生内存泄漏。所以,为了防止这种情况的发生,C++中基类的析构函数应采用virtual虚析构函数。
(3)TeamTalk中的实现
int CBaseSocket::Close()
{
CEventDispatch::Instance()->RemoveEvent(m_socket, SOCKET_ALL);
RemoveBaseSocket(this);
closesocket(m_socket);
ReleaseRef();
return 0;
}
CBaseSocket* FindBaseSocket(net_handle_t fd)
{
CBaseSocket* pSocket = NULL;
SocketMap::iterator iter = g_socket_map.find(fd);
if (iter != g_socket_map.end())
{
pSocket = iter->second;
pSocket->AddRef();
}
return pSocket;
}
其中的 ReleaseRef(); 和 AddRef(); 为基类对引用计数的操作:
void CRefObject::AddRef()
{
if (m_lock)
{
m_lock->lock();
m_refCount++;
m_lock->unlock();
}
else
{
m_refCount++;
}
}
void CRefObject::ReleaseRef()
{
if (m_lock)
{
m_lock->lock();
m_refCount--;
if (m_refCount == 0)
{
delete this;
return;
}
m_lock->unlock();
}
else
{
m_refCount--;
if (m_refCount == 0)
delete this;
}
}
TeamTalk对 CBaseSocket 对象的生存期管理实际上是对全局管理变量hash_map<net_handle_t, CBaseSocket*> g_socket_map;的增加和删去操作的管理。当 new CBaseSocket 放入 g_socket_map 时,则基类引用计数加一;当 从 g_socket_map 删去 CBaseSocket 时,则基类引用计数减一。当减到0时基类函数中调用 delete this。
- 当计数变量值为0时 delete this; ,此处的 this 是(多态)运行时绑定 CBaseSocket 对象指针而非基类 CRefObject 对象指针。从而实现引用计数管理 CBaseSocket 对象的生存期。
- 为阻止程序员误将该类型的对象分配在栈(Stack)上,最好将其拷贝构造函数和拷贝复制操作符申明成 protected 后 private(TeamTalk并没有如此操作)。
- 此外,在多线程环境下,还需要考虑引用计数的线程安全性。
- 最后,引用计数机制要求程序员小心跟踪每个持有对象指针的变量,在它们走出作用域时,手工调用上述release_reference(),不能有任何一次遗漏,否则将引起内存泄漏 Memory Leak 。因此,如果逻辑太过复杂则最好使用智能指针进行管理(参考muduo)。
2、设计CEventDispatch为单例类
之所以设计CEventDispatch为单例类,是由于设计了每个服务器只有一个事件循环(多线程而单Reactor)。此外设计成单例类的好处是在某个位置需要使用CEventDispatch时只需调用CEventDispatch->Instance()即可获得单例指针。
3、使用标识和状态使代码逻辑清晰
如OnRead()、OnWrite()、OnClose()中的回调函数
m_callback(m_callback_data, NETLIB_MSG_READ, (net_handle_t)m_socket, NULL);
m_callback(m_callback_data, NETLIB_MSG_WRITE, (net_handle_t)m_socket, NULL);
m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
使用标识 NETLIB_MSG_READ 、NETLIB_MSG_WRITE 和 NETLIB_MSG_CLOSE。在回调 m_callback 中使用 switich case 对特定标识执行特定逻辑。然后该基类指针通过多态调用继承类的虚函数。
void imconn_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{
NOTUSED_ARG(handle);
NOTUSED_ARG(pParam);
if (!callback_data)
return;
ConnMap_t* conn_map = (ConnMap_t*)callback_data;
CImConn* pConn = FindImConn(conn_map, handle);
if (!pConn)
return;
//log("msg=%d, handle=%d ", msg, handle);
switch (msg)
{
case NETLIB_MSG_CONFIRM:
pConn->OnConfirm();
break;
case NETLIB_MSG_READ:
pConn->OnRead();
break;
case NETLIB_MSG_WRITE:
pConn->OnWrite();
break;
case NETLIB_MSG_CLOSE:
pConn->OnClose();
break;
default:
log("!!!imconn_callback error msg: %d ", msg);
break;
}
pConn->ReleaseRef();
}
此外,对侦听socket和普通socket使用不同状态: SOCKET_STATE_LISTENING 和 SOCKET_STATE_CONNECTED 进行区分,使得当socket可读时,通过CBaseSocket的状态能够分别执行各自的逻辑函数。至此,对于侦听socket,如果socket可读,则接收新连接,并置换其默认OnRead的回调函数为imconn_callback;而对于新socket,如果socket可读,则会调用imconn_callback。
void CBaseSocket::OnRead()
{
if (m_state == SOCKET_STATE_LISTENING)
{
_AcceptNewSocket();
}
else
{
u_long avail = 0;
if ( (ioctlsocket(m_socket, FIONREAD, &avail) == SOCKET_ERROR) || (avail == 0) )
{
m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
}
else
{
m_callback(m_callback_data, NETLIB_MSG_READ, (net_handle_t)m_socket, NULL);
}
}
}
4、学习 OnRead() 和 OnWrite() 函数的处理方法
(1)OnRead()
void CBaseSocket::OnRead()
{
if (m_state == SOCKET_STATE_LISTENING)
{
_AcceptNewSocket();
}
else
{
u_long avail = 0;
if ( (ioctlsocket(m_socket, FIONREAD, &avail) == SOCKET_ERROR) || (avail == 0) )
{
m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
}
else
{
m_callback(m_callback_data, NETLIB_MSG_READ, (net_handle_t)m_socket, NULL);
}
}
}
首先是通过不同状态 SOCKET_STATE_LISTENING 和 SOCKET_STATE_CONNECTED 区分(服务端)侦听socket(listen(2))和普通socket,执行相应的可读事件,即 if 分支和 else 分支。
然后针对普通socket,调用系统函数:
ioctlsocket(sock, FIONREAD, &packet_length)
获得普通socket可读的数据字节数。如果出错或者字节数为0,则以消息NETLIB_MSG_CLOSE调用回调函数imconn_callback;反之,以消息NETLIB_MSG_READ调用回调函数imconn_callback进行正常的读处理。
(2)OnWrite()
void CBaseSocket::OnWrite()
{
#if ((defined _WIN32) || (defined __APPLE__))
CEventDispatch::Instance()->RemoveEvent(m_socket, SOCKET_WRITE);
#endif
if (m_state == SOCKET_STATE_CONNECTING)
{
int error = 0;
socklen_t len = sizeof(error);
#ifdef _WIN32
getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len);
#else
getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (void*)&error, &len);
#endif
if (error) {
m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
} else {
m_state = SOCKET_STATE_CONNECTED;
m_callback(m_callback_data, NETLIB_MSG_CONFIRM, (net_handle_t)m_socket, NULL);
}
}
else
{
m_callback(m_callback_data, NETLIB_MSG_WRITE, (net_handle_t)m_socket, NULL);
}
}
首先是通过不同状态 SOCKET_STATE_CONNECTING 和 SOCKET_STATE_CONNECTED 区分(客户端)连接socket(connect(2))和普通socket,执行相应的可写事件,即 if 分支和 else 分支。
由于连接socket的状态是 SOCKET_STATE_CONNECTING,会走第一个if分支。调用getsocketopt检查是否出错:
getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len);
若出错则以参数 NETLIB_MSG_CLOSE 调用回调 imconn_callback;若不出错,以参数 NETLIB_MSG_CONFIRM 调用之前设置的回调函数 imconn_callback。
(3)总结
由此可见,TeamTalk的网络库框架将服务端的侦听socket和普通socket的可读事件(侦听socket即服务端调用listen(2)进行监听的socket,当有连接到达时该侦听socket可读)汇总到了OnRead()函数;而客户端的连接socket以及普通socket的可写事件(连接socket即客户端调用connect(2)进行连接的socket,当连接成功以后该连接socket立马会变的可写(参考UNP第6章:socket可读可写就绪条件))汇总到了OnWrite()函数,通过socket的wrapper对象CBaseSocket的不同状态(SOCKET_STATE_LISTENING 、SOCKET_STATE_CONNECTED 和SOCKET_STATE_CONNECTING)执行不同的可读可写逻辑,然后通过不同的消息信息(NETLIB_MSG_READ、NETLIB_MSG_WRITE、 NETLIB_MSG_CONFIRM 和 NETLIB_MSG_CLOSE )执行相应的回调imconn_callback。
进一步的底层实现参考对socket进行wrap的类CBaseSocket的实现:文件BaseSocket.cpp