
A complete network request with RxJava + Retrofit (source code walkthrough)

Preface: a walkthrough of one complete request, from building the Retrofit instance to onCompleted()

compile 'com.squareup.retrofit2:adapter-rxjava:2.0.2'
compile 'com.squareup.retrofit2:retrofit:2.0.2'
           

The code below is what an ordinary request looks like, and it should be familiar. But what exactly do Retrofit and RxJava do along the way?

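  • How does retrofit.create build the service instance?
  • What does RxJavaCallAdapterFactory.create() do?
  • Where is the Observable built, and how?
  • The cache strategy, and how some details of the request are handled
  • How is a call wrapped?
  • How is the socket channel established?
  • How is the Response parsed?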
Retrofit retrofit = new Retrofit.Builder()
                .client(MyInterceptor.genericClient(MainActivity1.this))//interceptor that adds headers
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .baseUrl(BASE_URL)
                .build();
        MyService service = retrofit.create(MyService.class);
        String json = "{\"xxx\":\"xxx\"}";
        RequestBody requestBody = RequestBody.create(MediaType.parse("application/JSON_LOCKPERMISSION; charset=utf-8"), json);
        service.login(requestBody)                
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<UserInfo>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(UserInfo userInfo) {
                    }
                });
           

How does create build the MyService instance?

public <T> T create(final Class<T> service) {
    Utils.validateServiceInterface(service);
    if (validateEagerly) {
      eagerlyValidateMethods(service);
    }
    return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
        new InvocationHandler() {
           

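The instance of the interface we passed in is built through a dynamic proxy. The invoke method fires when login is called; through the proxy it receives the arguments and annotations we supplied and uses them to build a preliminary request.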

@Override public Object invoke(Object proxy, Method method, Object... args)
              throws Throwable {
            // If the method is a method from Object then defer to normal invocation.
            if (method.getDeclaringClass() == Object.class) {
              return method.invoke(this, args);
            }
            if (platform.isDefaultMethod(method)) {
              return platform.invokeDefaultMethod(method, service, proxy, args);
            }
            ServiceMethod serviceMethod = loadServiceMethod(method);
            OkHttpCall okHttpCall = new OkHttpCall<>(serviceMethod, args);
            return serviceMethod.callAdapter.adapt(okHttpCall);
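
To make the proxy mechanism concrete, here is a minimal sketch using only the JDK. EchoService and ProxyDemo are hypothetical names for illustration; the handler merely describes the call instead of building a request as Retrofit does.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;

final class ProxyDemo {
  // Same shape as Retrofit.create: one handler receives every call on the interface.
  @SuppressWarnings("unchecked")
  static <T> T create(final Class<T> service) {
    return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] {service},
        new InvocationHandler() {
          @Override public Object invoke(Object proxy, Method method, Object[] args)
              throws Throwable {
            // Methods declared on Object (toString, hashCode, ...) run on the handler itself.
            if (method.getDeclaringClass() == Object.class) {
              return method.invoke(this, args);
            }
            // A real implementation would build a request here; we only describe the call.
            return method.getName() + Arrays.toString(args);
          }
        });
  }

  public static void main(String[] unused) {
    EchoService service = create(EchoService.class);
    System.out.println(service.login("bob")); // prints "login[bob]"
  }
}

// Hypothetical service interface, standing in for MyService.
interface EchoService {
  String login(String user);
}
```

No class implementing EchoService is ever written; the interface method call is routed to invoke together with its Method object and arguments, which is all Retrofit needs to read the annotations.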
           

ServiceMethod's main job is to build a request from the method; the request is finally assembled by toRequest, as later code shows. Next, step into loadServiceMethod.

ServiceMethod loadServiceMethod(Method method) {
    ServiceMethod result;
    synchronized (serviceMethodCache) {
      result = serviceMethodCache.get(method);
      if (result == null) {
        result = new ServiceMethod.Builder(this, method).build();
        serviceMethodCache.put(method, result);
      }
    }
    return result;
  }
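
The caching idea in loadServiceMethod can be sketched in isolation: parse a Method once, then reuse the result. MethodCacheDemo and its string "parse result" are hypothetical stand-ins for ServiceMethod, not Retrofit code.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

final class MethodCacheDemo {
  private final Map<Method, String> cache = new LinkedHashMap<>();
  int builds = 0; // how many times the expensive "parse" step actually ran

  String load(Method method) {
    String result;
    synchronized (cache) {
      result = cache.get(method);
      if (result == null) {
        builds++;
        // Stand-in for new ServiceMethod.Builder(...).build(): the costly reflection work.
        result = method.getReturnType().getSimpleName() + " " + method.getName() + "()";
        cache.put(method, result);
      }
    }
    return result;
  }

  // Helper so callers need not handle the checked NoSuchMethodException.
  static Method method(Class<?> type, String name) {
    try {
      return type.getMethod(name);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(e);
    }
  }

  public static void main(String[] args) {
    MethodCacheDemo demo = new MethodCacheDemo();
    Method length = method(String.class, "length");
    System.out.println(demo.load(length));
    System.out.println(demo.load(length));
    System.out.println("builds = " + demo.builds);
  }
}
```

The second load hits the map, so annotation parsing is paid only on the first call of each interface method.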
           

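Step into build. It determines the method's return type and, from the annotations and that return type, obtains the CallAdapter; the CallAdapter is what builds and returns the Observable.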

public ServiceMethod build() {
      callAdapter = createCallAdapter();  
      responseType = callAdapter.responseType();
      if (responseType == Response.class || responseType == okhttp3.Response.class) {
        throw methodError("'"
            + Utils.getRawType(responseType).getName()
            + "' is not a valid response body type. Did you mean ResponseBody?");
      }
      responseConverter = createResponseConverter();

      for (Annotation annotation : methodAnnotations) {
        parseMethodAnnotation(annotation);
      }   
           

Next, how the CallAdapter is resolved:

private CallAdapter<?> createCallAdapter() {
      Type returnType = method.getGenericReturnType();  //get the return type; here it is Observable<UserInfo>, and method is login
      if (Utils.hasUnresolvableType(returnType)) {
        throw methodError(
            "Method return type must not include a type variable or wildcard: %s", returnType);
      }
      if (returnType == void.class) {
        throw methodError("Service methods cannot return void.");
      }
      Annotation[] annotations = method.getAnnotations();
      try {
        return retrofit.callAdapter(returnType, annotations);  
      } catch (RuntimeException e) { // Wide exception range because factories are user code.
        throw methodError(e, "Unable to create call adapter for %s", returnType);
      }
    }
           

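The CallAdapter is chosen from the annotations and the method's actual return type. Here it is SimpleCallAdapter, an inner class of RxJavaCallAdapterFactory that implements the CallAdapter interface; the other two are ResultCallAdapter and ResponseCallAdapter. Mine resolves to SimpleCallAdapter because of how the method is declared: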

@POST("UserLogin")
 Observable<UserInfo> login(@Body RequestBody json);
           

To get a ResultCallAdapter instead, change the declaration as follows:

@POST("UserLogin")
 Observable<Result<UserInfo>> login(@Body RequestBody json);
           

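SimpleCallAdapter receives only Response.body. Observable<Response<T>> sends the entire network response to the Subscriber (headers, including cookies, can be read from it), while Observable<T> sends only the Response.body part. I won't go further into this.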
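
How a declared return type can select an adapter is sketched below with plain reflection. The empty Observable, Response, Result and UserInfo classes and the SketchService interface are hypothetical stand-ins, and the method only returns the adapter's name; it mirrors the selection rule described above, not the factory's actual code.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

final class AdapterChooser {
  // Look at the type argument of Observable<...> and pick an adapter name from it.
  static String choose(Type returnType) {
    if (!(returnType instanceof ParameterizedType)) {
      throw new IllegalStateException("return type must be parameterized: " + returnType);
    }
    Type inner = ((ParameterizedType) returnType).getActualTypeArguments()[0];
    Type raw = inner instanceof ParameterizedType
        ? ((ParameterizedType) inner).getRawType() : inner;
    if (raw == Response.class) return "ResponseCallAdapter";
    if (raw == Result.class) return "ResultCallAdapter";
    return "SimpleCallAdapter";
  }

  static String chooseFor(String methodName) {
    for (Method m : SketchService.class.getMethods()) {
      if (m.getName().equals(methodName)) return choose(m.getGenericReturnType());
    }
    throw new IllegalArgumentException("no such method: " + methodName);
  }

  public static void main(String[] args) {
    System.out.println(chooseFor("login"));
    System.out.println(chooseFor("loginWithResponse"));
    System.out.println(chooseFor("loginWithResult"));
  }
}

// Hypothetical empty stand-ins for the RxJava and Retrofit types.
final class Observable<T> {}
final class Response<T> {}
final class Result<T> {}
final class UserInfo {}

interface SketchService {
  Observable<UserInfo> login();
  Observable<Response<UserInfo>> loginWithResponse();
  Observable<Result<UserInfo>> loginWithResult();
}
```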

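ServiceMethod summary: this class is fairly complex. Its main job is to turn a method of our interface into a request object; from the method's annotations and parameters it builds the preliminary RequestBody.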

OK, so now we know that the callAdapter in the last step of invoke, return serviceMethod.callAdapter.adapt(okHttpCall);, is SimpleCallAdapter. Go straight to its adapt method:

@Override public <R> Observable<R> adapt(Call<R> call) {
      Observable<R> observable = Observable.create(new CallOnSubscribe<>(call)) //
          .lift(OperatorMapResponseToBodyOrError.<R>instance());
      if (scheduler != null) {
        return observable.subscribeOn(scheduler);
      }
      return observable;
    }
           

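This is where the Observable is built. Look at CallOnSubscribe: when .subscribe runs, it triggers OnSubscribe's call method, and only then does the network request start executing (everything before that is effectively configuration).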

static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response<T>> {
    private final Call<T> originalCall;

    CallOnSubscribe(Call<T> originalCall) {
      this.originalCall = originalCall;//the OkHttpCall that performs the network request
    }

    @Override public void call(final Subscriber<? super Response<T>> subscriber) {
      // Since Call is a one-shot type, clone it for each new subscriber.
      Call<T> call = originalCall.clone();

      // Wrap the call in a helper which handles both unsubscription and backpressure.
      RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
      subscriber.add(requestArbiter);
      subscriber.setProducer(requestArbiter);
    }
  }
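
The point that nothing runs until subscribe can be shown with a tiny JDK-only model. ColdSource is a hypothetical miniature, not RxJava's Observable; it only stores the work and runs it once per subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class ColdSourceDemo {
  public static void main(String[] args) {
    int[] calls = {0};
    ColdSource<String> source = new ColdSource<>(() -> "response#" + (++calls[0]));
    System.out.println("calls before subscribe: " + calls[0]);
    List<String> received = new ArrayList<>();
    source.subscribe(received::add);
    source.subscribe(received::add);
    System.out.println(received);
  }
}

// Hypothetical miniature of Observable.create(OnSubscribe): construction is configuration only.
final class ColdSource<T> {
  private final Supplier<T> work;

  ColdSource(Supplier<T> work) {
    this.work = work;
  }

  // The work runs here, once for each subscriber, never at construction time.
  void subscribe(Consumer<T> onNext) {
    onNext.accept(work.get());
  }
}
```

Running the work afresh for each subscriber plays the same role as originalCall.clone() in CallOnSubscribe.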
           
public void setProducer(Producer p) {
        long toRequest;
        boolean passToSubscriber = false;
        synchronized (this) {
            toRequest = requested;
            producer = p;
            if (subscriber != null) {
                // middle operator ... we pass thru unless a request has been made
                if (toRequest == NOT_SET) {
                    // we pass-thru to the next producer as nothing has been requested
                    passToSubscriber = true;
                }
            }
        }
        // do after releasing lock
        if (passToSubscriber) {
            subscriber.setProducer(producer);
        } else {
            // we execute the request with whatever has been requested (or Long.MAX_VALUE)
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE);
            } else {
                producer.request(toRequest);//request is eventually called; read on
            }
        }
    }
           
@Override public void request(long n) {
      if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
      if (n == 0) return; // Nothing to do when requesting 0.
      if (!compareAndSet(false, true)) return; // Request was already triggered.

      try {
        Response<T> response = call.execute();//OkHttpCall's synchronous method (a series of steps happens here, analyzed below)
        if (!subscriber.isUnsubscribed()) {
          subscriber.onNext(response);//if not unsubscribed, call onNext
        }
      } catch (Throwable t) {
        Exceptions.throwIfFatal(t);
        if (!subscriber.isUnsubscribed()) {
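          subscriber.onError(t);//if not unsubscribed, call onError. On error we return immediately, so onCompleted is never called; likewise, if the catch block is not entered, onError is never called. Only one of the two ever runs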
        }
        return;
      }

      if (!subscriber.isUnsubscribed()) {
        subscriber.onCompleted();//if not unsubscribed, call onCompleted
      }
    }
  @Override public void unsubscribe() {
      call.cancel();//cancel the call and drop the network connection; one complete request is finished
    }

    @Override public boolean isUnsubscribed() {
      return call.isCanceled();
    }
  }
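
The event contract of request can be sketched without RxJava. MiniArbiter is a hypothetical model that records events in a list instead of calling a Subscriber; it keeps the compareAndSet guard and the rule that either onError or onCompleted is emitted, never both.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

final class ArbiterDemo {
  public static void main(String[] args) {
    MiniArbiter<String> ok = new MiniArbiter<>(() -> "body");
    ok.request(Long.MAX_VALUE);
    ok.request(1); // ignored: the call was already triggered
    System.out.println(ok.events);
  }
}

// Hypothetical model of RequestArbiter; events are recorded rather than delivered.
final class MiniArbiter<T> {
  private final AtomicBoolean triggered = new AtomicBoolean();
  private final Callable<T> call;
  final List<String> events = new ArrayList<>();

  MiniArbiter(Callable<T> call) {
    this.call = call;
  }

  void request(long n) {
    if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
    if (n == 0) return; // nothing requested
    if (!triggered.compareAndSet(false, true)) return; // one-shot guard
    try {
      events.add("onNext(" + call.call() + ")");
    } catch (Throwable t) {
      events.add("onError(" + t.getMessage() + ")");
      return; // onCompleted is skipped after an error
    }
    events.add("onCompleted");
  }
}
```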
           

That completes one request. Now look at the synchronous execute() of OkHttpCall:

@Override public Response<T> execute() throws IOException {
    okhttp3.Call call;

    synchronized (this) {
      if (executed) throw new IllegalStateException("Already executed.");
      executed = true;

      if (creationFailure != null) {
        if (creationFailure instanceof IOException) {
          throw (IOException) creationFailure;
        } else {
          throw (RuntimeException) creationFailure;
        }
      }

      call = rawCall;
      if (call == null) {
        try {
          call = rawCall = createRawCall();
        } catch (IOException | RuntimeException e) {
          creationFailure = e;
          throw e;
        }
      }
    }

    if (canceled) {
      call.cancel();
    }

    return parseResponse(call.execute());
  }
           

There is the call.execute(). Now for the details of how the call is built:

private okhttp3.Call createRawCall() throws IOException {
    Request request = serviceMethod.toRequest(args);//as summarized earlier, serviceMethod turns a method into a request; args are the method's arguments, passed in when the OkHttpCall instance was created in the proxy's invoke, and a debugger shows what args contains
    okhttp3.Call call = serviceMethod.callFactory.newCall(request);
    if (call == null) {
      throw new NullPointerException("Call.Factory returned null.");
    }
    return call;
  }
           

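The RealCall is now created: an executable HTTP request. Both the synchronous and asynchronous paths ultimately run in RealCall, so follow it one step further; from here on it is all OkHttp's work.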

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");//check whether this Call has already been executed; a Call can run only once, though clone gives an identical Call, as done in CallOnSubscribe
      executed = true;
    }
    try {
      client.dispatcher().executed(this);//add to runningSyncCalls, which tracks the calls currently running so concurrency can be gauged; finished calls are removed in the finally block
      Response result = getResponseWithInterceptorChain(false);//the main network request method
      if (result == null) throw new IOException("Canceled");
      return result;
    } finally {
      client.dispatcher().finished(this);
    }
  }
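
The one-shot rule and the running-call bookkeeping can be modeled in a few lines. OneShotCall is a hypothetical sketch: the Deque stands in for the dispatcher's runningSyncCalls and the Supplier stands in for the actual network work.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

final class OneShotDemo {
  public static void main(String[] args) {
    Deque<OneShotCall> running = new ArrayDeque<>();
    OneShotCall call = new OneShotCall(running, () -> "running=" + running.size());
    System.out.println(call.execute());
    System.out.println(call.clone().execute()); // a clone may run again
    System.out.println("left running: " + running.size());
  }
}

// Hypothetical model of RealCall.execute(): run once, track while running.
final class OneShotCall {
  private final Deque<OneShotCall> running; // stand-in for Dispatcher.runningSyncCalls
  private final Supplier<String> work;      // stand-in for the real network request
  private boolean executed;

  OneShotCall(Deque<OneShotCall> running, Supplier<String> work) {
    this.running = running;
    this.work = work;
  }

  String execute() {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    try {
      running.add(this);
      return work.get();
    } finally {
      running.remove(this); // always removed, even if the work throws
    }
  }

  @Override public OneShotCall clone() {
    return new OneShotCall(running, work);
  }
}
```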
           

The dispatcher, initialized in the OkHttpClient constructor, controls concurrency. We won't dig into it here; look at getResponseWithInterceptorChain:

private Response getResponseWithInterceptorChain(boolean forWebSocket) throws IOException {
    Interceptor.Chain chain = new ApplicationInterceptorChain(0, originalRequest, forWebSocket);
    return chain.proceed(originalRequest);
  }




@Override public Response proceed(Request request) throws IOException {
      // If there's another interceptor in the chain, call that.
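      if (index < client.interceptors().size()) {//check whether a custom interceptor remains (for example one that logs outgoing requests); if so, create a new ApplicationInterceptorChain for the next index and run the interceptor
        Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request, forWebSocket);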
        Interceptor interceptor = client.interceptors().get(index);
        Response interceptedResponse = interceptor.intercept(chain);

        if (interceptedResponse == null) {
          throw new NullPointerException("application interceptor " + interceptor
              + " returned null");
        }

        return interceptedResponse;
      }

      // No more interceptors. Do HTTP.
      return getResponse(request, forWebSocket);
    }
  }
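
The index-based chain in proceed is easier to follow in miniature. In this sketch requests and responses are plain strings, and MiniInterceptor, MiniChain and IndexedChain are hypothetical names; only the recursion pattern is taken from the code above.

```java
import java.util.ArrayList;
import java.util.List;

final class ChainDemo {
  public static void main(String[] args) {
    List<MiniInterceptor> interceptors = new ArrayList<>();
    interceptors.add(chain -> chain.proceed(chain.request() + "+A") + "<A");
    interceptors.add(chain -> chain.proceed(chain.request() + "+B") + "<B");
    System.out.println(new IndexedChain(interceptors, 0, "req").proceed("req"));
    // prints "response(req+A+B)<B<A"
  }
}

interface MiniInterceptor {
  String intercept(MiniChain chain);
}

interface MiniChain {
  String request();
  String proceed(String request);
}

final class IndexedChain implements MiniChain {
  private final List<MiniInterceptor> interceptors;
  private final int index;
  private final String request;

  IndexedChain(List<MiniInterceptor> interceptors, int index, String request) {
    this.interceptors = interceptors;
    this.index = index;
    this.request = request;
  }

  @Override public String request() {
    return request;
  }

  @Override public String proceed(String request) {
    // If another interceptor remains, hand it a chain that points at the next index.
    if (index < interceptors.size()) {
      MiniChain next = new IndexedChain(interceptors, index + 1, request);
      String response = interceptors.get(index).intercept(next);
      if (response == null) throw new NullPointerException("interceptor returned null");
      return response;
    }
    // No more interceptors: this is where the real HTTP exchange would happen.
    return "response(" + request + ")";
  }
}
```

Each interceptor sees the request on the way in and the response on the way out, which is why the request is tagged in A, B order and the response in B, A order.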
           

Now getResponse(), which carries out the final steps: sending the request, receiving the response, and processing it.

Response getResponse(Request request, boolean forWebSocket) throws IOException {
    // Copy body metadata to the appropriate request headers.
    RequestBody body = request.body();//if the request has a body (for example a POST), fill in the body-related headers
    if (body != null) {
      Request.Builder requestBuilder = request.newBuilder();

      MediaType contentType = body.contentType();
      if (contentType != null) {
        requestBuilder.header("Content-Type", contentType.toString());
      }

      long contentLength = body.contentLength();
      if (contentLength != -1) {
        requestBuilder.header("Content-Length", Long.toString(contentLength));
        requestBuilder.removeHeader("Transfer-Encoding");
      } else {
        requestBuilder.header("Transfer-Encoding", "chunked");
        requestBuilder.removeHeader("Content-Length");
      }

      request = requestBuilder.build();
    }

    // Create the initial HTTP engine. Retries and redirects need new engine for each attempt.
    engine = new HttpEngine(client, request, false, false, forWebSocket, null, null, null);

    int followUpCount = 0;
    while (true) {
      if (canceled) {
        engine.releaseStreamAllocation();
        throw new IOException("Canceled");
      }

      boolean releaseConnection = true;
      try {
        //send the request
        engine.sendRequest();
        //read the response
        engine.readResponse();
        releaseConnection = false;
      } catch (RequestException e) {
        // The attempt to interpret the request failed. Give up.
        throw e.getCause();
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        HttpEngine retryEngine = engine.recover(e.getLastConnectException(), null);
        if (retryEngine != null) {
          releaseConnection = false;
          engine = retryEngine;
          continue;
        }
        // Give up; recovery is not possible.
        throw e.getLastConnectException();
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        HttpEngine retryEngine = engine.recover(e, null);
        if (retryEngine != null) {
          releaseConnection = false;
          engine = retryEngine;
          continue;
        }

        // Give up; recovery is not possible.
        throw e;
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
          StreamAllocation streamAllocation = engine.close();
          streamAllocation.release();
        }
      }
    //
      Response response = engine.getResponse();
      Request followUp = engine.followUpRequest();

      if (followUp == null) {
        if (!forWebSocket) {
          engine.releaseStreamAllocation();
        }
        return response;
      }

      StreamAllocation streamAllocation = engine.close();

      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (!engine.sameConnection(followUp.url())) {
        streamAllocation.release();
        streamAllocation = null;
      }

      request = followUp;
      engine = new HttpEngine(client, request, false, false, forWebSocket, streamAllocation, null,
          response);
    }
  }
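
The follow-up part of that loop can be isolated as below. This is a sketch under assumptions: redirects come from a hypothetical lookup table instead of real responses, the limit is a parameter, and an IllegalStateException replaces the ProtocolException used above so the example needs no checked exceptions.

```java
import java.util.HashMap;
import java.util.Map;

final class FollowUpDemo {
  // Keep following until no follow-up is needed, but give up past maxFollowUps.
  static String resolve(String url, Map<String, String> redirects, int maxFollowUps) {
    int followUpCount = 0;
    String current = url;
    while (true) {
      String followUp = redirects.get(current); // stand-in for engine.followUpRequest()
      if (followUp == null) {
        return current; // no follow-up: this is the final response
      }
      if (++followUpCount > maxFollowUps) {
        throw new IllegalStateException("Too many follow-up requests: " + followUpCount);
      }
      current = followUp;
    }
  }

  public static void main(String[] args) {
    Map<String, String> redirects = new HashMap<>();
    redirects.put("/old", "/moved");
    redirects.put("/moved", "/final");
    System.out.println(resolve("/old", redirects, 5)); // prints "/final"
  }
}
```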
           

Now analyze engine.sendRequest():

public void sendRequest() throws RequestException, RouteException, IOException {
    if (cacheStrategy != null) return; // Already sent.
    if (httpStream != null) throw new IllegalStateException();

    Request request = networkRequest(userRequest);
    //get the client's response cache
    InternalCache responseCache = Internal.instance.internalCache(client);
    //look up a cached response for this same request
    Response cacheCandidate = responseCache != null
        ? responseCache.get(request)
        : null;

    long now = System.currentTimeMillis();
    cacheStrategy = new CacheStrategy.Factory(now, request, cacheCandidate).get();
    //if the strategy is cache-only, networkRequest is null; conversely, if the cache cannot be used, cacheResponse is null
    networkRequest = cacheStrategy.networkRequest;
    cacheResponse = cacheStrategy.cacheResponse;

    if (responseCache != null) {
      responseCache.trackResponse(cacheStrategy);
    }

    if (cacheCandidate != null && cacheResponse == null) {
      closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
    }

    // If we're forbidden from using the network and the cache is insufficient, fail.
    if (networkRequest == null && cacheResponse == null) {//nothing special: if both are null, build and return a 504 error; in most cases networkRequest is not null
      userResponse = new Response.Builder()
          .request(userRequest)
          .priorResponse(stripBody(priorResponse))
          .protocol(Protocol.HTTP_1_1)
          .code(504)
          .message("Unsatisfiable Request (only-if-cached)")
          .body(EMPTY_BODY)
          .build();
      return;
    }

    // If we don't need the network, we're done.
    if (networkRequest == null) {//if networkRequest is null here, cacheResponse cannot be null, because the both-null case already returned in the previous check
      userResponse = cacheResponse.newBuilder()
          .request(userRequest)
          .priorResponse(stripBody(priorResponse))
          .cacheResponse(stripBody(cacheResponse))
          .build();
      userResponse = unzip(userResponse);
      return;
    }

    // We need the network to satisfy this request. Possibly for validating a conditional GET.
    boolean success = false;
    try {
      httpStream = connect();//obtain the httpStream, which wraps the socket; see the connect method
      httpStream.setHttpEngine(this);
        //if the request carries a body (for example a POST), prepare to write the request body
      if (writeRequestHeadersEagerly()) {
        long contentLength = OkHeaders.contentLength(request);
        if (bufferRequestBody) {
          if (contentLength > Integer.MAX_VALUE) {
            throw new IllegalStateException("Use setFixedLengthStreamingMode() or "
                + "setChunkedStreamingMode() for requests larger than 2 GiB.");
          }

          if (contentLength != -1) {
            // Buffer a request body of a known length.
            httpStream.writeRequestHeaders(networkRequest);
            requestBodyOut = new RetryableSink((int) contentLength);
          } else {
            // Buffer a request body of an unknown length. Don't write request headers until the
            // entire body is ready; otherwise we can't set the Content-Length header correctly.
            requestBodyOut = new RetryableSink();
          }
        } else {
          httpStream.writeRequestHeaders(networkRequest);
          requestBodyOut = httpStream.createRequestBody(networkRequest, contentLength);
        }
      }
      success = true;
    } finally {
      // If we're crashing on I/O or otherwise, don't leak the cache body.
      if (!success && cacheCandidate != null) {
        closeQuietly(cacheCandidate.body());
      }
    }
  }
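
The three outcomes of the cache strategy check in sendRequest reduce to a small decision function. CacheDecisionDemo and its Source enum are hypothetical names; the booleans stand for "networkRequest != null" and "cacheResponse != null".

```java
final class CacheDecisionDemo {
  enum Source { GATEWAY_TIMEOUT_504, CACHE, NETWORK }

  static Source decide(boolean hasNetworkRequest, boolean hasCacheResponse) {
    // Network forbidden and nothing usable in the cache: synthesize a 504.
    if (!hasNetworkRequest && !hasCacheResponse) return Source.GATEWAY_TIMEOUT_504;
    // Network not needed: answer straight from the cache.
    if (!hasNetworkRequest) return Source.CACHE;
    // Otherwise go to the network (possibly a conditional GET if a cached copy exists).
    return Source.NETWORK;
  }

  public static void main(String[] args) {
    System.out.println(decide(false, false));
    System.out.println(decide(false, true));
    System.out.println(decide(true, false));
    System.out.println(decide(true, true));
  }
}
```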
           

OK, now how the socket is wrapped:

private HttpStream connect() throws RouteException, RequestException, IOException {
    boolean doExtensiveHealthChecks = !networkRequest.method().equals("GET");
    return streamAllocation.newStream(client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis(),
        client.retryOnConnectionFailure(), doExtensiveHealthChecks);
  }
           
public HttpStream newStream(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws RouteException, IOException {
    try {
      RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
          writeTimeout, connectionRetryEnabled, doExtensiveHealthChecks);//find a usable connection (socket)

      HttpStream resultStream;
      if (resultConnection.framedConnection != null) {
        resultStream = new Http2xStream(this, resultConnection.framedConnection);
      } else {
        resultConnection.socket().setSoTimeout(readTimeout);
        resultConnection.source.timeout().timeout(readTimeout, MILLISECONDS);
        resultConnection.sink.timeout().timeout(writeTimeout, MILLISECONDS);
        resultStream = new Http1xStream(this, resultConnection.source, resultConnection.sink);
      }

      synchronized (connectionPool) {
        stream = resultStream;
        return resultStream;
      }
    } catch (IOException e) {
      throw new RouteException(e);
    }
  }
           
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
      int writeTimeout, boolean connectionRetryEnabled, boolean doExtensiveHealthChecks)
      throws IOException, RouteException {
    while (true) {
      RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
          connectionRetryEnabled);

      // If this is a brand new connection, we can skip the extensive health checks.
      synchronized (connectionPool) {
        if (candidate.successCount == 0) {
          return candidate;
        }
      }

      // Otherwise do a potentially-slow check to confirm that the pooled connection is still good.
      if (candidate.isHealthy(doExtensiveHealthChecks)) {
        return candidate;
      }

      connectionFailed(new IOException());
    }
  }
           
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
      boolean connectionRetryEnabled) throws IOException, RouteException {
    Route selectedRoute;
    synchronized (connectionPool) {
      if (released) throw new IllegalStateException("released");
      if (stream != null) throw new IllegalStateException("stream != null");
      if (canceled) throw new IOException("Canceled");

      RealConnection allocatedConnection = this.connection;
      if (allocatedConnection != null && !allocatedConnection.noNewStreams) {
        return allocatedConnection;
      }

      // Attempt to get a connection from the pool.
      RealConnection pooledConnection = Internal.instance.get(connectionPool, address, this);//if the pool already holds a matching connection, take that RealConnection
      if (pooledConnection != null) {
        this.connection = pooledConnection;
        return pooledConnection;
      }

      selectedRoute = route;
    }

    if (selectedRoute == null) {//pick a route
      selectedRoute = routeSelector.next();
      synchronized (connectionPool) {
        route = selectedRoute;
      }
    }
    RealConnection newConnection = new RealConnection(selectedRoute);//and build the socket link for that route
    acquire(newConnection);

    synchronized (connectionPool) {
      Internal.instance.put(connectionPool, newConnection);//put the new RealConnection into the connection pool
      this.connection = newConnection;
      if (canceled) throw new IOException("Canceled");
    }

    newConnection.connect(connectTimeout, readTimeout, writeTimeout, address.connectionSpecs(),
        connectionRetryEnabled);//see the connect method
    routeDatabase().connected(newConnection.route());

    return newConnection;
  }
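
The reuse-or-create logic of findConnection, stripped of routes and locking details, looks roughly like this. MiniPool and MiniConnection are hypothetical; the sketch keys the pool by a plain address string, while the real pool also checks allocation limits and connection health.

```java
import java.util.HashMap;
import java.util.Map;

final class PoolDemo {
  public static void main(String[] args) {
    MiniPool pool = new MiniPool();
    MiniConnection first = pool.find("example.test:443");
    MiniConnection second = pool.find("example.test:443");
    System.out.println("reused: " + (first == second));
    System.out.println("created: " + pool.created);
  }
}

// Hypothetical pool: one connection per address, created on first use.
final class MiniPool {
  private final Map<String, MiniConnection> pooled = new HashMap<>();
  int created = 0;

  synchronized MiniConnection find(String address) {
    // Attempt to get a connection from the pool first.
    MiniConnection existing = pooled.get(address);
    if (existing != null) {
      return existing;
    }
    // Otherwise create one and register it so later requests can reuse it.
    MiniConnection fresh = new MiniConnection(address, ++created);
    pooled.put(address, fresh);
    return fresh;
  }
}

final class MiniConnection {
  final String address;
  final int id;

  MiniConnection(String address, int id) {
    this.address = address;
    this.id = id;
  }
}
```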
           
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
      List<ConnectionSpec> connectionSpecs, boolean connectionRetryEnabled) throws RouteException {
    if (protocol != null) throw new IllegalStateException("already connected"); //already connected: throw

    RouteException routeException = null;
    ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
    Proxy proxy = route.proxy();
    Address address = route.address();

    if (route.address().sslSocketFactory() == null
        && !connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
      throw new RouteException(new UnknownServiceException(
          "CLEARTEXT communication not supported: " + connectionSpecs));
    }

    while (protocol == null) {
      try {
        rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
            ? address.socketFactory().createSocket()
            : new Socket(proxy);
        connectSocket(connectTimeout, readTimeout, writeTimeout, connectionSpecSelector);//see connectSocket (will this ever end? I'm worn out)
      } catch (IOException e) {
        closeQuietly(socket);
        closeQuietly(rawSocket);
        socket = null;
        rawSocket = null;
        source = null;
        sink = null;
        handshake = null;
        protocol = null;

        if (routeException == null) {
          routeException = new RouteException(e);
        } else {
          routeException.addConnectException(e);
        }

        if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
          throw routeException;
        }
      }
    }
  }
           
private void connectSocket(int connectTimeout, int readTimeout, int writeTimeout,
      ConnectionSpecSelector connectionSpecSelector) throws IOException {
    rawSocket.setSoTimeout(readTimeout);
    try {
      Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);//connect over the chosen route, using the best socket implementation for the current platform
    } catch (ConnectException e) {
      throw new ConnectException("Failed to connect to " + route.socketAddress());
    }
    source = Okio.buffer(Okio.source(rawSocket));
    sink = Okio.buffer(Okio.sink(rawSocket));

    if (route.address().sslSocketFactory() != null) {//if TLS is in use
      connectTls(readTimeout, writeTimeout, connectionSpecSelector);//perform the secure handshake according to the SSL version and certificates
    } else {
      protocol = Protocol.HTTP_1_1;
      socket = rawSocket;
    }

    if (protocol == Protocol.SPDY_3 || protocol == Protocol.HTTP_2) {
      socket.setSoTimeout(0); // Framed connection timeouts are set per-stream.

      FramedConnection framedConnection = new FramedConnection.Builder(true)
          .socket(socket, route.address().url().host(), source, sink)
          .protocol(protocol)
          .listener(this)
          .build();
      framedConnection.sendConnectionPreface();

      // Only assign the framed connection once the preface has been sent successfully.
      this.allocationLimit = framedConnection.maxConcurrentStreams();
      this.framedConnection = framedConnection;
    } else {
      this.allocationLimit = 1;
    }
  }
           

With that the httpStream is constructed and the channel is established. Back to readResponse, which reads the response:

public void readResponse() throws IOException {
    if (userResponse != null) {//already read: return
      return; // Already ready.
    }
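    //this check seems rather pointless: can we even get here without calling sendRequest()? That case was already handled in sendRequest(), and the userResponse check above has already returned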
    if (networkRequest == null && cacheResponse == null) {
      throw new IllegalStateException("call sendRequest() first!");
    }
    if (networkRequest == null) {//there is no network request
      return; // No network response to read.
    }

    Response networkResponse;
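    //forWebSocket appears to have been passed in as false earlier. WebSocket is a TCP-based protocol that gives browser and server full-duplex communication, so the server can push messages to the client.
    //Some people have suggested using it for push notifications.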
    if (forWebSocket) {
      httpStream.writeRequestHeaders(networkRequest);
      networkResponse = readNetworkResponse();
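    } else if (!callerWritesRequestBody) {
    //if the caller does not write the request body itself, run the network interceptors to get the response
      networkResponse = new NetworkInterceptorChain(0, networkRequest).proceed(networkRequest);
    } else {
    //the caller writes the request body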
      // Emit the request body's buffer so that everything is in requestBodyOut.
      if (bufferedRequestBody != null && bufferedRequestBody.buffer().size() > 0) {
        bufferedRequestBody.emit();
      }

      // Emit the request headers if we haven't yet. We might have just learned the Content-Length.
      if (sentRequestMillis == -1) {
        if (OkHeaders.contentLength(networkRequest) == -1
            && requestBodyOut instanceof RetryableSink) {
          long contentLength = ((RetryableSink) requestBodyOut).contentLength();
          networkRequest = networkRequest.newBuilder()
              .header("Content-Length", Long.toString(contentLength))
              .build();
        }
        //write the request headers
        httpStream.writeRequestHeaders(networkRequest);
      }

      // Write the request body to the socket.
      if (requestBodyOut != null) {
        if (bufferedRequestBody != null) {
          // This also closes the wrapped requestBodyOut.
          bufferedRequestBody.close();
        } else {
          requestBodyOut.close();
        }
        if (requestBodyOut instanceof RetryableSink) {
          httpStream.writeRequestBody((RetryableSink) requestBodyOut);
        }
      }
        //read the response
      networkResponse = readNetworkResponse();
    }
    //save the cookies carried in the response headers
    receiveHeaders(networkResponse.headers());
    //if there is a cached response and it validates, use it directly; in practice cacheResponse is almost always null here
    // If we have a cache response too, then we're doing a conditional get.
    if (cacheResponse != null) {
      if (validate(cacheResponse, networkResponse)) {
        userResponse = cacheResponse.newBuilder()
            .request(userRequest)
            .priorResponse(stripBody(priorResponse))
            .headers(combine(cacheResponse.headers(), networkResponse.headers()))
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build();
        networkResponse.body().close();
        releaseStreamAllocation();

        // Update the cache after combining headers but before stripping the
        // Content-Encoding header (as performed by initContentStream()).
        InternalCache responseCache = Internal.instance.internalCache(client);
        responseCache.trackConditionalCacheHit();
        responseCache.update(cacheResponse, stripBody(userResponse));
        userResponse = unzip(userResponse);
        return;
      } else {
        closeQuietly(cacheResponse.body());
      }
    }
    //build the response the user needs; this userResponse is what engine.getResponse() returns
    userResponse = networkResponse.newBuilder()
        .request(userRequest)
        .priorResponse(stripBody(priorResponse))
        .cacheResponse(stripBody(cacheResponse))
        .networkResponse(stripBody(networkResponse))
        .build();

    if (hasBody(userResponse)) {
      maybeCache();
      userResponse = unzip(cacheWritingResponse(storeRequest, userResponse));
    }
  }
           

Finally, engine.getResponse() returns the response:

if (followUp == null) {
        if (!forWebSocket) {//forWebSocket is hard-coded to false here, so resources are released and the connection is closed
          engine.releaseStreamAllocation();
        }
        return response;
      }
           

Back in the synchronous execute() of OkHttpCall, the result is finally returned through return parseResponse(call.execute());

Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
    ResponseBody rawBody = rawResponse.body();

    // Remove the body's source (the only stateful object) so we can pass the response along.
    rawResponse = rawResponse.newBuilder()
        .body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
        .build();

    int code = rawResponse.code();
    if (code < 200 || code >= 300) {
      try {
        // Buffer the entire body to avoid future I/O.
        ResponseBody bufferedBody = Utils.buffer(rawBody);
        return Response.error(bufferedBody, rawResponse);
      } finally {
        rawBody.close();
      }
    }

    if (code == 204 || code == 205) {
      return Response.success(null, rawResponse);
    }

    ExceptionCatchingRequestBody catchingBody = new ExceptionCatchingRequestBody(rawBody);
    try {
      T body = serviceMethod.toResponse(catchingBody);
      return Response.success(body, rawResponse);
    } catch (RuntimeException e) {
      // If the underlying source threw an exception, propagate that rather than indicating it was
      // a runtime exception.
      catchingBody.throwIfCaught();
      throw e;
    }
  }
           

serviceMethod.toResponse(catchingBody); hands the body to the converter (the GsonConverter here), which turns it into the type we need, UserInfo, and returns it.
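
The status-code branching in parseResponse can be summarized in a small function. This is a sketch, not Retrofit code: ParseDemo is a hypothetical name, the result is rendered as a string, and a plain Function stands in for the response converter.

```java
import java.util.function.Function;

final class ParseDemo {
  static String parse(int code, String rawBody, Function<String, ?> converter) {
    // Anything outside 2xx is an error response; the body is kept but not converted.
    if (code < 200 || code >= 300) {
      return "error(" + code + ")";
    }
    // 204 No Content and 205 Reset Content succeed with a null body.
    if (code == 204 || code == 205) {
      return "success(null)";
    }
    // Every other 2xx response has its body converted to the declared type.
    return "success(" + converter.apply(rawBody) + ")";
  }

  public static void main(String[] args) {
    Function<String, Integer> toInt = Integer::parseInt;
    System.out.println(parse(200, "42", toInt)); // prints "success(42)"
    System.out.println(parse(204, "", toInt));   // prints "success(null)"
    System.out.println(parse(404, "nope", toInt)); // prints "error(404)"
  }
}
```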

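Getting this far nearly brought me to tears. And this still leaves out a lot: core pieces such as Retrofit's interceptors and RxJava's schedulers were not analyzed. I plan to study them in separate blog posts later.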