天天看点

TeamTalk源码分析(1)

一、TeamTalk服务器端以下部署程序:

db_proxy_server、file_server、http_msg_server、login_server、msfs、msg_server、push_server、router_server

TeamTalk源码分析(1)

各个服务程序的作用描述如下:

  • LoginServer (C++): 负载均衡服务器,分配一个负载小的MsgServer给客户端使用
  • MsgServer (C++): 消息服务器,提供客户端大部分信令处理功能,包括私人聊天、群组聊天等
  • RouteServer (C++): 路由服务器,为登录在不同MsgServer的用户提供消息转发功能
  • FileServer (C++): 文件服务器,提供客户端之间得文件传输服务,支持在线以及离线文件传输
  • MsfsServer (C++): 图片存储服务器,提供头像,图片传输中的图片存储服务
  • DBProxy (C++): 数据库代理服务器,提供mysql以及redis的访问服务,屏蔽其他服务器与mysql与redis的直接交互
  • HttpMsgServer(C++) :对外接口服务器,提供对外接口功能。(目前只是框架)
  • PushServer(C++): 消息推送服务器,提供IOS系统消息推送。(IOS消息推送必须走apns)

注意:上图中并没有push_server和http_push_server。如果你不调试ios版本的客户端,可以暂且不启动push_server,另外http_push_server也可以暂不启动。

启动顺序:

一般来说,前端的服务会依赖后端的服务,所以一般先启动后端服务,再启动前端服务。建议按以下顺序启动服务:

1、启动db_proxy。

2、启动route_server,file_server,msfs

3、启动login_server

4、启动msg_server

各个服务的端口号

服务 端口
login_server 8080/8008
msg_server 8000
db_proxy_server 10600
route_server 8200
http_msg_server 8400
file_server 8600/8601

服务网络通信框架介绍:

上面介绍的每一个服务都使用了相同的网络通信框架,该通信框架可以单独拿出来做为一个通用的网络通信框架。该网络框架是在一个循环里面不断地检测IO事件,然后对检测到的事件进行处理。流程如下:

1. 使用IO复用技术(linux和windows平台用select、mac平台用kevent)分离网络IO。

2. 对分离出来的网络IO进行操作,分为socket句柄可读、可写和出错三种情况。

当然再加上定时器事件,即检测一个定时器事件列表,如果有定时器到期,则执行该定时器事件。

整个框架的伪码大致如下(即主线程不断事件循环(Reactor模式)):

while (running)
{
    //处理定时器事件
    _CheckTimer();  //1. 遍历定时器队列,检测是否有定时器事件到期,有则执行定时器的回调函数

    //处理其他事件
    _CheckLoop();  //2. 遍历其他任务队列,检测是否有其他任务需要执行,有,执行之

    //IO multiplexing
    int n = select(socket集合, ...);  //3. 检测socket集合,分离可读、可写和异常事件

    //事件处理
    if (某些socket可读)
    {
        pSocket->OnRead();  //4. 处理socket可读事件
    }

    if (某些socket可写)
    {
        pSocket->OnWrite();  //5. 处理socket可写事件
    }

    if (某些socket出错)
    {
        pSocket->OnClose();  //6. 处理socket异常事件
    }
}
           

二、具体实现

(1)处理定时器事件的代码如下:

void CEventDispatch::_CheckTimer()
{
	uint64_t curr_tick = get_tick_count();
	list<TimerItem*>::iterator it;

	for (it = m_timer_list.begin(); it != m_timer_list.end(); )
	{
		TimerItem* pItem = *it;
		it++;    //iterator maybe deleted in the callback, so we should increment it before callback
		if (curr_tick >= pItem->next_tick)
		{
			pItem->next_tick += pItem->interval;
			pItem->callback(pItem->user_data, NETLIB_MSG_TIMER, 0, NULL);
		}
	}
}
           

