java 等待并行 RX 订阅者完成
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/25163262/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Waiting for parallel RX subscribers to complete
提问by user1232555
I'm looking for the best method to wait for async tasks to finish in rx-java.
我正在寻找等待异步任务在 rx-java 中完成的最佳方法。
As a common example say there is a function which gets a list of id's from a local store and then queries a remote system for those Id's, the remote system results are then consolidated into a single report and returned to the caller of the function. As the call to the remote system is slow we want them to be done in asynchronously and I only want to return once all of the calls have returned and their results have been processed.
作为一个常见的例子,有一个函数从本地存储获取 id 列表,然后查询远程系统的这些 id,然后将远程系统结果合并到一个报告中并返回给函数的调用者。由于对远程系统的调用很慢,我们希望它们以异步方式完成,并且我只想在所有调用都返回并处理了它们的结果后返回。
The only reliable way I have found to do this is to poll the subscription to check it is unsubscribed yet. But I'm thinking doesn't seem to be the 'RX' way to do things!
我发现这样做的唯一可靠方法是轮询订阅以检查它是否已取消订阅。但我认为这似乎不是“RX”的做事方式!
As an example I've taken the example from http://howrobotswork.wordpress.com/2013/10/28/using-rxjava-in-android/and amended it slightly to make it non-android and to show what I mean. I have to have the following code at the of the main() method to stop it exiting immediately.
作为一个例子,我从http://howrobotswork.wordpress.com/2013/10/28/using-rxjava-in-android/ 中取了一个例子,并稍微修改了它以使其非 android 并显示我的意思. 我必须在 main() 方法中使用以下代码来阻止它立即退出。
while (!subscription.isUnsubscribed()) {
Thread.sleep(100);
}
The full code for the example is listed below (it is dependent on http://square.github.io/retrofit/if your trying to compile it)
下面列出了示例的完整代码(如果您尝试编译它,则取决于http://square.github.io/retrofit/)
package org.examples;
import retrofit.RestAdapter;
import retrofit.http.GET;
import retrofit.http.Query;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.schedulers.Schedulers;
public class AsyncExample {
public static void main(String[] args) throws InterruptedException {
final Subscription subscription = Observable.from("London", "Paris", "Berlin")
.flatMap(new Func1<String, Observable<WeatherData>>() {
@Override
public Observable<WeatherData> call(String s) {
return ApiManager.getWeatherData(s);
}
})
.subscribe(
new Action1<WeatherData>() {
@Override
public void call(WeatherData weatherData) {
System.out.println(Thread.currentThread().getName() + " - " + weatherData.name + ", " + weatherData.base);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
System.out.println(Thread.currentThread().getName() + " - ERROR: " + throwable.getMessage());
}
},
new Action0() {
@Override
public void call() {
System.out.println(Thread.currentThread().getName() + " - COMPLETED");
}
}
);
// Have to poll subscription to check if its finished - is this the only way to do it?
while (!subscription.isUnsubscribed()) {
Thread.sleep(100);
}
}
}
class ApiManager {
private interface ApiManagerService {
@GET("/weather")
WeatherData getWeather(@Query("q") String place, @Query("units") String units);
}
private static final RestAdapter restAdapter = new RestAdapter.Builder()
.setEndpoint("http://api.openweathermap.org/data/2.5")
.build();
private static final ApiManagerService apiManager = restAdapter.create(ApiManagerService.class);
public static Observable<WeatherData> getWeatherData(final String city) {
return Observable.create(new Observable.OnSubscribe<WeatherData>() {
@Override
public void call(Subscriber<? super WeatherData> subscriber) {
try {
System.out.println(Thread.currentThread() + " - Getting " + city);
subscriber.onNext(apiManager.getWeather(city, "metric"));
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
}
回答by kjones
If you are asking how to turn an asynchronous Observable into a synchronous one you can use .toBlocking() and then use .last() to wait for the Observable to complete.. For example, the following will not continue until the timer completes even though the timer executes on the computation thread.
如果您问如何将异步 Observable 转换为同步 Observable,您可以使用 .toBlocking() 然后使用 .last() 等待 Observable 完成.. 例如,以下不会继续直到计时器完成甚至尽管计时器在计算线程上执行。
try {
Observable
.timer(10, TimeUnit.SECONDS)
.toBlocking()
.last(); // Wait for observable to complete. Last item discarded.
} catch (IllegalArgumentException ex) {
// No items or error.
}
You probably shouldn't use this method unless absolutely necessary. Normally, application termination would be controlled by some other event (key press, close menu item click, etc.) Your network request Observables would complete asynchronously and you would take action in the onNext(), onCompleted() and onError() calls to update the UI or display an error.
除非绝对必要,否则您可能不应该使用此方法。通常,应用程序终止将由其他一些事件(按键、关闭菜单项单击等)控制。您的网络请求 Observables 将异步完成,您将在 onNext()、onCompleted() 和 onError() 调用中采取行动更新 UI 或显示错误。
Also, the beauty of Retrofit is that it has built in support for RxJava and will execute network requests in the background. To use that support declare you interface as returning an Observable.
此外,Retrofit 的美妙之处在于它内置了对 RxJava 的支持,并将在后台执行网络请求。要使用该支持,请声明您的接口返回一个 Observable。
interface ApiManagerService {
@GET("/weather")
Observable<WeatherData> getWeather(@Query("q") String place, @Query("units") String units);
}
Which allows you to simplify your getWeatherData() method to this:
这允许您将 getWeatherData() 方法简化为:
public static Observable<WeatherData> getWeatherData(final String city) {
return apiManager.getWeather(city, "metric");
}
回答by Maksim Golubev
For the purpose of synchronous waiting for asynchronous observаble I wrote a simple function:
为了同步等待异步可观察对象,我编写了一个简单的函数:
public static <T> void runSynchronously(Observable<T> observable) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Throwable> exception = new AtomicReference<>();
observable.subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
}
@Override
public void onError(Throwable e) {
exception.set(e);
latch.countDown();
}
@Override
public void onNext(T o) {
}
});
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if (exception.get() != null) {
throw Exceptions.propagate(exception.get());
}
}
You can use it this way:
你可以这样使用它:
Observer<String> sideEffects = new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}
@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
};
runSynchronously(
Observable.just("London", "Paris", "Berlin")
.doOnEach(sideEffects)
);
回答by Georgi Khomeriki
In most cases it's better not to block a method. Try to be as non-blocking as possible. If you need to do something after an Observable has completed simply invoke your logic from the onCompleted
callback, or use the doOnCompleted
operator.
在大多数情况下,最好不要阻塞一个方法。尽量做到无阻塞。如果你需要在 Observable 完成后做一些事情,只需从onCompleted
回调中调用你的逻辑,或者使用doOnCompleted
操作符。
If you really, really need to block then you can use the Blocking Observable Operators
.
Most of these operators block until the Observable completes.
如果你真的,真的需要阻止,那么你可以使用Blocking Observable Operators
. 这些操作符中的大多数都会阻塞,直到 Observable 完成。
However, stopping your main method from exiting prematurely can be achieved by a simple System.in.read()
at the end.
但是,可以通过最后一个简单的方法来阻止您的主要方法过早退出System.in.read()
。