天天看點

Dubbo-RemotingServer基本介紹關系類圖ServerDelegate-服務端代表AbstractServerPeer接口ServerPeer(待完善)

服務端接口-RemotingServer

  • 基本介紹
  • 關系類圖
  • ServerDelegate-服務端代表
  • AbstractServer
    • 屬性
    • 構造方法
    • 抽象方法
    • 實作Endpoint接口
      • send
      • close
      • ExecutorUtil相關方法
    • 覆寫AbstractPeer方法
  • Peer接口
  • ServerPeer(待完善)

基本介紹

RemotingServer繼承了Endpoint,Endpoint提供了發送和關閉功能,是以RemotingServer也具有這個兩個功能

/**
  * is bound.
  * 是否綁定 本地端口,提供服務,即是否啟動成功,可連接配接,可接受消息
  *
  * @return bound
  */
boolean isBound();

/**
  * get channels.
  * 擷取 目前server 對應的 多個通道
  *
  * @return channels
  */
Collection<Channel> getChannels();

/**
  * get channel.
  * 根據 遠端位址 擷取 對應通道
  *
  * @param remoteAddress
  * @return channel
  */
Channel getChannel(InetSocketAddress remoteAddress);
           

關系類圖

Dubbo-RemotingServer基本介紹關系類圖ServerDelegate-服務端代表AbstractServerPeer接口ServerPeer(待完善)
  • 實作RemotingServer的有AbstractServer、ServerDelegate、2個NettyServer
  • 繼承RemotingServer的有Peer、ExchangeServer、HttpServer。ExchangeServer會放在交換層内容整理中。HttpServer會放在遠端通信内容整理中

ServerDelegate-服務端代表

同ClientDelegate 和 ChannelDelegate,裝飾模式

AbstractServer

  • AbstractServer是 在傳輸層-transport 對RemotingServer的 抽象實作,主要是對打開伺服器前的參數處理,以及關閉伺服器的處理
  • AbstractServer繼承自AbstractEndpoint抽象類,而AbstractEndpoint又繼承自AbstractPeer,在AbstractServer中複寫了connected和disconnected方法
  • AbstractServer還覆寫了Endpoint接口相關方法

屬性

// 線程池名稱
protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
// 線程池
ExecutorService executor;
// 服務位址
private InetSocketAddress localAddress;
// 綁定位址
private InetSocketAddress bindAddress;
// 伺服器最大可接受連接配接數
private int accepts;
// ExecutorRepository是一個可以獲得線程池的SPI
private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();

           

構造方法

由于AbstractServer繼承了AbstractEndpoint,AbstractEndpoint繼承了AbstractPeer,AbstractPeer維護者URL和ChannelHandler的關系,是以AbstractClient的構造方法入參也必須有這兩個參數,在這一點上同AbstractClient

String BIND_IP_KEY = "bind.ip";
String BIND_PORT_KEY = "bind.port";
String ANYHOST_KEY = "anyhost";
String ACCEPTS_KEY = "accepts";
int DEFAULT_ACCEPTS = 0;

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
	// 這裡會先執行AbstractPeer的構造方法,再執行AbstractEndpoint的構造方法
	super(url, handler);
	
	// 根據url的host和port擷取本地位址,設定localAddress屬性
	localAddress = getUrl().toInetSocketAddress();
	
	// 從url中擷取綁定的ip和port參數值
	String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
	int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
	// 如果url的anyhost參數值為true 或 bindIp是localhost,把bindIp=0.0.0.0(這個其實就是監聽本機上所有ipv4的位址)
	if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
		bindIp = ANYHOST_VALUE;
	}
	// 設定bindAddress屬性
	bindAddress = new InetSocketAddress(bindIp, bindPort);
	
	// 擷取url最大連接配接數,預設是0
	// 0并不是最大連接配接是0個,而是不對連接配接數做限制
	this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
	try {
		// 子類實作
		doOpen();
		// ...
	} catch (Throwable t) {
		// ...
	}
	
	// 根據url建立線程池
	executor = executorRepository.createExecutorIfAbsent(url);
}
           

