okhttp 源代码分析
okhttp 作为一个非常流行的 HTTP 客户端库。虽然一开始是为 Android 程序编写,但是很快就在 Java 的后端世界占领了不少的份额。
okhttp 底层基于 NIO 实现 IO 的读取写入,性能方面自然也不用担心。 另外 okhttp 同时提供了同步异步的接口,可以说在各种使用场景下,okhttp 使用起来都非常舒服。 可以说除非有特殊需求,把 okhttp 在 HTTP 客户端方面作为一个默认的技术选型,应该是没有问题的。
本文会初步梳理一遍 okhttp 的源代码流程, 关注作为 HTTP 客户端最重要的同步异步发送请求、拦截器(Interceptor)的功能、 HTTP/1.1、HTTP/2和WebSocket 的实现 以及提供 NIO 支持的 okio 库。
由于 okhttp 以 Kotlin 实现,但对 Java 程序员来说,Kotlin 哪怕不学也能无师自通地看懂,所以安心往下看吧。
基础流程 Basic / 同步请求 Synchronized Call
相信大家对 okhttp.newCall(req).execute()
这个同步请求代码很熟悉了。
让我们先从这里触发,过一遍 okhttp 的基础流程。
okhttp.newCall(req)
构造了 okhttp 最关键的类 RealCall。
正如 RealCall 的 Javadoc 所言:
Bridge between OkHttp's application and network layers. This class exposes high-level application layer primitives: connections, requests, responses, and streams.
RealCall 对上层提供了连接、请求、响应、流等处于 HTTP 应用层的操作,对下实现了 TCP 网络层的数据操作。
构造了 RealCall 对象后,用户通过调用 RealCall#execute
方法完成同步操作。
override fun execute(): Response {
...
return getResponseWithInterceptorChain()
...
}
先暂且忽略一下不重要的代码。 execute() 方法调用了 getResponseWithInterceptorChain()。继续往下走。
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
...
interceptors += CallServerInterceptor(forWebSocket)
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
...
)
...
try {
val response = chain.proceed(originalRequest)
...
return response
} catch (e: IOException) {
...
} finally {
...
}
}
getResponseWithInterceptorChain() 的逻辑非常简单。代码依次构造了一条拦截器 Interceptor 的链:
- RetryAndFollowUpInterceptor
- BridgeInterceptor
- CacheInterceptor
- ConnectInterceptor
- CallServerInterceptor
然后通过 chain.proceed(...)
方法,从上到下依次执行每个 Interceptor ,完成 HTTP 请求的收发。
为了不一次性引入太多概念,导致理解困难,我会把每个拦截器的作用放到第三节单独解释。
至此,一个 HTTP 请求就完成了。
异步请求 Async Call
异步请求 Async Call,是在高吞吐量场景下的必然选择。
okhttp.newCall(req).enqueue(callback)
这段代码也是相当熟悉了。和同步代码一样,通过 newCall() 构造 RealCall 对象,然后调用 RealCall#enqueue 方法执行异步请求。
进入 RealCall#enqueue:
override fun enqueue(responseCallback: Callback) {
...
client.dispatcher.enqueue(AsyncCall(responseCallback))
}
首先,这里构造了一个 AsyncCall 对象。然后通过 Dispatcher#enqueue 排队执行。
先看 AsyncCall。 AsyncCall 是 RealCall 的内部类。
inner class AsyncCall(...
fun executeOn(executorService: ExecutorService) {}
override fun run() {}
}
AsyncCall 很简单。 首先,它继承了 Runnable 接口,这使得它可以被 ExecutorService 调度和执行。 然后,AsyncCall 提供了 executeOn() 方法,方便把自己丢进 ExecutorService 里。 最后,必不可少的 run() 。
继续往里走
internal fun enqueue(call: AsyncCall) {
...
promoteAndExecute()
}
看注释。
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) { // 循环获取所有可执行的 Async Call
val asyncCall = i.next()
...
executableCalls.add(asyncCall) // 添加到 executableCalls
runningAsyncCalls.add(asyncCall)
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
// 循环提交每个 Async Call 到 ExecutorService 里。
asyncCall.executeOn(executorService)
}
return isRunning
}
再往后,很显然就是 AsyncCall 在 ExecutorService 里面排队。轮到它时,ExecutorService 执行 AsyncCall#run 方法。
override fun run() {
// threadName() 方法对 Java 的同学可能会带来点困惑。
// threadName(name: String, block: () -> Unit)
// 第一个入参是线程名,第二个入参是一个 function/lambda
// 这个方法的作用是在 lambda 执行前修改线程名,执行后恢复线程名。
threadName("OkHttp ${redactedUrl()}") {
try {
// 这里的 getResponseWithInterceptorChain() 是不是很眼熟?
// 和同步请求里的是同一个方法
val response = getResponseWithInterceptorChain()
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} finally {
...
}
}
}
到了这里,异步请求如何实现就基本有个印象了。
说白了,就是利用 ExecutorService 异步执行同步请求。
简单得有点让人小失望= =!
说到 ExecutorService,不由得就要问,ExecutorService的核心参数是怎么配置的?线程数是多少?用的是什么Queue?
在 okhttp3.Dispatcher#executorService
:
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
拦截器 Interceptor
回到 RealCall#getResponseWithInterceptorChain
:
val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)
这里通过多个 Interceptor 构造了一条 InterceptorChain。WebSocket 的情况后面单独去说,我们先关注最基础的 Interceptor。
每个 Interceptor 的作用如下:
Interceptor | Description |
---|---|
RetryAndFollowUpInterceptor | 失败重试和重定向 |
BridgeInterceptor | 把 HTTP 应用层的请求转换成 TCP 层的请求,以及把 TCP 层的响应转换成 HTTP 层的响应(例如 gzip 的编码解码) |
CacheInterceptor | 缓存 |
ConnectInterceptor | 完成 TCP 层连接 |
CallServerInterceptor | 完成 HTTP 层请求 |
HTTP/1.1
HTTP/1.1 的实现非常简单。HTTP —— 超文本传输协议,或者说,基于文本的传输协议,要完成信息的收发,只要往 TCP 层写入特定格式的文本数据就可以了。得益于网络栈的精致的分层设计,TCP 层的复杂性被操作系统屏蔽,所以应用只需要完成:
- 打开 TCP 层连接
- 写入数据
- 读取数据
(虽然实际上要考虑长连接、连接复用、代理支持等场景,实现一个 production-grade 的 HTTP 服务器/客户端也是挑战性十足)
在 okhttp 里,ConnectInterceptor 负责打开 TCP 连接。也实际编程中,也负责打开 Socket 链接。
下图为 okhttp 创建 Socket 的调用链:

