[关闭]
@universal 2018-10-30T08:07:40.000000Z 字数 9395 阅读 246

Are you OK ? OK了解下

源码分析


简介

先简单介绍下Okhttp的用法吧:
直接从官网摘的:

  1. OkHttpClient client = new OkHttpClient();
  2. String run(String url) throws IOException {
  3. Request request = new Request.Builder()
  4. .url(url)
  5. .build();
  6. Response response = client.newCall(request).execute();
  7. return response.body().string();
  8. }

这是同步的方式,异步就不放了。可以看到里面其实很关键的几个变量或者方法:OkHttpClient、Request、Response、newCall(),newCall其实返回的是一个Call对象,所以OK实际上也就是由这几个关键类构造起来的,为外部提供接口调用。
其他一些OK的详细用法,就不在这里讲了,这篇主要串一下OK的工作原理。

Loading.......


入口

先看下OkhttpClient这个类,它实现了Call.Factory接口,是构建call对象的关键。

  1. public class OkHttpClient implements Cloneable, Call.Factory, WebSocket.Factory {
  2. public OkHttpClient() {
  3. this(new Builder());
  4. }
  5. @Override public Call newCall(Request request) {
  6. return RealCall.newRealCall(this, request, false /* for web socket */);
  7. }
  8. }

Builder是OkHttpClient中的静态内部类,主要是对一些参数进行初始化,比如说dispatcher等。

  1. //OkHttpClient.Builder
  2. public Builder() {
  3. dispatcher = new Dispatcher();
  4. protocols = DEFAULT_PROTOCOLS;//协议
  5. connectionSpecs = DEFAULT_CONNECTION_SPECS;
  6. eventListenerFactory = EventListener.factory(EventListener.NONE);
  7. proxySelector = ProxySelector.getDefault();
  8. cookieJar = CookieJar.NO_COOKIES;
  9. socketFactory = SocketFactory.getDefault();
  10. hostnameVerifier = OkHostnameVerifier.INSTANCE;
  11. certificatePinner = CertificatePinner.DEFAULT;
  12. proxyAuthenticator = Authenticator.NONE;
  13. authenticator = Authenticator.NONE;
  14. connectionPool = new ConnectionPool();
  15. dns = Dns.SYSTEM;
  16. followSslRedirects = true;
  17. followRedirects = true;
  18. retryOnConnectionFailure = true;
  19. connectTimeout = 10_000;
  20. readTimeout = 10_000;
  21. writeTimeout = 10_000;
  22. pingInterval = 0;
  23. }

OkhttpClient中的newCall方法返回了一个realCall对象。我们这里先从同步的方式入手:

  1. final class RealCall implements Call {
  2. static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  3. RealCall call = new RealCall(client, originalRequest, forWebSocket);
  4. call.eventListener = client.eventListenerFactory().create(call);
  5. return call;
  6. }
  7. @Override
  8. public Response execute() throws IOException {
  9. synchronized (this) {
  10. if (executed) throw new IllegalStateException("Already Executed");
  11. executed = true;
  12. }
  13. captureCallStackTrace();
  14. eventListener.callStart(this);
  15. try {
  16. client.dispatcher().executed(this);
  17. Response result = getResponseWithInterceptorChain();
  18. if (result == null) throw new IOException("Canceled");
  19. return result;
  20. } catch (IOException e) {
  21. eventListener.callFailed(this, e);
  22. throw e;
  23. } finally {
  24. client.dispatcher().finished(this);
  25. }
  26. }
  27. }

再看其调用的excute()方法,先同步保证当前call对象只能被执行一次。然后调用dispatcher().executed(this),做一些准备工作。
调用getResponseWithInterceptorChain();来实际执行请求并且获取实际Http返回结果,最后调用dispatcher()通知请求结束。

  1. //Dispatcher
  2. synchronized void executed(RealCall call) {
  3. runningSyncCalls.add(call);
  4. }

runningSyncCalls是一个双向队列,存储的是正在执行的同步的RealCall对象。

核心

  1. //RealCall
  2. Response getResponseWithInterceptorChain() throws IOException {
  3. // Build a full stack of interceptors.
  4. List<Interceptor> interceptors = new ArrayList<>();
  5. interceptors.addAll(client.interceptors());
  6. interceptors.add(retryAndFollowUpInterceptor);
  7. interceptors.add(new BridgeInterceptor(client.cookieJar()));
  8. interceptors.add(new CacheInterceptor(client.internalCache()));
  9. interceptors.add(new ConnectInterceptor(client));
  10. if (!forWebSocket) {
  11. interceptors.addAll(client.networkInterceptors());
  12. }
  13. interceptors.add(new CallServerInterceptor(forWebSocket));
  14. Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
  15. originalRequest, this, eventListener, client.connectTimeoutMillis(),
  16. client.readTimeoutMillis(), client.writeTimeoutMillis());
  17. return chain.proceed(originalRequest);
  18. }

