歡迎通路我的個人部落格休息的風
與dubbo服務釋出相對的,是引用服務進行調用的過程,這個很多步驟都是與服務釋出相對的,但是也有特有的地方,比如,負載均衡 ,叢集容錯等。這篇部落格,我們主要關注dubbo服務調用的一個核心過程。
dubbo服務調用的主要過程:将調用資訊注冊到zk上-> 通知RegistryDirectory重新整理可用服務清單->重新整理過程中,新服務會與netty服務端建立連接配接,并封裝到DubboInvoker中。-> 選擇失敗政策通過負載均衡算法,選擇服務端具體哪個服務去執行 -> 通過netty傳回執行結果。
Dubbo服務調用過程圖如下:(看不清,請點選新的頁簽進行檢視)
在ReferenceConfig.init的方法裡,會把配置資訊封閉成一個map,然後去建構一個代理類。在建構這個代理類,會用DubboProtocol擷取DubboInvoker。
private T createProxy(Map<String, String> map) {
// 省略一些代碼。。。
// 通過注冊中心配置拼裝URL
List<URL> us = loadRegistries(false);
if (us != null && us.size() > 0) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls == null || urls.size() == 0) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
if (urls.size() == 1) {
//會調用RegistryProtocol.refer
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
//省略很多代碼
// 建立服務代理,使用JavassistProxyFactory去建立
return (T) proxyFactory.getProxy(invoker);
}
在RegistryProtocol裡,會建立一個RegistryDirectory,這個對象儲存的調用服務的資訊,并且也是一個監聽。
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
//調用zk去真實的注冊
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
//最終也是調用zk去訂閱監聽,監聽器是RegistryDirectory
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
return cluster.join(directory);
}
在真正注冊到zk上,注冊監聽器到zk相應路徑上,會調用RegistryDirectory的notify通知方法,去擷取可用的服務清單。
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
//初始化值
//省略代碼
// configurators
if (configuratorUrls != null && configuratorUrls.size() > 0) {
this.configurators = toConfigurators(configuratorUrls);
}
// routers
if (routerUrls != null && routerUrls.size() > 0) {
List<Router> routers = toRouters(routerUrls);
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators; // local reference
// 合并override參數
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && localConfigurators.size() > 0) {
for (Configurator configurator : localConfigurators) {
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// providers 重新整理服務清單
refreshInvoker(invokerUrls);
}
其中調用refershInvoker方法時,會去調用toInvokers把URl清單轉換為Invoker清單。
private void refreshInvoker(List<URL> invokerUrls) {
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
this.forbidden = true; // 禁止通路
this.methodInvokerMap = null; // 置空清單
destroyAllInvokers(); // 關閉所有Invoker
} else {
this.forbidden = false; // 允許通路
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
this.cachedInvokerUrls.addAll(invokerUrls);//緩存invokerUrls清單,便于交叉對比
}
if (invokerUrls.size() == 0) {
return;
}
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// 将URL清單轉成Invoker清單
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名映射Invoker清單
//省略一些代碼
。。。。。。。
}
}
在将這些url轉換為invoker時,會使用DubboProtocol去建立與netty服務端的連接配接。
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.size() == 0) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
//封裝url資訊
//省略一些代碼
// 緩存key為沒有合并消費端參數的URL,不管消費端如何合并參數,如果服務端URL發生變化,則重新refer
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // 緩存中沒有,重新refer
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
//服務可以就使用DubboProtocol去建立連接配接,封裝DubboInvoker
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
}
在DubboProtocol.refer裡,會去建立netty用戶端連接配接。
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
最後是傳回一個DubboInvoker對象,在這之前會調用getClients先去擷取或建立用戶端連接配接。是共享連接配接就擷取之前的加接,不是的話就建立新的連接配接。
private ExchangeClient initClient(URL url) {
//設定一些參數,比如心跳機制等
// client type setting.
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
String version = url.getParameter(Constants.DUBBO_VERSION_KEY);
boolean compatible = (version != null && version.startsWith("1.0."));
url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
//預設開啟heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
//省略一些代碼
ExchangeClient client;
try {
//設定連接配接應該是lazy的
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
//跟服務釋出相對,具體調用HeaderExchanger.connect
client = Exchangers.connect(url, requestHandler);
}
在HeaderExchanger裡,也是通過具體的NettyTransports連接配接去建立一個NettyClient,在doOpen方法裡,建立netty連接配接,熟悉netty的應該對這段代碼不陌生
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(channelFactory);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
//使用适配,用DubboCodec去編碼解碼
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
//工作線程的處理handler
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
在RegistryDirectory中,緩存了可用的服務清單Invoker,之後具體使用哪個服務去調用,就看選擇的負載均衡政策了。在 RegistryProtocol.doRefer裡,會去執行“cluster.join(directory)”,這裡的cluster看是使用哪種失敗政策。預設會調用FailoverCluster.join。在FailoverClusterInvoker.doInvoke裡,會去選擇具體要調用哪參機器上的哪個服務。
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyinvokers = invokers;
checkInvokers(copyinvokers, invocation);
int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//重試時,進行重新選擇,避免重試時invoker清單已發生變化.
//注意:如果清單發生了變化,那麼invoked判斷會失效,因為invoker示例已經改變
if (i > 0) {
checkWhetherDestroyed();
copyinvokers = list(invocation);
//重新檢查一下
checkInvokers(copyinvokers, invocation);
}
//使用loadbalance負載均衡選擇哪個服務invoker去調用
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
//調用執行調用鍊
Result result = invoker.invoke(invocation);
//省略一些代碼。。。。
最後真正調用服務端的DubboProtocol.requestHandler.reply去處理,
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
Invocation inv = (Invocation) message;
//從DubboExporter裡擷取Invoker
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要處理高版本調用低版本的問題
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
//callback的處理,省略。。。。
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
//反射調用,真正用戶端方法調用的地方
return invoker.invoke(inv);
}
處理結果再通過netty傳回到用戶端resutlt中。
到此,一個服務調用過程就基本完成了。與之前的dubbo 源碼學習筆記 (二) —— dubbo釋出服務的過程相對應,形成dubbo服務釋出調用的整個過程。這裡總結一下整體過程:
dubbo工作原理分為服務釋出和服務引用兩個核心過程。
1、服務釋出的時候,DubboProtocol将調用鍊封裝為DubboExporter,放入到netty服務端工作線程池中
2、URL配置資訊注冊到zk注冊中心,并注冊override監聽,觸發訂閱。
3、服務引用時,也将服務引用的資訊封裝成URL并注冊到zk注冊中心,同時監聽category、providers、configurators、routers這四個目錄節點(才能感覺服務上下線的變化)。将這些資訊包裝成DubboInvoker用戶端的調用鍊,傳回代理。
4、用戶端使用代理進行調用時,經過負載均衡,選擇其中一個服務的URL,根據URl資訊與netty建立連接配接,發送Invocation到netty服務端;服務端在工作線程池中找一個線程,處理Invocation,并把RpcResult結果傳回給用戶端;用戶端接收解析RpcResult,擷取處理結果。
這樣整個過程就基本結束。了解這一整個過程,也就相當于了解了dubbo整體的原理。也就能從大的方向把握這一架構,當然,dubbo架構還有很多值得學習研究的地方。在之後的部落格中會繼續分析。