Java RX:并行运行压缩的 Observables?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/21201083/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-13 07:16:01  来源:igfitidea点击:

RX: Run Zipped Observables in parallel?

javasystem.reactiverx-java

提问by spierce7

So I'm playing around with RX (really cool), and I've been converting my api that accesses a sqlite database in Android to return observables.

所以我在玩 RX(真的很酷),我一直在转换我的 api,它访问 Android 中的 sqlite 数据库以返回 observable。

So naturally one of the problems I started to try to solve is, "What if I want to make 3 API calls, get the results, and then do some processing once they are all finished?"

所以自然而然我开始尝试解决的问题之一是,“如果我想进行 3 个 API 调用,获取结果,然后在它们全部完成后做一些处理怎么办?”

It took me an hour or 2, but I eventually found the Zip Functionalityand it helps me out handily:

我花了一两个小时,但我最终找到了Zip 功能,它可以轻松地帮助我:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(value);
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
}

Great! So that's cool.

伟大的!所以这很酷。

So when I zip up the 3 observables they run in serial. What if I want them all to run in parallel at the same time so I end up getting the results faster?I've played around with a few things, and even tried reading some of the original RX stuff people have written in C#. I'm sure there is a simple answer. Can anyone point me in the right direction? What is the proper way to do this?

因此,当我压缩 3 个 observables 时,它们会串行运行。如果我希望它们同时并行运行以便我最终更快地获得结果怎么办?我玩过一些东西,甚至尝试阅读一些人们用 C# 编写的原始 RX 东西。我相信有一个简单的答案。任何人都可以指出我正确的方向吗?这样做的正确方法是什么?

采纳答案by James World

zipdoesrun the observables in parallel - but it also subscribesto them serially. Since your getNumberedObservableis completing in the subscription method it gives the impressionof running serially, but there is in fact no such limitation.

zip确实并行运行 observables - 但它也串行订阅它们。由于您getNumberedObservable在订阅方法中完成它给人的印象是串行运行,但实际上没有这样的限制。

You can either try with some long running Observables that outlive their subscription logic, such as timer, or use the subscribeOnmethod to subscribe asynchronously to each stream passed to zip.

您可以尝试使用一些长期运行的 Observables,这些 Observables 比它们的订阅逻辑寿命更长,例如timer,或者使用subscribeOn方法异步订阅传递给 的每个流zip

回答by Brandon

In RxJava, use toAsyncto turn a regular function into something that will run on a thread and return its result in an observable.

在 RxJava 中,使用toAsync将常规函数转换为将在线程上运行并在 observable 中返回其结果的东西。

I don't know Java syntax that well, but it would look something like:

我不太了解 Java 语法,但它看起来像:

public static Integer getNumber(final int value) { return value; }
public static Observable<Integer> getNumberedObservable(final int value) {
    return rx.util.functions.toAsync(new Func<Integer,Integer>() {
        @Override
        public Integer call(Integer value) { return getNumber(value); }
    });
};

That would work if getNumberwere really accessing a database. When you call getNumberedObservableit returns an observable that will run getNumberon a separate thread when you subscribe to it.

如果getNumber真的访问数据库,那会起作用。当您调用getNumberedObservable它时,它会返回一个 observable,getNumber当您订阅它时,它将在单独的线程上运行。

回答by s-hunter

I was trying to do the same, running multiple threads in parallel using the zip. I ended opening a new so questionand got an answer. Basically, you have to subscribe each observable to a new thread, so if you want to run three observables in parallel using the zip, you have to have subscribe to 3 separate threads.

我试图做同样的事情,使用zip并行运行多个线程。我结束了打开一个新问题并得到了答案。基本上,您必须将每个 observable 订阅到一个新线程,因此如果您想使用 zip 并行运行三个 observable,您必须订阅 3 个单独的线程。