RxJava:链接可观察对象
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/26935821/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
RxJava: chaining observables
提问by Mikhail
Is it possible to implement something like next chaining using RxJava:
是否可以使用 RxJava 实现类似 next 链接的功能:
loginObservable()
.then( (someData) -> {
// returns another Observable<T> with some long operation
return fetchUserDataObservable(someData);
}).then( (userData) -> {
// it should be called when fetching user data completed (with userData of type T)
cacheUserData(userData);
}).then( (userData) -> {
// it should be called after all previous operations completed
displayUserData()
}).doOnError( (error) -> {
//do something
})
I found this library very interesting, but can't figure our how to chain requests where each other depends on previous.
我发现这个库非常有趣,但无法弄清楚我们如何将请求链接到彼此依赖的地方。
采纳答案by Benjamin Gruenbaum
Sure, RxJava supports .map
which does this. From the RxJava Wiki:
当然,RxJava 支持.map
这样做。来自 RxJava 维基:
Basically, it'd be:
基本上,它会是:
loginObservable()
.switchMap( someData -> fetchUserDataObservable(someData) )
.map( userData -> cacheUserData(userData) )
.subscribe(new Subscriber<YourResult>() {
@Override
public void onCompleted() {
// observable stream has ended - no more logins possible
}
@Override
public void onError(Throwable e) {
// do something
}
@Override
public void onNext(YourType yourType) {
displayUserData();
}
});
回答by lingfliu
try using scan()
尝试使用 scan()
Flowable.fromArray(array).scan(...).subscribe(...)
回答by solidak
This is the top post when Googling RxJava chain observablesso I'll just add another common case where you wouldn't want to transformthe data you receive, but chain it with another action (setting the data to a database, for example). Use .flatmap()
. Here's an example:
这是谷歌搜索RxJava 链 observables时的顶级帖子,所以我将添加另一个常见情况,您不想转换收到的数据,而是将它与另一个操作链接起来(例如,将数据设置到数据库)。使用.flatmap()
. 下面是一个例子:
mDataManager
.fetchQuotesFromApi(limit)
.subscribeOn(mSchedulerProvider.io())
.observeOn(mSchedulerProvider.ui())
// OnErrorResumeNext and Observable.error() would propagate the error to
// the next level. So, whatever error occurs here, would get passed to
// onError() on the UI side.
.onErrorResumeNext(Function { Observable.error<List<Quote>>(it) })
.flatMap { t: List<Quote> ->
// Chain observable as such
mDataManager.setQuotesToDb(t).subscribe(
{},
{ e { "setQuotesToDb() error occurred: ${it.localizedMessage}" } },
{ d { "Done server set" } }
)
Observable.just(t)
}
.subscribeBy(
onNext = {},
onError = { mvpView?.showError("No internet connection") },
onComplete = { d { "onComplete(): done with fetching quotes from api" } }
)
This is RxKotlin2, but the idea is the same with RxJava & RxJava2:
这是 RxKotlin2,但其思想与 RxJava & RxJava2 相同:
Quick explanation:
快速解释:
- we try to fetch some data (quotes in this example) from an api with
mDataManager.fetchQuotesFromApi()
- We subscribe the observable to do stuff on
.io()
thread and show results on.ui()
thread. onErrorResumeNext()
makes sure that whatever error we encounter from fetching data is caught in this method. I wanna terminate the entire chain when there is an error there, so I return anObservable.error()
.flatmap()
is the chaining part. I wanna be able to set whatever data I get from the API to my database. I'm not transforming the data I received using.map()
, I'm simply doing something elsewith that data withouttransforming it.- I subscribe to the last chain of observables. If an error occurred with fetching data (first observable), it would be handled (in this case, propagated to the subscribed
onError()
) withonErrorResumeNext()
- I am very conscious that I'm subscribing to the DB observable (inside
flatmap()
). Any error that occurs through this observable will NOTbe propagated to the lastsubscribeBy()
methods, since it is handled inside thesubscribe()
method inside the.flatmap()
chain.
- 我们尝试从 api 中获取一些数据(本例中的引号)
mDataManager.fetchQuotesFromApi()
- 我们订阅 observable 以在
.io()
线程上执行操作并在线程上显示结果.ui()
。 onErrorResumeNext()
确保我们在获取数据时遇到的任何错误都在此方法中被捕获。当那里出现错误时,我想终止整个链,所以我返回一个Observable.error()
.flatmap()
是链接部分。我希望能够将从 API 获得的任何数据设置到我的数据库中。我没有转换我收到的数据.map()
,我只是用这些数据做其他事情而不转换它。- 我订阅了最后一个可观察链。如果在获取数据(第一个 observable)时发生错误,它将被处理(在这种情况下,传播到 subscribed
onError()
)onErrorResumeNext()
- 我很清楚我订阅了 DB observable(inside
flatmap()
)。通过这个 observable 发生的任何错误都不会传播到最后一个subscribeBy()
方法,因为它是subscribe()
在.flatmap()
链内的方法内部处理的。
The code comes from this projectwhich is located here: https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt
代码来自这个位于这里的项目:https: //github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt