天天看点

OKHttp 3.10源码解析(四):连接机制

OKhttp的底层用的是Socket连接而不是URLConnection,所以整体来说还是比较复杂的,涉及到Http协议的封装和解封装、TLS/SSL安全协议的封装、Http2的封装等等,但还是非常值得我们去学习一下的,本篇文章不会对这些底层原理有详细的解析,但读完之后至少大概知道Http请求的底层是如何实现的

一.HTTP请求的优化

1.keep-alive机制

我们知道,一个HTTP的请求需要经过三次握手和四次挥手,一般的流程为先tcp握手,然后传输数据,后面释放资源。在HTTP1.0是没有keep-alive机制的,它的流程大概如下:

OKHttp 3.10源码解析(四):连接机制

 这种请求方式本来没什么问题,但是在复杂的请求场景中就显示出它的缺点了。比如在一个网页中同时有十几个资源要请求,按照HTTP1.0的做法, 每一个请求都要创建一个tcp连接,这样就创建了十几个tcp连接,而每一个连接都要经过三次握手和四次挥手,这种情景下,资源消耗是很大的,极大得影响了网络请求效率。

针对这种情况,在HTTP1.x中提出了keep-alive机制,即当一个tcp连接请求完成以后不立即释放,如果下一个http请求的host跟上一次的相同,那么就复用上一次的tcp连接,这样就免去了tcp重复的创建和销毁的开销,流程如下

OKHttp 3.10源码解析(四):连接机制

一般在浏览器中,都会保持6~8个keep-alive的socket连接,且保持一定的生命周期,当不需要的时候才会关闭。

2.HTTP/2

HTTP/2致力于解决HTTP1.x中存在的一些不足,HTTP1.x中客户端想发起多个并行的请求就要建立多个tcp连接,HTTP1.x不会压缩请求头和响应头,导致流量的浪费,HTTP1.x不支持资源的优先级传输,所以针对这些问题,HTTP2实现了如下功能:

           ① 报头压缩:HTTP/2使用HPACK压缩格式压缩请求和响应报头数据,减少了传输流量的输出

           ② 多路复用:在HTTP1.x中每个tcp连接只能同时处理一个请求-响应,浏览器按照FIFO原则处理请求的执行,上一个请求未完成会阻塞下一个请求的执行。在HTTP/2中,我们可以把一个HTTP请求当作一个流,每个流分解成不同的帧,帧是传输的最小单位,而且每个帧都有标记来识别它属于哪个流,不同流的帧可以在同一个连接中交错发送,然后在另一端重新组装,这样就实现了一个连接可以有多个请求-响应并行执行

           ③ 数据流的优先级:HTTP/2中每个帧都是可以交错发送的,这样一来发送的顺序就很关键了,我们可以设置每个数据流的权重和依赖关系,这样就可以做到权重高的流优先发送

HTTP/2功能增强的核心是新增了二进制分帧层Binary Framing,它定义了如何封装HTTP消息并在客户端和服务端之间进行传输

OKHttp 3.10源码解析(四):连接机制

HTTP/2的数据传输是以帧的方式交错发送的,如下图示:

OKHttp 3.10源码解析(四):连接机制

二.关键类:ConnectionPool、RealConnection、StreamAllocation

上面讲了http1.x和http2的相关知识,在OKhttp中不管是哪种连接都需要用到连接池来维护,通过连接池可以有效地提高连接的使用效率,连接池的实现类为ConnectionPool

1.ConnectionPool类详解

主要用来管理http1.x或者http/2的链接,该类实现了链接的复用和清理工作,来看看它的主要属性

//清除过期链接的线程池
    private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
      new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

  //最大的空闲链接数和保活时间
  private final int maxIdleConnections;
  private final long keepAliveDurationNs;
  //双端队列,保存链接
  private final Deque<RealConnection> connections = new ArrayDeque<>();
  final RouteDatabase routeDatabase = new RouteDatabase();
  boolean cleanupRunning;
  //创建连接池,默认最大空闲连接为5个,连接的最长空闲时间为5分钟
  public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

  public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
    this.maxIdleConnections = maxIdleConnections;
    this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

    // Put a floor on the keep alive duration, otherwise cleanup will spin loop.
    if (keepAliveDuration <= 0) {
      throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
    }
  }
           

既然是Http链接的管理,那么就会有添加连接、获取连接、删除连接等行为的实现

get方法:获取连接