executorRepository.createExecutorIfAbsent中的内容可以參考Dubbo-Client中的“initExecutor-初始化線程池”

抽象方法

AbstractServer有2個抽象方法,分别是doOpen、doClose,他們具體由子類實作

實作Endpoint接口

send

@Override
public void send(Object message, boolean sent) throws RemotingException {
	// 擷取server所有對應的通道,判斷是否已連接配接,循環發送
	Collection<Channel> channels = getChannels();
	for (Channel channel : channels) {
		if (channel.isConnected()) {
			channel.send(message, sent);
		}
	}
}
           

close

@Override
public void close() {
	// ...

	ExecutorUtil.shutdownNow(executor, 100);

	try {
		// AbstractPeer#close
		super.close();
	} catch (Throwable e) {
		// ...
	}

	try {
		// 子類實作
		doClose();
	} catch (Throwable e) {
		// ...
	}
}

@Override
public void close(int timeout) {
	ExecutorUtil.gracefulShutdown(executor, timeout);
	close();
}
           

ExecutorUtil相關方法

public static void gracefulShutdown(Executor executor, int timeout) {
	if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
		return;
	}
	final ExecutorService es = (ExecutorService) executor;
	try {
		// Disable new tasks from being submitted
		// 将線程池狀态置為SHUTDOWN
		// 當此方法被調用時,ExecutorService停止接收新的任務 并且 等待已經送出的任務(包含送出正在執行和送出未執行)執行完成。
		// 當所有送出任務執行完畢,線程池即被關閉
		es.shutdown();
	} catch (SecurityException | NullPointerException ex2) {
		return;
	}
	try {
		// Wait a while for existing tasks to terminate
		if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
			es.shutdownNow();
		}
	} catch (InterruptedException ex) {
		es.shutdownNow();
		Thread.currentThread().interrupt();
	}
	if (!isTerminated(es)) {
		// 如果沒有中止成功,則新起一個線程執行關閉
		newThreadToCloseExecutor(es);
	}
}


public static void shutdownNow(Executor executor, final int timeout) {
	if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
		return;
	}
	final ExecutorService es = (ExecutorService) executor;
	try {
		// 将線程池狀态置為STOP。企圖立即停止,事實上不一定:
		// 跟shutdown()一樣,先停止接收外部送出的任務
		// 忽略隊列裡等待的任務
		// 嘗試将正在跑的任務interrupt中斷
		// 傳回未執行的任務清單
		es.shutdownNow();
	} catch (SecurityException | NullPointerException ex2) {
		return;
	}
	try {
		// 設定逾時時間及機關。當等待超過設定時間時,會監測ExecutorService是否已經關閉,若關閉則傳回true,否則傳回false。
		// 一般情況下會和shutdown方法組合使用
		es.awaitTermination(timeout, TimeUnit.MILLISECONDS);
	} catch (InterruptedException ex) {
		Thread.currentThread().interrupt();
	}
	if (!isTerminated(es)) {
		// 如果沒有中止成功,則新起一個線程執行關閉
		newThreadToCloseExecutor(es);
	}
}
           

覆寫AbstractPeer方法

@Override
public void connected(Channel ch) throws RemotingException {
	// If the server has entered the shutdown process, reject any new connection
	// 如果服務端已經進入關閉過程中,拒絕任何新的連接配接,關閉目前的channel
	if (this.isClosing() || this.isClosed()) {
		// ...
		ch.close();
		return;
	}

	// 如果最大可連接配接數 < channel的個數,關閉目前的channel
	if (accepts > 0 && getChannels().size() > accepts) {
		// ...
		ch.close();
		return;
	}
	
	// AbstractPeer#connected,通過ChannelHandler連接配接通道
	super.connected(ch);
}

@Override
public void disconnected(Channel ch) throws RemotingException {
	Collection<Channel> channels = getChannels();
	if (channels.isEmpty()) {
		// ...
	}
	super.disconnected(ch);
}
           

Peer接口

在繼承RemotingServer接口的基礎上增加了leave方法

在ServerPeer和ExchangeServerPeer都有具體實作

ServerPeer(待完善)

ServerPeer 繼承了ServerDelegate,同時還實作了Peer接口

繼續閱讀