重点1 数据的读取写入
// RealConnection
private fun connectSocket(...) {
...
val rawSocket = when (proxy.type()) {
Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
else -> Socket(proxy)
}
...
source = rawSocket.source().buffer()
sink = rawSocket.sink().buffer()
...
}
在 RealConnection#connectSocket
中,创建完 Socket 后,会打开对应的 Source 和 Sink。
Source 和 Sink 是 okio 里的概念。其中 Source 负责读取数据, Sink 负责写入数据。
后续的 HTTP 层数据的读取写入会由这两个类负责,而不会直接操作 Socket。
okio 的内容会在后面描述。在那之前,先把它们当黑盒去理解吧。
重点2 HTTP的数据操作
这里有两个比较关键的类。
-
okhttp3.internal.connection.Exchange:负责发送 HTTP 请求和读取响应。Exchange,交换,用请求交换响应。一个 HTTP 编程了很常见的概念和命名。Exchange 抽象了 HTTP 的操作,底层可能是 HTTP/1.1 也可能是 HTTP/2。
-
okhttp3.internal.http.ExchangeCodec:负责 HTTP 请求响应的编码解码。具体的实现类为 Http1ExchangeCodec 和 Http2ExchangeCodec。在
okhttp3.internal.connection.RealConnection#newCodec
中,会根据情况返回具体的实现类。
然后,让我们移步到 ServerCallInterceptor。我把 GET 请求的代码单独抽了出来。 虽然处理不同的请求和响应的逻辑有点复杂,但总而言之,都是利用 Exchange 对象完成 HTTP headers 和 body 的写入读取。
class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
exchange.writeRequestHeaders(request)
exchange.noRequestBody()
exchange.finishRequest()
responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!
var response = responseBuilder
.request(request)
.handshake(exchange.connection.handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build()
response.newBuilder()
.body(exchange.openResponseBody(response))
.build()
}
}
从 Exchange 开始追溯代码,最后都会调用 okio 的 Sink 和 Source 完成 IO 操作。
HTTP 写入
Exchange.writeRequestHeaders()
-> Http1ExchangeCodec#writeRequestHeaders()
-> Http1ExchangeCodec#writeRequest()
-> Sink#writeUtf8()
fun writeRequest(headers: Headers, requestLine: String) {
check(state == STATE_IDLE) { "state: $state" }
sink.writeUtf8(requestLine).writeUtf8("\r\n")
for (i in 0 until headers.size) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n")
}
sink.writeUtf8("\r\n")
state = STATE_OPEN_REQUEST_BODY
}
HTTP 读取
Exchange#readResponseHeaders()
-> Http1ExchangeCodec#readResponseHeaders()
-> HeadersReader#readHeaders()
-> Headers.Builder#addLenient()
internal fun addLenient(line: String) = apply {
val index = line.indexOf(':', 1)
when {
index != -1 -> {
addLenient(line.substring(0, index), line.substring(index + 1))
}
line[0] == ':' -> {
addLenient("", line.substring(1)) // Empty header name.
}
else -> {
addLenient("", line)
}
}
}
HTTP/2
WebSocket
okio
okio 是 okhttp 底层的 NIO 框架。 在 okhttp 项目的早期阶段,okio 只是 okhttp 内部的使用 NIO 的抽象和包装。 后续迭代中,okio 逐渐完善,并被抽离出来成为了一个独立可复用的 NIO 框架。
okio 包装了 java.io
和 java.nio
,对外提供了一套易于使用的非阻塞 IO 操作接口。
与 Netty 和 XNIO 等其他 NIO 框架类似,okio 也在减少数据复制成本、提高代码效率方面花了不少功夫。 但总的来说,业界在 NIO 方面的思路都是大同小异的。
// TODO:
其实实现细节
超时 Timeout
线程调度
参考材料
Droidcon Montreal Jake Wharton - A Few Ok Libraries - YouTube