即遍历一个定时器列表,将定时器对象与当前时间(curr_tick)做比较,如果当前时间已经大于或等于定时器设置的时间,则表明定时器时间已经到了,执行定时器对象对应的回调函数。

(2)事件处理

再来看看OnRead、OnWrite和OnClose这三个函数。在TeamTalk源码中每一个socket连接被封装成一个CBaseSocket对象,该对象是一个使用引用计数的类的子类,通过这种方法来实现生存期自动管理。

void CBaseSocket::OnRead()
{
	if (m_state == SOCKET_STATE_LISTENING)
	{
		_AcceptNewSocket();
	}
	else
	{
		u_long avail = 0;
		if ( (ioctlsocket(m_socket, FIONREAD, &avail) == SOCKET_ERROR) || (avail == 0) )
		{
			m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
		}
		else
		{
			m_callback(m_callback_data, NETLIB_MSG_READ, (net_handle_t)m_socket, NULL);
		}
	}
}
           

OnRead()方法根据状态标识m_state确定一个socket是侦听的socket还是普通与客户端连接的socket,如果是侦听sokcet则接收客户端的连接;如果是与客户端连接的socket,则先检测socket上有多少字节可读,如果没有字节可读或者检测字节数时出错,则关闭socket,反之调用设置的回调函数。

void CBaseSocket::OnWrite()
{
#if ((defined _WIN32) || (defined __APPLE__))
	CEventDispatch::Instance()->RemoveEvent(m_socket, SOCKET_WRITE);
#endif

	if (m_state == SOCKET_STATE_CONNECTING)
	{
		int error = 0;
		socklen_t len = sizeof(error);
#ifdef _WIN32

		getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len);
#else
		getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (void*)&error, &len);
#endif
		if (error) {
			m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
		} else {
			m_state = SOCKET_STATE_CONNECTED;
			m_callback(m_callback_data, NETLIB_MSG_CONFIRM, (net_handle_t)m_socket, NULL);
		}
	}
	else
	{
		m_callback(m_callback_data, NETLIB_MSG_WRITE, (net_handle_t)m_socket, NULL);
	}
}
           

OnWrite()函数则根据m_state标识检测socket是否是尝试连接的socket(connect函数中的socket),用于判断socket是否已经连接成功,反之则是与客户端保持连接的socket,调用预先设置的回调函数。

依托的底层实现:

1、每个服务程序都使用一个stl hash_map来管理所有的socket,键是socket句柄,值是CBaseSocket对象指针:

typedef hash_map<net_handle_t, CBaseSocket*> SocketMap;
SocketMap	g_socket_map;
           

每一个socket连接被封装成一个CBaseSocket对象,该对象是一个使用引用计数的类(CRefObject)的子类,通过这种方法来实现生存期自动管理。

//a wrap for non-block socket class
class CBaseSocket : public CRefObject
           

所以在删除或者新增socket时(实际上目的是在事件驱动器删除或者新增socket事件),实际上就是从这个hash_map中删除或者向这个hash_map中增加对象。多线程操作,需要一个锁来进行保护:

void CEventDispatch::AddEvent(SOCKET fd, uint8_t socket_event)
{
	CAutoLock func_lock(&m_lock);

	if ((socket_event & SOCKET_READ) != 0)
	{
		FD_SET(fd, &m_read_set);
	}
		
	if ((socket_event & SOCKET_WRITE) != 0)
	{
		FD_SET(fd, &m_write_set);
	}

	if ((socket_event & SOCKET_EXCEP) != 0)
	{
		FD_SET(fd, &m_excep_set);
	}
}
           

代码CAutoLock func_lock(&m_lock);即保护该hash_map的锁对象。

2、而管理以上功能的是一个单例类CEventDispatch,所以不难猜出CEventDispatch提供的接口:

class CEventDispatch
{
public:
	virtual ~CEventDispatch();

	void AddEvent(SOCKET fd, uint8_t socket_event);
	void RemoveEvent(SOCKET fd, uint8_t socket_event);

