RxJava:如何组合多个具有依赖关系的 Observable 并在最后收集所有结果?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/22240406/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-13 14:31:17  来源:igfitidea点击:

RxJava: how to compose multiple Observables with dependencies and collect all results at the end?

javafunctional-programmingreactive-programmingrx-java

提问by Steve Kehlet

I'm learning RxJava and, as my first experiment, trying to rewrite the code in the first run()method in this code(cited on Netflix's blogas a problem RxJava can help solve) to improve its asynchronicity using RxJava, i.e. so it doesn't wait for the result of the first Future (f1.get()) before proceeding on to the rest of the code.

我正在学习 RxJava,作为我的第一个实验,我尝试重写此代码中第run()一种方法中的代码(在Netflix 的博客中引用为 RxJava 可以帮助解决的问题)以使用 RxJava 改进其异步性,即它不会f1.get()在继续执行其余代码之前等待第一个 Future ( )的结果。

f3depends on f1. I see how to handle this, flatMapseems to do the trick:

f3取决于f1. 我知道如何处理这个问题,flatMap似乎可以解决问题:

Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
        }
    });

Next, f4and f5depend on f2. I have this:

接下来,f4f5依赖于f2. 我有这个:

final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer i) {
            Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
            Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
            return Observable.merge(f4Observable, f5Observable);
        }
    });

Which starts to get weird (mergeing them probably isn't what I want...) but allows me to do this at the end, not quite what I want:

这开始变得奇怪(merge他们可能不是我想要的......)但最后允许我这样做,而不是我想要的:

f3Observable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println("Observed from f3: " + s);
        f4And5Observable.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.println("Observed from f4 and f5: " + i);
            }
        });
    }
});

That gives me:

这给了我:

Observed from f3: responseB_responseA
Observed from f4 and f5: 140
Observed from f4 and f5: 5100

which is all the numbers, but unfortunately I get the results in separate invocations, so I can't quite replace the final println in the original code:

这是所有数字,但不幸的是我在单独的调用中得到结果,所以我不能完全替换原始代码中的最终 println:

System.out.println(f3.get() + " => " + (f4.get() * f5.get()));

I don't understand how to get access to both those return values on the same line. I think there's probably some functional programming fu I'm missing here. How can I do this? Thanks.

我不明白如何在同一行访问这两个返回值。我认为这里可能缺少一些函数式编程。我怎样才能做到这一点?谢谢。

回答by user3407713

It looks like all you really need is a bit more encouragement and perspective on how RX is used. I'd suggest you read more into the documentation as well as marble diagrams (I know they're not always useful). I also suggest looking into the lift()function and operators.

看起来您真正需要的只是对如何使用 RX 的更多鼓励和观点。我建议您阅读更多文档以及大理石图(我知道它们并不总是有用)。我还建议研究lift()函数和运算符。

  • The entire point of an observable is to concatenate data flow and data manipulation into a single object
  • The point of calls to map, flatMapand filterare to manipulate the data in your data flow
  • The point of merges are to combine data flows
  • The point of operators are to allow you to disrupt a steady stream of observables and define your own operations on a data flow. For example, I coded a moving average operator. That sums up ndoubles in an Observable of doubles to return a stream of moving averages. The code literally looked like this

    Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))

  • 可观察对象的全部意义在于将数据流和数据操作连接到一个对象中
  • 调用map,flatMapfilter操作数据流中的数据的点
  • 合并的点是合并数据流
  • 运算符的目的是允许您破坏稳定的可观察数据流并定义您自己对数据流的操作。例如,我编写了一个移动平均算子。这将ndoubles总结为双精度的 Observable 以返回移动平均线流。代码看起来像这样

    Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize))

You'll be a relieved that a lot of the filtering methods that you take for granted all have lift()under the hood.

许多您认为理所当然的过滤方法都隐藏在幕后,您会感到宽慰lift()

With that said; all it takes to merge multiple dependencies is:

照这样说; 合并多个依赖项所需要的只是:

  • changing all incoming data to a standard data type using mapor flatMap
  • merging standard data-types to a stream
  • using custom operators if one object needs to wait on another, or if you need to order data in the stream. Caution: this approach will slow the stream down
  • using to list or subscribe to collect all of that data
  • 使用map或将所有传入数据更改为标准数据类型flatMap
  • 将标准数据类型合并到流
  • 如果一个对象需要等待另一个对象,或者您需要对流中的数据进行排序,则使用自定义运算符。注意:这种方法会减慢流量
  • 用于列出或订阅以收集所有这些数据

回答by Steve Kehlet

Edit:someone converted the following text, which I had added as an edit on the question, into an answer, which I appreciate, and understand may be the proper SO thing to do, however I do not consider this an answer because it's clearly not the right way to do it. I would not ever use this code nor would I advise anyone to copy it. Other/better solutions and comments welcome!

