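Example
Let's start with a simple example to see how Retrofit2 cooperates with other libraries.
The example below combines rxjava2 + retrofit2 + okhttp3 + gson. It calls Taobao's IP address lookup service and writes the returned information into an EditText.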
public static Retrofit getRetrofit() {
    // Double-checked locking: the retrofit field should be declared volatile for this to be safe.
    if (retrofit == null) {
        synchronized (Retrofit.class) {
            if (retrofit == null) {
                retrofit = new Retrofit.Builder()
                        .baseUrl(BASE_URL)
                        .addConverterFactory(ScalarsConverterFactory.create())
                        .addConverterFactory(GsonConverterFactory.create())
                        .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                        .client(getOkHttpClient())
                        .build();
            }
        }
    }
    return retrofit;
}

public interface IpServiceRx {
    @Headers({
            "Accept-Encoding: application/json",
            "User-Agent: wz"
    })
    @GET("getIpInfo.php")
    Observable<Response<IpModel>> getIpMsg(@Query("ip") String ip);
}

/**
 * rxjava2 + retrofit2 + okhttp3
 */
private void requestData3() {
    Retrofit retrofit = NetworkUtils.getRetrofit();
    IpServiceRx ipServiceRx = retrofit.create(IpServiceRx.class);
    String ip = "117.100.130.5";
    Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
    ipMsg.throttleFirst(500, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Response<IpModel>>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                }

                @Override
                public void onNext(@NonNull Response<IpModel> ipModelResponse) {
                    IpModel ipModel = ipModelResponse.body();
                    if (ipModel == null) {
                        return;
                    }
                    IpData data = ipModel.getData();
                    if (data == null) {
                        return;
                    }
                    mEt.setText(getCSData(data));
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    mEt.setText(e.toString());
                    e.printStackTrace();
                }

                @Override
                public void onComplete() {
                }
            });
}
Start with the factories that are passed in when the Retrofit instance is created.
ConverterFactory
.addConverterFactory(GsonConverterFactory.create())
public Builder addConverterFactory(Converter.Factory factory) {
    converterFactories.add(checkNotNull(factory, "factory == null"));
    return this;
}
The converter factory is simply added to a list.
public final class GsonConverterFactory extends Converter.Factory {
    /**
     * Create an instance using a default {@link Gson} instance for conversion. Encoding to JSON and
     * decoding from JSON (when no charset is specified by a header) will use UTF-8.
     */
    public static GsonConverterFactory create() {
        return create(new Gson());
    }

    /**
     * Create an instance using {@code gson} for conversion. Encoding to JSON and
     * decoding from JSON (when no charset is specified by a header) will use UTF-8.
     */
    @SuppressWarnings("ConstantConditions") // Guarding public API nullability.
    public static GsonConverterFactory create(Gson gson) {
        if (gson == null) throw new NullPointerException("gson == null");
        return new GsonConverterFactory(gson);
    }

    private final Gson gson;

    private GsonConverterFactory(Gson gson) {
        this.gson = gson;
    }

    // Returns the Converter that parses an okhttp3.ResponseBody.
    @Override
    public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations,
            Retrofit retrofit) {
        TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
        return new GsonResponseBodyConverter<>(gson, adapter);
    }

    // Returns the Converter that produces an okhttp3.RequestBody.
    @Override
    public Converter<?, RequestBody> requestBodyConverter(Type type,
            Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {
        TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
        return new GsonRequestBodyConverter<>(gson, adapter);
    }
}

public interface Converter<F, T> {
    @Nullable T convert(F value) throws IOException;

    /** Creates {@link Converter} instances based on a type and target usage. */
    abstract class Factory {
        /**
         * Returns a {@link Converter} for converting an HTTP response body to {@code type}, or null if
         * {@code type} cannot be handled by this factory. This is used to create converters for
         * response types such as {@code SimpleResponse} from a {@code Call<SimpleResponse>}
         * declaration.
         */
        public @Nullable Converter<ResponseBody, ?> responseBodyConverter(Type type,
                Annotation[] annotations, Retrofit retrofit) {
            return null;
        }

        /**
         * Returns a {@link Converter} for converting {@code type} to an HTTP request body, or null if
         * {@code type} cannot be handled by this factory. This is used to create converters for types
         * specified by {@link Body @Body}, {@link Part @Part}, and {@link PartMap @PartMap}
         * values.
         */
        public @Nullable Converter<?, RequestBody> requestBodyConverter(Type type,
                Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {
            return null;
        }

        /**
         * Returns a {@link Converter} for converting {@code type} to a {@link String}, or null if
         * {@code type} cannot be handled by this factory. This is used to create converters for types
         * specified by {@link Field @Field}, {@link FieldMap @FieldMap} values,
         * {@link Header @Header}, {@link HeaderMap @HeaderMap}, {@link Path @Path},
         * {@link Query @Query}, and {@link QueryMap @QueryMap} values.
         */
        public @Nullable Converter<?, String> stringConverter(Type type, Annotation[] annotations,
                Retrofit retrofit) {
            return null;
        }

        /**
         * Extract the upper bound of the generic parameter at {@code index} from {@code type}. For
         * example, index 1 of {@code Map<String, ? extends Runnable>} returns {@code Runnable}.
         */
        protected static Type getParameterUpperBound(int index, ParameterizedType type) {
            return Utils.getParameterUpperBound(index, type);
        }

        /**
         * Extract the raw class type from {@code type}. For example, the type representing
         * {@code List<? extends Runnable>} returns {@code List.class}.
         */
        protected static Class<?> getRawType(Type type) {
            return Utils.getRawType(type);
        }
    }
}
CallAdapterFactory
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
public Builder addCallAdapterFactory(CallAdapter.Factory factory) {
    callAdapterFactories.add(checkNotNull(factory, "factory == null"));
    return this;
}

public final class RxJava2CallAdapterFactory extends CallAdapter.Factory {
    /**
     * Returns an instance which creates synchronous observables that do not operate on any scheduler
     * by default.
     */
    public static RxJava2CallAdapterFactory create() {
        return new RxJava2CallAdapterFactory(null, false);
    }

    private final @Nullable Scheduler scheduler;
    private final boolean isAsync;

    private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
        this.scheduler = scheduler;
        this.isAsync = isAsync;
    }
    ...
}

public interface CallAdapter<R, T> {
    Type responseType();

    // Note: the Call here is Retrofit's own retrofit2.Call, not the okhttp one.
    T adapt(Call<R> call);

    /**
     * Creates {@link CallAdapter} instances based on the return type of {@linkplain
     * Retrofit#create(Class) the service interface} methods.
     */
    abstract class Factory {
        /**
         * Returns a call adapter for interface methods that return {@code returnType}, or null if it
         * cannot be handled by this factory.
         */
        public abstract @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations,
                Retrofit retrofit);

        /**
         * Extract the upper bound of the generic parameter at {@code index} from {@code type}. For
         * example, index 1 of {@code Map<String, ? extends Runnable>} returns {@code Runnable}.
         */
        protected static Type getParameterUpperBound(int index, ParameterizedType type) {
            return Utils.getParameterUpperBound(index, type);
        }

        /**
         * Extract the raw class type from {@code type}. For example, the type representing
         * {@code List<? extends Runnable>} returns {@code List.class}.
         */
        protected static Class<?> getRawType(Type type) {
            return Utils.getRawType(type);
        }
    }
}
The classes above are only listed for now; they are analyzed step by step later.
Next, look at build()
public Retrofit build() {
    // If no call factory was set, an OkHttpClient is created automatically.
    okhttp3.Call.Factory callFactory = this.callFactory;
    if (callFactory == null) {
        callFactory = new OkHttpClient();
    }

    // platform is Android here, and defaultCallbackExecutor posts to a main-thread Handler.
    Executor callbackExecutor = this.callbackExecutor;
    if (callbackExecutor == null) {
        callbackExecutor = platform.defaultCallbackExecutor();
    }

    // callAdapterFactories contains the factories we registered plus the platform defaults.
    // Make a defensive copy of the adapters and add the default Call adapter.
    List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>(this.callAdapterFactories);
    callAdapterFactories.addAll(platform.defaultCallAdapterFactories(callbackExecutor));

    // converterFactories is similar: the ones we registered plus a few built-in ones.
    // Make a defensive copy of the converters.
    List<Converter.Factory> converterFactories = new ArrayList<>(
            1 + this.converterFactories.size() + platform.defaultConverterFactoriesSize());

    // Add the built-in converter factory first. This prevents overriding its behavior but also
    // ensures correct behavior when using converters that consume all types.
    converterFactories.add(new BuiltInConverters());
    converterFactories.addAll(this.converterFactories);
    converterFactories.addAll(platform.defaultConverterFactories());

    return new Retrofit(callFactory, baseUrl, unmodifiableList(converterFactories),
            unmodifiableList(callAdapterFactories), callbackExecutor, validateEagerly);
}
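The ordering that build() sets up for the converter list can be shown in isolation. The following is only a sketch under simplifying assumptions: factories are represented by plain strings, and the class name FactoryOrderSketch and the method assemble are hypothetical, not part of Retrofit.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class FactoryOrderSketch {
    // Factories are plain strings here; this only illustrates the ordering and the defensive copy.
    static List<String> assemble(List<String> userFactories, List<String> platformDefaults) {
        List<String> all = new ArrayList<>(1 + userFactories.size() + platformDefaults.size());
        all.add("BuiltInConverters");   // the built-in factory always goes first
        all.addAll(userFactories);      // then the registered factories, in registration order
        all.addAll(platformDefaults);   // platform defaults go last
        return Collections.unmodifiableList(all);
    }

    public static void main(String[] args) {
        List<String> user = new ArrayList<>();
        user.add("ScalarsConverterFactory");
        user.add("GsonConverterFactory");
        List<String> result = assemble(user, new ArrayList<>());
        System.out.println(result);
        // Changing the builder-side list afterwards does not affect the assembled copy.
        user.add("added later");
        System.out.println(result.size());
    }
}
```

Because lookup later walks this list from the front, registration order matters: a factory that accepts every type, as Gson's does, is best registered after more specific ones such as the scalars factory.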
Next, look at retrofit.create
IpServiceRx ipServiceRx = retrofit.create(IpServiceRx.class);
public <T> T create(final Class<T> service) {
    ...
    return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
        new InvocationHandler() {
            // platform is Android here. Platform has two subclasses, Android and Java8.
            private final Platform platform = Platform.get();
            private final Object[] emptyArgs = new Object[0];

            @Override
            public @Nullable Object invoke(Object proxy, Method method, @Nullable Object[] args)
                    throws Throwable {
                // Methods declared on Object are invoked directly.
                if (method.getDeclaringClass() == Object.class) {
                    return method.invoke(this, args);
                }
                // Default interface methods were introduced in JDK 8. The Java8 class implements
                // invokeDefaultMethod but the Android class does not, so this branch is skipped here.
                if (platform.isDefaultMethod(method)) {
                    return platform.invokeDefaultMethod(method, service, proxy, args);
                }
                return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
            }
        });
}
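As you can see, create uses a dynamic proxy to build a proxy object for the service interface.
When we then call a method on that proxy object,
Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
InvocationHandler.invoke is executed.
Inside invoke, methods declared on Object are invoked directly and returned, and the default-method branch is skipped as well,
so go straight to loadServiceMethod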
ServiceMethod<?> loadServiceMethod(Method method) {
    ServiceMethod<?> result = serviceMethodCache.get(method);
    if (result != null) return result;

    synchronized (serviceMethodCache) {
        result = serviceMethodCache.get(method);
        if (result == null) {
            result = ServiceMethod.parseAnnotations(this, method);
            serviceMethodCache.put(method, result);
        }
    }
    return result;
}
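The two mechanisms seen so far, dispatch through a dynamic proxy and a per-Method cache with a double check, can be tried out with the JDK alone. This is a minimal sketch, not Retrofit's implementation: EchoService, Handler, loadHandler and parseCount are hypothetical names made up for the illustration, and the "parsing" step just builds a lambda.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a service interface such as IpServiceRx.
interface EchoService {
    String echo(String text);
}

class ProxySketch {
    // Counts how often a method is "parsed"; only here to make the caching visible.
    static final AtomicInteger parseCount = new AtomicInteger();

    // Hypothetical stand-in for ServiceMethod: turns call arguments into a result.
    interface Handler {
        Object invoke(Object[] args);
    }

    private static final Map<Method, Handler> cache = new ConcurrentHashMap<>();

    // Same shape as loadServiceMethod: lock-free fast path, then a second check under the lock.
    static Handler loadHandler(Method method) {
        Handler result = cache.get(method);
        if (result != null) return result;
        synchronized (cache) {
            result = cache.get(method);
            if (result == null) {
                parseCount.incrementAndGet();
                final String name = method.getName();
                result = args -> name + ":" + args[0];
                cache.put(method, result);
            }
        }
        return result;
    }

    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> service) {
        return (T) Proxy.newProxyInstance(
                service.getClassLoader(),
                new Class<?>[] {service},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args)
                            throws Throwable {
                        // Methods declared on Object (toString, hashCode, ...) run on the handler.
                        if (method.getDeclaringClass() == Object.class) {
                            return method.invoke(this, args);
                        }
                        return loadHandler(method).invoke(args != null ? args : new Object[0]);
                    }
                });
    }

    public static void main(String[] args) {
        EchoService service = create(EchoService.class);
        System.out.println(service.echo("a"));
        System.out.println(service.echo("b"));
        // The method was parsed only once even though it was called twice.
        System.out.println(parseCount.get());
    }
}
```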
ServiceMethod
static <T> ServiceMethod<T> parseAnnotations(Retrofit retrofit, Method method) {
    // RequestFactory turns the annotations on the method, plus the arguments passed in later,
    // into an okhttp Request. It is used further below.
    RequestFactory requestFactory = RequestFactory.parseAnnotations(retrofit, method);

    Type returnType = method.getGenericReturnType();
    if (Utils.hasUnresolvableType(returnType)) {
        throw methodError(method,
                "Method return type must not include a type variable or wildcard: %s", returnType);
    }
    // The return type must not be void.
    if (returnType == void.class) {
        throw methodError(method, "Service methods cannot return void.");
    }

    return HttpServiceMethod.parseAnnotations(retrofit, method, requestFactory);
}
HttpServiceMethod
static <ResponseT, ReturnT> HttpServiceMethod<ResponseT, ReturnT> parseAnnotations(
        Retrofit retrofit, Method method, RequestFactory requestFactory) {
    boolean isKotlinSuspendFunction = requestFactory.isKotlinSuspendFunction;
    boolean continuationWantsResponse = false;
    boolean continuationBodyNullable = false;

    // Annotations declared on the method.
    Annotation[] annotations = method.getAnnotations();
    Type adapterType;
    if (isKotlinSuspendFunction) {
        ...
    } else {
        // The generic return Type of the method.
        adapterType = method.getGenericReturnType();
    }

    // Analyzed below.
    CallAdapter<ResponseT, ReturnT> callAdapter =
            createCallAdapter(retrofit, method, adapterType, annotations);

    // Validate the response type. In this example it is IpModel, the type that
    // RxJava2CallAdapter extracts from Observable<Response<IpModel>>.
    Type responseType = callAdapter.responseType();
    // The response type must not be okhttp3.Response.
    if (responseType == okhttp3.Response.class) {
        throw methodError(method, "'"
                + getRawType(responseType).getName()
                + "' is not a valid response body type. Did you mean ResponseBody?");
    }
    // The response type must not be a bare Response; it needs a type argument such as
    // Response<String>. This Response is the one defined in retrofit2, not okhttp3.Response.
    if (responseType == Response.class) {
        throw methodError(method, "Response must include generic type (e.g., Response<String>)");
    }
    // TODO support Unit for Kotlin?
    if (requestFactory.httpMethod.equals("HEAD") && !Void.class.equals(responseType)) {
        throw methodError(method, "HEAD method must use Void as response type.");
    }

    // Analyzed below.
    Converter<ResponseBody, ResponseT> responseConverter =
            createResponseConverter(retrofit, method, responseType);

    // callFactory is in fact the OkHttpClient.
    okhttp3.Call.Factory callFactory = retrofit.callFactory;
    if (!isKotlinSuspendFunction) {
        return new CallAdapted<>(requestFactory, callFactory, responseConverter, callAdapter);
    } else {
        ...
    }
}
Finally a CallAdapted object is created and returned.
CallAdapted inheritance:
CallAdapted<ResponseT, ReturnT> extends HttpServiceMethod<ResponseT, ReturnT>
HttpServiceMethod<ResponseT, ReturnT> extends ServiceMethod<ReturnT>
createCallAdapter
HttpServiceMethod.createCallAdapter
private static <ResponseT, ReturnT> CallAdapter<ResponseT, ReturnT> createCallAdapter(
        Retrofit retrofit, Method method, Type returnType, Annotation[] annotations) {
    try {
        //noinspection unchecked
        return (CallAdapter<ResponseT, ReturnT>) retrofit.callAdapter(returnType, annotations);
    } catch (RuntimeException e) { // Wide exception range because factories are user code.
        throw methodError(method, e, "Unable to create call adapter for %s", returnType);
    }
}
retrofit.callAdapter
public CallAdapter<?, ?> callAdapter(Type returnType, Annotation[] annotations) {
    return nextCallAdapter(null, returnType, annotations);
}

public CallAdapter<?, ?> nextCallAdapter(@Nullable CallAdapter.Factory skipPast, Type returnType,
        Annotation[] annotations) {
    int start = callAdapterFactories.indexOf(skipPast) + 1;
    for (int i = start, count = callAdapterFactories.size(); i < count; i++) {
        CallAdapter<?, ?> adapter = callAdapterFactories.get(i).get(returnType, annotations, this);
        if (adapter != null) {
            return adapter;
        }
    }
    ...
    throw new IllegalArgumentException(builder.toString());
}
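In short, Retrofit walks through the call adapter factories (the ones we registered plus the built-in defaults), calls get on each, and returns the first CallAdapter that is not null.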
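The lookup described above is a "first factory that does not return null wins" loop, with skipPast letting a factory delegate to the ones after it. Here is a minimal sketch of that pattern using only the JDK. The Factory interface, the adapter names and the use of Class instead of Type are simplifications introduced for the illustration and are not Retrofit's API.

```java
import java.util.ArrayList;
import java.util.List;

class AdapterLookupSketch {
    // Hypothetical simplified factory: returns an adapter name, or null if it cannot handle the type.
    interface Factory {
        String get(Class<?> returnType);
    }

    static String nextAdapter(List<Factory> factories, Factory skipPast, Class<?> returnType) {
        // indexOf returns -1 when skipPast is null (the list holds no null), so the scan starts at 0.
        int start = factories.indexOf(skipPast) + 1;
        for (int i = start, count = factories.size(); i < count; i++) {
            String adapter = factories.get(i).get(returnType);
            if (adapter != null) {
                return adapter;
            }
        }
        throw new IllegalArgumentException("Could not locate call adapter for " + returnType);
    }

    public static void main(String[] args) {
        // Iterable and Runnable merely stand in for return types such as Observable and Call.
        Factory rxLike = type -> type == Iterable.class ? "rx-like adapter" : null;
        Factory defaultLike = type -> type == Runnable.class ? "default adapter" : null;
        List<Factory> factories = new ArrayList<>();
        factories.add(rxLike);
        factories.add(defaultLike);

        System.out.println(nextAdapter(factories, null, Iterable.class));
        System.out.println(nextAdapter(factories, null, Runnable.class));
    }
}
```

If no factory accepts the type, the loop falls through and an exception is thrown, which matches the IllegalArgumentException at the end of nextCallAdapter.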
Take RxJava2CallAdapterFactory.get as an example:
@Override
public @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
    // Our returnType is the Type of Observable<Response<IpModel>>.
    // This call returns Observable; see the getRawType source below.
    Class<?> rawType = getRawType(returnType);

    // All of the following are obviously false.
    boolean isFlowable = rawType == Flowable.class;
    boolean isSingle = rawType == Single.class;
    boolean isMaybe = rawType == Maybe.class;
    if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
        return null;
    }

    boolean isResult = false;
    boolean isBody = false;
    Type responseType;

    // Returns the type argument, i.e. Response<IpModel>.
    Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
    // Returns the raw type of Response<IpModel>, i.e. Retrofit's Response.
    Class<?> rawObservableType = getRawType(observableType);
    if (rawObservableType == Response.class) {
        // Returns the upper bound of the type argument of Response<IpModel>, i.e. IpModel.
        responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
    } else if (rawObservableType == Result.class) {
        if (!(observableType instanceof ParameterizedType)) {
            throw new IllegalStateException("Result must be parameterized"
                    + " as Result<Foo> or Result<? extends Foo>");
        }
        responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
        isResult = true;
    } else {
        responseType = observableType;
        isBody = true;
    }

    // From the above: every boolean passed to the constructor is false. When the
    // RxJava2CallAdapterFactory was created, scheduler was null and isAsync was false.
    // responseType is IpModel.
    return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable,
            isSingle, isMaybe, false);
}
Utils.getRawType
static Class<?> getRawType(Type type) {
    // A concrete class.
    if (type instanceof Class<?>) {
        // Type is a normal class.
        return (Class<?>) type;
    }
    // A type with type arguments.
    if (type instanceof ParameterizedType) {
        ParameterizedType parameterizedType = (ParameterizedType) type;
        // Returns Observable.
        Type rawType = parameterizedType.getRawType();
        if (!(rawType instanceof Class)) throw new IllegalArgumentException();
        return (Class<?>) rawType;
    }
    // Other kinds of types
    ...
}
Utils.getParameterUpperBound
static Type getParameterUpperBound(int index, ParameterizedType type) {
    Type[] types = type.getActualTypeArguments();
    Type paramType = types[index];
    return paramType;
}
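How a nested generic return type is peeled apart can be checked with plain java.lang.reflect. In the sketch below, Optional<List<String>> stands in for Observable<Response<IpModel>>. The names TypeSketch, Api, load, rawType, typeArgument and returnTypeOf are hypothetical helpers written for this illustration, simplified in the same way as the two methods above (no wildcard handling).

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Optional;

class TypeSketch {
    // Hypothetical interface whose generic return type is inspected.
    interface Api {
        Optional<List<String>> load();
    }

    // Simplified raw-type extraction: handles plain classes and parameterized types only.
    static Class<?> rawType(Type type) {
        if (type instanceof Class<?>) {
            return (Class<?>) type;
        }
        if (type instanceof ParameterizedType) {
            return (Class<?>) ((ParameterizedType) type).getRawType();
        }
        throw new IllegalArgumentException("Unsupported type: " + type);
    }

    // Returns the type argument at the given index, without resolving wildcards.
    static Type typeArgument(int index, ParameterizedType type) {
        return type.getActualTypeArguments()[index];
    }

    // Looks up the generic return type of a public no-argument method.
    static Type returnTypeOf(Class<?> owner, String methodName) {
        try {
            return owner.getMethod(methodName).getGenericReturnType();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Type returnType = returnTypeOf(Api.class, "load");
        // Outer raw type: plays the role of Observable.
        System.out.println(rawType(returnType));
        // First type argument: plays the role of Response<IpModel>.
        Type inner = typeArgument(0, (ParameterizedType) returnType);
        System.out.println(rawType(inner));
        // Innermost type argument: plays the role of IpModel.
        System.out.println(typeArgument(0, (ParameterizedType) inner));
    }
}
```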
createResponseConverter
HttpServiceMethod.createResponseConverter
private static <ResponseT> Converter<ResponseBody, ResponseT> createResponseConverter(
        Retrofit retrofit, Method method, Type responseType) {
    Annotation[] annotations = method.getAnnotations();
    try {
        return retrofit.responseBodyConverter(responseType, annotations);
    } catch (RuntimeException e) { // Wide exception range because factories are user code.
        throw methodError(method, e, "Unable to create converter for %s", responseType);
    }
}
retrofit.responseBodyConverter
public <T> Converter<ResponseBody, T> responseBodyConverter(Type type, Annotation[] annotations) {
    return nextResponseBodyConverter(null, type, annotations);
}

public <T> Converter<ResponseBody, T> nextResponseBodyConverter(
        @Nullable Converter.Factory skipPast, Type type, Annotation[] annotations) {
    int start = converterFactories.indexOf(skipPast) + 1;
    for (int i = start, count = converterFactories.size(); i < count; i++) {
        Converter<ResponseBody, ?> converter =
                converterFactories.get(i).responseBodyConverter(type, annotations, this);
        if (converter != null) {
            //noinspection unchecked
            return (Converter<ResponseBody, T>) converter;
        }
    }
    ...
    throw new IllegalArgumentException(builder.toString());
}
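In short, Retrofit walks through the converter factories (the ones we registered plus the built-in ones) and takes the first non-null responseBodyConverter.
Take GsonConverterFactory.responseBodyConverter as an example: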
@Override
public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations,
        Retrofit retrofit) {
    TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
    return new GsonResponseBodyConverter<>(gson, adapter);
}
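GsonResponseBodyConverter
GsonResponseBodyConverter.convert is shown further below, in the GsonResponseBodyConverter.convert section, where the response is parsed. Its request-side counterpart, GsonRequestBodyConverter, serializes an object into a RequestBody:
GsonRequestBodyConverter(Gson gson, TypeAdapter<T> adapter) {
    this.gson = gson;
    this.adapter = adapter;
}

@Override
public RequestBody convert(T value) throws IOException {
    Buffer buffer = new Buffer();
    Writer writer = new OutputStreamWriter(buffer.outputStream(), UTF_8);
    JsonWriter jsonWriter = gson.newJsonWriter(writer);
    adapter.write(jsonWriter, value);
    jsonWriter.close();
    return RequestBody.create(MEDIA_TYPE, buffer.readByteString());
}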
loadServiceMethod(method).invoke
After that round of analysis, go back to the last line of invoke inside retrofit.create above:
loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
As shown above, loadServiceMethod returns a CallAdapted,
and the CallAdapted inheritance is:
CallAdapted<ResponseT, ReturnT> extends HttpServiceMethod<ResponseT, ReturnT>
HttpServiceMethod<ResponseT, ReturnT> extends ServiceMethod<ReturnT>
so calling invoke ends up in HttpServiceMethod.invoke
@Override
final @Nullable ReturnT invoke(Object[] args) {
    Call<ResponseT> call = new OkHttpCall<>(requestFactory, args, callFactory, responseConverter);
    return adapt(call, args);
}
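Note that the Call here is Retrofit's, not okhttp's.
An OkHttpCall object is created here. As the name suggests, it is the class that performs the network request through an okhttp Call, so after a long detour we have finally reached the place where the request is actually made.
Next, look at adapt
adapt actually resolves to CallAdapted.adapt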
@Override
protected ReturnT adapt(Call<ResponseT> call, Object[] args) {
    return callAdapter.adapt(call);
}
The callAdapter here is the RxJava2CallAdapter from above,
so let's look inside RxJava2CallAdapter
@Override
public Object adapt(Call<R> call) {
    Observable<Response<R>> responseObservable = isAsync
            ? new CallEnqueueObservable<>(call)
            : new CallExecuteObservable<>(call);

    Observable<?> observable;
    if (isResult) {
        observable = new ResultObservable<>(responseObservable);
    } else if (isBody) {
        observable = new BodyObservable<>(responseObservable);
    } else {
        observable = responseObservable;
    }

    if (scheduler != null) {
        observable = observable.subscribeOn(scheduler);
    }

    if (isFlowable) {
        return observable.toFlowable(BackpressureStrategy.LATEST);
    }
    if (isSingle) {
        return observable.singleOrError();
    }
    if (isMaybe) {
        return observable.singleElement();
    }
    if (isCompletable) {
        return observable.ignoreElements();
    }
    return RxJavaPlugins.onAssembly(observable);
}
From the above:
- isAsync, isResult and isBody are false,
- scheduler = null,
- isFlowable, isSingle, isMaybe and isCompletable are all false.
So what is finally returned is new CallExecuteObservable<>(call);
As for RxJavaPlugins.onAssembly(observable):
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}
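We did not install any RxJava hook, so what comes back is still the CallExecuteObservable,
and the call passed in when the CallExecuteObservable was created is the OkHttpCall.
Next come the RxJava operations
Along the way, some of the RxJava source code is briefly analyzed as well.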
Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
ipMsg.throttleFirst(500, TimeUnit.MILLISECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Response<IpModel>>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onNext(@NonNull Response<IpModel> ipModelResponse) {
                IpModel ipModel = ipModelResponse.body();
                if (ipModel == null) {
                    return;
                }
                IpData data = ipModel.getData();
                if (data == null) {
                    return;
                }
                mEt.setText(getCSData(data));
            }

            @Override
            public void onError(@NonNull Throwable e) {
                mEt.setText(e.toString());
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
            }
        });
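Every time an RxJava operator is called, it returns a new observable that keeps a reference to the observable above it. This forms a chain from top to bottom, which is why the style is called chaining.
Nothing runs until subscribe is finally called. That triggers upward subscription: each lower layer calls subscribe on the layer above it. Each layer's observable has its own subscribeActual implementation, so each layer is the observer of the layer above it and at the same time the observable of the layer below it.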
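The "subscribe travels up, data travels down" behavior can be reproduced with a toy observable. The sketch below is not RxJava: MiniObservable, ChainSketch, source and the log list are hypothetical, there is no disposal, error handling or completion signal, and a plain Consumer plays the observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class ChainSketch {
    // Records the order of events so the direction of each step is visible.
    static final List<String> log = new ArrayList<>();

    // Hypothetical minimal observable: each layer implements its own subscribeActual.
    static abstract class MiniObservable<T> {
        abstract void subscribeActual(Consumer<? super T> observer);

        void subscribe(Consumer<? super T> observer) {
            subscribeActual(observer);
        }

        // An operator returns a new observable that remembers its upstream.
        <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
            MiniObservable<T> upstream = this;
            return new MiniObservable<R>() {
                @Override
                void subscribeActual(Consumer<? super R> observer) {
                    log.add("map subscribes to upstream");
                    // This layer is an observer of upstream and an observable for downstream.
                    upstream.subscribe(value -> observer.accept(mapper.apply(value)));
                }
            };
        }
    }

    // The topmost observable: the only place where a value is actually produced.
    static MiniObservable<Integer> source() {
        return new MiniObservable<Integer>() {
            @Override
            void subscribeActual(Consumer<? super Integer> observer) {
                log.add("source emits");
                observer.accept(21);
            }
        };
    }

    public static void main(String[] args) {
        source().map(x -> x * 2).subscribe(v -> log.add("observer got " + v));
        for (String line : log) {
            System.out.println(line);
        }
    }
}
```

The log shows the map layer subscribing to its upstream before the source emits anything, and the final observer receiving the transformed value last.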
The upward subscription ends at the topmost observable, which in this example is CallExecuteObservable. Its subscribeActual is:
@Override
protected void subscribeActual(Observer<? super Response<T>> observer) {
    // Since Call is a one-shot type, clone it for each new observer.
    // This is the OkHttpCall.
    Call<T> call = originalCall.clone();
    CallDisposable disposable = new CallDisposable(call);
    observer.onSubscribe(disposable);
    if (disposable.isDisposed()) {
        return;
    }

    boolean terminated = false;
    try {
        // This calls OkHttpCall.execute, which in turn runs okhttp's call.execute.
        Response<T> response = call.execute();
        if (!disposable.isDisposed()) {
            // Start passing the result to the layers below.
            observer.onNext(response);
        }
        if (!disposable.isDisposed()) {
            terminated = true;
            observer.onComplete();
        }
    } catch (Throwable t) {
        ...
    }
}
OkHttpCall.execute
@Override
public Response<T> execute() throws IOException {
    okhttp3.Call call;

    synchronized (this) {
        // Sanity checks
        ...
        call = rawCall;
        if (call == null) {
            try {
                // Create a new network call; see the code below.
                call = rawCall = createRawCall();
            } catch (IOException | RuntimeException | Error e) {
                throwIfFatal(e); // Do not assign a fatal error to creationFailure.
                creationFailure = e;
                throw e;
            }
        }
    }

    if (canceled) {
        call.cancel();
    }

    // Parse the okhttp3.Response returned by the blocking call.execute(); see the code below.
    return parseResponse(call.execute());
}

private okhttp3.Call createRawCall() throws IOException {
    okhttp3.Call call = callFactory.newCall(requestFactory.create(args));
    return call;
}
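The requestFactory here is the RequestFactory created in ServiceMethod.parseAnnotations above; requestFactory.create(args) builds an okhttp Request object.
callFactory, which is the OkHttpClient, then turns that request into an okhttp3.Call, which is returned.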
Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
    ResponseBody rawBody = rawResponse.body();

    // Remove the body's source (the only stateful object) so we can pass the response along.
    rawResponse = rawResponse.newBuilder()
            .body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
            .build();

    int code = rawResponse.code();
    if (code < 200 || code >= 300) {
        try {
            // Buffer the entire body to avoid future I/O.
            ResponseBody bufferedBody = Utils.buffer(rawBody);
            return Response.error(bufferedBody, rawResponse);
        } finally {
            rawBody.close();
        }
    }

    if (code == 204 || code == 205) {
        rawBody.close();
        return Response.success(null, rawResponse);
    }

    ExceptionCatchingResponseBody catchingBody = new ExceptionCatchingResponseBody(rawBody);
    try {
        // The Converter we registered earlier (GsonResponseBodyConverter) parses the body
        // into the concrete bean object here.
        T body = responseConverter.convert(catchingBody);
        return Response.success(body, rawResponse);
    } catch (RuntimeException e) {
        // If the underlying source threw an exception, propagate that rather than indicating it was
        // a runtime exception.
        catchingBody.throwIfCaught();
        throw e;
    }
}
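The three-way branching on the status code in parseResponse can be isolated into a small function. This is only a sketch of the decision logic: ParseSketch, Outcome and classify are hypothetical names, and nothing here touches a real response body.

```java
class ParseSketch {
    // The three outcomes parseResponse distinguishes, named for this illustration only.
    enum Outcome { ERROR, SUCCESS_NO_BODY, SUCCESS_WITH_BODY }

    static Outcome classify(int code) {
        // Anything outside 2xx becomes an error response carrying the buffered body.
        if (code < 200 || code >= 300) {
            return Outcome.ERROR;
        }
        // 204 and 205 are successful but have no content, so the body is null.
        if (code == 204 || code == 205) {
            return Outcome.SUCCESS_NO_BODY;
        }
        // Every other 2xx code has its body run through the response converter.
        return Outcome.SUCCESS_WITH_BODY;
    }

    public static void main(String[] args) {
        int[] codes = {200, 204, 301, 404};
        for (int code : codes) {
            System.out.println(code + " -> " + classify(code));
        }
    }
}
```

One consequence worth noting is that a non-2xx status does not throw at this level; it is wrapped in an error Response, and it is up to the caller or to a wrapper such as BodyObservable to turn it into an exception.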
GsonResponseBodyConverter.convert
@Override
public T convert(ResponseBody value) throws IOException {
    JsonReader jsonReader = gson.newJsonReader(value.charStream());
    try {
        T result = adapter.read(jsonReader);
        if (jsonReader.peek() != JsonToken.END_DOCUMENT) {
            throw new JsonIOException("JSON document was not fully consumed.");
        }
        return result;
    } finally {
        value.close();
    }
}
observer.onNext(response);
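passes the response downstream. At this point everything is still running on the thread chosen by subscribeOn(Schedulers.io()).
When the value reaches observeOn(AndroidSchedulers.mainThread()), that observable switches delivery to the main thread,
and the value finally arrives in onNext of the observer passed to subscribe.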
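The hand-off from a worker thread to a delivery thread can be imitated without RxJava. The sketch below deliberately swaps in CompletableFuture and two single-thread executors in place of subscribeOn and observeOn; the thread names "io-thread" and "main-thread" and the class ThreadHopSketch are made up for the illustration, and there is no Android main looper involved.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {
    // Returns {name of the producing thread, name of the consuming thread}.
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        try {
            return CompletableFuture
                    // The "request" runs on the io executor, like work under subscribeOn.
                    .supplyAsync(() -> Thread.currentThread().getName(), io)
                    // The result is handed to the other executor, like delivery after observeOn.
                    .thenApplyAsync(
                            producer -> new String[] {producer, Thread.currentThread().getName()},
                            main)
                    .get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("produced on " + threads[0]);
        System.out.println("consumed on " + threads[1]);
    }
}
```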
Other notes
How does the logic differ when the return type does or does not include Response?
Observable<Response<IpModel>> getIpMsg(@Query("ip") String ip);
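The analysis above is all based on the version that includes Response.
What if the interface is declared without it, that is:
Observable<IpModel> getIpMsg(@Query("ip") String ip);
In that case, pick up again at RxJava2CallAdapterFactory.get from the createCallAdapter analysis above: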
@Override
public @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
    // Our returnType is the Type of Observable<IpModel>.
    // This call returns Observable; see the getRawType source above.
    Class<?> rawType = getRawType(returnType);

    // All of the following are obviously false.
    boolean isFlowable = rawType == Flowable.class;
    boolean isSingle = rawType == Single.class;
    boolean isMaybe = rawType == Maybe.class;
    if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
        return null;
    }

    boolean isResult = false;
    boolean isBody = false;
    Type responseType;

    // Returns the type argument, i.e. IpModel.
    Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
    // Still IpModel.
    Class<?> rawObservableType = getRawType(observableType);
    if (rawObservableType == Response.class) {
        responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
    } else if (rawObservableType == Result.class) {
        if (!(observableType instanceof ParameterizedType)) {
            throw new IllegalStateException("Result must be parameterized"
                    + " as Result<Foo> or Result<? extends Foo>");
        }
        responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
        isResult = true;
    } else {
        // This branch is taken now, so isBody becomes true.
        responseType = observableType;
        isBody = true;
    }

    // From the above: isBody is true and every other boolean passed to the constructor is false.
    // When the RxJava2CallAdapterFactory was created, scheduler was null and isAsync was false.
    // responseType is IpModel.
    return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable,
            isSingle, isMaybe, false);
}
Then, continuing inside loadServiceMethod(method).invoke:
RxJava2CallAdapter.adapt
@Override
public Object adapt(Call<R> call) {
    Observable<Response<R>> responseObservable = isAsync
            ? new CallEnqueueObservable<>(call)
            : new CallExecuteObservable<>(call);

    Observable<?> observable;
    if (isResult) {
        observable = new ResultObservable<>(responseObservable);
    } else if (isBody) {
        observable = new BodyObservable<>(responseObservable);
    } else {
        observable = responseObservable;
    }

    if (scheduler != null) {
        observable = observable.subscribeOn(scheduler);
    }

    if (isFlowable) {
        return observable.toFlowable(BackpressureStrategy.LATEST);
    }
    if (isSingle) {
        return observable.singleOrError();
    }
    if (isMaybe) {
        return observable.singleElement();
    }
    if (isCompletable) {
        return observable.ignoreElements();
    }
    return RxJavaPlugins.onAssembly(observable);
}
From the above:
- isAsync and isResult are false,
- isBody is true,
- scheduler = null,
- isFlowable, isSingle, isMaybe and isCompletable are all false.
So what is finally returned is new BodyObservable<>(responseObservable);
BodyObservable(Observable<Response<T>> upstream) {
    this.upstream = upstream;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
    upstream.subscribe(new BodyObserver<T>(observer));
}
In other words, the topmost observable is responseObservable.
When responseObservable starts emitting data downstream, BodyObserver's onNext is called:
@Override
public void onNext(Response<R> response) {
    if (response.isSuccessful()) {
        // The body, i.e. the IpModel, is passed straight to the layer below.
        observer.onNext(response.body());
    } else {
        terminated = true;
        Throwable t = new HttpException(response);
        try {
            observer.onError(t);
        } catch (Throwable inner) {
            Exceptions.throwIfFatal(inner);
            RxJavaPlugins.onError(new CompositeException(t, inner));
        }
    }
}
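The response here is Retrofit's.
A Response carries more information about the network request, such as the status code and headers. If the interface returns only the bean/model object, that information is not available downstream and you have less control.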
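The difference between the two return types comes down to one unwrapping step, which can be sketched without Retrofit. In the code below, MiniResponse is a hypothetical stand-in for retrofit2.Response, deliver is a made-up helper that mimics what BodyObserver.onNext does, and a plain RuntimeException stands in for HttpException.

```java
import java.util.function.Consumer;

class BodySketch {
    // Hypothetical stand-in for retrofit2.Response: a status code plus an optional body.
    static final class MiniResponse<T> {
        final int code;
        final T body;

        MiniResponse(int code, T body) {
            this.code = code;
            this.body = body;
        }

        boolean isSuccessful() {
            return code >= 200 && code < 300;
        }
    }

    // Mimics the unwrapping step: success passes only the body on, failure becomes an error.
    static <T> void deliver(MiniResponse<T> response, Consumer<? super T> onNext,
            Consumer<? super Throwable> onError) {
        if (response.isSuccessful()) {
            onNext.accept(response.body);
        } else {
            // A RuntimeException stands in for HttpException here.
            onError.accept(new RuntimeException("HTTP " + response.code));
        }
    }

    public static void main(String[] args) {
        deliver(new MiniResponse<>(200, "model"),
                body -> System.out.println("onNext: " + body),
                error -> System.out.println("onError: " + error.getMessage()));
        deliver(new MiniResponse<String>(404, null),
                body -> System.out.println("onNext: " + body),
                error -> System.out.println("onError: " + error.getMessage()));
    }
}
```

With the unwrapped form, a 404 arrives in onError; with the Response form it arrives in onNext and the subscriber decides what to do with the status code itself.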
Original article: https://www.cnblogs.com/muouren/p/11768914.html