Java: How to wait for an async Observable to complete
Notice: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must apply the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/24289093/
Asked by user3753094
I'm trying to build a sample using RxJava. The sample should orchestrate a ReactiveWareService and a ReactiveReviewService, returning a WareAndReview composite.
ReactiveWareService
public Observable<Ware> findWares() {
    return Observable.from(wareService.findWares());
}
ReactiveReviewService: reviewService.findReviewsByItem calls Thread.sleep to simulate latency.
public Observable<Review> findReviewsByItem(final String item) {
    return Observable.create((Observable.OnSubscribe<Review>) observer -> executor.execute(() -> {
        try {
            List<Review> reviews = reviewService.findReviewsByItem(item);
            reviews.forEach(observer::onNext);
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }
    }));
}
public List<WareAndReview> findWaresWithReviews() throws RuntimeException {
    final List<WareAndReview> wareAndReviews = new ArrayList<>();
    wareService.findWares()
        .map(WareAndReview::new)
        .subscribe(wr -> {
            wareAndReviews.add(wr);
            //Async!!!!
            reviewService.findReviewsByItem(wr.getWare().getItem())
                .subscribe(wr::addReview,
                    throwable -> System.out.println("Error while trying to find reviews for " + wr)
                );
        });
    //TODO: There should be a better way to wait for async reviewService.findReviewsByItem completion!
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {}
    return wareAndReviews;
}
Given that I don't want to return an Observable, how can I wait for the async Observable (findReviewsByItem) to complete?
Answered by Marek Hawrylczak
You can use the methods of BlockingObservable; see https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators. For example:
BlockingObservable.from(reviewService.findReviewsByItem(..)).toIterable()
Answered by theBuckWheat
Another way would be to declare a CountDownLatch before starting, then call countDown() on that latch in your onCompleted(). You can then replace your Thread.sleep() with await() on that latch.
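A minimal, JDK-only sketch of the latch approach is shown here. The names LatchExample, findReviewsAsync and collectReviews are hypothetical, and the callback-style findReviewsAsync only stands in for the asker's async Observable; it is not RxJava code. The 5-second timeout is also an assumption, added so a lost completion signal cannot block the caller forever.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical example class; not part of the original question or answers.
class LatchExample {

    // Simulated async source: emits two reviews on a worker thread,
    // then signals completion (the role played by onCompleted()).
    static void findReviewsAsync(ExecutorService executor, String item,
                                 Consumer<String> onNext, Runnable onCompleted) {
        executor.execute(() -> {
            try {
                Thread.sleep(100); // simulated latency
                onNext.accept("review-1 for " + item);
                onNext.accept("review-2 for " + item);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                onCompleted.run(); // always release the waiter, even on failure
            }
        });
    }

    static List<String> collectReviews(List<String> items) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<String> reviews = new CopyOnWriteArrayList<>(); // written from worker threads
        // One count per async call that must finish before returning.
        CountDownLatch latch = new CountDownLatch(items.size());
        try {
            for (String item : items) {
                findReviewsAsync(executor, item, reviews::add, latch::countDown);
            }
            // Takes the place of Thread.sleep(3000): returns once every call has completed.
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for reviews");
            }
            return reviews;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for reviews", e);
        } finally {
            executor.shutdown();
        }
    }
}
```

In the asker's code, the same idea would presumably mean sizing the latch to the number of inner subscriptions and counting down from both the completion and the error callback, so that a failed lookup cannot leave the caller waiting.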
Answered by akarnokd
Most of your example can be rewritten with standard RxJava operators that work well together:
public class Example {
    Scheduler scheduler = Schedulers.from(executor);

    public Observable<Review> findReviewsByItem(final String item) {
        return Observable.just(item)
            .subscribeOn(scheduler)
            .flatMapIterable(reviewService::findReviewsByItem);
    }

    public List<WareAndReview> findWaresWithReviews() {
        return wareService
            .findWares()
            .map(WareAndReview::new)
            .flatMap(wr -> {
                return reviewService
                    .findReviewsByItem(wr.getWare().getItem())
                    .doOnNext(wr::addReview)
                    .lastOrDefault(null)
                    .map(v -> wr);
            })
            .toList()
            .toBlocking()
            .first();
    }
}
Whenever you want to compose services like this, think of flatMap first. You don't need to block on each sub-Observable; block only once at the very end with toBlocking(), and only if really necessary.
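The "compose first, block once at the end" idea can also be illustrated without RxJava. The sketch below is a JDK-only analogue that swaps in CompletableFuture for Observable, so it is not akarnokd's code. ComposeThenBlock, WareWithReviews and slowFindReviews are hypothetical names invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

// Hypothetical example class: a CompletableFuture analogue, not RxJava.
class ComposeThenBlock {

    // Hypothetical composite, loosely modelled on WareAndReview.
    static final class WareWithReviews {
        final String ware;
        final List<String> reviews;

        WareWithReviews(String ware, List<String> reviews) {
            this.ware = ware;
            this.reviews = reviews;
        }
    }

    // Hypothetical slow lookup standing in for reviewService.findReviewsByItem.
    static List<String> slowFindReviews(String ware) {
        try {
            Thread.sleep(100); // simulated latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> reviews = new ArrayList<>();
        reviews.add("review of " + ware);
        return reviews;
    }

    static List<WareWithReviews> findWaresWithReviews(List<String> wares) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            // Start every lookup without blocking; each future maps its
            // result into the composite, much like flatMap(...).map(v -> wr).
            List<CompletableFuture<WareWithReviews>> pending = wares.stream()
                .map(ware -> CompletableFuture
                    .supplyAsync(() -> slowFindReviews(ware), executor)
                    .thenApply(reviews -> new WareWithReviews(ware, reviews)))
                .collect(Collectors.toList());
            // Block only here, after all lookups are already in flight.
            return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
        } finally {
            executor.shutdown();
        }
    }
}
```

Collecting the futures into a list before joining matters: joining inside the first stream would wait for each lookup before starting the next, which is the per-item blocking the answer advises against.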