RxJava运算符

时间:2020-02-23 14:41:44  来源:igfitidea点击:

在本教程中,我们将讨论和实现RxJava拥有的各种运算符。
每个运营商如何转换可观察序列以及订户看到的内容?开始吧!

RxJava运算符

正如我们在第一篇RxJava教程中所看到的,运算符是对Observable起作用的对象,并将转换后的数据传递给Subscriber。
使用多个运算符,每个运算符都会完成自己的任务,然后将转换后的数据传递给下一个运算符。

创建一个新的IntelliJ Java项目并添加RxJava依赖关系以开始使用。
我们将使用函数式编程和lambda来简化代码,减少冗长的代码。

1.地图和过滤器

地图()

import rx.Observable;

public class RxOperators {

  public static void main(String[] args)
  {
      
      //map operator
      Observable<String> mapObservable = Observable.just("hello world","the observable emits lower case sentences","subscriber sees it as upper case","map operator");
      mapObservable.map(String::toUpperCase).subscribe(System.out::println);
  }
}
//Prints 
//HELLO WORLD
//THE OBSERVABLE EMITS LOWER CASE SENTENCES
//SUBSCRIBER SEES IT AS UPPER CASE
//MAP OPERATOR

过滤()

//filter operator
      Observable<String> filterObservable = Observable.from(new String[]{"Hello","How are you?", "doing"});
      filterObservable.filter(string->string.contains(" ")).subscribe(System.out::println);

//Prints
//How are you?

使用映射和过滤器:返回所有偶数的平方。

//map and filter
Observable<Integer> mapFilter = Observable.range(0,10);
mapFilter.filter(i-> i%2==0).map(i-> i*i).subscribe(System.out::println);

2.采取,计数和跳过

取算符有三个主要变体。
让我们看看它们中的每一个。

  • take()传递Observable发出的前n个项目。

  • takeFirst()发出满足运算符指定条件的第一项。

  • takeLast()打印最后n个项目。
    与take()相反

  • takeUntil()会发出项目,直到第二个可观察到的状态才开始发出值。
    另外,它也可以用于条件。

  • 只要条件为真,takeWhile()就会发出项目。
    它忽略其余的。

Observable<Integer> takeObservable = Observable.range(0,100); 
takeObservable.take(5).subscribe(System.out::println); //prints 0 to 4 
takeObservable.takeFirst(i -> i*i>1000).subscribe(System.out::println); //prints 32
takeObservable.takeLast(1).subscribe(System.out::println); //prints 99

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Observable<Integer> takeUntilObservable = Observable.from(numbers);
takeUntilObservable.takeUntil(number -> number>2)
                 .subscribe(System.out::println); //prints 1 to 3

takeObservable.takeWhile(number -> number<4)
              .subscribe(System.out::println); //prints 1 to 3
takeObservable.takeWhile(number -> number>2)
              .subscribe(System.out::println); //prints nothing

takeFirst()函数会打印平方值大于1000的第一个值。
takeUntil()运算符就像do-while循环。
在检查下一次迭代的条件之前,它首先打印该语句。

顾名思义,count()返回到达订户的值的数量。

Observable<String> countObservable = Observable.from(new String[]{"First","Second", "Third", "Seventh"});
countObservable.filter(string-> string.length()>5).count().subscribe(System.out::println); //prints 2

在上述情况下,将null作为值之一传递将导致运行时崩溃。
因此,我们需要为onError()指定动作。

Observable<String> countObservable = Observable.from(new String[]{"First","Second", "Third", null});
countObservable.filter(string-> string.length()>5).count().subscribe(System.out::println, throwable -> System.out.println("One of the values is not valid"));

//prints 
//One of the values is not valid

