文章目录
-
-
- 简介
- 架构演进
- 理解RPC与SOA
- dubbo架构简介
- SPI扩展机制
- 注册中心
- 配置解析(xml)
- 服务暴露原理
- 服务引用原理
- 服务调用
- dubbo协议
- Cluster原理
- Filter原理
- 总结
-
简介
目前的工作主要是做一些业务功能的开发,涉及到的系统架构以all-in-one的单体架构为主,也少量涉及分布式系统,对于涉及到的分布式系统,几乎都是以dubbo为基础构建的,日常也主要是对其进行维护以及对开发新的功能,之前对dubbo的使用也不是特别深入,应用场景也不复杂。dubbo作为目前最流行的分布式微服务架构之一,是非常值得好好去研究一下的。
《深入理解Apache Dubbo与实战》这本书主要是对dubbo的架构及原理进行讲解,中间穿插了一些实战案例,通过这本书可以更好的理解dubbo,理解其设计理念。两位作者商宗海和林琳,商宗海是dubbo的技术负责人,在dubbo开源到apache后,其本人称为了PMC Member,林琳也是dubbo的核心贡献者,这两位目前都就职于蚂蚁金服。
架构演进
在将dubbo前,我们先来简单回顾一下应用系统架构的演进,这是一个老生常谈的问题了。
对于一些比较简单的应用,我们通常采用all-in-one的单体架构,这种架构对于开发和部署都比较友好,能够在项目前期快速的响应需求,但随着业务的发展,系统中的功能越来越多,服务的体积也越来越庞大,另外团队成员也越来越多,这时候就会代理很多问题,比如代码重复问题,牵一发而动全身以及单次部署时间越来越长等,这时候就需要考虑对服务进行拆分。在拆分时,可以根据系统不同的特点采用横向拆分或纵向拆分。如果系统中各个子模块的业务边界比较清楚,内部不存在跨模块的相互调用的话,可以按照纵向拆分,把每一个模块拆分成一个独立的服务,单独维护部署,但是如果模块之间耦合度比较高的话,就可以考虑按照功能横向拆分,把每一个功能模块独立成一个服务,每个服务交给一个独立的小组负责,这样,可以很大程度的提高团队的开发效率,其实这时候系统架构就演进到了我们常说的SOA架构。如果架构再往前演进的话,就到了微服务架构,我觉得微服务架构可以简单的理解为拆分粒度更小的服务化,微服务架构由于对目前比较流程的敏捷开发和DevOps比较友好,所以也很流行,但是具体采用哪种架构,还是要根据具体的场景来决定。
理解RPC与SOA
在最开始接触dubbo的时候,认为dubbo就是架构,后来随着工作经验的增加,理解了dubbo本身其实是一种用来构建分布式系统的框架,通过dubbo构建的分布式系统遵循SOA架构,即面向服务的架构。那么,dubbo所要解决的问题就是SOA架构的中的问题,我个人理解,dubbo解决了soa架构中的两大核心问题:RPC调用和服务治理。
RPC是远程过程调用的缩写,它指的是一种基于网络的跨进程调用,在java中实现rpc的方式主要有两种,一种是基于jdk RMI实现,一种是基于自定义协议和反射实现,在几乎所有的rpc框架中都是采用第二种方式实现。RPC解决了分布式系统的核心的远程服务调用的问题。
但是SOA架构中的另一个重要功能就是服务治理,服务治理包括服务注册与发现,服务的限流、降级,服务监控等。这些功能dubbo也都有提供。
所以说,dubbo是一个分布式服务框架,基于dubbo构建的分布式系统基于SOA架构,也就是说dubbo解决了SOA架构中的核心问题。
下面,我们从dubbo的世界里暂时跳出来,看看一个通用的分布式服务框架都应该具有哪些功能。
在最上层是业务层,也就是具体的业务服务接口;下层是proxy代理层,proxy的主要作用是屏蔽rpc框架的底层细节,避免rpc框架侵入业务代码;下面的两层我理解成框架特性层,主要包含了负载均衡、超时重试、集群容错以及服务路由;再往下,就属于rpc层了,分为协议层、通信框架层和传输层,这几层包含了rpc框架核心,包括协议、序列化/反序列化等,总是这里的主要作用是把消息按照指定的格式发送到provider或返回给consumer。
dubbo架构简介
下面再来看看dubbo,下图是dubbo的总体抽象架构
这里包含了四个部分,分别是服务提供者,服务消费者,注册中心,监控中心,包含了SOA架构中的两个核心部分:RPC与服务治理。服务提供者异步向注册中心;服务消费者异步从注册中心拉取服务提供者信息,并接受注册中心的变更通知;服务消费者和服务提供者异步向监控中心上报数据;服务消费者同步调用服务提供者;
下面再从微观层面看看dubbo在代码层面的架构设计
对于dubbo来说这是一张非常经典的架构图,我们可以看到,完全满足一个分布式服务框架所应该具有的功能。
下面简单按照我个人的理解简单解释一下
- service:业务层,包含了业务接口及具体实现;
- proxy:服务代理层,封装底层实现细节,使业务层对rpc过程无感知;
- registry:注册层,负责与注册中心进行交互,进行服务注册于发现、接收注册中心消息变更通知;
- cluster:集群容错层,提供了dubbo在集群层面的核心功能,包括集群容错、负载均衡、服务路由、超时重试等;
- monitor:监控层。负责上报数据到监控中心;
- protocol:协议层,这里封装了rpc调用的完整逻辑;
- exchange:信息交换层,把对API接口的调用转换成-Request-Response模式;
- transport:网络传输层,即Netty或Mina;
- Serializable:序列化,将消息进行序列化或反序列化;
SPI扩展机制
上面的代码架构是纵向观察dubbo的结构,但从横向看,dubbo会呈现出一个不一样的风景。
在横向上,dubbo采用的是一种为内核架构,内核基于SPI扩展点机制将不同的组件组合在一起,形成一个完整的分布式服务框架,同时,用户还可以基于SPI标准实现自己的扩展组件,这些组件通过SPI可以很容易的整合到dubbo中,下图是我对dubbo微内核和spi机制的理解
提到SPI,如果不了解dubbo的同学可能没有听过,或者有的同学对Java的SPI有一些了解,SPI利用策略模式,将接口与实现进行解耦合,用法如下
- 创建接口和实现类
public interface IHelloService {
String hello(String name);
}
public class HelloService1 implements IHelloService {
@Override
public String hello(String name) {
return "HelloService1:" + name;
}
}
public class HelloService2 implements IHelloService {
@Override
public String hello(String name) {
return "HelloService2:" + name;
}
}
- 在META-INF/services中创建以接口全名称为名字的文件
com.learn.dubbo.chapter1.echo.server.spi.IHelloService
- 在文件中制定具体的实现类,多个实现类用换行符分隔
com.learn.dubbo.chapter1.echo.server.spi.HelloService1
com.learn.dubbo.chapter1.echo.server.spi.HelloService2
- 使用Java SPI api调用实现类
ServiceLoader<IHelloService> helloServices = ServiceLoader.load(IHelloService.class);
for (IHelloService helloService : helloServices) {
System.out.println(helloService.hello("zhangsan"));
}
输出:
HelloService1:zhangsan
HelloService2:zhangsan
Java SPI存在两个主要问题,一是不能按需加载实现类,只能全部加载,二是对异常不友好,如果实例化对象出现异常 通过SPI相关的api没办法感知异常,Dubbo并没有直接使用Java SPI来实现自己的扩展点机制,而是自己实现了一个与Java SPI类似但功能更强的Dubbo SPI,具体体现在一下几个方面
- 按需实例化接口的实现类
- 提供了IoC和AOP功能
- 能够基于参数动态加载获取实现类的对象
-
对异常友好
下面用具体事例看一下
@SPI("helloService1") //指定默认的SPI接口实现
public interface IHelloService {
//通过URL中的helloService参数对应的值进行自适应,主要为了配置setter实现依赖注入
@Adaptive("helloService")
String hello(URL url, String name);
}
//第一个实现类
public class HelloService1 implements IHelloService {
@Override
public String hello(URL url, String name) {
return "HelloService1:" + name;
}
}
//第二个实现类
public class HelloService2 implements IHelloService {
@Override
public String hello(URL url, String name) {
return "HelloService2:" + name;
}
}
//第三个实现类
public class MyHelloService implements IHelloService {
private IHelloService helloService;
//这里有一个setter依赖注入,具体注入的对象是由URL中的参数决定的
public void setHelloService(IHelloService helloService) {
this.helloService = helloService;
}
@Override
public String hello(URL url, String name) {
System.out.println("my hello service ");
return helloService.hello(url, name);
}
}
//这是一个具有AOP功能的实现类
public class HelloServiceWrapper implements IHelloService{
private IHelloService helloService;
//构造方法中如果右依赖的扩展点,就会为对每个实例进行包装
public HelloServiceWrapper(IHelloService helloService) {
this.helloService = helloService;
}
@Override
public String hello(URL url, String name) {
System.out.println("before...");
String result = helloService.hello(url, name);
System.out.println("after...");
return result;
}
}
//配置文件 com.learn.dubbo.chapter1.echo.server.spi.IHelloService
helloService1=com.learn.dubbo.chapter1.echo.server.spi.HelloService1
helloService2=com.learn.dubbo.chapter1.echo.server.spi.HelloService2
myHelloService=com.learn.dubbo.chapter1.echo.server.spi.MyHelloService
wrapper=com.learn.dubbo.chapter1.echo.server.spi.HelloServiceWrapper
//调用
IHelloService helloService = ExtensionLoader.getExtensionLoader(IHelloService.class).getDefaultExtension();
URL url1 = new URL("","",0);
System.out.println(helloService.hello(url1,"lisi"));
IHelloService helloService2 = ExtensionLoader.getExtensionLoader(IHelloService.class).getExtension("helloService2");
URL url2 = new URL("","",0);
System.out.println(helloService2.hello(url2,"wangwu"));
IHelloService helloService3 = ExtensionLoader.getExtensionLoader(IHelloService.class).getExtension("myHelloService");
Map<String, String> p3 = new HashMap<>();
p3.put("helloService","helloService2");
URL url3 = new URL("","",0, p3);
helloService3.hello(url3, "zhaoliu");
下面在说说Dubbo SPI中涉及到的注解
- @SPI 标记一个借口为SPI扩展点
- @Adaptive 实现基于URL总线的自适应注入机制
- @Active
下面说说getExtension(String name)原理,这个方法的作用是根据指定的名称获取扩展点实现类,主要分为以下几步
- 加载配置文件,通过反射得到对应的Class对象并进行缓存
- 创建Class对应的实例化对象
- 进行setter依赖注入
- 进行Aop包装处理
加载配置文件时,dubbo兼容了java spi,会从
/META-INF/services, /META-INF/dubbo, /META-INF/dubbo/internal
三个目录下读取,读取完成后还会对对应的Class对象进行缓存,缓存时,会区分普通扩展类、自适应扩展类、包装扩展类和Activate扩展类,以方便后续处理
紧接着就会通过名称确定对应的Class对象,然后对其进行实例化。
然后处理setter依赖注入,这里的原理很简单,就是根据setter方法规则,截取setter方法的set后剩下的字符串作为待注入的扩展实例名称,通过ExtensionFactory根据这个名称来获取实例,然后通过反射调用setter方法,将这个依赖的实例注入进去。
最后处理包装类,也即是AOP特性,这里也很简单,由于在加载class时,dubbo根据实现类是否包含一个依赖其他扩展点的构造器来判断是不是包装扩展类,如果是,则单独缓存,当进行AOP处理时,就会遍历在加载配置时得到的所有wrapper,然后通过反射调用构造器,层层注入 实现AOP。
下面看看getAdaptiveExtension()的原理,该方法可以通过URL总线完成自适应加载扩展点,先看看如何使用
@SPI
public interface AdaptiveService {
@Adaptive("impl")
void adaptive(URL url, String msg);
}
public class AdaptiveServiceImpl implements AdaptiveService {
@Override
public void adaptive(URL url, String msg) {
System.out.println("msg");
}
}
配置文件:com.learn.dubbo.chapter1.echo.server.spi.AdaptiveService
impl1=com.learn.dubbo.chapter1.echo.server.spi.AdaptiveServiceImpl
调用
AdaptiveService adaptiveService = ExtensionLoader.getExtensionLoader(AdaptiveService.class).getAdaptiveExtension();
Map<String, String> p5 = new HashMap<>();
p5.put("impl","impl1");
URL url5 = new URL("","",0, p5);
adaptiveService.adaptive(url5, "aaaaa");
输出:
aaaaa
可以看到,这里是通过在url设置与@Adaptive中指定的key的值来动态觉得最终加载那个扩展类的,其实,底层实现的原理很简单,简单来说,就是dubbo底层动态生成了Xxx$Adaptive类,在这个类中,会从url中获取key对应的值,然后通过getExtension(String name)来获取指定的扩展点,下面是dubbo动态生成的代码对应的字符串
package com.learn.dubbo.chapter1.echo.server.spi;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class AdaptiveService$Adaptive implements com.learn.dubbo.chapter1.echo.server.spi.AdaptiveService {
public void adaptive(com.alibaba.dubbo.common.URL arg0, java.lang.String arg1) {
if (arg0 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = url.getParameter("impl");
if(extName == null)
throw new IllegalStateException("Fail to get extension(com.learn.dubbo.chapter1.echo.server.spi.AdaptiveService) name from url(" + url.toString() + ") use keys([impl])");
com.learn.dubbo.chapter1.echo.server.spi.AdaptiveService extension = (com.learn.dubbo.chapter1.echo.server.spi.AdaptiveService)ExtensionLoader.getExtensionLoader(com.learn.dubbo.chapter1.echo.server.spi.AdaptiveService.class).getExtension(extName);
extension.adaptive(arg0, arg1);
}
}
然后dubbo会动态对这个字符串进行编译,最终返回这个代理类的实例给客户端。dubbo中提供了三种编译方式,分别是jdk 编译、javaassist编译和ActiveCompiler编译,其中javaassist是默认的编译方式(@SPI(“javaassist”)),可以通过
<dubbo:application compiler="jdk">
来修改。
最后,看一下getActivateExtension是怎么实现的
@SPI
public interface IActivateService {
void active();
}
@Activate(order = 1, group = "test")
public class ActiveService1 implements IActivateService {
@Override
public void active() {
System.out.println("Active1");
}
}
@Activate(order = 2, group = "test")
public class ActivateService2 implements IActivateService {
@Override
public void active() {
System.out.println("active2");
}
}
配置:com.learn.dubbo.chapter1.echo.server.spi.IActivateService
activateService1=com.learn.dubbo.chapter1.echo.server.spi.ActiveService1
activateService2=com.learn.dubbo.chapter1.echo.server.spi.ActivateService2
调用
Map<String, String> m = new HashMap<>();
m.put("activate1", "activateService1");
m.put("activate2", "activateService2");
URL url6 = new URL("","",0, m);
List<IActivateService> activateService = ExtensionLoader.getExtensionLoader(IActivateService.class).getActivateExtension(url6, "activate1,activate2", "test");
for (IActivateService a : activateService) {
a.active();
}
最终会根据key和group来匹配需要加载的扩展点实现,然后根据order指定的顺序进行排序,最终返回一个List集合。
注册中心
registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-provider&dubbo=2.0.2&pid=31757®istry=zookeeper×tamp=1574044913995
zookeeper://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-provider&dubbo=2.0.2&interface=com.alibaba.dubbo.registry.RegistryService&pid=31787×tamp=1574045806869
的主要功能是实现服务注册与发现,同时也对监控中心提供了一部分支持。dubbo支持不多种类型的注册中心,例如zookeeper、redis、sofa、multicast等等,下面看一下注册中心的总体结构
注册中心主要可以分为3部分,分别是RegistryFactory、Registry和NotifyListener,其中,Registry可以理解为进行服务注册与发现的核心组件,也可以简单理解为dubbo与注册中心之间的一个桥梁,从图中可以看到,Registry集成了RegistryService接口,RegistryService提供了四个与服务注册与发现相关的四个核心方法:
registry()/unregistry/subscribe/unsubscribe
,RegistryFactory则是用来创建Registry实例的,这里面采用了
工厂方法
设计模式,同时,RegistryFactory在SPI的加持下,实现了基于dubbo消息总线URL的自适应模式
@SPI("dubbo")
public interface RegistryFactory {
@Adaptive({"protocol"})
Registry getRegistry(URL var1);
}
第二个部分是NotifyListener,这个组件被组合在Registry中,主要用来接口注册中心的消息变更通知。
下面再解释一下中间层的三个抽象类
public abstract class AbstractRegistryFactory implements RegistryFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractRegistryFactory.class);
private static final ReentrantLock LOCK = new ReentrantLock();
private static final Map<String, Registry> REGISTRIES = new ConcurrentHashMap();
public AbstractRegistryFactory() {}
public static Collection<Registry> getRegistries() {
return Collections.unmodifiableCollection(REGISTRIES.values());
}
public static void destroyAll() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
LOCK.lock();
try {
Iterator i$ = getRegistries().iterator();
while(i$.hasNext()) {
Registry registry = (Registry)i$.next();
try {
registry.destroy();
} catch (Throwable var6) {
LOGGER.error(var6.getMessage(), var6);
}
}
REGISTRIES.clear();
} finally {
LOCK.unlock();
}
}
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName()).addParameter("interface", RegistryService.class.getName()).removeParameters(new String[]{"export", "refer"});
String key = url.toServiceString();
LOCK.lock();
Registry var4;
try {
Registry registry = (Registry)REGISTRIES.get(key);
if (registry != null) {
var4 = registry;
return var4;
}
registry = this.createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
var4 = registry;
} finally {
LOCK.unlock();
}
return var4;
}
protected abstract Registry createRegistry(URL var1);
}
可以看到,在AbstractRegistryFactory中,实现了一些通用逻辑,例如Registry的消耗和缓存,对于创建Registry实例的createRegistry方法则交给了子类来实现。
再来看AbstractRegistry,代码就不贴了,这个类中主要是实现了Registry的本地缓存,大概思路就是把数据写入本地磁盘中,这也就保证了即使注册中心故障了,在一定程度上也不会影响dubbo服务的调用。
以zookeeper为例,NotifyListener在执行subscribe是会设置到节点上,依次来箭筒zookeeper节点数据变化的。
另外,触发注册中心逻辑的地方是dubbo导出服务的时候
public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
Registry registry = this.getRegistry(originInvoker);
.....
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
}
private Registry getRegistry(Invoker<?> originInvoker) {
URL registryUrl = this.getRegistryUrl(originInvoker);
return this.registryFactory.getRegistry(registryUrl);
}
简单来说就是根据URL消息总线自适应获取RegistryFactory实例(这一步是由上层完成的),然后再用这个实例创建具体的Registry实例。
最后再总结一下注册中心这个模块涉及到的设计模式,对于SPI的应用设计到了
策略设计模式
,对于Registry和RegistryFactory体系涉及到了
模板设计模式
和
工厂方法设计模式
。
配置解析(xml)
dubbo支持三种配置方式,分别是xml配置、注解配置和properties/yml配置,这里以xml为例来说明,主要因为笔者目前在项目中用到的就是xml的配置方式。
在使用dubbo时,我们通常与Spring进行集成,所以dubbo也对此进行了支持,dubbo与spring集成的方式简单,基于spring规范开发自己的NamespaceHandler,将dubbo设计的到bean统一交给spring进行管理。
熟悉spring的同学都知道,spring在解析xml的时候,xml中的namespace对应的url从spring.handlers中找到对应的NamespaceHandler,然后通过NamespaceHandler来解析这个标签,所以,dubbo在config模块中也提供了一个spring.handlers,具体内容如下
http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
这里之所以有两个是为了兼容alibaba dubbo和apache dubbo,因为dubbo在2018年春节以及捐给了Apache进行维护。
接下来在看看DubboNamespaceHandler做了什么
this.registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
this.registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
this.registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
this.registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
this.registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
this.registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
this.registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
this.registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
this.registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
this.registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
可以很清楚的看到,Dubbo对所有标签的解析都是由DubboBeanDefinitionParser完成的,到这里,我们可以和dubbo的代码架构联系起来,这里就对应的config层,其中比较重要的像ApplicationConfig/RegistryConfig/ProviderConfig/ConsumerConfig/ProtocolConfig/ServiceConfig和ReferenceConfig,这些Config承载了dubbo的核心配置。
另外,在配置这里需要注意的一点是各种配置的优先级,因为dubbo中,很多配置都可以在不同的地方进行设置,下面来看一下这张图
根据这张图的结构可以了解到,对于同一个配置,优先级为方法级别>service/reference > provider /consumer ,consumer > provider,另外,需要知道的是,很多配置在provider端的配置最终会通过注册中心以消息总线URL为载体透传到consumer端,例如典型的负载均衡、集群容错及失败重试次数等,也就是说,这些设置最终都是在consumer端生效的。
对于配置解析这部分,我个人觉得了解到这儿就可以了,重点应该放在服务暴露、服务引用和服务调用这些核心逻辑上。
服务暴露原理
由于dubbo基于网络/自定义协议+反射实现的rpc框架,所以,暴露服务简单理解无非就是启动网络端口进行监听,依次来接收客户端tpc链接和调用的,但是当涉及到注册中心时,还需要将暴露的服务添加到注册中心,这样服务消费者才能感知到这个服务,另外,对于一个服务,如果在本地jvm中也存在服务调用者,为了保证效率,就不应该再通过网络进行调用,再了解服务暴露时,我们就可以从这三个方面入手。
从xml解析的章节我们可以知道,承载
<dubbo:service/>
配置的Bean是ServiceBean这个类,服务暴露的启动入口也在这里
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
public void onApplicationEvent(ContextRefreshedEvent event) {
this.export();
}
}
由于ServiceBean实现了Spring的ApplicationListener,当接受到容器初始化完成的事件时,就会执行
this.export()
进行服务导出
上面的图是一个简易的方法调用时序图(有点丑~ ~),下面直接把代码贴上来看更加直观
ServiceBean.java
public void onApplicationEvent(ContextRefreshedEvent event) {
this.export();
}
ServiceConfig.java
public synchronized void export() {
this.doExport();
}
protected synchronized void doExport() {
this.doExportUrls();
}
private void doExportUrls() {
List<URL> registryURLs = this.loadRegistries(true);
/*
这里是重点:多协议支持
如果我们在配置的时候指定了多个<dubbo:protocol/>那么这里就会多每个Protocol进行处理
这里的List<URL> 中的URL可以理解成一个<dubbo:protocol/>
我们详细说一下:
1)如果不配置<dubbo:registry/>的话,暴露服务的时候,就会直接打开对应的端口,而不会向注册中心注册服务,此时的URL是以具体协议开头的,比如dubbo://ip:port/xxx.xxx.Service?...
2)如果配置了<dubbo:registry/>,那么,暴露服务的时候处理打开网络端口,同时还会通过Registry向注册中心注册服务,此时的url以registry开头,例如:registry://127.0.0.1:2181/xxx.xxx.Service?protocol=zookeeper&export=dubbo://ip:port/xxx.xxx.Service
**/
Iterator i$ = this.protocols.iterator();
while(i$.hasNext()) {
ProtocolConfig protocolConfig = (ProtocolConfig)i$.next();
this.doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
//包括服务到本地,这个不主动设置的话都会执行,这里采用injvm protocol协议
this.exportLocal(url);
// 如果不是只在本地暴露
if (!"local".toString().equalsIgnoreCase(scope)) {
//如果<dubbo:registry/>不为空,对应上面的第二种情况
if (registryURLs != null && !registryURLs.isEmpty()) {
/**
这里也需要特殊注意以下,dubbo的多注册中心就是在这里支持的。
每个registryUrl代表了一个注册中心,也就是<dubbo:registry/>
*/
Iterator i$ = registryURLs.iterator();
while(i$.hasNext()) {
//创建Invoker代理
Invoker<?> invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//通过制定的协议进行暴露,这里涉及到了Protocol的SPI自适应
Exporter<?> exporter = protocol.export(wrapperInvoker);
this.exporters.add(exporter);
}else{
//如果没有配置<dubbo:registry/>则直接暴露服务,不涉及注册中心的逻辑
Invoker<?> invoker = proxyFactory.getInvoker(this.ref, this.interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
this.exporters.add(exporter);
}
}
}
//这里是暴露服务到本地
private void exportLocal(URL url) {
if (!"injvm".equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString()).setProtocol("injvm").setHost("127.0.0.1").setPort(0);
ServiceClassHolder.getInstance().pushServiceClass(this.getServiceClass(this.ref));
Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(this.ref, this.interfaceClass, local));
this.exporters.add(exporter);
logger.info("Export dubbo service " + this.interfaceClass.getName() + " to local registry");
}
}
从代码中可以看到,设计到了三处
protocol.export
,分别对应了本地暴露、待注册中心的远程包括和直接远程暴露,那么从代码上看没有什么区别,是如何实现不同的服务暴露逻辑的呢?其实这里就用到了Dubbo SPI机制,前面我们说dubbo是一种微内核架构,这里的暴露服务的主逻辑就是微内核的一部分,而Protocol就是一个插件,我们看看Protocol的接口定义
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> var1) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> var1, URL var2) throws RpcException;
void destroy();
}
ServiceConfig中对protocol的初始化
private static final Protocol protocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
可以很清楚的看到这是SPI的自适应模式,上面三种不同类型的服务导出就是根据URL中的protocol参数来决定具体选择哪个Protocol实现的,实际上分别对应了
- InjvmProtocol
- RegistryProtocol
- DubboProtocol
下面以
RegistryProtocol
为例,看具体的实现逻辑
public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
//导出本地服务,这里主要是开启本地服务端口
RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker);
URL registryUrl = this.getRegistryUrl(originInvoker);
/**
这里是获取Registry对象,主要逻辑就是通过SPI获取对应类型的RegistryFactory,然后
创建对应的Registry实例,以Zookeeper为例,就是ZookeeperRegistryFactory和
ZookeeperRegistry
*/
Registry registry = this.getRegistry(originInvoker);
URL registedProviderUrl = this.getRegistedProviderUrl(originInvoker);
boolean register = registedProviderUrl.getParameter("register", true);
//将暴露的服务利用Registry的register方法发布到注册中心
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
if (register) {
this.register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(registedProviderUrl);
RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker);
this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener):
//订阅相关的url,例如provider会订阅Service/configuators目录,监听配置信息的变化
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
return new RegistryProtocol.DestroyableExporter(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}
/**
这里的逻辑可以总结为:
1)从registry://... url中获取到export参数,也就是用于暴露服务的url
2)通过自使用SPI机制获取到对应的Protocol实现,通常是DubboProtocol
3)调用对应的export方法,将服务暴露到本地端口上
**/
private <T> RegistryProtocol.ExporterChangeableWrapper<T> doLocalExport(Invoker<T> originInvoker) {
String key = this.getCacheKey(originInvoker);
RegistryProtocol.ExporterChangeableWrapper<T> exporter = (RegistryProtocol.ExporterChangeableWrapper)this.bounds.get(key);
if (exporter == null) {
synchronized(this.bounds) {
exporter = (RegistryProtocol.ExporterChangeableWrapper)this.bounds.get(key);
if (exporter == null) {
Invoker<?> invokerDelegete = new RegistryProtocol.InvokerDelegete(originInvoker, this.getProviderUrl(originInvoker));
exporter = new RegistryProtocol.ExporterChangeableWrapper(this.protocol.export(invokerDelegete), originInvoker);
this.bounds.put(key, exporter);
}
}
}
return exporter;
}
//从Invoker中拿到registry:// 中对应的export参数
private URL getProviderUrl(Invoker<?> origininvoker) {
String export = origininvoker.getUrl().getParameterAndDecoded("export");
if (export != null && export.length() != 0) {
URL providerUrl = URL.valueOf(export);
return providerUrl;
} else {
throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
}
}
DubboProtocol.java
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter(invoker, key, this.exporterMap);
this.exporterMap.put(key, exporter);
Boolean isStubSupportEvent = url.getParameter("dubbo.stub.event", false);
Boolean isCallbackservice = url.getParameter("is_callback_service", false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter("dubbo.stub.event.methods");
if (stubServiceMethods != null && stubServiceMethods.length() != 0) {
this.stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
} else if (this.logger.isWarnEnabled()) {
this.logger.warn(new IllegalStateException("consumer [" + url.getParameter("interface") + "], has set stubproxy support event ,but no stub methods founded."));
}
}
/**
这里很重要,开启指定端口上的监听
最终底层调用的是Netty的
*/
this.openServer(url);
this.optimizeSerialization(url);
return exporter;
}
到此服务暴露的基本逻辑就清楚了,下面简单总结一下:
1)spring容器初始化完成时间触发ServiceBean的onApplicationEvent方法
2)调用ServiceConfig的doExport/doExportUrls 在这里处理多协议问题(循环遍历)
3)调用doExportUrlsFor1Protocol针对每种类型的协议进行导出(这里处理多注册中心问题,循环)
4)以injvm协议导出服务到本地,用于本地调用
5)以dubbo协议导出服务到远程
6)registry://触发RegistryProtocol的export方法创建于注册中心的链接
7)从registry://中拿到export参数,对应的值为dubbo://xxx
8)dubbo://触发DubboProtocol的export,开启NettyServer,同时把Exportor放入到exporterMap中
9)通过Registry向注册中心注册服务,同时订阅category/configurators/check目录
10)导出服务完成
服务引用原理
在引用服务时,通常是通过
<dubbo:reference/>
或
@Reference
来做的,这两个配置最终会被翻译成ReferenceBean,由于ReferenceBean实现了FactoryBean接口,所以,按照spring的规范,在进行依赖注入时,就会调用它的getObject()方法来获取实例,这个getObject方法就是在服务引用阶段dubbo与spring连接的桥梁。
public class ReferenceBean<T> extends ReferenceConfig<T> implements FactoryBean, ApplicationContextAware, InitializingBean, DisposableBean {
public Object getObject() throws Exception {
return this.get();
}
}
对于服务调用的整个流程这里引用一张网上的时序图,个人觉得画的非常清晰
下面,就贴一下源码,跟一下这个过程。由于getObject方法中调用了get方法,这个方法定义在了ReferenceConfig中
ReferenceConfig.java
/***
这三个属性很重要,分别对应的Protocol、Cluster和ProxyFactory
通过dubbo spi的自适应模式来获取实例,其次Cluster默认的是FailoverCluster,ProxyFactory默认是JavaAssistProxyFactory,这可以从这两个接口中@SPI注解中的默认值中看到
*/
private static final Protocol refprotocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final Cluster cluster = (Cluster)ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
private static final ProxyFactory proxyFactory = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
p
public synchronized T get() {
//这里判断是否已经创建过,如果没有则调用init()方法初始化ref
if (this.ref == null) {
this.init();
}
return this.ref;
}
/**
在init方法中,总的来说就做了两件事
1. 创建包客户端与服务端之间的长连接的Invoker
2. 将Invoker封装成业务接口对象赋值给ref
但其中包含了不少细节,下面我们接着往下看
**/
private void init() {
//这一句是整个init()方法的核心,也就是返回了最终的ref对象并赋值给了ref
this.ref = this.createProxy(map);
}
private T createProxy(Map<String, String> map) {
.....
if (isJvmRefer) {
//这里判断是不是本地调用,如果是,则通过InjvmProtocol的refer方法引用,这里用到的也是Dubbo SPI的自适应机制
URL url = (new URL("injvm", "127.0.0.1", 0, this.interfaceClass.getName())).addParameters(map);
this.invoker = refprotocol.refer(this.interfaceClass, url);
} else {
.....
/**
这里的urls是
registry://localhost:2181/com.alibaba.dubbo.registry.RegistryService?application=echo-consumer&dubbo=2.0.2&pid=44067&refer=application%3Decho-consumer%26cache%3Dtrue%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dcom.learn.dubbo.chapter1.echo.api.EchoService%26methods%3Decho%26mock%3Dtrue%26pid%3D44067%26register.ip%3D192.168.199.238%26revision%3D0.0.1%26side%3Dconsumer%26timeout%3D2000%26timestamp%3D1571895539207%26version%3D0.0.1®istry=zookeeper×tamp=1571895543957
所以refprotocol对应的肯定是RegistryProtocol,主要用来处理服务注册于发现相关的逻辑
*/
if (this.urls.size() == 1) {
this.invoker = refprotocol.refer(this.interfaceClass, (URL)this.urls.get(0));
} else {
//这里主要用来处理多协议,将多个Invoker合并成cluster
List<Invoker<?>> invokers = new ArrayList();
URL registryURL = null;
Iterator i$ = this.urls.iterator();
while(i$.hasNext()) {
url = (URL)i$.next();
/**
可以看到,主要的处理逻辑和上面是一样的,只不过外面套了一个循环
*/
invokers.add(refprotocol.refer(this.interfaceClass, url));
if ("registry".equals(url.getProtocol())) {
registryURL = url;
}
}
//这里将多个Invoker合并成Cluster
if (registryURL != null) {
u = registryURL.addParameter("cluster", "available");
this.invoker = cluster.join(new StaticDirectory(u, invokers));
} else {
this.invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
}
下面就来到了RegistryProtocol的refer方法,这里的远程服务引用的最核心逻辑
RegistryProtocol.java
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//这里就是从url中获取registry参数的值,一般是zookeeper,然后把这个参数从url中删掉
//最终url就变成了 zookeeper://....
url = url.setProtocol(url.getParameter("registry", "dubbo")).removeParameter("registry");
//获取一个注册中心实例,内部包含了客户端与服务端的长连接,这个在注册中心中已经讲过
Registry registry = this.registryFactory.getRegistry(url);
....
this.doRefer(this.cluster, registry, type, url)
}
/**
这里是宇宙的中心!
RegistryDirectory前面已经讲过,它实现了NotifyListener接口,用来接收zookeeper目录数据变化通知的
directory.setRegistry方法把Registry实例设置的进去,也就是持有了与zookeeper服务端的链接,方便操作数据
紧接着通过Registry.register方法向注册中心注册了自己,也就是在Xxx.Service/consumer目录下
然后,通过directory的subscribe订阅了prividers/configurators/routers 这些目录
这里directory.substribe内部其实也是调用了registry.substribe,只不过,由于需要设置NotifyListener,索引通过directory对象来操作更加方便
最终利用director创建了Invoker并返回了
*/
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory(type, url);
directory.setRegistry(registry);
directory.setProtocol(this.protocol);
Map<String, String> parameters = new HashMap(directory.getUrl().getParameters());
URL subscribeUrl = new URL("consumer", (String)parameters.remove("register.ip"), 0, type.getName(), parameters);
if (!"*".equals(url.getServiceInterface()) && url.getParameter("register", true)) {
registry.register(subscribeUrl.addParameters(new String[]{"category", "consumers", "check", String.valueOf(false)}));
}
directory.subscribe(subscribeUrl.addParameter("category", "providers,configurators,routers"));
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
实际上到这里,服务引用过程刚刚完成了一般,把服务消费者自己注册到了注册中心,我们还没有看到服务消费者创建于服务提供者之间的长连接,那一步是在哪里做的呢?其实是在RegistryDirectory的notify方法中。由于在subscribe的过程中,RegistryDirectory作为了接收zookeeper数据变化的对象设置到了zookeeper会话中,所以在首次执行subscribe的时候,就会触发notify方法的回调
//这里会返回很多个urls,我们只说provider相关的
public synchronized void notify(List<URL> urls) {
this.refreshInvoker(invokerUrls);
}
private void refreshInvoker(List<URL> invokerUrls) {
Map<String, Invoker<T>> newUrlInvokerMap = this.toInvokers(invokerUrls);
}
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
//这里的url就是dubbo://xxxxxx,protocol.ref
invoker = new RegistryDirectory.InvokerDelegate(this.protocol.refer(this.serviceType, url), url, providerUrl);
}
可以看到最终还是会调用protocol.refer,这里的protoco其实就是DubboProtocol了
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
this.optimizeSerialization(url);
DubboInvoker<T> invoker = new DubboInvoker(serviceType, url, this.getClients(url), this.invokers);
this.invokers.add(invoker);
return invoker;
}
private ExchangeClient[] getClients(URL url) {
//这里的主要逻辑就是创建于服务端的长连接
}
到这里,服务引用的主要流程就算完成了,可以看到,服务消费者与服务提供者之间的长连接时异步创建的。
最后,就是把Invoker封装成业务接口,返回给业务层了,我们在返回到ReferenceConfig中看一下createProxy方法的最后一步
return proxyFactory.getProxy(this.invoker);
这里是通过Jdk proxy或javaassist proxy(默认)创建符合业务接口规范的代理对象,最终,这个代理对象内部,就包含了与注册中心交互的Regitry和与服务提供者之间的长连接,万事俱备,只差调用。
下面简单总结一下:
1)依赖注入时,触发ReferenceBean覆盖FactoryBean的getObject方法,调用init()开始服务引用
2)触发ReferenceConfig的createProxy方法,进入核心逻辑
3)判断是否为injvm模式,如果是,则直接通过InjvmProtocol的refer方法创new InjvmInvoker
4)如果不是,则首先加载注册中心url
5)如果是单个注册中心,则执行RegistryProtocol的refer,因为此时的url为registry://
6)构建Registry及RegistryDirectory
7)向注册中心注册当前consomer并订阅相关的zookeeper目录(category/providers/configurators/routers),订阅时设置NotifyListener
8)订阅完成后,异步触发RegistryDirectory的notify方法
9)拿到具体的服务提供者url后(dubbo://),执行DubboProtocol的refer方法
10)在refer方法中,创建了DubboInvoker实例,其中包含了NettyClient
11)服务引用完成
服务调用
对于服务调用这部分逻辑,到网上找了好多博客资料,也参考了一些书籍,但始终晕晕乎乎,这次下决心搞明白,结果还是自己通过debug代码彻底搞明白了,不得不说,哎啥也不说了。
先上两张按照我个人理解话的图,有点丑,看不明白的同学请忽略。
Injvm模式
Dubbo模式
由于服务的导出和服务的引用主要涉及两种模式,一种是Local模式一种是Remote模式,所以这里以InjvmProtocol和DubboProtocol为例来说,先说InjvmProtocol模式。
在服务导出逻辑中,Injvm模式我们可以再回顾一下
InjvmProtocol.java
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//注意这里的exporterMap
return new InjvmExporter(invoker, invoker.getUrl().getServiceKey(), this.exporterMap);
}
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
//再注意这里的exporterMap
exporterMap.put(key, this);
}
这里的exportMap是一个重点,上面的逻辑说明,每一个Exporter实例,都会被保存在InjvmProtocl的exporterMap中。
然后我们在看看Injvm模式下的调用过程。在服务引用一节,我们知道了,最终拿到的服务引用对象实际上是一个代理对象,这个代理对象是通过下面的代码创建出来的
return proxyFactory.getProxy(this.invoker);
这里最终会调用到JavaassistProxyFactory类中的getProxy方法
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
所以,对于接口中某个方法的调用,最终会转换成对InjvmInvoker的调用,也就是下面的方法
public Result doInvoke(Invocation invocation) throws Throwable {
//看这里,实际上就是从InjvmProtocol的exporterMap中拿到了执行export时得到的Exporter对象
Exporter<?> exporter = InjvmProtocol.getExporter(this.exporterMap, this.getUrl());
if (exporter == null) {
throw new RpcException("Service [" + this.key + "] not found.");
} else {
RpcContext.getContext().setRemoteAddress("127.0.0.1", 0);
//最终,从Exporter中拿到导出时的Invoker实例,并调用对应的invok方法
return exporter.getInvoker().invoke(invocation);
}
}
然后,由于在进行服务导出时,导出的Invoker是通过ProxyFactory.getInvoker方法创建的
proxyFactory.getInvoker(this.ref, this.interfaceClass, local);
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf(36) < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
也就是说,服务导出的Invoker是提供服务的对象的代理对象,最终调用就通过这个Inoker传递到了具体的服务对象,到这里,我们可以回过头反观一下ProxyFactory这个组件
public class JavassistProxyFactory extends AbstractProxyFactory {
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf(36) < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
这里面有两个方法,第一个方法getProxy是将Invoker转换成对应的业务接口实例对象,而第二个方法getInvoker是将一个服务对象包装成Invoker对象,也就是说,第一个方法在服务消费端用到,第二个方法在服务提供端被用到,服务提供端在发布服务时把一个业务类的代理对象转换成Invoker放到Exporter中,然后把Exporter放到exporterMap中与具体的接口关联。
仔细思考一下,协议相关的Invoker只会出现在服务消费端,例如InjvmInvoker、DoubleInvoker等,而服务提供端中对在对应的Exportor中对应了一个通过ProxyFactory创建的匿名Invoker,也就是AbstractProxyInvoker,就像下面这样
(Consumer)InjvmInvoker/DubboInvoker ---> InjvmExporter/DoubleExporter ---> AbstractProxyInvoker --> ServiceImpl
下面再来看看dubbo协议下的调用过程,在服务引用一节,说到了代理对象内部实际上是封装了DubboInvoker,下面是创建DubboInvoker对象的代码
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
this.optimizeSerialization(url);
DubboInvoker<T> invoker = new DubboInvoker(serviceType, url, this.getClients(url), this.invokers);
this.invokers.add(invoker);
return invoker;
}
getClients方法实际上是创建了一个与服务提供者的长连接,当调用接口方法时,最终就会调用DubboInvoker中的doInvoke方法,其中的一句核心代码如下
ResponseFuture future = currentClient.request(inv, timeout);
以上是客户端的逻辑,下面我们再看看服务端,上面我们说到,与DubboInvoker对应的是DubboExporter,我们看看是怎么执行的。
当客户端向服务端发起调用时,dubbo自己封装的一个ExchangeHandler就会被底层的NettyServer调用,这个ExchangeHandler被定义在了DubboProtocol中,我们来看一下它的核心逻辑
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
Invocation inv = (Invocation)message;
Invoker<?> invoker = DubboProtocol.this.getInvoker(channel, inv);
.....
invoker.invoke(inv);
}
}
/**
这里我们可以看到,Handler最终会调用被封装在DubboExporter中的Invoker,而这个Exporter
其实是从我们前面说的exporterMap中获取的,所以我们就可以总结一下:
每一个Protocol的实现内部都有一个exporterMap域,用于缓存在该协议下报了的所有服务,也就是Exporter
**/
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
String serviceKey = serviceKey(port, path, (String)inv.getAttachments().get("version"), (String)inv.getAttachments().get("group"));
DubboExporter<?> exporter = (DubboExporter)this.exporterMap.get(serviceKey);
return exporter.getInvoker();
}
总价一下,也就是,当服务消费者发起调用时,会触发服务端的ExchangerHandler被Netty调用,然后从DubboProtocol中的exporterMap拿到具体的Exporter,从Exporter中拿到服务对象的代理Invoker,调用Invoker的invoke方法将请求传递到具体的服务对象中。
上面只是我个人理解的相对核心的逻辑,当然服务调用还设计到与dubbo协议、参数序列化和反序列化的知识点,下一个小节会简单说说dubbo协议,对于序列化的知识点读者感兴趣的话可以自己看一看。
总结一下:
由于服务导出与服务引用时,分为injvm和dubbo两种,所以,服务调用也分为这两种协议
injvm协议的服务调用:
先回忆一下injvm协议服务导出和引用过程,provider导出时,通过ProxyFactory创建了一个业务处理Bean的代理对象,将其转换成了一个Invoker,通过invoke方法传递调用请求,并最终通过InjvmExporter包装,这个InjvmExporter被缓存到了InjvmProtocol中的exporterMap中。。
在引用injvm服务时,InjvmProtocl的refer直接创建了一个InjvmInvoker,这个InjvmInvoker覆盖了doInvoke方法,直接从exportMap中拿到Exporter,也就是服务导出时放进去的Exporter,并拿到对应的Invoker直接进行调用。
由于,在服务引用时,通过ProxyFactory将Invoker代理成了业务接口,所以,当对业务接口进行调用时,最终救护执行到InjvmInvoker的doInvoke方法,而这个doInvoke就从exporterMap中拿到了Exporter找那个的Invoker,这个Invoker是服务导出时,通过ProxyFactory的getInvoke方法得到了,它是业务接口实现类的代理,这样最终调用链就到了服务实现类中。
dubbo协议的服务调用:
同样,我们先来回顾一下dubbo协议下的服务暴露和服务引用逻辑,简单说,registry://协议触发RegistryProtocol的export方法,该方法首先提起export参数,及dubbo:///,利用DubboProtocol的export方法启动NettyServer,然后将对应的Exporter放到DubboProtocol的exporterMap中,然后RegistryProtocol的export继续向zookeeper注册服务,同时订阅configurators目录。
服务引用过程是registry://协议触发RegistryProtocol的refer方法,然后创建Registry和ReigistryDirector,向注册中心注册自己并监听相关目录的数据变化,RegistryDirectory的notify方法被zookeeper触发,得到所有provider的dubbo:///服务暴露协议url,然后通过DubboProtocl的refer方法创建DubboInvoker并启动NettyClient。
那么调用时,调用链就会进入到这个DubboInvoker中,DubboInvoker中的doInvoke方法,通过NettyClient将相关数据序列化后发送到provider监听的端口,这时候就会触发DubboProtocol中的ExchangeHandler,从replay方法中会从exporterMap中拿到对应的Exporter,从中获取业务实现类的代理Invoker,最终将调用请求传递给业务实现类处理,然后原路返回。
dubbo协议
下面这张图是官网给出来的dubbo协议的组成图
在dubbo协议中,用16个字节(0-127 一共128个比特位)来定义协议头,下面简单说明一个每个部分的含义
- 0 ~ 15 bit:存储魔法数字,dubbo用这个魔法数字标识来解决tcp中的粘包/拆包问题
- 16 bit:数据包的类型,0位response,1为request
- 17 bit:调用方式,0为单向调用,1为双向调用(dubbo进行优雅停机时发送的readonly消息中,该bit为空)
- 18 bit:事件标识,0位请求/响应包,1位心跳包
- 19 ~ 23 bit:是序列化器id,用来统一consumer和provider的消息序列化方式,例如2为Hessian2,3为Java
- 24 ~ 31 bit:状态标识,例如20为OK,30位Client_timeout
- 32 ~ 95 bit:请求编号
- 96 ~ 127 bit:消息体长度
Cluster原理
回顾dubbo架构,我们了解到右一层Cluster,我个人理解Cluster就像它的字面意思一样,即集群的意思,所以,Cluster主要解决集群层面的问题。那么什么是集群呢,在dubbo中可以理解为多节点+对等部署的服务,这也是使用dubbo时的组常用用法。
在对等集群中,我们通常面临的问题有服务路由、集群容错、负载均衡,服务降级等问题,在dubbo中,这些问题实际上都是在Cluster层面解决的。
我看在回顾一下服务引用中的一段代码
//遍历每一个服务提供者,并创建对应的Invoker
while(i$.hasNext()) {
url = (URL)i$.next();
invokers.add(refprotocol.refer(this.interfaceClass, url));
if ("registry".equals(url.getProtocol())) {
registryURL = url;
}
}
//将Invoker装饰成Cluster
if (registryURL != null) {
u = registryURL.addParameter("cluster", "available");
this.invoker = cluster.join(new StaticDirectory(u, invokers));
} else {
this.invoker = cluster.join(new StaticDirectory(invokers));
}
这里的cluster采用SPI自适应模式加载
@SPI("failover")
public interface Cluster {
@Adaptive
<T> Invoker<T> join(Directory<T> var1) throws RpcException;
}
可以看到,默认的Cluster为FailoverCluster,那我们看一下FailoverCluster的代码
public class FailoverCluster implements Cluster {
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker(directory);
}
}
很简单,就是返回了一个Invoker实例,只不过这个Invoker包含了所有服务提供者对应的Invoker,也就是,将所有的Service provider Invoker聚合成一个Invoker,所以,最终调用就会被传递到这个FailoverClusterInvoker中,我们来看一下对应的源码,其实还是挺简单的
AbstractClusterInvoker.java 中的invoke封装了cluster模式下的通用逻辑
public Result invoke(Invocation invocation) throws RpcException {
//检查服务对应的Invoker是否可用
this.checkWhetherDestroyed();
LoadBalance loadbalance = null;
/**
这里是一个重点:
这里是获取服务对应的Invoker列表,最终,其实是通过RegisterDirectory的doList从缓存中获取的,
期间包含了服务路由的过程
*/
List<Invoker<T>> invokers = this.list(invocation);
if (invokers != null && !invokers.isEmpty()) {
//SPI自使用获取负载均衡策略,默认为random权重随机模式
loadbalance = (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random"));
}
RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
//调用子类,默认是FailoverClusterInvoker的doInvoke方法
return this.doInvoke(invocation, invokers, loadbalance);
}
整体思路很清楚,首先通过RegistryDirectory从缓存中获取所有的Invoker列表,然后经过路由规则过滤出必要的Invoker,然后,根据参数配置创建对应的负载均衡器,最后,根据负载均衡器选择一个Invoker进行调用。
FailoverClusterInvoker.java
public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
//获取重试次数,也就是retries参数
int len = this.getUrl().getMethodParameter(invocation.getMethodName(), "retries", 2) + 1;
//循环遍历指定的次数
for(int i = 0; i < len; ++i) {
//这里是,如果调用失败了一次,要重新进行检查和获取可用的Invoker列表
if (i > 0) {
this.checkWhetherDestroyed();
copyinvokers = this.list(invocation);
this.checkInvokers(copyinvokers, invocation);
}
//这里就是通过制定的负载均衡策略,选择出一个Invoker
Invoker<T> invoker = this.select(loadbalance, invocation, copyinvokers, invoked);
//如果调用成功,则就将结果返回,跳出循环
Result result = invoker.invoke(invocation);
Result var12 = result;
return var12;
}
}
下面,我们先来看一下list这个方法,在这个方法中设计到了路由匹配
AbstractClusterInvoker.java
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = this.directory.list(invocation);
return invokers;
}
//这里最终就进入到了Directory体系,可以看到,职责隔离做的真的很好
AbstractDirectory.java
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = this.doList(invocation);
List<Router> localRouters = this.routers;
if (localRouters != null && !localRouters.isEmpty()) {
Iterator i$ = localRouters.iterator();
while(i$.hasNext()) {
Router router = (Router)i$.next();
try {
if (router.getUrl() == null || router.getUrl().getParameter("runtime", false)) {
invokers = router.route(invokers, this.getConsumerUrl(), invocation);
}
} catch (Throwable var7) {
logger.error("Failed to execute router: " + this.getUrl() + ", cause: " + var7.getMessage(), var7);
}
}
return invokers;
}
RegistryDirectory.java
public List<Invoker<T>> doList(Invocation invocation) {
/**
从这里可以看到,实际上Invoker是从缓存中获取的
在服务引用逻辑中,由于会触发notify方法的执行,最终,会将服务提供者对应的Invoker放到
methodInvokerMap这个本地缓存中,这样可以避免每次进行服务调用是,都从zookeeper获取
减少注册中心的压力
*/
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;
invokers = (List)localMethodInvokerMap.get(methodName + "." + args[0]);
invokers = (List)localMethodInvokerMap.get(methodName);
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = (List)iterator.next();
}
return (List)(invokers == null ? new ArrayList(0) : invokers);
}
Cluster层涉及到了Router、Directory、LoadBalance,而且不同的Cluster策略都对应不同的实现,在dubbo中提供了一下几种常用的集群策略:
- Failover
- FastFail
- Failback
- Failsafe
- Forking
-
Broadcat
提供了四种负载均衡策略
- random 基于权重的随机
- roundrobin 集群权重的平滑轮询
- lastactive 最少激活原则,处理请求最少的节点会被分配更少的请求
- consistenthash 一致性hash
这里涉及到的每一种类型的组件都有自己不同的实现方式,暂时就不扩展了,以后有时间在对详细看具体的实现方式。
Filter原理
Filter是Dubbo中一个很重要的组件,有一些核心的功能都是以及Filter实现的,例如流量控制,记录访问日志,服务监控等,Dubbo中的Filter原理实际上和Java web中的Filter一样,都是在调用目标方法的前后做一些功能上的增强。
Dubbo Filter作为核心组件之一,也是基于SPI模式实现,Dubbo提供了一个Protocl扩展点的包装类
ProtocolFilterWrapper
,这样再加载
Protocol
的时候,
ProtocolFilterWrapper
就会被作为包装类自动被加载,我们可以看一下它的源码
//实现了Protocol接口
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
/**
这里是重点:遵循了Dubbo SPI 的AOP规范,在构造函数中依赖Protocol组件
**/
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
} else {
this.protocol = protocol;
}
}
/**
在服务导出和服务引用时,调用了buildInvokerChain方法对Invoker进行包装
**/
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return "registry".equals(invoker.getUrl().getProtocol()) ? this.protocol.export(invoker) : this.protocol.export(buildInvokerChain(invoker, "service.filter", "provider"));
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return "registry".equals(url.getProtocol()) ? this.protocol.refer(type, url) : buildInvokerChain(this.protocol.refer(type, url), "reference.filter", "consumer");
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
final Invoker<T> last = invoker;
// 这里通过SPI机制加载所有被激活的Filter
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
/**
这里的遍历顺序可以留意一下,是倒序遍历的
例如有 filter a b c,invoker 遍历顺序是
c - invoker , b - c - invoker, a - b - c - invoker,这样就可以保证最终的执行顺序是 a - b - c invoker了 。
*/
for(int i = filters.size() - 1; i >= 0; --i) {
final Filter filter = (Filter)filters.get(i);
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
//把所有的Filter层层包装在原始的Invoker外面
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(last, invocation);
}
public void destroy() {
invoker.destroy();
}
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
}
总结
以我个人的经验和眼界来看,目前单体架构、SOA架构和微服务架构这三种架构模式在IT系统中都有一定的占有率。单体架构适合简单的服务,一般利用nginx做反向代理进行集群对等部署,SOA架构适用于中等规模的团队,以整块服务为单位划分开发团队,应用非常广泛,微服务架构模式目前应该还处在初期使用阶段,这种架构可以很好的适应docker、云计算场景,随着云计算的快速普及,微服务架构也会越来越流行。
dubbo作为一个SOA架构下的分布式服务开发框架解决了SOA架构中的两大难题,分别是RPC和服务治理。华为林海峰的《分布式服务框架原理与实践》从抽象层面比价系统的介绍了分布式系统的组成部分,可以说,一个完整的SOA架构包含了很多的内容,例如、集群容错、序列化、通信模型、协议、注册中心、监控中心等等,dubbo可以说是目前在开源领域最流程的一款分布式服务框架了,但是它还有很多需要完善的地方。
从宏观角度观察dubbo,主要包含了四个部分,分别是服务调用者、服务提供者、注册中心和监控中心,服务提供者异步向注册中心注册服务,服务消费者异步从注册中心拉去服务提供者信息,同时监控相关配置信息的变化,服务提供者和服务消费者异步向注册中心上报监控数据。
从微观角度看dubbo,我觉得可以从横向和纵向两个角度来看,从横向角度看,dubbo采用了一种“微内核+富生态组件”的方式构建,利用SPI扩展点机制,组合各个核心组件,为dubbo带来了高可扩展性,为dubbo的快发展提供了很好的支撑。
从纵向角度看dubbo,也就是从dubbo的执行流程,可以分为Service/Config/Proxy/Cluster/Monitor/Protocol/Exchanger/Transport/Serializable等,清晰的划分了各层组件的职责边界,也为dubbo的高可扩展性提供了支持。
所以,dubbo在我看来是一个360度立体高可扩展的框架。
dubbo提供了多种注册中心的实现,目前最新版本中提供了zookeeper、redis、sofa、mutiple、etcd3等多种协议,在配置上支持xml配置、注解配置以及properties/yml配置支持,序列化方案就更加丰富了例如在大数据领域广泛应用的avro、jdk、kryo、json、hessian2、fastjson等等,支持的协议也很多,例如dubbo、http、rest、thirft、rmi、webservice、hessiam等等,能够很好的满足各种分布式系统以及异构系统集成的需要,在集群容错方面也有丰富的侧脸,例如Failover、Fastfail、Failback、Failsafe、Forking、Broadcast等等,同时还利用Mock机制支持服务的平滑降级能力,但是在监控方面dubbo还没有提供一个完整的监控中心,在使用dubbo时,一般也不会使用dubbo官方提供的监控中心,因为功能过于简陋,一般都是自己实现。
据悉,dubbo未来会向Service mesh方向方法,虽然我还没太弄清楚什么是Service mesh,不过应该是一种更牛逼的、能够使用云原生的架构模式吧,期待。