//获取连接
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
    assert (Thread.holdsLock(this));
    //遍历已有的连接
    for (RealConnection connection : connections) {
      //如果地址和路由都符合要求,则返回  
      if (connection.isEligible(address, route)) {
        streamAllocation.acquire(connection, true);
        return connection;
      }
    }
    return null;
  }
           

put方法:添加连接

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    //如果连接清除工作已停止,则触发清除工作
    if (!cleanupRunning) {
      cleanupRunning = true;
      executor.execute(cleanupRunnable);
    }
    //将连接添加到连接池
    connections.add(connection);
  }
           

每当添加一个新的连接,如果当前连接的清理工作已经停止,则会触发连接的清理工作,把符合清除条件的连接清理掉或者在指定时间删除即将过期的连接,连接清除工作需要启动一个工作线程来执行,来看看

连接的清理:

private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      //一个无限循环
      while (true) {
        //调用leanup方法清除,返回下一次清理的等待时间
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return; //没有连接可清理了,结束清理线程
        //等待一定的时间
        if (waitNanos > 0) {
          long waitMillis = waitNanos / 1000000L;
          waitNanos -= (waitMillis * 1000000L);
          synchronized (ConnectionPool.this) {
            try {
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };
           

清理连接的核心在cleanup方法

long cleanup(long now) {
    int inUseConnectionCount = 0; //正在使用的连接数
    int idleConnectionCount = 0; //空闲的连接数
    RealConnection longestIdleConnection = null; //空闲时间最长的连接
    long longestIdleDurationNs = Long.MIN_VALUE; 

    //遍历所有的连接,找出符合条件的连接并且清除
    synchronized (this) {
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        //如果当前连接正在使用中,继续下一个查找
        // If the connection is in use, keep searching.
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }
        //空闲连接数加1
        idleConnectionCount++;

        // If the connection is ready to be evicted, we're done.
        long idleDurationNs = now - connection.idleAtNanos;
        if (idleDurationNs > longestIdleDurationNs) {
          //找出空闲时间最长的那个连接以及它的空闲时间时长
          longestIdleDurationNs = idleDurationNs;
          longestIdleConnection = connection;
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {
        //有符合清理条件的连接,清除空闲时间最长的那个连接 
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {
        //有空闲连接,返回空闲时间最长的那个连接的剩余时间,然后线程会等待这个时间后再回来清理
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        //连接都在使用种,没有空闲连接,则等待5分钟(最大空闲时间)再次执行清理工作
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
        //没有连接
        cleanupRunning = false;
        return -1;
      }
    }
    //关闭清理连接的socket
    closeQuietly(longestIdleConnection.socket());

    // 执行完清理某个连接以后,马上再执行清理工作
    return 0;
  }
           

cleanup方法主要实现以下三个可能:

1.清理符合条件的连接,返回0,马上又进入下一轮清理工作

2.没有符合清理条件的连接,但是有空闲连接,返回空闲最长的那个连接的剩余时间,线程在等待这个剩余时间以后再进行清理工作

3.没有连接了,返回-1结束清理工作线程

连接池用一个双端队列来保存连接,默认空闲连接的最大数量为5,连接的最长空闲时间为5分钟,每一个连接都代表一个Http连接或TCP连接,用RealConnection表示

2.RealConnection类详解

RealConnection是Connection的实现类,代表一个真正的socket连接,RealConnection代表着客户端和服务器已经有了一个通信链路,先来对它的属性和构造函数有个大致的了解

private static final String NPE_THROW_WITH_NULL = "throw with null exception";
  private static final int MAX_TUNNEL_ATTEMPTS = 21;

  private final ConnectionPool connectionPool;
  private final Route route;
  /** The low-level TCP socket. */
  private Socket rawSocket; //底层socket

  //应用层socket,通过底层socket封装而来,比如TLS/SSL的连接,此时的应用层socket就是
  //将底层socket经过TLS/SSL的封装而来的,即这是一个有加密通道的安全的socket连接
  //数据发送经过这个socket以后都是经过加密的,接收的加密数据经过这个socket也是可以解密的
  private Socket socket; 
  private Handshake handshake; //握手
  private Protocol protocol; //协议
  private Http2Connection http2Connection; //http2的连接
  //与服务器的输入输出流
  private BufferedSource source;
  private BufferedSink sink;

  public boolean noNewStreams;

  public int successCount;

  public int allocationLimit = 1;

  /** Current streams carried by this connection. */
  public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();

  /** Nanotime timestamp when {@code allocations.size()} reached zero. */
  public long idleAtNanos = Long.MAX_VALUE;

  public RealConnection(ConnectionPool connectionPool, Route route) {
    this.connectionPool = connectionPool;
    this.route = route;
  }
           

来看看RealConnection类的连接方法connect,主要是建立普通socket连接或者隧道连接(不太懂),然后如果是安全的SSL连接则建立SSL的socket连接,如果是HTTP2连接,则建立HTTP2的相关协议

public void connect(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
      EventListener eventListener) {
    if (protocol != null) throw new IllegalStateException("already connected");

    RouteException routeException = null;
    List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);

    if (route.address().sslSocketFactory() == null) {
      if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication not enabled for client"));
      }
      String host = route.address().url().host();
      if (!Platform.get().isCleartextTrafficPermitted(host)) {
        throw new RouteException(new UnknownServiceException(
            "CLEARTEXT communication to " + host + " not permitted by network security policy"));
      }
    }
    //连接开始
    while (true) {
      try {
        //要求隧道模式,则建立通道连接,通常不是这种
        if (route.requiresTunnel()) {
          connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
          if (rawSocket == null) {
            // We were unable to connect the tunnel but properly closed down our resources.
            break;
          }
        } else {
          //创建socket连接,一般都走这条逻辑  
          connectSocket(connectTimeout, readTimeout, call, eventListener);
        }
        //建立协议,这一步是建立好TCP连接之后,而在该TCP能被用来收发数据之前执行的
        //它主要为了数据的加密传输做一些初始化,比如TLS握手,HTTP/2的协商
        establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
        eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
        break;
      } catch (IOException e) {
        closeQuietly(socket);
        closeQuietly(rawSocket);
        socket = null;
        rawSocket = null;
        source = null;
        sink = null;
        handshake = null;
        protocol = null;
        http2Connection = null;

        eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);

        if (routeException == null) {
          routeException = new RouteException(e);
        } else {
          routeException.addConnectException(e);
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException;
        }
      }
    }

    if (route.requiresTunnel() && rawSocket == null) {
      ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
          + MAX_TUNNEL_ATTEMPTS);
      throw new RouteException(exception);
    }

    if (http2Connection != null) {
      synchronized (connectionPool) {
        allocationLimit = http2Connection.maxConcurrentStreams();
      }
    }
  }
           