skip运算符具有以下描述和实现的一些变体

  • skip()忽略前n个值。

  • skipLast()忽略了最后n个值

  • skipWhile()会忽略所有值,直到满足特定条件为止。
    它发出所有剩余的值。

  • skipUntil()将忽略所有值,直到另一个可观察到的开始发射。
    我们稍后再看

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
      Observable<Integer> skipObservable = Observable.from(numbers);
      skipObservable.skip(3).subscribe(System.out::println); //prints 4 and 5
      skipObservable.skipLast(3).subscribe(System.out::println); //prints 1 and 2
      skipObservable.skipWhile(i-> i<3).subscribe(System.out::println); prints 3 4 and 5

在避免空值时,skip是理想的运算符。

3. startWith,reduce,重复,扫描

startWith()在发射开始时附加给定的元素。

Observable<String> startWithObservable = Observable.just(" Rx", "Java", " Operators", " Tutorial");
      startWithObservable.startWith("Welcome to the").subscribe(System.out::print);

//Prints
//Welcome to the RxJava Operators Tutorial

reduce()运算符充当累加器。
它将下一个值添加到先前添加的值。
最后为订户打印累计值。

Observable<Integer> reduceObservable = Observable.range(1, 5);
reduceObservable.reduce((integer, integer2) -> integer + integer2).subscribe(System.out::println);

在上面的代码中,对于第一次发射,integer是1 integer2是2,对于第二次发射,分别是3和3,依此类推。

reduce运算符对于计算总和,附加字符串等很有用。
以下代码查找串联字符串的长度。

Observable<String> reduceStringObservable = Observable.just("Rx", "Java");
reduceStringObservable.reduce((x, y) -> x + y).map(String::length).subscribe(System.out::println);

重复运算符重复发射两次。

Observable.just("Android", "iOS", "Windows")
              .repeat(2)
              .subscribe(System.out::println);

//Prints
//Android
//iOS
//Android
//iOS

scan运算符与reduce不同,它增量打印累加器值

Observable.range(1, 5).scan((integer, integer2) -> integer + integer2).subscribe(System.out::println);

//prints
1
3
6
10
15

4.全部,包含,elementAt

all()运算符检查每个值是否满足条件。
返回true/false。

Observable.range(1, 5).all(i-> i%2==0).subscribe(System.out::println);//prints false
Observable.range(1, 5).map(i-> i*2).all(i-> i%2==0).subscribe(System.out::println); //prints true

contains():它检查提到的值是否存在于可观察批次中。

Observable.range(1, 5).contains(6).subscribe(System.out::println); //prints false
Observable.range(1, 5).contains(4).subscribe(System.out::println); //prints true

elementAt():打印输出值列表中给定索引处的值。

Observable.range(1, 5).elementAt(4).subscribe(System.out::println); //prints 5

5.与众不同,toList,toSortedList

distinct()运算符消除重复值。

Observable.just(1,2,3,1,5,1,2,3).count().subscribe(System.out::println); //prints 8
Observable.just(1,2,3,1,5,1,2,3).distinct().count().subscribe(System.out::println); //prints 4

toList和toSortedList用于将发射转换为另一个可观察到的类型列表。
toSortedList以升序对发射进行排序。
我们还可以设置自己的比较器进行排序。

Observable.just(1, 2, 3, 1, 5, 1, 2, 3).toList().subscribe(System.out::println);
Observable.just(1, 2, 3, 1, 5, 1, 2, 3).toSortedList().subscribe(System.out::println);
Observable.just(1, 10, 20, 2).toSortedList((integer, integer2) -> integer2 < integer ? -1 : integer == integer2 ? 1 : 0).subscribe(System.out::println);

//prints
//[1, 2, 3, 1, 5, 1, 2, 3]
//[1, 1, 1, 2, 2, 3, 3, 5]
//[20, 10, 2, 1]

在上面的代码中,第三个可观察值按降序对值进行排序。

6. concat,合并,zip

concat用于连接可观察对象而不交错它们。
"合并"用于通过插入可观察对象来连接它们,这意味着在" concat"中,新的可观察对象将包含第一个,然后是第二个。
在"合并"中,可以根据到达的时间将它们混合在一起。

