RxJava可观察对象和观察者

时间:2020-02-23 14:41:44  来源:igfitidea点击:

在本教程中,我们将详细讨论RxJava Observable和Observers。
我们将讨论它们的各种类型以及它们各自提供的功能。

观察者与观察者

在RxJava中,Observables是向观察者发射项目的源。
为了让观察者收听可观察对象,他们需要先订阅。
订阅RxJava2之后创建的实例称为Disposable

为了停止监听Observable,我们可以通过在Disposable实例上调用方法dispose()来取消订阅。

自引入Disposable以来,订阅类在RxJava2中已被弃用。
有关RxJava2新功能的更多信息,请参见此处。

创建可观察物

我们可以通过多种方式创建Observable。
方法之一是:

Observable<Integer> observable = new ObservableCreate<Integer>(new ObservableOnSubscribe<Integer>() {
          @Override
          public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
              emitter.onNext(10);
              emitter.onNext(20);
              emitter.onComplete();
          }
      });

Observable.OnSubscribe是一个接口,用于定义订户订阅Observable时要采取的操作。
只有在将观察者订阅到Observable时,subscribe方法才会运行。

onNext用于发射下一个项目。
发生错误时会触发" onError"。
发出最后一项后,调用onComplete。

现在,为了捕获这些值,我们必须订阅一个观察者。
为此,我们必须首先创建一个观察者:

Observer<Integer> observer = new Observer<Integer>() {
          @Override
          public void onSubscribe(Disposable d) {
              System.out.println("onSubscribe");
          }

          @Override
          public void onNext(Integer o) {
              System.out.println("onNext " + o);
          }

          @Override
          public void onError(Throwable e) {

          }

          @Override
          public void onComplete() {
              System.out.println("onComplete");
          }
      };

让我们订阅它:

observable.subscribe(observer);

这将在观察者和可观察者之间创建一个订阅。
Observable现在将发出由Observer的onNext捕获的值。

控制台的输出为:

Output:
onSubscribe
onNext 10
onNext 20
onComplete

如果您多次" subscribe()",则每次发射该项目。

创建可观察物的方法

我们可以通过以下方式创建Observables:

  • Observable.from()

  • Observable.just()–其中传递一个或者多个值。

  • Observable.range –第一个参数需要起始值。
    第二个期望大小。
    例如:Observable.range(1,2)将发出1和2。

  • Observable.interval()–发出定义的时间间隔中的值。
    发出的值将是Long类型。
    稍后再详细介绍。

有关更多信息,请查看RxJava教程。

冷观测和热观测

冷可观察变量是发出一个或者多个值以便每个订阅服务器将从开始就接收所有值的可观察变量。

热点观察者是指观察者在订阅之前无法接收发出的项目的观察者。
只能接收发出观察者之后发出的项目。

我们上面定义的示例是"冷观测"。

要创建热点可观察对象,请执行以下操作:

