前面讲过,客户端主要框架是两个任务队列,四个线程,分别为逻辑任务队列(处理业务逻辑任务)、http任务队列(处理http有关的业务),每个队列有一个对应的线程,http队列有一个线程池,默认线程池线程个数为1,还有一个网络IO线程负责处理IO数据的读写操作,主线程做界面消息循环,用来与用户交互,之前分析过Io线程是进行数据下面就分析下这些队列和逻辑
一、逻辑队列实现方式
客户端概述的那一章讲过,逻辑队列的启动方式,OperationManager::startup()启动逻辑队列处理线程,进行while循环处理std::list
IMCoreErrorCode OperationManager::startup()
{
m_operationThread = std::thread([&]
{
std::unique_lock <std::mutex> lck(m_cvMutex);
Operation* pOperation = nullptr;
while (m_bContinue)
{
if (!m_bContinue)
break;
if (m_vecRealtimeOperations.empty())
m_CV.wait(lck);//没有任务,线程进行条件等待
if (!m_bContinue)
break;
{
std::lock_guard<std::mutex> lock(m_mutexOperation);
if (m_vecRealtimeOperations.empty())
continue;
pOperation = m_vecRealtimeOperations.front();
m_vecRealtimeOperations.pop_front();
}
if (!m_bContinue)
break;
if (pOperation)
{
pOperation->process();//有任务,任务出队,调用任务的process函数处理
pOperation->release();//处理完即自己释放掉
}
}
});
return IMCORE_OK;
}
void Operation::process()
{
try
{
m_state = OPERATION_RUNNING;
processOpertion();这是一个纯虚函数,该类是抽象类,不同任务类继承该类,并重写该函数,处理不同的逻辑任务
}
catch (Exception& exc)
{
assert(false);
LOG__(ERR, _T("process exception,reason:%s"),exc.what());
}
catch (std::exception& exc)
{
assert(false);
LOG__(ERR, _T("process exception,reason:%s"), exc.what());
}
catch (...)
{
assert(false);
LOG__(ERR, _T("process unknown exception"));
}
m_state = OPERATION_FINISHED;
}
OperationManager作为任务管理类,拥有这个逻辑队列,并可以对逻辑任务队列进行添加删除任务,添加任务有两种方式,普通插入添加startOperation,和使用lambda表达式添加startOperationWithLambda
二、http队列实现方式
这个http队列线程启动方式在之前的概述中也没有讲到,因为http队列线程是在点击登录界面中的登录按钮的时候才会进行启动
void LoginDialog::OnClick(TNotifyUI& msg)
{
PTR_VOID(msg.pSender);
if (msg.pSender == m_pBtnLogin)
{
_DoLogin();//如果是登录按钮点击,则进行登录操作
return;
}
else if (msg.pSender == m_pBtnSysSetting)
{
module::getSysConfigModule()->showServerConfigDialog(m_PaintManager.GetPaintWindow());
return;
}
__super::OnClick(msg);
}
void LoginDialog::_DoLogin()
{
LOG__(APP,_T("User Clicked LoginBtn"));
m_ptxtTip->SetText(_T(""));
CDuiString userName = m_pedtUserName->GetText();
CDuiString password = m_pedtPassword->GetText();
if (userName.IsEmpty())
{
CString csTip = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_USERNAME_EMPTY"));
m_ptxtTip->SetText(csTip);
return;
}
if (password.IsEmpty())
{
CString csTip = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_PASSWORD_EMPTY"));
m_ptxtTip->SetText(csTip);
return;
}
module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig();
pCfg->userName = userName;
if (m_bPassChanged)
{
std::string sPass = util::cStringToString(CString(password));
char* pOutData = ;
uint32_t nOutLen = ;
int retCode = EncryptPass(sPass.c_str(), sPass.length(), &pOutData, nOutLen);
if (retCode == && nOutLen > && pOutData != )
{
pCfg->password = std::string(pOutData, nOutLen);
Free(pOutData);
}
else
{
LOG__(ERR, _T("EncryptPass Failed!"));
CString csTip = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_ENCRYPT_PASE_FAIL"));
m_ptxtTip->SetText(csTip);
return;
}
}
pCfg->isRememberPWD = m_pChkRememberPWD->GetCheck();
module::getSysConfigModule()->saveData();
CString csTxt = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_BTN_DOLOGIN"));
m_pBtnLogin->SetText(csTxt);
m_pBtnLogin->SetEnabled(false);
//上面进行用户名密码获取等一系列登录准备操作
//连接登陆服务器
DoLoginServerParam param;
//DoLoginServerHttpOperation这个继承于IHttpOperation,IHttpOperation又继承于ICallbackOpertaion,ICallbackOpertaion继承自Operation
DoLoginServerHttpOperation* pOper = new DoLoginServerHttpOperation(
BIND_CALLBACK_1(LoginDialog::OnHttpCallbackOperation), param);
module::getHttpPoolModule()->pushHttpOperation(pOper);//如果这时没有http任务队列处理线程,会进行启动对应数量的线程
}
DoLoginServerHttpOperation继承自Operation,自然知道这个任务类可以重写了processOpertion(),暂时不看这个函数做了什么,去看下http任务处理线程怎么启动的
void HttpPoolModule_Impl::pushHttpOperation(module::IHttpOperation* pOperaion, BOOL bHighPriority /*= FALSE*/)
{
if (NULL == pOperaion)
{
return;
}
CAutoLock lock(&m_mtxLock);
if (bHighPriority)
m_lstHttpOpers.push_front(pOperaion);
else
m_lstHttpOpers.push_back(pOperaion);
_launchThread();//启动线程函数
::ReleaseSemaphore(m_hSemaphore, , NULL);
return;
}
BOOL HttpPoolModule_Impl::_launchThread()
{
if ((int)m_vecHttpThread.size() >= MAX_THEAD_COUNT)//每次入队任务的时候都会进行线程大小的检查,不够就进行增加,默认为1
{
return TRUE;
}
TTHttpThread* pThread = new TTHttpThread();
PTR_FALSE(pThread);
if (!pThread->create())//这里调用TTHttpThread的process函数,具体怎么启动的调试进入代码可以看到
{
return FALSE;
}
Sleep();
m_vecHttpThread.push_back(pThread);
return TRUE;
}
UInt32 TTHttpThread::process()
{
module::IHttpOperation * pHttpOper = NULL;
HttpPoolModule_Impl *pPool = m_pInstance;
while (m_bContinue)
{
if (WAIT_OBJECT_0 != ::WaitForSingleObject(pPool->m_hSemaphore, INFINITE))
{
break;
}
if (!m_bContinue)
{
break;
}
{
CAutoLock lock(&(pPool->m_mtxLock));
if (pPool->m_lstHttpOpers.empty())
pHttpOper = NULL;
else
{
pHttpOper = pPool->m_lstHttpOpers.front();
pPool->m_lstHttpOpers.pop_front();
}
}
try
{
if (m_bContinue && pHttpOper)
{
pHttpOper->process();//进行具体任务的处理
pHttpOper->release();
}
}
catch (...)
{
LOG__(ERR, _T("TTHttpThread: Failed to execute opertaion(0x%p)"), pHttpOper);
}
}
return ;
}
TTHttpThread::process()这里进行while循环处理http队列里的http任务
pHttpOper->process();这里最后会调用该操作类的processOpertion函数,回调之前那个
DoLoginServerHttpOperation* pOper = new DoLoginServerHttpOperation(BIND_CALLBACK_1(LoginDialog::OnHttpCallbackOperation), param);构造http任务类的地方,这里http任务入队后,最后会被http线程调用他的processOpertion函数
void DoLoginServerHttpOperation::processOpertion()
{
module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig();
LOG__(APP, _T("loginAddr = %s"), pCfg->loginServIP);
std::string& loginAddr = util::cStringToString(pCfg->loginServIP);
std::string url = loginAddr;
DoLoginServerParam* pPamram = new DoLoginServerParam();
pPamram->resMsg = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_HTTP_DEFERROR"));
Http::HttpResponse response;
Http::HttpClient client;
Http::HttpRequest request("get", url);
if (!client.execute(&request, &response))
{
CString csTemp = util::stringToCString(url);
pPamram->result = DOLOGIN_FAIL;
LOG__(ERR,_T("failed %s"), csTemp);
asyncCallback(std::shared_ptr<void>(pPamram));
client.killSelf();
return;
}
std::string body = response.getBody();
client.killSelf();
//json解析
try
{
Json::Reader reader;
Json::Value root;
if (!reader.parse(body, root))
{
CString csTemp = util::stringToCString(body);
LOG__(ERR, _T("parse data failed,%s"), csTemp);
pPamram->result = DOLOGIN_FAIL;
pPamram->resMsg = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_HTTP_JSONERROR"));
goto End;
}
int nCode = root.get("code", "").asInt();
if ( == nCode)//登陆成功
{
LOG__(APP, _T("get msgSvr IP succeed!"));
pCfg->msgSevPriorIP = root.get("priorIP", "").asString();
pCfg->msgSevBackupIP = root.get("backupIP", "").asString();
std::string strPort = root.get("port", "").asString();
pCfg->msgServPort = util::stringToInt32(strPort);
pCfg->fileSysAddr = util::stringToCString(root.get("msfsPrior", "").asString());
pCfg->fileSysBackUpAddr = util::stringToCString(root.get("msfsBackup", "").asString());
pPamram->result = DOLOGIN_SUCC;
}
else
{
LOG__(ERR, _T("get msgSvr IP failed! Code = %d"),nCode);
pPamram->result = DOLOGIN_FAIL;
CString csRetMsgTemp = util::stringToCString(root.get("msg", "").asString());
if (!csRetMsgTemp.IsEmpty())
pPamram->resMsg = csRetMsgTemp;
}
}
catch (...)
{
CString csTemp = util::stringToCString(body);
LOG__(ERR,_T("parse json execption,%s"), csTemp);
pPamram->result = DOLOGIN_FAIL;
pPamram->resMsg = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_LOGIN_HTTP_JSONERROR"));
}
End:
asyncCallback(std::shared_ptr<void>(pPamram));//异步通知ui代理窗口线程调用回调函数
}
这里这个函数的的具体作用就是对登录服务器发送http请求,获取到具体的消息服务器的ip和端口,然后根据这个ip和端口进行真正的登录消息服务器进行通信,这里不过多的讨论业务方面的东西,那些东西可以自行摸索,有些业务我不是参与者我只是学习者,也没有完全掌握,这里只是作为一个代码学习,对它的框架结构的一个分析,这个笔记在看源码的时候可以进行参考,并不能直接帮你解读
DoLoginServerHttpOperation::processOpertion()中成功获取到所想要的服务器的地址后,通过asyncCallback(std::shared_ptr(pPamram));来进行一个异步通知,这个异步通知最后会在ui代理窗口的过程函数中被处理,即之前的构造函数中注册的回调函数,会在ui线程中进行调用
void asyncCallback(std::shared_ptr<void> param)
{
CallbackOperationEvent* pEvent = new CallbackOperationEvent(m_callback, param);
module::getEventManager()->asynFireUIEvent(pEvent);
}
看CallbackOperationEvent中的process方法
virtual void process()
{
m_callback(m_param);
}
而上述中的module::getEventManager()->asynFireUIEvent(pEvent);会给ui代理窗口发送消息,这个消息在消息队列中被ui线程进行获取处理
module::IMCoreErrorCode UIEventManager::asynFireUIEvent(IN const IEvent* const pEvent)
{
assert(m_hWnd);
assert(pEvent);
if ( == m_hWnd || == pEvent)
return IMCORE_ARGUMENT_ERROR;
if (FALSE == ::PostMessage(m_hWnd, UI_EVENT_MSG, reinterpret_cast<WPARAM>(this), reinterpret_cast<WPARAM>(pEvent)))
return IMCORE_WORK_POSTMESSAGE_ERROR;
return IMCORE_OK;
}
//客户端概述中讲过ui代理窗口处理任务的过程,这里就带过
LRESULT _stdcall UIEventManager::_WindowProc(HWND hWnd
, UINT message
, WPARAM wparam
, LPARAM lparam)
{
switch (message)
{
case UI_EVENT_MSG:
reinterpret_cast<UIEventManager*>(wparam)->_processEvent(reinterpret_cast<IEvent*>(lparam), TRUE);
break;
case WM_TIMER:
reinterpret_cast<UIEventManager*>(wparam)->_processTimer();
break;
default:
break;
}
return ::DefWindowProc(hWnd, message, wparam, lparam);
}
void UIEventManager::_processEvent(IEvent* pEvent, BOOL bRelease)
{
assert(pEvent);
if ( == pEvent)
return;
try
{
pEvent->process();
if (bRelease)
pEvent->release();
}
......
}
最终CallbackOperationEvent中的回调会在这个process中调用,而CallbackOperationEvent中的回调是之前构造的时候进行注册的OnHttpCallbackOperation
void LoginDialog::OnHttpCallbackOperation(std::shared_ptr<void> param)
{
DoLoginServerParam* pParam = (DoLoginServerParam*)param.get();
if (DOLOGIN_SUCC == pParam->result)
{
module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig();
PTR_VOID(pCfg);
LoginParam loginparam;
loginparam.csUserName = pCfg->userName;
loginparam.password = pCfg->password;
loginparam.csUserName.Trim();
//将操作任务进行入逻辑队列,等待逻辑任务线程处理
LoginOperation* pOperation = new LoginOperation(
BIND_CALLBACK_1(LoginDialog::OnOperationCallback), loginparam);
imcore::IMLibCoreStartOperation(pOperation);
}
else
{
m_ptxtTip->SetText(pParam->resMsg);
module::TTConfig* pCfg = module::getSysConfigModule()->getSystemConfig();
LOG__(ERR, _T("get MsgServer config faild,login server addres:%s:%d"), pCfg->loginServIP,pCfg->loginServPort);
CString csTxt = util::getMultilingual()->getStringById(_T("STRID_LOGINDIALOG_BTN_LOGIN"));
m_pBtnLogin->SetText(csTxt);
m_pBtnLogin->SetEnabled(true);
}
}
OnHttpCallbackOperation由ui代理窗口调用,然后注册的OnOperationCallback最后由任务逻辑队列进行调用,这里还是重复的或者业务逻辑的东西都进行跳过
这里总结下这个小结,这个小结是讲http任务队列的实现方式,主要是启动http任务线程进行循环处理队列中的http任务,而这个http任务可能是Ui线程投入的,有的是逻辑任务线程投入的,使用的任务类都是继承自ICallbackOpertaion,它会最后调用之前注册的回调函数,ui代理窗口线程处理任务的实现方式有一种跟这个类似,如CallbackOperationEvent继承自ICallbackOpertaion,然后ui代理窗口会调用之前注册的回调函数,只是和http队列不一样的是,入队是通过发送消息,队列是系统的消息队列,并没有额外启动线程进行循环处理,而是利用duilib的消息机制,另外ui窗口进行事件处理并不只是这一种异步通知的方式进行处理事件,下面接着看
三、ui代理窗口事件处理与模块通信
客户端将一些功能抽象成模块组件,如登录模块、组模块、会话模块等,有些模块是后台信息保存和处理的模块,逻辑任务线程进行对其操作,有些模块涉及界面的一些数据,ui代理线程对其操作,类似于mfc里的文档与视图,这里的后台模块与界面也会进行通信,但这个数据流通的方式比前面讲述的单独的asyncCallback异步通知要为复杂
下面是简单画的一个草图,用来理清类之间的关系
模块基础类
1. IEvent : 事件基类
2. ITimerEvent : 定时器基类
3. IMKOEvent :观察事件基类
4. MKOEvent : 观察事件实现类,处理观察事件
5. ModuleObserverCtx : 观察者信息保存类,有回调函数
6. ModuleSubject : 通知者,通知所有观察者
7. CallbackOperationEvent : 继承于IEvent,封装回调函数的事件
8. ICallbackPertaion : 继承Operation , 封装对回调事件的操作
9. ModuleBase : 封装对ModuleSubject的操作
10. UIEventManager : UI事件管理类,负责事件的通知和事件处理
之前的CallbackOperationEvent和IMKOEvent都是继承于IEvent,并进行重写process方法,而各个后台模块都是ModuleBase的基类,每个模块都拥有ModuleSubject的指针,可以进行添加观察者,这里使用的是观察者模式,如后台数据的变化会导致多个界面或者多个地方会发生改变,此时ModuleBase类通过添加需要改变的类,然后进行通知,将MKOEvent_Impl事件给ui代理线程,ui线程将调用MKOEvent_Impl的process,在process中调用ModuleSubject中的每个ModuleObjectCtx中的回调函数,这样就完成了后台数据发生变化可以通知到多个地方的操作,上面讲的比较粗像,但这些只是作为笔记,用来类似于思路提示作用,看客户端代码时候许多业务都会涉及到这种通知方式,可以进行调试下即可