IMLC.ME

okhttp 源代码分析

okhttp 作为一个非常流行的 HTTP 客户端库。虽然一开始是为 Android 程序编写,但是很快就在 Java 的后端世界占领了不少的份额。

okhttp 底层基于 NIO 实现 IO 的读取写入,性能方面自然也不用担心。 另外 okhttp 同时提供了同步异步的接口,可以说在各种使用场景下,okhttp 使用起来都非常舒服。 可以说除非有特殊需求,把 okhttp 在 HTTP 客户端方面作为一个默认的技术选型,应该是没有问题的。

本文会初步梳理一遍 okhttp 的源代码流程, 关注作为 HTTP 客户端最重要的同步异步发送请求、拦截器(Interceptor)的功能、 HTTP/1.1、HTTP/2和WebSocket 的实现 以及提供 NIO 支持的 okio 库。

由于 okhttp 以 Kotlin 实现,但对 Java 程序员来说,Kotlin 哪怕不学也能无师自通地看懂,所以安心往下看吧。

基础流程 Basic / 同步请求 Synchronized Call

相信大家对 okhttp.newCall(req).execute() 这个同步请求代码很熟悉了。 让我们先从这里触发,过一遍 okhttp 的基础流程。

okhttp.newCall(req) 构造了 okhttp 最关键的类 RealCall。

正如 RealCall 的 Javadoc 所言:

Bridge between OkHttp's application and network layers. This class exposes high-level application layer primitives: connections, requests, responses, and streams.

RealCall 对上层提供了连接、请求、响应、流等处于 HTTP 应用层的操作,对下实现了 TCP 网络层的数据操作。

构造了 RealCall 对象后,用户通过调用 RealCall#execute 方法完成同步操作。

override fun execute(): Response {
  ...
  return getResponseWithInterceptorChain()
  ...
}

先暂且忽略一下不重要的代码。 execute() 方法调用了 getResponseWithInterceptorChain()。继续往下走。

@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
  // Build a full stack of interceptors.
  val interceptors = mutableListOf<Interceptor>()
  interceptors += client.interceptors
  interceptors += RetryAndFollowUpInterceptor(client)
  interceptors += BridgeInterceptor(client.cookieJar)
  interceptors += CacheInterceptor(client.cache)
  interceptors += ConnectInterceptor
  ...
  interceptors += CallServerInterceptor(forWebSocket)

  val chain = RealInterceptorChain(
      call = this,
      interceptors = interceptors,
      ...
  )

  ...
  try {
    val response = chain.proceed(originalRequest)
    ...
    return response
  } catch (e: IOException) {
    ...
  } finally {
    ...
  }
}

getResponseWithInterceptorChain() 的逻辑非常简单。代码依次构造了一条拦截器 Interceptor 的链:

  1. RetryAndFollowUpInterceptor
  2. BridgeInterceptor
  3. CacheInterceptor
  4. ConnectInterceptor
  5. CallServerInterceptor

然后通过 chain.proceed(...) 方法,从上到下依次执行每个 Interceptor ,完成 HTTP 请求的收发。

为了不一次性引入太多概念,导致理解困难,我会把每个拦截器的作用放到第三节单独解释。

至此,一个 HTTP 请求就完成了。

异步请求 Async Call

异步请求 Async Call,是在高吞吐量场景下的必然选择。

okhttp.newCall(req).enqueue(callback)

这段代码也是相当熟悉了。和同步代码一样,通过 newCall() 构造 RealCall 对象,然后调用 RealCall#enqueue 方法执行异步请求。

进入 RealCall#enqueue:

override fun enqueue(responseCallback: Callback) {
  ...
  client.dispatcher.enqueue(AsyncCall(responseCallback))
}

首先,这里构造了一个 AsyncCall 对象。然后通过 Dispatcher#enqueue 排队执行。

先看 AsyncCall。 AsyncCall 是 RealCall 的内部类。

inner class AsyncCall(...
    fun executeOn(executorService: ExecutorService) {}
    override fun run() {}
  }

AsyncCall 很简单。 首先,它继承了 Runnable 接口,这使得它可以被 ExecutorService 调度和执行。 然后,AsyncCall 提供了 executeOn() 方法,方便把自己丢进 ExecutorService 里。 最后,必不可少的 run() 。

继续往里走

internal fun enqueue(call: AsyncCall) {
  ...
  promoteAndExecute()
}

看注释。

private fun promoteAndExecute(): Boolean {
  this.assertThreadDoesntHoldLock()

  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
    val i = readyAsyncCalls.iterator()
    while (i.hasNext()) { // 循环获取所有可执行的 Async Call
      val asyncCall = i.next()
      ...
      executableCalls.add(asyncCall) // 添加到 executableCalls 
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }

  for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    // 循环提交每个 Async Call 到 ExecutorService 里。
    asyncCall.executeOn(executorService)
  }

  return isRunning
}

再往后,很显然就是 AsyncCall 在 ExecutorService 里面排队。轮到它时,ExecutorService 执行 AsyncCall#run 方法。

override fun run() {
  // threadName() 方法对 Java 的同学可能会带来点困惑。
  // threadName(name: String, block: () -> Unit)
  // 第一个入参是线程名,第二个入参是一个 function/lambda
  // 这个方法的作用是在 lambda 执行前修改线程名,执行后恢复线程名。
  threadName("OkHttp ${redactedUrl()}") {
    try {
      // 这里的 getResponseWithInterceptorChain() 是不是很眼熟?
      // 和同步请求里的是同一个方法
      val response = getResponseWithInterceptorChain()
      signalledCallback = true
      responseCallback.onResponse(this@RealCall, response)
    } finally {
      ...
    }
  }
}

到了这里,异步请求如何实现就基本有个印象了。
说白了,就是利用 ExecutorService 异步执行同步请求。
简单得有点让人小失望= =!

说到 ExecutorService,不由得就要问,ExecutorService的核心参数是怎么配置的?线程数是多少?用的是什么Queue?

okhttp3.Dispatcher#executorService

@get:Synchronized
  @get:JvmName("executorService") val executorService: ExecutorService
  get() {
    if (executorServiceOrNull == null) {
      executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
          SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
    }
    return executorServiceOrNull!!
  }

拦截器 Interceptor

回到 RealCall#getResponseWithInterceptorChain

val interceptors = mutableListOf<Interceptor>()
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client)
interceptors += BridgeInterceptor(client.cookieJar)
interceptors += CacheInterceptor(client.cache)
interceptors += ConnectInterceptor
if (!forWebSocket) {
  interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket)

这里通过多个 Interceptor 构造了一条 InterceptorChain。WebSocket 的情况后面单独去说,我们先关注最基础的 Interceptor。

每个 Interceptor 的作用如下:

InterceptorDescription
RetryAndFollowUpInterceptor失败重试和重定向
BridgeInterceptor把 HTTP 应用层的请求转换成 TCP 层的请求,以及把 TCP 层的响应转换成 HTTP 层的响应(例如 gzip 的编码解码)
CacheInterceptor缓存
ConnectInterceptor完成 TCP 层连接
CallServerInterceptor完成 HTTP 层请求

HTTP/1.1

HTTP/1.1 的实现非常简单。HTTP —— 超文本传输协议,或者说,基于文本的传输协议,要完成信息的收发,只要往 TCP 层写入特定格式的文本数据就可以了。得益于网络栈的精致的分层设计,TCP 层的复杂性被操作系统屏蔽,所以应用只需要完成:

  1. 打开 TCP 层连接
  2. 写入数据
  3. 读取数据

(虽然实际上要考虑长连接、连接复用、代理支持等场景,实现一个 production-grade 的 HTTP 服务器/客户端也是挑战性十足)

在 okhttp 里,ConnectInterceptor 负责打开 TCP 连接。也实际编程中,也负责打开 Socket 链接。

下图为 okhttp 创建 Socket 的调用链:

Preview

重点1 数据的读取写入

// RealConnection
private fun connectSocket(...) {
  ...
  val rawSocket = when (proxy.type()) {
    Proxy.Type.DIRECT, Proxy.Type.HTTP -> address.socketFactory.createSocket()!!
    else -> Socket(proxy)
  }
  ...
  source = rawSocket.source().buffer()
  sink = rawSocket.sink().buffer()
  ...
}

