文章目录
- 1、将
配置解析成BeanDefinitiondubbo:service
- 2、Spring容器通过
配置实例化BeanDefinition
对象ServiceBean
- 3、监听Spring容器事件,触发
方法,启动服务暴露逻辑DubboBootstrap.start()
- 4、根据
配置进行服务暴露ServiceBean
-
- 4.1 获取注册中心配置及协议配置
- 4.2 根据需要暴露的bean动态生成代理类并包装成invoker链
- 4.3 使用Netty框架开启Socket服务
- 4.4 将provider信息发布到注册中心
这里debug的代码用的是github上dubbo项目的dubbo-demo里的 dubbo-demo-xml下的代码。
这里以默认的dubbo通信协议为debug的代码。上一章讲到Spring实例化consumer的过程,这一篇将Spring暴露provider服务的过程,对应的配置是
<dubbo:service/>
,通过 《spring的xml文件里dubbo标签解析过程》了解到该配置对应的是
ServiceBean
类,主要分成三步:
1、将
dubbo:service
配置解析成BeanDefinition
2、Spring容器通过
BeanDefinition
配置实例化
ServiceBean
对象
3、将服务信息注册到注册中心中,暴露出去
先贴出整个过程完整时序图:
demo里的consumer端的xml配置:
<beans
// xmlns:xsi是xsi标签命名空间
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
// xmlns:dubbo是dubbo标签的命名空间
xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
// 当前那xml文件默认的命名空间
xmlns="http://www.springframework.org/schema/beans"
// xsi:schemaLocation 配置了每个命名空间对应里配置规范,用来做格式校验
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
<dubbo:application name="demo-provider"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181" timeout="6000"/>
<!-- 协议配置-->
<dubbo:protocol name="dubbo"/>
<bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" timeout="60000"/>
</beans>
1、将 dubbo:service
配置解析成BeanDefinition
dubbo:service
Dubbo的consumer配置是
<dubbo:reference>
,在《spring解析dubbo标签配置过程》中可以看到这个配置被解析成
BeanDefinition
类的过程,在通过
DubboNamespaceHandler
的
parse
方法解析dubbo自定义标签时,会向Spring容器注册一个监听器
DubboLifecycleComponentApplicationListener
,该类监听了Spring容器的
ApplicationEvent
事件,正是这个监听事件触发服务暴露的流程,注册监听器代码如下:
public BeanDefinition parse(Element element, ParserContext parserContext) {
BeanDefinitionRegistry registry = parserContext.getRegistry();
this.registerAnnotationConfigProcessors(registry);
// 向Spring容器注册监听器
this.registerDubboLifecycleComponentApplicationListener(registry);
BeanDefinition beanDefinition = super.parse(element, parserContext);
this.setSource(beanDefinition);
return beanDefinition;
}
时序图如下:
2、Spring容器通过 BeanDefinition
配置实例化 ServiceBean
对象
BeanDefinition
ServiceBean
在《spring解析dubbo标签配置过程》中可以看出
dubbo:service
配置对应的
BeanDefinition
的BeanClass属性值为
org.apache.dubbo.config.spring.ServiceBean.class
类,会被Spring通过
getBean()
进行实例化(
getBean()
详细逻辑见这里),debug截图如下:
ServiceBean
继承了
AbstractConfig
类,这个抽象类有个
addIntoConfigManager
方法,会将当前对象缓存到一个全局的
ConfigManager
对象中:
@PostConstruct
public void addIntoConfigManager() {
ApplicationModel.getConfigManager().addConfig(this);
}
@PostConstruct
注解会告诉Spring容器在实例化该对象后,会执行注解标注的方法,几乎所有的dubbo的相关配置类都继承了
AbstractConfig
类,比如
<dubbo:registry>
对应的
RegistryConfig
、
<dubbo:reference>
对应的
ReferenceBean
等,都会被放入这个全局的缓存
DubboBootstrap.start()
会读取这个缓存,进而触发注册中心及暴露服务等逻辑。
3、监听Spring容器事件,触发 DubboBootstrap.start()
方法,启动服务暴露逻辑
DubboBootstrap.start()
暴露服务的入口在
DubboLifecycleComponentApplicationListener.onApplicationEvent(ApplicationEvent event)
方法里,调用时序图如下:
DubboLifecycleComponentApplicationListener.onApplicationEvent()
代码:
public void onApplicationEvent(ApplicationEvent event) {
if (this.supportsEvent(event)) {
if (event instanceof ContextRefreshedEvent) {
this.onContextRefreshedEvent((ContextRefreshedEvent)event);
} else if (event instanceof ContextClosedEvent) {
this.onContextClosedEvent((ContextClosedEvent)event);
}
}
}
protected void onContextRefreshedEvent(ContextRefreshedEvent event) {
ApplicationContext context = event.getApplicationContext();
DubboBootstrap bootstrap = this.loadBootsttrapAsBean(context);
if (bootstrap == null) {
// 单例模式创建DubboBootstrap
bootstrap = DubboBootstrap.getInstance();
}
// 1.0
bootstrap.start();
}
start方法代码如下:
public DubboBootstrap start() {
if (this.started.compareAndSet(false, true)) {
// 初始化配置,包括注册中心
this.initialize();
if (this.logger.isInfoEnabled()) {
this.logger.info(NAME + " is starting...");
}
// 暴露服务
this.exportServices();
if (!this.isOnlyRegisterProvider() || this.hasExportedServices()) {
this.exportMetadataService();
this.registerServiceInstance();
}
// 解析并注册consumer配置
this.referServices();
if (this.logger.isInfoEnabled()) {
this.logger.info(NAME + " has started.");
}
}
return this;
}
4、根据 ServiceBean
配置进行服务暴露
ServiceBean
在第2步中,配置会被解析成
ServiceBean
,并缓存到全局的
ConfigManager
对象中,这里会从这个全局对象中取出配置,逐个进行服务暴露,主干代码如下:
private void exportServices() {
this.configManager.getServices().forEach((sc) -> {
ServiceConfig serviceConfig = (ServiceConfig)sc;
serviceConfig.setBootstrap(this);
...
sc.export();
this.exportedServices.add(sc);
});
}
4.1 获取注册中心配置及协议配置
接着上面的代码,dubbo支持很多种协议,包括默认的dubbo协议、hessian、http、thrift等,协议之间的差别可以自行百度下。
public synchronized void export() {
if (this.shouldExport()) {
if (this.bootstrap == null) {
this.bootstrap = DubboBootstrap.getInstance();
this.bootstrap.init();
}
// 检查和修改子配置,其中包括检查注册中心地址、协议配置等
this.checkAndUpdateSubConfigs();
...
this.doExport();
}
}
检查配置源码如下:
private void checkAndUpdateSubConfigs() {
...
// 检查协议配置,如果没有则使用默认的dubbo,并将ProtocolConfig配置set到protocols属性上
this.checkProtocol();
// 检查注册中心配置,并将RegistryConfig对象set到registries属性上
if (!this.isOnlyInJvm()) {
this.checkRegistry();
}
...
}
这里的配置都是从全局的的
ConfigManager
对象中取的,在所有的配置被Spring初始化后,都会放入这个缓存中。原理在上面已经讲到。
private void doExportUrls() {
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(this.getInterfaceClass());
repository.registerProvider(this.getUniqueServiceName(), this.ref, serviceDescriptor, this, this.serviceMetadata);
// 获取注册中心地址
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
// 配置的协议类容
Iterator var4 = this.protocols.iterator();
while(var4.hasNext()) {
ProtocolConfig protocolConfig = (ProtocolConfig)var4.next();
...
// 以注册中心配置及协议配置处理后续的暴露逻辑
this.doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
4.2 根据需要暴露的bean动态生成代理类并包装成invoker链
这里调用到
doExportUrlsFor1Protocol
方法,我这里保留主干代码,如下:
private static final ProxyFactory PROXY_FACTORY = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs){
...
// 1 通过ProxyFactory生成代理类,构造invoker链
Invoker<?> invoker = PROXY_FACTORY.getInvoker(this.ref, this.interfaceClass, registryURL.addParameterAndEncoded("export", url.toFullString()));
// 包装类,内部的invoke方法只做了转发逻辑
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 2 以包装的invoker为参数,调用export暴露服务
Exporter<?> exporter = protocol.export(wrapperInvoker);
this.exporters.add(exporter);
}
PROXY_FACTORY是通过SPI扩展的实现类,在《Spring解析并注册Dubbo consumer端过程详解》里讲过,SPI扩展的类中有适配器类->包装类->实现类三种,返回的实际是个适配器类,debug图如下:
Wrapper
类是动态生成的,该类的
invokeMethod
只负责做了层校验,通过arthas反编译后的代码如下:
public Object invokeMethod(Object object, String string, Class[] classArray, Object[] objectArray) throws InvocationTargetException {
DemoServiceImpl demoServiceImpl;
try {
demoServiceImpl = (DemoServiceImpl)object;
}
catch (Throwable throwable) {
throw new IllegalArgumentException(throwable);
}
try {
if ("sayHello".equals(string) && classArray.length == 1) {
return demoServiceImpl.sayHello((String)objectArray[0]);
}
if ("sayHelloAsync".equals(string) && classArray.length == 1) {
return demoServiceImpl.sayHelloAsync((String)objectArray[0]);
}
}
catch (Throwable throwable) {
throw new InvocationTargetException(throwable);
}
throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(string).append("\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.").toString());
}
采用包装类的方式构成一个调用链,最终的invoker链如下:
4.3 使用Netty框架开启Socket服务
接着4.2中的源码,以4.2包装成的invoker为参数,调用
Exporter<?> exporter = protocol.export(wrapperInvoker)
方法,这里
protocol
是通过SPI扩展的类,由于这里的协议是
registry
,最后调用的是RegistryProtocol类,debug截图:
providerUrl
里包含了很多元数据信息,比如version、group等信息,这里会根据配置中心里的配置,覆盖这些信息,为后续的服务暴露做准备,当没有配置配置中心时,dubbo默认使用注册中心来当配置中心使用。我这里的注册中心是zookeeper,所以也会将zookeeper当做配置中心
public <T> Exporter<T> export(Invoker<T> originInvoker) throws RpcException {
// 获取注册中心URL
URL registryUrl = this.getRegistryUrl(originInvoker);
// 获取provider的URL
URL providerUrl = this.getProviderUrl(originInvoker);
URL overrideSubscribeUrl = this.getSubscribedOverrideUrl(providerUrl);
RegistryProtocol.OverrideListener overrideSubscribeListener = new RegistryProtocol.OverrideListener(overrideSubscribeUrl, originInvoker);
this.overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 根据配置中心的配置 修改providerUrl中的参数信息
providerUrl = this.overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// 暴露服务
RegistryProtocol.ExporterChangeableWrapper<T> exporter = this.doLocalExport(originInvoker, providerUrl);
// 4.3.2 将服务注册到注册中心
Registry registry = this.getRegistry(originInvoker);
URL registeredProviderUrl = this.getUrlToRegistry(providerUrl, registryUrl);
boolean register = providerUrl.getParameter("register", true);
if (register) {
this.register(registryUrl, registeredProviderUrl);
}
// 订阅注册中心的configurators节点信息,并注册监听器以便实时回调
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
return new RegistryProtocol.DestroyableExporter(exporter);
}
接着上面的
this.doLocalExport(originInvoker, providerUrl)
代码:
private <T> RegistryProtocol.ExporterChangeableWrapper<T> doLocalExport(Invoker<T> originInvoker, URL providerUrl) {
String key = this.getCacheKey(originInvoker);
return (RegistryProtocol.ExporterChangeableWrapper)this.bounds.computeIfAbsent(key, (s) -> {
// 将invoker包装成InvokerDelegate对象
Invoker<?> invokerDelegate = new RegistryProtocol.InvokerDelegate(originInvoker, providerUrl);
// 暴露服务,并包装返回的Exporter,重点在this.protocol.export(invokerDelegate)
return new RegistryProtocol.ExporterChangeableWrapper(this.protocol.export(invokerDelegate), originInvoker);
});
}
private static final Protocol protocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
Exporter<?> exporter = protocol.export(wrapperInvoker);
protocol
是通过SPI扩展的类,在上面已经讲到。只是在这里的url中的协议变成了dubbo,所以最终调用到
DubboProtocol
类的
export
方法,代码如下:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 将invoker和key包装成一个exporter并缓存到Map中,在接受到请求时,通过这个Map来匹配对应的invoker
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter(invoker, key, this.exporterMap);
this.exporterMap.put(key, exporter);
Boolean isStubSupportEvent = url.getParameter("dubbo.stub.event", false);
Boolean isCallbackservice = url.getParameter("is_callback_service", false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter("dubbo.stub.event.methods");
if (stubServiceMethods != null && stubServiceMethods.length() != 0) {
this.stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
} else if (this.logger.isWarnEnabled()) {
this.logger.warn(new IllegalStateException("consumer [" + url.getParameter("interface") + "], has set stubproxy support event ,but no stub methods founded."));
}
}
// 启动ServerSocket服务
this.openServer(url);
this.optimizeSerialization(url);
return exporter;
}
该方法最主要作用是两个:
- 以Url中的类名、端口、版本号、group分组四个维度组成key,以封装了invoker信息的Exporter为value,缓存到
类的DubboProtocol
中,在后续处理consumser的请求时,会通过这个Map来匹配对应的invoker进而处理请求,exporterMap
代码如下:serviceKey(URL url)
protected static String serviceKey(URL url) {
int port = url.getParameter("bind.port", url.getPort());
return serviceKey(port, url.getPath(), url.getParameter("version"), url.getParameter("group"));
}
- 通过Netty初始化一个NettyServer,并启动ServerSocket服务,监听端口事件,准备接收请求,代码如下:
private void openServer(URL url) {
String key = url.getAddress();
boolean isServer = url.getParameter("isserver", true);
if (isServer) {
ProtocolServer server = (ProtocolServer)this.serverMap.get(key);
if (server == null) {
synchronized(this) {
server = (ProtocolServer)this.serverMap.get(key);
if (server == null) {
// 创建Server,并将Server放到serverMap中缓存
this.serverMap.put(key, this.createServer(url));
}
}
} else {
server.reset(url);
}
}
}
this.serverMap.put(key, this.createServer(url));
最终调用
NettyTransporter.bind(URL url, ChannelHandler listener)
方法:
public RemotingServer bind(URL url, ChannelHandler listener) throws RemotingException {
return new NettyServer(url, listener);
}
在
NettyServer
构造器中,会开启Socket监听,并根据配置初始化线程池,代码如下:
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
String bindIp = this.getUrl().getParameter("bind.ip", this.getUrl().getHost());
int bindPort = this.getUrl().getParameter("bind.port", this.getUrl().getPort());
if (url.getParameter("anyhost", false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = "0.0.0.0";
}
this.bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter("accepts", 0);
this.idleTimeout = url.getParameter("idle.timeout", 600000);
try {
// 开启Socket服务
this.doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + this.getClass().getSimpleName() + " bind " + this.getBindAddress() + ", export " + this.getLocalAddress());
}
} catch (Throwable var6) {
throw new RemotingException(url.toInetSocketAddress(), (InetSocketAddress)null, "Failed to bind " + this.getClass().getSimpleName() + " on " + this.getLocalAddress() + ", cause: " + var6.getMessage(), var6);
}
// 根据SPI扩展机制初始化线程池
this.executor = this.executorRepository.createExecutorIfAbsent(url);
}
最终构成的调用链如下:
4.4 将provider信息发布到注册中心
this.register(registryUrl, registeredProviderUrl);
public void register(URL registryUrl, URL registeredProviderUrl) {、
// 获取注册中心
Registry registry = this.registryFactory.getRegistry(registryUrl);
// 将服务提供方发布到注册中心,底层使用zookeeper客户端curator来注册实现
registry.register(registeredProviderUrl);
ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
model.addStatedUrl(new RegisterStatedURL(registeredProviderUrl, registryUrl, true));
}
this.registryFactory
是通过SPI机制扩展类,返回的是一个适配器类,会根据
registryUrl
的协议头来决定返回那种注册中心对象,我这里配置的是zookeeper,debug截图如下:
到这里dubbo的服务提供方的服务暴露逻辑就完了,这里大量使用了包装类、调用链的设计模式,通过SPI来扩展各种功能。