Rxjava Android 如何使用Zip运算符

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/30219877/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-11 09:21:01  来源:igfitidea点击:

Rxjava Android how to use the Zip operator

javaandroidrx-java

提问by feilong

I am having a lot of trouble understanding the zip operator in RxJava for my android project. Problem I need to be able to send a network request to upload a video Then i need to send a network request to upload a picture to go with it finally i need to add a description and use the responses from the previous two requests to upload the location urls of the video and picture along with the description to my server.

我在为我的 android 项目理解 RxJava 中的 zip 运算符时遇到了很多麻烦。问题我需要能够发送网络请求来上传视频然后我需要发送网络请求来上传图片最后我需要添加描述并使用前两个请求的响应来上传视频和图片的位置 url 以及对我的服务器的描述。

I assumed that the zip operator would be perfect for this task as I understood we could take the response of two observables (video and picture requests) and use them for my final task. But I cant seem to get this to occur how I envision it.

我认为 zip 操作符非常适合这项任务,因为我知道我们可以获取两个可观察对象(视频和图片请求)的响应并将它们用于我的最终任务。但是我似乎无法按照我的设想发生这种情况。

I am looking for someone to answer how this can be done conceptually with a bit of psuedo code. Thank you

我正在寻找某人来回答如何使用一些伪代码在概念上完成此操作。谢谢

采纳答案by inmyth

Zip operator strictly pairs emitted items from observables. It waits for both (or more) items to arrive then merges them. So yes this would be suitable for your needs.

Zip 运算符严格配对来自 observable 的发射项目。它等待两个(或更多)项目到达然后合并它们。所以是的,这将适合您的需求。

I would use Func2to chain the result from the first two observables. Notice this approach would be simpler if you use Retrofit since its api interface may return an observable. Otherwise you would need to create your own observable.

我会Func2用来链接前两个可观察的结果。请注意,如果您使用 Retrofit,这种方法会更简单,因为它的 api 接口可能会返回一个 observable。否则,您将需要创建自己的可观察对象。

// assuming each observable returns response in the form of String
Observable<String> movOb = Observable.create(...);
// if you use Retrofit
Observable<String> picOb = RetrofitApiManager.getService().uploadPic(...),
Observable.zip(movOb, picOb, new Func2<String, String, MyResult>() {
      @Override
      public MyResult call(String movieUploadResponse, String picUploadResponse) {
          // analyze both responses, upload them to another server
          // and return this method with a MyResult type
          return myResult;
      }
   }
)
// continue chaining this observable with subscriber
// or use it for something else

回答by dwursteisen

zipoperator allow you to compose a result from results of two different observable.

zip运算符允许您根据两个不同的 observable 的结果组合一个结果。

You 'll have to give am lambda that will create a result from datas emitted by each observable.

您必须提供 am lambda,它将从每个 observable 发出的数据中创建一个结果。

Observable<MovieResponse> movies = ...
Observable<PictureResponse> picture = ...

Observable<Response> response = movies.zipWith(picture, (movie, pic) -> {
        return new Response("description", movie.getName(), pic.getUrl());

});

回答by paul

Here I have an example that I did using Zip in asynchronous way, just in case you′re curious

这里我有一个例子,我以异步方式使用 Zip,以防万一你好奇

      /**
 * Since every observable into the zip is created to subscribeOn a diferent thread, it′s means all of them will run in parallel.
 * By default Rx is not async, only if you explicitly use subscribeOn.
  */
@Test
public void testAsyncZip() {
    scheduler = Schedulers.newThread();
    scheduler1 = Schedulers.newThread();
    scheduler2 = Schedulers.newThread();
    long start = System.currentTimeMillis();
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2)
                                                                                        .concat(s3))
              .subscribe(result -> showResult("Async in:", start, result));
}

/**
 * In this example the the three observables will be emitted sequentially and the three items will be passed to the pipeline
 */
@Test
public void testZip() {
    long start = System.currentTimeMillis();
    Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2)
                                                                         .concat(s3))
              .subscribe(result -> showResult("Sync in:", start, result));
}


public void showResult(String transactionType, long start, String result) {
    System.out.println(result + " " +
                               transactionType + String.valueOf(System.currentTimeMillis() - start));
}