RealConnection#connectSocket 中,创建完 Socket 后,会打开对应的 Source 和 Sink。 Source 和 Sink 是 okio 里的概念。其中 Source 负责读取数据, Sink 负责写入数据。 后续的 HTTP 层数据的读取写入会由这两个类负责,而不会直接操作 Socket。

okio 的内容会在后面描述。在那之前,先把它们当黑盒去理解吧。

重点2 HTTP的数据操作

这里有两个比较关键的类。

  • okhttp3.internal.connection.Exchange:负责发送 HTTP 请求和读取响应。Exchange,交换,用请求交换响应。一个 HTTP 编程了很常见的概念和命名。Exchange 抽象了 HTTP 的操作,底层可能是 HTTP/1.1 也可能是 HTTP/2。

  • okhttp3.internal.http.ExchangeCodec:负责 HTTP 请求响应的编码解码。具体的实现类为 Http1ExchangeCodec 和 Http2ExchangeCodec。在 okhttp3.internal.connection.RealConnection#newCodec 中,会根据情况返回具体的实现类。

然后,让我们移步到 ServerCallInterceptor。我把 GET 请求的代码单独抽了出来。 虽然处理不同的请求和响应的逻辑有点复杂,但总而言之,都是利用 Exchange 对象完成 HTTP headers 和 body 的写入读取。

class CallServerInterceptor(private val forWebSocket: Boolean) : Interceptor {

  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {

      exchange.writeRequestHeaders(request)
      exchange.noRequestBody()
      exchange.finishRequest()
      responseBuilder = exchange.readResponseHeaders(expectContinue = false)!!

      var response = responseBuilder
          .request(request)
          .handshake(exchange.connection.handshake())
          .sentRequestAtMillis(sentRequestMillis)
          .receivedResponseAtMillis(System.currentTimeMillis())
          .build()
      response.newBuilder()
            .body(exchange.openResponseBody(response))
            .build()

  }
}

从 Exchange 开始追溯代码,最后都会调用 okio 的 Sink 和 Source 完成 IO 操作。

HTTP 写入

Exchange.writeRequestHeaders() 
  -> Http1ExchangeCodec#writeRequestHeaders() 
  -> Http1ExchangeCodec#writeRequest()
  -> Sink#writeUtf8()
fun writeRequest(headers: Headers, requestLine: String) {
  check(state == STATE_IDLE) { "state: $state" }
  sink.writeUtf8(requestLine).writeUtf8("\r\n")
  for (i in 0 until headers.size) {
    sink.writeUtf8(headers.name(i))
        .writeUtf8(": ")
        .writeUtf8(headers.value(i))
        .writeUtf8("\r\n")
  }
  sink.writeUtf8("\r\n")
  state = STATE_OPEN_REQUEST_BODY
}

HTTP 读取

Exchange#readResponseHeaders() 
  -> Http1ExchangeCodec#readResponseHeaders() 
  -> HeadersReader#readHeaders()
  -> Headers.Builder#addLenient()
internal fun addLenient(line: String) = apply {
  val index = line.indexOf(':', 1)
  when {
    index != -1 -> {
      addLenient(line.substring(0, index), line.substring(index + 1))
    }
    line[0] == ':' -> {
      addLenient("", line.substring(1)) // Empty header name.
    }
    else -> {
      addLenient("", line)
    }
  }
}

HTTP/2

WebSocket

okio

okio 是 okhttp 底层的 NIO 框架。 在 okhttp 项目的早期阶段,okio 只是 okhttp 内部的使用 NIO 的抽象和包装。 后续迭代中,okio 逐渐完善,并被抽离出来成为了一个独立可复用的 NIO 框架。

okio 包装了 java.iojava.nio ,对外提供了一套易于使用的非阻塞 IO 操作接口。

与 Netty 和 XNIO 等其他 NIO 框架类似,okio 也在减少数据复制成本、提高代码效率方面花了不少功夫。 但总的来说,业界在 NIO 方面的思路都是大同小异的。

// TODO:

其实实现细节

超时 Timeout
线程调度

参考材料

Droidcon Montreal Jake Wharton - A Few Ok Libraries - YouTube