在第一章讲了网络库的数据收发流程,但客户端的如何保证正确读取与发送没有详细讲,这需要依赖应用层的缓冲区来进行实现,也即CImConn中的CImConn中的m_in_buf和m_out_buf,本章主要针对缓冲区进行分析
一、缓冲区类型CSimpleBuffer
class DLL_MODIFIER CSimpleBuffer
{
public:
CSimpleBuffer();
~CSimpleBuffer();
uchar_t* GetBuffer() { return m_buffer; }
uint32_t GetAllocSize() { return m_alloc_size; }
uint32_t GetWriteOffset() { return m_write_offset; }
void IncWriteOffset(uint32_t len) { m_write_offset += len; }
void Extend(uint32_t len); //将缓冲区的大小追加len +len/4
uint32_t Write(void* buf, uint32_t len);//写入缓冲区
uint32_t Read(void* buf, uint32_t len);//从缓冲区读出数据
private:
uchar_t* m_buffer;//缓冲区地址
uint32_t m_alloc_size;//缓冲区容量大小
uint32_t m_write_offset;//缓冲区已写入字节大小
};
m_in_buf和m_out_buf的类型都是CSimpleBuffer,这个类型很简单,主要使用三个成员变量来保存缓冲区数据信息
void CSimpleBuffer::Extend(uint32_t len)
{
m_alloc_size = m_write_offset + len;
m_alloc_size += m_alloc_size >> ; // increase by 1/4 allocate size
uchar_t* new_buf = (uchar_t*)realloc(m_buffer, m_alloc_size);
if(new_buf != NULL)
{
m_buffer = new_buf;
}
}
CSimpleBuffer::Extend将缓冲区追加len+len/4大小
uint32_t CSimpleBuffer::Write(void* buf, uint32_t len)
{
if (m_write_offset + len > m_alloc_size)
{
Extend(len);//空间不够进行扩展
}
if (buf)
{
memcpy(m_buffer + m_write_offset, buf, len);
}
m_write_offset += len;
return len;
}
每次写入缓冲区数据时,都需要判断下缓冲区是否足够大小,不够则进行扩展,并将m_write_offset 重新设置为缓冲区现有数据大小
uint32_t CSimpleBuffer::Read(void* buf, uint32_t len)
{
if (len > m_write_offset)
len = m_write_offset;
if (buf)
memcpy(buf, m_buffer, len);
m_write_offset -= len; //有可能并不是一次性读完
memmove(m_buffer, m_buffer + len, m_write_offset);
return len;
}
从缓冲区中读出数据,将m_write_offset进行减去读出的数据
本来客户端缓冲区这个东西应该和服务端一起讲的,大部分处理都类似,而拿出来单独分析的目的是两边分发器采用的监听事件模式有些不一样,客户端采用LT(水平触发模式,只要socket缓冲区中有数据或可写就会一直反复进行通知),而ET(边沿触发只进行通知一次),这样就造成了客户端和服务端在应用层上所做的操作就会不一样,下面看看客户端对于数据到达,和发送数据完毕是如何操作的
二、处理数据达到
输入缓冲区,所说的输入,主要针对应用层,对于网络层来说它其实是输出缓冲区,teamtalk这样命名比较符合用户习惯
void CImConn::OnRead()
{
for (;;)//1、保证socket缓冲区数据读完
{
uint32_t free_buf_len = m_in_buf.GetAllocSize() - m_in_buf.GetWriteOffset();
if (free_buf_len < READ_BUF_SIZE)//2、保证缓冲区有足够空闲空间进行存放数据
m_in_buf.Extend(READ_BUF_SIZE);
int ret = netlib_recv(m_handle, m_in_buf.GetBuffer() + m_in_buf.GetWriteOffset(), READ_BUF_SIZE);
if (ret <= )
break;
m_in_buf.IncWriteOffset(ret);
//3、读出的数据不足最小包长继续读取数据
while (m_in_buf.GetWriteOffset() >= imcore::HEADER_LENGTH)
{
uint32_t len = m_in_buf.GetWriteOffset();
uint32_t length = CByteStream::ReadUint32(m_in_buf.GetBuffer());
if (length > len)//4、如果不足一个包的长度,继续读取数据
break;
try
{
//5、先解出包头,再解出包体进行业务逻辑处理
imcore::TTPBHeader pbHeader;
pbHeader.unSerialize((byte*)m_in_buf.GetBuffer(), imcore::HEADER_LENGTH);
LOG__(NET, _T("OnRead moduleId:0x%x,commandId:0x%x"), pbHeader.getModuleId(), pbHeader.getCommandId());
if (m_pTcpSocketCB)
m_pTcpSocketCB->onReceiveData((const char*)m_in_buf.GetBuffer(), length);
LOGBIN_F__(SOCK, "OnRead", m_in_buf.GetBuffer(), length);
}
catch (std::exception& ex)
{
assert(FALSE);
LOGA__(NET, "std::exception,info:%s", ex.what());
if (m_pTcpSocketCB)
m_pTcpSocketCB->onReceiveError();
}
catch (...)
{
assert(FALSE);
LOG__(NET, _T("unknown exception"));
if (m_pTcpSocketCB)
m_pTcpSocketCB->onReceiveError();
}
//6、将输入缓冲区已读的数据清空
m_in_buf.Read(NULL, length);
}
}
}
2.1 for (;;)
它的作用是,下面的netlib_recv一次没有将一个包的数据读完,可以进行多次读取,如果此时消息只到达一部分,这个循环会在下一次netlib_recv返回0的时候break,等待下一次数据的读取
2.2 if (free_buf_len < READ_BUF_SIZE)
如果缓冲区的数据不足128k的时候,就进行扩展,保证每次循环读取数据时,缓冲区中有足够空间
2.3 while (m_in_buf.GetWriteOffset() >= imcore::HEADER_LENGTH)
这个while在这理其实没有真的进行循环,只是为了既能进行判断,在第四处,又能break掉,实际只会执行一次
2.4 if (length > len)
这里这样做可以保证读出一个完整的包
2.5 先解出包头,再解出包体进行业务逻辑处理
这个地方就是获取到完整包之后的处理,除了消息头付含有一些包长信息,其他的一些处理都是业务范围
2.6 m_in_buf.Read(NULL, length);
在这个函数里面调用memmove,将已从输入缓冲区中读出的数据清空
2.7 避免粘包
上面几处的组合,确保一个消息包达到的多种情况都可以完整的读取出来,避免粘包,如:消息包多次到达,1可以保证数据包能多次从socket中读出,3保证这个消息包达到一定长度才继续下面的操作 4保证业务逻辑收到一个完整的包。还有其他情况如多个消息在不同时间到达或者一起到达
三、消息发送完毕
int CImConn::Send(void* data, int len)
{
if (m_busy)//1、输出缓冲区中有数据
{
m_out_buf.Write(data, len);
return len;
}
int offset =;
int remain = len;
while (remain >) {
int send_size = remain;
if (send_size > NETLIB_MAX_SOCKET_BUF_SIZE) {
send_size = NETLIB_MAX_SOCKET_BUF_SIZE;
}
int ret = netlib_send(m_handle, (char*)data + offset, send_size);
if (ret <=) {
ret =;
break;//2、数据发送完毕或者socket缓冲区已满,数据不能继续发送出去
}
offset += ret;
remain -= ret;
}
if (remain >)
{
m_out_buf.Write((char*)data + offset, remain);
m_busy = true;
LOG__(NET, _T("send busy, remain=%d"), m_out_buf.GetWriteOffset());
}
return len;
}
3.1 if (m_busy)
在1处,m_busy标识输出缓冲区中是否有数据没有发送出去,如果有,则不能将后续消息包的数据直接发送出去,而是要追加到输出缓冲区末尾,以避免有一个包的数据插在另一包的数据中间,会造成乱序
3.2 break
这个地方break有两种情况,一种是数据通过while循环全部发送出去,这时remain == 0,数据正常发送完毕(这个完毕也只是数据从应用层到socket缓冲区,数据并不是真的已经发送到网络上了,当然这个不需要我们关心,系统会自动处理),第二种是如对端接受数据过慢,socket缓冲区不能再容纳更多的数据,remain此时>0,进入下面的if分支,将剩余数据写入输出缓冲区进行存储,等待有可写事件的发生时,再将缓冲区的数据发送出去,下面看看客户端如何进行将剩余数据发送出去的
3.3 将剩余数据从缓冲区发送出去
数据发送不出去的时候看看CImConn::Send中的netlib_send做了什么事
int netlib_send(net_handle_t handle, void* buf, int len)
{
CBaseSocket* pSocket = FindBaseSocket(handle);
if (!pSocket)
{
return NETLIB_ERROR;
}
int ret = pSocket->Send(buf, len);//继续看这个地方
pSocket->ReleaseRef();
return ret;
}
int CBaseSocket::Send(void* buf, int len)
{
if (m_state != SOCKET_STATE_CONNECTED)
return NETLIB_ERROR;
int ret = send(m_socket, (char*)buf, len, );
if (ret == SOCKET_ERROR)
{
int err_code = _GetErrorCode();
if (_IsBlock(err_code))
{
#if ((defined _MSC_VER) || (defined __APPLE__))
CEventDispatch::Instance()->AddEvent(m_socket, SOCKET_WRITE);//这是window的客户端,数据发送不出去会进入这里
#endif
ret = ;
}
else
{
LOG__(NET, _T("!!!send failed, error code: %d"), err_code);
}
}
else
{
LOGBIN_F__(SOCK, "Send", buf, len);
}
return ret;
}
如上,数据发送失败,会调用CEventDispatch::Instance()->AddEvent(m_socket, SOCKET_WRITE);往分发器中为这个socket添加写事件,如果这个socket可写时会调用pSocket->OnWrite(),如下
void CBaseSocket::OnWrite()
{
#if ((defined _MSC_VER) || (defined __APPLE__))
CEventDispatch::Instance()->RemoveEvent(m_socket, SOCKET_WRITE);//将写事件去除
#endif
if (m_state == SOCKET_STATE_CONNECTING)
{
int error = ;
socklen_t len = sizeof(error);
#ifdef _MSC_VER
getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (char*)&error, &len);
#else
getsockopt(m_socket, SOL_SOCKET, SO_ERROR, (void*)&error, &len);
#endif
if (error) {
m_callback(m_callback_data, NETLIB_MSG_CLOSE, (net_handle_t)m_socket, NULL);
} else {
m_state = SOCKET_STATE_CONNECTED;
m_callback(m_callback_data, NETLIB_MSG_CONFIRM, (net_handle_t)m_socket, NULL);
}
}
else
{
m_callback(m_callback_data, NETLIB_MSG_WRITE, (net_handle_t)m_socket, NULL);
}
}
如上,该socket可写后就将socket的写事件移除,因为客户端采用LT模式,不移除会不断触发写事件,造成busy loop,下面的回调在之前第一章的网络框架分析中讲了是调用imconn_callback,根据第二个参数来调用imconn的不同反应函数,这里是调用pConn->OnWrite()
void CImConn::OnWrite()
{
if (!m_busy)
return;
while (m_out_buf.GetWriteOffset() > ) {
int send_size = m_out_buf.GetWriteOffset();
if (send_size > NETLIB_MAX_SOCKET_BUF_SIZE) {
send_size = NETLIB_MAX_SOCKET_BUF_SIZE;
}
int ret = netlib_send(m_handle, m_out_buf.GetBuffer(), send_size);
if (ret <= ) {
ret = ;
break;
}
m_out_buf.Read(NULL, ret);//发送多少数据,就将缓冲区清楚多少数据
}
if (m_out_buf.GetWriteOffset() == ) {
m_busy = false;
}
LOG__(NET, _T("onWrite, remain=%d"), m_out_buf.GetWriteOffset());
}
这段代码就比较容易了,循环发送数据,直至数据发送完毕,然后将m_busy标识为没有数据,如果发送了一部分,则在netlib_send里继续添加写事件,等待下一次可以发送的时候进行发送数据,至此,数据发送完毕这个操作就讲完了
以上是windows客户端的缓冲区的一些处理,如有不足之处请及时指正