天天看点

dubbo协议_dubbo服务之底层通讯协议(Protocol)

我们先来找到通讯协议的入口点吧。通过Protocol接口查找通讯协议入口点,我们根据接口的export方法搜索发现入口了,在ServiceConfig的doExportUrlsFor1Protocol方法,如下图:

dubbo协议_dubbo服务之底层通讯协议(Protocol)

然后我们进入 protocol.export(invoker)方法发现有很多实现类,根据spi(不懂的请看之前写的容器篇)查看配置文件能找到如下

dubbo协议_dubbo服务之底层通讯协议(Protocol)
registry=com.alibaba.dubbo.registry.integration.RegistryProtocoldubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol   //这个是默认的,我们在Protocol接口上可以看到spi的注解filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapperlistener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrappermock=com.alibaba.dubbo.rpc.support.MockProtocolinjvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocolrmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocolhessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocolcom.alibaba.dubbo.rpc.protocol.http.HttpProtocolcom.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocolthrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocolmemcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocolredis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocolrest=com.alibaba.dubbo.rpc.protocol.rest.RestProtocol
           

进入DubboProtocol.export(Invoker invoker)方法里面有个 openServer(url);

代码:

  private void openServer(URL url) {        // find server.        String key = url.getAddress();        //client 也可以暴露一个只有server可以调用的服务。        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY,true);        if (isServer) {        ExchangeServer server = serverMap.get(key);        if (server == null) {        serverMap.put(key, createServer(url)); //createServer是创建服务        } else {        //server支持reset,配合override功能使用        server.reset(url);        }        }    }
           

继续进入createServer,上源码

    private ExchangeServer createServer(URL url) {        //默认开启server关闭时发送readonly事件        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());        //默认开启heartbeat        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);        if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))            throw new RpcException("Unsupported server type: " + str + ", url: " + url);        url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);        ExchangeServer server;        try {            server = Exchangers.bind(url, requestHandler);        } catch (RemotingException e) {            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);        }        str = url.getParameter(Constants.CLIENT_KEY);        if (str != null && str.length() > 0) {            Set supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();            if (!supportedTypes.contains(str)) {                throw new RpcException("Unsupported client type: " + str);            }        }        return server;    }
           

dubbo从要暴漏的服务的URL中取得相关的配置(host,port等)进行服务端server的创建,同上面的server = Exchangers.bind(url, requestHandler) 正式创建服务。

所以基本的创建步骤是

export() --> openServer() --> createServer() --> server = Exchangers.bind(url, requestHandler);

我们进行来看 Exchangers.bind(url, requestHandler)

源码:

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {      if (url == null) {          throw new IllegalArgumentException("url == null");      }      if (handler == null) {          throw new IllegalArgumentException("handler == null");      }      url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");      return getExchanger(url).bind(url, handler);  }
           

然后通过getExchanger(url).bind(url, handler)的bing进入 HeaderExchanger类

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }

在进入Transporters类的bing的

public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); }
dubbo协议_dubbo服务之底层通讯协议(Protocol)

通过bing可以知道他讲调用:GrizzlyTransporter,MinaTransporter,NettyTransporter 通过spi默认是调用NettyTransporter

到这里我们基本明白dubbo的通讯默认是交给了netty来处理,

我们在看下doOPen方法

@Override protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true)); ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true)); ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS)); bootstrap = new ServerBootstrap(channelFactory); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); channels = nettyHandler.getChannels(); // https://issues.jboss.org/browse/NETTY-365 // https://issues.jboss.org/browse/NETTY-379 // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true)); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder());//解码 pipeline.addLast("encoder", adapter.getEncoder());//编码 pipeline.addLast("handler", nettyHandler); return pipeline; } }); // bind channel = bootstrap.bind(getBindAddress()); }

了解netty的同学,肯定早已习惯这个方法的写法,就是创建了netty的server嘛,到这里dubbo的服务创建完毕了,这个时候控制台见打印:

[DUBBO] Start NettyServer bind /0.0.0.0:20880, export /192.168.4.241:20880, dubbo version: 2.8.4, current host: 127.0.0.1

好了 在下面netty的底层就不在这里讲解了本人内功不够。