Java 获取 Observable 的最新值并立即发出

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/37009207/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-11 18:54:52  来源:igfitidea点击:

Get the latest value of an Observable and emit it immeditely

javarx-javareactive-programmingrx-android

提问by E-Kami

I'm trying to get the latest value of a given Observableand get it to emit immediately once it's called. Given the code below as an example:

我正在尝试获取给定的最新值,Observable并在调用后立即发出。以下面的代码为例:

return Observable.just(myObservable.last())
    .flatMap(myObservable1 -> {
        return myObservable1;
    })
    .map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object

This does not work because by doing this the flatMapwill emit myObservable1which in turn will have to emit to reach the map. I don't know if doing such thing is even possible. Do anyone have any clue on how to achieve this goal? Thank you

这是行不通的,因为通过这样做,flatMap将发出myObservable1,而后者又必须发出才能到达map. 我不知道这样做是否可能。有没有人知道如何实现这个目标?谢谢

采纳答案by MatBos

last()method will not be of any help here as it waits for the Observable to terminate to give you the last item emitted.

last()方法在这里没有任何帮助,因为它等待 Observable 终止以提供最后发出的项目。

Assuming that you do not have the control over the emitting observable you could simply create a BehaviorSubjectand subscribe it to the observable that emits the data that you want to listen and then subscribe to the created subject. Since Subjectis both Observableand Subscriberyou will get what you want.

假设您无法控制发出的 observable,您可以简单地创建一个 observableBehaviorSubject并将其订阅到发出您想要收听的数据的 observable,然后订阅创建的主题。因为Subject两者都是ObservableSubscriber你会得到你想要的。

I think (do not have the time to check it now) you may have to manually unsubscribe from the original observable as the BehaviorSubjectonce all of his subscribers unsubscribe will not unsubscribe automatically.

我认为(现在没有时间检查它)您可能必须手动取消订阅原始 observable,因为BehaviorSubject一旦他的所有订阅者取消订阅就不会自动取消订阅。

Something like this:

像这样的东西:

BehaviorSubject subject = new BehaviorSubject();
hotObservable.subscribe(subject);
subject.subscribe(thing -> {
    // Here just after subscribing 
    // you will receive the last emitted item, if there was any.
    // You can also always supply the first item to the behavior subject
});

http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html

http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html

回答by dieyidezui

In RxJava, subscriber.onXXX is called asynchronous.It means that if your Observable emit items in new thread, you can never get the last item before return, except you block the thread and wait for the item.But if the Observable emit item synchronously and you dont' change it's thread by subscribeOn and observOn, such as the code:

在 RxJava 中,subscriber.onXXX 被称为异步的。这意味着如果你的 Observable 在新线程中发射项目,你永远无法获得返回前的最后一个项目,除非你阻塞线程并等待项目。但如果 Observable 同步发射项目并且您不会通过 subscribeOn 和 observOn 更改它的线程,例如代码:

Observable.just(1,2,3).subscribe();

In this case, you can get the last item by doing like this:

在这种情况下,您可以通过执行以下操作来获取最后一项:

Integer getLast(Observable<Integer> o){
    final int[] ret = new int[1];
    Observable.last().subscribe(i -> ret[0] = i);
    return ret[0];
}

It's a bad idea doing like this.RxJava prefer you to do asynchronous work by it.

这样做是个坏主意。RxJava 更喜欢你用它来做异步工作。

回答by ndori

What you actually want to achieve here is to take an asynchronous task and transform it to a synchronous one.

您在这里真正想要实现的是将异步任务转换为同步任务。

There are several ways to achieve it, each one with it's pros and cons:

有几种方法可以实现它,每种方法都有其优点和缺点:

  • Use toBlocking()- it means that this thread will be BLOCKED, until the stream is finish, in order to get only one item simply use first()as it will complete once an item is delivered. let's say your entire stream is Observable<T> getData();then a method that will get the last value immediately will look like this:
  • 使用toBlocking()- 这意味着这个线程将被阻塞,直到流结束,为了只获得一个项目,只需使用first()因为它会在一个项目交付后完成。假设您的整个流是Observable<T> getData();一个可以立即获取最后一个值的方法,如下所示:

public T getLastItem(){ return getData().toBlocking().first(); }

public T getLastItem(){ return getData().toBlocking().first(); }

please don't use last()as it will wait for the stream to complete and only then will emit the last item.

请不要使用last() ,因为它会等待流完成,然后才会发出最后一项。

If your stream is a network request and it didn't get any item yet this will block your thread!, so only use it when you are sure that there is an item available immediately (or if you really want a block...)

如果你的流是一个网络请求并且它还没有得到任何项目,这会阻塞你的线程!,所以只有当你确定有一个项目立即可用时才使用它(或者如果你真的想要一个块......)

  • another option is to simply cache the last result, something like this:

    getData().subscribe(t-> cachedT = t;) //somewhere in the code and it will keep saving the last item delivered public T getLastItem(){ return cachedT; }

  • 另一种选择是简单地缓存最后一个结果,如下所示:

    getData().subscribe(t-> cachedT = t;) //在代码中的某个地方,它会一直保存最后交付的项目 public T getLastItem(){ return cachedT; }

if there wasn't any item sent by the time you request it you will get null or whatever initial value you have set. the problem with this approch is that the subscribe phase might happen after the get and might make a race condition if used in 2 different threads.

如果在您请求它时没有发送任何项目,您将获得 null 或您设置的任何初始值。这种方法的问题是订阅阶段可能发生在 get 之后,如果在 2 个不同的线程中使用,可能会产生竞争条件。