一、OkHttp 概述与整体架构 1.1 OkHttp 是什么 OkHttp 是 Square 公司开发的 HTTP 客户端,也是 Android 平台事实上的标准 HTTP 实现。从 Android 4.4 开始,HttpURLConnection 的底层实现就已经被替换为 OkHttp(AOSP 中称为 OkHttpURLConnection)。
源码仓库 :github.com/square/okhttp
核心特点 :
HTTP/2 支持(多路复用、HPACK Header 压缩;服务端推送 Server Push 的流默认会被取消,未向应用层开放)
连接池(复用 TCP 连接,减少握手延迟)
透明 GZIP 压缩(自动添加 Accept-Encoding,自动解压)
响应缓存(基于 DiskLruCache,遵循 RFC 7234)
拦截器链(Interceptor Chain,责任链模式)
自动重试与重定向
WebSocket 支持
完整的网络事件监听(EventListener)
1.2 整体架构 OkHttpClient (配置中心) │ └── newCall(Request) → RealCall │ ├── Client 拦截器链 (用户自定义 interceptors) │ ├── AuthInterceptor │ ├── LoggingInterceptor │ └── ... │ ├── 内置拦截器链 (RealInterceptorChain) │ ├── 0: RetryAndFollowUpInterceptor (重试与重定向) │ ├── 1: BridgeInterceptor (HTTP 桥接) │ ├── 2: CacheInterceptor (缓存) │ ├── 3: ConnectInterceptor (连接) │ └── 4: CallServerInterceptor (发送请求 + 读取响应) │ ├── ConnectionPool (连接池) │ ├── RealConnection (实际 TCP 连接) │ └── cleanupRunnable (清理过期连接) │ └── EventListener (网络事件回调) ├── dnsStart / dnsEnd ├── connectStart / connectEnd ├── secureConnectStart / secureConnectEnd ├── requestHeadersStart / requestHeadersEnd ├── requestBodyStart / requestBodyEnd ├── responseHeadersStart / responseHeadersEnd └── responseBodyStart / responseBodyEnd
1.3 一次完整的请求流程 OkHttpClient.newCall(Request) └── RealCall.execute() 或 .enqueue() └── getResponseWithInterceptorChain() ┌─ 1. 遍历 client.interceptors()(用户自定义拦截器) ├─ 2. RetryAndFollowUpInterceptor │ ├─ 准备 ExchangeFinder(call.enterNetworkInterceptorExchange;OkHttp 3.x 中对应 StreamAllocation) │ ├─ 处理重定向(最多 20 次) │ └─ 处理重试(网络异常 + 请求幂等性判断) ├─ 3. BridgeInterceptor │ ├─ 补全请求头(Host, Connection, Accept-Encoding, Cookie) │ ├─ 处理 gzip │ └─ 解析响应头,补全响应信息 ├─ 4. CacheInterceptor │ ├─ 根据 CacheStrategy 决定使用缓存还是网络 │ ├─ 条件 GET(If-None-Match / If-Modified-Since) │ └─ 缓存响应 ├─ 5. ConnectInterceptor │ ├─ RealCall.initExchange() → ExchangeFinder.find() │ ├─ findHealthyConnection() │ │ ├─ 先从连接池查找 │ │ └─ 池中无合适的 → 创建新连接 │ └─ RealConnection.connect() │ ├─ RouteSelector 选择路由 │ ├─ DNS 解析 │ ├─ TCP 连接 (Socket) │ └─ TLS 握手 (SSLSocket) └─ 6. CallServerInterceptor ├─ 写入请求头 (exchange.writeRequestHeaders) ├─ 写入请求体 (exchange.createRequestBody) ├─ 读取响应头 (exchange.readResponseHeaders) └─ 读取响应体 (exchange.openResponseBody)
二、拦截器链 —— OkHttp 的灵魂 2.1 RealInterceptorChain 源码位置 :okhttp3/internal/http/RealInterceptorChain.kt
/**
 * One position in the interceptor list. Calling [proceed] hands the request to the interceptor at
 * [index] together with a fresh chain that points at `index + 1`.
 *
 * Only [proceed] is shown in this excerpt; the remaining `Interceptor.Chain` members are omitted.
 *
 * @param exchange null until a connection has been set up (see ConnectInterceptor); non-null for
 *     the interceptors that run after it.
 * @param calls how many times [proceed] has been invoked on this chain instance. Declared `var`
 *     because [proceed] increments it (a `val` cannot be incremented).
 */
class RealInterceptorChain(
    internal val call: RealCall,
    private val interceptors: List<Interceptor>,
    private val index: Int,
    internal val exchange: Exchange?,
    internal val request: Request,
    internal var calls: Int,
    internal val connectTimeout: Int,
    internal val readTimeout: Int,
    internal val writeTimeout: Int
) : Interceptor.Chain {

    /**
     * Runs the interceptor at [index] and returns its response.
     *
     * @throws IllegalStateException if the chain is exhausted, if an interceptor running on a live
     *     exchange changes host/port or calls `proceed()` more than once, or if the interceptor
     *     returns a response without a body.
     * @throws NullPointerException if the interceptor returns null.
     */
    override fun proceed(request: Request): Response {
        check(index < interceptors.size)

        calls++

        if (exchange != null) {
            check(this.request.url.canReuseConnectionFor(request.url)) {
                "cannot make a new request because the exchange is still active"
            }
            // The previous check compared `exchange` with itself and could never fail. What has to
            // be enforced here is that a live exchange is used for exactly one proceed() call.
            check(calls == 1) {
                "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
            }
        }

        // Every chain instance counts its own proceed() calls, so the next one starts at zero.
        val next = RealInterceptorChain(
            call = call,
            interceptors = interceptors,
            index = index + 1,
            exchange = exchange,
            request = request,
            calls = 0,
            connectTimeout = connectTimeout,
            readTimeout = readTimeout,
            writeTimeout = writeTimeout
        )

        val interceptor = interceptors[index]

        // Interceptors may be written in Java, so a null return is possible at runtime.
        @Suppress("USELESS_ELVIS")
        val response = interceptor.intercept(next)
            ?: throw NullPointerException("interceptor $interceptor returned null")

        check(response.body != null) { "interceptor $interceptor returned a response with no body" }

        return response
    }
}
责任链的核心 :
每个 Interceptor.intercept(chain) 接收一个 Chain 对象
调用 chain.proceed(request) 将请求传递给下一个拦截器
下一个拦截器处理完成后返回 Response,当前拦截器可以对 Response 做后处理
形成经典的「前处理 → 传递 → 后处理」模式
2.2 完整的责任链构建 源码位置 :okhttp3/RealCall.kt
/**
 * A single request/response pair that has been prepared for execution.
 *
 * Members such as `executed`, `timeout`, `callStart()`, `callFailed()` and `isCanceled()` are
 * referenced here but declared outside this excerpt.
 */
class RealCall(
    val client: OkHttpClient,
    val originalRequest: Request,
    val forWebSocket: Boolean
) : Call {

    /**
     * Runs the call synchronously on the calling thread.
     *
     * @throws IllegalStateException if this call has already been executed.
     * @throws IOException if the interceptor chain fails or the call was canceled.
     */
    override fun execute(): Response {
        // Only the "executed" flag needs the lock. Holding the monitor for the whole network
        // round-trip would block every other synchronized(this) user for the duration of the call.
        synchronized(this) {
            check(!executed) { "Already Executed" }
            executed = true
        }

        timeout.enter()
        callStart()
        try {
            client.dispatcher.executed(this)
            // getResponseWithInterceptorChain() returns a non-null Response, so no null fallback
            // is needed here.
            return getResponseWithInterceptorChain()
        } catch (e: IOException) {
            if (e !is TimeoutIOException) {
                callFailed(e)
            }
            throw e
        } finally {
            client.dispatcher.finished(this)
        }
    }

    /**
     * Assembles the full interceptor list and pushes [originalRequest] through it.
     *
     * Order: application interceptors, retry/follow-up, bridge, cache, connect, network
     * interceptors (skipped for web sockets), call-server.
     *
     * @throws IOException if any interceptor fails, or "Canceled" if the call was canceled while
     *     the chain was running (the response is closed first).
     */
    @Throws(IOException::class)
    internal fun getResponseWithInterceptorChain(): Response {
        val interceptors = mutableListOf<Interceptor>()
        interceptors += client.interceptors
        interceptors += RetryAndFollowUpInterceptor(client)
        interceptors += BridgeInterceptor(client.cookieJar)
        interceptors += CacheInterceptor(client.cache)
        interceptors += ConnectInterceptor
        if (!forWebSocket) {
            interceptors += client.networkInterceptors
        }
        interceptors += CallServerInterceptor(forWebSocket)

        val chain = RealInterceptorChain(
            call = this,
            interceptors = interceptors,
            index = 0,
            exchange = null,
            request = originalRequest,
            calls = 0,
            connectTimeout = client.connectTimeoutMillis,
            readTimeout = client.readTimeoutMillis,
            writeTimeout = client.writeTimeoutMillis
        )

        val response = chain.proceed(originalRequest)
        if (isCanceled()) {
            response.closeQuietly()
            throw IOException("Canceled")
        }
        return response
    }
}
关键点 :
client.interceptors(应用拦截器)和 client.networkInterceptors(网络拦截器)的区别在于位置:
应用拦截器在 RetryAndFollowUp 之前,只需执行一次
网络拦截器在 Connect 之后,每次重定向/重试都会执行
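RealInterceptorChain 的 exchange 字段是判断「是否已建立连接」的标志:在 ConnectInterceptor 之前它为 null,ConnectInterceptor 通过 initExchange() 创建 Exchange 后,后续(网络拦截器阶段)的链上它才非 null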
三、五大内置拦截器详解 3.1 RetryAndFollowUpInterceptor —— 重试与重定向 源码位置 :okhttp3/internal/http/RetryAndFollowUpInterceptor.kt
/**
 * Recovers from connection failures and follows redirects / auth challenges.
 *
 * Each loop iteration makes one attempt. A failed attempt is retried when `recover()` allows it;
 * a successful attempt either returns the response or yields a follow-up request for the next
 * iteration.
 */
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {

    /**
     * @throws IOException the first unrecoverable failure, with earlier recovered failures attached
     *     as suppressed exceptions; or "Canceled" if the call was canceled.
     * @throws ProtocolException after more than [MAX_FOLLOW_UPS] follow-up requests.
     */
    override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        var request = realChain.request
        val call = realChain.call
        var followUpCount = 0
        var priorResponse: Response? = null
        var newExchangeFinder = true
        var recoveredFailures = listOf<IOException>()

        while (true) {
            call.enterNetworkInterceptorExchange(request, newExchangeFinder)

            var response: Response
            // Stays true on every path that leaves this iteration without handing a live
            // response to the caller, so the finally block releases the exchange.
            var closeActiveExchange = true
            try {
                if (call.isCanceled()) {
                    throw IOException("Canceled")
                }

                try {
                    response = realChain.proceed(request)
                    // A later follow-up is a new request and needs a fresh exchange finder.
                    newExchangeFinder = true
                } catch (e: RouteException) {
                    // Connecting via a route failed; the request was never sent.
                    if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
                        throw e.firstConnectException.withSuppressed(recoveredFailures)
                    }
                    recoveredFailures += e.firstConnectException
                    newExchangeFinder = false
                    continue
                } catch (e: IOException) {
                    // Talking to the server failed; the request may already have been sent.
                    if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
                        throw e.withSuppressed(recoveredFailures)
                    }
                    recoveredFailures += e
                    newExchangeFinder = false
                    continue
                }

                // Attach the previous (body-less) response, if any.
                if (priorResponse != null) {
                    response = response.newBuilder()
                        .priorResponse(
                            priorResponse.newBuilder()
                                .body(null)
                                .build()
                        )
                        .build()
                }

                val exchange = call.interceptorScopedExchange
                val followUp = followUpRequest(response, exchange)

                if (followUp == null) {
                    if (exchange != null && exchange.isDuplex) {
                        call.timeoutEarlyExit()
                    }
                    closeActiveExchange = false
                    return response
                }

                // A one-shot body cannot be sent twice; give the caller the response as is.
                val followUpBody = followUp.body
                if (followUpBody != null && followUpBody.isOneShot()) {
                    closeActiveExchange = false
                    return response
                }

                // The intermediate response is discarded, so release its body.
                response.body?.closeQuietly()

                if (++followUpCount > MAX_FOLLOW_UPS) {
                    throw ProtocolException("Too many follow-up requests: $followUpCount")
                }

                request = followUp
                priorResponse = response
            } finally {
                if (closeActiveExchange) {
                    call.exitNetworkInterceptorExchange()
                }
            }
        }
    }

    companion object {
        /** Upper bound on redirects and auth challenges followed for one call. */
        private const val MAX_FOLLOW_UPS = 20
    }
}
重试判断逻辑 :
/**
 * Decides whether a failed attempt may be retried.
 *
 * @param requestSendStarted true if the request may already have reached the server.
 * @return false if retries are disabled on the client, the request cannot be sent again, the
 *     failure is permanent, or `call.retryAfterFailure()` reports that nothing is left to try.
 */
private fun recover(
    e: IOException,
    call: RealCall,
    userRequest: Request,
    requestSendStarted: Boolean
): Boolean {
    if (!client.retryOnConnectionFailure) return false
    if (requestSendStarted && requestIsOneShot(e, userRequest)) return false
    if (!isRecoverable(e, requestSendStarted)) return false
    if (!call.retryAfterFailure()) return false
    return true
}

/**
 * Classifies [e] as transient (true) or permanent (false).
 *
 * Fully qualified names are used for the exception types this excerpt did not reference before.
 */
private fun isRecoverable(e: IOException, requestSendStarted: Boolean): Boolean = when {
    // A protocol violation will not go away on retry.
    e is ProtocolException -> false

    // SocketTimeoutException is an InterruptedIOException. A timeout while still connecting is
    // worth retrying; any other interruption is not.
    e is InterruptedIOException ->
        e is java.net.SocketTimeoutException && !requestSendStarted

    // Certificate problems are deterministic.
    e is javax.net.ssl.SSLHandshakeException && e.cause is java.security.cert.CertificateException -> false
    e is javax.net.ssl.SSLPeerUnverifiedException -> false

    // Kept from the original excerpt: no retry on TLS failures once data may have been sent.
    e is SSLException && requestSendStarted -> false

    else -> true
}

/** True if the request body can be written only once, or the body's backing file is missing. */
private fun requestIsOneShot(e: IOException, userRequest: Request): Boolean {
    val requestBody = userRequest.body
    return (requestBody != null && requestBody.isOneShot()) || e is FileNotFoundException
}
重定向处理 :
/**
 * Works out the request to send in reaction to [userResponse]: an authenticated retry, a
 * redirect, or a plain retry.
 *
 * @param exchange the exchange that produced the response, or null; used only to find the route.
 * @return the follow-up request, or null if the response should be returned to the caller.
 */
fun followUpRequest(userResponse: Response, exchange: Exchange?): Request? {
    val route = exchange?.connection?.route()
    val method = userResponse.request.method

    return when (userResponse.code) {
        HTTP_PROXY_AUTH -> {
            // client.proxy may be null, so the proxy has to be null-checked before use.
            val selectedProxy = route?.proxy ?: client.proxy
            if (selectedProxy != null && selectedProxy.type() != Proxy.Type.DIRECT) {
                client.proxyAuthenticator.authenticate(route, userResponse)
            } else {
                null
            }
        }

        HTTP_UNAUTHORIZED -> client.authenticator.authenticate(route, userResponse)

        // 307/308 keep the method. Only GET and HEAD are followed here; the original condition
        // was inverted and followed everything except GET/HEAD.
        // NOTE(review): newer OkHttp releases reportedly follow 307/308 for other methods as
        // well - confirm against the version this article targets.
        HTTP_PERM_REDIRECT, HTTP_TEMP_REDIRECT ->
            if (method == "GET" || method == "HEAD") {
                buildRedirectRequest(userResponse, switchToGet = false)
            } else {
                null
            }

        // 300/301/302/303 are always followed; anything other than GET/HEAD is replayed as GET.
        HTTP_MULT_CHOICE, HTTP_MOVED_PERM, HTTP_MOVED_TEMP, HTTP_SEE_OTHER ->
            buildRedirectRequest(userResponse, switchToGet = method != "GET" && method != "HEAD")

        HTTP_CLIENT_TIMEOUT -> {
            // Retry unless the body cannot be sent a second time. A request without a body is
            // always retryable.
            val requestBody = userResponse.request.body
            if (requestBody == null || (!requestBody.isDuplex() && !requestBody.isOneShot())) {
                userResponse.request
            } else {
                null
            }
        }

        HTTP_UNAVAILABLE -> {
            val priorResponse = userResponse.priorResponse
            when {
                // Already retried once and got 503 again: give up.
                priorResponse != null && priorResponse.code == HTTP_UNAVAILABLE -> null
                // Retry only when the server asks for an immediate retry.
                retryAfter(userResponse, Integer.MAX_VALUE) == 0 -> userResponse.request
                else -> null
            }
        }

        else -> null
    }
}

/**
 * Builds the redirect target for [userResponse].
 *
 * @param switchToGet when true the redirect is sent as a body-less GET.
 * @return null if redirects are disabled, or the `Location` header is missing or unresolvable.
 */
private fun buildRedirectRequest(userResponse: Response, switchToGet: Boolean): Request? {
    if (!client.followRedirects) return null

    val location = userResponse.header("Location") ?: return null
    // resolve() also handles relative Location values; null means it could not be parsed.
    val url = userResponse.request.url.resolve(location) ?: return null

    val builder = userResponse.request.newBuilder().url(url)
    if (switchToGet) {
        builder.method("GET", null)
        // The body is dropped, so the headers describing it must go as well.
        builder.removeHeader("Transfer-Encoding")
        builder.removeHeader("Content-Length")
        builder.removeHeader("Content-Type")
    }
    return builder.build()
}
3.2 BridgeInterceptor —— HTTP 桥接 源码位置 :okhttp3/internal/http/BridgeInterceptor.kt
/**
 * Turns the application's request into a network request by filling in the headers the user left
 * out, and turns the network response back into an application response (cookies stored, gzip
 * transparently decoded).
 */
class BridgeInterceptor(private val cookieJar: CookieJar) : Interceptor {

    override fun intercept(chain: Interceptor.Chain): Response {
        val userRequest = chain.request()
        val outgoing = userRequest.newBuilder()

        // Body framing headers.
        userRequest.body?.let { requestBody ->
            requestBody.contentType()?.let { mediaType ->
                outgoing.header("Content-Type", mediaType.toString())
            }

            val length = requestBody.contentLength()
            if (length == -1L) {
                // Unknown length: switch to chunked transfer.
                outgoing.header("Transfer-Encoding", "chunked")
                outgoing.removeHeader("Content-Length")
            } else {
                outgoing.header("Content-Length", length.toString())
                outgoing.removeHeader("Transfer-Encoding")
            }
        }

        if (userRequest.header("Host") == null) {
            outgoing.header("Host", userRequest.url.toHostHeader())
        }

        if (userRequest.header("Connection") == null) {
            outgoing.header("Connection", "Keep-Alive")
        }

        // Ask for gzip only when the caller set neither Accept-Encoding nor Range; in that case
        // this interceptor also decodes the response.
        val transparentGzip =
            userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null
        if (transparentGzip) {
            outgoing.header("Accept-Encoding", "gzip")
        }

        val cookies = cookieJar.loadForRequest(userRequest.url)
        if (cookies.isNotEmpty()) {
            outgoing.header("Cookie", cookieHeader(cookies))
        }

        if (userRequest.header("User-Agent") == null) {
            outgoing.header("User-Agent", userAgent)
        }

        val networkResponse = chain.proceed(outgoing.build())

        HttpHeaders.receiveHeaders(cookieJar, userRequest.url, networkResponse.headers)

        val incoming = networkResponse.newBuilder().request(userRequest)

        val mustGunzip = transparentGzip &&
            "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
            HttpHeaders.promisesBody(networkResponse)
        if (!mustGunzip) return incoming.build()

        val compressedBody = networkResponse.body ?: return incoming.build()

        // Content-Encoding and Content-Length describe the compressed bytes and no longer apply.
        val gzipSource = GzipSource(compressedBody.source())
        val strippedHeaders = networkResponse.headers.newBuilder()
            .removeAll("Content-Encoding")
            .removeAll("Content-Length")
            .build()
        incoming.headers(strippedHeaders)

        val contentType = networkResponse.header("Content-Type")
        incoming.body(RealResponseBody(contentType, -1L, Okio.buffer(gzipSource)))

        return incoming.build()
    }
}
BridgeInterceptor 的职责 :
将用户友好的 Request 转换为 HTTP 规范的 Request(补全 Content-Type,Content-Length,Host,Connection)
透明 GZIP 压缩:自动添加 Accept-Encoding: gzip,自动对响应解压
Cookie 管理:发送前从 CookieJar 读取 Cookie,收到响应后保存 Set-Cookie
Transfer-Encoding 处理:没有 Content-Length 时自动切换为分块传输
3.3 CacheInterceptor —— 缓存策略 源码位置 :okhttp3/internal/cache/CacheInterceptor.kt
/**
 * Serves requests from [cache] when the computed strategy allows it, and writes network
 * responses back into the cache.
 */
class CacheInterceptor(internal val cache: Cache?) : Interceptor {

    override fun intercept(chain: Interceptor.Chain): Response {
        val cacheCandidate = cache?.get(chain.request())

        val now = System.currentTimeMillis()
        val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
        val networkRequest = strategy.networkRequest
        val cacheResponse = strategy.cacheResponse

        cache?.trackResponse(strategy)

        // The candidate was rejected by the strategy; release its body.
        if (cacheCandidate != null && cacheResponse == null) {
            cacheCandidate.body?.closeQuietly()
        }

        if (networkRequest == null) {
            if (cacheResponse == null) {
                // Neither network nor cache is allowed: synthesize a 504.
                return Response.Builder()
                    .request(chain.request())
                    .protocol(Protocol.HTTP_1_1)
                    .code(HTTP_GATEWAY_TIMEOUT)
                    .message("Unsatisfiable Request (only-if-cached)")
                    .body(EMPTY_RESPONSE)
                    .sentRequestAtMillis(-1L)
                    .receivedResponseAtMillis(System.currentTimeMillis())
                    .build()
            }

            // The cached response is good enough; no network needed.
            return cacheResponse.newBuilder()
                .cacheResponse(stripBody(cacheResponse))
                .build()
        }

        val networkResponse = try {
            chain.proceed(networkRequest)
        } catch (failure: Throwable) {
            // The network call did not produce a response; do not leak the cached body.
            cacheCandidate?.body?.closeQuietly()
            throw failure
        }

        if (cacheResponse != null) {
            if (networkResponse.code != HTTP_NOT_MODIFIED) {
                cacheResponse.body?.closeQuietly()
            } else {
                // Conditional GET hit: merge headers into the cached response and refresh it.
                val merged = cacheResponse.newBuilder()
                    .headers(combine(cacheResponse.headers, networkResponse.headers))
                    .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
                    .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
                    .cacheResponse(stripBody(cacheResponse))
                    .networkResponse(stripBody(networkResponse))
                    .build()

                networkResponse.body!!.close()

                cache!!.trackConditionalCacheHit()
                cache.update(cacheResponse, merged)
                return merged
            }
        }

        val response = networkResponse.newBuilder()
            .cacheResponse(stripBody(cacheResponse))
            .networkResponse(stripBody(networkResponse))
            .build()

        if (cache == null) return response

        if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
            // Hand out a response that writes to the cache as it is read.
            val cacheRequest = cache.put(response)
            return cacheWritingResponse(cacheRequest, response)
        }

        if (HttpMethod.invalidatesCache(networkRequest.method)) {
            try {
                cache.remove(networkRequest)
            } catch (_: IOException) {
                // Failing to evict is not fatal for this call.
            }
        }

        return response
    }
}
CacheStrategy 的核心算法 :
/**
 * The outcome of the caching decision for one request.
 *
 * @property networkRequest the request to send, or null if the network must not be used.
 * @property cacheResponse the cached response to return or validate, or null if the cache must
 *     not be used.
 */
class CacheStrategy internal constructor(
    val networkRequest: Request?,
    val cacheResponse: Response?
) {
    class Factory(
        private val nowMillis: Long,
        private val request: Request,
        private val cacheResponse: Response?
    ) {
        /**
         * Returns the strategy for [request] given the stored [cacheResponse].
         *
         * All freshness arithmetic lives in `computeCandidate()`. This wrapper only handles
         * `only-if-cached`: if the candidate still needs the network but the request forbids it,
         * the result carries neither a network request nor a cache response.
         */
        fun compute(): CacheStrategy {
            val candidate = computeCandidate()

            // The network is forbidden and the cache alone is insufficient.
            if (candidate.networkRequest != null && request.cacheControl.onlyIfCached) {
                return CacheStrategy(null, null)
            }

            return candidate
        }
    }
}
缓存相关的时间计算 (RFC 7234):
/**
 * Computes the strategy assuming the network is available.
 *
 * Result shapes:
 * - `(request, null)`: ignore the cache and go to the network;
 * - `(null, response)`: serve the cached response, possibly with a `Warning` header;
 * - `(conditionalRequest, cacheResponse)`: revalidate the cached response with a conditional GET.
 */
private fun computeCandidate(): CacheStrategy {
    // Nothing cached.
    if (cacheResponse == null) {
        return CacheStrategy(request, null)
    }

    // A cached HTTPS response without its handshake is unusable.
    if (request.isHttps && cacheResponse.handshake == null) {
        return CacheStrategy(request, null)
    }

    if (!CacheStrategy.isCacheable(cacheResponse, request)) {
        return CacheStrategy(request, null)
    }

    // The caller asked to bypass the cache, or is doing its own validation.
    val requestCaching = request.cacheControl
    if (requestCaching.noCache || hasConditions(request)) {
        return CacheStrategy(request, null)
    }

    // The original excerpt used `responseCaching` without defining it.
    val responseCaching = cacheResponse.cacheControl

    val ageMillis = cacheResponseAge()
    var freshMillis = computeFreshnessLifetime()

    if (requestCaching.maxAgeSeconds != -1) {
        freshMillis = minOf(freshMillis, SECONDS.toMillis(requestCaching.maxAgeSeconds.toLong()))
    }

    var minFreshMillis: Long = 0
    if (requestCaching.minFreshSeconds != -1) {
        minFreshMillis = SECONDS.toMillis(requestCaching.minFreshSeconds.toLong())
    }

    var maxStaleMillis: Long = 0
    if (!responseCaching.mustRevalidate && requestCaching.maxStaleSeconds != -1) {
        maxStaleMillis = SECONDS.toMillis(requestCaching.maxStaleSeconds.toLong())
    }

    // Fresh enough (allowing for max-stale): serve from cache.
    if (!responseCaching.noCache && ageMillis + minFreshMillis < freshMillis + maxStaleMillis) {
        val builder = cacheResponse.newBuilder()
        if (ageMillis + minFreshMillis >= freshMillis) {
            builder.addHeader("Warning", "110 HttpURLConnection \"Response is stale\"")
        }
        val oneDayMillis = 24 * 3600 * 1000L
        if (ageMillis > oneDayMillis && isFreshnessLifetimeHeuristic()) {
            builder.addHeader("Warning", "113 HttpURLConnection \"Heuristic expiration\"")
        }
        return CacheStrategy(null, builder.build())
    }

    // Stale: attach a validator if one is available.
    val conditionBuilder = request.newBuilder()
    if (etag != null) {
        conditionBuilder.header("If-None-Match", etag)
    } else if (lastModified != null) {
        conditionBuilder.header("If-Modified-Since", lastModifiedString)
    } else if (servedDate != null) {
        conditionBuilder.header("If-Modified-Since", servedDateString)
    }

    val conditionalRequest = conditionBuilder.build()
    return if (hasConditions(conditionalRequest)) {
        CacheStrategy(conditionalRequest, cacheResponse)
    } else {
        // No validator was available; this is a plain network request.
        CacheStrategy(conditionalRequest, null)
    }
}
Cache-Control 指令影响 :
请求指令
含义
no-cache
不直接使用缓存副本,必须回源(OkHttp 中直接发起网络请求;是否携带 If-None-Match 等条件头由调用方决定)
no-store
不缓存任何内容
max-age=0
立即过期,要求验证
max-stale=3600
接受过期不超过 1 小时的缓存
min-fresh=60
缓存至少还要新鲜 60 秒
only-if-cached
只用缓存,不发起网络请求
响应指令
含义
public
任何中间节点均可缓存
private
仅终端客户端的私有缓存可缓存(如浏览器或 App 本地缓存),代理/CDN 等共享缓存不可缓存
no-cache
使用前必须验证
no-store
不可缓存
max-age=3600
缓存有效期 3600 秒
must-revalidate
过期后必须验证
immutable
内容永不变(静态资源)
3.4 ConnectInterceptor —— 连接建立 源码位置 :okhttp3/internal/connection/ConnectInterceptor.kt
/**
 * Obtains an exchange for the call and forwards the request on a chain that carries it, so the
 * interceptors that follow see a non-null `exchange`.
 */
object ConnectInterceptor : Interceptor {
    override fun intercept(chain: Interceptor.Chain): Response {
        val networkChain = chain as RealInterceptorChain
        val liveExchange = networkChain.call.initExchange(networkChain)
        return networkChain
            .copy(exchange = liveExchange)
            .proceed(networkChain.request)
    }
}
ConnectInterceptor 是 OkHttp 中最简短但最重要的拦截器。它的核心是 RealCall.initExchange():
/**
 * Finds a codec through the call's exchange finder and wraps it in a new [Exchange], which is
 * recorded on the call before it is returned.
 *
 * @throws IllegalStateException if no more exchanges are expected ("released") or a request or
 *     response body is still marked open.
 * @throws IOException "Canceled" if the call was canceled.
 */
internal fun initExchange(chain: RealInterceptorChain): Exchange {
    synchronized(this) {
        check(expectMoreExchanges) { "released" }
        check(!responseBodyOpen)
        check(!requestBodyOpen)
    }

    val finder = this.exchangeFinder!!
    val codec = finder.find(client, chain)
    val newExchange = Exchange(this, eventListener, finder, codec)

    interceptorScopedExchange = newExchange
    exchange = newExchange

    // Both bodies count as open from now on.
    synchronized(this) {
        requestBodyOpen = true
        responseBodyOpen = true
    }

    if (canceled) throw IOException("Canceled")
    return newExchange
}
ExchangeFinder.find() —— 查找/创建连接的完整流程:
/**
 * Returns a codec on a healthy connection for this call.
 *
 * Every failure is recorded through `trackFailure` and surfaces as a [RouteException]; plain
 * [IOException]s are wrapped.
 */
fun find(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
    try {
        val healthyConnection = findHealthyConnection(
            connectTimeout = chain.connectTimeoutMillis,
            readTimeout = chain.readTimeoutMillis,
            writeTimeout = chain.writeTimeoutMillis,
            pingIntervalMillis = client.pingIntervalMillis,
            connectionRetryEnabled = client.retryOnConnectionFailure,
            doExtensiveHealthChecks = chain.request.method != "GET"
        )
        return healthyConnection.newCodec(client, chain)
    } catch (routeFailure: RouteException) {
        trackFailure(routeFailure.lastConnectException)
        throw routeFailure
    } catch (ioFailure: IOException) {
        trackFailure(ioFailure)
        throw RouteException(ioFailure)
    }
}

/**
 * Calls [findConnection] until it yields a connection that passes `isHealthy`. Unhealthy
 * candidates are marked with `noNewExchanges()` before the next attempt.
 */
@Throws(IOException::class)
private fun findHealthyConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean,
    doExtensiveHealthChecks: Boolean
): RealConnection {
    while (true) {
        val candidate = findConnection(
            connectTimeout,
            readTimeout,
            writeTimeout,
            pingIntervalMillis,
            connectionRetryEnabled
        )

        if (candidate.isHealthy(doExtensiveHealthChecks)) {
            return candidate
        }

        candidate.noNewExchanges()
    }
}

/**
 * Returns a connection for the call, in this order of preference: the connection the call already
 * holds, a pooled connection, a newly connected one.
 *
 * @throws IOException "Canceled" if the call was canceled, or whatever connecting throws.
 */
@Throws(IOException::class)
private fun findConnection(
    connectTimeout: Int,
    readTimeout: Int,
    writeTimeout: Int,
    pingIntervalMillis: Int,
    connectionRetryEnabled: Boolean
): RealConnection {
    if (call.isCanceled()) throw IOException("Canceled")

    // 1. Reuse the connection already attached to the call, if it is still usable.
    val existing = call.connection
    if (existing != null) {
        var socketToClose: Socket? = null
        synchronized(existing) {
            val unusable = existing.noNewExchanges ||
                !sameHostAndPort(existing.route().address.url)
            if (unusable) {
                socketToClose = call.releaseConnectionNoEvents()
            }
        }

        // Still attached means it was not released above.
        if (call.connection != null) {
            check(socketToClose == null)
            return existing
        }

        socketToClose?.closeQuietly()
        eventListener.connectionReleased(call, existing)
    }

    // 2. Try the pool.
    if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
        val pooled = call.connection!!
        eventListener.connectionAcquired(call, pooled)
        return pooled
    }

    // 3. Pick a route: one saved by an earlier attempt, otherwise the selector's next one.
    val routes: List<Route>?
    val route: Route
    val savedRoute = nextRouteToTry
    if (savedRoute != null) {
        routes = null
        route = savedRoute
        nextRouteToTry = null
    } else {
        val selector = routeSelector
            ?: RouteSelector(address, client.routeDatabase, call, eventListener)
                .also { this.routeSelector = it }
        val selection = selector.next()
        routes = selection.routes
        route = selection.routes[0]
    }

    // 4. Connect. The connection is exposed via connectionToCancel while connecting.
    val freshConnection = RealConnection(connectionPool, route)
    call.connectionToCancel = freshConnection
    try {
        freshConnection.connect(
            connectTimeout,
            readTimeout,
            writeTimeout,
            pingIntervalMillis,
            connectionRetryEnabled,
            call,
            eventListener
        )
    } finally {
        call.connectionToCancel = null
    }
    call.client.routeDatabase.connected(freshConnection.route())

    // 5. A multiplexed connection may have appeared in the pool meanwhile. If so, use it, close
    // the one just opened and keep its route for a later attempt.
    if (connectionPool.callAcquirePooledConnection(address, call, freshConnection, true)) {
        val coalesced = call.connection!!
        nextRouteToTry = route
        freshConnection.socket().closeQuietly()
        eventListener.connectionAcquired(call, coalesced)
        return coalesced
    }

    synchronized(freshConnection) {
        connectionPool.put(freshConnection)
        call.acquireConnectionNoEvents(freshConnection)
    }

    eventListener.connectionAcquired(call, freshConnection)
    return freshConnection
}
3.5 CallServerInterceptor —— 发送请求与读取响应 源码位置 :okhttp3/internal/http/CallServerInterceptor.kt
/**
 * Last interceptor in the chain: writes the request to the exchange and reads the response.
 */
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

    override fun intercept(chain: Interceptor.Chain): Response {
        val realChain = chain as RealInterceptorChain
        val exchange = realChain.exchange!!
        val request = realChain.request
        val requestBody = request.body

        exchange.writeRequestHeaders(request)

        // responseHeadersStart() must fire once; the 100-continue path fires it early.
        var invokeStartEvent = true
        var responseBuilder: Response.Builder? = null

        if (HttpMethod.permitsRequestBody(request.method) && requestBody != null) {
            if ("100-continue".equals(request.header("Expect"), ignoreCase = true)) {
                // Wait for the server's verdict before sending the body. A null builder means
                // the server answered "100 Continue" and the body should be sent, so it must not
                // be treated as an error.
                exchange.flushRequest()
                responseBuilder = exchange.readResponseHeaders(expectContinue = true)
                exchange.responseHeadersStart()
                invokeStartEvent = false
            }

            if (responseBuilder == null) {
                if (requestBody.isDuplex()) {
                    // Duplex bodies stay open; send the headers now.
                    exchange.flushRequest()
                    val bufferedRequestBody = exchange.createRequestBody(request, true)
                    requestBody.writeTo(bufferedRequestBody)
                } else {
                    val bufferedRequestBody = exchange.createRequestBody(request, false)
                    requestBody.writeTo(bufferedRequestBody)
                    bufferedRequestBody.close()
                }
            } else {
                // The server answered without asking for the body. The body is skipped, and a
                // non-multiplexed connection is left in an unknown state, so it must not be
                // reused.
                exchange.noRequestBody()
                if (!exchange.connection.isMultiplexed) {
                    exchange.connection.noNewExchanges()
                }
            }
        } else {
            exchange.noRequestBody()
        }

        if (requestBody == null || !requestBody.isDuplex()) {
            exchange.finishRequest()
        }

        if (responseBuilder == null) {
            responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
            if (invokeStartEvent) {
                exchange.responseHeadersStart()
                invokeStartEvent = false
            }
        }

        var response = responseBuilder
            .request(request)
            .handshake(exchange.connection.handshake())
            .sentRequestAtMillis(exchange.sentRequestMillis)
            .receivedResponseAtMillis(exchange.receivedResponseMillis)
            .build()
        var code = response.code

        if (code == 100) {
            // An unrequested 100 Continue: discard it and read the real response.
            response = exchange.readResponseHeaders(false)!!
                .request(request)
                .handshake(exchange.connection.handshake())
                .sentRequestAtMillis(exchange.sentRequestMillis)
                .receivedResponseAtMillis(exchange.receivedResponseMillis)
                .build()
            code = response.code
        }

        exchange.responseHeadersEnd(response)

        response = if (forWebSocket && code == 101) {
            // Upgraded connection: the caller gets an empty body.
            response.newBuilder()
                .body(EMPTY_RESPONSE)
                .build()
        } else {
            response.newBuilder()
                .body(exchange.openResponseBody(response))
                .build()
        }

        // Either side asked to close: do not reuse the connection.
        if ("close".equals(response.request.header("Connection"), ignoreCase = true) ||
            "close".equals(response.header("Connection"), ignoreCase = true)
        ) {
            exchange.connection.noNewExchanges()
        }

        return response
    }
}
四、连接池(ConnectionPool) 4.1 连接池的核心设计 源码位置 :okhttp3/internal/connection/RealConnectionPool.kt
/**
 * Holds the connections that can be reused across calls and schedules their cleanup.
 *
 * @param maxIdleConnections upper bound on idle connections kept in the pool.
 * @param keepAliveDuration how long an idle connection is kept, expressed in [timeUnit].
 */
class RealConnectionPool(
    taskRunner: TaskRunner,
    private val maxIdleConnections: Int,
    keepAliveDuration: Long,
    timeUnit: TimeUnit
) {
    // The constructor arguments were otherwise unused, while cleanup() compares idle durations
    // against keepAliveDurationNs.
    private val keepAliveDurationNs: Long = timeUnit.toNanos(keepAliveDuration)

    private val connections = ArrayDeque<RealConnection>()

    private val cleanupQueue: TaskQueue = taskRunner.newQueue()
    private val cleanupTask = object : Task("OkHttp ConnectionPool") {
        // The return value of cleanup() is the value handed back to the task queue.
        override fun runOnce(): Long = cleanup(System.nanoTime())
    }

    /** Number of pooled connections that currently carry no calls. */
    fun idleConnectionCount(): Int = synchronized(this) {
        connections.count { it.calls.isEmpty() }
    }

    /** Total number of pooled connections. */
    fun connectionCount(): Int = synchronized(this) {
        connections.size
    }

    companion object {
        /** Unwraps the pool implementation behind the client's `connectionPool`. */
        fun get(call: OkHttpClient): RealConnectionPool =
            (call.connectionPool.delegate as RealConnectionPool)
    }
}
4.2 连接的存入与取出
/** Adds [connection] to the pool and schedules the cleanup task. */
fun put(connection: RealConnection) {
    connection.assertThreadHoldsLock()
    connections.add(connection)
    cleanupQueue.schedule(cleanupTask)
}

/**
 * Attaches the first eligible pooled connection to [call].
 *
 * @param newConnection not used by the visible code.
 * @param requireMultiplexed when true only multiplexed connections are considered.
 * @return true if a connection was acquired.
 */
fun callAcquirePooledConnection(
    address: Address,
    call: RealCall,
    newConnection: RealConnection?,
    requireMultiplexed: Boolean
): Boolean {
    for (connection in connections) {
        synchronized(connection) {
            // `return@synchronized` leaves only the lambda and moves on to the next connection.
            // A bare `continue` inside the synchronized lambda does not compile on the Kotlin
            // versions OkHttp 4.x is built with.
            if (requireMultiplexed && !connection.isMultiplexed) return@synchronized
            if (!connection.isEligible(address, call)) return@synchronized
            call.acquireConnectionNoEvents(connection)
            return true
        }
    }
    return false
}

/**
 * Whether this connection may carry a call to [address].
 *
 * A connection to the same host is eligible if the non-host address fields match. A connection to
 * a different host is eligible only through HTTP/2 connection coalescing, which requires that the
 * certificate presented on this connection also covers the new host.
 *
 * NOTE(review): the shipped OkHttp implementation additionally requires the new host to resolve
 * to this connection's IP address; that needs the candidate routes, which this excerpt's
 * signature does not carry.
 */
fun RealConnection.isEligible(address: Address, call: RealCall): Boolean {
    if (noNewExchanges) return false

    val pooledAddress = this.route().address
    if (!pooledAddress.equalsNonHost(address)) return false

    // Same host: a perfect match.
    if (address.url.host == pooledAddress.url.host) return true

    // Different host: only a multiplexed connection can be shared, and only if allowed.
    if (!isMultiplexed || noCoalescedConnections) return false

    // The default verifier must be in use, otherwise certificate coverage cannot be judged here.
    if (address.hostnameVerifier !== OkHostnameVerifier) return false

    // The host must be covered by the leaf certificate. A substring test on host names is not a
    // valid check: "a.com" is contained in "evil-a.com".
    val peerCertificates = handshake?.peerCertificates.orEmpty()
    val leaf = peerCertificates.firstOrNull() as? java.security.cert.X509Certificate ?: return false
    if (!OkHostnameVerifier.verify(address.url.host, leaf)) return false

    // CertificatePinner.check() reports a mismatch by throwing, not by returning false.
    return try {
        address.certificatePinner!!.check(address.url.host, peerCertificates)
        true
    } catch (_: javax.net.ssl.SSLPeerUnverifiedException) {
        false
    }
}
4.3 连接清理算法
/**
 * Performs one maintenance pass over the pool and evicts at most one connection.
 *
 * @param now the current time in nanoseconds, on the same clock as `idleAtNs`.
 * @return `0` if a connection was evicted; otherwise the remaining keep-alive time of the
 *     longest-idle connection, `keepAliveDurationNs` if every connection is in use, or `-1` if
 *     the pool is empty.
 */
fun cleanup(now: Long): Long {
    var inUseConnectionCount = 0
    var idleConnectionCount = 0
    var longestIdleConnection: RealConnection? = null
    var longestIdleDurationNs = Long.MIN_VALUE

    synchronized(this) {
        // Count idle and in-use connections and find the one that has been idle longest.
        for (connection in connections) {
            if (pruneAndGetAllocationCount(connection, now) > 0) {
                inUseConnectionCount++
            } else {
                idleConnectionCount++

                val idleDurationNs = now - connection.idleAtNs
                if (idleDurationNs > longestIdleDurationNs) {
                    longestIdleDurationNs = idleDurationNs
                    longestIdleConnection = connection
                }
            }
        }

        when {
            longestIdleDurationNs >= keepAliveDurationNs ||
                idleConnectionCount > this.maxIdleConnections -> {
                // Over the keep-alive or idle limit: evict the longest-idle connection.
                val connection = longestIdleConnection!!
                connections.remove(connection)
                connection.socket().closeQuietly()
                return 0L
            }

            idleConnectionCount > 0 -> {
                // Time until the longest-idle connection reaches the keep-alive limit.
                return keepAliveDurationNs - longestIdleDurationNs
            }

            inUseConnectionCount > 0 -> {
                return keepAliveDurationNs
            }

            else -> {
                return -1L
            }
        }
    }
}

/**
 * Drops references whose call has been garbage collected and returns how many live references
 * remain on [connection].
 */
private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int {
    val references = connection.calls
    var i = 0
    while (i < references.size) {
        val reference = references[i]

        if (reference.get() != null) {
            i++
            continue
        }

        // The call was collected without releasing the connection. `references` is the same list
        // as `connection.calls`, so it is removed once; removing it again through
        // `connection.calls` would drop a second, possibly live, entry.
        references.removeAt(i)
        // A leaked call leaves the connection in an unknown state, so do not hand it out again.
        connection.noNewExchanges = true

        if (references.isEmpty()) {
            // Backdate the idle timestamp so the connection can be evicted immediately.
            connection.idleAtNs = now - keepAliveDurationNs
            return 0
        }
    }

    return references.size
}
五、DNS 解析 5.1 Dns 接口 源码位置 :okhttp3/Dns.kt
/** Resolves host names to IP addresses. */
interface Dns {
    /**
     * Returns the addresses of [hostname].
     *
     * @throws UnknownHostException if the lookup fails.
     */
    @Throws(UnknownHostException::class)
    fun lookup(hostname: String): List<InetAddress>

    companion object {
        /**
         * Resolver backed by [InetAddress.getAllByName].
         *
         * Written as an object expression: `Dns { ... }` compiles only for a `fun interface`,
         * and this interface is declared as a plain `interface`.
         */
        val SYSTEM: Dns = object : Dns {
            override fun lookup(hostname: String): List<InetAddress> =
                try {
                    InetAddress.getAllByName(hostname).toList()
                } catch (e: NullPointerException) {
                    // Report a NullPointerException from the platform lookup as a failed lookup,
                    // keeping the original exception as the cause.
                    throw UnknownHostException("Broken system behaviour for dns lookup of $hostname")
                        .apply { initCause(e) }
                }
        }
    }
}
5.2 自定义 DNS 实现(HTTPDNS)
/**
 * [Dns] implementation that asks an HTTP-based resolver first and falls back to the system
 * resolver when that yields nothing usable.
 */
class HttpDns : Dns {

    // One client for all lookups: creating an OkHttpClient per lookup allocates a new connection
    // pool and dispatcher every time. The resolver is addressed by IP literal, so this client
    // needs no DNS itself.
    private val lookupClient: OkHttpClient by lazy { OkHttpClient() }

    override fun lookup(hostname: String): List<InetAddress> {
        val ip = fetchIpFromHttpDnsService(hostname) ?: return Dns.SYSTEM.lookup(hostname)
        return try {
            listOf(InetAddress.getByName(ip))
        } catch (_: UnknownHostException) {
            // The service returned something that is not an address.
            Dns.SYSTEM.lookup(hostname)
        }
    }

    /**
     * Returns the first non-empty entry of the comma-separated response body, or null if the
     * request fails, is unsuccessful, or returns no entry.
     */
    private fun fetchIpFromHttpDnsService(hostname: String): String? {
        // Build the URL with HttpUrl so the host name is query-encoded.
        val url = okhttp3.HttpUrl.Builder()
            .scheme("http")
            .host("203.107.1.1")
            .addPathSegment("d")
            .addQueryParameter("dn", hostname)
            .build()
        val request = Request.Builder()
            .url(url)
            .build()

        return try {
            // use {} closes the response on every path.
            lookupClient.newCall(request).execute().use { response ->
                if (!response.isSuccessful) return null
                response.body?.string()
                    ?.split(",")
                    ?.map { it.trim() }
                    // An empty entry must not reach InetAddress.getByName(""), which resolves to
                    // the loopback address.
                    ?.firstOrNull { it.isNotEmpty() }
            }
        } catch (e: Exception) {
            // Best effort: any failure here falls back to the system resolver.
            null
        }
    }
}

val client = OkHttpClient.Builder()
    .dns(HttpDns())
    .build()
六、HTTP/2 支持 6.1 HTTP/2 连接 源码位置 :okhttp3/internal/http2/Http2Connection.kt
/**
 * HTTP/2 connection excerpt showing the frame constants only.
 *
 * All constants live in the companion object: `const val` is allowed only at top level, in an
 * `object`, or in a `companion object`, not directly in a class body.
 */
class Http2Connection internal constructor(builder: Builder) {
    companion object {
        // Frame type codes.
        const val TYPE_DATA = 0x0
        const val TYPE_HEADERS = 0x1
        const val TYPE_PRIORITY = 0x2
        const val TYPE_RST_STREAM = 0x3
        const val TYPE_SETTINGS = 0x4
        const val TYPE_PUSH_PROMISE = 0x5
        const val TYPE_PING = 0x6
        const val TYPE_GOAWAY = 0x7
        const val TYPE_WINDOW_UPDATE = 0x8
        const val TYPE_CONTINUATION = 0x9

        // Frame flag bits. FLAG_ACK and FLAG_END_STREAM share the value 0x1.
        const val FLAG_NONE = 0x0
        const val FLAG_ACK = 0x1
        const val FLAG_END_STREAM = 0x1
        const val FLAG_END_HEADERS = 0x4
        const val FLAG_PADDED = 0x8
        const val FLAG_PRIORITY = 0x20

        // 0x4000 = 16384 bytes.
        const val INITIAL_MAX_FRAME_SIZE = 0x4000
    }
}
6.2 多路复用(Multiplexing) OkHttp 在 HTTP/2 连接上支持多个并发流(Stream),每个流对应一个请求/响应对:
/**
 * Opens a new stream on this connection and registers it in `streams`.
 *
 * Every stream gets its own id from `nextStreamId`, which advances by 2 each time. The original
 * excerpt allocated an id only when [out] was true, so every stream opened with `out == false`
 * got id 0 and replaced the previous one in the map.
 *
 * @param out passed through to the [Http2Stream] constructor.
 */
fun newStream(requestHeaders: List<Header>, out: Boolean): Http2Stream {
    val streamId = nextStreamId
    nextStreamId += 2

    val stream = Http2Stream(streamId, this, out, requestHeaders)
    streams[streamId] = stream
    return stream
}
HTTP/2 连接复用 :多个 URL 可以共享同一个 TCP 连接,条件是它们具有相同的 IP 地址和 TLS 证书(支持 Subject Alternative Name)。
// Eligibility check excerpt. The visible code returns false when `allocations.size` has reached
// `allocationLimit`, and returns false when no key of `allocations` satisfies
// `equalsNonHost(address)`.
// NOTE(review): the excerpt is truncated - it ends without a final return value and never uses
// `routes`; the remaining checks are presumably omitted. Confirm against the real source before
// relying on it.
private fun isEligible (address: Address , routes: List <Route >?) : Boolean { if (allocations.size >= allocationLimit) return false if (!allocations.keys.any { it.equalsNonHost(address) }) return false }
HTTP/2 使用 HPACK 算法压缩头部,OkHttp 实现了 Hpack.Writer 和 Hpack.Reader:
/**
 * HPACK excerpt: integer-prefix masks and the first entries of the static header table.
 */
object Hpack {
    // Masks selecting the low 4, 6 and 7 bits of a byte.
    private const val PREFIX_4_BITS = 0x0f
    private const val PREFIX_6_BITS = 0x3f
    private const val PREFIX_7_BITS = 0x7f

    // Only the first five entries are shown in this excerpt.
    private val STATIC_HEADER_TABLE = arrayOf(
        Header(":authority", ""),
        Header(":method", "GET"),
        Header(":method", "POST"),
        Header(":path", "/"),
        Header(":path", "/index.html"),
    )
}
七、WebSocket 支持 7.1 WebSocket 接口 源码位置 :okhttp3/WebSocket.kt
/**
 * Application-facing handle to a web socket, as shown in this excerpt.
 *
 * The Boolean results of [send] and [close] presumably report whether the message was accepted -
 * confirm against the OkHttp API documentation.
 */
interface WebSocket {

    /** Size of the outgoing queue (unit not shown in this excerpt). */
    fun queueSize(): Long

    /** Queues a text message. */
    fun send(text: String): Boolean

    /** Queues a binary message. */
    fun send(bytes: ByteString): Boolean

    /** Starts closing the web socket with the given close [code] and optional [reason]. */
    fun close(code: Int, reason: String?): Boolean

    /** Cancels the web socket. */
    fun cancel()

    /** Creates web sockets for a request and a listener. */
    interface Factory {
        fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket
    }
}
7.2 RealWebSocket 源码位置 :okhttp3/internal/ws/RealWebSocket.kt
/**
 * Web socket implementation excerpt: the upgrade handshake in [connect] and the
 * periodic ping task.
 *
 * Members such as `key`, `call`, `writer`, `failed`, `checkUpgradeSuccess`,
 * `initReaderAndWriter` and `failWebSocket` are used here but declared outside
 * this excerpt.
 *
 * NOTE(review): `state` starts as [CLOSED] and nothing in this excerpt moves it to
 * [CONNECTING] or [OPEN]; confirm the transitions happen in the omitted code.
 */
class RealWebSocket(
    // Kept as a property because close() schedules its timeout task through it.
    private val taskRunner: TaskRunner,
    private val originalRequest: Request,
    private val listener: WebSocketListener,
    private val random: Random,
    private val pingIntervalMillis: Long,
    private val minimumDeflateSize: Long
) : WebSocket, WebSocketReader.FrameCallback {

    private var state: Int = CLOSED

    /**
     * Starts the upgrade handshake using a copy of [client] that is restricted to
     * HTTP/1.1 and has event listening disabled, then enqueues the upgrade request.
     */
    fun connect(client: OkHttpClient) {
        val webSocketClient = client.newBuilder()
            .eventListener(EventListener.NONE)
            .protocols(listOf(Protocol.HTTP_1_1))
            .build()

        val request = originalRequest.newBuilder()
            .header("Upgrade", "websocket")
            .header("Connection", "Upgrade")
            .header("Sec-WebSocket-Key", key)
            .header("Sec-WebSocket-Version", "13")
            .build()

        // Hold the call in a local so it can be enqueued without a `!!` on the nullable field.
        val upgradeCall = RealCall(webSocketClient, request, forWebSocket = true)
        call = upgradeCall

        upgradeCall.enqueue(object : Callback {
            override fun onResponse(call: Call, response: Response) {
                val exchange = response.exchange
                try {
                    checkUpgradeSuccess(response, exchange)
                    // NOTE(review): assumes checkUpgradeSuccess rejects a null exchange; if it
                    // does not, the resulting NullPointerException is reported through the catch below.
                    val streams: Streams = exchange!!.newWebSocketStreams()
                    initReaderAndWriter("OkHttp WebSocket", streams)
                } catch (e: Exception) {
                    failWebSocket(e, response)
                }
            }

            // Callback requires this override; without it transport-level failures
            // (no response at all) were never reported.
            // NOTE(review): assumes failWebSocket accepts a null response; confirm.
            override fun onFailure(call: Call, e: IOException) {
                failWebSocket(e, null)
            }
        })
    }

    // Despite its name, this task writes a ping frame (not a pong) and then notifies the listener.
    private val pongRunnable = object : Runnable {
        override fun run() {
            if (failed) return
            writer.writePing(ByteString.EMPTY)
            listener.onPing(this@RealWebSocket, ByteString.EMPTY)
        }
    }

    companion object {
        const val CLOSED = 0
        const val CONNECTING = 1
        const val OPEN = 2
        const val CLOSING = 3
    }
}
WebSocket 的关闭过程 :
/**
 * Starts a close: writes a close frame and schedules a task that cancels the
 * socket once the close timeout elapses.
 *
 * @param code close status code written into the close frame.
 * @param reason optional human-readable reason; null is sent as an empty payload.
 * @return false when the socket is already [CLOSED] or [CLOSING], true once the
 *   close frame has been written and the timeout task scheduled.
 */
override fun close(code: Int, reason: String?): Boolean {
    // The original declared reasonBytes but never assigned it, which does not compile.
    // String.toByteArray() encodes as UTF-8 by default.
    val reasonBytes: ByteString =
        reason?.let { ByteString.of(*it.toByteArray()) } ?: ByteString.EMPTY

    synchronized(this) {
        if (state == CLOSED || state == CLOSING) return false
        state = CLOSING
        writer.writeClose(code, reasonBytes)

        val cancelAfterTimeout = object : Task("OkHttp WebSocket Close") {
            override fun runOnce(): Long {
                cancel()
                return -1L
            }
        }
        // NOTE(review): OkHttp's TaskQueue.schedule takes its delay in nanoseconds, so the
        // millisecond constant is converted. Assumes CLOSE_TIMEOUT_MILLIS is a Long; confirm.
        taskRunner.newQueue().schedule(
            cancelAfterTimeout,
            TimeUnit.MILLISECONDS.toNanos(CLOSE_TIMEOUT_MILLIS)
        )
    }
    return true
}
八、EventListener —— 网络请求全生命周期监控 8.1 EventListener 接口 源码位置 :okhttp3/EventListener.kt
/**
 * Base class for observing the lifecycle of a call. Every hook has an empty
 * body, so subclasses override only the events they care about.
 */
abstract class EventListener {

    // Call start
    open fun callStart(call: Call) {}

    // DNS
    open fun dnsStart(call: Call, domainName: String) {}
    open fun dnsEnd(call: Call, domainName: String, inetAddressList: List<InetAddress>) {}

    // Connection establishment, including the TLS handshake
    open fun connectStart(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy) {}
    open fun secureConnectStart(call: Call) {}
    open fun secureConnectEnd(call: Call, handshake: Handshake?) {}
    open fun connectEnd(
        call: Call,
        inetSocketAddress: InetSocketAddress,
        proxy: Proxy,
        protocol: Protocol?
    ) {}
    open fun connectFailed(
        call: Call,
        inetSocketAddress: InetSocketAddress,
        proxy: Proxy,
        protocol: Protocol?,
        ioe: IOException
    ) {}

    // Connection acquisition and release
    open fun connectionAcquired(call: Call, connection: Connection) {}
    open fun connectionReleased(call: Call, connection: Connection) {}

    // Request
    open fun requestHeadersStart(call: Call) {}
    open fun requestHeadersEnd(call: Call, request: Request) {}
    open fun requestBodyStart(call: Call) {}
    open fun requestBodyEnd(call: Call, byteCount: Long) {}
    open fun requestFailed(call: Call, ioe: IOException) {}

    // Response
    open fun responseHeadersStart(call: Call) {}
    open fun responseHeadersEnd(call: Call, response: Response) {}
    open fun responseBodyStart(call: Call) {}
    open fun responseBodyEnd(call: Call, byteCount: Long) {}
    open fun responseFailed(call: Call, ioe: IOException) {}

    // Call completion
    open fun callEnd(call: Call) {}
    open fun callFailed(call: Call, ioe: IOException) {}
    open fun canceled(call: Call) {}
}
8.2 完整的性能监控实现
/**
 * [EventListener] that records a timestamp for each phase of a call and logs the
 * per-phase and total durations.
 *
 * Metrics are kept per [Call] in a concurrent map and dropped in [callEnd] or
 * [callFailed], so one listener instance can be shared by a client.
 *
 * Timestamps come from a monotonic clock ([System.nanoTime]) rather than
 * [System.currentTimeMillis], so wall-clock adjustments cannot distort a duration.
 * They are only meaningful relative to one another.
 */
class NetworkPerformanceListener : EventListener() {
    private val timestamps = ConcurrentHashMap<Call, CallMetrics>()

    /**
     * Timestamps (milliseconds, monotonic) for one call. A phase whose callbacks are
     * never invoked keeps both of its timestamps at 0 and therefore reports 0 ms.
     */
    data class CallMetrics(
        var callStart: Long = 0,
        var dnsStart: Long = 0,
        var dnsEnd: Long = 0,
        var connectStart: Long = 0,
        var connectEnd: Long = 0,
        var tlsStart: Long = 0,
        var tlsEnd: Long = 0,
        var requestHeadersStart: Long = 0,
        var requestHeadersEnd: Long = 0,
        var requestBodyStart: Long = 0,
        var requestBodyEnd: Long = 0,
        var responseHeadersStart: Long = 0,
        var responseHeadersEnd: Long = 0,
        var responseBodyStart: Long = 0,
        var responseBodyEnd: Long = 0,
        var callEnd: Long = 0
    ) {
        val dnsDuration: Long get() = dnsEnd - dnsStart
        val connectDuration: Long get() = connectEnd - connectStart
        val tlsDuration: Long get() = tlsEnd - tlsStart

        // Measured from the first request header byte to the end of the response headers.
        val requestDuration: Long get() = responseHeadersEnd - requestHeadersStart
        val responseDuration: Long get() = responseBodyEnd - responseBodyStart
        val totalDuration: Long get() = callEnd - callStart

        /** Formats all durations as a multi-line report. */
        fun toReport(): String = buildString {
            appendLine("=== Network Request Performance ===")
            appendLine("DNS Lookup: ${dnsDuration} ms")
            appendLine("TCP Connect: ${connectDuration} ms")
            appendLine("TLS Handshake: ${tlsDuration} ms")
            appendLine("Request: ${requestDuration} ms")
            appendLine("Response: ${responseDuration} ms")
            appendLine("Total: ${totalDuration} ms")
        }
    }

    /** Current monotonic time in milliseconds. */
    private fun now(): Long = System.nanoTime() / 1_000_000

    /** Returns the metrics for [call], creating them on first use. */
    private fun metrics(call: Call) = timestamps.getOrPut(call) { CallMetrics() }

    override fun callStart(call: Call) {
        metrics(call).callStart = now()
    }

    override fun dnsStart(call: Call, domainName: String) {
        metrics(call).dnsStart = now()
        Log.d("Network", "DNS Start: $domainName ")
    }

    override fun dnsEnd(call: Call, domainName: String, inetAddressList: List<InetAddress>) {
        metrics(call).dnsEnd = now()
        val ips = inetAddressList.map { it.hostAddress }
        Log.d("Network", "DNS End: $domainName -> $ips " + "(${metrics(call).dnsDuration} ms)")
    }

    override fun connectStart(call: Call, inetSocketAddress: InetSocketAddress, proxy: Proxy) {
        metrics(call).connectStart = now()
    }

    override fun connectEnd(
        call: Call,
        inetSocketAddress: InetSocketAddress,
        proxy: Proxy,
        protocol: Protocol?
    ) {
        metrics(call).connectEnd = now()
    }

    override fun secureConnectStart(call: Call) {
        metrics(call).tlsStart = now()
    }

    override fun secureConnectEnd(call: Call, handshake: Handshake?) {
        metrics(call).tlsEnd = now()
        if (handshake != null) {
            Log.d("Network", "TLS: ${handshake.cipherSuite} " + "(${metrics(call).tlsDuration} ms)")
        }
    }

    override fun requestHeadersStart(call: Call) {
        metrics(call).requestHeadersStart = now()
    }

    override fun requestHeadersEnd(call: Call, request: Request) {
        metrics(call).requestHeadersEnd = now()
    }

    override fun responseHeadersStart(call: Call) {
        metrics(call).responseHeadersStart = now()
    }

    override fun responseHeadersEnd(call: Call, response: Response) {
        metrics(call).responseHeadersEnd = now()
        Log.d("Network", "Response: ${response.code} " + "(${metrics(call).requestDuration} ms)")
    }

    override fun responseBodyStart(call: Call) {
        metrics(call).responseBodyStart = now()
    }

    override fun responseBodyEnd(call: Call, byteCount: Long) {
        metrics(call).responseBodyEnd = now()
        Log.d("Network", "Body: $byteCount bytes " + "(${metrics(call).responseDuration} ms)")
    }

    override fun callEnd(call: Call) {
        metrics(call).callEnd = now()
        Log.d("Network", metrics(call).toReport())
        // Drop the entry so finished calls do not accumulate in the map.
        timestamps.remove(call)
    }

    override fun callFailed(call: Call, ioe: IOException) {
        Log.e("Network", "Failed: ${ioe.message} " + "after ${now() - metrics(call).callStart} ms")
        timestamps.remove(call)
    }
}

val client = OkHttpClient.Builder()
    .eventListener(NetworkPerformanceListener())
    .build()
九、OkHttp 的配置与最佳实践 9.1 完整的 OkHttpClient 配置
// Full client configuration example. `sslSocketFactory`, `trustManager` and `context`
// are expected to be provided by the surrounding code.
val okHttpClient = OkHttpClient.Builder()
    // Timeouts
    .connectTimeout(10, TimeUnit.SECONDS)
    .readTimeout(30, TimeUnit.SECONDS)
    .writeTimeout(30, TimeUnit.SECONDS)
    .callTimeout(60, TimeUnit.SECONDS)
    // Connection pool: up to 5 idle connections, kept alive for 5 minutes
    .connectionPool(
        ConnectionPool(
            maxIdleConnections = 5,
            keepAliveDuration = 5,
            timeUnit = TimeUnit.MINUTES
        )
    )
    .protocols(listOf(Protocol.HTTP_2, Protocol.HTTP_1_1))
    // Interceptors
    .addInterceptor(LoggingInterceptor())
    .addInterceptor(AuthInterceptor())
    .addNetworkInterceptor(CacheInterceptor())
    // TLS. Hostname verification is deliberately left at OkHttp's default: the previous
    // `HostnameVerifier { _, _ -> true }` accepted a certificate for ANY host, which
    // allows man-in-the-middle attacks and must never ship in production code.
    .sslSocketFactory(sslSocketFactory, trustManager)
    .dns(HttpDns())
    .cookieJar(PersistentCookieJar())
    // 10 MiB disk cache; the size is written as a Long expression.
    .cache(Cache(File(context.cacheDir, "okhttp_cache"), 10L * 1024 * 1024))
    .proxy(Proxy.NO_PROXY)
    .retryOnConnectionFailure(true)
    // eventListener(...) and eventListenerFactory(...) configure the same setting and the
    // later call wins, so only the factory (the one that took effect before) is kept.
    .eventListenerFactory(NetworkPerformanceListenerFactory())
    .build()
9.2 自定义 Interceptor 示例 日志拦截器 :
/**
 * Interceptor that logs the request line, the response status with elapsed time,
 * and the headers of both.
 *
 * @param redactedHeaders header names (matched case-insensitively) whose values are
 *   replaced by a placeholder in the log, so credentials and cookies are not written
 *   to logcat. Pass an empty set to log every value verbatim.
 */
class LoggingInterceptor(
    private val redactedHeaders: Set<String> =
        setOf("Authorization", "Proxy-Authorization", "Cookie", "Set-Cookie")
) : Interceptor {

    override fun intercept(chain: Interceptor.Chain): Response {
        val request = chain.request()
        val startTime = System.nanoTime()

        Log.d("OkHttp", "--> ${request.method} ${request.url} ")
        logHeaders(request.headers)

        val response = chain.proceed(request)
        val duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)

        Log.d("OkHttp", "<-- ${response.code} ${response.message} " + "${request.url} (${duration} ms)")
        logHeaders(response.headers)

        return response
    }

    /** Logs one line per header, hiding the value of any header listed in [redactedHeaders]. */
    private fun logHeaders(headers: Iterable<Pair<String, String>>) {
        headers.forEach { (name, rawValue) ->
            val isSensitive = redactedHeaders.any { it.equals(name, ignoreCase = true) }
            val value = if (isSensitive) "██" else rawValue
            Log.d("OkHttp", " $name : $value ")
        }
    }
}
Token 自动刷新拦截器 :
/**
 * Interceptor that attaches a bearer token to every request and, on a 401 response,
 * refreshes the token once and retries the request.
 *
 * Requests whose path contains "refresh" are never retried, so the refresh endpoint
 * itself cannot trigger a refresh loop.
 *
 * @param tokenProvider source of the current token and of refreshed tokens.
 */
class TokenRefreshInterceptor(
    private val tokenProvider: TokenProvider
) : Interceptor {

    override fun intercept(chain: Interceptor.Chain): Response {
        val originalRequest = chain.request()
        val token = tokenProvider.getToken()
        val requestWithAuth = originalRequest.newBuilder()
            .header("Authorization", "Bearer $token")
            .build()

        val response = chain.proceed(requestWithAuth)
        val isRefreshCall = originalRequest.url.encodedPath.contains("refresh")
        if (response.code != 401 || isRefreshCall) return response

        // Release the rejected response before issuing the retry on the same chain.
        response.close()

        // Only the token refresh is serialized. If another thread already replaced the
        // token while this request was in flight, reuse it instead of refreshing again.
        // When getToken() still returns the token that was just rejected, this falls
        // back to a refresh.
        val newToken = synchronized(this) {
            val current = tokenProvider.getToken()
            if (current != token) current else tokenProvider.refreshToken()
        }

        // The retry runs outside the lock so one slow request cannot block other calls.
        val retryRequest = originalRequest.newBuilder()
            .header("Authorization", "Bearer $newToken")
            .build()
        return chain.proceed(retryRequest)
    }
}
十、总结 OkHttp 通过精心设计的拦截器链架构,将 HTTP 通信的每一个关注点(重试、头部处理、缓存、连接、发送接收)完全解耦。其核心设计值得深入学习:
责任链模式 :每个 Interceptor 独立负责一个职责,通过 chain.proceed() 串联
连接池 :TCP 连接的复用策略,空闲连接的 LRU 清理
HTTP/2 多路复用 :单连接多 Stream,HPACK 头部压缩
缓存策略 :遵循 RFC 7234,条件 GET,Cache-Control 完整解析
WebSocket :协议升级、Ping/Pong 保活、优雅关闭
EventListener :完整的网络指标采集,DNS → TCP → TLS → Request → Response
参考资源