RxJava运算符
在本教程中,我们将讨论和实现RxJava拥有的各种运算符。
每个运营商如何转换可观察序列以及订户看到的内容?开始吧!
RxJava运算符
正如我们在第一篇RxJava教程中所看到的,运算符是对Observable起作用的对象,并将转换后的数据传递给Subscriber。
使用多个运算符,每个运算符都会完成自己的任务,然后将转换后的数据传递给下一个运算符。
创建一个新的IntelliJ Java项目并添加RxJava依赖关系以开始使用。
我们将使用函数式编程和lambda来简化代码,减少冗长的代码。
1.地图和过滤器
地图()
import rx.Observable; public class RxOperators { public static void main(String[] args) { //map operator Observable<String> mapObservable = Observable.just("hello world","the observable emits lower case sentences","subscriber sees it as upper case","map operator"); mapObservable.map(String::toUpperCase).subscribe(System.out::println); } } //Prints //HELLO WORLD //THE OBSERVABLE EMITS LOWER CASE SENTENCES //SUBSCRIBER SEES IT AS UPPER CASE //MAP OPERATOR
过滤()
//filter operator Observable<String> filterObservable = Observable.from(new String[]{"Hello","How are you?", "doing"}); filterObservable.filter(string->string.contains(" ")).subscribe(System.out::println); //Prints //How are you?
使用映射和过滤器:返回所有偶数的平方。
//map and filter Observable<Integer> mapFilter = Observable.range(0,10); mapFilter.filter(i-> i%2==0).map(i-> i*i).subscribe(System.out::println);
2.采取,计数和跳过
取算符有三个主要变体。
让我们看看它们中的每一个。
take()
传递Observable发出的前n个项目。takeFirst()
发出满足运算符指定条件的第一项。takeLast()
打印最后n个项目。
与take()相反takeUntil()
会发出项目,直到第二个可观察到的状态才开始发出值。
另外,它也可以用于条件。只要条件为真,
takeWhile()
就会发出项目。
它忽略其余的。
Observable<Integer> takeObservable = Observable.range(0,100); takeObservable.take(5).subscribe(System.out::println); //prints 0 to 4 takeObservable.takeFirst(i -> i*i>1000).subscribe(System.out::println); //prints 32 takeObservable.takeLast(1).subscribe(System.out::println); //prints 99 List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5); Observable<Integer> takeUntilObservable = Observable.from(numbers); takeUntilObservable.takeUntil(number -> number>2) .subscribe(System.out::println); //prints 1 to 3 takeObservable.takeWhile(number -> number<4) .subscribe(System.out::println); //prints 1 to 3 takeObservable.takeWhile(number -> number>2) .subscribe(System.out::println); //prints nothing
takeFirst()
函数会打印平方值大于1000的第一个值。takeUntil()
运算符就像do-while循环。
在检查下一次迭代的条件之前,它首先打印该语句。
顾名思义,count()
返回到达订户的值的数量。
Observable<String> countObservable = Observable.from(new String[]{"First","Second", "Third", "Seventh"}); countObservable.filter(string-> string.length()>5).count().subscribe(System.out::println); //prints 2
在上述情况下,将null作为值之一传递将导致运行时崩溃。
因此,我们需要为onError()指定动作。
Observable<String> countObservable = Observable.from(new String[]{"First","Second", "Third", null}); countObservable.filter(string-> string.length()>5).count().subscribe(System.out::println, throwable -> System.out.println("One of the values is not valid")); //prints //One of the values is not valid
skip
运算符具有以下描述和实现的一些变体
skip()
忽略前n个值。skipLast()
忽略了最后n个值skipWhile()
会忽略所有值,直到满足特定条件为止。
它发出所有剩余的值。skipUntil()
将忽略所有值,直到另一个可观察到的开始发射。
我们稍后再看
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5); Observable<Integer> skipObservable = Observable.from(numbers); skipObservable.skip(3).subscribe(System.out::println); //prints 4 and 5 skipObservable.skipLast(3).subscribe(System.out::println); //prints 1 and 2 skipObservable.skipWhile(i-> i<3).subscribe(System.out::println); prints 3 4 and 5
在避免空值时,skip是理想的运算符。
3. startWith,reduce,重复,扫描
startWith()
在发射开始时附加给定的元素。
Observable<String> startWithObservable = Observable.just(" Rx", "Java", " Operators", " Tutorial"); startWithObservable.startWith("Welcome to the").subscribe(System.out::print); //Prints //Welcome to the RxJava Operators Tutorial
reduce()运算符充当累加器。
它将下一个值添加到先前添加的值。
最后为订户打印累计值。
Observable<Integer> reduceObservable = Observable.range(1, 5); reduceObservable.reduce((integer, integer2) -> integer + integer2).subscribe(System.out::println);
在上面的代码中,对于第一次发射,integer是1 integer2是2,对于第二次发射,分别是3和3,依此类推。
reduce运算符对于计算总和,附加字符串等很有用。
以下代码查找串联字符串的长度。
Observable<String> reduceStringObservable = Observable.just("Rx", "Java"); reduceStringObservable.reduce((x, y) -> x + y).map(String::length).subscribe(System.out::println);
重复运算符重复发射两次。
Observable.just("Android", "iOS", "Windows") .repeat(2) .subscribe(System.out::println); //Prints //Android //iOS //Android //iOS
scan
运算符与reduce不同,它增量打印累加器值
Observable.range(1, 5).scan((integer, integer2) -> integer + integer2).subscribe(System.out::println); //prints 1 3 6 10 15
4.全部,包含,elementAt
all()运算符检查每个值是否满足条件。
返回true/false。
Observable.range(1, 5).all(i-> i%2==0).subscribe(System.out::println);//prints false Observable.range(1, 5).map(i-> i*2).all(i-> i%2==0).subscribe(System.out::println); //prints true
contains()
:它检查提到的值是否存在于可观察批次中。
Observable.range(1, 5).contains(6).subscribe(System.out::println); //prints false Observable.range(1, 5).contains(4).subscribe(System.out::println); //prints true
elementAt():打印输出值列表中给定索引处的值。
Observable.range(1, 5).elementAt(4).subscribe(System.out::println); //prints 5
5.与众不同,toList,toSortedList
distinct()
运算符消除重复值。
Observable.just(1,2,3,1,5,1,2,3).count().subscribe(System.out::println); //prints 8 Observable.just(1,2,3,1,5,1,2,3).distinct().count().subscribe(System.out::println); //prints 4
toList和toSortedList用于将发射转换为另一个可观察到的类型列表。
toSortedList以升序对发射进行排序。
我们还可以设置自己的比较器进行排序。
Observable.just(1, 2, 3, 1, 5, 1, 2, 3).toList().subscribe(System.out::println); Observable.just(1, 2, 3, 1, 5, 1, 2, 3).toSortedList().subscribe(System.out::println); Observable.just(1, 10, 20, 2).toSortedList((integer, integer2) -> integer2 < integer ? -1 : integer == integer2 ? 1 : 0).subscribe(System.out::println); //prints //[1, 2, 3, 1, 5, 1, 2, 3] //[1, 1, 1, 2, 2, 3, 3, 5] //[20, 10, 2, 1]
在上面的代码中,第三个可观察值按降序对值进行排序。
6. concat,合并,zip
concat用于连接可观察对象而不交错它们。
"合并"用于通过插入可观察对象来连接它们,这意味着在" concat"中,新的可观察对象将包含第一个,然后是第二个。
在"合并"中,可以根据到达的时间将它们混合在一起。
Observable.concat( Observable.interval(1, TimeUnit.SECONDS).map(id -> "CA" + id), Observable.interval(1, TimeUnit.SECONDS).map(id -> "CB" + id)) .subscribe(System.out::println); Thread.sleep(5000); //Prints: //CA0 //CA1 //CA2 //CA3 //CA4
注意:我们需要添加睡眠以防止主要功能提前退出。
在concat()方法中,直到第一个可观察到的对象都没有完成,它才移至第二个。
因此,在上面的代码中,第二个不会被打印,因为第一个可观察到的字符会在5秒钟内打印其配额。
让我们使用" take"设置第一个限制。
Observable.concat( Observable.interval(1, TimeUnit.SECONDS).take(2).map(id -> "CA" + id), Observable.interval(1, TimeUnit.SECONDS).map(id -> "CB" + id)) .subscribe(System.out::println); Thread.sleep(5000); //prints CA0 CA1 CB0 CB1 CB2
合并两个可观察对象
Observable.merge( Observable.interval(1, TimeUnit.SECONDS).map(id -> "MA" + id), Observable.interval(1, TimeUnit.SECONDS).map(id -> "MB" + id)) .subscribe(System.out::println); Thread.sleep(5000); //prints MB0 MA0 MB1 MA1 MA2 MB2 MB3 MA3 MA4 MB4
如您所见,合并使它们交错。
但这不能保证发射的顺序。
上面的merge
和concat
只能在Observables上使用。
为了使它们能够作为运算符工作,我们需要使用mergeWith
和concatWith
。
Observable sourceA = Observable.interval(1, TimeUnit.SECONDS).map(id -> "MA" + id); Observable sourceB = Observable.interval(1, TimeUnit.SECONDS).map(id -> "MB" + id); sourceB.mergeWith(sourceA).subscribe(System.out::println); Thread.sleep(5000);
在上面的代码中使用concatWith
来获取concat的相关运算符。
zip用于将每个可观测对象的每个发射配对。
每个可观察值将等待其他可观察值发出当前值,然后在函数中可以使用每个值。
zip可用于连接不同类型。
Observable.zip( Observable.interval(1, TimeUnit.SECONDS).map(id -> String.valueOf( (char)(id + 65))), Observable.interval(2, TimeUnit.SECONDS), (s1, s2) -> s1 + s2) .subscribe(System.out::println); Thread.sleep(9000); //prints //A0 //B1 //C2 //D3
在上面的代码中,第一个可观察对象每秒发射一个整数。
映射运算符使用ASCII代码将整数转换为相关字符,并将其作为字符串返回。
第二个可观察对象每两秒钟发出一个整数。
因此,第一个可观察到的需要等待。
尝试自己实现zipWith运算符。
它类似于" concatWith"和" mergeWith"。
7.防抖,延迟
"反跳"运算符仅在经过特定时间范围后才从Observable发出一项,而不发出另一项。
这个运算符在Android的SearchView中的EditText等地方非常有用。
通常,输入任何内容后,搜索视图都会使用当前文本调用后端API。
使用反跳运算符可以保存即时呼叫。
它使用户有时间评估他们输入的文本以及他们是否要修改它。
只有经过特定的时间跨度后,才会调用后端API!
Observable.just(1,2,3,4,5,6) .debounce(1, TimeUnit.SECONDS) .subscribe(System.out::println, System.out::println, () -> System.out.print("OnComplete")); //Prints 6
在上述情况下,最后一个元素将被打印,因为没有剩余任何内容,并且反跳运算符在延迟后发出该值。
"延迟"运算符将从Observable开始发射值的时间延迟了一定时间。
Observable.just(1,2,3,4,5,6) .delay(5, TimeUnit.SECONDS) .subscribe(System.out::println, System.out::println, () -> System.out.print("OnComplete")); Thread.sleep(4000);
在上面的代码中,什么都没有发生,因为我们的main函数将在4秒后返回,从而防止5秒后发生发射。