RxJava; 如何同步发出 observable

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/35062485/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-11 16:16:49  来源:igfitidea点击:

RxJava; How to emit observables synchronously

javarx-java

提问by Thomas Neuteboom

I want to synchronously emit two Observable objects (which are asynchronous), one after the other where it returns the firstemitted Observable object. If the first one fails, it should not emit the second one.

我想同步发出两个 Observable 对象(它们是异步的),一个接一个地返回第一个发出的 Observable 对象。如果第一个失败,它不应该发出第二个。

Let's say we have one Observable that signs a user in, and another Observable that automatically selects the user's account, aftersigning in.

假设我们有一个 Observable 用于登录用户,另一个 Observable登录自动选择用户的帐户。

This is what I tried:

这是我尝试过的:

public Observable<AccessToken> signInAndSelectAccount(String username, String password)
{

    Observable<AccessToken> ob1 = ...; // Sign in.
    Observable<Account> ob2 = ...; // Select account.


    return Observable.zip(
            ob1,
            ob2,
            new Func2<AccessToken, Account, AccessToken>() {
                @Override
                public AccessToken call(AccessToken accessToken, Account account)
                {
                     return accessToken;
                }
            });
}

This unfortunately does not work for my use case. It will emit/call both observables parallel, starting with 'ob1'.

不幸的是,这不适用于我的用例。它将以 'ob1' 开始,并行地发出/调用两个 observable。

Did someone encounter a similar use case? Or has an idea on how to make observables wait for eachother in a synchronous way, where the first emitted can be returned?

有人遇到过类似的用例吗?或者有一个关于如何让 observables 以同步方式等待彼此的想法,可以返回第一个发出的?

Thanks in advance.

提前致谢。

采纳答案by eleven

There is no such term as "wait" in reactive programming. You need to think about creating of a data stream, where one Observablecould be triggered by another. In your case after receiving tokenyou need to receive account. It could look like this:

在反应式编程中没有“等待”这样的术语。您需要考虑创建一个数据流,其中一个Observable可以被另一个触发。在您的情况下,收到后token您需要收到account. 它可能看起来像这样:

Observable<Account> accountObservable = Observable.create(new Observable.OnSubscribe<AccessToken>() {
    @Override public void call(Subscriber<? super AccessToken> subscriber) {
        subscriber.onNext(new AccessToken());
        subscriber.onCompleted();
    }
}).flatMap(accessToken -> Observable.create(new Observable.OnSubscribe<Account>() {
    @Override public void call(Subscriber<? super Account> subscriber) {
        subscriber.onNext(new Account(accessToken));
        subscriber.onCompleted();
    }
}));

回答by Tomá? Dvo?ák

I don't know Java, but the solution in Scala would probably be this, hope it is readable to you

我不知道 Java,但 Scala 中的解决方案可能是这样,希望它对你来说是可读的

import rx.lang.scala.Observable

class AccessToken
class Account

case class TokenAndAccount(token: AccessToken, account: Account)

val accessTokenSource = Observable.just(new AccessToken)
val accountSource = Observable.just(new Account)

accessTokenSource
   .flatMap(token ? accountSource.map(account ? TokenAndAccount(token, account)))
   .subscribe(tokenAndAccount ? println(tokenAndAccount))

Basically flatMapwill make sure that the accountSource.map...is used only after the token from accessTokenSourcehas been emitted. Inside the accountSource.mapwe combine the obtained token and account together for later usage in subscribe.

基本上flatMap将确保accountSource.map...只有在accessTokenSource发出令牌之后才使用。在里面,accountSource.map我们将获得的令牌和帐户结合在一起,以备后用subscribe

flatMapis one of the most useful operators, be sure to read it's docs and perhaps some tutorials.

flatMap是最有用的操作符之一,一定要阅读它的文档和一些教程。

回答by netgui

You can also use rx.observables.BlockingObservable e.g.:

您还可以使用 rx.observables.BlockingObservable 例如:

BlockingObservable.from(/**/).single();

回答by Phan Van Linh

You can use Single.blockingGetfor synchronous call

您可以使用Single.blockingGet进行同步调用

// example 
signIn(name,password).blockingGet()