Interceptor 这个东西不仅仅是仅仅起了拦截请求的功能,它是OK的核心,把实际的网络请求、IO、缓存、透明压缩等功能都统一了起来,每一个功能都只是一个 Interceptor,它们再连接成一个 Interceptor.Chain,环环相扣,最终圆满完成一次网络请求。
此处输入图片的描述
可以看到上面代码,责任链的顺序:
1. 在配置OkhttpClient时设置的interceptor(自定义的interceptor);
2. 负责失败重试和重定向的拦截器;
3. 负责把用户构造的请求转换为发送到服务器的请求、把服务器返回的网络响应转换为用户响应的 BridgeInterceptor;
4. 负责请求的缓存,包括读取和写入;
5. 负责和服务器建立连接的 ConnectInterceptor;
6. 配置OkHttpClient时设置的 networkInterceptors;
7. 负责向服务器发送请求数据、从服务器读取响应数据的 CallServerInterceptor。

简单分析下:
1)这里功能虽然复杂,但是最后一个功能一定是负责和服务器进行实际通讯的,关于缓存、重定向之类都是需要在之前设置的。
2)Request通过这个责任链转换为Response, 而每个interceptor都可以独立的返回一个response,交由上层处理。
3)上层链的proceed方法 调用下层的intercept方法执行处理逻辑并继续去调用下层链的proceed,并返回response。层层嵌套,从而执行完整个责任链。这里逻辑的总控在类 RealInterceptorChain 中,大家可以下去仔细看下。
4)这种责任链模式将实际的网络请求操作逻辑给隔离,脱离RealCall,简化了各个部分负责的功能,优雅的实现了最终的需求。

其实这个责任链特别像计网中的分层,有木有! 下层向上层传递数据。

Interceptor

  1. public final class ConnectInterceptor implements Interceptor {
  2. public final OkHttpClient client;
  3. public ConnectInterceptor(OkHttpClient client) {
  4. this.client = client;
  5. }
  6. @Override public Response intercept(Chain chain) throws IOException {
  7. RealInterceptorChain realChain = (RealInterceptorChain) chain;
  8. Request request = realChain.request();
  9. StreamAllocation streamAllocation = realChain.streamAllocation();
  10. // We need the network to satisfy this request. Possibly for validating a conditional GET.
  11. boolean doExtensiveHealthChecks = !request.method().equals("GET");
  12. HttpCodec httpCodec = streamAllocation.newStream(client, chain, doExtensiveHealthChecks);
  13. RealConnection connection = streamAllocation.connection();
  14. return realChain.proceed(request, streamAllocation, httpCodec, connection);
  15. }
  16. }

可以看到,建立连接其实就是根据StreamAllocation创建了一个HttpCodec对象和RealConnection对象,httpcodec主要是对http请求进行编码和对响应进行解码,HttpCodec的两个实现类Http2Codec、Http1Codec,很明显是针对Http/1.1 和Http/2版本。
StreamAllocation主要是用来协调Connections、Streams、Calls三个实体类之间的关系。大致上的逻辑就是寻找合适的realconnection,利用 RealConnection 的输入输出(BufferedSource 和 BufferedSink)创建 HttpCodec对象。
比如这样:(只截取了一小段)

  1. RealConnection resultConnection = findHealthyConnection(connectTimeout, readTimeout,
  2. writeTimeout, pingIntervalMillis, connectionRetryEnabled, doExtensiveHealthChecks);
  3. HttpCodec resultCodec = resultConnection.newCodec(client, chain, this);

再看看CallServerInterceptor

  1. public final class CallServerInterceptor implements Interceptor {
  2. private final boolean forWebSocket;
  3. @Override public Response intercept(Chain chain) throws IOException {
  4. RealInterceptorChain realChain = (RealInterceptorChain) chain;
  5. HttpCodec httpCodec = realChain.httpStream();
  6. StreamAllocation streamAllocation = realChain.streamAllocation();
  7. RealConnection connection = (RealConnection) realChain.connection();
  8. Request request = realChain.request();
  9. httpCodec.writeRequestHeaders(request);
  10. if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
  11. CountingSink requestBodyOut =
  12. new CountingSink(httpCodec.createRequestBody(request, contentLength));
  13. BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
  14. request.body().writeTo(bufferedRequestBody);
  15. bufferedRequestBody.close();
  16. }
  17. httpCodec.finishRequest();
  18. if (responseBuilder == null) {
  19. responseBuilder = httpCodec.readResponseHeaders(false);
  20. }
  21. Response response = responseBuilder
  22. .request(request)
  23. .handshake(streamAllocation.connection().handshake())
  24. .sentRequestAtMillis(sentRequestMillis)
  25. .receivedResponseAtMillis(System.currentTimeMillis())
  26. .build();
  27. if (forWebSocket && code == 101) {
  28. response = response.newBuilder()
  29. .body(Util.EMPTY_RESPONSE)
  30. .build();
  31. } else {
  32. response = response.newBuilder()
  33. .body(httpCodec.openResponseBody(response))
  34. .build();
  35. }
  36. return response;
  37. }
  38. }