	void AddTimer(callback_t callback, void* user_data, uint64_t interval);
	void RemoveTimer(callback_t callback, void* user_data);
    
    void AddLoop(callback_t callback, void* user_data);

	void StartDispatch(uint32_t wait_timeout = 100);
    void StopDispatch();
    
    bool isRunning() {return running;}

	static CEventDispatch* Instance();
protected:
	CEventDispatch();

private:
	void _CheckTimer();
    void _CheckLoop();

	typedef struct {
		callback_t	callback;
		void*		user_data;
		uint64_t	interval;
		uint64_t	next_tick;
	} TimerItem;

private:
#ifdef _WIN32
	fd_set	m_read_set;
	fd_set	m_write_set;
	fd_set	m_excep_set;
#elif __APPLE__
	int 	m_kqfd;
#else
	int		m_epfd;
#endif
	CLock			m_lock;
	list<TimerItem*>	m_timer_list;
	list<TimerItem*>	m_loop_list;

	static CEventDispatch* m_pEventDispatch;
    
    bool running;
};
           

其中StartDispatch()和StopDispatcher()分别用于启动和停止整个循环流程。一般在程序初始化的时候StartDispatch(),在程序退出时StopDispatcher()。

之所以设计CEventDispatch为单例类,是由于每个服务器只有一个事件循环(多线程而单Reactor)。此外设计成单例类的好处是在某个位置需要使用CEventDispatch时只需调用CEventDispatch->Instance()即可获得单例指针。

三、知识点分析

1、引用计数与对象生存期管理。(测试)使用引用计数管理对象生命期

(1)在TeamTalk源码中每一个socket连接被封装成一个CBaseSocket对象,该对象是一个使用引用计数的基类的子类,通过这种方法来(手工)实现生存期自动管理。(理论上,引用计数机制要求程序员小心跟踪每个持有对象指针的变量,在它们走出作用域时,手工调用上述release_reference(),不能有任何一次遗漏,否则将引起内存泄漏Memory Leak。另外,为阻止程序员误将该类型的对象分配在栈(Stack)上,最好将其构造函数申明成protected。此外,在多线程环境下,还需要考虑引用计数的线程安全性。)

(2)引用计数如何管理对象生存期

C++中基类的析构函数为什么要用virtual虚析构函数:

C++中基类采用virtual虚析构函数是为了防止内存泄漏。具体地说,如果派生类中申请了内存空间,并在其析构函数中对这些内存空间进行释放。假设基类中采用的是非虚析构函数,当删除基类指针指向的派生类对象时就不会触发动态绑定,因而只会调用基类的析构函数,而不会调用派生类的析构函数。那么在这种情况下,派生类中申请的空间就得不到释放从而产生内存泄漏。所以,为了防止这种情况的发生,C++中基类的析构函数应采用virtual虚析构函数。

(3)TeamTalk中的实现

int CBaseSocket::Close()
{
	CEventDispatch::Instance()->RemoveEvent(m_socket, SOCKET_ALL);
	RemoveBaseSocket(this);
	closesocket(m_socket);
	ReleaseRef();

	return 0;
}
           
CBaseSocket* FindBaseSocket(net_handle_t fd)
{
	CBaseSocket* pSocket = NULL;
	SocketMap::iterator iter = g_socket_map.find(fd);
	if (iter != g_socket_map.end())
	{
		pSocket = iter->second;
		pSocket->AddRef();
	}

	return pSocket;
}
           

其中的 ReleaseRef(); 和  AddRef(); 为基类对引用计数的操作:

void CRefObject::AddRef()
{
	if (m_lock)
	{
		m_lock->lock();
		m_refCount++;
		m_lock->unlock();
	}
	else
	{
		m_refCount++;
	}
}

void CRefObject::ReleaseRef()
{
	if (m_lock)
	{
		m_lock->lock();
		m_refCount--;
		if (m_refCount == 0)
		{
			delete this;
			return;
		}
		m_lock->unlock();
	}
	else
	{
		m_refCount--;
		if (m_refCount == 0)
			delete this;
	}
}
           

TeamTalk对 CBaseSocket 对象的生存期管理实际上是对全局管理变量hash_map<net_handle_t, CBaseSocket*>  g_socket_map;的增加和删去操作的管理。当 new CBaseSocket 放入 g_socket_map 时,则基类引用计数加一;当 从 g_socket_map 删去 CBaseSocket 时,则基类引用计数减一。当减到0时基类函数中调用 delete this。

  • 当计数变量值为0时 delete this; ,此处的 this 是(多态)运行时绑定 CBaseSocket 对象指针而非基类 CRefObject 对象指针。从而实现引用计数管理 CBaseSocket 对象的生存期。
  • 为阻止程序员误将该类型的对象分配在栈(Stack)上,最好将其拷贝构造函数和拷贝复制操作符申明成 protected 后 private(TeamTalk并没有如此操作)。
  • 此外,在多线程环境下,还需要考虑引用计数的线程安全性。
  • 最后,引用计数机制要求程序员小心跟踪每个持有对象指针的变量,在它们走出作用域时,手工调用上述release_reference(),不能有任何一次遗漏,否则将引起内存泄漏 Memory Leak 。因此,如果逻辑太过复杂则最好使用智能指针进行管理(参考muduo)。

2、设计CEventDispatch为单例类

之所以设计CEventDispatch为单例类,是由于设计了每个服务器只有一个事件循环(多线程而单Reactor)。此外设计成单例类的好处是在某个位置需要使用CEventDispatch时只需调用CEventDispatch->Instance()即可获得单例指针。

3、使用标识和状态使代码逻辑清晰

如OnRead()、OnWrite()、OnClose()中的回调函数

m_callback(m_callback_data, NETLIB_MSG_READ, (net_handle_t)m_socket, NULL);
m_callback(m_callback_data, NETLIB_MSG_WRITE, (net_handle_t)m_socket, NULL);
m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
           

使用标识 NETLIB_MSG_READ 、NETLIB_MSG_WRITE 和  NETLIB_MSG_CLOSE。在回调 m_callback 中使用 switich case 对特定标识执行特定逻辑。然后该基类指针通过多态调用继承类的虚函数。

void imconn_callback(void* callback_data, uint8_t msg, uint32_t handle, void* pParam)
{
	NOTUSED_ARG(handle);
	NOTUSED_ARG(pParam);
 
	if (!callback_data)
		return;
 
	ConnMap_t* conn_map = (ConnMap_t*)callback_data;
	CImConn* pConn = FindImConn(conn_map, handle);
	if (!pConn)
		return;
 
	//log("msg=%d, handle=%d ", msg, handle);
 
	switch (msg)
	{
	case NETLIB_MSG_CONFIRM:
		pConn->OnConfirm();
		break;
	case NETLIB_MSG_READ:
		pConn->OnRead();
		break;
	case NETLIB_MSG_WRITE:
		pConn->OnWrite();
		break;
	case NETLIB_MSG_CLOSE:
		pConn->OnClose();
		break;
	default:
		log("!!!imconn_callback error msg: %d ", msg);
		break;
	}
 
	pConn->ReleaseRef();
}
           

此外,对侦听socket和普通socket使用不同状态: SOCKET_STATE_LISTENING 和 SOCKET_STATE_CONNECTED 进行区分,使得当socket可读时,通过CBaseSocket的状态能够分别执行各自的逻辑函数。至此,对于侦听socket,如果socket可读,则接收新连接,并置换其默认OnRead的回调函数为imconn_callback;而对于新socket,如果socket可读,则会调用imconn_callback。

void CBaseSocket::OnRead()
{
	if (m_state == SOCKET_STATE_LISTENING)
	{
		_AcceptNewSocket();
	}
	else
	{
		u_long avail = 0;
		if ( (ioctlsocket(m_socket, FIONREAD, &avail) == SOCKET_ERROR) || (avail == 0) )
		{
			m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
		}
		else
		{
			m_callback(m_callback_data, NETLIB_MSG_READ, (net_handle_t)m_socket, NULL);
		}
	}
}
           

