文章目錄
-
-
- 簡介
- 架構演進
- 了解RPC與SOA
- dubbo架構簡介
- SPI擴充機制
- 注冊中心
- 配置解析(xml)
- 服務暴露原理
- 服務引用原理
- 服務調用
- dubbo協定
- Cluster原理
- Filter原理
- 總結
-
簡介
目前的工作主要是做一些業務功能的開發,涉及到的系統架構以all-in-one的單體架構為主,也少量涉及分布式系統,對于涉及到的分布式系統,幾乎都是以dubbo為基礎建構的,日常也主要是對其進行維護以及對開發新的功能,之前對dubbo的使用也不是特别深入,應用場景也不複雜。dubbo作為目前最流行的分布式微服務架構之一,是非常值得好好去研究一下的。
《深入了解Apache Dubbo與實戰》這本書主要是對dubbo的架構及原理進行講解,中間穿插了一些實戰案例,通過這本書可以更好的了解dubbo,了解其設計理念。兩位作者商宗海和林琳,商宗海是dubbo的技術負責人,在dubbo開源到apache後,其本人稱為了PMC Member,林琳也是dubbo的核心貢獻者,這兩位目前都就職于螞蟻金服。
架構演進
在将dubbo前,我們先來簡單回顧一下應用系統架構的演進,這是一個老生常談的問題了。
對于一些比較簡單的應用,我們通常采用all-in-one的單體架構,這種架構對于開發和部署都比較友好,能夠在項目前期快速的響應需求,但随着業務的發展,系統中的功能越來越多,服務的體積也越來越龐大,另外團隊成員也越來越多,這時候就會代理很多問題,比如代碼重複問題,牽一發而動全身以及單次部署時間越來越長等,這時候就需要考慮對服務進行拆分。在拆分時,可以根據系統不同的特點采用橫向拆分或縱向拆分。如果系統中各個子子產品的業務邊界比較清楚,内部不存在跨子產品的互相調用的話,可以按照縱向拆分,把每一個子產品拆分成一個獨立的服務,單獨維護部署,但是如果子產品之間耦合度比較高的話,就可以考慮按照功能橫向拆分,把每一個功能子產品獨立成一個服務,每個服務交給一個獨立的小組負責,這樣,可以很大程度的提高團隊的開發效率,其實這時候系統架構就演進到了我們常說的SOA架構。如果架構再往前演進的話,就到了微服務架構,我覺得微服務架構可以簡單的了解為拆分粒度更小的服務化,微服務架構由于對目前比較流程的靈活開發和DevOps比較友好,是以也很流行,但是具體采用哪種架構,還是要根據具體的場景來決定。
了解RPC與SOA
在最開始接觸dubbo的時候,認為dubbo就是架構,後來随着工作經驗的增加,了解了dubbo本身其實是一種用來建構分布式系統的架構,通過dubbo建構的分布式系統遵循SOA架構,即面向服務的架構。那麼,dubbo所要解決的問題就是SOA架構的中的問題,我個人了解,dubbo解決了soa架構中的兩大核心問題:RPC調用和服務治理。
RPC是遠端過程調用的縮寫,它指的是一種基于網絡的跨程序調用,在java中實作rpc的方式主要有兩種,一種是基于jdk RMI實作,一種是基于自定義協定和反射實作,在幾乎所有的rpc架構中都是采用第二種方式實作。RPC解決了分布式系統的核心的遠端服務調用的問題。
但是SOA架構中的另一個重要功能就是服務治理,服務治理包括服務注冊與發現,服務的限流、降級,服務監控等。這些功能dubbo也都有提供。
是以說,dubbo是一個分布式服務架構,基于dubbo建構的分布式系統基于SOA架構,也就是說dubbo解決了SOA架構中的核心問題。
下面,我們從dubbo的世界裡暫時跳出來,看看一個通用的分布式服務架構都應該具有哪些功能。
在最上層是業務層,也就是具體的業務服務接口;下層是proxy代理層,proxy的主要作用是屏蔽rpc架構的底層細節,避免rpc架構侵入業務代碼;下面的兩層我了解成架構特性層,主要包含了負載均衡、逾時重試、叢集容錯以及服務路由;再往下,就屬于rpc層了,分為協定層、通信架構層和傳輸層,這幾層包含了rpc架構核心,包括協定、序列化/反序列化等,總是這裡的主要作用是把消息按照指定的格式發送到provider或傳回給consumer。
dubbo架構簡介
下面再來看看dubbo,下圖是dubbo的總體抽象架構
這裡包含了四個部分,分别是服務提供者,服務消費者,注冊中心,監控中心,包含了SOA架構中的兩個核心部分:RPC與服務治理。服務提供者異步向注冊中心;服務消費者異步從注冊中心拉取服務提供者資訊,并接受注冊中心的變更通知;服務消費者和服務提供者異步向監控中心上報資料;服務消費者同步調用服務提供者;
下面再從微觀層面看看dubbo在代碼層面的架構設計
對于dubbo來說這是一張非常經典的架構圖,我們可以看到,完全滿足一個分布式服務架構所應該具有的功能。
下面簡單按照我個人的了解簡單解釋一下
- service:業務層,包含了業務接口及具體實作;
- proxy:服務代理層,封裝底層實作細節,使業務層對rpc過程無感覺;
- registry:注冊層,負責與注冊中心進行互動,進行服務注冊于發現、接收注冊中心消息變更通知;
- cluster:叢集容錯層,提供了dubbo在叢集層面的核心功能,包括叢集容錯、負載均衡、服務路由、逾時重試等;
- monitor:監控層。負責上報資料到監控中心;
- protocol:協定層,這裡封裝了rpc調用的完整邏輯;
- exchange:資訊交換層,把對API接口的調用轉換成-Request-Response模式;
- transport:網絡傳輸層,即Netty或Mina;
- Serializable:序列化,将消息進行序列化或反序列化;
SPI擴充機制
上面的代碼架構是縱向觀察dubbo的結構,但從橫向看,dubbo會呈現出一個不一樣的風景。
在橫向上,dubbo采用的是一種為核心架構,核心基于SPI擴充點機制将不同的元件組合在一起,形成一個完整的分布式服務架構,同時,使用者還可以基于SPI标準實作自己的擴充元件,這些元件通過SPI可以很容易的整合到dubbo中,下圖是我對dubbo微核心和spi機制的了解
提到SPI,如果不了解dubbo的同學可能沒有聽過,或者有的同學對Java的SPI有一些了解,SPI利用政策模式,将接口與實作進行解耦合,用法如下
- 建立接口和實作類
public interface IHelloService {
String hello(String name);
}
public class HelloService1 implements IHelloService {
@Override
public String hello(String name) {
return "HelloService1:" + name;
}
}
public class HelloService2 implements IHelloService {
@Override
public String hello(String name) {
return "HelloService2:" + name;
}
}
- 在META-INF/services中建立以接口全名稱為名字的檔案
com.learn.dubbo.chapter1.echo.server.spi.IHelloService
- 在檔案中制定具體的實作類,多個實作類用換行符分隔
com.learn.dubbo.chapter1.echo.server.spi.HelloService1
com.learn.dubbo.chapter1.echo.server.spi.HelloService2
- 使用Java SPI api調用實作類
ServiceLoader<IHelloService> helloServices = ServiceLoader.load(IHelloService.class);
for (IHelloService helloService : helloServices) {
System.out.println(helloService.hello("zhangsan"));
}
輸出:
HelloService1:zhangsan
HelloService2:zhangsan
Java SPI存在兩個主要問題,一是不能按需加載實作類,隻能全部加載,二是對異常不友好,如果執行個體化對象出現異常 通過SPI相關的api沒辦法感覺異常,Dubbo并沒有直接使用Java SPI來實作自己的擴充點機制,而是自己實作了一個與Java SPI類似但功能更強的Dubbo SPI,具體展現在一下幾個方面
- 按需執行個體化接口的實作類
- 提供了IoC和AOP功能
- 能夠基于參數動态加載擷取實作類的對象
-
對異常友好
下面用具體事例看一下
@SPI("helloService1") //指定預設的SPI接口實作
public interface IHelloService {
//通過URL中的helloService參數對應的值進行自适應,主要為了配置setter實作依賴注入
@Adaptive("helloService")
String hello(URL url, String name);
}
//第一個實作類
public class HelloService1 implements IHelloService {
@Override
public String hello(URL url, String name) {
return "HelloService1:" + name;
}
}
//第二個實作類
public class HelloService2 implements IHelloService {
@Override
public String hello(URL url, String name) {
return "HelloService2:" + name;
}
}
//第三個實作類
public class MyHelloService implements IHelloService {
private IHelloService helloService;
//這裡有一個setter依賴注入,具體注入的對象是由URL中的參數決定的
public void setHelloService(IHelloService helloService) {
this.helloService = helloService;
}
@Override
public String hello(URL url, String name) {
System.out.println("my hello service ");
return helloService.hello(url, name);
}
}
//這是一個具有AOP功能的實作類
public class HelloServiceWrapper implements IHelloService{
private IHelloService helloService;
//構造方法中如果右依賴的擴充點,就會為對每個執行個體進行包裝
public HelloServiceWrapper(IHelloService helloService) {
this.helloService = helloService;
}
@Override
public String hello(URL url, String name) {
System.out.println("before...");
String result = helloService.hello(url, name);
System.out.println("after...");
return result;
}
}
//配置檔案 com.learn.dubbo.chapter1.echo.server.spi.IHelloService
helloService1=com.learn.dubbo.chapter1.echo.server.spi.HelloService1
helloService2=com.learn.dubbo.chapter1.echo.server.spi.HelloService2
myHelloService=com.learn.dubbo.chapter1.echo.server.spi.MyHelloService
wrapper=com.learn.dubbo.chapter1.echo.server.spi.HelloServiceWrapper
//調用
IHelloService helloService = ExtensionLoader.getExtensionLoader(IHelloService.class).getDefaultExtension();
URL url1 = new URL("","",0);
System.out.println(helloService.hello(url1,"lisi"));
IHelloService helloService2 = ExtensionLoader.getExtensionLoader(IHelloService.class).getExtension("helloService2");
URL url2 = new URL("","",0);
System.out.println(helloService2.hello(url2,"wangwu"));
IHelloService helloService3 = ExtensionLoader.getExtensionLoader(IHelloService.class).getExtension("myHelloService");
Map<String, String> p3 = new HashMap<>();
p3.put("helloService","helloService2");
URL url3 = new URL("","",0, p3);
helloService3.hello(url3, "zhaoliu");
下面在說說Dubbo SPI中涉及到的注解
- @SPI 标記一個借口為SPI擴充點
- @Adaptive 實作基于URL總線的自适應注入機制
- @Active
下面說說getExtension(String name)原理,這個方法的作用是根據指定的名稱擷取擴充點實作類,主要分為以下幾步
- 加載配置檔案,通過反射得到對應的Class對象并進行緩存
- 建立Class對應的執行個體化對象
- 進行setter依賴注入
- 進行Aop包裝處理
加載配置檔案時,dubbo相容了java spi,會從
/META-INF/services, /META-INF/dubbo, /META-INF/dubbo/internal
三個目錄下讀取,讀取完成後還會對對應的Class對象進行緩存,緩存時,會區分普通擴充類、自适應擴充類、包裝擴充類和Activate擴充類,以友善後續處理
緊接着就會通過名稱确定對應的Class對象,然後對其進行執行個體化。
然後處理setter依賴注入,這裡的原理很簡單,就是根據setter方法規則,截取setter方法的set後剩下的字元串作為待注入的擴充執行個體名稱,通過ExtensionFactory根據這個名稱來擷取執行個體,然後通過反射調用setter方法,将這個依賴的執行個體注入進去。
最後處理包裝類,也即是AOP特性,這裡也很簡單,由于在加載class時,dubbo根據實作類是否包含一個依賴其他擴充點的構造器來判斷是不是包裝擴充類,如果是,則單獨緩存,當進行AOP處理時,就會周遊在加載配置時得到的所有wrapper,然後通過反射調用構造器,層層注入 實作AOP。
下面看看getAdaptiveExtension()的原理,該方法可以通過URL總線完成自适應加載擴充點,先看看如何使用
@SPI
public interface AdaptiveService {
@Adaptive("impl")
void adaptive(URL url, String msg);
}
public class AdaptiveServiceImpl implements AdaptiveService {
@Override
public void adaptive(URL url, String msg) {
System.out.println("msg");
}
}
配置檔案:com.learn.dubbo.chapter1.echo.server.spi.AdaptiveService
impl1=com.learn.dubbo.chapter1.echo.server.spi.AdaptiveServiceImpl
調用
AdaptiveService adaptiveService = ExtensionLoader.getExtensionLoader(AdaptiveService.class).getAdaptiveExtension();
Map<String, String> p5 = new HashMap<>();
p5.put("impl","impl1");
URL url5 = new URL("","",0, p5);
adaptiveService.adaptive(url5, "aaaaa");
輸出:
aaaaa
可以看到,這裡是通過在url設定與@Adaptive中指定的key的值來動态覺得最終加載那個擴充類的,其實,底層實作的原理很簡單,簡單來說,就是dubbo底層動态生成了Xxx$Adaptive類,在這個類中,會從url中擷取key對應的值,然後通過getExtension(String name)來擷取指定的擴充點,下面是dubbo動态生成的代碼對應的字元串
package com.learn.dubbo.chapter1.echo.server.spi;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class AdaptiveService$Adaptive implements com.learn.dubbo.chapter1.echo.server.spi.AdaptiveService {
public void adaptive(com.alibaba.dubbo.common.URL arg0, java.lang.String arg1) {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("impl");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.learn.dubbo.chapter1.echo.server.spi.AdaptiveService) name from url(" + url.toString() + ") use keys([impl])");
com.learn.dubbo.chapter1.echo.server.spi.AdaptiveService extension = (com.learn.dubbo.chapter1.echo.server.spi.AdaptiveService)ExtensionLoader.getExtensionLoader(com.learn.dubbo.chapter1.echo.server.spi.AdaptiveService.class).getExtension(extName);
extension.adaptive(arg0, arg1);
}
}
然後dubbo會動态對這個字元串進行編譯,最終傳回這個代理類的執行個體給用戶端。dubbo中提供了三種編譯方式,分别是jdk 編譯、javaassist編譯和ActiveCompiler編譯,其中javaassist是預設的編譯方式(@SPI(“javaassist”)),可以通過
<dubbo:application compiler="jdk">
來修改。
最後,看一下getActivateExtension是怎麼實作的
@SPI
public interface IActivateService {
void active();
}
@Activate(order = 1, group = "test")
public class ActiveService1 implements IActivateService {
@Override
public void active() {
System.out.println("Active1");
}
}
@Activate(order = 2, group = "test")
public class ActivateService2 implements IActivateService {
@Override
public void active() {
System.out.println("active2");
}
}
配置:com.learn.dubbo.chapter1.echo.server.spi.IActivateService
activateService1=com.learn.dubbo.chapter1.echo.server.spi.ActiveService1
activateService2=com.learn.dubbo.chapter1.echo.server.spi.ActivateService2
調用
Map<String, String> m = new HashMap<>();
m.put("activate1", "activateService1");
m.put("activate2", "activateService2");
URL url6 = new URL("","",0, m);
List<IActivateService> activateService = ExtensionLoader.getExtensionLoader(IActivateService.class).getActivateExtension(url6, "activate1,activate2", "test");
for (IActivateService a : activateService) {
a.active();
}
最終會根據key和group來比對需要加載的擴充點實作,然後根據order指定的順序進行排序,最終傳回一個List集合。
注冊中心
registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-provider&dubbo=2.0.2&pid=31757®istry=zookeeper×tamp=1574044913995
zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-provider&dubbo=2.0.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=31787×tamp=1574045806869
的主要功能是實作服務注冊與發現,同時也對監控中心提供了一部分支援。dubbo支援不多種類型的注冊中心,例如zookeeper、redis、sofa、multicast等等,下面看一下注冊中心的總體結構
注冊中心主要可以分為3部分,分别是RegistryFactory、Registry和NotifyListener,其中,Registry可以了解為進行服務注冊與發現的核心元件,也可以簡單了解為dubbo與注冊中心之間的一個橋梁,從圖中可以看到,Registry內建了RegistryService接口,RegistryService提供了四個與服務注冊與發現相關的四個核心方法:
registry()/unregistry/subscribe/unsubscribe
,RegistryFactory則是用來建立Registry執行個體的,這裡面采用了
工廠方法
設計模式,同時,RegistryFactory在SPI的加持下,實作了基于dubbo消息總線URL的自适應模式
@SPI("dubbo")
public interface RegistryFactory {
@Adaptive({"protocol"})
Registry getRegistry(URL var1);
}
第二個部分是NotifyListener,這個元件被組合在Registry中,主要用來接口注冊中心的消息變更通知。
下面再解釋一下中間層的三個抽象類
public abstract class AbstractRegistryFactory implements RegistryFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class);
private static final ReentrantLock LOCK = new ReentrantLock();
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap();
public AbstractRegistryFactory() {}
public static Collection<Registry> getRegistries() {
return Collections.unmodifiableCollection(REGISTRIES.values());
}
public static void destroyAll() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
LOCK.lock();
try {
Iterator i$ = getRegistries().iterator();
while(i$.hasNext()) {
Registry registry = (Registry)i$.next();
try {
registry.destroy();
} catch (Throwable var6) {
LOGGER.error(var6.getMessage(), var6);
}
}
REGISTRIES.clear();
} finally {
LOCK.unlock();
}
}
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName()).addParameter("interface", RegistryService.class.getName()).removeParameters(new String[]{"export", "refer"});
String key = url.toServiceString();
LOCK.lock();
Registry var4;
try {
Registry registry = (Registry)REGISTRIES.get(key);
if (registry != null) {
var4 = registry;
return var4;
}
registry = this.createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
var4 = registry;
} finally {
LOCK.unlock();
}
return var4;
}
protected abstract Registry createRegistry(URL var1);
}
可以看到,在AbstractRegistryFactory中,實作了一些通用邏輯,例如Registry的消耗和緩存,對于建立Registry執行個體的createRegistry方法則交給了子類來實作。
再來看AbstractRegistry,代碼就不貼了,這個類中主要是實作了Registry的本地緩存,大概思路就是把資料寫入本地磁盤中,這也就保證了即使注冊中心故障了,在一定程度上也不會影響dubbo服務的調用。
以zookeeper為例,NotifyListener在執行subscribe是會設定到節點上,依次來箭筒zookeeper節點資料變化的。
另外,觸發注冊中心邏輯的地方是dubbo導出服務的時候
public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
Registry registry = this.getRegistry(originInvoker);
.....
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
}
private Registry getRegistry(Invoker<?> originInvoker) {
URL registryUrl = this.getRegistryUrl(originInvoker);
return this.registryFactory.getRegistry(registryUrl);
}
簡單來說就是根據URL消息總線自适應擷取RegistryFactory執行個體(這一步是由上層完成的),然後再用這個執行個體建立具體的Registry執行個體。
最後再總結一下注冊中心這個子產品涉及到的設計模式,對于SPI的應用設計到了
政策設計模式
,對于Registry和RegistryFactory體系涉及到了
模闆設計模式
和
工廠方法設計模式
。
配置解析(xml)
dubbo支援三種配置方式,分别是xml配置、注解配置和properties/yml配置,這裡以xml為例來說明,主要因為筆者目前在項目中用到的就是xml的配置方式。
在使用dubbo時,我們通常與Spring進行內建,是以dubbo也對此進行了支援,dubbo與spring內建的方式簡單,基于spring規範開發自己的NamespaceHandler,将dubbo設計的到bean統一交給spring進行管理。
熟悉spring的同學都知道,spring在解析xml的時候,xml中的namespace對應的url從spring.handlers中找到對應的NamespaceHandler,然後通過NamespaceHandler來解析這個标簽,是以,dubbo在config子產品中也提供了一個spring.handlers,具體内容如下
http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
這裡之是以有兩個是為了相容alibaba dubbo和apache dubbo,因為dubbo在2018年春節以及捐給了Apache進行維護。
接下來在看看DubboNamespaceHandler做了什麼
this.registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
this.registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
this.registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
this.registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
this.registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
this.registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
this.registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
this.registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
this.registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
this.registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
可以很清楚的看到,Dubbo對所有标簽的解析都是由DubboBeanDefinitionParser完成的,到這裡,我們可以和dubbo的代碼架構聯系起來,這裡就對應的config層,其中比較重要的像ApplicationConfig/RegistryConfig/ProviderConfig/ConsumerConfig/ProtocolConfig/ServiceConfig和ReferenceConfig,這些Config承載了dubbo的核心配置。
另外,在配置這裡需要注意的一點是各種配置的優先級,因為dubbo中,很多配置都可以在不同的地方進行設定,下面來看一下這張圖
根據這張圖的結構可以了解到,對于同一個配置,優先級為方法級别>service/reference > provider /consumer ,consumer > provider,另外,需要知道的是,很多配置在provider端的配置最終會通過注冊中心以消息總線URL為載體透傳到consumer端,例如典型的負載均衡、叢集容錯及失敗重試次數等,也就是說,這些設定最終都是在consumer端生效的。
對于配置解析這部分,我個人覺得了解到這兒就可以了,重點應該放在服務暴露、服務引用和服務調用這些核心邏輯上。
服務暴露原理
由于dubbo基于網絡/自定義協定+反射實作的rpc架構,是以,暴露服務簡單了解無非就是啟動網絡端口進行監聽,依次來接收用戶端tpc連結和調用的,但是當涉及到注冊中心時,還需要将暴露的服務添加到注冊中心,這樣服務消費者才能感覺到這個服務,另外,對于一個服務,如果在本地jvm中也存在服務調用者,為了保證效率,就不應該再通過網絡進行調用,再了解服務暴露時,我們就可以從這三個方面入手。
從xml解析的章節我們可以知道,承載
<dubbo:service/>
配置的Bean是ServiceBean這個類,服務暴露的啟動入口也在這裡
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
public void onApplicationEvent(ContextRefreshedEvent event) {
this.export();
}
}
由于ServiceBean實作了Spring的ApplicationListener,當接受到容器初始化完成的事件時,就會執行
this.export()
進行服務導出
上面的圖是一個簡易的方法調用時序圖(有點醜~ ~),下面直接把代碼貼上來看更加直覺
ServiceBean.java
public void onApplicationEvent(ContextRefreshedEvent event) {
this.export();
}
ServiceConfig.java
public synchronized void export() {
this.doExport();
}
protected synchronized void doExport() {
this.doExportUrls();
}
private void doExportUrls() {
List<URL> registryURLs = this.loadRegistries(true);
/*
這裡是重點:多協定支援
如果我們在配置的時候指定了多個<dubbo:protocol/>那麼這裡就會多每個Protocol進行處理
這裡的List<URL> 中的URL可以了解成一個<dubbo:protocol/>
我們詳細說一下:
1)如果不配置<dubbo:registry/>的話,暴露服務的時候,就會直接打開對應的端口,而不會向注冊中心注冊服務,此時的URL是以具體協定開頭的,比如dubbo://ip:port/xxx.xxx.Service?...
2)如果配置了<dubbo:registry/>,那麼,暴露服務的時候處理打開網絡端口,同時還會通過Registry向注冊中心注冊服務,此時的url以registry開頭,例如:registry://127.0.0.1:2181/xxx.xxx.Service?protocol=zookeeper&export=dubbo://ip:port/xxx.xxx.Service
**/
Iterator i$ = this.protocols.iterator();
while(i$.hasNext()) {
ProtocolConfig protocolConfig = (ProtocolConfig)i$.next();
this.doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//包括服務到本地,這個不主動設定的話都會執行,這裡采用injvm protocol協定
this.exportLocal(url);
// 如果不是隻在本地暴露
if (!"local".toString().equalsIgnoreCase(scope)) {
//如果<dubbo:registry/>不為空,對應上面的第二種情況
if (registryURLs != null && !registryURLs.isEmpty()) {
/**
這裡也需要特殊注意以下,dubbo的多注冊中心就是在這裡支援的。
每個registryUrl代表了一個注冊中心,也就是<dubbo:registry/>
*/
Iterator i$ = registryURLs.iterator();
while(i$.hasNext()) {
//建立Invoker代理
Invoker<?> invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//通過制定的協定進行暴露,這裡涉及到了Protocol的SPI自适應
Exporter<?> exporter = protocol.export(wrapperInvoker);
this.exporters.add(exporter);
}else{
//如果沒有配置<dubbo:registry/>則直接暴露服務,不涉及注冊中心的邏輯
Invoker<?> invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
this.exporters.add(exporter);
}
}
}
//這裡是暴露服務到本地
private void exportLocal(URL url) {
if (!"injvm".equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString()).setProtocol("injvm").setHost("127.0.0.1").setPort(0);
ServiceClassHolder.getInstance().pushServiceClass(this.getServiceClass(this.ref));
Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(this.ref, this.interfaceClass, local));
this.exporters.add(exporter);
logger.info("Export dubbo service " + this.interfaceClass.getName() + " to local registry");
}
}
從代碼中可以看到,設計到了三處
protocol.export
,分别對應了本地暴露、待注冊中心的遠端包括和直接遠端暴露,那麼從代碼上看沒有什麼差別,是如何實作不同的服務暴露邏輯的呢?其實這裡就用到了Dubbo SPI機制,前面我們說dubbo是一種微核心架構,這裡的暴露服務的主邏輯就是微核心的一部分,而Protocol就是一個插件,我們看看Protocol的接口定義
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> var1) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> var1, URL var2) throws RpcException;
void destroy();
}
ServiceConfig中對protocol的初始化
private static final Protocol protocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
可以很清楚的看到這是SPI的自适應模式,上面三種不同類型的服務導出就是根據URL中的protocol參數來決定具體選擇哪個Protocol實作的,實際上分别對應了
- InjvmProtocol
- RegistryProtocol
- DubboProtocol
下面以
RegistryProtocol
為例,看具體的實作邏輯
public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
//導出本地服務,這裡主要是開啟本地服務端口
RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker);
URL registryUrl = this.getRegistryUrl(originInvoker);
/**
這裡是擷取Registry對象,主要邏輯就是通過SPI擷取對應類型的RegistryFactory,然後
建立對應的Registry執行個體,以Zookeeper為例,就是ZookeeperRegistryFactory和
ZookeeperRegistry
*/
Registry registry = this.getRegistry(originInvoker);
URL registedProviderUrl = this.getRegistedProviderUrl(originInvoker);
boolean register = registedProviderUrl.getParameter("register", true);
//将暴露的服務利用Registry的register方法釋出到注冊中心
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
if (register) {
this.register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(registedProviderUrl);
RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker);
this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener):
//訂閱相關的url,例如provider會訂閱Service/configuators目錄,監聽配置資訊的變化
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
return new RegistryProtocol.DestroyableExporter(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}
/**
這裡的邏輯可以總結為:
1)從registry://... url中擷取到export參數,也就是用于暴露服務的url
2)通過自使用SPI機制擷取到對應的Protocol實作,通常是DubboProtocol
3)調用對應的export方法,将服務暴露到本地端口上
**/
private <T> RegistryProtocol.ExporterChangeableWrapper<T> doLocalExport(Invoker<T> originInvoker) {
String key = this.getCacheKey(originInvoker);
RegistryProtocol.ExporterChangeableWrapper<T> exporter = (RegistryProtocol.ExporterChangeableWrapper)this.bounds.get(key);
if (exporter == null) {
synchronized(this.bounds) {
exporter = (RegistryProtocol.ExporterChangeableWrapper)this.bounds.get(key);
if (exporter == null) {
Invoker<?> invokerDelegete = new RegistryProtocol.InvokerDelegete(originInvoker, this.getProviderUrl(originInvoker));
exporter = new RegistryProtocol.ExporterChangeableWrapper(this.protocol.export(invokerDelegete), originInvoker);
this.bounds.put(key, exporter);
}
}
}
return exporter;
}
//從Invoker中拿到registry:// 中對應的export參數
private URL getProviderUrl(Invoker<?> origininvoker) {
String export = origininvoker.getUrl().getParameterAndDecoded("export");
if (export != null && export.length() != 0) {
URL providerUrl = URL.valueOf(export);
return providerUrl;
} else {
throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
}
}
DubboProtocol.java
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter(invoker, key, this.exporterMap);
this.exporterMap.put(key, exporter);
Boolean isStubSupportEvent = url.getParameter("dubbo.stub.event", false);
Boolean isCallbackservice = url.getParameter("is_callback_service", false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter("dubbo.stub.event.methods");
if (stubServiceMethods != null && stubServiceMethods.length() != 0) {
this.stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
} else if (this.logger.isWarnEnabled()) {
this.logger.warn(new IllegalStateException("consumer [" + url.getParameter("interface") + "], has set stubproxy support event ,but no stub methods founded."));
}
}
/**
這裡很重要,開啟指定端口上的監聽
最終底層調用的是Netty的
*/
this.openServer(url);
this.optimizeSerialization(url);
return exporter;
}
到此服務暴露的基本邏輯就清楚了,下面簡單總結一下:
1)spring容器初始化完成時間觸發ServiceBean的onApplicationEvent方法
2)調用ServiceConfig的doExport/doExportUrls 在這裡處理多協定問題(循環周遊)
3)調用doExportUrlsFor1Protocol針對每種類型的協定進行導出(這裡處理多注冊中心問題,循環)
4)以injvm協定導出服務到本地,用于本地調用
5)以dubbo協定導出服務到遠端
6)registry://觸發RegistryProtocol的export方法建立于注冊中心的連結
7)從registry://中拿到export參數,對應的值為dubbo://xxx
8)dubbo://觸發DubboProtocol的export,開啟NettyServer,同時把Exportor放入到exporterMap中
9)通過Registry向注冊中心注冊服務,同時訂閱category/configurators/check目錄
10)導出服務完成
服務引用原理
在引用服務時,通常是通過
<dubbo:reference/>
或
@Reference
來做的,這兩個配置最終會被翻譯成ReferenceBean,由于ReferenceBean實作了FactoryBean接口,是以,按照spring的規範,在進行依賴注入時,就會調用它的getObject()方法來擷取執行個體,這個getObject方法就是在服務引用階段dubbo與spring連接配接的橋梁。
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {
public Object getObject() throws Exception {
return this.get();
}
}
對于服務調用的整個流程這裡引用一張網上的時序圖,個人覺得畫的非常清晰
下面,就貼一下源碼,跟一下這個過程。由于getObject方法中調用了get方法,這個方法定義在了ReferenceConfig中
ReferenceConfig.java
/***
這三個屬性很重要,分别對應的Protocol、Cluster和ProxyFactory
通過dubbo spi的自适應模式來擷取執行個體,其次Cluster預設的是FailoverCluster,ProxyFactory預設是JavaAssistProxyFactory,這可以從這兩個接口中@SPI注解中的預設值中看到
*/
private static final Protocol refprotocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final Cluster cluster = (Cluster)ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
private static final ProxyFactory proxyFactory = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
p
public synchronized T get() {
//這裡判斷是否已經建立過,如果沒有則調用init()方法初始化ref
if (this.ref == null) {
this.init();
}
return this.ref;
}
/**
在init方法中,總的來說就做了兩件事
1. 建立包用戶端與服務端之間的長連接配接的Invoker
2. 将Invoker封裝成業務接口對象指派給ref
但其中包含了不少細節,下面我們接着往下看
**/
private void init() {
//這一句是整個init()方法的核心,也就是傳回了最終的ref對象并指派給了ref
this.ref = this.createProxy(map);
}
private T createProxy(Map<String, String> map) {
.....
if (isJvmRefer) {
//這裡判斷是不是本地調用,如果是,則通過InjvmProtocol的refer方法引用,這裡用到的也是Dubbo SPI的自适應機制
URL url = (new URL("injvm", "127.0.0.1", 0, this.interfaceClass.getName())).addParameters(map);
this.invoker = refprotocol.refer(this.interfaceClass, url);
} else {
.....
/**
這裡的urls是
registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-consumer&dubbo=2.0.2&pid=44067&refer=application%3Decho-consumer%26cache%3Dtrue%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dcom.learn.dubbo.chapter1.echo.api.EchoService%26methods%3Decho%26mock%3Dtrue%26pid%3D44067%26register.ip%3D192.168.199.238%26revision%3D0.0.1%26side%3Dconsumer%26timeout%3D2000%26timestamp%3D1571895539207%26version%3D0.0.1®istry=zookeeper×tamp=1571895543957
是以refprotocol對應的肯定是RegistryProtocol,主要用來處理服務注冊于發現相關的邏輯
*/
if (this.urls.size() == 1) {
this.invoker = refprotocol.refer(this.interfaceClass, (URL)this.urls.get(0));
} else {
//這裡主要用來處理多協定,将多個Invoker合并成cluster
List<Invoker<?>> invokers = new ArrayList();
URL registryURL = null;
Iterator i$ = this.urls.iterator();
while(i$.hasNext()) {
url = (URL)i$.next();
/**
可以看到,主要的處理邏輯和上面是一樣的,隻不過外面套了一個循環
*/
invokers.add(refprotocol.refer(this.interfaceClass, url));
if ("registry".equals(url.getProtocol())) {
registryURL = url;
}
}
//這裡将多個Invoker合并成Cluster
if (registryURL != null) {
u = registryURL.addParameter("cluster", "available");
this.invoker = cluster.join(new StaticDirectory(u, invokers));
} else {
this.invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
}
下面就來到了RegistryProtocol的refer方法,這裡的遠端服務引用的最核心邏輯
RegistryProtocol.java
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//這裡就是從url中擷取registry參數的值,一般是zookeeper,然後把這個參數從url中删掉
//最終url就變成了 zookeeper://....
url = url.setProtocol(url.getParameter("registry", "dubbo")).removeParameter("registry");
//擷取一個注冊中心執行個體,内部包含了用戶端與服務端的長連接配接,這個在注冊中心中已經講過
Registry registry = this.registryFactory.getRegistry(url);
....
this.doRefer(this.cluster, registry, type, url)
}
/**
這裡是宇宙的中心!
RegistryDirectory前面已經講過,它實作了NotifyListener接口,用來接收zookeeper目錄資料變化通知的
directory.setRegistry方法把Registry執行個體設定的進去,也就是持有了與zookeeper服務端的連結,友善操作資料
緊接着通過Registry.register方法向注冊中心注冊了自己,也就是在Xxx.Service/consumer目錄下
然後,通過directory的subscribe訂閱了prividers/configurators/routers 這些目錄
這裡directory.substribe内部其實也是調用了registry.substribe,隻不過,由于需要設定NotifyListener,索引通過directory對象來操作更加友善
最終利用director建立了Invoker并傳回了
*/
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory(type, url);
directory.setRegistry(registry);
directory.setProtocol(this.protocol);
Map<String, String> parameters = new HashMap(directory.getUrl().getParameters());
URL subscribeUrl = new URL("consumer", (String)parameters.remove("register.ip"), 0, type.getName(), parameters);
if (!"*".equals(url.getServiceInterface()) && url.getParameter("register", true)) {
registry.register(subscribeUrl.addParameters(new String[]{"category", "consumers", "check", String.valueOf(false)}));
}
directory.subscribe(subscribeUrl.addParameter("category", "providers,configurators,routers"));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
實際上到這裡,服務引用過程剛剛完成了一般,把服務消費者自己注冊到了注冊中心,我們還沒有看到服務消費者建立于服務提供者之間的長連接配接,那一步是在哪裡做的呢?其實是在RegistryDirectory的notify方法中。由于在subscribe的過程中,RegistryDirectory作為了接收zookeeper資料變化的對象設定到了zookeeper會話中,是以在首次執行subscribe的時候,就會觸發notify方法的回調
//這裡會傳回很多個urls,我們隻說provider相關的
public synchronized void notify(List<URL> urls) {
this.refreshInvoker(invokerUrls);
}
private void refreshInvoker(List<URL> invokerUrls) {
Map<String, Invoker<T>> newUrlInvokerMap = this.toInvokers(invokerUrls);
}
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
//這裡的url就是dubbo://xxxxxx,protocol.ref
invoker = new RegistryDirectory.InvokerDelegate(this.protocol.refer(this.serviceType, url), url, providerUrl);
}
可以看到最終還是會調用protocol.refer,這裡的protoco其實就是DubboProtocol了
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
this.optimizeSerialization(url);
DubboInvoker<T> invoker = new DubboInvoker(serviceType, url, this.getClients(url), this.invokers);
this.invokers.add(invoker);
return invoker;
}
private ExchangeClient[] getClients(URL url) {
//這裡的主要邏輯就是建立于服務端的長連接配接
}
到這裡,服務引用的主要流程就算完成了,可以看到,服務消費者與服務提供者之間的長連接配接時異步建立的。
最後,就是把Invoker封裝成業務接口,傳回給業務層了,我們在傳回到ReferenceConfig中看一下createProxy方法的最後一步
return proxyFactory.getProxy(this.invoker);
這裡是通過Jdk proxy或javaassist proxy(預設)建立符合業務接口規範的代理對象,最終,這個代理對象内部,就包含了與注冊中心互動的Regitry和與服務提供者之間的長連接配接,萬事俱備,隻差調用。
下面簡單總結一下:
1)依賴注入時,觸發ReferenceBean覆寫FactoryBean的getObject方法,調用init()開始服務引用
2)觸發ReferenceConfig的createProxy方法,進入核心邏輯
3)判斷是否為injvm模式,如果是,則直接通過InjvmProtocol的refer方法創new InjvmInvoker
4)如果不是,則首先加載注冊中心url
5)如果是單個注冊中心,則執行RegistryProtocol的refer,因為此時的url為registry://
6)建構Registry及RegistryDirectory
7)向注冊中心注冊目前consomer并訂閱相關的zookeeper目錄(category/providers/configurators/routers),訂閱時設定NotifyListener
8)訂閱完成後,異步觸發RegistryDirectory的notify方法
9)拿到具體的服務提供者url後(dubbo://),執行DubboProtocol的refer方法
10)在refer方法中,建立了DubboInvoker執行個體,其中包含了NettyClient
11)服務引用完成
服務調用
對于服務調用這部分邏輯,到網上找了好多部落格資料,也參考了一些書籍,但始終暈暈乎乎,這次下決心搞明白,結果還是自己通過debug代碼徹底搞明白了,不得不說,哎啥也不說了。
先上兩張按照我個人了解話的圖,有點醜,看不明白的同學請忽略。
Injvm模式
Dubbo模式
由于服務的導出和服務的引用主要涉及兩種模式,一種是Local模式一種是Remote模式,是以這裡以InjvmProtocol和DubboProtocol為例來說,先說InjvmProtocol模式。
在服務導出邏輯中,Injvm模式我們可以再回顧一下
InjvmProtocol.java
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//注意這裡的exporterMap
return new InjvmExporter(invoker, invoker.getUrl().getServiceKey(), this.exporterMap);
}
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
//再注意這裡的exporterMap
exporterMap.put(key, this);
}
這裡的exportMap是一個重點,上面的邏輯說明,每一個Exporter執行個體,都會被儲存在InjvmProtocl的exporterMap中。
然後我們在看看Injvm模式下的調用過程。在服務引用一節,我們知道了,最終拿到的服務引用對象實際上是一個代理對象,這個代理對象是通過下面的代碼建立出來的
return proxyFactory.getProxy(this.invoker);
這裡最終會調用到JavaassistProxyFactory類中的getProxy方法
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
是以,對于接口中某個方法的調用,最終會轉換成對InjvmInvoker的調用,也就是下面的方法
public Result doInvoke(Invocation invocation) throws Throwable {
//看這裡,實際上就是從InjvmProtocol的exporterMap中拿到了執行export時得到的Exporter對象
Exporter<?> exporter = InjvmProtocol.getExporter(this.exporterMap, this.getUrl());
if (exporter == null) {
throw new RpcException("Service [" + this.key + "] not found.");
} else {
RpcContext.getContext().setRemoteAddress("127.0.0.1", 0);
//最終,從Exporter中拿到導出時的Invoker執行個體,并調用對應的invok方法
return exporter.getInvoker().invoke(invocation);
}
}
然後,由于在進行服務導出時,導出的Invoker是通過ProxyFactory.getInvoker方法建立的
proxyFactory.getInvoker(this.ref, this.interfaceClass, local);
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf(36) < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
也就是說,服務導出的Invoker是提供服務的對象的代理對象,最終調用就通過這個Inoker傳遞到了具體的服務對象,到這裡,我們可以回過頭反觀一下ProxyFactory這個元件
public class JavassistProxyFactory extends AbstractProxyFactory {
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf(36) < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
這裡面有兩個方法,第一個方法getProxy是将Invoker轉換成對應的業務接口執行個體對象,而第二個方法getInvoker是将一個服務對象包裝成Invoker對象,也就是說,第一個方法在服務消費端用到,第二個方法在服務提供端被用到,服務提供端在釋出服務時把一個業務類的代理對象轉換成Invoker放到Exporter中,然後把Exporter放到exporterMap中與具體的接口關聯。
仔細思考一下,協定相關的Invoker隻會出現在服務消費端,例如InjvmInvoker、DoubleInvoker等,而服務提供端中對在對應的Exportor中對應了一個通過ProxyFactory建立的匿名Invoker,也就是AbstractProxyInvoker,就像下面這樣
(Consumer)InjvmInvoker/DubboInvoker ---> InjvmExporter/DoubleExporter ---> AbstractProxyInvoker --> ServiceImpl
下面再來看看dubbo協定下的調用過程,在服務引用一節,說到了代理對象内部實際上是封裝了DubboInvoker,下面是建立DubboInvoker對象的代碼
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
this.optimizeSerialization(url);
DubboInvoker<T> invoker = new DubboInvoker(serviceType, url, this.getClients(url), this.invokers);
this.invokers.add(invoker);
return invoker;
}
getClients方法實際上是建立了一個與服務提供者的長連接配接,當調用接口方法時,最終就會調用DubboInvoker中的doInvoke方法,其中的一句核心代碼如下
ResponseFuture future = currentClient.request(inv, timeout);
以上是用戶端的邏輯,下面我們再看看服務端,上面我們說到,與DubboInvoker對應的是DubboExporter,我們看看是怎麼執行的。
當用戶端向服務端發起調用時,dubbo自己封裝的一個ExchangeHandler就會被底層的NettyServer調用,這個ExchangeHandler被定義在了DubboProtocol中,我們來看一下它的核心邏輯
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
Invocation inv = (Invocation)message;
Invoker<?> invoker = DubboProtocol.this.getInvoker(channel, inv);
.....
invoker.invoke(inv);
}
}
/**
這裡我們可以看到,Handler最終會調用被封裝在DubboExporter中的Invoker,而這個Exporter
其實是從我們前面說的exporterMap中擷取的,是以我們就可以總結一下:
每一個Protocol的實作内部都有一個exporterMap域,用于緩存在該協定下報了的所有服務,也就是Exporter
**/
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
String serviceKey = serviceKey(port, path, (String)inv.getAttachments().get("version"), (String)inv.getAttachments().get("group"));
DubboExporter<?> exporter = (DubboExporter)this.exporterMap.get(serviceKey);
return exporter.getInvoker();
}
總價一下,也就是,當服務消費者發起調用時,會觸發服務端的ExchangerHandler被Netty調用,然後從DubboProtocol中的exporterMap拿到具體的Exporter,從Exporter中拿到服務對象的代理Invoker,調用Invoker的invoke方法将請求傳遞到具體的服務對象中。
上面隻是我個人了解的相對核心的邏輯,當然服務調用還設計到與dubbo協定、參數序列化和反序列化的知識點,下一個小節會簡單說說dubbo協定,對于序列化的知識點讀者感興趣的話可以自己看一看。
總結一下:
由于服務導出與服務引用時,分為injvm和dubbo兩種,是以,服務調用也分為這兩種協定
injvm協定的服務調用:
先回憶一下injvm協定服務導出和引用過程,provider導出時,通過ProxyFactory建立了一個業務處理Bean的代理對象,将其轉換成了一個Invoker,通過invoke方法傳遞調用請求,并最終通過InjvmExporter包裝,這個InjvmExporter被緩存到了InjvmProtocol中的exporterMap中。。
在引用injvm服務時,InjvmProtocl的refer直接建立了一個InjvmInvoker,這個InjvmInvoker覆寫了doInvoke方法,直接從exportMap中拿到Exporter,也就是服務導出時放進去的Exporter,并拿到對應的Invoker直接進行調用。
由于,在服務引用時,通過ProxyFactory将Invoker代理成了業務接口,是以,當對業務接口進行調用時,最終救護執行到InjvmInvoker的doInvoke方法,而這個doInvoke就從exporterMap中拿到了Exporter找那個的Invoker,這個Invoker是服務導出時,通過ProxyFactory的getInvoke方法得到了,它是業務接口實作類的代理,這樣最終調用鍊就到了服務實作類中。
dubbo協定的服務調用:
同樣,我們先來回顧一下dubbo協定下的服務暴露和服務引用邏輯,簡單說,registry://協定觸發RegistryProtocol的export方法,該方法首先提起export參數,及dubbo:///,利用DubboProtocol的export方法啟動NettyServer,然後将對應的Exporter放到DubboProtocol的exporterMap中,然後RegistryProtocol的export繼續向zookeeper注冊服務,同時訂閱configurators目錄。
服務引用過程是registry://協定觸發RegistryProtocol的refer方法,然後建立Registry和ReigistryDirector,向注冊中心注冊自己并監聽相關目錄的資料變化,RegistryDirectory的notify方法被zookeeper觸發,得到所有provider的dubbo:///服務暴露協定url,然後通過DubboProtocl的refer方法建立DubboInvoker并啟動NettyClient。
那麼調用時,調用鍊就會進入到這個DubboInvoker中,DubboInvoker中的doInvoke方法,通過NettyClient将相關資料序列化後發送到provider監聽的端口,這時候就會觸發DubboProtocol中的ExchangeHandler,從replay方法中會從exporterMap中拿到對應的Exporter,從中擷取業務實作類的代理Invoker,最終将調用請求傳遞給業務實作類處理,然後原路傳回。
dubbo協定
下面這張圖是官網給出來的dubbo協定的組成圖
在dubbo協定中,用16個位元組(0-127 一共128個比特位)來定義協定頭,下面簡單說明一個每個部分的含義
- 0 ~ 15 bit:存儲魔法數字,dubbo用這個魔法數字辨別來解決tcp中的粘包/拆包問題
- 16 bit:資料包的類型,0位response,1為request
- 17 bit:調用方式,0為單向調用,1為雙向調用(dubbo進行優雅停機時發送的readonly消息中,該bit為空)
- 18 bit:事件辨別,0位請求/響應包,1位心跳包
- 19 ~ 23 bit:是序列化器id,用來統一consumer和provider的消息序列化方式,例如2為Hessian2,3為Java
- 24 ~ 31 bit:狀态辨別,例如20為OK,30位Client_timeout
- 32 ~ 95 bit:請求編号
- 96 ~ 127 bit:消息體長度
Cluster原理
回顧dubbo架構,我們了解到右一層Cluster,我個人了解Cluster就像它的字面意思一樣,即叢集的意思,是以,Cluster主要解決叢集層面的問題。那麼什麼是叢集呢,在dubbo中可以了解為多節點+對等部署的服務,這也是使用dubbo時的組常用用法。
在對等叢集中,我們通常面臨的問題有服務路由、叢集容錯、負載均衡,服務降級等問題,在dubbo中,這些問題實際上都是在Cluster層面解決的。
我看在回顧一下服務引用中的一段代碼
//周遊每一個服務提供者,并建立對應的Invoker
while(i$.hasNext()) {
url = (URL)i$.next();
invokers.add(refprotocol.refer(this.interfaceClass, url));
if ("registry".equals(url.getProtocol())) {
registryURL = url;
}
}
//将Invoker裝飾成Cluster
if (registryURL != null) {
u = registryURL.addParameter("cluster", "available");
this.invoker = cluster.join(new StaticDirectory(u, invokers));
} else {
this.invoker = cluster.join(new StaticDirectory(invokers));
}
這裡的cluster采用SPI自适應模式加載
@SPI("failover")
public interface Cluster {
@Adaptive
<T> Invoker<T> join(Directory<T> var1) throws RpcException;
}
可以看到,預設的Cluster為FailoverCluster,那我們看一下FailoverCluster的代碼
public class FailoverCluster implements Cluster {
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker(directory);
}
}
很簡單,就是傳回了一個Invoker執行個體,隻不過這個Invoker包含了所有服務提供者對應的Invoker,也就是,将所有的Service provider Invoker聚合成一個Invoker,是以,最終調用就會被傳遞到這個FailoverClusterInvoker中,我們來看一下對應的源碼,其實還是挺簡單的
AbstractClusterInvoker.java 中的invoke封裝了cluster模式下的通用邏輯
public Result invoke(Invocation invocation) throws RpcException {
//檢查服務對應的Invoker是否可用
this.checkWhetherDestroyed();
LoadBalance loadbalance = null;
/**
這裡是一個重點:
這裡是擷取服務對應的Invoker清單,最終,其實是通過RegisterDirectory的doList從緩存中擷取的,
期間包含了服務路由的過程
*/
List<Invoker<T>> invokers = this.list(invocation);
if (invokers != null && !invokers.isEmpty()) {
//SPI自使用擷取負載均衡政策,預設為random權重随機模式
loadbalance = (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random"));
}
RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
//調用子類,預設是FailoverClusterInvoker的doInvoke方法
return this.doInvoke(invocation, invokers, loadbalance);
}
整體思路很清楚,首先通過RegistryDirectory從緩存中擷取所有的Invoker清單,然後經過路由規則過濾出必要的Invoker,然後,根據參數配置建立對應的負載均衡器,最後,根據負載均衡器選擇一個Invoker進行調用。
FailoverClusterInvoker.java
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
//擷取重試次數,也就是retries參數
int len = this.getUrl().getMethodParameter(invocation.getMethodName(), "retries", 2) + 1;
//循環周遊指定的次數
for(int i = 0; i < len; ++i) {
//這裡是,如果調用失敗了一次,要重新進行檢查和擷取可用的Invoker清單
if (i > 0) {
this.checkWhetherDestroyed();
copyinvokers = this.list(invocation);
this.checkInvokers(copyinvokers, invocation);
}
//這裡就是通過制定的負載均衡政策,選擇出一個Invoker
Invoker<T> invoker = this.select(loadbalance, invocation, copyinvokers, invoked);
//如果調用成功,則就将結果傳回,跳出循環
Result result = invoker.invoke(invocation);
Result var12 = result;
return var12;
}
}
下面,我們先來看一下list這個方法,在這個方法中設計到了路由比對
AbstractClusterInvoker.java
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = this.directory.list(invocation);
return invokers;
}
//這裡最終就進入到了Directory體系,可以看到,職責隔離做的真的很好
AbstractDirectory.java
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = this.doList(invocation);
List<Router> localRouters = this.routers;
if (localRouters != null && !localRouters.isEmpty()) {
Iterator i$ = localRouters.iterator();
while(i$.hasNext()) {
Router router = (Router)i$.next();
try {
if (router.getUrl() == null || router.getUrl().getParameter("runtime", false)) {
invokers = router.route(invokers, this.getConsumerUrl(), invocation);
}
} catch (Throwable var7) {
logger.error("Failed to execute router: " + this.getUrl() + ", cause: " + var7.getMessage(), var7);
}
}
return invokers;
}
RegistryDirectory.java
public List<Invoker<T>> doList(Invocation invocation) {
/**
從這裡可以看到,實際上Invoker是從緩存中擷取的
在服務引用邏輯中,由于會觸發notify方法的執行,最終,會将服務提供者對應的Invoker放到
methodInvokerMap這個本地緩存中,這樣可以避免每次進行服務調用是,都從zookeeper擷取
減少注冊中心的壓力
*/
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;
invokers = (List)localMethodInvokerMap.get(methodName + "." + args[0]);
invokers = (List)localMethodInvokerMap.get(methodName);
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = (List)iterator.next();
}
return (List)(invokers == null ? new ArrayList(0) : invokers);
}
Cluster層涉及到了Router、Directory、LoadBalance,而且不同的Cluster政策都對應不同的實作,在dubbo中提供了一下幾種常用的叢集政策:
- Failover
- FastFail
- Failback
- Failsafe
- Forking
-
Broadcat
提供了四種負載均衡政策
- random 基于權重的随機
- roundrobin 叢集權重的平滑輪詢
- lastactive 最少激活原則,處理請求最少的節點會被配置設定更少的請求
- consistenthash 一緻性hash
這裡涉及到的每一種類型的元件都有自己不同的實作方式,暫時就不擴充了,以後有時間在對詳細看具體的實作方式。
Filter原理
Filter是Dubbo中一個很重要的元件,有一些核心的功能都是以及Filter實作的,例如流量控制,記錄通路日志,服務監控等,Dubbo中的Filter原理實際上和Java web中的Filter一樣,都是在調用目标方法的前後做一些功能上的增強。
Dubbo Filter作為核心元件之一,也是基于SPI模式實作,Dubbo提供了一個Protocl擴充點的包裝類
ProtocolFilterWrapper
,這樣再加載
Protocol
的時候,
ProtocolFilterWrapper
就會被作為包裝類自動被加載,我們可以看一下它的源碼
//實作了Protocol接口
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
/**
這裡是重點:遵循了Dubbo SPI 的AOP規範,在構造函數中依賴Protocol元件
**/
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
} else {
this.protocol = protocol;
}
}
/**
在服務導出和服務引用時,調用了buildInvokerChain方法對Invoker進行包裝
**/
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return "registry".equals(invoker.getUrl().getProtocol()) ? this.protocol.export(invoker) : this.protocol.export(buildInvokerChain(invoker, "service.filter", "provider"));
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return "registry".equals(url.getProtocol()) ? this.protocol.refer(type, url) : buildInvokerChain(this.protocol.refer(type, url), "reference.filter", "consumer");
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
final Invoker<T> last = invoker;
// 這裡通過SPI機制加載所有被激活的Filter
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
/**
這裡的周遊順序可以留意一下,是倒序周遊的
例如有 filter a b c,invoker 周遊順序是
c - invoker , b - c - invoker, a - b - c - invoker,這樣就可以保證最終的執行順序是 a - b - c invoker了 。
*/
for(int i = filters.size() - 1; i >= 0; --i) {
final Filter filter = (Filter)filters.get(i);
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
//把所有的Filter層層包裝在原始的Invoker外面
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(last, invocation);
}
public void destroy() {
invoker.destroy();
}
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
}
總結
以我個人的經驗和眼界來看,目前單體架構、SOA架構和微服務架構這三種架構模式在IT系統中都有一定的占有率。單體架構适合簡單的服務,一般利用nginx做反向代理進行叢集對等部署,SOA架構适用于中等規模的團隊,以整塊服務為機關劃分開發團隊,應用非常廣泛,微服務架構模式目前應該還處在初期使用階段,這種架構可以很好的适應docker、雲計算場景,随着雲計算的快速普及,微服務架構也會越來越流行。
dubbo作為一個SOA架構下的分布式服務開發架構解決了SOA架構中的兩大難題,分别是RPC和服務治理。華為林海峰的《分布式服務架構原理與實踐》從抽象層面比價系統的介紹了分布式系統的組成部分,可以說,一個完整的SOA架構包含了很多的内容,例如、叢集容錯、序列化、通信模型、協定、注冊中心、監控中心等等,dubbo可以說是目前在開源領域最流程的一款分布式服務架構了,但是它還有很多需要完善的地方。
從宏觀角度觀察dubbo,主要包含了四個部分,分别是服務調用者、服務提供者、注冊中心和監控中心,服務提供者異步向注冊中心注冊服務,服務消費者異步從注冊中心拉去服務提供者資訊,同時監控相關配置資訊的變化,服務提供者和服務消費者異步向注冊中心上報監控資料。
從微觀角度看dubbo,我覺得可以從橫向和縱向兩個角度來看,從橫向角度看,dubbo采用了一種“微核心+富生态元件”的方式建構,利用SPI擴充點機制,組合各個核心元件,為dubbo帶來了高可擴充性,為dubbo的快發展提供了很好的支撐。
從縱向角度看dubbo,也就是從dubbo的執行流程,可以分為Service/Config/Proxy/Cluster/Monitor/Protocol/Exchanger/Transport/Serializable等,清晰的劃分了各層元件的職責邊界,也為dubbo的高可擴充性提供了支援。
是以,dubbo在我看來是一個360度立體高可擴充的架構。
dubbo提供了多種注冊中心的實作,目前最新版本中提供了zookeeper、redis、sofa、mutiple、etcd3等多種協定,在配置上支援xml配置、注解配置以及properties/yml配置支援,序列化方案就更加豐富了例如在大資料領域廣泛應用的avro、jdk、kryo、json、hessian2、fastjson等等,支援的協定也很多,例如dubbo、http、rest、thirft、rmi、webservice、hessiam等等,能夠很好的滿足各種分布式系統以及異構系統內建的需要,在叢集容錯方面也有豐富的側臉,例如Failover、Fastfail、Failback、Failsafe、Forking、Broadcast等等,同時還利用Mock機制支援服務的平滑降級能力,但是在監控方面dubbo還沒有提供一個完整的監控中心,在使用dubbo時,一般也不會使用dubbo官方提供的監控中心,因為功能過于簡陋,一般都是自己實作。
據悉,dubbo未來會向Service mesh方向方法,雖然我還沒太弄清楚什麼是Service mesh,不過應該是一種更牛逼的、能夠使用雲原生的架構模式吧,期待。