Java 获取 Observable 的最新值并立即发出
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/37009207/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Get the latest value of an Observable and emit it immeditely
提问by E-Kami
I'm trying to get the latest value of a given Observable
and get it to emit
immediately once it's called. Given the code below as an example:
我正在尝试获取给定的最新值,Observable
并在调用后立即发出。以下面的代码为例:
return Observable.just(myObservable.last())
.flatMap(myObservable1 -> {
return myObservable1;
})
.map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object
This does not work because by doing this the flatMap
will emit myObservable1
which in turn will have
to emit to reach the map
.
I don't know if doing such thing is even possible. Do anyone have any clue on how to achieve this goal? Thank you
这是行不通的,因为通过这样做,flatMap
将发出myObservable1
,而后者又必须发出才能到达map
. 我不知道这样做是否可能。有没有人知道如何实现这个目标?谢谢
采纳答案by MatBos
last()
method will not be of any help here as it waits for the Observable to terminate to give you the last item emitted.
last()
方法在这里没有任何帮助,因为它等待 Observable 终止以提供最后发出的项目。
Assuming that you do not have the control over the emitting observable you could simply create a BehaviorSubject
and subscribe it to the observable that emits the data that you want to listen and then subscribe to the created subject. Since Subject
is both Observable
and Subscriber
you will get what you want.
假设您无法控制发出的 observable,您可以简单地创建一个 observableBehaviorSubject
并将其订阅到发出您想要收听的数据的 observable,然后订阅创建的主题。因为Subject
两者都是Observable
,Subscriber
你会得到你想要的。
I think (do not have the time to check it now) you may have to manually unsubscribe from the original observable as the BehaviorSubject
once all of his subscribers unsubscribe will not unsubscribe automatically.
我认为(现在没有时间检查它)您可能必须手动取消订阅原始 observable,因为BehaviorSubject
一旦他的所有订阅者取消订阅就不会自动取消订阅。
Something like this:
像这样的东西:
BehaviorSubject subject = new BehaviorSubject();
hotObservable.subscribe(subject);
subject.subscribe(thing -> {
// Here just after subscribing
// you will receive the last emitted item, if there was any.
// You can also always supply the first item to the behavior subject
});
http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html
http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html
回答by dieyidezui
In RxJava, subscriber.onXXX is called asynchronous.It means that if your Observable emit items in new thread, you can never get the last item before return, except you block the thread and wait for the item.But if the Observable emit item synchronously and you dont' change it's thread by subscribeOn and observOn, such as the code:
在 RxJava 中,subscriber.onXXX 被称为异步的。这意味着如果你的 Observable 在新线程中发射项目,你永远无法获得返回前的最后一个项目,除非你阻塞线程并等待项目。但如果 Observable 同步发射项目并且您不会通过 subscribeOn 和 observOn 更改它的线程,例如代码:
Observable.just(1,2,3).subscribe();
In this case, you can get the last item by doing like this:
在这种情况下,您可以通过执行以下操作来获取最后一项:
Integer getLast(Observable<Integer> o){
final int[] ret = new int[1];
Observable.last().subscribe(i -> ret[0] = i);
return ret[0];
}
It's a bad idea doing like this.RxJava prefer you to do asynchronous work by it.
这样做是个坏主意。RxJava 更喜欢你用它来做异步工作。
回答by ndori
What you actually want to achieve here is to take an asynchronous task and transform it to a synchronous one.
您在这里真正想要实现的是将异步任务转换为同步任务。
There are several ways to achieve it, each one with it's pros and cons:
有几种方法可以实现它,每种方法都有其优点和缺点:
- Use toBlocking()- it means that this thread will be BLOCKED, until the stream is finish, in order to get only one item simply use first()as it will complete once an item is delivered.
let's say your entire stream is
Observable<T> getData();
then a method that will get the last value immediately will look like this:
- 使用toBlocking()- 这意味着这个线程将被阻塞,直到流结束,为了只获得一个项目,只需使用first()因为它会在一个项目交付后完成。假设您的整个流是
Observable<T> getData();
一个可以立即获取最后一个值的方法,如下所示:
public T getLastItem(){
return getData().toBlocking().first();
}
public T getLastItem(){
return getData().toBlocking().first();
}
please don't use last()as it will wait for the stream to complete and only then will emit the last item.
请不要使用last() ,因为它会等待流完成,然后才会发出最后一项。
If your stream is a network request and it didn't get any item yet this will block your thread!, so only use it when you are sure that there is an item available immediately (or if you really want a block...)
如果你的流是一个网络请求并且它还没有得到任何项目,这会阻塞你的线程!,所以只有当你确定有一个项目立即可用时才使用它(或者如果你真的想要一个块......)
another option is to simply cache the last result, something like this:
getData().subscribe(t-> cachedT = t;) //somewhere in the code and it will keep saving the last item delivered public T getLastItem(){ return cachedT; }
另一种选择是简单地缓存最后一个结果,如下所示:
getData().subscribe(t-> cachedT = t;) //在代码中的某个地方,它会一直保存最后交付的项目 public T getLastItem(){ return cachedT; }
if there wasn't any item sent by the time you request it you will get null or whatever initial value you have set. the problem with this approch is that the subscribe phase might happen after the get and might make a race condition if used in 2 different threads.
如果在您请求它时没有发送任何项目,您将获得 null 或您设置的任何初始值。这种方法的问题是订阅阶段可能发生在 get 之后,如果在 2 个不同的线程中使用,可能会产生竞争条件。