4、学习 OnRead() 和 OnWrite() 函数的处理方法

(1)OnRead()

void CBaseSocket::OnRead()
{
	if (m_state == SOCKET_STATE_LISTENING)
	{
		_AcceptNewSocket();
	}
	else
	{
		u_long avail = 0;
		if ( (ioctlsocket(m_socket, FIONREAD, &avail) == SOCKET_ERROR) || (avail == 0) )
		{
			m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
		}
		else
		{
			m_callback(m_callback_data, NETLIB_MSG_READ, (net_handle_t)m_socket, NULL);
		}
	}
}
           

首先是通过不同状态 SOCKET_STATE_LISTENING 和 SOCKET_STATE_CONNECTED 区分(服务端)侦听socket(listen(2))和普通socket,执行相应的可读事件,即 if 分支和 else 分支。

然后针对普通socket,调用系统函数:

ioctlsocket(sock, FIONREAD, &packet_length)
           

获得普通socket可读的数据字节数。如果出错或者字节数为0,则以消息NETLIB_MSG_CLOSE调用回调函数imconn_callback;反之,以消息NETLIB_MSG_READ调用回调函数imconn_callback进行正常的读处理。

(2)OnWrite()

void CBaseSocket::OnWrite()
{
#if ((defined _WIN32) || (defined __APPLE__))
	CEventDispatch::Instance()->RemoveEvent(m_socket, SOCKET_WRITE);
#endif

	if (m_state == SOCKET_STATE_CONNECTING)
	{
		int error = 0;
		socklen_t len = sizeof(error);
#ifdef _WIN32

		getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len);
#else
		getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (void*)&error, &len);
#endif
		if (error) {
			m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
		} else {
			m_state = SOCKET_STATE_CONNECTED;
			m_callback(m_callback_data, NETLIB_MSG_CONFIRM, (net_handle_t)m_socket, NULL);
		}
	}
	else
	{
		m_callback(m_callback_data, NETLIB_MSG_WRITE, (net_handle_t)m_socket, NULL);
	}
}
           

首先是通过不同状态 SOCKET_STATE_CONNECTING 和 SOCKET_STATE_CONNECTED 区分(客户端)连接socket(connect(2))和普通socket,执行相应的可写事件,即 if 分支和 else 分支。

由于连接socket的状态是 SOCKET_STATE_CONNECTING,会走第一个if分支。调用getsocketopt检查是否出错:

getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len);
           

若出错则以参数 NETLIB_MSG_CLOSE 调用回调 imconn_callback;若不出错,以参数 NETLIB_MSG_CONFIRM 调用之前设置的回调函数 imconn_callback。

(3)总结

由此可见,TeamTalk的网络库框架将服务端的侦听socket和普通socket的可读事件(侦听socket即服务端调用listen(2)进行监听的socket,当有连接到达时该侦听socket可读)汇总到了OnRead()函数;而客户端的连接socket以及普通socket的可写事件(连接socket即客户端调用connect(2)进行连接的socket,当连接成功以后该连接socket立马会变的可写(参考UNP第6章:socket可读可写就绪条件))汇总到了OnWrite()函数,通过socket的wrapper对象CBaseSocket的不同状态(SOCKET_STATE_LISTENING 、SOCKET_STATE_CONNECTED 和SOCKET_STATE_CONNECTING)执行不同的可读可写逻辑,然后通过不同的消息信息(NETLIB_MSG_READ、NETLIB_MSG_WRITE、 NETLIB_MSG_CONFIRM 和 NETLIB_MSG_CLOSE )执行相应的回调imconn_callback。

进一步的底层实现参考对socket进行wrap的类CBaseSocket的实现:文件BaseSocket.cpp

继续阅读