Notice: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). StackOverflow original: http://stackoverflow.com/questions/26249030/

Time: 2020-08-11 02:05:26  Source: igfitidea

RxJava Fetching Observables In Parallel

java asynchronous reactive-programming observable rx-java

Asked by diduknow

I need some help in implementing parallel asynchronous calls in RxJava. I have picked up a simple use case wherein the FIRST call fetches (rather searches) a list of products (Tile) to be displayed. The subsequent calls go out and fetch (A) REVIEWS and (B) PRODUCT IMAGES

After several attempts I got to this place.

 1    Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm);
 2    List<Tile> allTiles = new ArrayList<Tile>();
 3    ClientResponse response = new ClientResponse();

 4    searchTile.parallel(oTile -> {
 5      return oTile.flatMap(t -> {
 6        Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId());
 7        Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId());

 8        return Observable.zip(reviews, imageUrl, (r, u) -> {
 9          t.setReviews(r);
10          t.setImageUrl(u);

11          return t;
12        });

13      });
14    }).subscribe(e -> {
15      allTiles.add((Tile) e);
16    });

Line 1: goes out and fetches the product (Tile) to be displayed

Line 4: We take the list of the Observable and SHARD it to fetch reviews and imageUrls

Lines 6, 7: Fetch the Observable review and Observable url

Line 8: Finally the 2 observables are zipped up to return an updated Observable

Line 15: Finally, collate all the individual products to be displayed into a collection that can be returned to the calling layer

While the Observable has been sharded and, in our tests, runs over 4 different threads, the fetching of reviews and images seems to happen one after another. I suspect that the zip step on line 8 is basically causing the sequential invocation of the 2 observables (reviews and url).

[Image: waterfall chart of the calls (not preserved)]

Does this group have any suggestions for fetching reviews and image urls in parallel? In essence, the waterfall chart attached above should look more vertically stacked. The calls to reviews and images should be in parallel.

thanks anand raman

Accepted answer by benjchristensen

The parallel operator proved to be a problem for almost all use cases and does not do what most expect from it, so it was removed in the 1.0.0.rc.4 release: https://github.com/ReactiveX/RxJava/pull/1716

A good example of how to do this type of behavior and get parallel execution can be seen here.

In your example code it is unclear whether searchServiceClient is synchronous or asynchronous. This slightly affects how to solve the problem: if it is already async, no extra scheduling is needed; if it is synchronous, extra scheduling is needed.

First here are some simple examples showing synchronous and asynchronous behavior:

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecution {

    public static void main(String[] args) {
        System.out.println("------------ mergingAsync");
        mergingAsync();
        System.out.println("------------ mergingSync");
        mergingSync();
        System.out.println("------------ mergingSyncMadeAsync");
        mergingSyncMadeAsync();
        System.out.println("------------ flatMapExampleSync");
        flatMapExampleSync();
        System.out.println("------------ flatMapExampleAsync");
        flatMapExampleAsync();
        System.out.println("------------");
    }

    private static void mergingAsync() {
        Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println);
    }

    private static void mergingSync() {
        // here you'll see the delay as each is executed synchronously
        Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println);
    }

    private static void mergingSyncMadeAsync() {
        // if you have something synchronous and want to make it async, you can schedule it like this
        // so here we see both executed concurrently
        Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println);
    }

    private static void flatMapExampleAsync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataAsync(i);
        }).toBlocking().forEach(System.out::println);
    }

    private static void flatMapExampleSync() {
        Observable.range(0, 5).flatMap(i -> {
            return getDataSync(i);
        }).toBlocking().forEach(System.out::println);
    }

    // artificial representations of IO work
    static Observable<Integer> getDataAsync(int i) {
        return getDataSync(i).subscribeOn(Schedulers.io());
    }

    static Observable<Integer> getDataSync(int i) {
        return Observable.create((Subscriber<? super Integer> s) -> {
            // simulate latency
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
            }
            s.onNext(i);
            s.onCompleted();
        });
    }
}

Following is an attempt at providing an example that more closely matches your code:

import java.util.List;

import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;

public class ParallelExecutionExample {

    public static void main(String[] args) {
        final long startTime = System.currentTimeMillis();

        Observable<Tile> searchTile = getSearchResults("search term")
                .doOnSubscribe(() -> logTime("Search started ", startTime))
                .doOnCompleted(() -> logTime("Search completed ", startTime));

        Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
            Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
                    .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime));
            Observable<String> imageUrl = getProductImage(t.getProductId())
                    .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime));

            return Observable.zip(reviews, imageUrl, (r, u) -> {
                return new TileResponse(t, r, u);
            }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime));
        });

        List<TileResponse> allTiles = populatedTiles.toList()
                .doOnCompleted(() -> logTime("All Tiles Completed ", startTime))
                .toBlocking().single();
    }

    private static Observable<Tile> getSearchResults(String string) {
        return mockClient(new Tile(1), new Tile(2), new Tile(3));
    }

    private static Observable<Reviews> getSellerReviews(int id) {
        return mockClient(new Reviews());
    }

    private static Observable<String> getProductImage(int id) {
        return mockClient("image_" + id);
    }

    private static void logTime(String message, long startTime) {
        System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
    }

    private static <T> Observable<T> mockClient(T... ts) {
        return Observable.create((Subscriber<? super T> s) -> {
            // simulate latency
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                // ignored: the sleep only simulates latency
            }
            // emit every mocked value, then signal completion
            for (T t : ts) {
                s.onNext(t);
            }
            s.onCompleted();
        }).subscribeOn(Schedulers.io());
        // note the use of subscribeOn to make an otherwise synchronous Observable async
    }

    public static class TileResponse {

        public TileResponse(Tile t, Reviews r, String u) {
            // store the values
        }

    }

    public static class Tile {

        private final int id;

        public Tile(int i) {
            this.id = i;
        }

        public int getSellerId() {
            return id;
        }

        public int getProductId() {
            return id;
        }

    }

    public static class Reviews {

    }
}

This outputs:

Search started  => 65ms
Search completed  => 1094ms
getProductImage[1] completed  => 2095ms
getSellerReviews[2] completed  => 2095ms
getProductImage[3] completed  => 2095ms
zip[1] completed  => 2096ms
zip[2] completed  => 2096ms
getProductImage[2] completed  => 2096ms
getSellerReviews[1] completed  => 2096ms
zip[3] completed  => 2096ms
All Tiles Completed  => 2097ms
getSellerReviews[3] completed  => 2097ms

I have simulated each IO call to take 1000ms so it is obvious where the latency is and that the calls are happening in parallel. The program prints the progress it makes in elapsed milliseconds.

The trick here is that flatMap merges async calls, so as long as the Observables being merged are async, they will all be executed concurrently.

If a call like getProductImage(t.getProductId()) were synchronous, it could be made asynchronous like this: getProductImage(t.getProductId()).subscribeOn(Schedulers.io()).
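The effect of moving a blocking call onto another thread can also be seen without RxJava. The following is only a minimal plain-JDK sketch and not the RxJava API: `CompletableFuture.supplyAsync` is used here as a rough stand-in for `subscribeOn(Schedulers.io())`, `thenCombine` as a rough stand-in for `Observable.zip`, and `slowCall` is a hypothetical blocking client method with 200 ms of simulated latency.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class SyncMadeAsyncSketch {

    // Hypothetical blocking client call; sleeps 200 ms to simulate IO latency.
    static String slowCall(String name) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name;
    }

    // Both calls run on the caller's thread, one after the other.
    static String sequential() {
        String reviews = slowCall("reviews");
        String image = slowCall("image");
        return reviews + "+" + image;
    }

    // Each call is started on its own pool thread before either result is awaited.
    static String concurrent() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<String> reviews =
                    CompletableFuture.supplyAsync(() -> slowCall("reviews"), pool);
            CompletableFuture<String> image =
                    CompletableFuture.supplyAsync(() -> slowCall("image"), pool);
            // Pair the two results once both are available.
            return reviews.thenCombine(image, (r, u) -> r + "+" + u).join();
        } finally {
            pool.shutdown();
        }
    }

    // Runs the given work once and returns the elapsed wall-clock time in ms.
    static long timeMillis(Supplier<String> work) {
        long start = System.nanoTime();
        work.get();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("sequential => " + timeMillis(SyncMadeAsyncSketch::sequential) + "ms");
        System.out.println("concurrent => " + timeMillis(SyncMadeAsyncSketch::concurrent) + "ms");
    }
}
```

Both variants produce the same combined value; the sequential one should take roughly the sum of the two latencies, while the concurrent one should take roughly the longer of the two.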

Here is the important part of the above example without all the logging and boilerplate types:

    Observable<Tile> searchTile = getSearchResults("search term");

    Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> {
        Observable<Reviews> reviews = getSellerReviews(t.getSellerId());
        Observable<String> imageUrl = getProductImage(t.getProductId());

        return Observable.zip(reviews, imageUrl, (r, u) -> {
            return new TileResponse(t, r, u);
        });
    });

    List<TileResponse> allTiles = populatedTiles.toList()
            .toBlocking().single();
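For readers who want to see the same fan-out shape without the RxJava dependency, here is a rough plain-JDK analogue. It is a sketch under assumptions rather than a translation of the code above: `fetch` is a hypothetical blocking client call, the tile ids are made up, and `CompletableFuture` replaces the Observable pipeline. For every tile it starts the reviews and image lookups before waiting on any of them, then pairs the two results per tile.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class FanOutSketch {

    // Hypothetical blocking client call with 200 ms of simulated latency.
    static String fetch(String what, int id) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return what + "_" + id;
    }

    // Starts both lookups for every tile id up front, then collects the pairs.
    static List<String> populate(List<Integer> tileIds) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (int id : tileIds) {
                CompletableFuture<String> reviews =
                        CompletableFuture.supplyAsync(() -> fetch("reviews", id), pool);
                CompletableFuture<String> image =
                        CompletableFuture.supplyAsync(() -> fetch("image", id), pool);
                // Combine the two results for this tile once both have arrived.
                pending.add(reviews.thenCombine(image,
                        (r, u) -> "tile_" + id + "[" + r + "," + u + "]"));
            }
            // Wait for every tile; results are collected in input order.
            List<String> populated = new ArrayList<>();
            for (CompletableFuture<String> f : pending) {
                populated.add(f.join());
            }
            return populated;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        List<String> tiles = populate(Arrays.asList(1, 2, 3));
        System.out.println(tiles + " => " + (System.currentTimeMillis() - start) + "ms");
    }
}
```

One difference worth noting: this sketch returns the tiles in input order, whereas flatMap emits results in whatever order the inner Observables complete.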

I hope this helps.

Answer by shahshi15

People who are still on JDK 7, or whose IDE doesn't automatically detect JDK 8 source just yet, and who want to try out the above brilliant response (and explanation) by @benjchristensen can use this shamelessly refactored JDK 7 code. Kudos to @benjchristensen for an amazing explanation and example!

import java.util.List;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.schedulers.Schedulers;

public class ParallelExecutionExample
{

    public static void main(String[] args)
    {
        final long startTime = System.currentTimeMillis();

        Observable<Tile> searchTile = getSearchResults("search term")
                .doOnSubscribe(new Action0()
                        {

                            @Override
                            public void call()
                            {
                                logTime("Search started ", startTime);
                            }
                })
                .doOnCompleted(new Action0()
                        {

                            @Override
                            public void call()
                            {
                                logTime("Search completed ", startTime);
                            }
                });
        Observable<TileResponse> populatedTiles = searchTile.flatMap(new Func1<Tile, Observable<TileResponse>>()
        {

            @Override
            public Observable<TileResponse> call(final Tile t)
            {
                Observable<Reviews> reviews = getSellerReviews(t.getSellerId())
                        .doOnCompleted(new Action0()
                                {

                                    @Override
                                    public void call()
                                    {
                                        logTime("getSellerReviews[" + t.id + "] completed ", startTime);
                                    }
                        });
                Observable<String> imageUrl = getProductImage(t.getProductId())
                        .doOnCompleted(new Action0()
                                {

                                    @Override
                                    public void call()
                                    {
                                        logTime("getProductImage[" + t.id + "] completed ", startTime);
                                    }
                        });
                return Observable.zip(reviews, imageUrl, new Func2<Reviews, String, TileResponse>()
                {

                    @Override
                    public TileResponse call(Reviews r, String u)
                    {
                        return new TileResponse(t, r, u);
                    }
                })
                        .doOnCompleted(new Action0()
                                {

                                    @Override
                                    public void call()
                                    {
                                        logTime("zip[" + t.id + "] completed ", startTime);
                                    }
                        });
            }
        });

        List<TileResponse> allTiles = populatedTiles
                .toList()
                .doOnCompleted(new Action0()
                        {

                            @Override
                            public void call()
                            {
                                logTime("All Tiles Completed ", startTime);
                            }
                })
                .toBlocking()
                .single();
    }

    private static Observable<Tile> getSearchResults(String string)
    {
        return mockClient(new Tile(1), new Tile(2), new Tile(3));
    }

    private static Observable<Reviews> getSellerReviews(int id)
    {
        return mockClient(new Reviews());
    }

    private static Observable<String> getProductImage(int id)
    {
        return mockClient("image_" + id);
    }

    private static void logTime(String message, long startTime)
    {
        System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms");
    }

    private static <T> Observable<T> mockClient(final T... ts)
    {
        return Observable.create(new Observable.OnSubscribe<T>()
        {

            @Override
            public void call(Subscriber<? super T> s)
            {
                try
                {
                    Thread.sleep(1000);
                }
                catch (Exception e)
                {
                }
                for (T t : ts)
                {
                    s.onNext(t);
                }
                s.onCompleted();
            }
        })
                .subscribeOn(Schedulers.io());
        // note the use of subscribeOn to make an otherwise synchronous Observable async
    }

    public static class TileResponse
    {

        public TileResponse(Tile t, Reviews r, String u)
        {
            // store the values
        }

    }

    public static class Tile
    {

        private final int id;

        public Tile(int i)
        {
            this.id = i;
        }

        public int getSellerId()
        {
            return id;
        }

        public int getProductId()
        {
            return id;
        }

    }

    public static class Reviews
    {

    }
}