来看建立普通的socket连接connectSocket方法

private void connectSocket(int connectTimeout, int readTimeout, Call call,
      EventListener eventListener) throws IOException {
    Proxy proxy = route.proxy();
    Address address = route.address();
    //根据代理类型来选择socket类型
    rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
        ? address.socketFactory().createSocket()
        : new Socket(proxy);
    //设置监听和超时
    eventListener.connectStart(call, route.socketAddress(), proxy);
    rawSocket.setSoTimeout(readTimeout);
    try {
      //连接socket,利用Platfor是为了适应不同平台
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
      ce.initCause(e);
      throw ce;
    }

    try {
      //通过OKio获取socket的输入输出流
      source = Okio.buffer(Okio.source(rawSocket));
      sink = Okio.buffer(Okio.sink(rawSocket));
    } catch (NullPointerException npe) {
      if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
        throw new IOException(npe);
      }
    }
  }
           

建立好socket连接以后,接下来建立网络协议相关,比如TLS/SSL、http1或http2等

private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
      int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
    //如果不是ssl连接,则底层socket和应用层socket相同
    if (route.address().sslSocketFactory() == null) {
      protocol = Protocol.HTTP_1_1;
      socket = rawSocket;
      return;
    }

    eventListener.secureConnectStart(call);
    //如果是ssl,建立Tls连接
    connectTls(connectionSpecSelector);
    eventListener.secureConnectEnd(call, handshake);
    //如果是HTTP2的连接,则建立http2相关
    if (protocol == Protocol.HTTP_2) {
      socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
      http2Connection = new Http2Connection.Builder(true)
          .socket(socket, route.address().url().host(), source, sink)
          .listener(this)
          .pingIntervalMillis(pingIntervalMillis)
          .build();
      http2Connection.start();
    }
  }
           

如果是TLS连接,则执行下面方法