编辑:有人将我作为对问题的编辑添加的以下文本转换为答案,我很欣赏,并且理解这可能是正确的做法,但是我不认为这是一个答案,因为它显然不是正确的做法。我永远不会使用此代码,也不会建议任何人复制它。欢迎其他/更好的解决方案和评论!



I was able to solve this with the following. I didn't realize you could flatMapan observable more than once, I assumed results could only be consumed once. So I just flatMapf2Observable twice (sorry, I renamed some stuff in the code since my original post), then zipon all the Observables, then subscribe to that. That Mapin the zipto aggregate the values is undesirable because of the type juggling. Other/better solutions and comments welcome!The full code is viewable in a gist. Thank you.

我能够通过以下方式解决这个问题。我没有意识到你可以flatMap多次观察,我认为结果只能被消耗一次。所以我只是flatMapf2Observable 两次(抱歉,自从我的原始帖子以来,我在代码中重命名了一些东西),然后zip在所有 Observables 上,然后订阅它。这Mapzip聚集的值是因为类型杂耍的不希望的。欢迎其他/更好的解决方案和评论!完整的代码是一个要点可见。谢谢你。

Future<Integer> f2 = executor.submit(new CallToRemoteServiceB());
Observable<Integer> f2Observable = Observable.from(f2);
Observable<Integer> f4Observable = f2Observable
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("Observed from f2: " + integer);
            Future<Integer> f4 = executor.submit(new CallToRemoteServiceD(integer));
            return Observable.from(f4);
        }       
    });     

Observable<Integer> f5Observable = f2Observable
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            System.out.println("Observed from f2: " + integer);
            Future<Integer> f5 = executor.submit(new CallToRemoteServiceE(integer));
            return Observable.from(f5);
        }       
    });     

Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
    @Override
    public Map<String, String> call(String s, Integer integer, Integer integer2) {
        Map<String, String> map = new HashMap<String, String>();
        map.put("f3", s);
        map.put("f4", String.valueOf(integer));
        map.put("f5", String.valueOf(integer2));
        return map;
    }       
}).subscribe(new Action1<Map<String, String>>() {
    @Override
    public void call(Map<String, String> map) {
        System.out.println(map.get("f3") + " => " + (Integer.valueOf(map.get("f4")) * Integer.valueOf(map.get("f5"))));
    }       
});     

And this yields me the desired output:

这产生了我想要的输出:

responseB_responseA => 714000

回答by Stephen

I think what you are looking for is switchmap. We ran into a similar issue where we have a session service that handles getting a new session from an api, and we need that session before we can get more data. We can add to the session observable that returns the sessionToken for use in our data call.

我认为您正在寻找的是 switchmap。我们遇到了一个类似的问题,我们有一个会话服务来处理从 api 获取新会话,我们需要该会话才能获取更多数据。我们可以添加到 session observable,它返回 sessionToken 以在我们的数据调用中使用。

getSession returns an observable;

getSession 返回一个可观察的;

public getSession(): Observable<any>{
  if (this.sessionToken)
    return Observable.of(this.sessionToken);
  else if(this.sessionObservable)
    return this.sessionObservable;
  else {
    // simulate http call 
    this.sessionObservable = Observable.of(this.sessonTokenResponse)
    .map(res => {
      this.sessionObservable = null;
      return res.headers["X-Session-Token"];
    })
    .delay(500)
    .share();
    return this.sessionObservable;
  }
}

and getData takes that observable and appends to it.

和 getData 获取该 observable 并附加到它。

public getData() {
  if (this.dataObservable)
    return this.dataObservable;
  else {
    this.dataObservable = this.sessionService.getSession()
      .switchMap((sessionToken:string, index:number) =>{
        //simulate data http call that needed sessionToken
          return Observable.of(this.dataResponse)
          .map(res => {
            this.dataObservable = null;
            return res.body;
          })
          .delay(1200)
        })
        .map ( data => {
          return data;
        })
        .catch(err => {
          console.log("err in data service", err);
         // return err;
        })
        .share();
    return this.dataObservable;
  }
}

You will still need a flatmap to combine the not dependent observables.

您仍然需要一个平面图来组合不依赖的 observables。

Plunkr: http://plnkr.co/edit/hiA1jP?p=info

Plunkr:http://plnkr.co/edit/hiA1jP?p=info

Where I got the idea to use switch map: http://blog.thoughtram.io/angular/2016/01/06/taking-advantage-of-observables-in-angular2.html

我想到使用开关地图的地方:http: //blog.thoughtram.io/angular/2016/01/06/taking-advantage-of-observables-in-angular2.html