
An Analysis of Kafka's Network Model and Communication Flow

1. Overview

Recently, some readers studying Kafka's network communication layer have run into questions about its network model and communication flow. In this post I will walk through that part of Kafka.

2. Content

As a message queue, the network communication Kafka performs mainly covers two aspects:

Pull: consumers pull message data from the message queue;

Push: producers push message data into the message queue.

High-performance network communication can be built directly on the lower-level TCP or UDP protocols. Kafka defines a TCP-based protocol for communication among Producer, Broker, and Consumer, a protocol designed entirely around Kafka's own requirements.

Note:

Because UDP is an unreliable transport protocol, Kafka uses TCP as the communication protocol between its services.

2.1 Basic Data Types

The basic data types used in the wire protocol fall into the following categories:

Fixed-length types: for example int8, int16, int32, and int64, which correspond to byte, short, int, and long in Java;

Variable-length types: for example Java's Map, List, and other structures whose size is not fixed;

Arrays: for example Java's int[], String[], and so on.
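To make the mapping concrete, here is a small, illustrative Scala sketch (not Kafka's actual serialization code) that writes one value of each fixed-length type into a java.nio.ByteBuffer; ByteBuffer defaults to big-endian, i.e. network byte order:

import java.nio.ByteBuffer

object FixedLengthTypes {
  def main(args: Array[String]): Unit = {
    // int8 -> Byte, int16 -> Short, int32 -> Int, int64 -> Long
    val buf = ByteBuffer.allocate(1 + 2 + 4 + 8)  // ByteBuffer is big-endian by default
    buf.put(1.toByte)          // int8
    buf.putShort(300.toShort)  // int16
    buf.putInt(70000)          // int32
    buf.putLong(5000000000L)   // int64
    buf.flip()
    println(s"encoded ${buf.remaining()} bytes")   // prints: encoded 15 bytes
  }
}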

2.2 Communication Model

Kafka uses a Reactor multi-threaded model: a single Acceptor thread handles all new connections, while multiple Processor threads process the requests (parsing the protocol, wrapping the request, forwarding it, and so on).

Reactor is an event-driven model in which incoming requests are dispatched to one or more service handlers for processing.

When a client request arrives, the server uses a multiplexing and dispatch strategy: a single non-blocking thread accepts all incoming requests and then forwards them to the appropriate worker threads for processing.

In later Kafka versions, a Handler module was added, which processes requests with a configurable number of threads. Handlers and Processors are connected through a blocking queue. A minimal sketch of this layout is shown below.
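This sketch is purely illustrative (the Request class, queue sizes, and thread counts are made up; it is not Kafka code); it only shows the shape of the model: an acceptor that round-robins connections across processors, processors that enqueue requests, and a handler pool that drains a shared blocking queue.

import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}

object ReactorLayoutSketch {
  final case class Request(connectionId: Int, payload: String)

  def main(args: Array[String]): Unit = {
    val numProcessors = 3
    val numHandlers   = 2
    val requestQueue  = new ArrayBlockingQueue[Request](64)

    // Handler pool: takes requests off the shared blocking queue and does the real work.
    val handlers = Executors.newFixedThreadPool(numHandlers)
    (1 to numHandlers).foreach { id =>
      handlers.submit(new Runnable {
        def run(): Unit =
          while (!Thread.currentThread().isInterrupted) {
            val req = requestQueue.poll(100, TimeUnit.MILLISECONDS)
            if (req != null) println(s"handler-$id processed $req")
          }
      })
    }

    // Processors: in Kafka each owns a Selector and reads from its connections;
    // here they simply turn a connection id into a request and enqueue it.
    val processors = (1 to numProcessors).map { id =>
      (conn: Int) => requestQueue.put(Request(conn, s"read by processor-$id"))
    }

    // Acceptor: hands each "new connection" to a processor in round-robin order.
    var currentProcessor = 0
    (1 to 6).foreach { conn =>
      processors(currentProcessor)(conn)
      currentProcessor = (currentProcessor + 1) % numProcessors
    }

    Thread.sleep(500)
    handlers.shutdownNow()
  }
}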

Here, Acceptor is a thread class that extends AbstractServerThread. Its main purpose is to listen for and accept client connections, create the data transport channel (a SocketChannel), and then hand that channel to a Processor in round-robin fashion. The core logic is in Acceptor's run method:

def run() {
  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
  startupComplete()
  try {
    var currentProcessor = 0
    while (isRunning) {
      try {
        val ready = nioSelector.select(500)
        if (ready > 0) {
          val keys = nioSelector.selectedKeys()
          val iter = keys.iterator()
          while (iter.hasNext && isRunning) {
            try {
              val key = iter.next
              iter.remove()
              if (key.isAcceptable)
                accept(key, processors(currentProcessor))
              else
                throw new IllegalStateException("Unrecognized key state for acceptor thread.")

              // round robin to the next processor thread
              currentProcessor = (currentProcessor + 1) % processors.length
            } catch {
              case e: Throwable => error("Error while accepting connection", e)
            }
          }
        }
      }
      catch {
        // We catch all the throwables to prevent the acceptor thread from exiting on exceptions due
        // to a select operation on a specific channel or a bad request. We don't want
        // the broker to stop responding to requests from other clients in these scenarios.
        case e: ControlThrowable => throw e
        case e: Throwable => error("Error occurred", e)
      }
    }
  } finally {
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(nioSelector.close())
    shutdownComplete()
  }
}
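The accept(key, processor) call in this loop performs the actual hand-off. Roughly speaking (this is a simplified sketch, with connection quotas, socket buffer configuration, and error handling omitted; see SocketServer.scala for the full version), it accepts the new SocketChannel from the ServerSocketChannel, switches it to non-blocking mode, and passes it to the chosen Processor, which queues the channel and wakes up its own Selector:

// Simplified sketch of Acceptor.accept, not the full source.
def accept(key: SelectionKey, processor: Processor) {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept()
  socketChannel.configureBlocking(false)      // the Processor's Selector will drive this channel
  socketChannel.socket().setTcpNoDelay(true)
  socketChannel.socket().setKeepAlive(true)
  // Hand the new connection to the chosen Processor; it enqueues the channel and
  // wakes up its Selector so the channel can be registered for OP_READ.
  processor.accept(socketChannel)
}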

Kafka also has a blocking channel class, BlockingChannel, a blocking wrapper around a SocketChannel used for synchronous request/response exchanges. Its code is as follows:

class BlockingChannel( val host: String,
                       val port: Int,
                       val readBufferSize: Int,
                       val writeBufferSize: Int,
                       val readTimeoutMs: Int ) extends Logging {

  private var connected = false
  private var channel: SocketChannel = null
  private var readChannel: ReadableByteChannel = null
  private var writeChannel: GatheringByteChannel = null
  private val lock = new Object()
  private val connectTimeoutMs = readTimeoutMs
  private var connectionId: String = ""

  def connect() = lock synchronized {
    if(!connected) {
      try {
        channel = SocketChannel.open()
        if(readBufferSize > 0)
          channel.socket.setReceiveBufferSize(readBufferSize)
        if(writeBufferSize > 0)
          channel.socket.setSendBufferSize(writeBufferSize)
        channel.configureBlocking(true)
        channel.socket.setSoTimeout(readTimeoutMs)
        channel.socket.setKeepAlive(true)
        channel.socket.setTcpNoDelay(true)
        channel.socket.connect(new InetSocketAddress(host, port), connectTimeoutMs)

        writeChannel = channel
        // Need to create a new ReadableByteChannel from input stream because SocketChannel doesn't implement read with timeout
        // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
        readChannel = Channels.newChannel(channel.socket().getInputStream)
        connected = true
        val localHost = channel.socket.getLocalAddress.getHostAddress
        val localPort = channel.socket.getLocalPort
        val remoteHost = channel.socket.getInetAddress.getHostAddress
        val remotePort = channel.socket.getPort
        connectionId = localHost + ":" + localPort + "-" + remoteHost + ":" + remotePort
        // settings may not match what we requested above
        val msg = "Created socket with SO_TIMEOUT = %d (requested %d), SO_RCVBUF = %d (requested %d), SO_SNDBUF = %d (requested %d), connectTimeoutMs = %d."
        debug(msg.format(channel.socket.getSoTimeout,
                         readTimeoutMs,
                         channel.socket.getReceiveBufferSize,
                         readBufferSize,
                         channel.socket.getSendBufferSize,
                         writeBufferSize,
                         connectTimeoutMs))
      } catch {
        case _: Throwable => disconnect()
      }
    }
  }

  def disconnect() = lock synchronized {
    if(channel != null) {
      swallow(channel.close())
      swallow(channel.socket.close())
      channel = null
      writeChannel = null
    }
    // closing the main socket channel *should* close the read channel
    // but let's do it to be sure.
    if(readChannel != null) {
      swallow(readChannel.close())
      readChannel = null
    }
    connected = false
  }

  def isConnected = connected

  def send(request: RequestOrResponse): Long = {
    if(!connected)
      throw new ClosedChannelException()

    val send = new RequestOrResponseSend(connectionId, request)
    send.writeCompletely(writeChannel)
  }

  def receive(): NetworkReceive = {
    if(!connected)
      throw new ClosedChannelException()

    val response = readCompletely(readChannel)
    response.payload().rewind()

    response
  }

  private def readCompletely(channel: ReadableByteChannel): NetworkReceive = {
    val response = new NetworkReceive
    while (!response.complete())
      response.readFromReadableChannel(channel)
    response
  }
}
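As a usage sketch (the host, port, buffer sizes, and someRequest below are placeholders; someRequest stands in for a concrete kafka.api request object), a caller drives BlockingChannel fully synchronously: connect, write one request, then block until a complete response has been read:

// Illustrative only; "broker-host", 9092 and someRequest are placeholder values.
val channel = new BlockingChannel("broker-host", 9092,
                                  readBufferSize = 64 * 1024,
                                  writeBufferSize = 64 * 1024,
                                  readTimeoutMs = 30000)
channel.connect()
if (channel.isConnected) {
  channel.send(someRequest)          // blocks until the request is fully written
  val response = channel.receive()   // blocks until a complete response is read
  // ... parse response.payload() ...
  channel.disconnect()
}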

3. Communication Process

Kafka's communication framework has itself gone through several iterations. In older Kafka versions, NIO was already the foundation of the network layer: multiple socket connections are registered on a single Selector and monitored there, so one thread can manage many connections, which saves a great deal of the overhead of per-connection threads.

Newer Kafka versions still build on NIO and still use the Reactor multi-threaded model; the difference is that the concrete business-processing module (the Handler module) has been split out and is driven by a separate, independently controlled thread pool.

Putting this together, Kafka's communication flow can be summarized as follows:

When a client sends a request to the server, the Acceptor accepts the TCP connection and, once it is established, hands it over to a Processor thread;

The Processor thread registers the new connection with its own Selector and listens for READ events;

When the client writes data on that connection, a READ event fires; the Processor reads the request data off the connection and passes it on to a Handler for processing;

After the Handler finishes, it may have a result to return to the client; that result is bound to a Response and sent back. (A simplified outline of the Processor loop that drives these steps is shown right after this list.)
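The outline below paraphrases Processor.run in SocketServer.scala (it is a simplification, not the verbatim source: the try/catch blocks and metrics bookkeeping have been stripped):

// Simplified outline of the Processor event loop.
override def run() {
  startupComplete()
  while (isRunning) {
    configureNewConnections()   // register SocketChannels handed over by the Acceptor for OP_READ
    processNewResponses()       // pick up responses queued for this processor by Handler threads
    poll()                      // run the Selector: perform the actual non-blocking reads and writes
    processCompletedReceives()  // turn completed reads into requests and hand them to the request queue
    processCompletedSends()     // finish bookkeeping for responses that have been fully written
    processDisconnected()       // clean up connections that have dropped
  }
  shutdownComplete()
}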

Summing up, separating out the Handler module in newer Kafka versions brings several advantages:

The number of Handler threads can be configured on its own, which makes tuning and management easier;

A single oversized request can no longer block a Processor thread;

Requests, Handlers, and Responses are connected only through queues, so they are decoupled from one another, which is a real help to Kafka's performance.

Note that in Kafka's network layer, RequestChannel provides the buffer through which Processor threads and Handler threads exchange data; it is where requests and responses are cached during communication, so its role is that of a buffering queue. A Processor thread adds each request it reads to RequestChannel's global request queue (requestQueue); Handler threads take requests from that queue and process them; when a Handler is done, it adds the Response to the corresponding response queue in RequestChannel (responseQueues) and wakes the matching Processor thread through responseListeners; finally, that Processor thread takes the response from its response queue and sends it to the client. The implementation is as follows:

class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {

  private var responseListeners: List[(Int) => Unit] = Nil
  private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
  private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
  for(i <- 0 until numProcessors)
    responseQueues(i) = new LinkedBlockingQueue[RequestChannel.Response]()

  newGauge(
    "RequestQueueSize",
    new Gauge[Int] {
      def value = requestQueue.size
    }
  )

  newGauge("ResponseQueueSize", new Gauge[Int]{
    def value = responseQueues.foldLeft(0) {(total, q) => total + q.size()}
  })

  for (i <- 0 until numProcessors) {
    newGauge("ResponseQueueSize",
      new Gauge[Int] {
        def value = responseQueues(i).size()
      },
      Map("processor" -> i.toString)
    )
  }

  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

  /** Send a response back to the socket server to be sent over the network */
  def sendResponse(response: RequestChannel.Response) {
    responseQueues(response.processor).put(response)
    for(onResponse <- responseListeners)
      onResponse(response.processor)
  }

  /** No operation to take for the request, need to read more over the network */
  def noOperation(processor: Int, request: RequestChannel.Request) {
    responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.NoOpAction))
    for(onResponse <- responseListeners)
      onResponse(processor)
  }

  /** Close the connection for the request */
  def closeConnection(processor: Int, request: RequestChannel.Request) {
    responseQueues(processor).put(RequestChannel.Response(processor, request, null, RequestChannel.CloseConnectionAction))
    for(onResponse <- responseListeners)
      onResponse(processor)
  }

  /** Get the next request or block until specified time has elapsed */
  def receiveRequest(timeout: Long): RequestChannel.Request =
    requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
    requestQueue.take()

  /** Get a response for the given processor if there is one */
  def receiveResponse(processor: Int): RequestChannel.Response = {
    val response = responseQueues(processor).poll()
    if (response != null)
      response.request.responseDequeueTimeMs = Time.SYSTEM.milliseconds
    response
  }

  def addResponseListener(onResponse: Int => Unit) {
    responseListeners ::= onResponse
  }

  def shutdown() {
    requestQueue.clear()
  }
}
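To show how these pieces interact, here is a hedged sketch of how a Handler thread and a Processor thread would use this class (handle, wakeUpProcessor, and myProcessorId are hypothetical stand-ins introduced for illustration; in Kafka the Handler side corresponds to KafkaRequestHandler/KafkaApis, and the wake-up is performed by the Processor itself):

// Handler side (sketch): pull a request off the shared queue, process it,
// and queue the response for the Processor that owns the connection.
def handlerLoop(requestChannel: RequestChannel): Unit = {
  while (true) {
    val request = requestChannel.receiveRequest(300)  // null if nothing arrives within 300 ms
    if (request != null) {
      val response = handle(request)                  // hypothetical business logic, returns a RequestChannel.Response
      requestChannel.sendResponse(response)           // enqueues the response and fires the responseListeners
    }
  }
}

// Processor side (sketch): the listener wakes the processor, which then drains its own response queue.
requestChannel.addResponseListener(processorId => wakeUpProcessor(processorId))  // wakeUpProcessor is hypothetical
val ready = requestChannel.receiveResponse(myProcessorId)                        // null if the queue is empty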

4. Summary

Reading Kafka's network-layer source code carefully yields a good deal of practical knowledge about NIO-based network programming, and that familiarity is very helpful when tuning large Kafka clusters and when locating and troubleshooting problems.

Original post:

https://www.cnblogs.com/smartloli/p/12287130.html