private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException {
    Address address = route.address();
    SSLSocketFactory sslSocketFactory = address.sslSocketFactory();
    boolean success = false;
    SSLSocket sslSocket = null;
    try {
      // Create the wrapper over the connected socket.
      // 因为是SSL的连接,所以在原来的socket上加一层SSL
      sslSocket = (SSLSocket) sslSocketFactory.createSocket(
          rawSocket, address.url().host(), address.url().port(), true /* autoClose */);
      //配置SSL扩展参数
      // Configure the socket's ciphers, TLS versions, and extensions.
      ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket);
      if (connectionSpec.supportsTlsExtensions()) {
        Platform.get().configureTlsExtensions(
            sslSocket, address.url().host(), address.protocols());
      }
      //启动TLS握手
      // Force handshake. This can throw!
      sslSocket.startHandshake();
      //获取证书信息
      // block for session establishment
      SSLSession sslSocketSession = sslSocket.getSession();
      if (!isValid(sslSocketSession)) {
        throw new IOException("a valid ssl session was not established");
      }
      Handshake unverifiedHandshake = Handshake.get(sslSocketSession);
      //验证证书是否可行,不行的话抛出异常
      // Verify that the socket's certificates are acceptable for the target host.
      if (!address.hostnameVerifier().verify(address.url().host(), sslSocketSession)) {
        X509Certificate cert = (X509Certificate) unverifiedHandshake.peerCertificates().get(0);
        throw new SSLPeerUnverifiedException("Hostname " + address.url().host() + " not verified:"
            + "\n    certificate: " + CertificatePinner.pin(cert)
            + "\n    DN: " + cert.getSubjectDN().getName()
            + "\n    subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert));
      }
      
      // Check that the certificate pinner is satisfied by the certificates presented.
      address.certificatePinner().check(address.url().host(),
          unverifiedHandshake.peerCertificates());
      // 证书验证成功的话,则保持选择的应用层协议,主要是HTTP1和HTTP2两个协议
      // Success! Save the handshake and the ALPN protocol.
      String maybeProtocol = connectionSpec.supportsTlsExtensions()
          ? Platform.get().getSelectedProtocol(sslSocket)
          : null;
      //将sslSocket赋值给全局的应用层socket,并获取socket的输入输出流
      socket = sslSocket;
      source = Okio.buffer(Okio.source(socket));
      sink = Okio.buffer(Okio.sink(socket));
      handshake = unverifiedHandshake;
      //保存应用层协议
      protocol = maybeProtocol != null
          ? Protocol.get(maybeProtocol)
          : Protocol.HTTP_1_1;
      success = true;
    } catch (AssertionError e) {
      if (Util.isAndroidGetsocknameError(e)) throw new IOException(e);
      throw e;
    } finally {
      if (sslSocket != null) {
        Platform.get().afterHandshake(sslSocket);
      }
      if (!success) {
        closeQuietly(sslSocket);
      }
    }
  }
           

过程主要就是获得加了SSL层的sslSocket,然后是TLS握手、验证证书信息、获取IO流、保存连接协议,最后完成TLS的连接建立

3.StreamAllocation类解析

StreamAllocation从字面上理解叫流的分配,它是外界使用OKhttp的连接的桥梁,通过newStream方法返回一个可使用的流,即HttpCodec实例,HttpCodec其实是对http1.x或http2协议的封装,还包含对应的Socket的IO流,通过HttpCodec可以将输入数据封装成相应的Http协议的数据,然后通过socket的输出流发送出去,同时也可以通过socket的输入流读取数据并解析数据,下面来看看newStream方法

public HttpCodec newStream(
      OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
    int connectTimeout = chain.connectTimeoutMillis();
    int readTimeout = chain.readTimeoutMillis();
    int writeTimeout = chain.writeTimeoutMillis();
    int pingIntervalMillis = client.pingIntervalMillis();
    boolean connectionRetryEnabled = client.retryOnConnectionFailure();

    try {
      //从连接池中获取一个可行的连接
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
      //获取一个HttpCodec实例,HttpCodec主要是对httpx和http/2的连接和流的封装
      HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

      synchronized (connectionPool) {
        codec = resultCodec;
        return resultCodec;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }
           

来看findHealthyConnection方法

private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
      boolean doExtensiveHealthChecks) throws IOException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          pingIntervalMillis, connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Do a (potentially slow) check to confirm that the pooled connection is still good. If it
      // isn't, take it out of the pool and start again.
      if (!candidate.isHealthy(doExtensiveHealthChecks)) {
        noNewStreams();
        continue;
      }

      return candidate;
    }
  }
           

关键是通过findConnection获取一个RealConnection,再检查返回的连接是否可行,不可行的话则继续查找

