服務端接口-RemotingServer
- 基本介紹
- 關系類圖
- ServerDelegate-服務端代表
- AbstractServer
-
- 屬性
- 構造方法
- 抽象方法
- 實作Endpoint接口
-
- send
- close
- ExecutorUtil相關方法
- 覆寫AbstractPeer方法
- Peer接口
- ServerPeer(待完善)
基本介紹
RemotingServer繼承了Endpoint,Endpoint提供了發送和關閉功能,是以RemotingServer也具有這個兩個功能
/**
* is bound.
* 是否綁定 本地端口,提供服務,即是否啟動成功,可連接配接,可接受消息
*
* @return bound
*/
boolean isBound();
/**
* get channels.
* 擷取 目前server 對應的 多個通道
*
* @return channels
*/
Collection<Channel> getChannels();
/**
* get channel.
* 根據 遠端位址 擷取 對應通道
*
* @param remoteAddress
* @return channel
*/
Channel getChannel(InetSocketAddress remoteAddress);
關系類圖
![](https://img.laitimes.com/img/_0nNw4CM6IyYiwiM6ICdiwiIyVGduV2YfNWawNCM38FdsYkRGZkRG9lcvx2bjxiNx8VZ6l2cs0TPn5EMZRlT3FFRPpHOsJGcohVYsR2MMBjVtJWd0ckW65UbM5WOHJWa5kHT20ESjBjUIF2X0hXZ0xCMx81dvRWYoNHLrdEZwZ1Rh5WNXp1bwNjW1ZUba9VZwlHdssmch1mclRXY39CXldWYtlWPzNXZj9mcw1ycz9WL49zZwpmLwkzM2EzNxYTMxMTNwEjMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
- 實作RemotingServer的有AbstractServer、ServerDelegate、2個NettyServer
- 繼承RemotingServer的有Peer、ExchangeServer、HttpServer。ExchangeServer會放在交換層内容整理中。HttpServer會放在遠端通信内容整理中
ServerDelegate-服務端代表
同ClientDelegate 和 ChannelDelegate,裝飾模式
AbstractServer
- AbstractServer是 在傳輸層-transport 對RemotingServer的 抽象實作,主要是對打開伺服器前的參數處理,以及關閉伺服器的處理
- AbstractServer繼承自AbstractEndpoint抽象類,而AbstractEndpoint又繼承自AbstractPeer,在AbstractServer中複寫了connected和disconnected方法
- AbstractServer還覆寫了Endpoint接口相關方法
屬性
// 線程池名稱
protected static final String SERVER_THREAD_POOL_NAME = "DubboServerHandler";
// 線程池
ExecutorService executor;
// 服務位址
private InetSocketAddress localAddress;
// 綁定位址
private InetSocketAddress bindAddress;
// 伺服器最大可接受連接配接數
private int accepts;
// ExecutorRepository是一個可以獲得線程池的SPI
private ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
構造方法
由于AbstractServer繼承了AbstractEndpoint,AbstractEndpoint繼承了AbstractPeer,AbstractPeer維護者URL和ChannelHandler的關系,是以AbstractClient的構造方法入參也必須有這兩個參數,在這一點上同AbstractClient
String BIND_IP_KEY = "bind.ip";
String BIND_PORT_KEY = "bind.port";
String ANYHOST_KEY = "anyhost";
String ACCEPTS_KEY = "accepts";
int DEFAULT_ACCEPTS = 0;
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
// 這裡會先執行AbstractPeer的構造方法,再執行AbstractEndpoint的構造方法
super(url, handler);
// 根據url的host和port擷取本地位址,設定localAddress屬性
localAddress = getUrl().toInetSocketAddress();
// 從url中擷取綁定的ip和port參數值
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
// 如果url的anyhost參數值為true 或 bindIp是localhost,把bindIp=0.0.0.0(這個其實就是監聽本機上所有ipv4的位址)
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
// 設定bindAddress屬性
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 擷取url最大連接配接數,預設是0
// 0并不是最大連接配接是0個,而是不對連接配接數做限制
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
try {
// 子類實作
doOpen();
// ...
} catch (Throwable t) {
// ...
}
// 根據url建立線程池
executor = executorRepository.createExecutorIfAbsent(url);
}
executorRepository.createExecutorIfAbsent中的内容可以參考Dubbo-Client中的“initExecutor-初始化線程池”
抽象方法
AbstractServer有2個抽象方法,分别是doOpen、doClose,他們具體由子類實作
實作Endpoint接口
send
@Override
public void send(Object message, boolean sent) throws RemotingException {
// 擷取server所有對應的通道,判斷是否已連接配接,循環發送
Collection<Channel> channels = getChannels();
for (Channel channel : channels) {
if (channel.isConnected()) {
channel.send(message, sent);
}
}
}
close
@Override
public void close() {
// ...
ExecutorUtil.shutdownNow(executor, 100);
try {
// AbstractPeer#close
super.close();
} catch (Throwable e) {
// ...
}
try {
// 子類實作
doClose();
} catch (Throwable e) {
// ...
}
}
@Override
public void close(int timeout) {
ExecutorUtil.gracefulShutdown(executor, timeout);
close();
}
ExecutorUtil相關方法
public static void gracefulShutdown(Executor executor, int timeout) {
if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
return;
}
final ExecutorService es = (ExecutorService) executor;
try {
// Disable new tasks from being submitted
// 将線程池狀态置為SHUTDOWN
// 當此方法被調用時,ExecutorService停止接收新的任務 并且 等待已經送出的任務(包含送出正在執行和送出未執行)執行完成。
// 當所有送出任務執行完畢,線程池即被關閉
es.shutdown();
} catch (SecurityException | NullPointerException ex2) {
return;
}
try {
// Wait a while for existing tasks to terminate
if (!es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
es.shutdownNow();
}
} catch (InterruptedException ex) {
es.shutdownNow();
Thread.currentThread().interrupt();
}
if (!isTerminated(es)) {
// 如果沒有中止成功,則新起一個線程執行關閉
newThreadToCloseExecutor(es);
}
}
public static void shutdownNow(Executor executor, final int timeout) {
if (!(executor instanceof ExecutorService) || isTerminated(executor)) {
return;
}
final ExecutorService es = (ExecutorService) executor;
try {
// 将線程池狀态置為STOP。企圖立即停止,事實上不一定:
// 跟shutdown()一樣,先停止接收外部送出的任務
// 忽略隊列裡等待的任務
// 嘗試将正在跑的任務interrupt中斷
// 傳回未執行的任務清單
es.shutdownNow();
} catch (SecurityException | NullPointerException ex2) {
return;
}
try {
// 設定逾時時間及機關。當等待超過設定時間時,會監測ExecutorService是否已經關閉,若關閉則傳回true,否則傳回false。
// 一般情況下會和shutdown方法組合使用
es.awaitTermination(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
if (!isTerminated(es)) {
// 如果沒有中止成功,則新起一個線程執行關閉
newThreadToCloseExecutor(es);
}
}
覆寫AbstractPeer方法
@Override
public void connected(Channel ch) throws RemotingException {
// If the server has entered the shutdown process, reject any new connection
// 如果服務端已經進入關閉過程中,拒絕任何新的連接配接,關閉目前的channel
if (this.isClosing() || this.isClosed()) {
// ...
ch.close();
return;
}
// 如果最大可連接配接數 < channel的個數,關閉目前的channel
if (accepts > 0 && getChannels().size() > accepts) {
// ...
ch.close();
return;
}
// AbstractPeer#connected,通過ChannelHandler連接配接通道
super.connected(ch);
}
@Override
public void disconnected(Channel ch) throws RemotingException {
Collection<Channel> channels = getChannels();
if (channels.isEmpty()) {
// ...
}
super.disconnected(ch);
}
Peer接口
在繼承RemotingServer接口的基礎上增加了leave方法
在ServerPeer和ExchangeServerPeer都有具體實作
ServerPeer(待完善)
ServerPeer 繼承了ServerDelegate,同時還實作了Peer接口