天天看点

OkHttp 源码解析(2) Dispatcher 工作机制

上一节说到,发送一个请求会到RealCall的execute()和enqueue(CallBack callBack) 方法中,首先看一下这俩个方法的实现。

@Override public Response execute() throws IOException {
  synchronized (this) {// 判断这个Call有没有执行过,每一个Call只能执行一次
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  try {
    client.dispatcher().executed(this);// 最终交给client的任务调度器处理
    Response result = getResponseWithInterceptorChain();// 使用拦截器处理返回结果
    if (result == null) throw new IOException("Canceled");
    return result;
  } finally {
    client.dispatcher().finished(this);
  }
}
           

再看看异步的方法

@Override 
public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  client.dispatcher().enqueue(new AsyncCall(responseCallback));// 还是交给dispatcher处理
}
           

AsynCall 是RealCall的内部类,实现NameRunnable接口,NameRunable为Runnable的实现,在run()方法里会执行execute()抽象方法,所以在线程池运行AsynCall时会调用AsynCall的execute()方法,下面看一下AsynCall的execute()方法

@Override 
protected void execute() {
  boolean signalledCallback = false;
  try {
    Response response = getResponseWithInterceptorChain();
    if (retryAndFollowUpInterceptor.isCanceled()) {
      signalledCallback = true;
      responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
    } else {
      signalledCallback = true;
      responseCallback.onResponse(RealCall.this, response);
    }
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
    client.dispatcher().finished(this);
  }
}
           

成功或者失败会调用responseCallBack的onResponse()、onFailure()方法;

既然请求最终是Dispatcher处理,那么就看一下Dispatcher

首先,Dispatcher维护三个阵列,等待运行的异步Calls阵列,正在执行的异步Calls阵列,同步的Calls阵列

/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
           

Dispatcher的同步方法

/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
  runningSyncCalls.add(call);// 将RealCall放入执行阵列中
}
           

Dispatcher的异步方法

synchronized void enqueue(AsyncCall call) {
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    runningAsyncCalls.add(call);// 将Call放入正在执行的异步阵列中
    executorService().execute(call);// 线程池中执行这个Call
  } else {
    readyAsyncCalls.add(call);// Call 添加到readyAsyncCalls 阵列中,等待条件满足执行
  }
}
           

异步回调会调用promoteCall,从等待队列加入运行队列并开始执行

private void promoteCalls() {  
  if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.  
  if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.  

  for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {  
    AsyncCall call = i.next();  

    if (runningCallsForHost(call) < maxRequestsPerHost) {  
      i.remove();  
      runningAsyncCalls.add(call);  
      executorService().execute(call);  
    }  

    if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.  
  }  
}  
           

Dispatcher大概这些内容。

Dispatcher发送完请求后,接着就是通过强大的拦截器Intercepter对Response的一些处理

在获取response时,可以看到都会调用

Response result = getResponseWithInterceptorChain();

Response getResponseWithInterceptorChain() throws IOException {  
  // Build a full stack of interceptors.  
  List<Interceptor> interceptors = new ArrayList<>();  
  interceptors.addAll(client.interceptors());  
  interceptors.add(retryAndFollowUpInterceptor);  
  interceptors.add(new BridgeInterceptor(client.cookieJar()));//call request 与network处理  
  interceptors.add(new CacheInterceptor(client.internalCache()));//缓存处理  
  interceptors.add(new ConnectInterceptor(client));//连接处理  
  if (!forWebSocket) {  
    interceptors.addAll(client.networkInterceptors());  
  }  
  interceptors.add(new CallServerInterceptor(forWebSocket));  

  Interceptor.Chain chain = new RealInterceptorChain(  
      interceptors, null, null, null, , originalRequest);  
  return chain.proceed(originalRequest);  
}  
           

拦截器的工作机制下一节,再做研究吧!

继续阅读