private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
    boolean foundPooledConnection = false;
    RealConnection result = null;
    Route selectedRoute = null;
    Connection releasedConnection;
    Socket toClose;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (codec != null) throw new IllegalStateException("codec != null");
      if (canceled) throw new IOException("Canceled");
      //首先看已有的连接是否可行,如果可以则利用它     
      releasedConnection = this.connection;
      toClose = releaseIfNoNewStreams();
      if (this.connection != null) {
        // We had an already-allocated connection and it's good.
        result = this.connection;
        releasedConnection = null;
      }
      if (!reportedAcquired) {
        // If the connection was never reported acquired, don't report it as released!
        releasedConnection = null;
      }
      //从缓存中获取连接
      if (result == null) {
        // Attempt to get a connection from the pool.
        Internal.instance.get(connectionPool, address, this, null);
        if (connection != null) {
          foundPooledConnection = true;
          result = connection;
        } else {
          selectedRoute = route;
        }
      }
    }
    closeQuietly(toClose);

    if (releasedConnection != null) {
      eventListener.connectionReleased(call, releasedConnection);
    }
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
    }
    if (result != null) {
      // If we found an already-allocated or pooled connection, we're done.
      //返回我们已有的连接或者从连接池获取的连接  
      return result;
    }
    //如果上面的条件都不符合,那么就需要创建一个新的连接
    // If we need a route selection, make one. This is a blocking operation.
    boolean newRouteSelection = false;
    if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
      newRouteSelection = true;
      routeSelection = routeSelector.next();
    }

    synchronized (connectionPool) {
      if (canceled) throw new IOException("Canceled");

      if (newRouteSelection) {
        // Now that we have a set of IP addresses, make another attempt at getting a connection from
        // the pool. This could match due to connection coalescing.
        List<Route> routes = routeSelection.getAll();
        for (int i = 0, size = routes.size(); i < size; i++) {
          Route route = routes.get(i);
          Internal.instance.get(connectionPool, address, this, route);
          if (connection != null) {
            foundPooledConnection = true;
            result = connection;
            this.route = route;
            break;
          }
        }
      }

      if (!foundPooledConnection) {
        if (selectedRoute == null) {
          selectedRoute = routeSelection.next();
        }

        // Create a connection and assign it to this allocation immediately. This makes it possible
        // for an asynchronous cancel() to interrupt the handshake we're about to do.
        route = selectedRoute;
        refusedStreamCount = 0;
        //创建一个新的连接
        result = new RealConnection(connectionPool, selectedRoute);
        acquire(result, false);
      }
    }

    // If we found a pooled connection on the 2nd time around, we're done.
    if (foundPooledConnection) {
      eventListener.connectionAcquired(call, result);
      return result;
    }

    // Do TCP + TLS handshakes. This is a blocking operation.
    result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
        connectionRetryEnabled, call, eventListener);
    routeDatabase().connected(result.route());

    Socket socket = null;
    synchronized (connectionPool) {
      reportedAcquired = true;

      // 将新的连接放入连接池
      Internal.instance.put(connectionPool, result);

      // If another multiplexed connection to the same address was created concurrently, then
      // release this connection and acquire that one.
      if (result.isMultiplexed()) {
        socket = Internal.instance.deduplicate(connectionPool, address, this);
        result = connection;
      }
    }
    closeQuietly(socket);

    eventListener.connectionAcquired(call, result);
    return result;
  }
           

首先看已有的连接是否符合条件,不行则从连接池获取连接,如果连接池也没有符合条件的连接,则创建一个新的连接,然后将新连接放入连接池中

获取到连接以后,然后关键的就是HttpCodec了,根据连接是HTTP1.x还是HTTP2请求,返回对应的HttpCodec实例

public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
      StreamAllocation streamAllocation) throws SocketException {
    //如果是HTTP2请求
    if (http2Connection != null) {
      return new Http2Codec(client, chain, streamAllocation, http2Connection);
    } else { //HTTP1.x 请求
      socket.setSoTimeout(chain.readTimeoutMillis());
      source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
      sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
      return new Http1Codec(client, streamAllocation, source, sink);
    }
  }
           

三.拦截器中实现连接

分析了OKHttp连接机制中最重要的三个类以后,那么接下来看看拦截器中最终是如何完成连接的,一个连接的StreamLocation最开始在RetryAndFollowUpInterceptor拦截器中被初始化的,如下

