由于蘑菇街的TeamServer包含了login_server ,msg_server等几个不同的服务端,本文会逐步进行分析,并持续更新。。。。。
首先分析为什么需要应用层的心跳机制
对应IM使用TCP协议还是UDP协议还是个有争议的话题,仁者见仁智者见智,不过个人觉得这得看实际应用场景,根据应用场景的不同用不同的协议。在TCP协议实现的IM中,需要考虑一个很重要的问题就是心跳保活,那么什么是心跳保活呢?
简单的理解就是在使用TCP协议的IM中,TCP连接的某一端定时向对端发送自定义消息,以确定双方是否存活,类似于心跳因此叫做心跳指令或者心跳保活。
读者可能有一个疑问:TCP协议中不是有一个字段用于KeepAlive吗?为什么还需要一个应用层的心跳机制?难道不能利用协议层的KeepAlive机制吗?
要回答这些问题首先要回答为什么IM中保持有效长连接的必要性。
对于客户端而言,保持长连接的长连接的有效性可以使得每次请求都只是简单的数据发送和接受,而不必要每次重新建立一个连接,重新解析DNS,省去连接建立的时间,加快了于服务器之间的通信速度,有利于接收服务器的实时消息,前提都是连接必须可用。
对于服务器而言,保持连接的有效性可以让服务器降低负载,及时清除无效的连接。
那么为什么TCP协议的KeepAlive无法实现确保连接的有效性呢?TCP的KeepAlive功能是定时间隔发送一个KeepAlive探针来确认连接的有效性,一般为7200s,失败后重试10次,每次间隔75s,由于移动网络的特殊性,这个时间是不能满足IM通信的要求的。那么如果修改KeepAlive时间后呢?答案仍然是否定的。KeepAlive仅用于检测连接的死活,而应用层心跳机制还有一个目的是检测简练的存活状态。考虑一个场景:当某台服务器处理负载过高,CPU利用率100%,无法处理任何业务请求,在这种情况下,TCP探针仍然确认对方可用,而实际上,服务器已经不能处理任何请求了,这种情况下应该断开当前连接,重新建立连接,而不是认为服务器仍然可用。因此,从这个角度出发,应用层心跳机制也是必然需要的。
蘑菇街TeamTalk中Login_Server的心跳保活使用方法。
蘑菇街的心跳保活的方法采取了一种简单的方法--定时心跳。由login_Server每隔5秒向消息服务器发送心跳信息,如果30秒内没有收到消息服务器的相应,则关闭当前连接;每隔一分钟向移动端或者PC端发送定时心跳,一分钟没收到对端相应,则关闭当前连接。
具体实现为:
Login_Server的main函数中,对定时器初始化。
[cpp] view plain copy print ?
- init_login_conn();
- init_http_conn();
init_login_conn();
init_http_conn();
两个函数的实现为:
[cpp] view plain copy print ?
- void init_login_conn()
- {
- netlib_register_timer(login_conn_timer_callback, NULL, 1000);
- }
void init_login_conn()
{
netlib_register_timer(login_conn_timer_callback, NULL, 1000);
}
[cpp] view plain copy print ?
- void init_http_conn()
- {
- netlib_register_timer(http_conn_timer_callback, NULL, 1000);
- }
void init_http_conn()
{
netlib_register_timer(http_conn_timer_callback, NULL, 1000);
}
在这两个函数中向时间分发器CEventDispatch分别注册了定时器回调函数和定时器间隔,间隔为1s。
注册方式为:
[cpp] view plain copy print ?
- int netlib_register_timer(callback_t callback, void* user_data, uint64_t interval)
- {
- CEventDispatch::Instance()->AddTimer(callback, user_data, interval);
- return 0;
- }
int netlib_register_timer(callback_t callback, void* user_data, uint64_t interval)
{
CEventDispatch::Instance()->AddTimer(callback, user_data, interval);
return 0;
}
[cpp] view plain copy print ?
- void CEventDispatch::AddTimer(callback_t callback, void* user_data, uint64_t interval)
- {
- list<TimerItem*>::iterator it;
- for (it = m_timer_list.begin(); it != m_timer_list.end(); it++)
- {
- TimerItem* pItem = *it;
- if (pItem->callback == callback && pItem->user_data == user_data)
- {
- pItem->interval = interval;
- pItem->next_tick = get_tick_count() + interval;
- return;
- }
- }
- TimerItem* pItem = new TimerItem;
- pItem->callback = callback;
- pItem->user_data = user_data;
- pItem->interval = interval;
- pItem->next_tick = get_tick_count() + interval;
- m_timer_list.push_back(pItem);
- }
void CEventDispatch::AddTimer(callback_t callback, void* user_data, uint64_t interval)
{
list<TimerItem*>::iterator it;
for (it = m_timer_list.begin(); it != m_timer_list.end(); it++)
{
TimerItem* pItem = *it;
if (pItem->callback == callback && pItem->user_data == user_data)
{
pItem->interval = interval;
pItem->next_tick = get_tick_count() + interval;
return;
}
}
TimerItem* pItem = new TimerItem;
pItem->callback = callback;
pItem->user_data = user_data;
pItem->interval = interval;
pItem->next_tick = get_tick_count() + interval;
m_timer_list.push_back(pItem);
}
时间分发起CEventDispatch中保存了一个TimerItem的List,注册的定时器都添加在该List中,添加的时候会先检查对应的定时器是否已经存在,存在的话更新定时时间和间隔,不存在的话新建对象并加入到List中,至此定时器已经注册完成。
在时间分发器的循环中,CEventDispatch::StartDispatch的事件循环中,会每次调用分发器的_CheckTimer函数,在该函数中检查定时器时间是否已经到了。
[cpp] view plain copy print ?
- void CEventDispatch::_CheckTimer()
- {
- uint64_t curr_tick = get_tick_count();
- list<TimerItem*>::iterator it;
- for (it = m_timer_list.begin(); it != m_timer_list.end(); )
- {
- TimerItem* pItem = *it;
- it++; // iterator maybe deleted in the callback, so we should increment it before callback
- if (curr_tick >= pItem->next_tick)
- {
- pItem->next_tick += pItem->interval;
- pItem->callback(pItem->user_data, NETLIB_MSG_TIMER, 0, NULL);
- }
- }
- }
void CEventDispatch::_CheckTimer()
{
uint64_t curr_tick = get_tick_count();
list<TimerItem*>::iterator it;
for (it = m_timer_list.begin(); it != m_timer_list.end(); )
{
TimerItem* pItem = *it;
it++; // iterator maybe deleted in the callback, so we should increment it before callback
if (curr_tick >= pItem->next_tick)
{
pItem->next_tick += pItem->interval;
pItem->callback(pItem->user_data, NETLIB_MSG_TIMER, 0, NULL);
}
}
}
该函数中会遍历定时器List,比较当前时间和定时器的next_tick,如果定时器已到,更新下一个TICK时间,并调用之前注册回调函数。对于客户端和msg_Server,回调函数分别为login_conn_timer_callback ;http_conn_timer_callback(Login_Server只有客户端和消息服务器MSG_Server会向其发起连接)。
[cpp] view plain copy print ?
- void http_conn_timer_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
- {
- CHttpConn* pConn = NULL;
- HttpConnMap_t::iterator it, it_old;
- uint64_t cur_time = get_tick_count();
- for (it = g_http_conn_map.begin(); it != g_http_conn_map.end(); ) {
- it_old = it;
- it++;
- pConn = it_old->second;
- pConn->OnTimer(cur_time);
- }
- }
void http_conn_timer_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{
CHttpConn* pConn = NULL;
HttpConnMap_t::iterator it, it_old;
uint64_t cur_time = get_tick_count();
for (it = g_http_conn_map.begin(); it != g_http_conn_map.end(); ) {
it_old = it;
it++;
pConn = it_old->second;
pConn->OnTimer(cur_time);
}
}
[cpp] view plain copy print ?
- void login_conn_timer_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
- {
- uint64_t cur_time = get_tick_count();
- for (ConnMap_t::iterator it = g_client_conn_map.begin(); it != g_client_conn_map.end(); ) {
- ConnMap_t::iterator it_old = it;
- it++;
- CLoginConn* pConn = (CLoginConn*)it_old->second;
- pConn->OnTimer(cur_time);
- }
- for (ConnMap_t::iterator it = g_msg_serv_conn_map.begin(); it != g_msg_serv_conn_map.end(); ) {
- ConnMap_t::iterator it_old = it;
- it++;
- CLoginConn* pConn = (CLoginConn*)it_old->second;
- pConn->OnTimer(cur_time);
- }
- }
void login_conn_timer_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{
uint64_t cur_time = get_tick_count();
for (ConnMap_t::iterator it = g_client_conn_map.begin(); it != g_client_conn_map.end(); ) {
ConnMap_t::iterator it_old = it;
it++;
CLoginConn* pConn = (CLoginConn*)it_old->second;
pConn->OnTimer(cur_time);
}
for (ConnMap_t::iterator it = g_msg_serv_conn_map.begin(); it != g_msg_serv_conn_map.end(); ) {
ConnMap_t::iterator it_old = it;
it++;
CLoginConn* pConn = (CLoginConn*)it_old->second;
pConn->OnTimer(cur_time);
}
}
在定时器的回调函数中,分别遍历当前所有连接,调用对应连接的OnTimer函数。说明g_client_map已经不用了。
分别分析客户端http连接的OnTimer 和msg_server连接的OnTimer
客户端Http的Ontimer
[cpp] view plain copy print ?
- void CHttpConn::OnTimer(uint64_t curr_tick)
- {
- if (curr_tick > m_last_recv_tick + HTTP_CONN_TIMEOUT) {
- log("HttpConn timeout, handle=%d ", m_conn_handle);
- Close();
- }
- }
void CHttpConn::OnTimer(uint64_t curr_tick)
{
if (curr_tick > m_last_recv_tick + HTTP_CONN_TIMEOUT) {
log("HttpConn timeout, handle=%d ", m_conn_handle);
Close();
}
}
客户端http连接的定时器的处理策略很简单,http连接对象HttpConn保存了一个上一次收到的数据的时间m_last_recv_tick,将当前时间与上次收到数据的时间和HTTP超时时间进行比较,如果超时未收到数据,直接关闭该连接,HTTP的超时时间是1分钟。
Msg_Server的OnTimer
[cpp] view plain copy print ?
- void CLoginConn::OnTimer(uint64_t curr_tick)
- {
- if (m_conn_type == LOGIN_CONN_TYPE_CLIENT) {
- if (curr_tick > m_last_recv_tick + CLIENT_TIMEOUT) {
- Close();
- }
- } else {
- if (curr_tick > m_last_send_tick + SERVER_HEARTBEAT_INTERVAL) {
- IM::Other::IMHeartBeat msg;
- CImPdu pdu;
- pdu.SetPBMsg(&msg);
- pdu.SetServiceId(SID_OTHER);
- pdu.SetCommandId(CID_OTHER_HEARTBEAT);
- SendPdu(&pdu);
- }
- if (curr_tick > m_last_recv_tick + SERVER_TIMEOUT) {
- log("connection to MsgServer timeout ");
- Close();
- }
- }
- }
void CLoginConn::OnTimer(uint64_t curr_tick)
{
if (m_conn_type == LOGIN_CONN_TYPE_CLIENT) {
if (curr_tick > m_last_recv_tick + CLIENT_TIMEOUT) {
Close();
}
} else {
if (curr_tick > m_last_send_tick + SERVER_HEARTBEAT_INTERVAL) {
IM::Other::IMHeartBeat msg;
CImPdu pdu;
pdu.SetPBMsg(&msg);
pdu.SetServiceId(SID_OTHER);
pdu.SetCommandId(CID_OTHER_HEARTBEAT);
SendPdu(&pdu);
}
if (curr_tick > m_last_recv_tick + SERVER_TIMEOUT) {
log("connection to MsgServer timeout ");
Close();
}
}
}
该函数就是登陆服务器Login_Server和消息服务器Msg_Server的心跳保活机制的实现。
连接类型分为Client和Server,Client已经废弃,因此走第二个分支,首先判断当前时间与上一次发送心跳的时间+心跳超时时间,如果当前时间已经大于这个时间,说明Login_Server已经在超过超时时间未收到Msg_Server的心跳响应,重发心跳保活指令,心跳超时时间为5S,重发后判断是否超过了服务器的超时时间,服务器超时时间为30s若超过了服务器超时时间则认为当前连接已经不可用了,关闭当前连接。
从上可看出,Login_Server和Msg_Server的通信过程中用到了心跳保活机制,心跳时间为5S,持续发送心跳指令,若超过30S仍未收到Msg_Server的心跳响应,则认为当前连接已经不可用了,关闭当前连接。