摘要:Kafka網絡子產品之Server端,介紹Server端啟動、接收請求和處理請求的過程。
本文分享自華為雲社群《Kafka網絡子產品-Server端》,原文作者:中間件小哥 。
SocketServer 是 Kafka server 端用于處理請求的子產品,在 Kafka 啟動過程建立、初始化、啟動。
SocketServer啟動過程:
- 按照 endpoint 順序初始化 Acceptor,每個 endpoint 對應一個 Acceptor,為每個 Acceptor 建立 Processor(數量由 num.network.threads 配置項決定),并啟動 Acceptor,Acceptor 啟動後會通過 selector 監聽連接配接,并将建立立的連接配接交給 Processor 處理(輪詢選擇 Processor)
-
啟動所有 Processor
Acceptor啟動、監聽連接配接過程:
- Acceptor啟動後,會建立一個 serverSocketChannel,監聽在該 acceptor 對應的 endpoint 上,并在 selector 上注冊 OP_ACCEPT,然後進入死循環,每次循環,通過 selector 擷取就緒的 key(即前面注冊的 serverSocketChannel),表明有連接配接來到,然後通過 accept() 建立一個和該連接配接對應的 socketChannel,然後從該 acceptor 負責的 processors 中輪詢選擇一個,将該 socketChannel 交給選擇的 processor 處理,即将連接配接交給 processor。
- Acceptor 将連接配接交給 processor 處理,是将 socketChannel 加入 processor 的連接配接隊列 newConnection 中,processor 在 run 方法中會不斷地從中擷取并處理。
- Processor 從 newConnection 擷取到 socketChannel 後,在 selector 上注冊 OP_READ,并建立對應的 KafkaChannel。
Server端接收請求、處理的過程:
- Processor 收到 OP_READ 的事件就緒後,檢查并嘗試完成 SSL 握手和 SASL 校驗(此時不一定握手完成,是以在 Processor 收到 OP_READ 的事件就緒後,要先檢查并確定握手已經完成,SSL/SASL相關參考 9.4 節)
- SSL 握手和 SASL 校驗完成後,從 channel 中讀取資料,構造 NetworkReceive 對象,并入隊 stagedReceives
- 取出 stagedReceives 隊首元素(移除),加入 completedReceives
- 将 completedReceives 中的元素取出(不移除),構造 Request 對象,加入 requestQueue,移除對 OP_READ 的事件注冊,并将對應的 KafkaChannel 置為 MUTED,再置為 MUTED_AND_RESPONSE_PENDING
- KafkaRequestHandler 從 requestQueue 取出元素(移除),并交給 KafkaApi 子產品處理請求
- KafkaApi 處理完請求後,将響應放入對應的 processor 的 responseQueue 和 inflightResponses 中,并喚醒其 selector
- Processor 從 responseQueue 中取出響應(移除),若響應是需要發回給用戶端的,則将響應的 send 指派給 KafkaChannel,并注冊 OP_WRITE 事件
- 當 channel 寫就緒後,将 send 寫入 channel 的寫 buffer,當 send 寫完後,移除對 OP_WRITE 事件的注冊,并将 send 加入 completedSends
- 從 inflightResponses 中移除對應的響應,執行響應回調,将 KafkaChannel 置為 MUTED,再從 MUTED 置為 NOT_MUTED,并重新添加 OP_READ 事件注冊
點選關注,第一時間了解華為雲新鮮技術~