本節書摘來自華章出版社《ceph源碼分析》一書中的第3章,第3.2節simple實作,作者常濤,更多章節内容可以通路雲栖社群“華章計算機”公衆号檢視
3.2 simple實作
simple在ceph裡實作比較早,目前也比較穩定,是在生産環境中使用的網絡通信子產品。如其名字所示,實作相對比較簡單。下面具體分析一下,simple如何實作ceph網絡通信架構的各個子產品。
3.2.1 simplemessager
類simplemessager實作了messager接口。
class simplemessenger : public simplepolicymessenger {
accepter accepter; //用于接受用戶端的連結請求
dispatchqueue dispatch_queue; //接收到的請求的消息分發隊列
bool did_bind; //是否綁定
` //位址→pipe映射
ceph::unordered_map rank_pipe;
//正在處理的pipes
set accepting_pipes;
//所有的pipes
set pipes;
//準備釋放的pipes
list pipe_reap_queue;`
//内部叢集的協定版本
` int cluster_protocol;
}`
3.2.2 accepter
類accepter用來在server端監聽端口,接收連結,它繼承了thread類,本身是一個線程,來不斷地監聽server的端口:
`class accepter : public thread {
simplemessenger *msgr;
bool done;
int listen_sd; //監聽的端口
uint64_t nonce;
……
}
`
3.2.3 dispatchqueue
dispatchqueue類用于把接收到的請求儲存在内部,通過其内部的線程,調用simplemessenger類注冊的dispatch類的處理函數來處理相應的消息:
其内部的mqueue為優先級隊列,用來儲存消息,marrival儲存了接收到的消息。marrival_map儲存消息在集合中的位置。
函數dispatchqueue::enqueue用來把接收到的消息添加到消息隊列中,函數dispatchqueue::entry為線程的處理函數,用于處理消息。
3.2.4 pipe
類pipe實作了pipeconnection的接口,它實作了兩個端口之間的類似管道的功能。
對于每一個pipe,内部都有一個reader和一個writer線程,分别用來處理這個pipe有關的消息接收和請求的發送。線程delayeddelivery用于故障注入測試:
3.2.5 消息的發送
1)當發送一個消息時,首先要通過messenger類,擷取對應的connection:
<code>conn = messenger->get_connection(dest_server);</code>
具體到simplemessenger的實作如下所示:
a)首先比較,如果dest.addr是my_inst.addr,就直接傳回local_connection。
b)調用函數_lookup_pipe在已經存在的pipe中查找。如果找到,就直接傳回pipeconnectionref;否則調用函數connect_rank新建立一個pipe,并加入到msgr的register_pipe裡。
2)當獲得一個connection之後,就可以調用connection的發送函數來發送消息。
<code>conn->send_message(m);</code>
其最終調用了simplemessenger::submit_message函數:
a)如果pipe不為空,并且狀态不是pipe::state_closed狀态,調用函數pipe→_send把發送的消息添加到out_q發送隊列裡,觸發發送線程。
b)如果pipe為空,就調用connect_rank建立pipe,并把消息添加到out_q發送隊列中。
3)發送線程writer把消息發送出去。通過步驟2,要發送的消息已經儲存在相應pipe的out_q隊列裡,并觸發了發送線程。每個pipe的writer線程負責發送out_q的消息,其線程入口函數為pipe::writer,實作功能:
a)調用函數_get_next_outgoing從out_q中擷取消息。
b)調用函數write_message(header, footer, blist)把消息的header、footer、資料blist發送出去。
3.2.6 消息的接收
1)每個pipe對應的線程reader用于接收消息。入口函數為pipe::reader,其功能如下:
a)判斷目前的state,如果為state_accepting,就調用函數pipe::accept來接收連接配接,如果不是state_closed,并且不是state_connecting狀态,就接收消息。
b)先調用函數tcp_read來接收一個tag。
c)根據tag,來接收不同類型的消息如下所示:
`ceph_msgr_tag_keepalive消息。
ceph_msgr_tag_keepalive2,在ceph_msgr_tag_keepalive的基礎上,添加了時間。
ceph_msgr_tag_keepalive2_ack。
ceph_msgr_tag_ack。
ceph_msgr_tag_msg,這裡才是接收的消息。
ceph_msgr_tag_close。`
d)調用函數read_message來接收消息,當本函數傳回後,就完成了接收消息。
2)調用函數in_q->fast_preprocess(m)預處理消息。
3)調用函數in_q->can_fast_dispatch(m),如果可以進行fast_dispatch,就in_q->fast_dispatch(m)處理。fast_dispatch并不把消息加入到mqueue裡,而是直接調用msgr->ms_fast_dispatch函數,并最終調用注冊的fast_dispatcher函數處理。
4)如果不能fast_dispatch,就調用函數in_q->enqueue(m, m->get_priority(), conn_id) 把接收到的消息加入到dispatchqueue的mqueue隊列裡,由dispatchqueue的分發線程調用ms_dispatch處理。
ms_fast_dispath和ms_dispatch兩種處理的差別在于:ms_dispatch是由dispatchqueue的線程處理的,它是一個單線程;ms_fast_dispatch函數是由pipe的接收線程直接調用處理的,是以性能比前者要好。
3.2.7 錯誤處理
網絡子產品複雜的功能是如何處理網絡錯誤。無論是接收還是發送,會出現各種異常錯誤,包括傳回異常錯誤碼,接收資料的magic驗證不一緻,接收的資料的效驗驗證不一緻,等等。錯誤的原因主要是由于網絡本身的錯誤(實體鍊路等),或者位元組跳變引起的。
目前錯誤處理的方法比較簡單,處理流程如下:
1)關閉目前socket的連接配接。
2)重建立立一個socket連接配接。
3)重新發送沒有接受到ack應對的消息。
函數pipe::fault用來處理錯誤:
1)調用shutdown_socket關閉pipe的socket。
2)調用函數requeue_sent把沒有收到ack的消息重新加入發送隊列,當發送隊列有請求時,發送線程會不斷地嘗試重新連接配接。