例子
从简单的例子开始分析 Retrofit2 是怎么和其他的库一起合作的,
下边是一个很简单的例子, 是 rxjava2 + retrofit2 + okhttp3 + gson 混合使用, 是访问淘宝的 ip 地址查询服务, 返回信息输出到 EditText 里.
- public static Retrofit getRetrofit() {
- if (retrofit == null) {
- synchronized (Retrofit.class) {
- if (retrofit == null) {
- retrofit = new Retrofit.Builder()
- .baseUrl(BASE_URL)
- .addConverterFactory(ScalarsConverterFactory.create())
- .addConverterFactory(GsonConverterFactory.create())
- .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
- .client(getOkHttpClient())
- .build();
- }
- }
- }
- return retrofit;
- }
- public interface IpServiceRx {
- @Headers({
- "Accept-Encoding: application/json",
- "User-Agent: wz"
- })
- @GET("getIpInfo.php")
- Observable<Response<IpModel>> getIpMsg(@Query("ip") String ip);
- }
- /**
- * rxjava2 + retrofit2 + okhttp3
- */
- private void requestData3() {
- Retrofit retrofit = NetworkUtils.getRetrofit();
- IpServiceRx ipServiceRx = retrofit.create(IpServiceRx.class);
- String ip = "117.100.130.5";
- Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
- ipMsg.throttleFirst(500, TimeUnit.MILLISECONDS)
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Observer<Response<IpModel>>() {
- @Override
- public void onSubscribe(@NonNull Disposable d) {
- }
- @Override
- public void onNext(@NonNull Response<IpModel> ipModelResponse) {
- IpModel ipModel = ipModelResponse.body();
- if (ipModel == null) {
- return;
- }
- IpData data = ipModel.getData();
- if (data == null) {
- return;
- }
- mEt.setText(getCSData(data));
- }
- @Override
- public void onError(@NonNull Throwable e) {
- mEt.setText(e.toString());
- e.printStackTrace();
- }
- @Override
- public void onComplete() {
- }
- });
- }
先从创建 Retrofit 时传递的几个 factory 看起
- ConverterFactory
- .addConverterFactory(GsonConverterFactory.create())
- public Builder addConverterFactory(Converter.Factory factory) {
- converterFactories.add(checkNotNull(factory, "factory == null"));
- return this;
- }
把转换器加入到了一个 list 中
- public final class GsonConverterFactory extends Converter.Factory {
- /**
- * Create an instance using a default {@link Gson} instance for conversion. Encoding to JSON and
- * decoding from JSON (when no charset is specified by a header) will use UTF-8.
- */
- public static GsonConverterFactory create() {
- return create(new Gson());
- }
- /**
- * Create an instance using {@code gson} for conversion. Encoding to JSON and
- * decoding from JSON (when no charset is specified by a header) will use UTF-8.
- */
- @SuppressWarnings("ConstantConditions") // Guarding public API nullability.
- public static GsonConverterFactory create(Gson gson) {
- if (gson == null) throw new NullPointerException("gson == null");
- return new GsonConverterFactory(gson);
- }
- private final Gson gson;
- private GsonConverterFactory(Gson gson) {
- this.gson = gson;
- }
- // 返回解析 okhttp3.ResponseBody 的 Converter 实例
- @Override
- public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations,
- Retrofit retrofit) {
- TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
- return new GsonResponseBodyConverter<>(gson, adapter);
- }
- // 返回解析 okhttp3.RequsetBody 的 Converter 实例
- @Override
- public Converter<?, RequestBody> requestBodyConverter(Type type,
- Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {
- TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
- return new GsonRequestBodyConverter<>(gson, adapter);
- }
- }
- public interface Converter<F, T> {
- @Nullable T convert(F value) throws IOException;
- /** Creates {@link Converter} instances based on a type and target usage. */
- abstract class Factory {
- /**
- * Returns a {@link Converter} for converting an HTTP response body to {@code type}, or null if
- * {@code type} cannot be handled by this factory. This is used to create converters for
- * response types such as {@code SimpleResponse} from a {@code Call<SimpleResponse>}
- * declaration.
- */
- public @Nullable Converter<ResponseBody, ?> responseBodyConverter(Type type,
- Annotation[] annotations, Retrofit retrofit) {
- return null;
- }
- /**
- * Returns a {@link Converter} for converting {@code type} to an HTTP request body, or null if
- * {@code type} cannot be handled by this factory. This is used to create converters for types
- * specified by {@link Body @Body}, {@link Part @Part}, and {@link PartMap @PartMap}
- * values.
- */
- public @Nullable Converter<?, RequestBody> requestBodyConverter(Type type,
- Annotation[] parameterAnnotations, Annotation[] methodAnnotations, Retrofit retrofit) {
- return null;
- }
- /**
- * Returns a {@link Converter} for converting {@code type} to a {@link String}, or null if
- * {@code type} cannot be handled by this factory. This is used to create converters for types
- * specified by {@link Field @Field}, {@link FieldMap @FieldMap} values,
- * {@link Header @Header}, {@link HeaderMap @HeaderMap}, {@link Path @Path},
- * {@link Query @Query}, and {@link QueryMap @QueryMap} values.
- */
- public @Nullable Converter<?, String> stringConverter(Type type, Annotation[] annotations,
- Retrofit retrofit) {
- return null;
- }
- /**
- * Extract the upper bound of the generic parameter at {@code index} from {@code type}. For
- * example, index 1 of {@code Map<String, ? extends Runnable>} returns {@code Runnable}.
- */
- protected static Type getParameterUpperBound(int index, ParameterizedType type) {
- return Utils.getParameterUpperBound(index, type);
- }
- /**
- * Extract the raw class type from {@code type}. For example, the type representing
- * {@code List<? extends Runnable>} returns {@code List.class}.
- */
- protected static Class<?> getRawType(Type type) {
- return Utils.getRawType(type);
- }
- }
- }
- CallAdapterFactory
- .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
- public Builder addCallAdapterFactory(CallAdapter.Factory factory) {
- callAdapterFactories.add(checkNotNull(factory, "factory == null"));
- return this;
- }
- public final class RxJava2CallAdapterFactory extends CallAdapter.Factory {
- /**
- * Returns an instance which creates synchronous observables that do not operate on any scheduler
- * by default.
- */
- public static RxJava2CallAdapterFactory create() {
- return new RxJava2CallAdapterFactory(null, false);
- }
- private final @Nullable Scheduler scheduler;
- private final boolean isAsync;
- private RxJava2CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) {
- this.scheduler = scheduler;
- this.isAsync = isAsync;
- }
- ...
- }
- public interface CallAdapter<R, T> {
- Type responseType();
- // 注意这里的 Call 其实是 Retrofit 自己写的 Call, 并不是 okhttp 里的.
- T adapt(Call<R> call);
- /**
- * Creates {@link CallAdapter} instances based on the return type of {@linkplain
- * Retrofit#create(Class) the service interface} methods.
- */
- abstract class Factory {
- /**
- * Returns a call adapter for interface methods that return {@code returnType}, or null if it
- * cannot be handled by this factory.
- */
- public abstract @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations,
- Retrofit retrofit);
- /**
- * Extract the upper bound of the generic parameter at {@code index} from {@code type}. For
- * example, index 1 of {@code Map<String, ? extends Runnable>} returns {@code Runnable}.
- */
- protected static Type getParameterUpperBound(int index, ParameterizedType type) {
- return Utils.getParameterUpperBound(index, type);
- }
- /**
- * Extract the raw class type from {@code type}. For example, the type representing
- * {@code List<? extends Runnable>} returns {@code List.class}.
- */
- protected static Class<?> getRawType(Type type) {
- return Utils.getRawType(type);
- }
- }
- }
上边只是暂时列出来, 后边会慢慢分析.
然后看下 build()
- public Retrofit build() {
- // 没有设置时会自动创建一个 OkHttpClient
- okhttp3.Call.Factory callFactory = this.callFactory;
- if (callFactory == null) {
- callFactory = new OkHttpClient();
- }
- // platform 是 Android,defaultCallbackExecutor 是主线程 handler.
- Executor callbackExecutor = this.callbackExecutor;
- if (callbackExecutor == null) {
- callbackExecutor = platform.defaultCallbackExecutor();
- }
- // 可以看到 callAdapterFactories 包含了我们设置的, 还有 platform 自带的
- // Make a defensive copy of the adapters and add the default Call adapter.
- List<CallAdapter.Factory> callAdapterFactories = new ArrayList<>(this.callAdapterFactories);
- callAdapterFactories.addAll(platform.defaultCallAdapterFactories(callbackExecutor));
- // 而 converterFactories 也是类似, 包含了我们设置的, 还有自带的几个.
- // Make a defensive copy of the converters.
- List<Converter.Factory> converterFactories = new ArrayList<>(
- 1 + this.converterFactories.size() + platform.defaultConverterFactoriesSize());
- // Add the built-in converter factory first. This prevents overriding its behavior but also
- // ensures correct behavior when using converters that consume all types.
- converterFactories.add(new BuiltInConverters());
- converterFactories.addAll(this.converterFactories);
- converterFactories.addAll(platform.defaultConverterFactories());
- return new Retrofit(callFactory, baseUrl, unmodifiableList(converterFactories),
- unmodifiableList(callAdapterFactories), callbackExecutor, validateEagerly);
- }
接着看 retrofit.create
- IpServiceRx ipServiceRx = retrofit.create(IpServiceRx.class);
- public <T> T create(final Class<T> service) {
- ...
- return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[] { service },
- new InvocationHandler() {
- // 此处 platform 是 Android, 抽象类 Platform 有两个继承类, 一个叫 Android, 还有一个 Java8.
- private final Platform platform = Platform.get();
- private final Object[] emptyArgs = new Object[0];
- @Override public @Nullable Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {
- // 如果是 object 的方法则直接执行
- if (method.getDeclaringClass() == Object.class) {
- return method.invoke(this, args);
- }
- // jdk8 引入的接口默认方法, 不过由于 Java8 这个类实现了 invokeDefaultMethod, 而 Android 这个类没有实现此方法所以跳过
- if (platform.isDefaultMethod(method)) {
- return platform.invokeDefaultMethod(method, service, proxy, args);
- }
- return loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
- }
- });
- }
可以看到其实是使用了动态代理的方法, 来把原类型创建出一个代理对象,
接着我们通过这个代理对象调用方法,
Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip); 就会执行 InvocationHandler.invoke 方法,
invoke 方法里, 如果是 object 的方法则直接执行并返回, 接着默认方法也跳过,
直接看 loadServiceMethod
- ServiceMethod<?> loadServiceMethod(Method method) {
- ServiceMethod<?> result = serviceMethodCache.get(method);
- if (result != null) return result;
- synchronized (serviceMethodCache) {
- result = serviceMethodCache.get(method);
- if (result == null) {
- result = ServiceMethod.parseAnnotations(this, method);
- serviceMethodCache.put(method, result);
- }
- }
- return result;
- }
- ServiceMethod
- static <T> ServiceMethod<T> parseAnnotations(Retrofit retrofit, Method method) {
- // 这个类是用来把我们在方法上的注解和之后传递的参数生成一个 okhttp 的 request, 下边会用到.
- RequestFactory requestFactory = RequestFactory.parseAnnotations(retrofit, method);
- Type returnType = method.getGenericReturnType();
- if (Utils.hasUnresolvableType(returnType)) {
- throw methodError(method,
- "Method return type must not include a type variable or wildcard: %s", returnType);
- }
- // 返回类型不能时 void
- if (returnType == void.class) {
- throw methodError(method, "Service methods cannot return void.");
- }
- return HttpServiceMethod.parseAnnotations(retrofit, method, requestFactory);
- }
- HttpServiceMethod
- static <ResponseT, ReturnT> HttpServiceMethod<ResponseT, ReturnT> parseAnnotations(
- Retrofit retrofit, Method method, RequestFactory requestFactory) {
- boolean isKotlinSuspendFunction = requestFactory.isKotlinSuspendFunction;
- boolean continuationWantsResponse = false;
- boolean continuationBodyNullable = false;
- // 获取方法上的注解
- Annotation[] annotations = method.getAnnotations();
- Type adapterType;
- if (isKotlinSuspendFunction) {
- ...
- } else {
- // 方法的返回 Type 类型
- adapterType = method.getGenericReturnType();
- }
- // 在下边进行分析
- CallAdapter<ResponseT, ReturnT> callAdapter = createCallAdapter(retrofit, method, adapterType, annotations);
- // 校验返回类型是否正确, 即 Response<IpModel>
- Type responseType = callAdapter.responseType();
- // 就是说返回类型不能时 okhttp3.Response
- if (responseType == okhttp3.Response.class) {
- throw methodError(method, "'"
- + getRawType(responseType).getName()
- + "'is not a valid response body type. Did you mean ResponseBody?");
- }
- // 返回类型不能是 Response, 必须要包含泛型才行 Response<String>, 这个 Response 是 retrofit2 里定义的, 不是 okhttp3.Response
- if (responseType == Response.class) {
- throw methodError(method, "Response must include generic type (e.g., Response<String>)");
- }
- // TODO support Unit for Kotlin?
- if (requestFactory.httpMethod.equals("HEAD") && !Void.class.equals(responseType)) {
- throw methodError(method, "HEAD method must use Void as response type.");
- }
- // 在下边进行分析
- Converter<ResponseBody, ResponseT> responseConverter = createResponseConverter(retrofit, method, responseType);
- // callFactory 其实就是 OkHttpClient
- okhttp3.Call.Factory callFactory = retrofit.callFactory;
- if (!isKotlinSuspendFunction) {
- return new CallAdapted<>(requestFactory, callFactory, responseConverter, callAdapter);
- } else
- ...
- }
- }
最后创建了一个 CallAdapted 对象返回,
CallAdapted 继承关系:
- CallAdapted<ResponseT, ReturnT> extends HttpServiceMethod<ResponseT, ReturnT>
- HttpServiceMethod<ResponseT, ReturnT> extends ServiceMethod<ReturnT>
- createCallAdapter
- HttpServiceMethod.createCallAdapter
- private static <ResponseT, ReturnT> CallAdapter<ResponseT, ReturnT> createCallAdapter(
- Retrofit retrofit, Method method, Type returnType, Annotation[] annotations) {
- try {
- //noinspection unchecked
- return (CallAdapter<ResponseT, ReturnT>) retrofit.callAdapter(returnType, annotations);
- } catch (RuntimeException e) { // Wide exception range because factories are user code.
- throw methodError(method, e, "Unable to create call adapter for %s", returnType);
- }
- retrofit.callAdapter
- public CallAdapter<?, ?> callAdapter(Type returnType, Annotation[] annotations) {
- return nextCallAdapter(null, returnType, annotations);
- }
- public CallAdapter<?, ?> nextCallAdapter(@Nullable CallAdapter.Factory skipPast, Type returnType, Annotation[] annotations) {
- int start = callAdapterFactories.indexOf(skipPast) + 1;
- for (int i = start, count = callAdapterFactories.size(); i <count; i++) {
- CallAdapter<?, ?> adapter = callAdapterFactories.get(i).get(returnType, annotations, this);
- if (adapter != null) {
- return adapter;
- }
- }
- ...
- throw new IllegalArgumentException(builder.toString());
- }
总的来说就是从我们之前设置的和自带的 calladapterFactory 中找到一个, 调用 get 获取一个 CallAdapter 的就直接返回.
就用 RxJava2CallAdapterFactory.get 来说明:
- @Override public @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
- // 我们的 returnType 是 Observable<Response<IpModel>> 的 Type.
- // 此方法返回 Observable, 具体看下边 getRawType 源码
- Class<?> rawType = getRawType(returnType);
- // 显然下边都为 false
- boolean isFlowable = rawType == Flowable.class;
- boolean isSingle = rawType == Single.class;
- boolean isMaybe = rawType == Maybe.class;
- if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
- return null;
- }
- boolean isResult = false;
- boolean isBody = false;
- Type responseType;
- // 返回泛型参数, 即 Response<IpModel>
- Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
- // 再次返回 Response<IpModel > 的 RawType, 即 retrofit 的 Response
- Class<?> rawObservableType = getRawType(observableType);
- if (rawObservableType == Response.class) {
- // 再次返回 Response<IpModel > 的 UpperBound, 即 IpModel
- responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
- } else if (rawObservableType == Result.class) {
- if (!(observableType instanceof ParameterizedType)) {
- throw new IllegalStateException("Result must be parameterized"
- + "as Result<Foo> or Result<? extends Foo>");
- }
- responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
- isResult = true;
- } else {
- responseType = observableType;
- isBody = true;
- }
- // 由上边可知, 传递进构造函数的 Boolean 都是 false, 创建 RxJava2CallAdapterFactory 时 scheduler 为 null,isAsync 为 false,
- // responseType 为 IpModel
- return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false);
- }
- Utils.getRawType
- static Class<?> getRawType(Type type) {
- // 是具体类型
- if (type instanceof Class<?>) {
- // Type is a normal class.
- return (Class<?>) type;
- }
- // 是带泛型的类型
- if (type instanceof ParameterizedType) {
- ParameterizedType parameterizedType = (ParameterizedType) type;
- // 返回 Observable
- Type rawType = parameterizedType.getRawType();
- if (!(rawType instanceof Class)) throw new IllegalArgumentException();
- return (Class<?>) rawType;
- }
- // 其他类型
- ...
- Utils.getParameterUpperBound
- static Type getParameterUpperBound(int index, ParameterizedType type) {
- Type[] types = type.getActualTypeArguments();
- Type paramType = types[index];
- return paramType;
- }
- createResponseConverter
- HttpServiceMethod.createResponseConverter
- private static <ResponseT> Converter<ResponseBody, ResponseT> createResponseConverter(Retrofit retrofit, Method method, Type responseType) {
- Annotation[] annotations = method.getAnnotations();
- try {
- return retrofit.responseBodyConverter(responseType, annotations);
- } catch (RuntimeException e) { // Wide exception range because factories are user code.
- throw methodError(method, e, "Unable to create converter for %s", responseType);
- }
- }
- retrofit.responseBodyConverter
- public <T> Converter<ResponseBody, T> responseBodyConverter(Type type, Annotation[] annotations) {
- return nextResponseBodyConverter(null, type, annotations);
- }
- public <T> Converter<ResponseBody, T> nextResponseBodyConverter(@Nullable Converter.Factory skipPast, Type type, Annotation[] annotations) {
- int start = converterFactories.indexOf(skipPast) + 1;
- for (int i = start, count = converterFactories.size(); i <count; i++) {
- Converter<ResponseBody, ?> converter = converterFactories.get(i).responseBodyConverter(type, annotations, this);
- if (converter != null) {
- //noinspection unchecked
- return (Converter<ResponseBody, T>) converter;
- }
- }
- ...
- throw new IllegalArgumentException(builder.toString());
- }
总的来说就是从我们之前设置的和自带的 converterFactory 中找到一个, 然后获取具体的 responseBodyConverter.
就用 GsonConverterFactory.responseBodyConverter 来说明:
- @Override
- public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations, Retrofit retrofit) {
- TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
- return new GsonResponseBodyConverter<>(gson, adapter);
- }
- GsonResponseBodyConverter
- GsonRequestBodyConverter(Gson gson, TypeAdapter<T> adapter) {
- this.gson = gson;
- this.adapter = adapter;
- }
- @Override
- public RequestBody convert(T value) throws IOException {
- Buffer buffer = new Buffer();
- Writer writer = new OutputStreamWriter(buffer.outputStream(), UTF_8);
- JsonWriter jsonWriter = gson.newJsonWriter(writer);
- adapter.write(jsonWriter, value);
- jsonWriter.close();
- return RequestBody.create(MEDIA_TYPE, buffer.readByteString());
- }
- loadServiceMethod(method).invoke
一圈分析后在返回上边的 retrofit.create 内部分 invoke 的最后
loadServiceMethod(method).invoke(args != null ? args : emptyArgs);
由上边可知 loadServiceMethod 方法返回的是 CallAdapted,
而 CallAdapted 继承关系:
- CallAdapted<ResponseT, ReturnT> extends HttpServiceMethod<ResponseT, ReturnT>
- HttpServiceMethod<ResponseT, ReturnT> extends ServiceMethod<ReturnT>
调用 invoke 是调用到的 HttpServiceMethod.invoke
- @Override final @Nullable ReturnT invoke(Object[] args) {
- Call<ResponseT> call = new OkHttpCall<>(requestFactory, args, callFactory, responseConverter);
- return adapt(call, args);
- }
注意此处的 call 都是 retrofit 的, 不是 okhttp 的.
在其中创建了个 OkHttpCall 对象, 顾名思义, 里边肯定就是通过 okhttp 的 call 进行网络请求的, 绕了一大圈终于找到实际请求的地方了.
接着看 adapt
adapt 实际调用的是 CallAdapted.adapt
- @Override
- protected ReturnT adapt(Call<ResponseT> call, Object[] args) {
- return callAdapter.adapt(call);
- }
此处的 callAdapter 其实就是上边的 RxJava2CallAdapter,
所以就去 RxJava2CallAdapter 中看看
- @Override
- public Object adapt(Call<R> call) {
- Observable<Response<R>> responseObservable = isAsync
- ? new CallEnqueueObservable<>(call)
- : new CallExecuteObservable<>(call);
- Observable<?> observable;
- if (isResult) {
- observable = new ResultObservable<>(responseObservable);
- } else if (isBody) {
- observable = new BodyObservable<>(responseObservable);
- } else {
- observable = responseObservable;
- }
- if (scheduler != null) {
- observable = observable.subscribeOn(scheduler);
- }
- if (isFlowable) {
- return observable.toFlowable(BackpressureStrategy.LATEST);
- }
- if (isSingle) {
- return observable.singleOrError();
- }
- if (isMaybe) {
- return observable.singleElement();
- }
- if (isCompletable) {
- return observable.ignoreElements();
- }
- return RxJavaPlugins.onAssembly(observable);
- }
由上可知
isAsync,isResult,isBody 为 false,
scheduler = null
isFlowable,isSingle,isMaybe,isCompletable 都为 false
所以说最终返回就是 new CallExecuteObservable<>(call);
而 RxJavaPlugins.onAssembly(observable); 中
- public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
- Function<? super Observable, ? extends Observable> f = onObservableAssembly;
- if (f != null) {
- return apply(f, source);
- }
- return source;
- }
我们并没有对 rxjava 设置 hook, 所以返回的还是 CallExecuteObservable,
CallExecuteObservable 创建时传递的 call 就是 OkHttpCall.
接着就是 rxjava 操作了
这里顺带把 rxjava 的一些源码也简单分析了.
- Observable<Response<IpModel>> ipMsg = ipServiceRx.getIpMsg(ip);
- ipMsg.throttleFirst(500, TimeUnit.MILLISECONDS)
- .subscribeOn(Schedulers.io())
- .observeOn(AndroidSchedulers.mainThread())
- .subscribe(new Observer<Response<IpModel>>() {
- @Override
- public void onSubscribe(@NonNull Disposable d) {
- }
- @Override
- public void onNext(@NonNull Response<IpModel> ipModelResponse) {
- IpModel ipModel = ipModelResponse.body();
- if (ipModel == null) {
- return;
- }
- IpData data = ipModel.getData();
- if (data == null) {
- return;
- }
- mEt.setText(getCSData(data));
- }
- @Override
- public void onError(@NonNull Throwable e) {
- mEt.setText(e.toString());
- e.printStackTrace();
- }
- @Override
- public void onComplete() {
- }
- });
rxjava 每次调用一个转换操作, 都会返回一个不同的 observable, 这个 observable 会记录上层的 observable, 从而形成一个从上到下的链, 所以也叫链式操作.
直到最后调用 subscribe, 此时会触发向上订阅, 即下层都会调用上层的 subscribe, 当然每层 observable 都有不同的 subscribeActual 实现, 所以每层其实是上层的 observer, 同时又是下层的 observable.
直到调用到顶层层的 subscribeActual, 即本例中的 CallExecuteObservable 的 subscribeActual:
- @Override protected void subscribeActual(Observer<? super Response<T>> observer) {
- // Since Call is a one-shot type, clone it for each new observer.
- // 就是 OkHttpCall
- Call<T> call = originalCall.clone();
- CallDisposable disposable = new CallDisposable(call);
- observer.onSubscribe(disposable);
- if (disposable.isDisposed()) {
- return;
- }
- boolean terminated = false;
- try {
- // 此处会去调用 OkHttpCall 的 execute, 里边肯定就是 okhttp 的 call.execute
- Response<T> response = call.execute();
- if (!disposable.isDisposed()) {
- // 开始往下层传递消息
- observer.onNext(response);
- }
- if (!disposable.isDisposed()) {
- terminated = true;
- observer.onComplete();
- }
- } catch (Throwable t) {
- ...
- }
- }
- OkHttpCall.execute
- @Override
- public Response<T> execute() throws IOException {
- okhttp3.Call call;
- synchronized (this) {
- // 正确性检查
- ...
- call = rawCall;
- if (call == null) {
- try {
- // 创建一个新的网络请求, 看下边代码
- call = rawCall = createRawCall();
- } catch (IOException | RuntimeException | Error e) {
- throwIfFatal(e); // Do not assign a fatal error to creationFailure.
- creationFailure = e;
- throw e;
- }
- }
- }
- if (canceled) {
- call.cancel();
- }
- // 解析 阻塞式 call.execute() 返回的 okhttp3.Response, 看下边代码
- return parseResponse(call.execute());
- }
- private okhttp3.Call createRawCall() throws IOException {
- okhttp3.Call call = callFactory.newCall(requestFactory.create(args));
- return call;
- }
此处的 callFactory 就是上边 ServiceMethod.parseAnnotations 中创建的 RequestFactory, 通过 RequestFactory 构建出来一个 okhttp 的 request 对象,
最后生成一个 okhttp3.Call 返回.
- Response<T> parseResponse(okhttp3.Response rawResponse) throws IOException {
- ResponseBody rawBody = rawResponse.body();
- // Remove the body's source (the only stateful object) so we can pass the response along.
- rawResponse = rawResponse.newBuilder()
- .body(new NoContentResponseBody(rawBody.contentType(), rawBody.contentLength()))
- .build();
- int code = rawResponse.code();
- if (code <200 || code>= 300) {
- try {
- // Buffer the entire body to avoid future I/O.
- ResponseBody bufferedBody = Utils.buffer(rawBody);
- return Response.error(bufferedBody, rawResponse);
- } finally {
- rawBody.close();
- }
- }
- if (code == 204 || code == 205) {
- rawBody.close();
- return Response.success(null, rawResponse);
- }
- ExceptionCatchingResponseBody catchingBody = new ExceptionCatchingResponseBody(rawBody);
- try {
- // 此处会用我们之前设置的 Converter(即 GsonResponseBodyConverter) 来解析出具体的 bean 对象,
- T body = responseConverter.convert(catchingBody);
- return Response.success(body, rawResponse);
- } catch (RuntimeException e) {
- // If the underlying source threw an exception, propagate that rather than indicating it was
- // a runtime exception.
- catchingBody.throwIfCaught();
- throw e;
- }
- }
- GsonResponseBodyConverter.convert
- @Override public T convert(ResponseBody value) throws IOException {
- JsonReader jsonReader = gson.newJsonReader(value.charStream());
- try {
- T result = adapter.read(jsonReader);
- if (jsonReader.peek() != JsonToken.END_DOCUMENT) {
- throw new JsonIOException("JSON document was not fully consumed.");
- }
- return result;
- } finally {
- value.close();
- }
- observer.onNext(response);
向下传递, 此时还是 subscribeOn(Schedulers.io()) 指定的线程上操作的,
当传递到 observeOn(AndroidSchedulers.mainThread()) 时, 此 observable 会把线程转换成 mainThread,
最后传递到 subscribe 传递的 observer 的 onNext 中
其他
返回值中带不带 Response 逻辑有什么区别
Observable<Response<IpModel>> getIpMsg(@Query("ip") String ip);
上边的分析都是基于带 Response 的,
那如果定义接口时不带呢, 即
Observable<IpModel> getIpMsg(@Query("ip") String ip);
那么接着上边的 createCallAdapter 分析里的 RxJava2CallAdapterFactory.get 来说明:
- @Override public @Nullable CallAdapter<?, ?> get(Type returnType, Annotation[] annotations, Retrofit retrofit) {
- // 我们的 returnType 是 Observable<IpModel > 的 Type.
- // 此方法返回 Observable, 具体看下边 getRawType 源码
- Class<?> rawType = getRawType(returnType);
- // 显然下边都为 false
- boolean isFlowable = rawType == Flowable.class;
- boolean isSingle = rawType == Single.class;
- boolean isMaybe = rawType == Maybe.class;
- if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) {
- return null;
- }
- boolean isResult = false;
- boolean isBody = false;
- Type responseType;
- // 返回泛型参数, 即 IpModel
- Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType);
- // 还是 IpModel
- Class<?> rawObservableType = getRawType(observableType);
- if (rawObservableType == Response.class) {
- responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
- } else if (rawObservableType == Result.class) {
- if (!(observableType instanceof ParameterizedType)) {
- throw new IllegalStateException("Result must be parameterized"
- + "as Result<Foo> or Result<? extends Foo>");
- }
- responseType = getParameterUpperBound(0, (ParameterizedType) observableType);
- isResult = true;
- } else {
- // 此时会进入此逻辑, isBody 为 true 了
- responseType = observableType;
- isBody = true;
- }
- // 由上边可知, 传递进构造函数的 Boolean 除了 isBody 为 true, 其他都是 false, 创建 RxJava2CallAdapterFactory 时 scheduler 为 null,isAsync 为 false,
- // responseType 为 IpModel
- return new RxJava2CallAdapter(responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false);
- }
然后接着 loadServiceMethod(method).invoke 里
- RxJava2CallAdapter.adapt
- @Override
- public Object adapt(Call<R> call) {
- Observable<Response<R>> responseObservable = isAsync
- ? new CallEnqueueObservable<>(call)
- : new CallExecuteObservable<>(call);
- Observable<?> observable;
- if (isResult) {
- observable = new ResultObservable<>(responseObservable);
- } else if (isBody) {
- observable = new BodyObservable<>(responseObservable);
- } else {
- observable = responseObservable;
- }
- if (scheduler != null) {
- observable = observable.subscribeOn(scheduler);
- }
- if (isFlowable) {
- return observable.toFlowable(BackpressureStrategy.LATEST);
- }
- if (isSingle) {
- return observable.singleOrError();
- }
- if (isMaybe) {
- return observable.singleElement();
- }
- if (isCompletable) {
- return observable.ignoreElements();
- }
- return RxJavaPlugins.onAssembly(observable);
- }
由上可知
isAsync,isResult,
isBody 为 true,
scheduler = null,
isFlowable,isSingle,isMaybe,isCompletable 都为 false
所以说最终返回就是 new BodyObservable<>(responseObservable);
- BodyObservable(Observable<Response<T>> upstream) {
- this.upstream = upstream;
- }
- @Override protected void subscribeActual(Observer<? super T> observer) {
- upstream.subscribe(new BodyObserver<T>(observer));
- }
就是说最上层是 responseObservable,
那么当 responseObservable 开始下传数据时, 会调用 BodyObserver 的 onNext:
- @Override
- public void onNext(Response<R> response) {
- if (response.isSuccessful()) {
- // 会把 body 直接传递到下层, 即 IpModal
- observer.onNext(response.body());
- } else {
- terminated = true;
- Throwable t = new HttpException(response);
- try {
- observer.onError(t);
- } catch (Throwable inner) {
- Exceptions.throwIfFatal(inner);
- RxJavaPlugins.onError(new CompositeException(t, inner));
- }
- }
- }
此处的 response 是 retrofit 的,
response 会携带更多的此次网络请求的信息, 如果只返回实际的 bean/modal 对象, 那么就不能够有更多控制.
来源: http://www.bubuko.com/infodetail-3265944.html