public Observable<String> obString() {
    return Observable.just("")
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "Hello");
}

public Observable<String> obString1() {
    return Observable.just("")
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> " World");
}

public Observable<String> obString2() {
    return Observable.just("")
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "!");
}

public Observable<String> obAsyncString() {
    return Observable.just("")
                     .observeOn(scheduler)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "Hello");
}

public Observable<String> obAsyncString1() {
    return Observable.just("")
                     .observeOn(scheduler1)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> " World");
}

public Observable<String> obAsyncString2() {
    return Observable.just("")
                     .observeOn(scheduler2)
                     .doOnNext(val -> {
                         System.out.println("Thread " + Thread.currentThread()
                                                              .getName());
                     })
                     .map(val -> "!");
}

You can see more examples here https://github.com/politrons/reactive

您可以在此处查看更多示例https://github.com/politrons/reactive

回答by Evin1_

A small example:

一个小例子

Observable<String> stringObservable1 = Observable.just("Hello", "World");
Observable<String> stringObservable2 = Observable.just("Bye", "Friends");

Observable.zip(stringObservable1, stringObservable2, new BiFunction<String, String, String>() {
    @Override
    public String apply(@NonNull String s, @NonNull String s2) throws Exception {
        return s + " - " + s2;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        System.out.println(s);
    }
});

This will print:

这将打印:

Hello - Bye
World - Friends

回答by Ahmed Adel Ismail

i have been searching for a simple answer on how to use the Zip operator, and what to do with the Observables i create to pass them to it, i was wondering if i should call subscribe() for every observable or not, non of these answers were simple to find, i had to figure it out by my self, so here is a simple example for using Zip operator on 2 Observables :

我一直在寻找关于如何使用 Zip 运算符的简单答案,以及如何处理我创建的 Observables 以将它们传递给它,我想知道我是否应该为每个 observable 调用 subscribe(),不是这些答案很容易找到,我必须自己弄清楚,所以这里有一个在 2 个 Observables 上使用 Zip 运算符的简单示例:

@Test
public void zipOperator() throws Exception {

    List<Integer> indexes = Arrays.asList(0, 1, 2, 3, 4);
    List<String> letters = Arrays.asList("a", "b", "c", "d", "e");

    Observable<Integer> indexesObservable = Observable.fromIterable(indexes);

    Observable<String> lettersObservable = Observable.fromIterable(letters);

    Observable.zip(indexesObservable, lettersObservable, mergeEmittedItems())
            .subscribe(printMergedItems());
}

@NonNull
private BiFunction<Integer, String, String> mergeEmittedItems() {
    return new BiFunction<Integer, String, String>() {
        @Override
        public String apply(Integer index, String letter) throws Exception {
            return "[" + index + "] " + letter;
        }
    };
}

@NonNull
private Consumer<String> printMergedItems() {
    return new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    };
}

the printed result is :

打印的结果是:

[0] a
[1] b
[2] c
[3] d
[4] e

the final answers to the questions that where in my head were as follows

我脑子里的问题的最终答案如下

the Observables passed to the zip() method just need to be created only, they do not need to have any subscribers to them, only creating them is enough ... if you want any observable to run on a scheduler, you can specify this for that Observable ... i also tried the zip() operator on Observables where they should wait for there result, and the Consumable of the zip() was triggered only when both results where ready (which is the expected behavior)

传递给 zip() 方法的 Observable 只需要被创建,它们不需要有任何订阅者,只需要创建它们就足够了……如果你想让任何 observable 在调度器上运行,你可以指定这个对于那个 Observable ......我还尝试了 Observables 上的 zip() 运算符,他们应该在那里等待结果,并且 zip() 的消费品仅在两个结果都准备就绪时才触发(这是预期的行为)

回答by Someone Somewhere

This is my implementation using Single.zipand rxJava2

这是我使用Single.ziprxJava2 的实现

I tried to make it as easy to understand as possible

我试图让它尽可能容易理解

//
// API Client Interface
//
@GET(ServicesConstants.API_PREFIX + "questions/{id}/")
Single<Response<ResponseGeneric<List<ResponseQuestion>>>> getBaseQuestions(@Path("id") int personId);

