RxJava:链接可观察对象

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/26935821/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-11 03:32:07  来源:igfitidea点击:

RxJava: chaining observables

javaandroidscalapromiserx-java

提问by Mikhail

Is it possible to implement something like next chaining using RxJava:

是否可以使用 RxJava 实现类似 next 链接的功能:

loginObservable()
   .then( (someData) -> {
      // returns another Observable<T> with some long operation
      return fetchUserDataObservable(someData);

   }).then( (userData) -> {
      // it should be called when fetching user data completed (with userData of type T)
      cacheUserData(userData);

   }).then( (userData) -> {
      // it should be called after all previous operations completed
      displayUserData()

   }).doOnError( (error) -> {
      //do something
   })

I found this library very interesting, but can't figure our how to chain requests where each other depends on previous.

我发现这个库非常有趣,但无法弄清楚我们如何将请求链接到彼此依赖的地方。

采纳答案by Benjamin Gruenbaum

Sure, RxJava supports .mapwhich does this. From the RxJava Wiki:

当然,RxJava 支持.map这样做。来自 RxJava 维基:

map

地图

Basically, it'd be:

基本上,它会是:

loginObservable()
   .switchMap( someData -> fetchUserDataObservable(someData) )
   .map( userData -> cacheUserData(userData) )
   .subscribe(new Subscriber<YourResult>() {
        @Override
        public void onCompleted() {
           // observable stream has ended - no more logins possible
        }
        @Override
        public void onError(Throwable e) {
            // do something
        }
        @Override
        public void onNext(YourType yourType) {
            displayUserData();
        }
    });

回答by lingfliu

try using scan()

尝试使用 scan()

Flowable.fromArray(array).scan(...).subscribe(...)

回答by solidak

This is the top post when Googling RxJava chain observablesso I'll just add another common case where you wouldn't want to transformthe data you receive, but chain it with another action (setting the data to a database, for example). Use .flatmap(). Here's an example:

这是谷歌搜索RxJava 链 observables时的顶级帖子,所以我将添加另一个常见情况,您不想转换收到的数据,而是将它与另一个操作链接起来(例如,将数据设置到数据库)。使用.flatmap(). 下面是一个例子:

mDataManager
    .fetchQuotesFromApi(limit)
    .subscribeOn(mSchedulerProvider.io())
    .observeOn(mSchedulerProvider.ui())
    // OnErrorResumeNext and Observable.error() would propagate the error to
    // the next level. So, whatever error occurs here, would get passed to
    // onError() on the UI side.
    .onErrorResumeNext(Function { Observable.error<List<Quote>>(it) })
    .flatMap { t: List<Quote> ->
        // Chain observable as such
        mDataManager.setQuotesToDb(t).subscribe(
            {},
            { e { "setQuotesToDb() error occurred: ${it.localizedMessage}" } },
            { d { "Done server set" } }
        )
        Observable.just(t)
    }
    .subscribeBy(
        onNext = {},
        onError = { mvpView?.showError("No internet connection") },
        onComplete = { d { "onComplete(): done with fetching quotes from api" } }
    )

This is RxKotlin2, but the idea is the same with RxJava & RxJava2:

这是 RxKotlin2,但其思想与 RxJava & RxJava2 相同:

Quick explanation:

快速解释:

  • we try to fetch some data (quotes in this example) from an api with mDataManager.fetchQuotesFromApi()
  • We subscribe the observable to do stuff on .io()thread and show results on .ui()thread.
  • onErrorResumeNext()makes sure that whatever error we encounter from fetching data is caught in this method. I wanna terminate the entire chain when there is an error there, so I return an Observable.error()
  • .flatmap()is the chaining part. I wanna be able to set whatever data I get from the API to my database. I'm not transforming the data I received using .map(), I'm simply doing something elsewith that data withouttransforming it.
  • I subscribe to the last chain of observables. If an error occurred with fetching data (first observable), it would be handled (in this case, propagated to the subscribed onError()) with onErrorResumeNext()
  • I am very conscious that I'm subscribing to the DB observable (inside flatmap()). Any error that occurs through this observable will NOTbe propagated to the last subscribeBy()methods, since it is handled inside the subscribe()method inside the .flatmap()chain.
  • 我们尝试从 api 中获取一些数据(本例中的引号) mDataManager.fetchQuotesFromApi()
  • 我们订阅 observable 以在.io()线程上执行操作并在线程上显示结果.ui()
  • onErrorResumeNext()确保我们在获取数据时遇到的任何错误都在此方法中被捕获。当那里出现错误时,我想终止整个链,所以我返回一个Observable.error()
  • .flatmap()是链接部分。我希望能够将从 API 获得的任何数据设置到我的数据库中。我没有转换我收到的数据.map(),我只是用这些数据做其他事情而不转换它。
  • 我订阅了最后一个可观察链。如果在获取数据(第一个 observable)时发生错误,它将被处理(在这种情况下,传播到 subscribed onError()onErrorResumeNext()
  • 我很清楚我订阅了 DB observable(inside flatmap())。通过这个 observable 发生的任何错误都不会传播到最后一个subscribeBy()方法,因为它是subscribe().flatmap()链内的方法内部处理的。

The code comes from this projectwhich is located here: https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt

代码来自这个位于这里的项目https: //github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt