
RxJava + Retrofit: one complete network request from start to finish (source code walkthrough)

Preface: this post traces one complete request, from building the Retrofit instance all the way to onCompleted().

compile 'com.squareup.retrofit2:adapter-rxjava:2.0.2'
compile 'com.squareup.retrofit2:retrofit:2.0.2'
           

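The code below is a typical request and should look familiar. But what exactly do Retrofit and RxJava do along the way?

  • How does retrofit.create build the instance?
  • What does RxJavaCallAdapterFactory.create() do?
  • Where is the Observable built, and how?
  • The caching strategy, and how some details of the request are handled
  • How is a call wrapped?
  • How is the socket channel established?
  • How is the Response parsed?
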
Retrofit retrofit = new Retrofit.Builder()
                .client(MyInterceptor.genericClient(MainActivity1.this)) // the interceptor adds headers
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .baseUrl(BASE_URL)
                .build();
        MyService service = retrofit.create(MyService.class);
        String json = "{\"xxx\":\"xxx\"}";
        RequestBody requestBody = RequestBody.create(MediaType.parse("application/json; charset=utf-8"), json);
        service.login(requestBody)                
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<UserInfo>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(UserInfo userInfo) {
                    }
                });
           

How does create build the MyService instance?

public <T> T create(final Class<T> service) {
    Utils.validateServiceInterface(service);
    if (validateEagerly) {
      eagerlyValidateMethods(service);
    }
    return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
        new InvocationHandler() {
           

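The instance of the interface we pass in is built with a dynamic proxy. The invoke method below fires when login is called; through the proxy it gets hold of our arguments, annotations and so on, and uses them to build a preliminary request.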

@Override public Object invoke(Object proxy, Method method, Object... args)
              throws Throwable {
            // If the method is a method from Object then defer to normal invocation.
            if (method.getDeclaringClass() == Object.class) {
              return method.invoke(this, args);
            }
            if (platform.isDefaultMethod(method)) {
              return platform.invokeDefaultMethod(method, service, proxy, args);
            }
            ServiceMethod serviceMethod = loadServiceMethod(method);
            OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
            return serviceMethod.callAdapter.adapt(okHttpCall);
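
To make the proxy step concrete, here is a minimal JDK-only sketch of the same mechanism. DemoService and ProxyDemo are hypothetical names of mine, not Retrofit classes, and the handler simply describes the call where Retrofit would build a ServiceMethod and an OkHttpCall.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

// Hypothetical service interface, standing in for MyService.
interface DemoService {
  String login(String user);
}

final class ProxyDemo {
  // Builds a proxy whose handler describes the call instead of doing real work.
  @SuppressWarnings("unchecked")
  static <T> T create(final Class<T> service) {
    InvocationHandler handler = new InvocationHandler() {
      @Override public Object invoke(Object proxy, Method method, Object[] args)
          throws Throwable {
        // Methods declared on Object (toString, hashCode, ...) are handled normally.
        if (method.getDeclaringClass() == Object.class) {
          return method.invoke(this, args);
        }
        // Retrofit would load a ServiceMethod and wrap it in an OkHttpCall here.
        return method.getName() + Arrays.toString(args);
      }
    };
    return (T) Proxy.newProxyInstance(
        service.getClassLoader(), new Class<?>[] {service}, handler);
  }
}
```

Calling ProxyDemo.create(DemoService.class).login("alice") never touches a real implementation: the handler sees the Method object and the argument array, which is all Retrofit needs to assemble a request.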
           

ServiceMethod's main job is to build a request from the method; in the end a single toRequest call completes the request, as we will see later. Next, step into loadServiceMethod.

ServiceMethod loadServiceMethod(Method method) {
    ServiceMethod result;
    synchronized (serviceMethodCache) {
      result = serviceMethodCache.get(method);
      if (result == null) {
        result = new ServiceMethod.Builder(this, method).build();
        serviceMethodCache.put(method, result);
      }
    }
    return result;
  }
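
The caching pattern in loadServiceMethod can be tried out in isolation. This is a rough sketch under my own assumptions: MethodCacheDemo is a made-up class, and a String stands in for the parsed ServiceMethod.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

final class MethodCacheDemo {
  private final Map<Method, String> cache = new LinkedHashMap<>();
  int builds = 0; // counts how often the expensive build step ran

  String load(Method method) {
    String result;
    synchronized (cache) {
      result = cache.get(method);
      if (result == null) {
        builds++;
        // Stand-in for new ServiceMethod.Builder(this, method).build().
        result = "parsed:" + method.getName();
        cache.put(method, result);
      }
    }
    return result;
  }
}
```

The point of the cache is that the annotation parsing for a given interface method happens once; every later call to the same method reuses the stored result.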
           

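Step into build next. build determines the method's return type and, based on the annotations and that return type, obtains a CallAdapter; the CallAdapter is what builds and returns the Observable.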

public ServiceMethod build() {
      callAdapter = createCallAdapter();  
      responseType = callAdapter.responseType();
      if (responseType == Response.class || responseType == okhttp3.Response.class) {
        throw methodError("'"
            + Utils.getRawType(responseType).getName()
            + "' is not a valid response body type. Did you mean ResponseBody?");
      }
      responseConverter = createResponseConverter();

      for (Annotation annotation : methodAnnotations) {
        parseMethodAnnotation(annotation);
      }   
           

Now look at how the CallAdapter is determined.

private CallAdapter<?> createCallAdapter() {
      Type returnType = method.getGenericReturnType();  // the return type; method is login, so here it is Observable<UserInfo>
      if (Utils.hasUnresolvableType(returnType)) {
        throw methodError(
            "Method return type must not include a type variable or wildcard: %s", returnType);
      }
      if (returnType == void.class) {
        throw methodError("Service methods cannot return void.");
      }
      Annotation[] annotations = method.getAnnotations();
      try {
        return retrofit.callAdapter(returnType, annotations);  
      } catch (RuntimeException e) { // Wide exception range because factories are user code.
        throw methodError(e, "Unable to create call adapter for %s", returnType);
      }
    }
           

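The CallAdapter is chosen from the annotations and the method's actual return type. In my case it is SimpleCallAdapter, an inner class of RxJavaCallAdapterFactory that implements the CallAdapter interface; the other two are ResultCallAdapter and ResponseCallAdapter. Mine returns SimpleCallAdapter because of how my method is declared: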

@POST("UserLogin")
 Observable<UserInfo> login(@Body RequestBody json);
           

To get a ResultCallAdapter instead, change the declaration as follows:

@POST("UserLogin")
 Observable<Result<UserInfo>> login(@Body RequestBody json);
           

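SimpleCallAdapter only delivers Response.body. Observable<Response<T>> sends the complete network response to the Subscriber (so you can read cookies from the headers), while Observable<T> sends only the Response.body part. I won't go into more detail on this.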
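
The selection by return type can be illustrated with plain reflection. The sketch below is my own approximation, not the factory's actual code: Obs, Resp and Res are hypothetical stand-ins for Observable, Response and Result, and Api is an invented interface.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-ins for rx.Observable, retrofit2.Response and Result.
interface Obs<T> {}
interface Resp<T> {}
interface Res<T> {}

interface Api {
  Obs<String> body();
  Obs<Resp<String>> response();
  Obs<Res<String>> result();
}

final class AdapterChooser {
  static Class<?> rawType(Type type) {
    if (type instanceof Class) return (Class<?>) type;
    if (type instanceof ParameterizedType) {
      return (Class<?>) ((ParameterizedType) type).getRawType();
    }
    return Object.class;
  }

  // Names the adapter that the type argument of Obs<...> would select.
  static String choose(Method method) {
    Type returnType = method.getGenericReturnType();
    if (!(returnType instanceof ParameterizedType)) return "unsupported";
    Type inner = ((ParameterizedType) returnType).getActualTypeArguments()[0];
    Class<?> raw = rawType(inner);
    if (raw == Resp.class) return "ResponseCallAdapter";
    if (raw == Res.class) return "ResultCallAdapter";
    return "SimpleCallAdapter";
  }

  static String chooseByName(String methodName) {
    for (Method m : Api.class.getMethods()) {
      if (m.getName().equals(methodName)) return choose(m);
    }
    throw new IllegalArgumentException(methodName);
  }
}
```

This is why changing only the declared return type of login is enough to switch adapters: the generic type information survives in the method signature and is readable at runtime.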

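ServiceMethod summary: this class is fairly complex. Its main job is to turn a method of our interface into a request object; from the method's annotations and parameters it builds the preliminary RequestBody.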

OK, so now we know that the callAdapter in the last step of invoke, return serviceMethod.callAdapter.adapt(okHttpCall);, is SimpleCallAdapter. Let's go straight to its adapt method.

@Override public <R> Observable<R> adapt(Call<R> call) {
      Observable<R> observable = Observable.create(new CallOnSubscribe<>(call)) //
          .lift(OperatorMapResponseToBodyOrError.<R>instance());
      if (scheduler != null) {
        return observable.subscribeOn(scheduler);
      }
      return observable;
    }
           

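The Observable is built here. Look at CallOnSubscribe: when .subscribe runs, it triggers the OnSubscribe's call function, and only then does the network request start (everything before that was essentially configuration).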

static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response<T>> {
    private final Call<T> originalCall;

    CallOnSubscribe(Call<T> originalCall) {
      this.originalCall = originalCall; // the OkHttpCall that performs the network request
    }

    @Override public void call(final Subscriber<? super Response<T>> subscriber) {
      // Since Call is a one-shot type, clone it for each new subscriber.
      Call<T> call = originalCall.clone();

      // Wrap the call in a helper which handles both unsubscription and backpressure.
      RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
      subscriber.add(requestArbiter);
      subscriber.setProducer(requestArbiter);
    }
  }
           
public void setProducer(Producer p) {
        long toRequest;
        boolean passToSubscriber = false;
        synchronized (this) {
            toRequest = requested;
            producer = p;
            if (subscriber != null) {
                // middle operator ... we pass thru unless a request has been made
                if (toRequest == NOT_SET) {
                    // we pass-thru to the next producer as nothing has been requested
                    passToSubscriber = true;
                }
            }
        }
        // do after releasing lock
        if (passToSubscriber) {
            subscriber.setProducer(producer);
        } else {
            // we execute the request with whatever has been requested (or Long.MAX_VALUE)
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE);
            } else {
                producer.request(toRequest); // request is what finally gets called; read on
            }
        }
    }
           
@Override public void request(long n) {
      if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
      if (n == 0) return; // Nothing to do when requesting 0.
      if (!compareAndSet(false, true)) return; // Request was already triggered.

      try {
        // OkHttpCall's synchronous method (it performs a series of steps, analyzed later)
        Response<T> response = call.execute();
        if (!subscriber.isUnsubscribed()) {
          subscriber.onNext(response); // if not unsubscribed, call onNext
        }
      } catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        if (!subscriber.isUnsubscribed()) {
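          // if not unsubscribed, call onError; on error we return right away, so onCompleted never runs.
          // Likewise, if no exception is caught, onError is skipped: only one of the two ever runs.
          subscriber.onError(t);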
        }
        return;
      }

      if (!subscriber.isUnsubscribed()) {
        subscriber.onCompleted(); // if not unsubscribed, call onCompleted
      }
    }
  @Override public void unsubscribe() {
      call.cancel(); // cancel the network call; one complete request ends here
    }

    @Override public boolean isUnsubscribed() {
      return call.isCanceled();
    }
  }
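
The arbiter's two guarantees (the call runs at most once, and onError and onCompleted are mutually exclusive) are easy to reproduce without RxJava. A minimal sketch, assuming a made-up OneShotArbiter class that records events in a list where the real code would notify a Subscriber:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

final class OneShotArbiter extends AtomicBoolean {
  final List<String> events = new ArrayList<>(); // stand-in for the subscriber
  private final Callable<String> call;           // stand-in for Call.execute()

  OneShotArbiter(Callable<String> call) {
    this.call = call;
  }

  void request(long n) {
    if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
    if (n == 0) return; // nothing requested
    if (!compareAndSet(false, true)) return; // the call was already triggered

    try {
      events.add("onNext:" + call.call());
    } catch (Exception e) {
      events.add("onError:" + e.getMessage());
      return; // onCompleted is skipped after an error
    }
    events.add("onCompleted");
  }
}
```

Extending AtomicBoolean lets the object itself serve as the "already requested" flag, so a second request(n) from downstream is a no-op rather than a second network call.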
           

That completes one request. Now look at OkHttpCall's synchronous execute method.

@Override public Response<T> execute() throws IOException {
    okhttp3.Call call;

    synchronized (this) {
      if (executed) throw new IllegalStateException("Already executed.");
      executed = true;

      if (creationFailure != null) {
        if (creationFailure instanceof IOException) {
          throw (IOException) creationFailure;
        } else {
          throw (RuntimeException) creationFailure;
        }
      }

      call = rawCall;
      if (call == null) {
        try {
          call = rawCall = createRawCall();
        } catch (IOException | RuntimeException e) {
          creationFailure = e;
          throw e;
        }
      }
    }

    if (canceled) {
      call.cancel();
    }

    return parseResponse(call.execute());
  }
           

Here we see call.execute(). First, though, the details of how the call is built:

private okhttp3.Call createRawCall() throws IOException {
    // As summarized earlier, serviceMethod turns a method into a request. args are the method's
    // arguments, passed in when the OkHttpCall was constructed in the proxy's invoke; a debugger shows what they contain.
    Request request = serviceMethod.toRequest(args);
    okhttp3.Call call = serviceMethod.callFactory.newCall(request);
    if (call == null) {
      throw new NullPointerException("Call.Factory returned null.");
    }
    return call;
  }
           

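The RealCall is now created: an executable HTTP request. Both the synchronous and the asynchronous paths are ultimately executed in RealCall, so let's keep following. From here on it is all OkHttp's job.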

@Override public Response execute() throws IOException {
    synchronized (this) {
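      // Check whether this Call has already been executed; a single Call can run only once.
      // An identical Call can be obtained with clone, which is what CallOnSubscribe does.
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    try {
      // Add the call to runningSyncCalls, which tracks in-flight calls (used to gauge concurrency);
      // the finally block removes the call once it has finished.
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain(false); // the main network request method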
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }
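
The one-shot rule and the dispatcher bookkeeping can be sketched together. SketchDispatcher and SketchCall are hypothetical classes of mine, and copy() plays the role that clone plays for the real Call.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class SketchDispatcher {
  final Deque<SketchCall> runningSyncCalls = new ArrayDeque<>();

  synchronized void executed(SketchCall call) { runningSyncCalls.add(call); }
  synchronized void finished(SketchCall call) { runningSyncCalls.remove(call); }
}

final class SketchCall {
  private final SketchDispatcher dispatcher;
  private final String request;
  private boolean executed;
  int runningDuringExecute = -1; // recorded only so the bookkeeping is observable

  SketchCall(SketchDispatcher dispatcher, String request) {
    this.dispatcher = dispatcher;
    this.request = request;
  }

  String execute() {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    try {
      dispatcher.executed(this);
      runningDuringExecute = dispatcher.runningSyncCalls.size();
      return "response for " + request; // stand-in for the real network work
    } finally {
      dispatcher.finished(this); // always removed, even if the work throws
    }
  }

  // A fresh, unexecuted call for the same request.
  SketchCall copy() {
    return new SketchCall(dispatcher, request);
  }
}
```

Because the removal sits in finally, the dispatcher's view of in-flight calls stays accurate whether the request succeeds or fails.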
           

The dispatcher is initialized when the OkHttpClient is constructed and controls concurrency; we won't dig into it here. Look at getResponseWithInterceptorChain.

private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException {
    Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket);
    return chain.proceed(originalRequest);
  }




@Override public Response proceed(Request request) throws IOException {
      // If there's another interceptor in the chain, call that.
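      // Check whether application interceptors exist (for example, a custom interceptor that logs
      // outgoing requests); if so, create a new ApplicationInterceptorChain and run it.
      if (index < client.interceptors().size()) {
        Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket);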
        Interceptor interceptor = client.interceptors().get(index);
        Response interceptedResponse = interceptor.intercept(chain);

        if (interceptedResponse == null) {
          throw new NullPointerException("application interceptor " + interceptor
              + " returned null");
        }

        return interceptedResponse;
      }

      // No more interceptors. Do HTTP.
      return getResponse(request, forWebSocket);
    }
  }
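
The index-based chain is a small chain-of-responsibility pattern, and a stripped-down version shows how each interceptor hands off to the next. Everything here is an assumption for illustration: SketchInterceptor, SketchChain and ChainDemo are invented, and requests and responses are plain strings.

```java
import java.util.ArrayList;
import java.util.List;

interface SketchInterceptor {
  String intercept(SketchChain chain);
}

final class SketchChain {
  private final List<SketchInterceptor> interceptors;
  private final int index;
  final String request;

  SketchChain(List<SketchInterceptor> interceptors, int index, String request) {
    this.interceptors = interceptors;
    this.index = index;
    this.request = request;
  }

  String proceed(String request) {
    // If there's another interceptor, give it a chain that points one step further.
    if (index < interceptors.size()) {
      SketchChain next = new SketchChain(interceptors, index + 1, request);
      String response = interceptors.get(index).intercept(next);
      if (response == null) throw new NullPointerException("interceptor returned null");
      return response;
    }
    // No more interceptors: stand-in for getResponse(request, forWebSocket).
    return "response(" + request + ")";
  }
}

final class ChainDemo {
  static String run(String request) {
    List<SketchInterceptor> list = new ArrayList<>();
    list.add(chain -> chain.proceed(chain.request + "+header")); // rewrites the request
    list.add(chain -> "logged:" + chain.proceed(chain.request)); // wraps the response
    return new SketchChain(list, 0, request).proceed(request);
  }
}
```

Each interceptor sees the request on the way in and the response on the way out, which is why a header-adding interceptor and a logging interceptor can be stacked without knowing about each other.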
           

Now look at getResponse(), which performs the final steps: sending the request, then receiving and processing the response.

Response getResponse(Request request, boolean forWebSocket) throws IOException {
    // Copy body metadata to the appropriate request headers.
    RequestBody body = request.body(); // if there is a body (e.g. a POST), set up the related headers
    if (body != null) {
      Request.Builder requestBuilder = request.newBuilder();

      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }

      request = requestBuilder.build();
    }

    // Create the initial HTTP engine. Retries and redirects need new engine for each attempt.
    engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null);

    int followUpCount = 0;
    while (true) {
      if (canceled) {
        engine.releaseStreamAllocation();
        throw new IOException("Canceled");
      }

      boolean releaseConnection = true;
      try {
        // send the request
        engine.sendRequest();
        // read the result
        engine.readResponse();
        releaseConnection = false;
      } catch (RequestException e) {
        // The attempt to interpret the request failed. Give up.
        throw e.getCause();
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null);
        if (retryEngine != null) {
          releaseConnection = false;
          engine = retryEngine;
          continue;
        }
        // Give up; recovery is not possible.
        throw e.getLastConnectException();
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        HttpEngine retryEngine = engine.recover(e, null);
        if (retryEngine != null) {
          releaseConnection = false;
          engine = retryEngine;
          continue;
        }

        // Give up; recovery is not possible.
        throw e;
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
          StreamAllocation streamAllocation = engine.close();
          streamAllocation.release();
        }
      }
    //
      Response response = engine.getResponse();
      Request followUp = engine.followUpRequest();

      if (followUp == null) {
        if (!forWebSocket) {
          engine.releaseStreamAllocation();
        }
        return response;
      }

      StreamAllocation streamAllocation = engine.close();

      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (!engine.sameConnection(followUp.url())) {
        streamAllocation.release();
        streamAllocation = null;
      }

      request = followUp;
      engine = new HttpEngine(client, request, false, false, forWebSocket, streamAllocation, null,
          response);
    }
  }
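
The header bookkeeping at the top of getResponse is worth isolating, since it decides between a fixed-length and a chunked body. A minimal sketch, assuming a hypothetical BodyHeaders helper that works on a plain map where the real code uses Request.Builder:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BodyHeaders {
  // Returns a copy of headers adjusted for a body; contentLength is -1 when unknown.
  static Map<String, String> apply(
      Map<String, String> headers, String contentType, long contentLength) {
    Map<String, String> out = new LinkedHashMap<>(headers);
    if (contentType != null) {
      out.put("Content-Type", contentType);
    }
    if (contentLength != -1) {
      // Known length: announce it and make sure chunking is off.
      out.put("Content-Length", Long.toString(contentLength));
      out.remove("Transfer-Encoding");
    } else {
      // Unknown length: stream the body in chunks instead.
      out.put("Transfer-Encoding", "chunked");
      out.remove("Content-Length");
    }
    return out;
  }
}
```

The two headers are mutually exclusive, which is why each branch sets one and removes the other.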
           

Let's analyze the engine.sendRequest() method.

public void sendRequest() throws RequestException, RouteException, IOException {
    if (cacheStrategy != null) return; // Already sent.
    if (httpStream != null) throw new IllegalStateException();

    Request request = networkRequest(userRequest);
    // get the response cache
    InternalCache responseCache = Internal.instance.internalCache(client);
    // look up a cached response for this same request
    Response cacheCandidate = responseCache != null
        ? responseCache.get(request)
        : null;

    long now = System.currentTimeMillis();
    cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get();
    // if the strategy is cache-only, networkRequest is null; conversely, if it is network-only, cacheResponse is null
    networkRequest = cacheStrategy.networkRequest;
    cacheResponse = cacheStrategy.cacheResponse;

    if (responseCache != null) {
      responseCache.trackResponse(cacheStrategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    // Not much to say here: if both are null, build a 504 response and return.
    // In most cases our networkRequest is not null.
    if (networkRequest == null && cacheResponse == null) {
      userResponse = new Response.Builder()
          .request(userRequest)
          .priorResponse(stripBody(priorResponse))
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_BODY)
          .build();
      return;
    }

    // If we don't need the network, we're done.
    // If networkRequest is null, cacheResponse cannot be null: had both been null, the previous check would already have returned.
    if (networkRequest == null) {
      userResponse = cacheResponse.newBuilder()
          .request(userRequest)
          .priorResponse(stripBody(priorResponse))
          .cacheResponse(stripBody(cacheResponse))
          .build();
      userResponse = unzip(userResponse);
      return;
    }

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean success = false;
    try {
      httpStream = connect(); // get the httpStream, which wraps the socket; see the connect method
      httpStream.setHttpEngine(this);
      // if the request carries a body and the method permits one (e.g. POST), set up the sink for the request body
      if (writeRequestHeadersEagerly()) {
        long contentLength = OkHeaders.contentLength(request);
        if (bufferRequestBody) {
          if (contentLength > Integer.MAX_VALUE) {
            throw new IllegalStateException("Use setFixedLengthStreamingMode() or "
                + "setChunkedStreamingMode() for requests larger than 2 GiB.");
          }

          if (contentLength != -1) {
            // Buffer a request body of a known length.
            httpStream.writeRequestHeaders(networkRequest);
            requestBodyOut = new RetryableSink((int) contentLength);
          } else {
            // Buffer a request body of an unknown length. Don't write request headers until the
            // entire body is ready; otherwise we can't set the Content-Length header correctly.
            requestBodyOut = new RetryableSink();
          }
        } else {
          httpStream.writeRequestHeaders(networkRequest);
          requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength);
        }
      }
      success = true;
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (!success && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }
  }
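
The branching on networkRequest and cacheResponse boils down to a small decision table. The sketch below only restates the branches visible in the code above; CacheDecision and its string labels are my own invention.

```java
final class CacheDecision {
  // Mirrors the null checks on networkRequest and cacheResponse in sendRequest.
  static String decide(boolean hasNetworkRequest, boolean hasCacheResponse) {
    if (!hasNetworkRequest && !hasCacheResponse) {
      // Forbidden from using the network and nothing usable in the cache.
      return "504 Unsatisfiable Request (only-if-cached)";
    }
    if (!hasNetworkRequest) {
      return "serve from cache";
    }
    // The network is needed; with a cached copy it becomes a conditional GET.
    return hasCacheResponse ? "conditional GET over network" : "plain network request";
  }
}
```

Only the last two outcomes reach connect(); the first two return before any socket is touched.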
           

OK, let's see how the socket gets wrapped.

private HttpStream connect() throws RouteException, RequestException, IOException {
    boolean doExtensiveHealthChecks = !networkRequest.method().equals("GET");
    return streamAllocation.newStream(client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis(),
        client.retryOnConnectionFailure(), doExtensiveHealthChecks);
  }
           
public HttpStream newStream(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws RouteException, IOException {
    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks); // find a usable connection (socket)

      HttpStream resultStream;
      if (resultConnection.framedConnection != null) {
        resultStream = new Http2xStream(this, resultConnection.framedConnection);
      } else {
        resultConnection.socket().setSoTimeout(readTimeout);
        resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS);
        resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS);
        resultStream = new Http1xStream(this, resultConnection.source, resultConnection.sink);
      }

      synchronized (connectionPool) {
        stream = resultStream;
        return resultStream;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }
           
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException, RouteException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Otherwise do a potentially-slow check to confirm that the pooled connection is still good.
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate;
      }

      connectionFailed(new IOException());
    }
  }
           
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException, RouteException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (stream != null) throw new IllegalStateException("stream != null");
      if (canceled) throw new IOException("Canceled");

      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      // Attempt to get a connection from the pool.
      // if the pool already holds a matching connection, take that RealConnection from it
      RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);
      if (pooledConnection != null) {
        this.connection = pooledConnection;
        return pooledConnection;
      }

      selectedRoute = route;
    }

    if (selectedRoute == null) { // pick a route
      selectedRoute = routeSelector.next();
      synchronized (connectionPool) {
        route = selectedRoute;
      }
    }
    RealConnection newConnection = new RealConnection(selectedRoute); // build the socket link for that route
    acquire(newConnection);

    synchronized (connectionPool) {
      Internal.instance.put(connectionPool, newConnection); // put the new RealConnection into the connection pool
      this.connection = newConnection;
      if (canceled) throw new IOException("Canceled");
    }

    newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
        connectionRetryEnabled); // see the connect method
    routeDatabase().connected(newConnection.route());

    return newConnection;
  }
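
The "reuse from the pool, otherwise create and store" shape of findConnection can be reduced to a few lines. This is a deliberately simplified sketch: SketchPool is hypothetical, connections are bare Objects keyed by an address string, and routes, health checks and allocation limits are left out.

```java
import java.util.HashMap;
import java.util.Map;

final class SketchPool {
  private final Map<String, Object> connections = new HashMap<>();
  int created = 0; // how many new connections had to be opened

  synchronized Object acquire(String address) {
    // Attempt to get a connection from the pool first.
    Object pooled = connections.get(address);
    if (pooled != null) return pooled;

    // Otherwise open a new one and remember it for later requests.
    Object fresh = new Object();
    created++;
    connections.put(address, fresh);
    return fresh;
  }
}
```

Reuse is what saves the TCP (and possibly TLS) handshake on repeated requests to the same host.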
           
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
      List<ConnectionSpec> connectionSpecs, boolean connectionRetryEnabled) throws RouteException {
    if (protocol != null) throw new IllegalStateException("already connected"); // already connected: throw

    RouteException routeException = null;
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
    Proxy proxy = route.proxy();
    Address address = route.address();

    if (route.address().sslSocketFactory() == null
        && !connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
      throw new RouteException(new UnknownServiceException(
          "CLEARTEXT communication not supported: " + connectionSpecs));
    }

    while (protocol == null) {
      try {
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
            ? address.socketFactory().createSocket()
            : new Socket(proxy);
        connectSocket(connectTimeout, readTimeout, writeTimeout, connectionSpecSelector); // see the connectSocket method (will this ever end? I'm exhausted)
      } catch (IOException e) {
        closeQuietly(socket);
        closeQuietly(rawSocket);
        socket = null;
        rawSocket = null;
        source = null;
        sink = null;
        handshake = null;
        protocol = null;

        if (routeException == null) {
          routeException = new RouteException(e);
        } else {
          routeException.addConnectException(e);
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException;
        }
      }
    }
  }
           
private void connectSocket(int connectTimeout, int readTimeout, int writeTimeout,
      ConnectionSpecSelector connectionSpecSelector) throws IOException {
    rawSocket.setSoTimeout(readTimeout);
    try {
      // using the selected route, connect through the best socket implementation for the current platform
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
    } catch (ConnectException e) {
      throw new ConnectException("Failed to connect to " + route.socketAddress());
    }
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));

    if (route.address().sslSocketFactory() != null) { // TLS is in use
      connectTls(readTimeout, writeTimeout, connectionSpecSelector); // secure handshake according to the SSL version and certificates
    } else {
      protocol = Protocol.HTTP_1_1;
      socket = rawSocket;
    }

    if (protocol == Protocol.SPDY_3 || protocol == Protocol.HTTP_2) {
      socket.setSoTimeout(0); // Framed connection timeouts are set per-stream.

      FramedConnection framedConnection = new FramedConnection.Builder(true)
          .socket(socket, route.address().url().host(), source, sink)
          .protocol(protocol)
          .listener(this)
          .build();
      framedConnection.sendConnectionPreface();

      // Only assign the framed connection once the preface has been sent successfully.
      this.allocationLimit = framedConnection.maxConcurrentStreams();
      this.framedConnection = framedConnection;
    } else {
      this.allocationLimit = 1;
    }
  }
           

At this point the httpStream is constructed and the channel is established. Back to readResponse, the method that reads the response.

public void readResponse() throws IOException {
    if (userResponse != null) { // already read: return immediately
      return; // Already ready.
    }
    // This check seems rather pointless to me: could we even get here without calling sendRequest?
    // sendRequest already handled this case and set userResponse, so we would have returned above.
    if (networkRequest == null && cacheResponse == null) {
      throw new IllegalStateException("call sendRequest() first!");
    }
    if (networkRequest == null) { // no network request
      return; // No network response to read.
    }

    Response networkResponse;
    // I believe false was passed in directly earlier (I forget). WebSocket is a newer protocol on top of TCP
    // that gives browser and server full-duplex communication, letting the server push messages to the client.
    // Some people have suggested using it for push notifications.
    if (forWebSocket) {
      httpStream.writeRequestHeaders(networkRequest);
      networkResponse = readNetworkResponse();
    } else if (!callerWritesRequestBody) {
      // if no request body has to be written, get the response through the network interceptors
      networkResponse = new NetworkInterceptorChain(0, networkRequest).proceed(networkRequest);
    } else {
      // a request body has to be written
      // Emit the request body's buffer so that everything is in requestBodyOut.
      if (bufferedRequestBody != null && bufferedRequestBody.buffer().size() > 0) {
        bufferedRequestBody.emit();
      }

      // Emit the request headers if we haven't yet. We might have just learned the Content-Length.
      if (sentRequestMillis == -1) {
        if (OkHeaders.contentLength(networkRequest) == -1
            && requestBodyOut instanceof RetryableSink) {
          long contentLength = ((RetryableSink) requestBodyOut).contentLength();
          networkRequest = networkRequest.newBuilder()
              .header("Content-Length", Long.toString(contentLength))
              .build();
        }
        // write the request headers
        httpStream.writeRequestHeaders(networkRequest);
      }

      // Write the request body to the socket.
      if (requestBodyOut != null) {
        if (bufferedRequestBody != null) {
          // This also closes the wrapped requestBodyOut.
          bufferedRequestBody.close();
        } else {
          requestBodyOut.close();
        }
        if (requestBodyOut instanceof RetryableSink) {
          httpStream.writeRequestBody((RetryableSink) requestBodyOut);
        }
      }
      // read the response
      networkResponse = readNetworkResponse();
    }
    // save the cookies from the response headers
    receiveHeaders(networkResponse.headers());
    // if there is a cached response, try to use it; in practice it is almost always null here
    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (validate(cacheResponse, networkResponse)) {
        userResponse = cacheResponse.newBuilder()
            .request(userRequest)
            .priorResponse(stripBody(priorResponse))
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();
        releaseStreamAllocation();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        InternalCache responseCache = Internal.instance.internalCache(client);
        responseCache.trackConditionalCacheHit();
        responseCache.update(cacheResponse, stripBody(userResponse));
        userResponse = unzip(userResponse);
        return;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
    // build the response the user needs; this userResponse is what engine.getResponse() returns
    userResponse = networkResponse.newBuilder()
        .request(userRequest)
        .priorResponse(stripBody(priorResponse))
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (hasBody(userResponse)) {
      maybeCache();
      userResponse = unzip(cacheWritingResponse(storeRequest, userResponse));
    }
  }
           

Finally, the response obtained from engine.getResponse() is returned:

if (followUp == null) {
        if (!forWebSocket) { // false is hardcoded here, so resources are released and the connection is closed
          engine.releaseStreamAllocation();
        }
        return response;
      }
           

Back in OkHttpCall's synchronous method, the result finally goes through return parseResponse(call.execute());

Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
    ResponseBody rawBody = rawResponse.body();

    // Remove the body's source (the only stateful object) so we can pass the response along.
    rawResponse = rawResponse.newBuilder()
        .body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
        .build();

    int code = rawResponse.code();
    if (code < 200 || code >= 300) {
      try {
        // Buffer the entire body to avoid future I/O.
        ResponseBody bufferedBody = Utils.buffer(rawBody);
        return Response.error(bufferedBody, rawResponse);
      } finally {
        rawBody.close();
      }
    }

    if (code == 204 || code == 205) {
      return Response.success(null, rawResponse);
    }

    ExceptionCatchingRequestBody catchingBody = new ExceptionCatchingRequestBody(rawBody);
    try {
      T body = serviceMethod.toResponse(catchingBody);
      return Response.success(body, rawResponse);
    } catch (RuntimeException e) {
      // If the underlying source threw an exception, propagate that rather than indicating it was
      // a runtime exception.
      catchingBody.throwIfCaught();
      throw e;
    }
  }
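
The three outcomes of parseResponse depend only on the status code, so they can be captured in a tiny function. ResponseKind and its labels are hypothetical names used just for this sketch.

```java
final class ResponseKind {
  // Classifies a status code the way the branches in parseResponse do.
  static String classify(int code) {
    if (code < 200 || code >= 300) {
      return "error"; // body is buffered and returned via Response.error
    }
    if (code == 204 || code == 205) {
      return "success-no-body"; // Response.success(null, rawResponse)
    }
    return "success-with-body"; // body goes through the converter
  }
}
```

Note that a non-2xx response is not an exception at this level: it comes back as an error Response, and the converter only ever runs for 2xx codes that carry a body.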
           

serviceMethod.toResponse(catchingBody) (through the Gson converter) turns the body into the type we need, UserInfo in this case, and returns it.

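Getting this far nearly brought me to tears. And this still leaves out a lot of the process: some of the more central pieces, such as interceptors and RxJava's schedulers, were not analyzed. I plan to study those in separate posts.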