@GET(ServicesConstants.API_PREFIX + "physician/{id}/")
Single<Response<ResponseGeneric<List<ResponsePhysician>>>> getPhysicianInfo(@Path("id") int personId);

//
// API middle layer - NOTE: I had feedback that the Single.create is not needed (but I haven't yet spent the time to improve it)
//
public Single<List<ResponsePhysician>> getPhysicianInfo(int personId) {
    return Single.create(subscriber -> {
        apiClient.getPhysicianInfo(appId)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(response -> {
                    ResponseGeneric<List<ResponsePhysician>> responseBody = response.body();
                    if(responseBody != null && responseBody.statusCode == 1) {
                        if (!subscriber.isDisposed()) subscriber.onSuccess(responseBody.data);
                    } else if(response.body() != null && response.body().status != null ){
                        if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.body().status));
                    } else {
                        if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
                    }
                }, throwable -> {
                    throwable.printStackTrace();
                    if(!subscriber.isDisposed()) subscriber.onError(throwable);
                });
    });
}

public Single<List<ResponseQuestion>> getHealthQuestions(int personId){
    return Single.create(subscriber -> {
        apiClient.getBaseQuestions(personId)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .subscribe(response -> {
                    ResponseGeneric<List<ResponseQuestion>> responseBody = response.body();
                    if(responseBody != null && responseBody.data != null) {
                        if (!subscriber.isDisposed()) subscriber.onSuccess(response.body().data);
                    } else if(response.body() != null && response.body().status != null ){
                        if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.body().status));
                    } else {
                        if (!subscriber.isDisposed()) subscriber.onError(new Throwable(response.message()));
                    }
                }, throwable -> {
                    throwable.printStackTrace();
                    if(!subscriber.isDisposed()) subscriber.onError(throwable);
                });
    });
}

//please note that ResponseGeneric is just an outer wrapper of the returned data - common to all API's in this project

public class ResponseGeneric<T> {

    @SerializedName("Status")
    public String status;

    @SerializedName("StatusCode")
    public float statusCode;

    @SerializedName("Data")
    public T data;
}

//
// API end-use layer - this gets close to the UI so notice the oberver is set for main thread
//
private static class MergedResponse{// this is just a POJO to store all the responses in one object
    public List<ResponseQuestion> listQuestions;
    public List<ResponsePhysician> listPhysicians;
    public MergedResponse(List<ResponseQuestion> listQuestions, List<ResponsePhysician> listPhysicians){
        this.listQuestions = listQuestions;
        this.listPhysicians = listPhysicians;
    }
}

// example of Single.zip() - calls getHealthQuestions() and getPhysicianInfo() from API Middle Layer
private void downloadHealthQuestions(int personId) {
    addRxSubscription(Single
            .zip(getHealthQuestions(personId), getPhysicianInfo(personId), MergedResponse::new)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(response -> {
                if(response != null) {
                    Timber.i(" - total health questions downloaded %d", response.listQuestions.size());
                    Timber.i(" - physicians downloaded %d", response.listPhysicians.size());

                    if (response.listPhysicians != null && response.listPhysicians.size()>0) {
                        // do your stuff to process response data
                    }

                    if (response.listQuestions != null && response.listQuestions.size()>0) {

                        // do your stuff to process response data
                    }


                } else {
                    // process error - show message
                }
            }, error -> {
                // process error - show network error message
            }));
}

回答by ?ngelo Polotto

You use the zipfrom rxjavawith Java 8:

您可以使用ziprxjavaJava 8

Observable<MovieResponse> movies = ...
Observable<PictureResponse> picture = ...

Observable<ZipResponse> response = Observable.zip(movies, picture, ZipResponse::new);

class ZipResponse {
        private MovieResponse movieResponse;
        private PictureResponse pictureResponse;

        ZipResponse(MovieResponse movieResponse, PictureResponse pictureResponse) {
             this.movieResponse = movieResponse;
             this.pictureResponse = pictureResponse;
        }

        public MovieResponse getMovieResponse() {
             return movieResponse;
        }

        public void setMovieResponse(MovieResponse movieResponse) {
            this.movieResponse= movieResponse;
        }

        public PictureResponse getPictureResponse() {
             return pictureResponse;
        }

        public void setPictureResponse(PictureResponse pictureResponse) {
            this.pictureResponse= pictureResponse;
        }
}