@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();
    //初始化一个StreamLocation
    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    this.streamAllocation = streamAllocation;

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {
        //将StreamLocation传递给下一个拦截器
        response = realChain.proceed(request, streamAllocation, null, null);
        releaseConnection = false;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.        
     .......省略代码
    }
  }
           

          而HttpCodec是在连接拦截器ConnectInterceptor中初始化的,如下

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Request request = realChain.request();
    StreamAllocation streamAllocation = realChain.streamAllocation();

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean doExtensiveHealthChecks = !request.method().equals("GET");
    //通过StreamLocation获取一个HttpCodec实例
    HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
    RealConnection connection = streamAllocation.connection();
    //将HttpCodec传递给下一个拦截器
    return realChain.proceed(request, streamAllocation, httpCodec, connection);
  }
           

最后到了访问服务器的拦截器CallServerInterceptor,我们来看看

@Override public Response intercept(Chain chain) throws IOException {
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    //获取本次连接的HttpCodec实例,
    HttpCodec httpCodec = realChain.httpStream();
    //获取StreamLocation
    StreamAllocation streamAllocation = realChain.streamAllocation();
    RealConnection connection = (RealConnection) realChain.connection();
    Request request = realChain.request(); //请求

    long sentRequestMillis = System.currentTimeMillis();

    realChain.eventListener().requestHeadersStart(realChain.call());
    //通过HttpCodec写入请求头
    httpCodec.writeRequestHeaders(request);
    realChain.eventListener().requestHeadersEnd(realChain.call(), request);

    Response.Builder responseBuilder = null;
    if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
      // If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
      // Continue" response before transmitting the request body. If we don't get that, return
      // what we did get (such as a 4xx response) without ever transmitting the request body.
      if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
        httpCodec.flushRequest();
        realChain.eventListener().responseHeadersStart(realChain.call());
        responseBuilder = httpCodec.readResponseHeaders(true);
      }
      //写入请求体
      if (responseBuilder == null) {
        // Write the request body if the "Expect: 100-continue" expectation was met.
        realChain.eventListener().requestBodyStart(realChain.call());
        long contentLength = request.body().contentLength();
        CountingSink requestBodyOut =
            new CountingSink(httpCodec.createRequestBody(request, contentLength));
        BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);

        request.body().writeTo(bufferedRequestBody);
        bufferedRequestBody.close();
        realChain.eventListener()
            .requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
      } else if (!connection.isMultiplexed()) {
        // If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
        // from being reused. Otherwise we're still obligated to transmit the request body to
        // leave the connection in a consistent state.
        streamAllocation.noNewStreams();
      }
    }

    httpCodec.finishRequest();
    //读取响应头
    if (responseBuilder == null) {
      realChain.eventListener().responseHeadersStart(realChain.call());
      responseBuilder = httpCodec.readResponseHeaders(false);
    }

    Response response = responseBuilder
        .request(request)
        .handshake(streamAllocation.connection().handshake())
        .sentRequestAtMillis(sentRequestMillis)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build();
    //读取响应码
    int code = response.code();
    if (code == 100) {
      // server sent a 100-continue even though we did not request one.
      // try again to read the actual response
      responseBuilder = httpCodec.readResponseHeaders(false);

      response = responseBuilder
              .request(request)
              .handshake(streamAllocation.connection().handshake())
              .sentRequestAtMillis(sentRequestMillis)
              .receivedResponseAtMillis(System.currentTimeMillis())
              .build();

      code = response.code();
    }

    realChain.eventListener()
            .responseHeadersEnd(realChain.call(), response);
    //读取响应体
    if (forWebSocket && code == 101) {
      // Connection is upgrading, but we need to ensure interceptors see a non-null response body.
      response = response.newBuilder()
          .body(Util.EMPTY_RESPONSE)
          .build();
    } else {
      response = response.newBuilder()
          .body(httpCodec.openResponseBody(response))
          .build();
    }

    if ("close".equalsIgnoreCase(response.request().header("Connection"))
        || "close".equalsIgnoreCase(response.header("Connection"))) {
      streamAllocation.noNewStreams();
    }

    if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
      throw new ProtocolException(
          "HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
    }
    //返回响应
    return response;
  }
           

这个我就不做过多分析了,无非就是通过HttpCodec来写入请求头和请求体,然后读取响应头和响应体,最后返回一个Response

最后引用一个别人的OKhttp的整体架构图,如下:

OKHttp 3.10源码解析(四):连接机制