Protobufåºç¨å¹¿æ³ï¼å°¤å ¶ä½ä¸ºç½ç»é讯åè®®æ为æ®éãæ¬æå°è¯¦ç»æè¿°å 个让人ç¼åä¸äº®çprotobufå议设计ï¼å¯¹åå¤åºç¨æå·²ç»åºç¨protobufçå¼åè ä¼ææå¯åï¼çè³å¯ä»¥ç´æ¥æ¿è¿å»ç¨ã è¿éæè¿°çå议设计被ç¨äºç产ç¯å¢çå³æ¶é讯ãåç¹æ°æ®ééãæ¶æ¯æ¨éãredisåmysqlæ°æ®ä»£çã
ââBwarä»2013å¹´å¼å§åºç¨protobufï¼2014年设计äºç¨äºmysqlæ°æ®ä»£ççprotobufåè®®ï¼2015年设计äºç¨äºå³æ¶é讯çprotobufåè®®ãé«æ§è½C++ IoCç½ç»æ¡æ¶Nebula https://github.com/Bwar/Nebulaæè¿å 个protobufå议设计åºç¨å°äºæè´ã
1. TCPé讯å议设计
ââæ¬å议设计äº2015å¹´ï¼ç¨äºä¸ä¸ªç产ç¯å¢çIMååç¹æ°æ®ééåå®æ¶åæï¼2016å¹´å延伸åå±äºåºäºprotobuf3ççæ¬å¹¶ç¨äºå¼æºç½ç»æ¡æ¶Nebulaãåºäºprotobuf2åprotobuf3çæè¾å°å·®å«ï¼è¿éåå¼è®²è§£ä¸¤ä¸ªçæ¬çå议设计ã
1.1. protobuf2.5çMsg
ââ2015å¹´å°æ protobuf3çreleaseçæ¬ï¼protobuf2çæ¬çfixed32ç±»åæ¯åºå®å ç¨4个åèçï¼é常éåç¨äºç½ç»é讯å议设计ãBwar设计ç¨äºIMç³»ç»çåè®®å æ¬ä¸¤ä¸ªprotobuf messageï¼MsgHeadåMsgBodyï¼åè®®å®ä¹å¦ä¸ï¼
syntax = "proto2";
/**
* @brief æ¶æ¯å¤´
*/
message MsgHead
{
required fixed32 cmd = 1 ; ///< å½ä»¤åï¼å缩å å¯ç®æ³å é«ä½1åèï¼
required fixed32 msgbody_len = 2; ///< æ¶æ¯ä½é¿åº¦ï¼å个æ¶æ¯ä½é¿åº¦ä¸è½è¶
è¿65535å³8KBï¼
required fixed32 seq = 3; ///< åºåå·
}
/**
* @brief æ¶æ¯ä½
* @note æ¶æ¯ä½ä¸»ä½æ¯bodyï¼ææä¸å¡é»è¾å
容åæ¾å¨bodyéãsession_idåsessionç¨äºæ¥å
¥å±è·¯ç±ï¼
* 两è
åªéè¦å¡«å
ä¸ä¸ªå³å¯ï¼é¦ésession_idï¼å½session_idç¨æ´åæ æ³è¡¨è¾¾æ¶æ使ç¨sessionã
*/
message MsgBody
{
required bytes body = 1; ///< æ¶æ¯ä½ä¸»ä½
optional uint32 session_id = 2; ///< ä¼è¯IDï¼åèæ¶æ¯ä¸ºæ¥æ¶è
uidï¼ä¸ªäººä¿¡æ¯ä¿®æ¹ä¸ºuidï¼ç¾¤èæ¶æ¯ä¸ºgroupidï¼ç¾¤ç®¡ç为groupidï¼
optional string session = 3; ///< ä¼è¯IDï¼å½session_idç¨æ´åæ æ³è¡¨è¾¾æ¶ä½¿ç¨ï¼
optional bytes additional = 4; ///< æ¥å
¥å±éå çæ°æ®ï¼å®¢æ·ç«¯æ é¡»çä¼ï¼
}
ââ解ææ¶å°çåèæµæ¶å 解åºå®é¿åº¦ï¼15åèï¼çMsgHeadï¼protobuf3.0ä¹åççæ¬å¿ é¡»å¨cmdãmsgbody_lenãseqåä¸ä¸º0çæ åµä¸ææ¯15åèï¼ï¼åéè¿MsgHeadéçmsgbody_lenå¤ææ¶æ¯ä½æ¯å¦æ¥æ¶å®æ¯ï¼è¥æ¥æ¶å®æ¯åè°ç¨MsgBody.Parse()解æãMsgBodyéç设计å¨ä¸ä¸è详ç»è¯´æã
ââMsgHeadå¨å®é ç项ç®åºç¨ä¸å¯¹åºä¸é¢çæ¶æ¯å¤´å¹¶å¯ä»¥ç¸äºè½¬æ¢ï¼
#pragma pack(1)
/**
* @brief ä¸å®¢æ·ç«¯éä¿¡æ¶æ¯å¤´
*/
struct tagClientMsgHead
{
unsigned char version; ///< åè®®çæ¬å·ï¼1åèï¼
unsigned char encript; ///< å缩å å¯ç®æ³ï¼1åèï¼
unsigned short cmd; ///< å½ä»¤å/åè½å·ï¼2åèï¼
unsigned short checksum; ///< æ ¡éªç ï¼2åèï¼
unsigned int body_len; ///< æ¶æ¯ä½é¿åº¦ï¼4åèï¼
unsigned int seq; ///< åºåå·ï¼4åèï¼
};
#pragma pack()
ââ转æ¢ä»£ç å¦ä¸ï¼
E_CODEC_STATUS ClientMsgCodec::Encode(const MsgHead& oMsgHead, const MsgBody& oMsgBody, loss::CBuffer* pBuff)
{
tagClientMsgHead stClientMsgHead;
stClientMsgHead.version = 1; // versionææ¶æ ç¨
stClientMsgHead.encript = (unsigned char)(oMsgHead.cmd() >> 24);
stClientMsgHead.cmd = htons((unsigned short)(gc_uiCmdBit & oMsgHead.cmd()));
stClientMsgHead.body_len = htonl((unsigned int)oMsgHead.msgbody_len());
stClientMsgHead.seq = htonl(oMsgHead.seq());
stClientMsgHead.checksum = htons((unsigned short)stClientMsgHead.checksum);
...
}
E_CODEC_STATUS ClientMsgCodec::Decode(loss::CBuffer* pBuff, MsgHead& oMsgHead, MsgBody& oMsgBody)
{
LOG4_TRACE("%s() pBuff->ReadableBytes() = %u", __FUNCTION__, pBuff->ReadableBytes());
size_t uiHeadSize = sizeof(tagClientMsgHead);
if (pBuff->ReadableBytes() >= uiHeadSize)
{
tagClientMsgHead stClientMsgHead;
int iReadIdx = pBuff->GetReadIndex();
pBuff->Read(&stClientMsgHead, uiHeadSize);
stClientMsgHead.cmd = ntohs(stClientMsgHead.cmd);
stClientMsgHead.body_len = ntohl(stClientMsgHead.body_len);
stClientMsgHead.seq = ntohl(stClientMsgHead.seq);
stClientMsgHead.checksum = ntohs(stClientMsgHead.checksum);
LOG4_TRACE("cmd %u, seq %u, len %u, pBuff->ReadableBytes() %u",
stClientMsgHead.cmd, stClientMsgHead.seq, stClientMsgHead.body_len,
pBuff->ReadableBytes());
oMsgHead.set_cmd(((unsigned int)stClientMsgHead.encript << 24) | stClientMsgHead.cmd);
oMsgHead.set_msgbody_len(stClientMsgHead.body_len);
oMsgHead.set_seq(stClientMsgHead.seq);
...
}
}
<br/>
1.2. protobuf3çMsg
ââprotobuf3ççMsgHeadåMsgBodyä»IMä¸å¡åºç¨å®è·µä¸åå±èæ¥ï¼åæ¶æ»¡è¶³äºåç¹æ°æ®ééãå®æ¶è®¡ç®ãæ¶æ¯æ¨éçä¸å¡éè¦ï¼æ´ä¸ºéç¨ãæ£å å ¶éç¨æ§åé«æ©å±æ§ï¼éç¨proactor模åçIoCç½ç»æ¡æ¶Nebulaæä¼éç¨è¿ä¸ªåè®®ï¼éè¿è¿ä¸ªåè®®ï¼æ¡æ¶å±å°ç½ç»éä¿¡å·¥ä½ä»ä¸å¡åºç¨ä¸å®å ¨ç¬ç«åºæ¥ï¼åºäºNebulaæ¡æ¶çåºç¨å¼åè çè³å¯ä»¥ä¸æç½ç»ç¼ç¨ä¹è½å¼ååºé«å¹¶åçåå¸å¼æå¡ã
ââMsgHeadåMsgBodyçprotobufå®ä¹å¦ä¸ï¼
syntax = "proto3";
// import "google/protobuf/any.proto";
/**
* @brief æ¶æ¯å¤´
* @note MsgHead为åºå®15åèç头é¨ï¼å½MsgHeadä¸çäº15åèæ¶ï¼æ¶æ¯åéå°åºéã
* å¨proto2çæ¬ï¼MsgHead为15åèæ»æ¯æç«ï¼cmdãseqãlené½æ¯requiredï¼
* ä½proto3çæ¬ï¼MsgHead为15åèåå¿
é¡»è¦æ±cmdãseqãlenåä¸çäº0ï¼å¦åæ æ³æ£ç¡®è¿è¡æ¶åç¼è§£ç ã
*/
message MsgHead
{
fixed32 cmd = 1; ///< å½ä»¤åï¼å缩å å¯ç®æ³å é«ä½1åèï¼
fixed32 seq = 2; ///< åºåå·
sfixed32 len = 3; ///< æ¶æ¯ä½é¿åº¦
}
/**
* @brief æ¶æ¯ä½
* @note æ¶æ¯ä½ä¸»ä½æ¯dataï¼ææä¸å¡é»è¾å
容åæ¾å¨dataéãreq_targetæ¯è¯·æ±ç®æ ï¼ç¨äº
* æå¡ç«¯æ¥å
¥è·¯ç±ï¼è¯·æ±å
å¿
须填å
ãrsp_resultæ¯ååºç»æï¼ååºå
å¿
须填å
ã
*/
message MsgBody
{
oneof msg_type
{
Request req_target = 1; ///< 请æ±ç®æ ï¼è¯·æ±å
å¿
须填å
ï¼
Response rsp_result = 2; ///< ååºç»æï¼ååºå
å¿
须填å
ï¼
}
bytes data = 3; ///< æ¶æ¯ä½ä¸»ä½
bytes add_on = 4; ///< æå¡ç«¯æ¥å
¥å±éå å¨è¯·æ±å
çæ°æ®ï¼å®¢æ·ç«¯æ é¡»çä¼ï¼
string trace_id = 5; ///< for log trace
message Request
{
uint32 route_id = 1; ///< è·¯ç±ID
string route = 2; ///< è·¯ç±IDï¼å½route_idç¨æ´åæ æ³è¡¨è¾¾æ¶ä½¿ç¨ï¼
}
message Response
{
int32 code = 1; ///< é误ç
bytes msg = 2; ///< é误信æ¯
}
}
ââMsgBodyçdataå段åå¨æ¶æ¯ä¸»ä½ï¼ä»»ä½èªå®ä¹æ°æ®åå¯ä»¥äºè¿å¶æ°æ®æµæ¹å¼åå ¥å°dataã
ââmsg_typeç¨äºæ è¯è¯¥æ¶æ¯æ¯è¯·æ±è¿æ¯ååºï¼ææç½ç»æ°æ®æµé½å¯å½ä¸ºè¯·æ±æååºï¼ï¼å¦ææ¯è¯·æ±ï¼åå¯ä»¥éæ©æ§å¡«å Requestéçroute_idærouteï¼å¦æå¡«å äºï¼åæ¡æ¶å±æ 须解æåºç¨å±åè®®ï¼ä¹æ æ³è§£æï¼å°±è½èªå¨æ ¹æ®è·¯ç±ID转åï¼èæ é¡»åºç¨å±è§£å¼dataéçå 容åæ ¹æ®èªå®ä¹é»è¾è½¬åãå¦ææ¯ååºï¼åå®ä¹äºç»ä¸çé误æ åï¼ä¹ä¸ºä¸å¡æ å ³çé误å¤çæä¾æ¹ä¾¿ã
ââadd_onæ¯éå¨é¿è¿æ¥ä¸çä¸å¡æ°æ®ï¼æ¡æ¶å¹¶ä¸ä¼è§£æä½ä¼å¨æ¯æ¬¡è½¬åæ¶æ¯æ¶å¸¦ä¸ï¼å¯ä»¥ä¸ºåºç¨æä¾æå ¶æ¹ä¾¿ä¸å¼ºå¤§çåè½ãæ¯å¦ï¼IMç¨æ·ç»å½æ¶å®¢æ·ç«¯åªåéç¨æ·IDåå¯ç å°æå¡ç«¯ï¼æå¡ç«¯å¨ç»å½æ ¡éªéè¿åï¼å°è¯¥ç¨æ·çæµç§°ã头åçä¿¡æ¯éè¿æ¡æ¶æä¾çæ¹æ³SetClientData()å°æ°æ®éå¨æå¡ç«¯æ¥å ¥å±è¯¥ç¨æ·å¯¹åºçé¿è¿æ¥Channelä¸ï¼ä¹åææä»è¯¥è¿æ¥è¿æ¥ç请æ±é½ä¼ç±æ¡æ¶å±èªå¨å¡«å add_onå段ï¼æå¡ç«¯çå ¶ä»é»è¾æå¡å¨åªä»dataä¸å¾å°èªå®ä¹ä¸å¡é»è¾ï¼æ¯å¦è天æ¶æ¯ï¼æ°æ®ï¼å´å¯ä»¥ä»add_onä¸å¾å°è¿ä¸ªåéç¨æ·çä¿¡æ¯ãadd_onç设计ç®åäºåºç¨å¼åé»è¾ï¼å¹¶éä½äºå®¢æ·ç«¯ä¸æå¡ç«¯ä¼ è¾çæ°æ®éã
ââtrace_idç¨äºåå¸å¼æ¥å¿è·è¸ªãåå¸å¼æå¡çé误å®ä½æ¯ç¸å½éº»ç¦çï¼Nebulaåå¸å¼æå¡è§£å³æ¹æ¡æä¾äºæ¥å¿è·è¸ªåè½ï¼åè®®éçtrace_idå段ç设计使å¾Nebulaæ¡æ¶å¯ä»¥å¨å®å ¨ä¸å¢å åºç¨å¼åè é¢å¤å·¥ä½çæ åµä¸ï¼æ£å¸¸è°ç¨LOG4_INFOåæ¥å¿èæ é¡»é¢å¤å·¥ä½ï¼å®ç°æææ è®°çåä¸trace_idçæ¥å¿åéå°æå®ä¸å°æ¥å¿æå¡å¨ï¼å®ä¹é误æ¶è·åä½æå¡é£æ ·ç»å½ä¸å°æå¡å¨æ¥çæ¥å¿å³å¯ãæ¯å¦ï¼IMç¨æ·åéä¸æ¡æ¶æ¯å¤±è´¥ï¼å¨ç¨æ·åéçæ¶æ¯å°è¾¾æå¡ç«¯æ¥å ¥å±æ¶å°±è¢«æä¸äºtrace_idæ è®°ï¼è¿ä¸ªidä¼ä¸ç´ä¼ éå°é»è¾å±ãåå¨å±çï¼åªä¸ªç¯èåçäºé误é½å¯ä»¥ä»æ¶æ¯çåéã转åãå¤çè·¯å¾ä¸æ¥å°ã
ââMsgHeadåMsgBodyçç¼è§£ç å®ç°è§Nebulaæ¡æ¶çhttps://github.com/Bwar/Nebula/blob/master/src/codec/CodecProto.cppã
2. Httpé讯å议设计
ââä¸é¢ç讲解çæ¯protobufåºç¨äºTCPæ°æ®æµéä¿¡ï¼æ¥ä¸æ¥å°æè¿°protobufå¨httpéä¿¡ä¸çåºç¨ã
ââå¨Webæå¡ä¸é常ä¼ç¨Nginxåæ¥å ¥å±çåå代çï¼ç»è¿Nginx转åå°åç»ä¸å¡é»è¾å±çtomcatãapacheænginxä¸ï¼æ¥å ¥å±åä¸å¡é»è¾å±è³å°åäºä¸¤æ¬¡httpå议解æï¼httpåè®®æ¯ææ¬åè®®ï¼ä¼ è¾æ°æ®é大解æéåº¦æ ¢ãNebulaæ¡æ¶ä¸æ¯ä¸ä¸ªwebæå¡å¨ï¼ä½æ¯æhttpåè®®ï¼å¨åªéæä¾httpæ¥å£çåºç¨åºæ¯ï¼æ¯å¦å®å ¨åå端å离çå端ï¼åºäºNebulaçåè¿ç¨httpæå¡ç«¯å¹¶åéå°±å¯ä»¥æ¯tomcatçæ°ååãè¿ä¸å®ç¨åº¦ä¸å¾çäºNebulaæ¡æ¶å¨httpéä¿¡ä¸protobufçåºç¨ãNebulaæ¡æ¶è§£æhttpææ¬å议并转å为HttpMsgå¨æå¡å é¨å¤çï¼åºç¨å¼åè å¡«å HttpMsgï¼æ¥å ¥å±å°ååºçHttpMsg转æ¢æhttpææ¬åè®®ååç»è¯·æ±æ¹ï¼ä¸ç®¡æå¡ç«¯å é¨ç»è¿å¤å°æ¬¡ä¸è½¬ï¼å§ç»åªæä¸æ¬¡httpåè®®çdecodeåä¸æ¬¡httpåè®®çencodeã
syntax = "proto3";
message HttpMsg
{
int32 type = 1; ///< http_parser_type 请æ±æååº
int32 http_major = 2; ///< http大çæ¬å·
int32 http_minor = 3; ///< httpå°çæ¬å·
int32 content_length = 4; ///< å
容é¿åº¦
int32 method = 5; ///< 请æ±æ¹æ³
int32 status_code = 6; ///< ååºç¶æç
int32 encoding = 7; ///< ä¼ è¾ç¼ç ï¼åªå¨encodeæ¶ä½¿ç¨ï¼å½ Transfer-Encoding: chunked æ¶ï¼ç¨äºæ è¯chunkåºå·ï¼0表示第ä¸ä¸ªchunkï¼ä¾æ¬¡éå¢ï¼
string url = 8; ///< å°å
map<string, string> headers = 9; ///< http头å
bytes body = 10; ///< æ¶æ¯ä½ï¼å½ Transfer-Encoding: chunked æ¶ï¼åªåå¨ä¸ä¸ªchunkï¼
map<string, string> params = 11; ///< GETæ¹æ³åæ°ï¼POSTæ¹æ³è¡¨åæ交çåæ°
Upgrade upgrade = 12; ///< å级åè®®
float keep_alive = 13; ///< keep alive time
string path = 14; ///< Http Decodeæ¶ä»urlä¸è§£æåºæ¥ï¼ä¸éè¦äººä¸ºå¡«å
ï¼encodeæ¶ä¸éè¦å¡«ï¼
bool is_decoding = 15; ///< æ¯å¦æ£å¨è§£ç ï¼true æ£å¨è§£ç ï¼ false æªè§£ç æå·²å®æ解ç ï¼
message Upgrade
{
bool is_upgrade = 1;
string protocol = 2;
}
}
ââHttpMsgçç¼è§£ç å®ç°è§Nebulaæ¡æ¶çhttps://github.com/Bwar/Nebula/blob/master/src/codec/CodecHttp.cppã
3. æ°æ®åºä»£çæå¡å议设计
ââå¦æä¸é¢æè¿°çprotobufå¨ç½ç»éä¿¡ä¸åºç¨ç®ä¸éçè¯ï¼é£ä»¥ä¸å°protobufç¨äºæ°æ®ä»£çä¸çå议设计åç»å¯¹æ¯è®©äººç¼åä¸äº®ã
ââæçå ¬å¸è§å®webæå¡ä¸å¾ç´æ¥è®¿é®MySQLæ°æ®åºï¼çè³ä¸å 许å¨webé»è¾å±æ¼æ¥SQLè¯å¥ãå¦ææè¿ç§åºäºå®å ¨æ§èèèåçéå¶ï¼å¨webé»è¾å±åé¢åå¢å ä¸å±ä¸å¡é»è¾å±ææ¬æªå 太é«äºï¼é£ä¹è§£å³åæ³åºè¯¥æ¯å¢å ä¸å±ä¸å¡é»è¾æ å ³ç代çæå¡å±ãè¿ä¸ªä»£çæå¡å±ä¸æ¯ç®åç转åSQLè¯å¥è¿ä¹ç®åï¼å 为webé»è¾å±å¯è½ä¸å 许æ¼æ¥SQLï¼ç±æ¤å¼åºæ们è¿ä¸ªç¨äºæ°æ®åºä»£ççprotobufå议设计ãè¿ä¸ªåè®®æ¯å°SQLé»è¾èå ¥æ´ä¸ªåè®®ä¹ä¸ï¼æ°æ®åºä»£çå±æ¥æ¶å¹¶è§£æè¿ä¸ªåè®®åçæSQLè¯å¥æç¨bindingæ¹å¼å°æ°æ®åºå»æ§è¡ãæ°æ®åºä»£çå±åªæå议解æå转åé»è¾ï¼æ å ¶ä»ä»»ä½ä¸å¡é»è¾ï¼ä¸å¡é»è¾è¿å¨webé»è¾å±ï¼åºå«åªå¨äºä»æ¼æ¥SQLåæäºå¡«å protobufåè®®ã
syntax = "proto2";
package dbagent;
/**
* @brief DB Agentæ¶æ¯
*/
message DbAgentMsg
{
enum E_TYPE
{
UNDEFINE = 0; ///< æªå®ä¹
REQ_CONNECT = 1; ///< è¿æ¥DB请æ±
RSP_CONNECT = 2; ///< è¿æ¥DBååº
REQ_QUERY = 3; ///< æ§è¡SQL请æ±
RSP_QUERY = 4; ///< æ§è¡SQLååº
REQ_DISCONNECT = 5; ///< å
³éè¿æ¥è¯·æ±
RSP_DISCONNECT = 6; ///< å
³éè¿æ¥ååº
RSP_RECORD = 7; ///< ç»æéè®°å½
RSP_COMMON = 8; ///< éç¨ååºï¼å½è¯·æ±ä¸è½è¢«Serveræ认ç¥æ¶ä¼ååºè¿ä¸ªååºï¼
REQ_GET_CONNECT = 9; ///< è·åè¿æ¥è¯·æ±
RSP_GET_CONNECT = 10; ///< è·åè¿æ¥ååº
}
required E_TYPE type = 1; ///< æ¶æ¯/æä½ ç±»å
optional RequestConnect req_connect = 2; ///< è¿æ¥è¯·æ±
optional ResponseConnect rsp_connect = 3; ///< è¿æ¥ååº
optional RequestDisconnect req_disconnect = 4; ///< å
³é请æ±
optional ResponseDisconnect rsp_disconnect = 5; ///< å
³éååº
optional RequestQuery req_query = 6; ///< æ§è¡SQL请æ±
optional ResponseQuery rsp_query = 7; ///< æ§è¡SQLååº
optional ResponseRecord rsp_record = 8; ///< SELECTç»æéè®°å½
optional ResponseCommon rsp_common = 9; ///< éç¨ååº
optional RequestGetConnection req_get_conn = 10; ///< è·åè¿æ¥è¯·æ±
optional ResponseGetConnection rsp_get_conn = 11; ///< è·åè¿æ¥ååº
}
/**
* @brief è¿æ¥è¯·æ±
*/
message RequestConnect
{
required string host = 1; ///< DBæå¨æå¡å¨IP
required int32 port = 2; ///< DB端å£
required string user = 3; ///< DBç¨æ·å
required string password = 4; ///< DBç¨æ·å¯ç
required string dbname = 5; ///< DBåºå
required string charset = 6; ///< DBå符é
}
/**
* @brief è¿æ¥ååº
*/
message ResponseConnect
{
required int32 connect_id = 1; ///< è¿æ¥ID ï¼è¿æ¥å¤±è´¥æ¶ï¼connect_id为0ï¼
optional int32 err_no = 2; ///< é误ç 0 表示è¿æ¥æå
optional string err_msg = 3; ///< é误信æ¯
}
/**
* @brief å
³éè¿æ¥è¯·æ±
*/
message RequestDisconnect
{
required int32 connect_id = 1; ///< è¿æ¥ID ï¼è¿æ¥å¤±è´¥æ¶ï¼connect_id为0ï¼
}
/**
* @brief å
³éè¿æ¥ååº
*/
message ResponseDisconnect
{
optional int32 err_no = 2; ///< é误ç 0 表示è¿æ¥æå
optional string err_msg = 3; ///< é误信æ¯
}
/**
* @brief æ§è¡SQL请æ±
*/
message RequestQuery
{
required E_QUERY_TYPE query_type = 1; ///< æ¥è¯¢ç±»å
required string table_name = 2; ///< 表å
repeated Field fields = 3; ///< åç±»å
repeated ConditionGroup conditions= 4; ///< whereæ¡ä»¶ç»ï¼ç±group_relationæå®ï¼è¥ä¸æå®åé»è®¤ä¸ºANDå
³ç³»ï¼
repeated string groupby_col = 5; ///< group byå段
repeated OrderBy orderby_col = 6; ///< order byå段
optional uint32 limit = 7; ///< æå®è¿åçè¡æ°çæå¤§å¼ (limit 200)
optional uint32 limit_from = 8; ///< æå®è¿åç第ä¸è¡çå移é (limit 100, 200)
optional ConditionGroup.E_RELATION group_relation = 9; ///< whereæ¡ä»¶ç»çå
³ç³»,æ¡ä»¶ç»ä¹é´æä¸åªæä¸ç§å
³ç³»ï¼andæè
orï¼
optional int32 connect_id = 10; ///< è¿æ¥IDï¼ææè¿æ¥IDï¼é¿è¿æ¥ï¼å½connectåå¤æ¬¡æ§è¡queryå¯ä»¥ä½¿ç¨connect_idï¼
optional string bid = 11; ///< ä¸å¡IDï¼å¨CmdDbAgent.jsoné
ç½®æ件ä¸é
ç½®ï¼çè¿æ¥ï¼æ¯æ¬¡æ§è¡queryæ¶è¿æ¥ï¼æ§è¡å®åå
³éè¿æ¥ï¼
optional string password = 12; ///< ä¸å¡å¯ç
enum E_QUERY_TYPE ///< æ¥è¯¢ç±»å
{
SELECT = 0; ///< selectæ¥è¯¢
INSERT = 1; ///< insertæå
¥
INSERT_IGNORE = 2; ///< insert ignoreæå
¥ï¼è¥åå¨åæ¾å¼
UPDATE = 3; ///< updateæ´æ°
REPLACE = 4; ///< replaceè¦çæå
¥
DELETE = 5; ///< deleteå é¤
}
enum E_COL_TYPE ///< åç±»å
{
STRING = 0; ///< char, varchar, text, datetime, timestampç
INT = 1; ///< int
BIGINT = 2; ///< bigint
FLOAT = 3; ///< float
DOUBLE = 4; ///< double
}
message Field ///< å段
{
required string col_name = 1; ///< åå
required E_COL_TYPE col_type = 2; ///< åç±»å
required bytes col_value = 3; ///< åå¼
optional string col_as = 4; ///< asåå
}
message Condition ///< whereæ¡ä»¶
{
required E_RELATION relation = 1; ///< å
³ç³»ï¼=, !=, >, <, >=, <= çï¼
required E_COL_TYPE col_type = 2; ///< åç±»å
required string col_name = 3; ///< åå
repeated bytes col_values = 4; ///< åå¼ï¼å½ä¸ä»
å½relation为INæ¶å¼ç个æ°å¤§äº1ææï¼
optional string col_name_right= 5; ///< å
³ç³»å³è¾¹ååï¼ç¨äºwhere col1=col2è¿ç§æ
åµï¼
enum E_RELATION
{
EQ = 0; ///< çäº=
NE = 1; ///< ä¸çäº!=
GT = 2; ///< 大äº>
LT = 3; ///< å°äº<
GE = 4; ///< 大äºçäº>=
LE = 5; ///< å°äºçäº<=
LIKE = 6; ///< like
IN = 7; ///< in (1, 2, 3, 4, 5)
}
}
message ConditionGroup ///< whereæ¡ä»¶ç»å
{
required E_RELATION relation = 1; ///< æ¡ä»¶ä¹é´çå
³ç³»ï¼ä¸ä¸ªConditionGroupéçææConditionä¹é´æä¸åªæä¸ç§å
³ç³»ï¼andæè
orï¼
repeated Condition condition = 2; ///< æ¡ä»¶
enum E_RELATION
{
AND = 0; ///< andä¸
OR = 1; ///< oræ
}
}
message OrderBy
{
required E_RELATION relation = 1; ///< éåºæååº
required string col_name = 2; ///< åå
enum E_RELATION
{
ASC = 0;
DESC = 1;
}
}
}
/**
* @brief æ§è¡SQLååº
*/
message ResponseQuery
{
required uint32 seq = 1; ///< æ°æ®å
åºåå·ï¼SELECTç»æéä¼åå
è¿åï¼åªæä¸ä¸ªå
çæ
åµæå·²å°è¾¾æåä¸ä¸ªå
åseq=0xFFFFFFFFï¼
required int32 err_no = 2; ///< é误ç ï¼0 表示æ§è¡æå
optional string err_msg = 3; ///< é误信æ¯
optional uint64 insert_id = 4; ///< mysql_insert_id()è·åçå¼ï¼è§æ§è¡çSQLè¯å¥èå®ï¼ä¸ä¸å®åå¨ï¼
repeated bytes dict = 5; ///< ç»æéåå
¸ï¼è§æ§è¡çSQLè¯å¥èå®ï¼ä¸ä¸å®åå¨ï¼
}
/**
* @brief SELECTè¯å¥è¿åç»æéçä¸æ¡è®°å½
*/
message ResponseRecord
{
required uint32 seq = 1; ///< æ°æ®å
åºåå·ï¼SELECTç»æéä¼åå
è¿åï¼å·²å°è¾¾æåä¸ä¸ªå
åseq=0xFFFFFFFFï¼
repeated bytes field = 2; ///< æ°æ®éè®°å½çå段
}
/**
* @brief 常è§ååº
*/
message ResponseCommon
{
optional int32 err_no = 1; ///< é误ç 0 表示è¿æ¥æå
optional string err_msg = 2; ///< é误信æ¯
}
/**
* @brief è·åè¿æ¥è¯·æ±
*/
message RequestGetConnection
{
required string bid = 1; ///< ä¸å¡IDï¼å¨dbproxyé
ç½®æ件ä¸é
ç½®
required string password = 2; ///< ä¸å¡å¯ç
}
/**
* @brief è·åè¿æ¥ååº
*/
message ResponseGetConnection
{
required int32 connect_id = 1; ///< è¿æ¥IDï¼ææè¿æ¥IDï¼å¦åæ§è¡å¤±è´¥
optional int32 err_no = 2; ///< é误ç 0 表示è¿æ¥æå
optional string err_msg = 3; ///< é误信æ¯
}
ââåºäºè¿ä¸ªæ°æ®åºæä½åè®®å¼åçæ°æ®åºä»£çå±å®å ¨è§£å³äºwebé»è¾å±ä¸å 许ç´æ¥è®¿é®æ°æ®åºä¹ä¸å 许æ¼æ¥SQLè¯å¥çé®é¢ï¼èä¸å ä¹æ²¡æå¢å å¼å代价ãå¦å¤ï¼åºäºè¿ä¸ªåè®®çæ°æ®åºä»£ç天ç¶é²æ¢SQLæ³¨å ¥ï¼å¨ä»£çå±æ ¡éªfield_nameï¼å¹¶ä¸mysql_escape_string(filed_value)ï¼ï¼è½ç¶é²SQLæ³¨å ¥åºæ¯åºç¨å±ç责任ï¼ä½å¤äºæ°æ®ä»£çè¿å±ä¿éä¹æ¯å¥½äºã
ââè¿ä¸ªåè®®åªæ¯æç®åSQLï¼ä¸æ¯æèåæ¥è¯¢ãåæ¥è¯¢ï¼ä¹ä¸æ¯æåå¨è¿ç¨ï¼å¦æéè¦æ¯æçè¯åè®®ä¼æ´å¤æãå¨Bwaræè´è´£è¿çä¸å¡éï¼åºæ¬é½ç¦æ¢æ°æ®åºèåæ¥è¯¢ä¹ç±»ï¼åªææ°æ®åºå½åå¨ç¨ï¼ä¸æé»è¾åå°SQLè¯å¥éï¼æ以è¿ä¸ªå议满足大é¨åä¸å¡éè¦ã
ââè¿ä¸èåªè¯´ææ°æ®åºä»£çåè®®ï¼ä¸ä¸èå°ä»æ°æ®åºä»£çå议延伸并æä¾å议代ç 讲解ã
4. RedisåMySQLæ°æ®ä»£çå议设计
ââ大é¨ååå°åºç¨åªæMySQLæ¯ä¸å¤çï¼å¾å¾è¿éè¦ç¼åï¼ç»å¸¸ä¼ç¨Redisæ¥åæ°æ®ç¼åãç¨ç¼åæå³çæ°æ®è³å°éè¦åæ¶åå°RedisåMySQLï¼åæè å¨æªå½ä¸ç¼åæ¶ä»MySQLä¸è¯»åå°çæ°æ®éè¦ååå°Redisï¼è¿äºé常é½æ¯ç±ä¸å¡é»è¾å±æ¥åçãä¹æä¾å¤ï¼Nebulaæä¾çåå¸å¼è§£å³æ¹æ¡æ¯ç±æ°æ®ä»£çå±æ¥åçï¼ä¸å¡é»è¾å±åªéåæ°æ®ä»£çå±åéä¸ä¸ªprotobufåè®®æ°æ®ï¼æ°æ®ä»£çå±å°±ä¼å®æRedisåMySQLååæç¼åæªå½ä¸æ¶çèªå¨ååï¼æä¸ä¸æ¢è®¨æ°æ®ä¸è´æ§é®é¢ï¼ãæ°æ®ä»£çå±æ¥åè¿äºå·¥ä½æ¯ä¸ºäºåå°ä¸å¡é»è¾å±çå¤æ度ï¼æé«å¼åæçãæ¢ç¶æ¯ä¸ºäºæé«å¼åæçï¼å°±å¾è®©ä¸å¡é»è¾å±ä½äºåæ¥åæ¶æä½RedisåMySQLçå¼åéãNebulaæä¾çNebulaMydiså°±æ¯è¿æ ·ä¸ä¸ªè®©åæ¥åæ¶æä½RedisåMySQLçå¼åéï¼å设æ¯2ï¼éå°1.2å·¦å³ã
ââè¿ä¸ªåæ¶æä½RedisåMySQLçæ°æ®ä»£çåè®®å¦ä¸ï¼
syntax = "proto3";
package neb;
message Mydis
{
uint32 section_factor = 1;
RedisOperate redis_operate = 2;
DbOperate db_operate = 3;
message RedisOperate
{
bytes key_name = 1;
string redis_cmd_read = 2;
string redis_cmd_write = 3;
OPERATE_TYPE op_type = 4;
repeated Field fields = 5;
int32 key_ttl = 6;
int32 redis_structure = 7; ///< redisæ°æ®ç±»å
int32 data_purpose = 8; ///< æ°æ®ç¨é
bytes hash_key = 9; ///< å¯éhash keyï¼å½has_hash_key()æ¶ç¨hash_keyæ¥è®¡ç®hashå¼ï¼å¦åç¨key_nameæ¥è®¡ç®hashå¼
enum OPERATE_TYPE
{
T_READ = 0;
T_WRITE = 1;
}
}
message DbOperate
{
E_QUERY_TYPE query_type = 1; ///< æ¥è¯¢ç±»å
string table_name = 2; ///< 表å
repeated Field fields = 3; ///< åç±»å
repeated ConditionGroup conditions = 4; ///< whereæ¡ä»¶ç»ï¼ç±group_relationæå®ï¼è¥ä¸æå®åé»è®¤ä¸ºANDå
³ç³»ï¼
repeated string groupby_col = 5; ///< group byå段
repeated OrderBy orderby_col = 6; ///< order byå段
ConditionGroup.E_RELATION group_relation = 7; ///< whereæ¡ä»¶ç»çå
³ç³»,æ¡ä»¶ç»ä¹é´æä¸åªæä¸ç§å
³ç³»ï¼andæè
orï¼
uint32 limit = 8; ///< æå®è¿åçè¡æ°çæå¤§å¼ (limit 200)
uint32 limit_from = 9; ///< æå®è¿åç第ä¸è¡çå移é (limit 100, 200)
uint32 mod_factor = 10; ///< å表å模å åï¼å½è¿ä¸ªå段没ææ¶ä½¿ç¨section_factor
enum E_QUERY_TYPE ///< æ¥è¯¢ç±»å
{
SELECT = 0; ///< selectæ¥è¯¢
INSERT = 1; ///< insertæå
¥
INSERT_IGNORE = 2; ///< insert ignoreæå
¥ï¼è¥åå¨åæ¾å¼
UPDATE = 3; ///< updateæ´æ°
REPLACE = 4; ///< replaceè¦çæå
¥
DELETE = 5; ///< deleteå é¤
}
message Condition ///< whereæ¡ä»¶
{
E_RELATION relation = 1; ///< å
³ç³»ï¼=, !=, >, <, >=, <= çï¼
E_COL_TYPE col_type = 2; ///< åç±»å
string col_name = 3; ///< åå
repeated bytes col_values = 4; ///< åå¼ï¼å½ä¸ä»
å½relation为INæ¶å¼ç个æ°å¤§äº1ææï¼
string col_name_right = 5; ///< å
³ç³»å³è¾¹ååï¼ç¨äºwhere col1=col2è¿ç§æ
åµï¼
enum E_RELATION
{
EQ = 0; ///< çäº=
NE = 1; ///< ä¸çäº!=
GT = 2; ///< 大äº>
LT = 3; ///< å°äº<
GE = 4; ///< 大äºçäº>=
LE = 5; ///< å°äºçäº<=
LIKE = 6; ///< like
IN = 7; ///< in (1, 2, 3, 4, 5)
}
}
message ConditionGroup ///< whereæ¡ä»¶ç»å
{
E_RELATION relation = 1; ///< æ¡ä»¶ä¹é´çå
³ç³»ï¼ä¸ä¸ªConditionGroupéçææConditionä¹é´æä¸åªæä¸ç§å
³ç³»ï¼andæè
orï¼
repeated Condition condition = 2; ///< æ¡ä»¶
enum E_RELATION
{
AND = 0; ///< andä¸
OR = 1; ///< oræ
}
}
message OrderBy
{
E_RELATION relation = 1; ///< éåºæååº
string col_name = 2; ///< åå
enum E_RELATION
{
ASC = 0;
DESC = 1;
}
}
}
}
enum E_COL_TYPE ///< åç±»å
{
STRING = 0; ///< char, varchar, text, datetime, timestampç
INT = 1; ///< int
BIGINT = 2; ///< bigint
FLOAT = 3; ///< float
DOUBLE = 4; ///< double
}
message Record
{
repeated Field field_info = 1; ///< value data
}
message Field ///< å段
{
string col_name = 1; ///< åå
E_COL_TYPE col_type = 2; ///< åç±»å
bytes col_value = 3; ///< åå¼
string col_as = 4; ///< asåå
}
/**
* @brief æ¥è¯¢ç»æ
* @note éç¨äºRedisè¿ååMySQLè¿åï¼å½totalcountä¸curcountç¸çæ¶è¡¨ææ°æ®å·²æ¥æ¶å®æ¯ï¼
* å¦å表示æ°æ®å°æªæ¥æ¶å®ï¼å©ä½çæ°æ®ä¼å¨åç»æ°æ®å
继ç»è¿åã
*/
message Result
{
int32 err_no = 1;
bytes err_msg = 2;
int32 total_count = 3;
int32 current_count = 4;
repeated Record record_data = 5;
int32 from = 6; ///< æ°æ®æ¥æº E_RESULT_FROM
DataLocate locate = 7; ///< ä»
å¨DataProxy使ç¨
enum E_RESULT_FROM
{
FROM_DB = 0;
FROM_REDIS = 1;
}
message DataLocate
{
uint32 section_from = 1;
uint32 section_to = 2; ///< æ°æ®æå¨å段ï¼section_from < MemOperate.section_factor <= section_to
uint32 hash = 3; ///< ç¨äºååå¸çhashå¼ï¼å模è¿ç®æ¶ï¼ä¸ºå模åçç»æï¼
uint32 divisor = 4; ///< å模è¿ç®çé¤æ°ï¼ä¸è´æ§hashæ¶ä¸éè¦ï¼
}
}
ââè¿ä¸ªåè®®åäºRedisåMySQL两é¨åæ°æ®ï¼çä¼¼ä¸å¡é»è¾å±æä¸ä»½æ°æ®å¡«å äºä¸¤ä»½å¹¶æ²¡æéä½å¤å°å¼åéï¼å®é ä¸è¿ä¸¤é¨åæ°æ®æ许å¤æ¯å¯å ±ç¨çï¼åªè¦æä¾ä¸ä¸ªå¡«å 类就å¯ä»¥å¤§å¹ éä½å议填å å¼åéã为ç®åå议填å ï¼Nebulaæä¾äºå 个类ï¼åæ¶å¡«å RedisåMySQLæ°æ®ãåªå¡«å Redisãåªå¡«å MySQLã
ââä»Mydisåè®®çMySQLé¨åå¦ä½çæSQLè¯å¥è¯·åèNebulaDbAgentï¼æ ¸å¿ä»£ç 头æ件å¦ä¸ï¼
namespace dbagent
{
const int gc_iMaxBeatTimeInterval = 30;
const int gc_iMaxColValueSize = 65535;
struct tagConnection
{
CMysqlDbi* pDbi;
time_t ullBeatTime;
int iQueryPermit;
int iTimeout;
tagConnection() : pDbi(NULL), ullBeatTime(0), iQueryPermit(0), iTimeout(0)
{
}
~tagConnection()
{
if (pDbi != NULL)
{
delete pDbi;
pDbi = NULL;
}
}
};
class CmdExecSql : public neb::Cmd, public neb::DynamicCreator<CmdExecSql, int32>
{
public:
CmdExecSql(int32 iCmd);
virtual ~CmdExecSql();
virtual bool Init();
virtual bool AnyMessage(
std::shared_ptr<neb::SocketChannel> pChannel,
const MsgHead& oMsgHead,
const MsgBody& oMsgBody);
protected:
bool GetDbConnection(const neb::Mydis& oQuery, CMysqlDbi** ppMasterDbi, CMysqlDbi** ppSlaveDbi);
bool FetchOrEstablishConnection(neb::Mydis::DbOperate::E_QUERY_TYPE eQueryType,
const std::string& strMasterIdentify, const std::string& strSlaveIdentify,
const neb::CJsonObject& oInstanceConf, CMysqlDbi** ppMasterDbi, CMysqlDbi** ppSlaveDbi);
std::string GetFullTableName(const std::string& strTableName, uint32 uiFactor);
int ConnectDb(const neb::CJsonObject& oInstanceConf, CMysqlDbi* pDbi, bool bIsMaster = true);
int Query(const neb::Mydis& oQuery, CMysqlDbi* pDbi);
void CheckConnection(); //æ£æ¥è¿æ¥æ¯å¦å·²è¶
æ¶
void Response(int iErrno, const std::string& strErrMsg);
bool Response(const neb::Result& oRsp);
bool CreateSql(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateSelect(const neb::Mydis& oQuery, std::string& strSql);
bool CreateInsert(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateUpdate(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strSql);
bool CreateDelete(const neb::Mydis& oQuery, std::string& strSql);
bool CreateCondition(const neb::Mydis::DbOperate::Condition& oCondition, CMysqlDbi* pDbi, std::string& strCondition);
bool CreateConditionGroup(const neb::Mydis& oQuery, CMysqlDbi* pDbi, std::string& strCondition);
bool CreateGroupBy(const neb::Mydis& oQuery, std::string& strGroupBy);
bool CreateOrderBy(const neb::Mydis& oQuery, std::string& strOrderBy);
bool CreateLimit(const neb::Mydis& oQuery, std::string& strLimit);
bool CheckColName(const std::string& strColName);
private:
std::shared_ptr<neb::SocketChannel> m_pChannel;
MsgHead m_oInMsgHead;
MsgBody m_oInMsgBody;
int m_iConnectionTimeout; //空é²è¿æ¥è¶
æ¶ï¼åä½ç§ï¼
char* m_szColValue; //å段å¼
neb::CJsonObject m_oDbConf;
uint32 m_uiSectionFrom;
uint32 m_uiSectionTo;
uint32 m_uiHash;
uint32 m_uiDivisor;
std::map<std::string, std::set<uint32> > m_mapFactorSection; //å段å ååºé´é
ç½®ï¼key为å åç±»å
std::map<std::string, neb::CJsonObject*> m_mapDbInstanceInfo; //æ°æ®åºé
置信æ¯key为("%u:%u:%u", uiDataType, uiFactor, uiFactorSection)
std::map<std::string, tagConnection*> m_mapDbiPool; //æ°æ®åºè¿æ¥æ± ï¼key为identifyï¼å¦ï¼192.168.18.22:3306ï¼
};
} // namespace dbagent
ââæ´ä¸ªmydisæ°æ®åè®®æ¯å¦ä½è§£æå¦ä½ä½¿ç¨ï¼å¦ä½åRedisåMySQLçæ°æ®ååãç¼åæ°æ®ååçä¸å¨æ¬æ讨论èå´ï¼å¦æå ´è¶£å¯ä»¥é 读NebulaMydisæºç ï¼ä¹å¯ä»¥èç³»Bwarã
5. ç»è¯
ââProtobufç¨å¾åéç¨å¾å¥½å¯ä»¥è§£å³è®¸å¤é®é¢ï¼å¯ä»¥æé«å¼åæçï¼ä¹å¯ä»¥æé«è¿è¡æçï¼ä»¥ä¸å°±æ¯Bwarå¤å¹´åºç¨protobufçå°ç»ï¼æ²¡æä»»ä½èç§ï¼æä¸ååºçåè®®é½å¯ä»¥å¨å¼æºé¡¹ç®Nebulaçè¿ä¸ªè·¯å¾https://github.com/Bwar/Nebula/tree/master/protoæ¾å°ã
ââå¼åNebulaæ¡æ¶ç®çæ¯è´åäºæä¾ä¸ç§åºäºC++å¿«éæ建é«æ§è½çåå¸å¼æå¡ãå¦æè§å¾æ¬æå¯¹ä½ æç¨ï¼å«å¿äºå°NebulaçGithubæç äºç»ä¸ªstarï¼è°¢è°¢ã
ä½è ï¼Bwar
åºå¤ï¼https://www.cnblogs.com/bwar/
Bwarå¾åæé çé«æ§è½ç½ç»æ¡æ¶Nebulaï¼https://github.com/Bwar/Nebula
ååæç« å¦è½¬è½½ï¼è¯·æ³¨æåºå¤ã