OKhttp的底层用的是Socket连接而不是URLConnection,所以整体来说还是比较复杂的,涉及到Http协议的封装和解封装、TLS/SSL安全协议的封装、Http2的封装等等,但还是非常值得我们去学习一下的,本篇文章不会对这些底层原理有详细的解析,但读完之后至少大概知道Http请求的底层是如何实现的
一.HTTP请求的优化
1.keep-alive机制
我们知道,一个HTTP的请求需要经过三次握手和四次挥手,一般的流程为先tcp握手,然后传输数据,后面释放资源。在HTTP1.0是没有keep-alive机制的,它的流程大概如下:
![](https://img.laitimes.com/img/9ZDMuAjOiMmIsIjOiQnIsIyZwpmL5czMxAjNzETM0EjMxgTMwIzLc52YucWbp5GZzNmLn9Gbi1yZtl2Lc9CX6MHc0RHaiojIsJye.jpg)
这种请求方式本来没什么问题,但是在复杂的请求场景中就显示出它的缺点了。比如在一个网页中同时有十几个资源要请求,按照HTTP1.0的做法, 每一个请求都要创建一个tcp连接,这样就创建了十几个tcp连接,而每一个连接都要经过三次握手和四次挥手,这种情景下,资源消耗是很大的,极大得影响了网络请求效率。
针对这种情况,在HTTP1.x中提出了keep-alive机制,即当一个tcp连接请求完成以后不立即释放,如果下一个http请求的host跟上一次的相同,那么就复用上一次的tcp连接,这样就免去了tcp重复的创建和销毁的开销,流程如下
一般在浏览器中,都会保持6~8个keep-alive的socket连接,且保持一定的生命周期,当不需要的时候才会关闭。
2.HTTP/2
HTTP/2致力于解决HTTP1.x中存在的一些不足,HTTP1.x中客户端想发起多个并行的请求就要建立多个tcp连接,HTTP1.x不会压缩请求头和响应头,导致流量的浪费,HTTP1.x不支持资源的优先级传输,所以针对这些问题,HTTP2实现了如下功能:
① 报头压缩:HTTP/2使用HPACK压缩格式压缩请求和响应报头数据,减少了传输流量的输出
② 多路复用:在HTTP1.x中每个tcp连接只能同时处理一个请求-响应,浏览器按照FIFO原则处理请求的执行,上一个请求未完成会阻塞下一个请求的执行。在HTTP/2中,我们可以把一个HTTP请求当作一个流,每个流分解成不同的帧,帧是传输的最小单位,而且每个帧都有标记来识别它属于哪个流,不同流的帧可以在同一个连接中交错发送,然后在另一端重新组装,这样就实现了一个连接可以有多个请求-响应并行执行
③ 数据流的优先级:HTTP/2中每个帧都是可以交错发送的,这样一来发送的顺序就很关键了,我们可以设置每个数据流的权重和依赖关系,这样就可以做到权重高的流优先发送
HTTP/2功能增强的核心是新增了二进制分帧层Binary Framing,它定义了如何封装HTTP消息并在客户端和服务端之间进行传输
HTTP/2的数据传输是以帧的方式交错发送的,如下图示:
二.关键类:ConnectionPool、RealConnection、StreamAllocation
上面讲了http1.x和http2的相关知识,在OKhttp中不管是哪种连接都需要用到连接池来维护,通过连接池可以有效地提高连接的使用效率,连接池的实现类为ConnectionPool
1.ConnectionPool类详解
主要用来管理http1.x或者http/2的链接,该类实现了链接的复用和清理工作,来看看它的主要属性
//清除过期链接的线程池
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
//最大的空闲链接数和保活时间
private final int maxIdleConnections;
private final long keepAliveDurationNs;
//双端队列,保存链接
private final Deque<RealConnection> connections = new ArrayDeque<>();
final RouteDatabase routeDatabase = new RouteDatabase();
boolean cleanupRunning;
//创建连接池,默认最大空闲连接为5个,连接的最长空闲时间为5分钟
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}
public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);
// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}
既然是Http链接的管理,那么就会有添加连接、获取连接、删除连接等行为的实现
get方法:获取连接
//获取连接
@Nullable RealConnection get(Address address, StreamAllocation streamAllocation, Route route) {
assert (Thread.holdsLock(this));
//遍历已有的连接
for (RealConnection connection : connections) {
//如果地址和路由都符合要求,则返回
if (connection.isEligible(address, route)) {
streamAllocation.acquire(connection, true);
return connection;
}
}
return null;
}
put方法:添加连接
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
//如果连接清除工作已停止,则触发清除工作
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
//将连接添加到连接池
connections.add(connection);
}
每当添加一个新的连接,如果当前连接的清理工作已经停止,则会触发连接的清理工作,把符合清除条件的连接清理掉或者在指定时间删除即将过期的连接,连接清除工作需要启动一个工作线程来执行,来看看
连接的清理:
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
//一个无限循环
while (true) {
//调用leanup方法清除,返回下一次清理的等待时间
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return; //没有连接可清理了,结束清理线程
//等待一定的时间
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};
清理连接的核心在cleanup方法
long cleanup(long now) {
int inUseConnectionCount = 0; //正在使用的连接数
int idleConnectionCount = 0; //空闲的连接数
RealConnection longestIdleConnection = null; //空闲时间最长的连接
long longestIdleDurationNs = Long.MIN_VALUE;
//遍历所有的连接,找出符合条件的连接并且清除
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
//如果当前连接正在使用中,继续下一个查找
// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}
//空闲连接数加1
idleConnectionCount++;
// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
//找出空闲时间最长的那个连接以及它的空闲时间时长
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}
if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
//有符合清理条件的连接,清除空闲时间最长的那个连接
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
//有空闲连接,返回空闲时间最长的那个连接的剩余时间,然后线程会等待这个时间后再回来清理
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
//连接都在使用种,没有空闲连接,则等待5分钟(最大空闲时间)再次执行清理工作
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
//没有连接
cleanupRunning = false;
return -1;
}
}
//关闭清理连接的socket
closeQuietly(longestIdleConnection.socket());
// 执行完清理某个连接以后,马上再执行清理工作
return 0;
}
cleanup方法主要实现以下三个可能:
1.清理符合条件的连接,返回0,马上又进入下一轮清理工作
2.没有符合清理条件的连接,但是有空闲连接,返回空闲最长的那个连接的剩余时间,线程在等待这个剩余时间以后再进行清理工作
3.没有连接了,返回-1结束清理工作线程
连接池用一个双端队列来保存连接,默认空闲连接的最大数量为5,连接的最长空闲时间为5分钟,每一个连接都代表一个Http连接或TCP连接,用RealConnection表示
2.RealConnection类详解
RealConnection是Connection的实现类,代表一个真正的socket连接,RealConnection代表着客户端和服务器已经有了一个通信链路,先来对它的属性和构造函数有个大致的了解
private static final String NPE_THROW_WITH_NULL = "throw with null exception";
private static final int MAX_TUNNEL_ATTEMPTS = 21;
private final ConnectionPool connectionPool;
private final Route route;
/** The low-level TCP socket. */
private Socket rawSocket; //底层socket
//应用层socket,通过底层socket封装而来,比如TLS/SSL的连接,此时的应用层socket就是
//将底层socket经过TLS/SSL的封装而来的,即这是一个有加密通道的安全的socket连接
//数据发送经过这个socket以后都是经过加密的,接收的加密数据经过这个socket也是可以解密的
private Socket socket;
private Handshake handshake; //握手
private Protocol protocol; //协议
private Http2Connection http2Connection; //http2的连接
//与服务器的输入输出流
private BufferedSource source;
private BufferedSink sink;
public boolean noNewStreams;
public int successCount;
public int allocationLimit = 1;
/** Current streams carried by this connection. */
public final List<Reference<StreamAllocation>> allocations = new ArrayList<>();
/** Nanotime timestamp when {@code allocations.size()} reached zero. */
public long idleAtNanos = Long.MAX_VALUE;
public RealConnection(ConnectionPool connectionPool, Route route) {
this.connectionPool = connectionPool;
this.route = route;
}
来看看RealConnection类的连接方法connect,主要是建立普通socket连接或者隧道连接(不太懂),然后如果是安全的SSL连接则建立SSL的socket连接,如果是HTTP2连接,则建立HTTP2的相关协议
public void connect(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled, Call call,
EventListener eventListener) {
if (protocol != null) throw new IllegalStateException("already connected");
RouteException routeException = null;
List<ConnectionSpec> connectionSpecs = route.address().connectionSpecs();
ConnectionSpecSelector connectionSpecSelector = new ConnectionSpecSelector(connectionSpecs);
if (route.address().sslSocketFactory() == null) {
if (!connectionSpecs.contains(ConnectionSpec.CLEARTEXT)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication not enabled for client"));
}
String host = route.address().url().host();
if (!Platform.get().isCleartextTrafficPermitted(host)) {
throw new RouteException(new UnknownServiceException(
"CLEARTEXT communication to " + host + " not permitted by network security policy"));
}
}
//连接开始
while (true) {
try {
//要求隧道模式,则建立通道连接,通常不是这种
if (route.requiresTunnel()) {
connectTunnel(connectTimeout, readTimeout, writeTimeout, call, eventListener);
if (rawSocket == null) {
// We were unable to connect the tunnel but properly closed down our resources.
break;
}
} else {
//创建socket连接,一般都走这条逻辑
connectSocket(connectTimeout, readTimeout, call, eventListener);
}
//建立协议,这一步是建立好TCP连接之后,而在该TCP能被用来收发数据之前执行的
//它主要为了数据的加密传输做一些初始化,比如TLS握手,HTTP/2的协商
establishProtocol(connectionSpecSelector, pingIntervalMillis, call, eventListener);
eventListener.connectEnd(call, route.socketAddress(), route.proxy(), protocol);
break;
} catch (IOException e) {
closeQuietly(socket);
closeQuietly(rawSocket);
socket = null;
rawSocket = null;
source = null;
sink = null;
handshake = null;
protocol = null;
http2Connection = null;
eventListener.connectFailed(call, route.socketAddress(), route.proxy(), null, e);
if (routeException == null) {
routeException = new RouteException(e);
} else {
routeException.addConnectException(e);
}
if (!connectionRetryEnabled || !connectionSpecSelector.connectionFailed(e)) {
throw routeException;
}
}
}
if (route.requiresTunnel() && rawSocket == null) {
ProtocolException exception = new ProtocolException("Too many tunnel connections attempted: "
+ MAX_TUNNEL_ATTEMPTS);
throw new RouteException(exception);
}
if (http2Connection != null) {
synchronized (connectionPool) {
allocationLimit = http2Connection.maxConcurrentStreams();
}
}
}
来看建立普通的socket连接connectSocket方法
private void connectSocket(int connectTimeout, int readTimeout, Call call,
EventListener eventListener) throws IOException {
Proxy proxy = route.proxy();
Address address = route.address();
//根据代理类型来选择socket类型
rawSocket = proxy.type() == Proxy.Type.DIRECT || proxy.type() == Proxy.Type.HTTP
? address.socketFactory().createSocket()
: new Socket(proxy);
//设置监听和超时
eventListener.connectStart(call, route.socketAddress(), proxy);
rawSocket.setSoTimeout(readTimeout);
try {
//连接socket,利用Platfor是为了适应不同平台
Platform.get().connectSocket(rawSocket, route.socketAddress(), connectTimeout);
} catch (ConnectException e) {
ConnectException ce = new ConnectException("Failed to connect to " + route.socketAddress());
ce.initCause(e);
throw ce;
}
try {
//通过OKio获取socket的输入输出流
source = Okio.buffer(Okio.source(rawSocket));
sink = Okio.buffer(Okio.sink(rawSocket));
} catch (NullPointerException npe) {
if (NPE_THROW_WITH_NULL.equals(npe.getMessage())) {
throw new IOException(npe);
}
}
}
建立好socket连接以后,接下来建立网络协议相关,比如TLS/SSL、http1或http2等
private void establishProtocol(ConnectionSpecSelector connectionSpecSelector,
int pingIntervalMillis, Call call, EventListener eventListener) throws IOException {
//如果不是ssl连接,则底层socket和应用层socket相同
if (route.address().sslSocketFactory() == null) {
protocol = Protocol.HTTP_1_1;
socket = rawSocket;
return;
}
eventListener.secureConnectStart(call);
//如果是ssl,建立Tls连接
connectTls(connectionSpecSelector);
eventListener.secureConnectEnd(call, handshake);
//如果是HTTP2的连接,则建立http2相关
if (protocol == Protocol.HTTP_2) {
socket.setSoTimeout(0); // HTTP/2 connection timeouts are set per-stream.
http2Connection = new Http2Connection.Builder(true)
.socket(socket, route.address().url().host(), source, sink)
.listener(this)
.pingIntervalMillis(pingIntervalMillis)
.build();
http2Connection.start();
}
}
如果是TLS连接,则执行下面方法
private void connectTls(ConnectionSpecSelector connectionSpecSelector) throws IOException {
Address address = route.address();
SSLSocketFactory sslSocketFactory = address.sslSocketFactory();
boolean success = false;
SSLSocket sslSocket = null;
try {
// Create the wrapper over the connected socket.
// 因为是SSL的连接,所以在原来的socket上加一层SSL
sslSocket = (SSLSocket) sslSocketFactory.createSocket(
rawSocket, address.url().host(), address.url().port(), true /* autoClose */);
//配置SSL扩展参数
// Configure the socket's ciphers, TLS versions, and extensions.
ConnectionSpec connectionSpec = connectionSpecSelector.configureSecureSocket(sslSocket);
if (connectionSpec.supportsTlsExtensions()) {
Platform.get().configureTlsExtensions(
sslSocket, address.url().host(), address.protocols());
}
//启动TLS握手
// Force handshake. This can throw!
sslSocket.startHandshake();
//获取证书信息
// block for session establishment
SSLSession sslSocketSession = sslSocket.getSession();
if (!isValid(sslSocketSession)) {
throw new IOException("a valid ssl session was not established");
}
Handshake unverifiedHandshake = Handshake.get(sslSocketSession);
//验证证书是否可行,不行的话抛出异常
// Verify that the socket's certificates are acceptable for the target host.
if (!address.hostnameVerifier().verify(address.url().host(), sslSocketSession)) {
X509Certificate cert = (X509Certificate) unverifiedHandshake.peerCertificates().get(0);
throw new SSLPeerUnverifiedException("Hostname " + address.url().host() + " not verified:"
+ "\n certificate: " + CertificatePinner.pin(cert)
+ "\n DN: " + cert.getSubjectDN().getName()
+ "\n subjectAltNames: " + OkHostnameVerifier.allSubjectAltNames(cert));
}
// Check that the certificate pinner is satisfied by the certificates presented.
address.certificatePinner().check(address.url().host(),
unverifiedHandshake.peerCertificates());
// 证书验证成功的话,则保持选择的应用层协议,主要是HTTP1和HTTP2两个协议
// Success! Save the handshake and the ALPN protocol.
String maybeProtocol = connectionSpec.supportsTlsExtensions()
? Platform.get().getSelectedProtocol(sslSocket)
: null;
//将sslSocket赋值给全局的应用层socket,并获取socket的输入输出流
socket = sslSocket;
source = Okio.buffer(Okio.source(socket));
sink = Okio.buffer(Okio.sink(socket));
handshake = unverifiedHandshake;
//保存应用层协议
protocol = maybeProtocol != null
? Protocol.get(maybeProtocol)
: Protocol.HTTP_1_1;
success = true;
} catch (AssertionError e) {
if (Util.isAndroidGetsocknameError(e)) throw new IOException(e);
throw e;
} finally {
if (sslSocket != null) {
Platform.get().afterHandshake(sslSocket);
}
if (!success) {
closeQuietly(sslSocket);
}
}
}
过程主要就是获得加了SSL层的sslSocket,然后是TLS握手、验证证书信息、获取IO流、保存连接协议,最后完成TLS的连接建立
3.StreamAllocation类解析
StreamAllocation从字面上理解叫流的分配,它是外界使用OKhttp的连接的桥梁,通过newStream方法返回一个可使用的流,即HttpCodec实例,HttpCodec其实是对http1.x或http2协议的封装,还包含对应的Socket的IO流,通过HttpCodec可以将输入数据封装成相应的Http协议的数据,然后通过socket的输出流发送出去,同时也可以通过socket的输入流读取数据并解析数据,下面来看看newStream方法
public HttpCodec newStream(
OkHttpClient client, Interceptor.Chain chain, boolean doExtensiveHealthChecks) {
int connectTimeout = chain.connectTimeoutMillis();
int readTimeout = chain.readTimeoutMillis();
int writeTimeout = chain.writeTimeoutMillis();
int pingIntervalMillis = client.pingIntervalMillis();
boolean connectionRetryEnabled = client.retryOnConnectionFailure();
try {
//从连接池中获取一个可行的连接
RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
//获取一个HttpCodec实例,HttpCodec主要是对httpx和http/2的连接和流的封装
HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);
synchronized (connectionPool) {
codec = resultCodec;
return resultCodec;
}
} catch (IOException e) {
throw new RouteException(e);
}
}
来看findHealthyConnection方法
private RealConnection findHealthyConnection(int connectTimeout, int readTimeout,
int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled,
boolean doExtensiveHealthChecks) throws IOException {
while (true) {
RealConnection candidate = findConnection(connectTimeout, readTimeout, writeTimeout,
pingIntervalMillis, connectionRetryEnabled);
// If this is a brand new connection, we can skip the extensive health checks.
synchronized (connectionPool) {
if (candidate.successCount == 0) {
return candidate;
}
}
// Do a (potentially slow) check to confirm that the pooled connection is still good. If it
// isn't, take it out of the pool and start again.
if (!candidate.isHealthy(doExtensiveHealthChecks)) {
noNewStreams();
continue;
}
return candidate;
}
}
关键是通过findConnection获取一个RealConnection,再检查返回的连接是否可行,不可行的话则继续查找
private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout,
int pingIntervalMillis, boolean connectionRetryEnabled) throws IOException {
boolean foundPooledConnection = false;
RealConnection result = null;
Route selectedRoute = null;
Connection releasedConnection;
Socket toClose;
synchronized (connectionPool) {
if (released) throw new IllegalStateException("released");
if (codec != null) throw new IllegalStateException("codec != null");
if (canceled) throw new IOException("Canceled");
//首先看已有的连接是否可行,如果可以则利用它
releasedConnection = this.connection;
toClose = releaseIfNoNewStreams();
if (this.connection != null) {
// We had an already-allocated connection and it's good.
result = this.connection;
releasedConnection = null;
}
if (!reportedAcquired) {
// If the connection was never reported acquired, don't report it as released!
releasedConnection = null;
}
//从缓存中获取连接
if (result == null) {
// Attempt to get a connection from the pool.
Internal.instance.get(connectionPool, address, this, null);
if (connection != null) {
foundPooledConnection = true;
result = connection;
} else {
selectedRoute = route;
}
}
}
closeQuietly(toClose);
if (releasedConnection != null) {
eventListener.connectionReleased(call, releasedConnection);
}
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
}
if (result != null) {
// If we found an already-allocated or pooled connection, we're done.
//返回我们已有的连接或者从连接池获取的连接
return result;
}
//如果上面的条件都不符合,那么就需要创建一个新的连接
// If we need a route selection, make one. This is a blocking operation.
boolean newRouteSelection = false;
if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) {
newRouteSelection = true;
routeSelection = routeSelector.next();
}
synchronized (connectionPool) {
if (canceled) throw new IOException("Canceled");
if (newRouteSelection) {
// Now that we have a set of IP addresses, make another attempt at getting a connection from
// the pool. This could match due to connection coalescing.
List<Route> routes = routeSelection.getAll();
for (int i = 0, size = routes.size(); i < size; i++) {
Route route = routes.get(i);
Internal.instance.get(connectionPool, address, this, route);
if (connection != null) {
foundPooledConnection = true;
result = connection;
this.route = route;
break;
}
}
}
if (!foundPooledConnection) {
if (selectedRoute == null) {
selectedRoute = routeSelection.next();
}
// Create a connection and assign it to this allocation immediately. This makes it possible
// for an asynchronous cancel() to interrupt the handshake we're about to do.
route = selectedRoute;
refusedStreamCount = 0;
//创建一个新的连接
result = new RealConnection(connectionPool, selectedRoute);
acquire(result, false);
}
}
// If we found a pooled connection on the 2nd time around, we're done.
if (foundPooledConnection) {
eventListener.connectionAcquired(call, result);
return result;
}
// Do TCP + TLS handshakes. This is a blocking operation.
result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis,
connectionRetryEnabled, call, eventListener);
routeDatabase().connected(result.route());
Socket socket = null;
synchronized (connectionPool) {
reportedAcquired = true;
// 将新的连接放入连接池
Internal.instance.put(connectionPool, result);
// If another multiplexed connection to the same address was created concurrently, then
// release this connection and acquire that one.
if (result.isMultiplexed()) {
socket = Internal.instance.deduplicate(connectionPool, address, this);
result = connection;
}
}
closeQuietly(socket);
eventListener.connectionAcquired(call, result);
return result;
}
首先看已有的连接是否符合条件,不行则从连接池获取连接,如果连接池也没有符合条件的连接,则创建一个新的连接,然后将新连接放入连接池中
获取到连接以后,然后关键的就是HttpCodec了,根据连接是HTTP1.x还是HTTP2请求,返回对应的HttpCodec实例
public HttpCodec newCodec(OkHttpClient client, Interceptor.Chain chain,
StreamAllocation streamAllocation) throws SocketException {
//如果是HTTP2请求
if (http2Connection != null) {
return new Http2Codec(client, chain, streamAllocation, http2Connection);
} else { //HTTP1.x 请求
socket.setSoTimeout(chain.readTimeoutMillis());
source.timeout().timeout(chain.readTimeoutMillis(), MILLISECONDS);
sink.timeout().timeout(chain.writeTimeoutMillis(), MILLISECONDS);
return new Http1Codec(client, streamAllocation, source, sink);
}
}
三.拦截器中实现连接
分析了OKHttp连接机制中最重要的三个类以后,那么接下来看看拦截器中最终是如何完成连接的,一个连接的StreamLocation最开始在RetryAndFollowUpInterceptor拦截器中被初始化的,如下
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Call call = realChain.call();
EventListener eventListener = realChain.eventListener();
//初始化一个StreamLocation
StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
createAddress(request.url()), call, eventListener, callStackTrace);
this.streamAllocation = streamAllocation;
int followUpCount = 0;
Response priorResponse = null;
while (true) {
if (canceled) {
streamAllocation.release();
throw new IOException("Canceled");
}
Response response;
boolean releaseConnection = true;
try {
//将StreamLocation传递给下一个拦截器
response = realChain.proceed(request, streamAllocation, null, null);
releaseConnection = false;
} catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
.......省略代码
}
}
而HttpCodec是在连接拦截器ConnectInterceptor中初始化的,如下
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
Request request = realChain.request();
StreamAllocation streamAllocation = realChain.streamAllocation();
// We need the network to satisfy this request. Possibly for validating a conditional GET.
boolean doExtensiveHealthChecks = !request.method().equals("GET");
//通过StreamLocation获取一个HttpCodec实例
HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
RealConnection connection = streamAllocation.connection();
//将HttpCodec传递给下一个拦截器
return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
最后到了访问服务器的拦截器CallServerInterceptor,我们来看看
@Override public Response intercept(Chain chain) throws IOException {
RealInterceptorChain realChain = (RealInterceptorChain) chain;
//获取本次连接的HttpCodec实例,
HttpCodec httpCodec = realChain.httpStream();
//获取StreamLocation
StreamAllocation streamAllocation = realChain.streamAllocation();
RealConnection connection = (RealConnection) realChain.connection();
Request request = realChain.request(); //请求
long sentRequestMillis = System.currentTimeMillis();
realChain.eventListener().requestHeadersStart(realChain.call());
//通过HttpCodec写入请求头
httpCodec.writeRequestHeaders(request);
realChain.eventListener().requestHeadersEnd(realChain.call(), request);
Response.Builder responseBuilder = null;
if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
// If there's a "Expect: 100-continue" header on the request, wait for a "HTTP/1.1 100
// Continue" response before transmitting the request body. If we don't get that, return
// what we did get (such as a 4xx response) without ever transmitting the request body.
if ("100-continue".equalsIgnoreCase(request.header("Expect"))) {
httpCodec.flushRequest();
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(true);
}
//写入请求体
if (responseBuilder == null) {
// Write the request body if the "Expect: 100-continue" expectation was met.
realChain.eventListener().requestBodyStart(realChain.call());
long contentLength = request.body().contentLength();
CountingSink requestBodyOut =
new CountingSink(httpCodec.createRequestBody(request, contentLength));
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
realChain.eventListener()
.requestBodyEnd(realChain.call(), requestBodyOut.successfulCount);
} else if (!connection.isMultiplexed()) {
// If the "Expect: 100-continue" expectation wasn't met, prevent the HTTP/1 connection
// from being reused. Otherwise we're still obligated to transmit the request body to
// leave the connection in a consistent state.
streamAllocation.noNewStreams();
}
}
httpCodec.finishRequest();
//读取响应头
if (responseBuilder == null) {
realChain.eventListener().responseHeadersStart(realChain.call());
responseBuilder = httpCodec.readResponseHeaders(false);
}
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
//读取响应码
int code = response.code();
if (code == 100) {
// server sent a 100-continue even though we did not request one.
// try again to read the actual response
responseBuilder = httpCodec.readResponseHeaders(false);
response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
code = response.code();
}
realChain.eventListener()
.responseHeadersEnd(realChain.call(), response);
//读取响应体
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}
if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}
if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}
//返回响应
return response;
}
这个我就不做过多分析了,无非就是通过HttpCodec来写入请求头和请求体,然后读取响应头和响应体,最后返回一个Response
最后引用一个别人的OKhttp的整体架构图,如下: