RxJava运算符
在本教程中,我们将讨论和实现RxJava拥有的各种运算符。
每个运营商如何转换可观察序列以及订户看到的内容?开始吧!
RxJava运算符
正如我们在第一篇RxJava教程中所看到的,运算符是对Observable起作用的对象,并将转换后的数据传递给Subscriber。
使用多个运算符,每个运算符都会完成自己的任务,然后将转换后的数据传递给下一个运算符。
创建一个新的IntelliJ Java项目并添加RxJava依赖关系以开始使用。
我们将使用函数式编程和lambda来简化代码,减少冗长的代码。
1.地图和过滤器
地图()
import rx.Observable;
public class RxOperators {
public static void main(String[] args)
{
//map operator
Observable<String> mapObservable = Observable.just("hello world","the observable emits lower case sentences","subscriber sees it as upper case","map operator");
mapObservable.map(String::toUpperCase).subscribe(System.out::println);
}
}
//Prints
//HELLO WORLD
//THE OBSERVABLE EMITS LOWER CASE SENTENCES
//SUBSCRIBER SEES IT AS UPPER CASE
//MAP OPERATOR
过滤()
//filter operator
Observable<String> filterObservable = Observable.from(new String[]{"Hello","How are you?", "doing"});
filterObservable.filter(string->string.contains(" ")).subscribe(System.out::println);
//Prints
//How are you?
使用映射和过滤器:返回所有偶数的平方。
//map and filter Observable<Integer> mapFilter = Observable.range(0,10); mapFilter.filter(i-> i%2==0).map(i-> i*i).subscribe(System.out::println);
2.采取,计数和跳过
取算符有三个主要变体。
让我们看看它们中的每一个。
take()传递Observable发出的前n个项目。takeFirst()发出满足运算符指定条件的第一项。takeLast()打印最后n个项目。
与take()相反takeUntil()会发出项目,直到第二个可观察到的状态才开始发出值。
另外,它也可以用于条件。只要条件为真,
takeWhile()就会发出项目。
它忽略其余的。
Observable<Integer> takeObservable = Observable.range(0,100);
takeObservable.take(5).subscribe(System.out::println); //prints 0 to 4
takeObservable.takeFirst(i -> i*i>1000).subscribe(System.out::println); //prints 32
takeObservable.takeLast(1).subscribe(System.out::println); //prints 99
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Observable<Integer> takeUntilObservable = Observable.from(numbers);
takeUntilObservable.takeUntil(number -> number>2)
.subscribe(System.out::println); //prints 1 to 3
takeObservable.takeWhile(number -> number<4)
.subscribe(System.out::println); //prints 1 to 3
takeObservable.takeWhile(number -> number>2)
.subscribe(System.out::println); //prints nothing
takeFirst()函数会打印平方值大于1000的第一个值。takeUntil()运算符就像do-while循环。
在检查下一次迭代的条件之前,它首先打印该语句。
顾名思义,count()返回到达订户的值的数量。
Observable<String> countObservable = Observable.from(new String[]{"First","Second", "Third", "Seventh"});
countObservable.filter(string-> string.length()>5).count().subscribe(System.out::println); //prints 2
在上述情况下,将null作为值之一传递将导致运行时崩溃。
因此,我们需要为onError()指定动作。
Observable<String> countObservable = Observable.from(new String[]{"First","Second", "Third", null});
countObservable.filter(string-> string.length()>5).count().subscribe(System.out::println, throwable -> System.out.println("One of the values is not valid"));
//prints
//One of the values is not valid
skip运算符具有以下描述和实现的一些变体
skip()忽略前n个值。skipLast()忽略了最后n个值skipWhile()会忽略所有值,直到满足特定条件为止。
它发出所有剩余的值。skipUntil()将忽略所有值,直到另一个可观察到的开始发射。
我们稍后再看
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Observable<Integer> skipObservable = Observable.from(numbers);
skipObservable.skip(3).subscribe(System.out::println); //prints 4 and 5
skipObservable.skipLast(3).subscribe(System.out::println); //prints 1 and 2
skipObservable.skipWhile(i-> i<3).subscribe(System.out::println); prints 3 4 and 5
在避免空值时,skip是理想的运算符。
3. startWith,reduce,重复,扫描
startWith()在发射开始时附加给定的元素。
Observable<String> startWithObservable = Observable.just(" Rx", "Java", " Operators", " Tutorial");
startWithObservable.startWith("Welcome to the").subscribe(System.out::print);
//Prints
//Welcome to the RxJava Operators Tutorial
reduce()运算符充当累加器。
它将下一个值添加到先前添加的值。
最后为订户打印累计值。
Observable<Integer> reduceObservable = Observable.range(1, 5); reduceObservable.reduce((integer, integer2) -> integer + integer2).subscribe(System.out::println);
在上面的代码中,对于第一次发射,integer是1 integer2是2,对于第二次发射,分别是3和3,依此类推。
reduce运算符对于计算总和,附加字符串等很有用。
以下代码查找串联字符串的长度。
Observable<String> reduceStringObservable = Observable.just("Rx", "Java");
reduceStringObservable.reduce((x, y) -> x + y).map(String::length).subscribe(System.out::println);
重复运算符重复发射两次。
Observable.just("Android", "iOS", "Windows")
.repeat(2)
.subscribe(System.out::println);
//Prints
//Android
//iOS
//Android
//iOS
scan运算符与reduce不同,它增量打印累加器值
Observable.range(1, 5).scan((integer, integer2) -> integer + integer2).subscribe(System.out::println); //prints 1 3 6 10 15
4.全部,包含,elementAt
all()运算符检查每个值是否满足条件。
返回true/false。
Observable.range(1, 5).all(i-> i%2==0).subscribe(System.out::println);//prints false Observable.range(1, 5).map(i-> i*2).all(i-> i%2==0).subscribe(System.out::println); //prints true
contains():它检查提到的值是否存在于可观察批次中。
Observable.range(1, 5).contains(6).subscribe(System.out::println); //prints false Observable.range(1, 5).contains(4).subscribe(System.out::println); //prints true
elementAt():打印输出值列表中给定索引处的值。
Observable.range(1, 5).elementAt(4).subscribe(System.out::println); //prints 5
5.与众不同,toList,toSortedList
distinct()运算符消除重复值。
Observable.just(1,2,3,1,5,1,2,3).count().subscribe(System.out::println); //prints 8 Observable.just(1,2,3,1,5,1,2,3).distinct().count().subscribe(System.out::println); //prints 4
toList和toSortedList用于将发射转换为另一个可观察到的类型列表。
toSortedList以升序对发射进行排序。
我们还可以设置自己的比较器进行排序。
Observable.just(1, 2, 3, 1, 5, 1, 2, 3).toList().subscribe(System.out::println); Observable.just(1, 2, 3, 1, 5, 1, 2, 3).toSortedList().subscribe(System.out::println); Observable.just(1, 10, 20, 2).toSortedList((integer, integer2) -> integer2 < integer ? -1 : integer == integer2 ? 1 : 0).subscribe(System.out::println); //prints //[1, 2, 3, 1, 5, 1, 2, 3] //[1, 1, 1, 2, 2, 3, 3, 5] //[20, 10, 2, 1]
在上面的代码中,第三个可观察值按降序对值进行排序。
6. concat,合并,zip
concat用于连接可观察对象而不交错它们。
"合并"用于通过插入可观察对象来连接它们,这意味着在" concat"中,新的可观察对象将包含第一个,然后是第二个。
在"合并"中,可以根据到达的时间将它们混合在一起。
Observable.concat(
Observable.interval(1, TimeUnit.SECONDS).map(id -> "CA" + id),
Observable.interval(1, TimeUnit.SECONDS).map(id -> "CB" + id))
.subscribe(System.out::println);
Thread.sleep(5000);
//Prints:
//CA0
//CA1
//CA2
//CA3
//CA4
注意:我们需要添加睡眠以防止主要功能提前退出。
在concat()方法中,直到第一个可观察到的对象都没有完成,它才移至第二个。
因此,在上面的代码中,第二个不会被打印,因为第一个可观察到的字符会在5秒钟内打印其配额。
让我们使用" take"设置第一个限制。
Observable.concat(
Observable.interval(1, TimeUnit.SECONDS).take(2).map(id -> "CA" + id),
Observable.interval(1, TimeUnit.SECONDS).map(id -> "CB" + id))
.subscribe(System.out::println);
Thread.sleep(5000);
//prints
CA0
CA1
CB0
CB1
CB2
合并两个可观察对象
Observable.merge(
Observable.interval(1, TimeUnit.SECONDS).map(id -> "MA" + id),
Observable.interval(1, TimeUnit.SECONDS).map(id -> "MB" + id))
.subscribe(System.out::println);
Thread.sleep(5000);
//prints
MB0
MA0
MB1
MA1
MA2
MB2
MB3
MA3
MA4
MB4
如您所见,合并使它们交错。
但这不能保证发射的顺序。
上面的merge和concat只能在Observables上使用。
为了使它们能够作为运算符工作,我们需要使用mergeWith和concatWith。
Observable sourceA = Observable.interval(1, TimeUnit.SECONDS).map(id -> "MA" + id); Observable sourceB = Observable.interval(1, TimeUnit.SECONDS).map(id -> "MB" + id); sourceB.mergeWith(sourceA).subscribe(System.out::println); Thread.sleep(5000);
在上面的代码中使用concatWith来获取concat的相关运算符。
zip用于将每个可观测对象的每个发射配对。
每个可观察值将等待其他可观察值发出当前值,然后在函数中可以使用每个值。
zip可用于连接不同类型。
Observable.zip(
Observable.interval(1, TimeUnit.SECONDS).map(id -> String.valueOf( (char)(id + 65))),
Observable.interval(2, TimeUnit.SECONDS), (s1, s2) -> s1 + s2)
.subscribe(System.out::println);
Thread.sleep(9000);
//prints
//A0
//B1
//C2
//D3
在上面的代码中,第一个可观察对象每秒发射一个整数。
映射运算符使用ASCII代码将整数转换为相关字符,并将其作为字符串返回。
第二个可观察对象每两秒钟发出一个整数。
因此,第一个可观察到的需要等待。
尝试自己实现zipWith运算符。
它类似于" concatWith"和" mergeWith"。
7.防抖,延迟
"反跳"运算符仅在经过特定时间范围后才从Observable发出一项,而不发出另一项。
这个运算符在Android的SearchView中的EditText等地方非常有用。
通常,输入任何内容后,搜索视图都会使用当前文本调用后端API。
使用反跳运算符可以保存即时呼叫。
它使用户有时间评估他们输入的文本以及他们是否要修改它。
只有经过特定的时间跨度后,才会调用后端API!
Observable.just(1,2,3,4,5,6)
.debounce(1, TimeUnit.SECONDS)
.subscribe(System.out::println, System.out::println, () -> System.out.print("OnComplete"));
//Prints 6
在上述情况下,最后一个元素将被打印,因为没有剩余任何内容,并且反跳运算符在延迟后发出该值。
"延迟"运算符将从Observable开始发射值的时间延迟了一定时间。
Observable.just(1,2,3,4,5,6)
.delay(5, TimeUnit.SECONDS)
.subscribe(System.out::println, System.out::println, () -> System.out.print("OnComplete"));
Thread.sleep(4000);
在上面的代码中,什么都没有发生,因为我们的main函数将在4秒后返回,从而防止5秒后发生发射。

