RxJava flatMap,switchMap,concatMap

时间:2020-02-23 14:41:43  来源:igfitidea点击:

在本教程中,我们将研究RxJava中的三个重要的地图运算符。
FlatMap,SwitchMap,ConcatMap为业已强大的RxJava框架提供了更多功能。

RxJava FlatMap

map操作符转换Observable序列中的每个值。
" flatmap"运算符用于通过将可观察值分解为较小的可观察值(包含来自第一个可观察值的各个值)来转换它。
然后,对新的可观察对象进行处理,最后," flatmap"将这些单个可观察对象合并为一个完整的可观察对象。
现在,新的可观察对象已转换。
FlatMap将可观察对象展平为单个可观察对象,然后将其修改并合并为新的可观察对象。
" flatMap"中出现的新的observable无法保证您的订单相同。
这些值是交错的。

FlatMap发出Observables而不是值

下面给出了RxJava文档的说明。

FlatMap到底做什么? FlatMap将Observable分为许多奇异的Observable。
因此,FlatMap不会并行发送每个Observable值,而是并行执行所有操作,然后以完成顺序将结果合并。

让我们看一个FlatMap的示例,方法是创建一个新的IntelliJ java项目,并在build.gradle中添加RxJava依赖项。

import rx.Observable;
import rx.functions.Func1;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class RxOperatorMaps {

  public static void main(String[] args) throws InterruptedException {
      final List<String> race = new ArrayList<>(Arrays.asList("Alan", "Bob", "Cobb", "Dan", "Evan", "Finch"));

      Observable.from(race)
              .flatMap(new Func1<String, Observable<?>>() {
                  @Override
                  public Observable<?> call(String s) {
                      final int delay = new Random().nextInt(5);
                      return Observable.just(s).map(String::toUpperCase)
                              .delay(delay, TimeUnit.SECONDS);
                  }
              })
              .subscribe(System.out::println);

      Thread.sleep(5000);
  }
}

//Prints
//BOB
//FINCH
//ALAN
//EVAN
//COBB
//DAN
  • FlatMap将函数应用于每个可观察到的发射。
    函数类型是源可观察类型的单位(在这种情况下为String),并且是将其转换为类型的新Observable的单位。

  • 在上面的代码中,从源发出了可观察到的字符串,它是可观察到的字符串列表。

  • 新的可观察对象在这里被延迟,以向您显示flatMap将单个可观察对象按其完成的顺序合并。

  • 我们使用了一个延迟运算符,它可以将发射延迟随机的秒数。

  • 我们将主线程设置为休眠状态5秒钟,以防止在Observable仍在工作时主函数返回。

注意:flatMap也隐式包含合并运算符。

映射和flatMap运算符Map之间的区别是返回类型为T的对象,而FlatMap返回类型为Observable <T>。

使用可观察的运算符,我们还可以过滤掉想要到达观察者的值的数量。

Observable.from(race)
              .flatMap(s -> {
                  final int delay = new Random().nextInt(5);

                  if (s.contains("a"))
                      return Observable.empty()
                              .delay(delay, TimeUnit.SECONDS);
                  else
                      return Observable.just(s).map(String::toUpperCase)
                              .delay(delay, TimeUnit.SECONDS);
              })
              .subscribe(System.out::println);

      Thread.sleep(5000);

//Prints
//FINCH
//COBB
//BOB

当您需要并行执行多个网络请求时,flatMap在android中将非常有用。

RxJava SwitchMap

SwitchMap将源可观察性展平,但是仅返回最后发出的单个可观察性。
下面的插图后面是示例,演示了上面这行的含义。

Observable.from(race)
              .switchMap((Func1<String, Observable<?>>) s -> {
                  final int delay = new Random().nextInt(2);
                  return Observable.just(s).map(String::toUpperCase)
                          .delay(delay, TimeUnit.SECONDS);
              })
              .subscribe(System.out::println);

      Thread.sleep(5000);

//Prints for me. It can differ for you since it's random.
//ALAN
//EVAN
//FINCH

在特定的延迟之后,SwitchMap将仅发出最新的可观测值。
它会忽略所有先前的内容。
从提要中检索最新数据时,此功能很有用。
当您需要在许多供稿中标识最新信息时,此功能很有用。

RxJava concatMap

如果您阅读过flatMap,您几乎会知道concatMap运算符是什么。
除了保持可观察对象的顺序外,它与flatMap类似。
为了保持顺序,它需要等待一个可观察的对象完成,然后再进行下一个操作,从而脱离了异步概念,并使任务比平时更长。
flatMap和concatMap运算符之间的另一个区别" flatMap"隐式使用" merge"运算符,而" concatMap"则使用" concat"运算符。

Observable.from(race)
              .concatMap((Func1<String, Observable<?>>) s -> {
                  final int delay = new Random().nextInt(4);
                  return Observable.just(s).map(String::toUpperCase)
                          .delay(delay, TimeUnit.SECONDS);
              })
              .subscribe(System.out::println);

      Thread.sleep(5000);

//Prints
//ALAN
//BOB
//COBB

为什么不打印所有值?好吧,花了很长时间才使主线程醒来并退出了main方法。
您可以增加线程睡眠时间或者减少延迟随机数以查看全部。
但是您理解正确吗? ConcatMap需要更长的时间。
仅在需要相同序列时才使用它。