java 使用 RxJava 和 Okhttp

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/32687921/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-02 20:37:08  来源:igfitidea点击:

Using RxJava and Okhttp

javaandroidrx-javaokhttp

提问by Santos Black

I want to request to a url using okhttp in another thread (like IO thread) and get Responsein the Android main thread, But I don't know how to create an Observable.

我想在另一个线程(如 IO 线程)中使用 okhttp 请求一个 url 并进入ResponseAndroid 主线程,但我不知道如何创建Observable.

回答by Saeed Masoumi

First add RxAndroidto your dependencies, then create your Observablelike this:

首先添加RxAndroid到您的依赖项,然后Observable像这样创建:

 Subscription subscription =   Observable.create(new Observable.OnSubscribe<Response>() {
        OkHttpClient client = new OkHttpClient();
          @Override
          public void call(Subscriber<? super Response> subscriber) {
            try {
              Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
              if (response.isSuccessful()) {
                  if(!subscriber.isUnsubscribed()){
                     subscriber.onNext(response);
                  }
                  subscriber.onCompleted();
              } else if (!response.isSuccessful() && !subscriber.isUnsubscribed()) {
                  subscriber.onError(new Exception("error"));
                }
            } catch (IOException e) {
              if (!subscriber.isUnsubscribed()) {
                  subscriber.onError(e);
              }
            }
          }
        })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Response>() {
              @Override
              public void onCompleted() {

              }

              @Override
              public void onError(Throwable e) {

              }

              @Override
              public void onNext(Response response) {

              }
            });

It will request your url in another thread (io thread) and observe it on android main thread.

它将在另一个线程(io 线程)中请求您的 url 并在 android 主线程上观察它。

And finally when you leave the screen use subsribtion.unsubscribe()to avoid memory leak.

最后当你离开屏幕时使用subsribtion.unsubscribe()以避免内存泄漏。

When you use Observable.create, you should write a lot of boilerplate code, also you must handle subscription by your own. A better alternative is to use defer. Form the doc:

使用时Observable.create要写很多样板代码,还得自己处理订阅。更好的选择是使用defer。形成文档:

do not create the Observable until the observer subscribes, and create a fresh Observable for each observer

The Defer operator waits until an observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual sequence.

在观察者订阅之前不要创建 Observable,并为每个观察者创建一个新的 Observable

Defer 操作符等待观察者订阅它,然后它生成一个 Observable,通常带有一个 Observable 工厂函数。它为每个订阅者重新执行此操作,因此尽管每个订阅者可能认为它订阅了同一个 Observable,但实际上每个订阅者都有自己的单独序列。

So as Marcin Kozińskimentioned, you just need to do this:

所以正如Marcin Koziński提到的,你只需要这样做:

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});

回答by Marcin Koziński

It's easier and safer to use Observable.defer()instead of Observable.create():

使用它更容易、更安全,Observable.defer()而不是Observable.create()

final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
    @Override public Observable<Response> call() {
        try {
            Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
            return Observable.just(response);
        } catch (IOException e) {
            return Observable.error(e);
        }
    }
});

That way unsubscription and backpressure are handled for you. Here's a great post by Dan Lewabout create()and defer().

这样就可以为您处理取消订阅和背压。这是Dan Lew关于create()的精彩帖子defer()

If you wished to go the Observable.create()route then it should look more like in this librarywith isUnsubscribed()calls sprinkled everywhere. And I believe this still doesn't handle backpressure.

如果你想走这Observable.create()条路线,那么它应该看起来更像是在这个库中isUnsubscribed()到处都是调用。而且我相信这仍然不能处理背压。

回答by FRR

I realise this post is a bit old, but there's a new and more convenient way of doing this now

我意识到这篇文章有点旧,但现在有一种新的更方便的方法来做到这一点

Observable.fromCallable {
        client.newCall(Request.Builder().url("your url").build()).execute()
    }

More info: https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/

更多信息:https: //artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/

回答by Brice

I came late to the discussion but, if for some reason the code need to stream the response body, then deferor fromCallablewon't do it. Instead one can employ the usingoperator.

我来晚了,但是,如果由于某种原因代码需要流式传输响应正文,那么deferfromCallable不会这样做。相反,可以雇用using操作员。

Single.using(() -> okHttpClient.newCall(okRequest).execute(), // 1
             response -> { // 2
                 ...

                 return Single.just((Consumer<OutputStream>) fileOutput -> {
                     try (InputStream upstreamResponseStream = response.body().byteStream();
                          OutputStream fileOutput = responseBodyOutput) {
                         ByteStreams.copy(upstreamResponseStream, output);
                     }
                 });
             },
             Response::close, // 3
             false) // 4
      .subscribeOn(Schedulers.io()) // 5
      .subscribe(copier -> copier.accept(...), // 6
                 throwable -> ...); // 7
  1. The first lambda executes the response after upon subscription.
  2. The second lambda creates the observable type, here with Single.just(...)
  3. The third lambda disposes the response. With deferone could have used the try-with-resources style.
  4. Set the eagertoggle to falseto make the disposer called after the terminal event, i.e. after the subscription consumer has been executed.
  5. Of course make the thing happen on another threadpool
  6. Here's the lambda that will consume the response body. Without eagerset to false, the code will raise an IOException with reason 'closed' because the response will be already closed before entering this lambda.
  7. The onErrorlambda should handle exceptions, especially the IOExceptionthat cannot be anymore caught with the usingoperator as it was possible with a try/catch with defer.
  1. 第一个 lambda在订阅后执行响应。
  2. 第二个 lambda 创建了 observable 类型,这里是 Single.just(...)
  3. 第三个 lambda 处理响应。随着defer人能使用的尝试,与资源风格。
  4. eager切换设置false为 使处置器在终端事件之后调用,即在订阅消费者执行之后。
  5. 当然让事情发生在另一个线程池上
  6. 这是将消耗响应主体的 lambda。如果未eager设置为false,代码将引发原因为“已关闭”的 IOException,因为在输入此 lambda 之前,响应已关闭。
  7. onError拉姆达应该处理异常,尤其是IOException不能再被抓到与using运营商,因为它是可能的一个try / catch用defer

回答by Arul Mani

Okhttp3 with RxSingle background API call.

Okhttp3 与 RxSingle 后台 API 调用。

     Disposable disposables = Single.fromCallable(() -> {
        Log.e(TAG, "clearData: Thread[" + Thread.currentThread().getName() + "]");
        OkHttpClient client = Util.getHttpClient();
        Request request = new Request.Builder()
                .addHeader("Authorization", "Bearer " + Util.getUserToken())
                .url(BuildConfig.BASE_URL + ApiConstants.DELETE_FEEDS)
                .build();

        Response response = client.newCall(request).execute();
        if(response.isSuccessful()) {
           ...
           return ; // Any  type
        } else {
           return ; // Any type        
        }
    }).subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe((result) -> {
           Log.d(TAG, "api() completed");
      });


    compositeDisposable.add(disposables);