Observable.concat(
              Observable.interval(1, TimeUnit.SECONDS).map(id -> "CA" + id),
              Observable.interval(1, TimeUnit.SECONDS).map(id -> "CB" + id))
              .subscribe(System.out::println);

      Thread.sleep(5000);

//Prints:
//CA0
//CA1
//CA2
//CA3
//CA4

注意:我们需要添加睡眠以防止主要功能提前退出。
在concat()方法中,直到第一个可观察到的对象都没有完成,它才移至第二个。
因此,在上面的代码中,第二个不会被打印,因为第一个可观察到的字符会在5秒钟内打印其配额。
让我们使用" take"设置第一个限制。

Observable.concat(
              Observable.interval(1, TimeUnit.SECONDS).take(2).map(id -> "CA" + id),
              Observable.interval(1, TimeUnit.SECONDS).map(id -> "CB" + id))
              .subscribe(System.out::println);

      Thread.sleep(5000);

//prints 
CA0
CA1
CB0
CB1
CB2

合并两个可观察对象

Observable.merge(
              Observable.interval(1, TimeUnit.SECONDS).map(id -> "MA" + id),
              Observable.interval(1, TimeUnit.SECONDS).map(id -> "MB" + id))
              .subscribe(System.out::println);

      Thread.sleep(5000);

//prints 
MB0
MA0
MB1
MA1
MA2
MB2
MB3
MA3
MA4
MB4

如您所见,合并使它们交错。
但这不能保证发射的顺序。

上面的mergeconcat只能在Observables上使用。
为了使它们能够作为运算符工作,我们需要使用mergeWithconcatWith

Observable sourceA = Observable.interval(1, TimeUnit.SECONDS).map(id -> "MA" + id);
Observable sourceB = Observable.interval(1, TimeUnit.SECONDS).map(id -> "MB" + id);

sourceB.mergeWith(sourceA).subscribe(System.out::println);
Thread.sleep(5000);

在上面的代码中使用concatWith来获取concat的相关运算符。

zip用于将每个可观测对象的每个发射配对。
每个可观察值将等待其他可观察值发出当前值,然后在函数中可以使用每个值。
zip可用于连接不同类型。

Observable.zip(
              Observable.interval(1, TimeUnit.SECONDS).map(id -> String.valueOf( (char)(id + 65))),
              Observable.interval(2, TimeUnit.SECONDS), (s1, s2) -> s1 + s2)
              .subscribe(System.out::println);

      Thread.sleep(9000);

//prints
//A0
//B1
//C2
//D3

在上面的代码中,第一个可观察对象每秒发射一个整数。
映射运算符使用ASCII代码将整数转换为相关字符,并将其作为字符串返回。
第二个可观察对象每两秒钟发出一个整数。
因此,第一个可观察到的需要等待。

尝试自己实现zipWith运算符。
它类似于" concatWith"和" mergeWith"。

7.防抖,延迟

"反跳"运算符仅在经过特定时间范围后才从Observable发出一项,而不发出另一项。
这个运算符在Android的SearchView中的EditText等地方非常有用。
通常,输入任何内容后,搜索视图都会使用当前文本调用后端API。
使用反跳运算符可以保存即时呼叫。
它使用户有时间评估他们输入的文本以及他们是否要修改它。
只有经过特定的时间跨度后,才会调用后端API!

Observable.just(1,2,3,4,5,6)
              .debounce(1, TimeUnit.SECONDS)
              .subscribe(System.out::println, System.out::println, () -> System.out.print("OnComplete"));
//Prints 6

在上述情况下,最后一个元素将被打印,因为没有剩余任何内容,并且反跳运算符在延迟后发出该值。

"延迟"运算符将从Observable开始发射值的时间延迟了一定时间。

Observable.just(1,2,3,4,5,6)
              .delay(5, TimeUnit.SECONDS)
              .subscribe(System.out::println, System.out::println, () -> System.out.print("OnComplete"));

      Thread.sleep(4000);

在上面的代码中,什么都没有发生,因为我们的main函数将在4秒后返回,从而防止5秒后发生发射。