
Learn Kafka with Me: The NIO Communication Mechanism

It has been a while since I last shared anything technical, so today I am taking some free time to write an article on Kafka's communication layer for us to study together.

1. The Overall Structure of Kafka's Communication Mechanism

[Figure: overall structure of Kafka's communication layer]

This diagram follows the SEDA multi-threaded model we discussed previously; see:

http://www.jianshu.com/p/e184fdc0ade4

1. For a broker, the number of client connections is limited and connections are not created in large volumes at high frequency, so a single Acceptor thread is more than enough to handle connection establishment.

2. Kafka's high throughput requires the broker to receive and send data quickly, so a pool of processor threads handles the reads and writes, handing client data off to buffers so that client requests do not pile up.

3. Kafka's disk operations are frequent and involve I/O blocking and waits, so the number of IO threads is usually set to twice the number of processor threads and can be tuned to the runtime environment; the snippet below shows the corresponding broker settings.
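For reference, here is how these two pool sizes come out of the broker configuration in this generation of the code. The num.network.threads line reappears verbatim in section 2.1; the num.io.threads line is my recollection of the matching KafkaConfig entry, so treat its default of 8 as an assumption to verify against your version:

// Processor (network) threads and Handler (IO) threads, as read by KafkaConfig.
val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue))
val numIoThreads = props.getIntInRange("num.io.threads", 8, (1, Int.MaxValue))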

2. Overall Design of SocketServer (Sequence Diagram)

[Figure: Kafka communication sequence diagram]

Explanation:

Kafka's SocketServer is built on Java NIO and follows the Reactor pattern: one Acceptor thread accepts client connections, N Processor threads perform the reads and writes, and M Handler threads execute the business logic. Queues buffer requests between the Acceptor and the Processors, and between the Processors and the Handlers.

Below, we walk through the source code of each part of this design.

2.1 Startup and Initialization

def startup() {
    val quotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
    for(i <- 0 until numProcessorThreads) {
      processors(i) = new Processor(i, 
                                    time, 
                                    maxRequestSize, 
                                    aggregateIdleMeter,
                                    newMeter("IdlePercent", "percent", TimeUnit.NANOSECONDS, Map("networkProcessor" -> i.toString)),
                                    numProcessorThreads, 
                                    requestChannel,
                                    quotas,
                                    connectionsMaxIdleMs)
      Utils.newThread("kafka-network-thread-%d-%d".format(port, i), processors(i), false).start()
    }

    newGauge("ResponsesBeingSent", new Gauge[Int] {
      def value = processors.foldLeft(0) { (total, p) => total + p.countInterestOps(SelectionKey.OP_WRITE) }
    })

    // register the processor threads for notification of responses
    requestChannel.addResponseListener((id:Int) => processors(id).wakeup())
   
    // start accepting connections
    this.acceptor = new Acceptor(host, port, processors, sendBufferSize, recvBufferSize, quotas)
    Utils.newThread("kafka-socket-acceptor", acceptor, false).start()
    acceptor.awaitStartup
    info("Started")
  }
           
Explanation:

The ConnectionQuotas object enforces the connection limit per IP. Startup then creates one Acceptor listener thread and initializes N Processor threads. processors is an array of threads, effectively a thread pool; its default size is three. The Acceptor thread and each of the N Processor threads create their own Selector.open() multiplexer. The relevant code is below:

val numNetworkThreads = props.getIntInRange("num.network.threads", 3, (1, Int.MaxValue));

val serverChannel = openServerSocket(host, port);
           

The thread count can be set anywhere from 1 up to Int.MaxValue.
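To make the per-IP accounting concrete, here is a minimal sketch of what a ConnectionQuotas-style counter does. The class and method names below are illustrative, not the exact Kafka source:

import java.net.InetAddress
import scala.collection.mutable

// Sketch: count open connections per client IP; reject new ones over the limit.
class ConnectionQuotasSketch(defaultMax: Int, overrides: Map[InetAddress, Int]) {
  private val counts = mutable.Map[InetAddress, Int]()

  // Called when a connection is accepted.
  def inc(addr: InetAddress): Unit = counts.synchronized {
    val count = counts.getOrElse(addr, 0) + 1
    counts(addr) = count
    if (count > overrides.getOrElse(addr, defaultMax))
      throw new IllegalStateException("Too many connections from " + addr)
  }

  // Called when a connection is closed.
  def dec(addr: InetAddress): Unit = counts.synchronized {
    val count = counts.getOrElse(addr, 1) - 1
    if (count == 0) counts.remove(addr) else counts(addr) = count
  }
}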

2.2 The Acceptor Thread

def run() {
    serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    startupComplete()
    var currentProcessor = 0
    while(isRunning) {
      val ready = selector.select(500)
      if(ready > 0) {
        val keys = selector.selectedKeys()
        val iter = keys.iterator()
        while(iter.hasNext && isRunning) {
          var key: SelectionKey = null
          try {
            key = iter.next
            iter.remove()
            if(key.isAcceptable)
               accept(key, processors(currentProcessor))
            else
               throw new IllegalStateException("Unrecognized key state for acceptor thread.")

            // round robin to the next processor thread
            currentProcessor = (currentProcessor + 1) % processors.length
          } catch {
            case e: Throwable => error("Error while accepting connection", e)
          }
        }
      }
    }
    debug("Closing server socket and selector.")
    swallowError(serverChannel.close())
    swallowError(selector.close())
    shutdownComplete()
  }
           

2.2.1 Registering the OP_ACCEPT Event

serverChannel.register(selector, SelectionKey.OP_ACCEPT);
           

2.2.2 Internal Logic

The logic here is synchronous and non-blocking: the selector is polled once every 500 ms. For background on synchronous non-blocking I/O, see

http://www.jianshu.com/p/e9c6690c0737
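As a self-contained illustration of this pattern, here is a bare-bones select loop. This is a standalone sketch, not Kafka source; the port and log message are made up:

import java.net.InetSocketAddress
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel}

// Standalone sketch of a synchronous non-blocking accept loop:
// select(500) returns after at most 500 ms, whether or not any event arrived.
object SelectLoopSketch extends App {
  val selector = Selector.open()
  val server = ServerSocketChannel.open()
  server.socket().bind(new InetSocketAddress(9999))
  server.configureBlocking(false)                  // non-blocking mode is required to use a Selector
  server.register(selector, SelectionKey.OP_ACCEPT)

  while (true) {
    if (selector.select(500) > 0) {                // poll with a 500 ms timeout, as the Acceptor does
      val iter = selector.selectedKeys().iterator()
      while (iter.hasNext) {
        val key = iter.next(); iter.remove()
        if (key.isAcceptable) {
          val channel = server.accept()
          println("accepted " + channel.socket().getRemoteSocketAddress)
          channel.close()                          // a real server would hand the channel to a Processor
        }
      }
    }
  }
}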

When a connection request arrives, a Processor thread is chosen round-robin to handle it, as in the code below:

currentProcessor = (currentProcessor + 1) % processors.length
           

The accepted channel is then added to that Processor's newConnections queue, and the method returns. The code is as follows:

def accept(socketChannel: SocketChannel) {
  newConnections.add(socketChannel)
  wakeup()
}

// newConnections is a thread-safe queue that holds SocketChannel instances
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
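For completeness, the Acceptor-side accept(key, processor) that produced this channel looks roughly like the following. This is a sketch from memory of the same generation of the codebase; the exact socket options and error handling may differ by version:

def accept(key: SelectionKey, processor: Processor) {
  val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
  val socketChannel = serverSocketChannel.accept()
  try {
    connectionQuotas.inc(socketChannel.socket().getInetAddress)  // enforce the per-IP limit
    socketChannel.configureBlocking(false)                       // the Processor's selector needs non-blocking mode
    socketChannel.socket().setTcpNoDelay(true)
    socketChannel.socket().setSendBufferSize(sendBufferSize)
    processor.accept(socketChannel)                              // hand off to the chosen Processor's queue
  } catch {
    case e: Throwable => close(socketChannel)                    // e.g. the connection quota was exceeded
  }
}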
           

2.3 kafka.network.Processor

override def run() {
    startupComplete()
    while(isRunning) {
      // setup any new connections that have been queued up
      configureNewConnections()
      // register any new responses for writing
      processNewResponses()
      val startSelectTime = SystemTime.nanoseconds
      val ready = selector.select(300)
      currentTimeNanos = SystemTime.nanoseconds
      val idleTime = currentTimeNanos - startSelectTime
      idleMeter.mark(idleTime)
      // We use a single meter for aggregate idle percentage for the thread pool.
      // Since meter is calculated as total_recorded_value / time_window and
      // time_window is independent of the number of threads, each recorded idle
      // time should be discounted by # threads.
      aggregateIdleMeter.mark(idleTime / totalProcessorThreads)

      trace("Processor id " + id + " selection time = " + idleTime + " ns")
      if(ready > 0) {
        val keys = selector.selectedKeys()
        val iter = keys.iterator()
        while(iter.hasNext && isRunning) {
          var key: SelectionKey = null
          try {
            key = iter.next
            iter.remove()
            if(key.isReadable)
              read(key)
            else if(key.isWritable)
              write(key)
            else if(!key.isValid)
              close(key)
            else
              throw new IllegalStateException("Unrecognized key state for processor thread.")
          } catch {
            case e: EOFException => {
              info("Closing socket connection to %s.".format(channelFor(key).socket.getInetAddress))
              close(key)
            } case e: InvalidRequestException => {
              info("Closing socket connection to %s due to invalid request: %s".format(channelFor(key).socket.getInetAddress, e.getMessage))
              close(key)
            } case e: Throwable => {
              error("Closing socket for " + channelFor(key).socket.getInetAddress + " because of error", e)
              close(key)
            }
          }
        }
      }
      maybeCloseOldestConnection
    }
    debug("Closing selector.")
    closeAll()
    swallowError(selector.close())
    shutdownComplete()
  }
           

Let's first take a close look at the configureNewConnections method:

private def configureNewConnections() {
    while(newConnections.size() > 0) {
      val channel = newConnections.poll()
      debug("Processor " + id + " listening to new connection from " + channel.socket.getRemoteSocketAddress)
      channel.register(selector, SelectionKey.OP_READ)
    }
  }
           

It loops while newConnections is non-empty, polls a channel off the queue, and registers it with this Processor's selector for the OP_READ event.

Now back in the main loop, let's look at the read method.

def read(key: SelectionKey) {
    lruConnections.put(key, currentTimeNanos)
    val socketChannel = channelFor(key)
    var receive = key.attachment.asInstanceOf[Receive]
    if(key.attachment == null) {
      receive = new BoundedByteBufferReceive(maxRequestSize)
      key.attach(receive)
    }
    val read = receive.readFrom(socketChannel)
    val address = socketChannel.socket.getRemoteSocketAddress();
    trace(read + " bytes read from " + address)
    if(read < 0) {
      close(key)
    } else if(receive.complete) {
      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeMs = time.milliseconds, remoteAddress = address)
      requestChannel.sendRequest(req)
      key.attach(null)
      // explicitly reset interest ops to not READ, no need to wake up the selector just yet
      key.interestOps(key.interestOps & (~SelectionKey.OP_READ))
    } else {
      // more reading to be done
      trace("Did not finish reading, registering for read again on connection " + socketChannel.socket.getRemoteSocketAddress())
      key.interestOps(SelectionKey.OP_READ)
      wakeup()
    }
  }
           
Explanation:

1. The current SelectionKey and the event-loop timestamp are put into the lruConnections LRU map, which is checked later to reclaim idle connection resources.

2. If the key has no attachment yet, a BoundedByteBufferReceive object is created; its readFrom method performs the actual read and returns the number of bytes read (a sketch of this framing follows below).

  • If the read is complete (receive.complete), the request is wrapped into a Request object and handed to the RequestQueue via requestChannel.sendRequest(req).
  • If the read is not yet complete, the selector keeps listening for OP_READ on this connection.
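To clarify what readFrom actually does, here is a simplified sketch of the size-prefixed framing that BoundedByteBufferReceive implements. The class below is a reconstruction for illustration, not the exact source:

import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel

// Sketch: read a 4-byte size header, then the payload, enforcing maxSize.
class SizePrefixedReceive(maxSize: Int) {
  private val sizeBuffer = ByteBuffer.allocate(4)
  private var contentBuffer: ByteBuffer = null

  // The request is complete once the payload buffer is full.
  def complete: Boolean = contentBuffer != null && !contentBuffer.hasRemaining

  // Returns the bytes read on this call, or -1 if the peer closed the connection.
  def readFrom(channel: ReadableByteChannel): Int = {
    var read = 0
    if (sizeBuffer.hasRemaining) {
      val r = channel.read(sizeBuffer)
      if (r < 0) return -1
      read += r
      if (!sizeBuffer.hasRemaining) {        // size header fully read
        sizeBuffer.rewind()
        val size = sizeBuffer.getInt()
        if (size <= 0 || size > maxSize)
          throw new IllegalArgumentException(size + " is not a valid request size")
        contentBuffer = ByteBuffer.allocate(size)
      }
    }
    if (contentBuffer != null && contentBuffer.hasRemaining) {
      val r = channel.read(contentBuffer)
      if (r < 0) return -1
      read += r
    }
    read
  }
}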

2.4 kafka.server.KafkaRequestHandler

def run() {
    while(true) {
      try {
        var req : RequestChannel.Request = null
        while (req == null) {
          // We use a single meter for aggregate idle percentage for the thread pool.
          // Since meter is calculated as total_recorded_value / time_window and
          // time_window is independent of the number of threads, each recorded idle
          // time should be discounted by # threads.
          val startSelectTime = SystemTime.nanoseconds
          req = requestChannel.receiveRequest(300)
          val idleTime = SystemTime.nanoseconds - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)
        }

        if(req eq RequestChannel.AllDone) {
          debug("Kafka request handler %d on broker %d received shut down command".format(
            id, brokerId))
          return
        }
        req.requestDequeueTimeMs = SystemTime.milliseconds
        trace("Kafka request handler %d on broker %d handling request %s".format(id, brokerId, req))
        apis.handle(req)
      } catch {
        case e: Throwable => error("Exception when handling request", e)
      }
    }
  }
           

KafkaRequestHandler is likewise an event-processing thread. It loops forever, polling Request objects off the requestQueue with a 300 ms timeout, and hands each request to apis.handle for processing; the response produced there is placed onto the responseQueue.
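The requestQueue and responseQueue mentioned above live in RequestChannel, the buffer sitting between the Processors and the Handlers. Its core is roughly the following condensed sketch (metrics and shutdown handling omitted; Req and Resp stand in for RequestChannel.Request and RequestChannel.Response):

import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, LinkedBlockingQueue, TimeUnit}

// Sketch of RequestChannel: one shared request queue, one response queue per Processor.
class RequestChannelSketch[Req, Resp](numProcessors: Int, queueSize: Int) {
  private val requestQueue = new ArrayBlockingQueue[Req](queueSize)
  private val responseQueues: Array[BlockingQueue[Resp]] =
    Array.fill(numProcessors)(new LinkedBlockingQueue[Resp]())
  private var responseListeners: List[Int => Unit] = Nil

  // Processor -> Handler: put() blocks when the queue is full, giving back-pressure.
  def sendRequest(request: Req) { requestQueue.put(request) }

  // Handler side: poll with a timeout (300 ms in KafkaRequestHandler.run).
  def receiveRequest(timeout: Long): Req = requestQueue.poll(timeout, TimeUnit.MILLISECONDS)

  // Handler -> Processor: enqueue the response, then wake the owning Processor's selector.
  def sendResponse(processor: Int, response: Resp) {
    responseQueues(processor).put(response)
    responseListeners.foreach(_(processor))  // e.g. processors(id).wakeup(), registered in startup()
  }

  def receiveResponse(processor: Int): Resp = responseQueues(processor).poll()

  def addResponseListener(listener: Int => Unit) { responseListeners ::= listener }
}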

The dispatch code inside apis.handle is as follows:

def handle(request: RequestChannel.Request) {
    try{
      trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress)
      request.requestId match {
        case RequestKeys.ProduceKey => handleProducerOrOffsetCommitRequest(request)
        case RequestKeys.FetchKey => handleFetchRequest(request)
        case RequestKeys.OffsetsKey => handleOffsetRequest(request)
        case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
        case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
        case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
        case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
        case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
        case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
        case requestId => throw new KafkaException("Unknown api code " + requestId)
      }
    } catch {
      case e: Throwable =>
        request.requestObj.handleError(e, requestChannel, request)
        error("error when handling request %s".format(request.requestObj), e)
    } finally
      request.apiLocalCompleteTimeMs = SystemTime.milliseconds
  }
           
The mapping is as follows:

Request key                         Purpose                          Request object
RequestKeys.ProduceKey              producer request                 ProducerRequest
RequestKeys.FetchKey                consumer fetch request           FetchRequest
RequestKeys.OffsetsKey              topic offset request             OffsetRequest
RequestKeys.MetadataKey             topic metadata request           TopicMetadataRequest
RequestKeys.LeaderAndIsrKey         leader and ISR update request    LeaderAndIsrRequest
RequestKeys.StopReplicaKey          stop-replica request             StopReplicaRequest
RequestKeys.UpdateMetadataKey       metadata update request          UpdateMetadataRequest
RequestKeys.ControlledShutdownKey   controlled shutdown request      ControlledShutdownRequest
RequestKeys.OffsetCommitKey         offset commit request            OffsetCommitRequest
RequestKeys.OffsetFetchKey          consumer offset fetch request    OffsetFetchRequest
RequestKeys.ConsumerMetadataKey     consumer metadata request        ConsumerMetadataRequest

2.5 Processor Response Handling

private def processNewResponses() {
  var curr = requestChannel.receiveResponse(id)
  while(curr != null) {
    val key = curr.request.requestKey.asInstanceOf[SelectionKey]
    curr.responseAction match {
      case RequestChannel.NoOpAction =>
        // There is no response to send; register for read so we can pick up
        // the next pipelined request on this connection.
        key.interestOps(SelectionKey.OP_READ)
        key.attach(null)
      case RequestChannel.SendAction =>
        // Attach the response and wait for the channel to become writable.
        key.interestOps(SelectionKey.OP_WRITE)
        key.attach(curr)
      case RequestChannel.CloseConnectionAction =>
        // The handler asked us to close this connection.
        close(key)
    }
    curr = requestChannel.receiveResponse(id)
  }
}
           

Returning to the Processor thread class: where read() pushed requests into the channel via requestChannel.sendRequest, processNewResponses() handles the Responses that the Handlers have produced for clients. It takes each Response off the requestChannel's responseQueue for this Processor, registers the OP_WRITE event on the corresponding SelectionKey, and attaches the Response so the data can be written back to the client.
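To close the loop, the write side that pushes the attached Response onto the wire looks roughly like this. This is a sketch of Processor.write with metrics and some error handling omitted:

def write(key: SelectionKey) {
  val socketChannel = channelFor(key)
  val response = key.attachment().asInstanceOf[RequestChannel.Response]
  val send = response.responseSend
  val written = send.writeTo(socketChannel)  // large responses may need several passes
  trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress)
  if (send.complete) {
    key.attach(null)
    key.interestOps(SelectionKey.OP_READ)    // response fully sent; go back to reading
  } else {
    key.interestOps(SelectionKey.OP_WRITE)   // more to write; stay interested in OP_WRITE
    wakeup()
  }
}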