这里只挑出了核心代码,省略了一些监听回调和检查代码。
可以看到它的大致流程:
1. 通过httpcodec写入request header
2. 如果request body不为空,则发送到服务器
3. 读取responseHeader,构建Response
4. 如果response body不为空,则加入response中
这4步的核心都是由HttpCodec完成,HttpCodec的底层又是通过Okio,而Okio实际上还是用的Socket(socket逻辑实现在RealConnection中)。这里具体的逻辑就不分析了,有兴趣的童鞋可以去了解下。

经过上面的同步请求execute后,我们就能拿到返回的Response。
注意:响应的其他部分可以随意获取,但是response.body必须通过数据流的方式来进行访问(ok也提供了 body.string() 和 bytes() 这样的方法将流内的数据一次性读取完毕)。响应的 body 被封装到 ResponseBody 类中,该类主要有两点需要注意:
(1)每个 body 只能被消费一次,多次消费会抛出异常;
(2)body 必须被关闭,否则会发生资源泄漏;


异步

  1. //RealCall
  2. @Override public void enqueue(Callback responseCallback) {
  3. synchronized (this) {
  4. if (executed) throw new IllegalStateException("Already Executed");
  5. executed = true;
  6. }
  7. captureCallStackTrace();
  8. eventListener.callStart(this);
  9. client.dispatcher().enqueue(new AsyncCall(responseCallback));
  10. }
  11. //Dispatcher
  12. synchronized void enqueue(AsyncCall call) {
  13. if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost){
  14. runningAsyncCalls.add(call);
  15. executorService().execute(call);
  16. } else {
  17. readyAsyncCalls.add(call);
  18. }
  19. }
  20. public synchronized ExecutorService executorService() {
  21. if (executorService == null) {
  22. //核心线程数为0,当线程空闲时只能活60秒,不存储元素的阻塞工作队列
  23. executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
  24. new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
  25. }
  26. return executorService;
  27. }

可以看到异步enqueue()方法主要是通过dispatcher来执行,如果队列runningAsyncCalls当前还能执行一个并发请求(具体的判断标准:即正在执行的异步队列中Call对象个数小于maxRequests(64)并且执行队列中的同一个host对应的Call对象个数小于maxRequestsPerHost(5)的时候),则调用executorService立即执行,可以看到这个executorService就是一个可缓存线程池。否则 将任务加入队列readyAsyncCalls,等待正在执行的Call请求的完成,调用promoteCalls,将call请求加入running状态中。

  1. final class AsyncCall extends NamedRunnable {
  2. private final Callback responseCallback;
  3. AsyncCall(Callback responseCallback) {
  4. super("OkHttp %s", redactedUrl());
  5. this.responseCallback = responseCallback;
  6. }
  7. String host() { return originalRequest.url().host(); }
  8. Request request() { return originalRequest; }
  9. RealCall get() { return RealCall.this; }
  10. @Override protected void execute() {
  11. boolean signalledCallback = false;
  12. try {
  13. Response response = getResponseWithInterceptorChain();
  14. if (retryAndFollowUpInterceptor.isCanceled()) {
  15. signalledCallback = true;
  16. responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
  17. } else {
  18. signalledCallback = true;
  19. responseCallback.onResponse(RealCall.this, response);
  20. }
  21. } catch (IOException e) {
  22. if (signalledCallback) {
  23. // Do not signal the callback twice!
  24. Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
  25. } else {
  26. eventListener.callFailed(RealCall.this, e);
  27. responseCallback.onFailure(RealCall.this, e);
  28. }
  29. } finally {
  30. client.dispatcher().finished(this);
  31. }
  32. }
  33. }

上面的AsyncCall 是RealCall中的一个内部类,它实现了Runnable接口,所以可以直接提交给线程池执行,这里的run方法在父类NamedRunnable 中,内部其实调用的还是子类的execute方法,因此这里获取response的逻辑还是通过getResponseWithInterceptorChain责任链来进行的,最后通过接口回调responseCallback将响应发回去。注意这里是没有做线程切换的,所以想要UI更新的童鞋注意了。

Last

本篇只是简单的介绍了ok的工作流程,里面详细的工作机制,比如socket连接池、缓存策略等,只能期待下一篇了。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注