Observable<Long> observableInterval = Observable.interval(2, TimeUnit.SECONDS);

      PublishSubject<Long> publishSubject = PublishSubject.create();
      observableInterval.subscribe(publishSubject);

      publishSubject.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l));

      try {
          Thread.sleep(2000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }

      publishSubject.subscribe(l -> System.out.println("Subscriber #2 onNext: " + l));

      try {
          Thread.sleep(10000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }

要创建热点可观察对象,我们需要使用主题。
主体可以在任何给定时间充当可观察者或者观察者。

从0开始的值将每2秒发出一次。
订阅第一个观察者后,我们将线程设置为休眠2秒钟。
因此,第二个观察者将不会获得最初发出的物品,如下面的输出所示:

可观察物的类型

以下是Observable的主要类型,每种类型的功能和用例略有不同:

  • 可观察–发出一个或者多个值。
    上面我们已经讨论过了。

  • Single –发出单个值或者引发错误。

  • 可能–这可能会或者可能不会发出值。
    当您需要一些数据时应使用。

  • 可流动–在必须发射大量数据时使用。
    用于背压。
    稍后再详细介绍。

  • 可完成–只会产生成功或者失败。
    没有数据被发射。

观察者类型

对于上面的每个Observable类型,RxJava中也都有Observer类型。

  • 观察员
  • 单次可观察
  • 也许可以观察
  • CompletableObserver

Flowable Observable使用默认的Observer。
由于RxJava2 for Flowables,将使用订阅服务器代替观察者

现在,让我们来看一下带有观察者的每个观察者的基本实现。

这仅发出一个值。
可以与翻新网络调用一起使用。

以下是Single的示例:

Observable<Integer> integerObservable = Observable.just(1,2,3);

      Single<Integer> integerSingle = integerObservable.single(1);

      integerSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError"));

      integerSingle = integerObservable.singleOrError();

      integerSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError"));

在两种情况下都将产生一个" onError",因为两者都不具有单个值。

single(Integer defaultValue)和singleOrError()只是其中两个方法。
我们可以添加很多其他运算符,以及" all"," any"," contains"," count"等。
通常,在这些方法中设置谓词,这些谓词将返回单个值。

例:

Observable<Integer> integerObservable = Observable.just(1, 2, 3);

      Single<Boolean> booleanSingle = integerObservable.any(new Predicate<Integer>() {
          @Override
          public boolean test(Integer integer) throws Exception {
              return integer % 2 == 0;
          }
      });

      booleanSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError"));

      Single<Long> integerSingle = integerObservable.count();

      integerSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError"));

这将分别输出" true"和" 3"。

Maybe

也许发出0或者1个项目。
MaybeObserver使用方法onSuccess来代替onNext()。

以下是使用Maybe的示例,其中我们从"可观察的整数"中打印最大数量。

Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4, 5);

      Maybe<Integer> integerMaybe = integerObservable.reduce(new BiFunction<Integer, Integer, Integer>() {
          @Override
          public Integer apply(Integer integer, Integer integer2) throws Exception {
              if (integer > integer2)
                  return integer;
              else
                  return integer2;
          }
      });

      MaybeObserver<Integer> maybeObserver = new MaybeObserver<Integer>() {
          @Override
          public void onSubscribe(Disposable d) {
              System.out.println("onSubscribe");
          }

          @Override
          public void onSuccess(Integer o) {
              System.out.println("onSuccess : " + o);
          }

          @Override
          public void onError(Throwable e) {
              System.out.println("onError");
          }

          @Override
          public void onComplete() {
              System.out.println("onComplete");
          }
      };

      integerMaybe.subscribe(maybeObserver);

打印5。

除了reduce函数外,还有很多其他函数,例如firstElement(),lastElement()等。

也许与"单一"相似,除了也许也允许零排放。
要创建可观测的零排放,请执行以下操作:

Maybe<Integer> emptySource = Maybe.empty();

可完成

当您需要知道某个操作是否成功完成时,可以使用"完成"。
示例:将图像上传到服务器。
与Maybe和Single不同,CompletableObserver完全不返回任何值。
Completable Observable也没有类型。

例:

Observable<Integer> integerObservable = Observable.empty();

      Completable completable = integerObservable.ignoreElements();

      CompletableObserver completableObserver = new CompletableObserver() {
          @Override
          public void onSubscribe(Disposable d) {
              System.out.println("onSubscribe");
          }

          @Override
          public void onComplete() {
              System.out.println("onComplete");
          }

          @Override
          public void onError(Throwable e) {
              System.out.println("onError");
          }
      };

      completable.subscribe(completableObserver);

流动性

当您需要处理大量数据时,可以使用Flowable。
它支持背压。
我们将在另一个教程中详细讨论它。
从RxJava2开始,目前Flowable Observable需要一个Subscriber类作为Observer。

以下是Flowable的示例:

Flowable<Integer> integerFlowable = Flowable.range(1,500000);

      integerFlowable.reduce(new BiFunction<Integer, Integer, Integer>() {
          @Override
          public Integer apply(Integer integer, Integer integer2) throws Exception {
              return integer + integer2;
          }
      });

      Subscriber<Integer> integerSubscriber = new Subscriber<Integer>() {
          @Override
          public void onSubscribe(Subscription s) {
              System.out.println("onSubscribe");
              s.request(Long.MAX_VALUE);
          }

          @Override
          public void onNext(Integer integer) {
              System.out.println("onNext: " + integer);
          }

          @Override
          public void onError(Throwable t) {

          }

          @Override
          public void onComplete() {
              System.out.println("onComplete");
          }
      };

      integerFlowable.subscribe(integerSubscriber);

为了使订阅者开始接收排放,我们必须如上所述在Subscription实例上手动调用request()。

在可观察对象之间转换

我们有各种辅助方法可以将Observable类型转换为另一个类型。

例如:

要将任何类型转换为Completable,可以使用以下两种方法之一:

  • totoCompletable()
  • ignoreElements()

同样,要转换为Observable,toObservable()方法就足够了。
Flowable –toFlowable()可能–toMaybe()单个– reduce()/firstElement()等。