
How Spark uses Netty to implement an Akka-style actor model

Spark implements an actor mechanism very similar to Akka's, built on top of Netty.

In Spark, an EndpointData plays roughly the same role as an actor does in Akka.

private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}
           

An EndpointData is made up of four members: name, endpoint, ref, and inbox.

Here name is the actor's name; endpoint is the RpcEndpoint that actually handles incoming remote messages; ref is the RpcEndpointRef that remote clients use as the target of a call; and inbox is the mailbox where incoming messages are queued until the endpoint processes them.
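For reference, the "actor" half of this analogy is the RpcEndpoint trait. Below is a trimmed sketch of its shape (the real trait in org.apache.spark.rpc also provides default implementations and error callbacks such as onError):

private[spark] trait RpcEndpoint {
  val rpcEnv: RpcEnv

  // Handles one-way messages delivered through RpcEndpointRef.send()
  def receive: PartialFunction[Any, Unit]

  // Handles ask()-style messages that expect a reply
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]

  // Lifecycle hooks, comparable to an actor's preStart/postStop
  def onStart(): Unit = {}
  def onStop(): Unit = {}
}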

val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK

logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
  s"${executorData.executorHost}.")

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
           

As an example, when the driver dispatches a task to an executor, CoarseGrainedSchedulerBackend uses the executorId on which the task should run to look up the executor's data and the endpoint ref stored there (executorData.executorEndpoint). A single call to that ref's send() method completes the remote invocation: the task to be run is serialized and wrapped in a LaunchTask message, which plays the role of the message an actor would receive in the actor model.
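The LaunchTask message itself is only a thin wrapper around the serialized task; a sketch of its definition in CoarseGrainedClusterMessages:

case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage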

The ref itself was serialized on the executor side when the executor registered with the driver, and deserialized on the driver. Its send() method wraps the payload in a RequestMessage.

override def send(message: Any): Unit = {
  require(message != null, "Message is null")
  nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
}
           

A RequestMessage carries the caller's remote address, the ref itself (which the receiving side uses to locate the concrete endpoint to invoke), and the actual message described above. nettyEnv.send() then checks whether the receiver is local: a local message is posted directly to the Dispatcher, while a message for a remote receiver is handed to postToOutbox().
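A simplified sketch of that routing step, based on NettyRpcEnv.send in Spark 2.x (error handling and the exact serialization call differ between versions):

private[netty] def send(message: RequestMessage): Unit = {
  val remoteAddr = message.receiver.address
  if (remoteAddr == address) {
    // The receiver lives in this JVM: hand the message straight to the local Dispatcher
    dispatcher.postOneWayMessage(message)
  } else {
    // The receiver is remote: serialize the message and queue it on that address's Outbox
    postToOutbox(message.receiver, OneWayOutboxMessage(message.serialize(this)))
  }
}

The remote branch goes through postToOutbox():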

private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
  if (receiver.client != null) {
    message.sendWith(receiver.client)
  } else {
    require(receiver.address != null,
      "Cannot send message to client endpoint with no listen address.")
    val targetOutbox = {
      val outbox = outboxes.get(receiver.address)
      if (outbox == null) {
        val newOutbox = new Outbox(this, receiver.address)
        val oldOutbox = outboxes.putIfAbsent(receiver.address, newOutbox)
        if (oldOutbox == null) {
          newOutbox
        } else {
          oldOutbox
        }
      } else {
        outbox
      }
    }
    if (stopped.get) {
      // It's possible that we put `targetOutbox` after stopping. So we need to clean it.
      outboxes.remove(receiver.address)
      targetOutbox.stop()
    } else {
      targetOutbox.send(message)
    }
  }
}
           

For a message bound for a remote endpoint, Spark first checks whether a Netty client for the target address has already been created. If it has, the message is sent directly through it; otherwise an Outbox for that address is created (or an existing one reused) to buffer the data to be sent. The Outbox maintains a queue of pending messages, and once the client for that address finishes initializing, the queued messages are flushed out through it.
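A simplified sketch of the buffering half of this logic, trimmed from Outbox.send() (the connection setup in launchConnectTask() and the drain loop are omitted):

def send(message: OutboxMessage): Unit = {
  val dropped = synchronized {
    if (stopped) {
      true
    } else {
      messages.add(message)   // buffer the message until a TransportClient is available
      false
    }
  }
  if (dropped) {
    message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
  } else {
    drainOutbox()             // creates the client if needed, then flushes the queue in order
  }
}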

Now switch to the executor side and look at how it listens for and handles these remote calls.

Remote calls are handled mainly by the Dispatcher class, which is created and wired up while NettyRpcEnv is being initialized; NettyRpcEnv itself is initialized as part of the overall SparkEnv initialization.

private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

private val transportContext = new TransportContext(transportConf,
  new NettyRpcHandler(dispatcher, this, streamManager))
           

The Dispatcher is initialized and wrapped inside a NettyRpcHandler, and that NettyRpcHandler becomes part of the TransportContext. When the Netty server is set up later, calls into this NettyRpcHandler are reached through the channelRead() method that the ChannelHandler overrides.

@Override
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
  if (request instanceof RequestMessage) {
    requestHandler.handle((RequestMessage) request);
  } else if (request instanceof ResponseMessage) {
    responseHandler.handle((ResponseMessage) request);
  } else {
    ctx.fireChannelRead(request);
  }
}

private void processRpcRequest(final RpcRequest req) {
  try {
    rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
      }

      @Override
      public void onFailure(Throwable e) {
        respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
      }
    });
  } catch (Exception e) {
    logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
    respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
  } finally {
    req.body().release();
  }
}
           

In channelRead(), the incoming message is first checked by type; an RPC request is then handed to the request handler, whose processRpcRequest() (shown above) ultimately invokes the NettyRpcHandler's receive() method.

override def receive(
    client: TransportClient,
    message: ByteBuffer,
    callback: RpcResponseCallback): Unit = {
  val messageToDispatch = internalReceive(client, message)
  dispatcher.postRemoteMessage(messageToDispatch, callback)
}
           

Inside NettyRpcHandler, internalReceive() first deserializes the bytes into a RequestMessage, and then Dispatcher.postRemoteMessage() is called to route the message to the concrete endpoint that should execute it.
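postRemoteMessage() is a thin wrapper; a simplified sketch based on the Spark 2.x Dispatcher:

def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
  // The callback lets the endpoint reply back over the same Netty connection
  val rpcCallContext =
    new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
  val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
  postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
}

The actual lookup and enqueueing happen in postMessage():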

private def postMessage(
    endpointName: String,
    message: InboxMessage,
    callbackIfStopped: (Exception) => Unit): Unit = {
  val error = synchronized {
    val data = endpoints.get(endpointName)
    if (stopped) {
      Some(new RpcEnvStoppedException())
    } else if (data == null) {
      Some(new SparkException(s"Could not find $endpointName."))
    } else {
      data.inbox.post(message)
      receivers.offer(data)
      None
    }
  }
  // We don't need to call `onStop` in the `synchronized` block
  error.foreach(callbackIfStopped)
}
           

The Dispatcher dispatches remote calls in postMessage(): using the name carried by the remote ref, it looks up the matching EndpointData in its local name-to-EndpointData map. If one is found, the message is placed into that endpoint's inbox to wait until the endpoint picks it up, and the EndpointData is offered to the receivers queue.

The Dispatcher also maintains a thread pool whose threads each run a MessageLoop.

private class MessageLoop extends Runnable {
  override def run(): Unit = {
    try {
      while (true) {
        try {
          val data = receivers.take()
          if (data == PoisonPill) {
            // Put PoisonPill back so that other MessageLoops can see it.
            receivers.offer(PoisonPill)
            return
          }
          data.inbox.process(Dispatcher.this)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {
      case ie: InterruptedException => // exit
    }
  }
}
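
For context, this is roughly how the Dispatcher creates that pool and seeds it with MessageLoop workers (simplified from the pre-3.0 Dispatcher; the thread count falls back to the number of available cores):

private val threadpool: ThreadPoolExecutor = {
  val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
    math.max(2, Runtime.getRuntime.availableProcessors()))
  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
  for (i <- 0 until numThreads) {
    pool.execute(new MessageLoop)   // every worker thread runs the loop shown above
  }
  pool
}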
           

MessageLoop将會不斷從所有有消息但是還未處理的EndPointData集合中依次取出,調用其inbox信箱的process()方法調用endpoint的receive()方法根據遠端調用的事件類型選擇具體的邏輯處理對應的遠端調用。

env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
  env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
           

The server side registers an endpoint with the Dispatcher through RpcEnv's setupEndpoint() method, so that the message dispatching described above can find the corresponding EndpointData, i.e. the "actor" that handles the remote call.
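setupEndpoint() delegates to the Dispatcher, roughly as follows (a trimmed sketch of Dispatcher.registerRpcEndpoint from Spark 2.x):

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    // One EndpointData per registered name; duplicate names are rejected
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data)   // queue it once so a MessageLoop delivers the OnStart message
  }
  endpointRef
}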
