OkHttp框架源码解析——任务队列

前言

OkHttp的任务队列在内部维护了一个线程池用于执行具体的网络请求,而线程池最大的好处在于通过线程复用减少非核心任务的损耗。

线程池的优点

多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。但如果对多线程应用不当,会增加对单个任务的处理时间。可以举一个简单的栗子:

假设在一台服务器完成一项任务的时间为T

T1 创建线程的时间

T2 在线程中执行任务的时间,包括线程间同步所需时间

T3 线程销毁的时间

显然 T = T1 + T2 + T3。

可以看出T1, T3是多线程本身带来的开销(在Java中是,通过映射pThread,并进一步通过SystemCall实现native线程),我们渴望减少T1,T3所用的时间,从而减少T的时间。但一些线程的使用者并没有注意到这一点,所以在程序中频繁的创建或销毁线程,这导致T1和T3在T中占有相当比例。显然这是突出了线程的弱点,而不是优点(并发性)。

线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。

  • 通过对线程进行缓存,减少了创建销毁线程的时间损失
  • 通过控制线程数量阈值,减少了当线程过少时带来的CPU闲置与线程过多时对JVM的内存与线程切换时系统调用的压力

类似的还有Socket连接池、DB连接池、CommonPool(比如Jedis)等技术。

OkHttp的任务队列

OkHttp的任务队列主要由两部分组成:

  • 任务分发器dispatcher:负责为任务找到合适的执行线程
  • 网络请求任务线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
// 任务队列线程池
private @Nullable ExecutorService executorService;
// 待执行异步任务队列
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
// 运行中异步任务队列
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
// 运行中同步任务队列
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ThreadPoolExecutor extends AbstractExecutorService {
...
/* corePoolSize:最小并发线程数,这里并发同时包括空闲与活动的线程,如果是0的话,空闲一段时间后所需哦有线程 将全部被销毁
maxumumPoolSize:最大线程数,当任务进来时可以扩充的线程最大值,当大于这个值就会根据丢弃处理机制来处理
keepAliveTime:当线程数大于corePoolSize时,多余的空闲线程的最大存活时间,类似与HTTP中的Keep-alive
unit:时间单位,一般用秒
workQueue:工作队列,先进先出,可以看出并不像Picasso那样设置优先队列
threadFactory:单个线程的工厂,可以打Log,设置Daemon(即当JVM退出后,线程自动结束)等
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
...
}

可以看出,在OkHttp中,构建了一个阈值为[0 ,Integer.MAX_VALUE]的线程池,它不保留任何最小线程数,随时创建更多的线程数,当线程空闲时只能活60秒,它使用了一个不存储元素的阻塞工作队列,一个叫做“OkHttp Dispatcher”的线程工厂。

Dispathcer分发器

dispatcher分发器通过Dispathcer将任务分发到合适的空闲线程,实现非阻塞,高可用,高并发连接。

同步请求

当我们使用OkHttp进行同步请求时,实际上是调用了RealCall的execute方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}

同步调用的执行步骤是:

  • 将对应任务加入分发器
  • 执行任务
  • 执行完成后通知dispatcher对应任务已完成,对应任务出队

异步请求

使用OkHttp进行异步请求时,先会调用RealCall的enqueue方法。

1
2
3
4
5
6
7
8
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

而当HttpClient请求入队时,根据代码,可以发现实际上是Dispathcer进行了Dispathcer入队操作。

1
2
3
4
5
6
7
8
9
synchronized void enqueue(AsyncCall call) {
// 如果满足条件:当前请求数小于最大请求数(64)& 对单一host的请求小于阈值(5),将任务插入正在执行任务队 列,并执行对应任务,如果不满足则将其放入待执行队列
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call); // 添加正在运行的请求
executorService().execute(call); // 线程池执行请求
} else {
readyAsyncCalls.add(call); // 添加到缓存队列排队等候
}
}

接下来看看AsyncCall的execute方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}

当任务执行完成后,无论成功与否都会调用dispathcer的finished方法,通知分发器相关任务已结束。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
// 空闲出多余线程,调用promoteCalls调用待执行的任务
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
// 如果当前整个线程池都空闲下来,执行空闲通知回调线程idleCallback
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}

接下来看看promoteCalls方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}

promoteCalls的逻辑也很简单,扫描待执行任务队列,将任务放入正在执行任务队列,并执行该任务。

小结

以上就是整个任务队列的实现细节,总结起来有以下几个特点:

  • OkHttp采用Dispatcher技术,类似于Nginx,与线程池配合实现了高并发,低阻塞的运行
  • OkHttp采用Deque作为缓存,按照入队的顺序先进先出
  • OkHttp最出彩的地方就是在try / finally中调用了finished函数,可以主动控制等待队列的移动,而不是采用锁或者wait / notify,极大减少了编码复杂性