服务发布端启动流程
- 配置承载初始化
- 远程服务发布(暴露)
-
- 1.ServiceConfig#export
- 2.ServiceConfigBase#shouldDelay
- 3.ServiceConfig#doExport
- 4.ServiceConfig#doExportUrls(重要)
- 5.ConfigValidationUtils#loadRegistries(待完善)
- 6.doExportUrlsFor1Protocol(重要)
- 7.RegistryProtocol#export(重要)
- 8.RegistryProtocol#doLocalExport
- 9.ProtocolFilterWrapper#export
- 10.ProtocolListenerWrapper#export
- 11.QosProtocolWrapper#export
- 12.DubboProtocol#export(重要)
- 13.DubboProtocol#openServer
- 14.DubboProtocol#createServer
- 15.Exchangers#bind
- 16.HeaderExchanger#bind
- 17.Transporters#bind
- 18.NettyTransporter#bind
- 19.NettyServer#doOpen
- 20.RegistryProtocol#getRegistry
- 21.ZookeeperRegistryFactory#getRegistry
- 22.ZookeeperRegistryFactory#createRegistry
- 23.FailbackRegistry#register(重要)
- 小结
- 本地服务发布(暴露)
-
- 1.ServiceConfig#exportLocal
- 2.InjvmProtocol#export
配置承载初始化
Dubbo框架会根据 优先级 对 配置信息 做 聚合处理,目前 默认覆盖策略 主要遵循以下几点规则:
- -D传递给JVM参数优先级最高
- 代码 或 XML配置优先级 次高
- 配置文件优先级最低
Dubbo的配置 也会受到 provider的影响,这个属于 运行期 属性值影响,同样遵循以下几点规则:
- 如果只有 provider段指定配置,则会 自动透传到客户端
- 如果 客户端也配置了响应属性,则 服务端配置 会被覆盖
一般 不允许 透传的属性 都会在 ClusterUtils#mergeUrl中 进行 特殊处理
远程服务发布(暴露)
Dubbo框架 做服务发布(暴露) 分为两大部分
- 将 持有的 服务实例 通过代理 转换成 Invoker(1-6)
- 把Invoker 通过具体的协议 转换成 Exporter(7-结束)
Invoker 可以简单理解成 一个真实的 服务对象实例
1.ServiceConfig#export
@Override
public synchronized void export() {
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.initialize();
}
checkAndUpdateSubConfigs();
initServiceMetadata(provider);
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setTarget(getRef());
serviceMetadata.generateServiceKey();
if (!shouldExport()) {
return;
}
if (shouldDelay()) {
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
doExport();
}
exported();
}
private static final ScheduledExecutorService DELAY_EXPORT_EXECUTOR = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("DubboServiceDelayExporter", true));
如果没有设置延迟时间,则直接调用
doExport
方法发布服务
如果设置了延迟发布,则等时间过期后调用
doExport
方法来发布服务
2.ServiceConfigBase#shouldDelay
public boolean shouldDelay() {
Integer delay = getDelay();
return delay != null && delay > 0;
}
@Override
public Integer getDelay() {
return (delay == null && provider != null) ? provider.getDelay() : delay;
}
是否有延迟是根据
provider.getDelay()
3.ServiceConfig#doExport
protected synchronized void doExport() {
// unexported应该是取消发布(暴露)
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
if (exported) {
return;
}
exported = true;
if (StringUtils.isEmpty(path)) {
path = interfaceName;
}
doExportUrls();
bootstrap.setReady(true);
}
/**
* Whether the provider has been exported
*/
private transient volatile boolean exported;
/**
* The flag whether a service has unexported ,if the method unexported is invoked, the value is true
*/
private transient volatile boolean unexported;
/**
* The service name
*/
protected String path;
/**
* The interface name of the exported service
*/
protected String interfaceName;
4.ServiceConfig#doExportUrls(重要)
private void doExportUrls() {
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
// TODO
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
// 获取 当前服务 对应的 注册中心实例
// TODO
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
int protocolConfigNum = protocols.size();
for (ProtocolConfig protocolConfig : protocols) {
// TODO
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// In case user specified path, register service one more time to map it to path.
// TODO
repository.registerService(pathKey, interfaceClass);
doExportUrlsFor1Protocol(protocolConfig, registryURLs, protocolConfigNum);
}
}
5.ConfigValidationUtils#loadRegistries(待完善)
在Dubbo中,一个服务 可以被注册到 多个服务注册中心
6.doExportUrlsFor1Protocol(重要)
这个方法特别长
首先把参数封装为URL(在Dubbo里会把所有参数封装到一个URL里),然后 具体执行 服务导出
代码1解析MethodConfig对象 设置的 方法级别的配置 并保存到 参数map中
代码2用来判断调用类型,如果为泛型调用,则设置泛型类型(true、nativejava或bean方式)
代码5用来导出服务,Dubbo服务导出 分 本地导出 与 远程导出
本地导出使用了injvm协议,是一个伪协议,它不开启端口,不发起远程调用,只在JVM 内直接关联,但执行Dubbo的Filter链
在默认情况下,Dubbo同时支持本地导出与远程导出协议
可以通过ServiceConfig的setScope()方法设置,其中配置为none表示不导出服务,为remote表示只导出远程服务,为local表示只导出本地服务
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs, int protocolConfigNum) {
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
// 1 解析MethodConfig数据
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
ServiceConfig.appendRuntimeParameters(map);
// 读取其他配置信息到map,用后续构造URL
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
// remove 'default.' prefix for configs from ProviderConfig
// appendParameters(map, provider, Constants.DEFAULT_KEY);
AbstractConfig.appendParameters(map, provider);
AbstractConfig.appendParameters(map, protocolConfig);
AbstractConfig.appendParameters(map, this);
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
if (CollectionUtils.isNotEmpty(getMethods())) {
for (MethodConfig method : getMethods()) {
AbstractConfig.appendParameters(map, method, method.getName());
String retryKey = method.getName() + RETRY_SUFFIX;
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if (FALSE_VALUE.equals(retryValue)) {
map.put(method.getName() + RETRIES_SUFFIX, ZERO_VALUE);
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
AbstractConfig
.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException(
"Argument config error : the index attribute and type attribute not match :index :" +
argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException(
"Argument config error : the index attribute and type attribute not match :index :" +
argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException(
"Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
// 2 如果为泛型调用,设置 泛型类型
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
// 3 正常调用设置拼接URL的参数
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
/**
* Here the token value configured by the provider is used to assign the value to ServiceConfig#token
*/
if (ConfigUtils.isEmpty(token) && provider != null) {
token = provider.getToken();
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
//init serviceMetadata attachments
serviceMetadata.getAttachments().putAll(map);
// export service
// 4 拼接URL对象
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map, protocolConfigNum);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// You can customize Configurator to append extra parameters
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
// 5 导出服务 本地服务,远程服务
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
// 如果scope为SCOPE_NONE,则不导出服务
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
// 如果scope不是SCOPE_LOCAL,则 导出 本地服务
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
// 如果scope不是SCOPE_LOCAL,则 导出 远程服务
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
// 如果有服务中心注册地址
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())) {
url = url.addParameterIfAbsent(SERVICE_NAME_MAPPING_KEY, "true");
}
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
// 如果配置了监控地址,则服务调用信息会上报
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " +
registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 通过 动态代理 转换成Invoker
// registryURL存储的是 注册中心地址
// 使用export作为key 追加 服务(的)元数据信息
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 把Invoker转换成Exporter
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
// 没有注册中心,直接发布(暴露)服务
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
// 元数据存储
MetadataUtils.publishServiceDefinition(url);
}
}
this.urls.add(url);
}
如果使用MetadataReportConfig设置了元数据存储信息,则将元数据 保存到 指定配置中心
在Dubbo 2.7.0中 对 服务(的)元数据进行了改造,其把原来都保存到服务注册中心的元数据进行了分类存储,注册中心 将只用于 存储 关键服务信息,比如 服务提供者地址列表、完整的接口定义等
Dubbo 2.7.0使用更专业的配置中心,如Nacos、Apollo、Consul和Etcd等,提供更灵活、更丰富的配置规则,包括服务和应用不同粒度的配置、更丰富的路由规则和集中式管理的动态参数规则等
重点看这几行
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker =
new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
// PROXY_FACTORY
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
// PROTOCOL
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
ProxyFactory 和 Protocol 都是 扩展接口的 适配器类
执行代码
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url)
时
实际上是首先执行扩展接口ProxyFactory 的 适配器类ProxyFactory$Adaptive 的
getInvoker()
方法
getInvoker()
方法内部 根据 URL里的 (参数)proxy的类型 选择 具体的 代理工厂,这里默认的proxy类型为javassist
所以又调用了JavassistProxyFactory的
getInvoker()
方法获取了代理类
getExtensionLoader
方法返回的是ExtensionLoader<T>
getAdaptiveExtension
方法 是 通过动态代理 生成 扩展接口ProxyFactory 的 适配器类ProxyFactory$Adaptive 的 实例对象
JavassistProxyFactory的
getInvoker()
方法的代码如下:
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
JavassistProxyFactory首先把 服务实现类 转换为 Wrapper 类,是为了减少反射的调用,这里返回的是 AbstractProxyInvoker对象,其内部重写
doInvoke()
方法,并委托给Wrapper 实现具体功能
到这里就完成了 服务提供方实现类 到 Invoker 的转换
在子类中实现invokeMethod方法,方法体内 会为每个ref方法 都做 方法名和方法参数匹配校验,匹配成功则直接可调用
对比看一下JdkProxyFactory
public class JdkProxyFactory extends AbstractProxyFactory {
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
return method.invoke(proxy, arguments);
}
};
}
}
JdkProxyFactory 是通过反射获取真实对象的方法,然后调用
JavassistProxyFactory 相比 JdkProxyFactory 减少了反射开销
另外,当执行
protocol.export(wrapperInvoker)
方法的时候,实际调用了Protocol 的适配器类 Protocol$Adaptive 的
export()
方法
如果为远程服务发布(暴露),则 其内部 根据 URL中Protocol的类型为registry,会选择Protocol的实现类RegistryProtocol
如果为本地服务发布(暴露),则 其内部 根据URL中Protocol的类型为injvm,会选择Protocol的实现类InjvmProtocol
发布的时候Protocol的类型在哪里确定的?
在ConfigValidationUtils.loadRegistries方法中的
extractRegistryType(url)
另外,上面说到的"URL中Protocol的类型"的URL,指的是invoker中的url
由于Dubbo SPI 的扩展点使用了Wrapper自动增强,这里就使用了ProtocolFilterWrapper、ProtocolListenerWrapper、QosProtocolWrapper 对其进行了增强,所以需要一层层调用才会调用到RegistryProtocol的
export()
方法
7.RegistryProtocol#export(重要)
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
// the same service. Because the subscribed is cached key with the name of the service, it causes the
// subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
// export invoker
// 重要!!!
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// url to registry
// 创建注册中心实例,重要!!!
final Registry registry = getRegistry(originInvoker);
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
// 注册 服务(的)元数据,重要!!!
registry.register(registeredProviderUrl);
}
// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}
注意registryUrl 、providerUrl、registeredProviderUrl几个变量值的不同
8.RegistryProtocol#doLocalExport
RegistryProtocol的
doLocalExport()
方法内 调用了 Protocol的适配器类Protocol$Adaptive,这里 URL内的 协议类型是dubbo,所以 返回的SPI扩展实现类 是 DubboProtocol,由于DubboProtocol也被Wrapper类增强了,所以也是一层层调用后,才执行到上面时序图中的步骤8,即调用DubboProtocol的
export()
方法
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
9.ProtocolFilterWrapper#export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// 先构造 拦截器链 然后触发Dubbo协议 暴露
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
在触发 Dubbo协议 暴露(服务)之前 先对 服务Invoker 做了一层拦截器构建,在 加载所有拦截器时 会过滤 只对provider生效的数据
传入到
buildInvokerChain
方法中的参数是:
String SERVICE_FILTER_KEY = "service.filter";
String PROVIDER = "provider";
看一下
buildInvokerChain
方法
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
// 根据url + service.filter + provider 获得 扩展接口Filter的 多个 自动激活实现类 的 实例对象
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
last = new FilterNode<T>(invoker, last, filter);
}
}
return last;
}
获取 真实服务ref 对应的 Invoker 并 挂载到 整个拦截器链的尾部,然后 逐层 包裹 其他拦截器
10.ProtocolListenerWrapper#export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 如果 URl 协议是 registry 则不处理,直接转换invoker
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// 否则对 Exporter 进行增强,这里是 获取 SPI接口-ExporterListener的实现类 作为入参
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), EXPORTER_LISTENER_KEY)));
}
11.QosProtocolWrapper#export
QoS(Quality of Service,服务质量)指 一个网络 能够 利用 各种基础技术,为 指定的网络通信 提供 更好的服务能力,是网络的一种安全机制, 是用来 解决 网络延迟和阻塞等问题 的 一种技术
dubbo为用户提供类似的网络服务 用来online和offline service 来解决 网络延迟,阻塞等问题
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {
// 启动Qos 服务,其中通过Cas 判断是否已经开启过,确保只启动一次
startQosServer(invoker.getUrl());
return protocol.export(invoker);
}
return protocol.export(invoker);
}
可以通过
qos.enable
参数控制是否开启
看一下
startQosServer
方法
private void startQosServer(URL url) {
try {
if (!hasStarted.compareAndSet(false, true)) {
return;
}
boolean qosEnable = url.getParameter(QOS_ENABLE, true);
if (!qosEnable) {
logger.info("qos won't be started because it is disabled. " +
"Please check dubbo.application.qos.enable is configured either in system property, " +
"dubbo.properties or XML/spring-boot configuration.");
return;
}
String host = url.getParameter(QOS_HOST);
int port = url.getParameter(QOS_PORT, QosConstants.DEFAULT_PORT);
boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP, "false"));
Server server = Server.getInstance();
server.setHost(host);
server.setPort(port);
server.setAcceptForeignIp(acceptForeignIp);
server.start();
} catch (Throwable throwable) {
logger.warn("Fail to start qos server: ", throwable);
}
}
public interface QosConstants {
String QOS_ENABLE = "qos.enable";
String QOS_HOST = "qos.host";
String QOS_PORT = "qos.port";
String ACCEPT_FOREIGN_IP = "qos.accept.foreign.ip";
}
9-11 阅读参考,上面时序图中的顺序并不是真是的顺序:
- https://blog.csdn.net/qq_36882793/article/details/114970511
- https://blog.csdn.net/luzhensmart/article/details/108548095
12.DubboProtocol#export(重要)
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
// 根据 服务分组、版本、接口、端口 构造key
String key = serviceKey(url);
// 转换成 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 保存到缓存exporterMap中
exporterMap.addExportMap(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
// 服务初次发布(暴露) 会 创建 监听服务器
openServer(url);
optimizeSerialization(url);
return exporter;
}
将Invoker 转换为 DubboExporter 对象
并且 把DubboExporter 保存到了 缓存exporterMap里(在 服务提供方 处理请求时 会从exporterMap中获取出来)
然后执行上面时序图中步骤10的
openServer()
方法
13.DubboProtocol#openServer
先来看一下openServer及后续流程的时序图
private void openServer(URL url) {
// find server.
// 提供者机器的地址
String key = url.getAddress();
//client can export a service which's only for server to invoke
// 只有 服务端 提供 才会 启动监听
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
首先是获取 当前机器地址信息(ip:port)并作为key,然后判断当前是否为服务提供端
如果是,则以此key 为key 查看缓存serverMap中 是否有 对应的Server,如果没有则调用
createServer
方法来创建,否则 返回 缓存中的value
由于每个机器的ip:port是唯一的,所以 多个不同服务 启动时 只有 第一个会被创建,后面的服务都是直接从缓存中返回的
14.DubboProtocol#createServer
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// 创建NettyServer 并 初始化Handler
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return new DubboProtocolServer(server);
}
15.Exchangers#bind
Exchangers位于交换层,倒数第三层
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).bind(url, handler);
}
看一下
getExchanger
方法
public static Exchanger getExchanger(URL url) {
String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
return getExchanger(type);
}
String EXCHANGER_KEY = "exchanger";
String DEFAULT_EXCHANGER = "header";
public static Exchanger getExchanger(String type) {
return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
}
获取url中key=exchanger的参数值,作为Exchanger的类型,Exchanger默认类型为header
实际上就是通过
getExtension
方法获取 扩展接口Exchanger 的 实现类 HeaderExchanger
然后调用 HeaderExchanger#bind
16.HeaderExchanger#bind
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
17.Transporters#bind
Transporters位于传输层,倒数第二层
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
handler = new ChannelHandlerDispatcher(handlers);
}
return getTransporter().bind(url, handler);
}
getTransporter方法如下,就是获取 扩展接口Transporter的 适配器类 Transporter$Adaptive
public static Transporter getTransporter() {
return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
}
然后 Transporter$Adaptive 的bind 获取URL中的server和transporter两个参数值,确定使用哪一个Transporter的具体实现了,默认是NettyTransporter,所以,最终调用的是NettyTransporter#bind方法
18.NettyTransporter#bind
@Override
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
return new NettyServer(url, handler);
}
NettyServer 的构造函数如下:
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
try {
// 重要!!!
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
executor = executorRepository.createExecutorIfAbsent(url);
}
public AbstractEndpoint(URL url, ChannelHandler handler) {
super(url, handler);
this.codec = getChannelCodec(url);
this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT);
}
19.NettyServer#doOpen
@Override
protected void doOpen() throws Throwable {
// 创建ServerBootstrap
bootstrap = new ServerBootstrap();
// 设置Netty的boss线程池 和 worker线程池
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
// 配置NettyServer,添加 handler 到 管线
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
boolean keepalive = getUrl().getParameter(KEEP_ALIVE_KEY, Boolean.FALSE);
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_KEEPALIVE, keepalive)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
// 添加handler到 接收链接 的 管线
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
// 重要!!!
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
// 心跳检查
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
// 绑定本地端口,并启动监听服务
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
至此,服务提供方的NettyServer已经启动了
这里需要注意是,将 NettyServerHandler 和 编解码Handler 注册到了 每个接收链接 的 通道(channel)管理(动词)的管线中
20.RegistryProtocol#getRegistry
回到7-RegistryProtocol#export方法,继续往下看,在doLocalExport之后,就是getRegistry 和 register两步
先看一下时序图
步骤2 用来获取 服务注册中心,其中RegistryFactory为扩展接口,所以这里 通过适配器类 来确定 RegistryFactory的扩展实现 为 ZookeeperRegistryFactory
然后后者(ZookeeperRegistryFactory)内部调用
createRegistry()
方法创建了一个ZookeeperRegistry作为ZooKeeper注册中心
protected Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return getRegistry(registryUrl);
}
protected Registry getRegistry(URL url) {
try {
return registryFactory.getRegistry(url);
} catch (Throwable t) {
LOGGER.error(t.getMessage(), t);
throw t;
}
}
RegistryFactory 也是一个 扩展接口,对应的适配器类 是 RegistryFactory$Adaptive,会根据URL中的protocol参数获取具体的扩展实现,默认是DubboRegistryFactory
然后执行的是DubboRegistryFactory#getRegistry
如果设置protocol参数值=zookeeper,则会执行ZookeeperRegistryFactory#getRegistry
21.ZookeeperRegistryFactory#getRegistry
ZookeeperRegistryFactory继承了AbstractRegistryFactory
ZookeeperRegistryFactory#getRegistry 其实执行的是 AbstractRegistryFactory#getRegistry
@Override
public Registry getRegistry(URL url) {
Registry defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
if (null != defaultNopRegistry) {
return defaultNopRegistry;
}
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = createRegistryCacheKey(url);
// Lock the registry access process to ensure a single instance of the registry
// 独占锁 保证 同时 只有一个线程实例 创建 服务注册实例
LOCK.lock();
try {
// double check
// fix https://github.com/apache/dubbo/issues/7265.
defaultNopRegistry = getDefaultNopRegistryIfDestroyed();
if (null != defaultNopRegistry) {
return defaultNopRegistry;
}
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
// 创建服务注册中心
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
// 释放锁
LOCK.unlock();
}
}
protected static final ReentrantLock LOCK = new ReentrantLock();
protected static final Map<String, Registry> REGISTRIES = new HashMap<>();
22.ZookeeperRegistryFactory#createRegistry
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
23.FailbackRegistry#register(重要)
RegistryProtocol#export中
registry.register
调用的是FailbackRegistry#register
FailbackRegistry#register中会调用
super.register(url);
,也就是AbstractRegistryFactory#register
AbstractRegistryFactory#register仅仅是将注册地址 添加到 注册中心列表中
然后会执行
doRegister
方法,对应到ZookeeperRegistry#doRegister,主要的操作就是zkClient.create,将服务的url创建到zk的节点上
和上面时序图中的7、8步骤略有不同
相关内容见下面小结链接
小结
ExchangeServer、Exchangers、Transport 相关内容请见
- Dubbo-Exchanger
- Dubbo-Transport
- Dubbo-RemotingServer
ZooKeeper注册中心相关内容请见
- Dubbo注册中心(二)-ZooKeeper
- Dubbo注册中心(七)-AbstractRegistry及相关内容
- Dubbo注册中心(五)-FailbackRegistry+重试机制
本地服务发布(暴露)
很多使用Dubbo框架的应用 可能存在 同一个JVM暴露了远程服务,同时 同一个JVM内部 又引用了自身服务的情况
Dubbo会默认 把远程服务 用injvm协议 再暴露一份,这样 消费方 直接消费同一JVM内部的服务,避免了 跨网络进行远程通信
1.ServiceConfig#exportLocal
private void exportLocal(URL url) {
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
Exporter<?> exporter = PROTOCOL.export(
PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
String LOCAL_PROTOCOL = "injvm";
String LOCALHOST_VALUE = "127.0.0.1";
上面的
export
方法会走到InjvmProtocol#export
InjvmProtocol这个协议比较特殊,不会做端口打开操作,仅仅是 把服务 保存到 内存中
2.InjvmProtocol#export
export方法直接返回InjvmExporter实例对象
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
String serviceKey = invoker.getUrl().getServiceKey();
InjvmExporter<T> tInjvmExporter = new InjvmExporter<>(invoker, serviceKey, exporterMap);
exporterMap.addExportMap(serviceKey, tInjvmExporter);
return tInjvmExporter;
}
InjvmExporter(Invoker<T> invoker, String key, DelegateExporterMap delegateExporterMap) {
super(invoker);
this.key = key;
this.delegateExporterMap = delegateExporterMap;
}