Javascript 如何获取 RxJS Subject 或 Observable 的当前值?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/37089977/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-23 19:53:47  来源:igfitidea点击:

How to get current value of RxJS Subject or Observable?

javascriptangularrxjs

提问by Baconbeastnz

I have an Angular 2 service:

我有一个 Angular 2 服务:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {
  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
  }
}

Everything works great. But I have another component which doesn't need to subscribe, it just needs to get the current value of isLoggedIn at a certain point in time. How can I do this?

一切都很好。但是我有另一个不需要订阅的组件,它只需要在某个时间点获取 isLoggedIn 的当前值。我怎样才能做到这一点?

回答by Günter Z?chbauer

A Subjector Observabledoesn't have a current value. When a value is emitted, it is passed to subscribers and the Observableis done with it.

A SubjectorObservable没有当前值。当一个值被发出时,它被传递给订阅者并Observable完成它。

If you want to have a current value, use BehaviorSubjectwhich is designed for exactly that purpose. BehaviorSubjectkeeps the last emitted value and emits it immediately to new subscribers.

如果您想获得当前值,请使用BehaviorSubject专为此目的而设计的值。BehaviorSubject保留最后发出的值并立即将其发送给新订阅者。

It also has a method getValue()to get the current value.

它还具有getValue()获取当前值的方法。

回答by Ben Lesh

The only way you shouldbe getting values "out of" an Observable/Subject is with subscribe!

应该从 Observable/Subject 中“获取”值的唯一方法是订阅!

If you're using getValue()you're doing something imperative in declarative paradigm. It's there as an escape hatch, but 99.9% of the time you should NOT use getValue().There are a few interesting things that getValue()will do: It will throw an error if the subject has been unsubscribed, it will prevent you from getting a value if the subject is dead because it's errored, etc. But, again, it's there as an escape hatch for rare circumstances.

如果您正在使用,getValue()那么您就是在声明式范式中做一些必要的事情。它作为逃生舱口存在,但在99.9% 的情况下您不应该使用getValue(). 有一些有趣的事情getValue()会做:如果主题已取消订阅,它将引发错误,如果主题因错误而死亡,它将阻止您获得值等。但是,再次,它是作为转义而存在的在罕见的情况下孵化。

There are several ways of getting the latest value from a Subject or Observable in a "Rx-y" way:

有几种方法可以以“Rx-y”方式从 Subject 或 Observable 获取最新值:

  1. Using BehaviorSubject: But actually subscribing to it. When you first subscribe to BehaviorSubjectit will synchronously send the previous value it received or was initialized with.
  2. Using a ReplaySubject(N): This will cache Nvalues and replay them to new subscribers.
  3. A.withLatestFrom(B): Use this operator to get the most recent value from observable Bwhen observable Aemits. Will give you both values in an array [a, b].
  4. A.combineLatest(B): Use this operator to get the most recent values from Aand Bevery time either Aor Bemits. Will give you both values in an array.
  5. shareReplay(): Makes an Observable multicast through a ReplaySubject, but allows you to retry the observable on error. (Basically it gives you that promise-y caching behavior).
  6. publishReplay(), publishBehavior(initialValue), multicast(subject: BehaviorSubject | ReplaySubject), etc: Other operators that leverage BehaviorSubjectand ReplaySubject. Different flavors of the same thing, they basically multicast the source observable by funneling all notifications through a subject. You need to call connect()to subscribe to the source with the subject.
  1. 使用BehaviorSubject:但实际上订阅了它。当您第一次订阅BehaviorSubject它时,它将同步发送它接收到或初始化的先前值。
  2. 使用 a ReplaySubject(N):这将缓存N值并将它们重播给新订阅者。
  3. A.withLatestFrom(B):B当 observableA发出时,使用此运算符从 observable 获取最新值。会给你一个数组中的两个值[a, b]
  4. A.combineLatest(B): 使用此运算符从AB每次发出A或时获取最新值B。会给你一个数组中的两个值。
  5. shareReplay(): 通过 a 进行 Observable 多播ReplaySubject,但允许您在出错时重试 observable。(基本上它为您提供了承诺的缓存行为)。
  6. publishReplay()publishBehavior(initialValue)multicast(subject: BehaviorSubject | ReplaySubject)等: 其他利用BehaviorSubject和 的运算符ReplaySubject。同一事物的不同风格,它们基本上通过将所有通知汇集到一个主题来多播源 observable。您需要调用connect()以订阅带有主题的源。

回答by Kfir Erez

I had similar situation where late subscribers subscribe to the Subject after its value arrived.

我遇到过类似的情况,迟到的订阅者在它的价值到达后订阅它。

I found ReplaySubjectwhich is similar to BehaviorSubject works like a charm in this case. And here is a link to better explanation: http://reactivex.io/rxjs/manual/overview.html#replaysubject

我发现与 BehaviorSubject 类似的ReplaySubject在这种情况下就像一个魅力。这是更好解释的链接:http: //reactivex.io/rxjs/manual/overview.html#replaysubject

回答by Keerati Limkulphong

const observable = of('response')

function hasValue(value: any) {
  return value !== null && value !== undefined;
}

function getValue<T>(observable: Observable<T>): Promise<T> {
  return observable
    .pipe(
      filter(hasValue),
      first()
    )
    .toPromise();
}

const result = await getValue(observable)
// Do the logic with the result
// .................
// .................
// .................

You can check the full article on how to implement it from here. https://www.imkrish.com/how-to-get-current-value-of-observable-in-a-clean-way/

您可以从此处查看有关如何实现它的完整文章。 https://www.imkrish.com/how-to-get-current-value-of-observable-in-a-clean-way/

回答by Molp Burnbright

I encountered the same problem in child components where initially it would have to have the current value of the Subject, then subscribe to the Subject to listen to changes. I just maintain the current value in the Service so it is available for components to access, e.g. :

我在子组件中遇到了同样的问题,最初它必须具有主题的当前值,然后订阅主题以听取更改。我只是维护服务中的当前值,以便组件可以访问它,例如:

import {Storage} from './storage';
import {Injectable} from 'angular2/core';
import {Subject}    from 'rxjs/Subject';

@Injectable()
export class SessionStorage extends Storage {

  isLoggedIn: boolean;

  private _isLoggedInSource = new Subject<boolean>();
  isLoggedIn = this._isLoggedInSource.asObservable();
  constructor() {
    super('session');
    this.currIsLoggedIn = false;
  }
  setIsLoggedIn(value: boolean) {
    this.setItem('_isLoggedIn', value, () => {
      this._isLoggedInSource.next(value);
    });
    this.isLoggedIn = value;
  }
}

A component that needs the current value could just then access it from the service, i.e,:

需要当前值的组件可以从服务中访问它,即:

sessionStorage.isLoggedIn

Not sure if this is the right practice :)

不确定这是否是正确的做法:)

回答by Simon_Weaver

A similar lookinganswer was downvoted. But I think I can justify what I'm suggesting here for limited cases.

类似的寻找答案downvoted。但我想我可以证明我在这里建议的有限情况是合理的。



While it's true that an observable doesn't have a currentvalue, very often it will have an immediately availablevalue. For example with redux / flux / akita stores you may request data from a central store, based on a number of observables and that value will generally be immediately available.

虽然 observable 确实没有当前值,但它通常具有立即可用的值。例如,对于 redux/flux/akita 商店,您可以从中央商店请求数据,基于许多可观察的数据,并且该值通常会立即可用。

If this is the case then when you subscribe, the value will come back immediately.

如果是这种情况,那么当您 时subscribe,该值将立即返回。

So let's say you had a call to a service, and on completion you want to get the latest value of something from your store, that potentially might not emit:

因此,假设您有一个服务调用,并且在完成后您想从您的商店中获取某物的最新值,这可能不会发出

You might try to do this (and you should as much as possible keep things 'inside pipes'):

您可能会尝试这样做(并且您应该尽可能将东西放在“管道内”):

 serviceCallResponse$.pipe(withLatestFrom(store$.select(x => x.customer)))
                     .subscribe(([ serviceCallResponse, customer] => {

                        // we have serviceCallResponse and customer 
                     });

The problem with this is that it will block until the secondary observable emits a value, which potentially could be never.

这样做的问题是它会阻塞,直到辅助 observable 发出一个值,这可能永远不会。

I found myself recently needing to evaluate an observable only if a value was immediately available, and more importantly I needed to be able to detect if it wasn't. I ended up doing this:

我发现自己最近只需要在一个值立即可用时才需要评估一个 observable ,更重要的是我需要能够检测它是否不是。我最终这样做了:

 serviceCallResponse$.pipe()
                     .subscribe(serviceCallResponse => {

                        // immediately try to subscribe to get the 'available' value
                        // note: immediately unsubscribe afterward to 'cancel' if needed
                        let customer = undefined;

                        // whatever the secondary observable is
                        const secondary$ = store$.select(x => x.customer);

                        // subscribe to it, and assign to closure scope
                        sub = secondary$.pipe(take(1)).subscribe(_customer => customer = _customer);
                        sub.unsubscribe();

                        // if there's a delay or customer isn't available the value won't have been set before we get here
                        if (customer === undefined) 
                        {
                           // handle, or ignore as needed
                           return throwError('Customer was not immediately available');
                        }
                     });

Note that for all of the above I'm using subscribeto get the value (as @Ben discusses). Not using a .valueproperty, even if I had a BehaviorSubject.

请注意,对于上述所有内容,我都使用它subscribe来获取值(如@Ben 讨论的那样)。不使用.value属性,即使我有一个BehaviorSubject.

回答by j3ff

Although it may sound overkill, this is just another "possible" solution to keep Observabletype and reduce boilerplate...

尽管这听起来有些矫枉过正,但这只是保持Observable类型和减少样板文件的另一种“可能”解决方案......

You could always create an extension getterto get the current value of an Observable.

你总是可以创建一个扩展 getter来获取 Observable 的当前值。

To do this you would need to extend the Observable<T>interface in a global.d.tstypings declaration file. Then implement the extension getterin a observable.extension.tsfile and finally include both typings and extension file to your application.

为此,您需要Observable<T>在类型global.d.ts声明文件中扩展接口。然后在文件中实现扩展 getterobservable.extension.ts最后将类型和扩展文件包含到您的应用程序中。

You can refer to this StackOverflow Answerto know how to include the extensions into your Angular application.

您可以参考此StackOverflow Answer以了解如何将扩展包含到您的 Angular 应用程序中。

// global.d.ts
declare module 'rxjs' {
  interface Observable<T> {
    /**
     * _Extension Method_ - Returns current value of an Observable.
     * Value is retrieved using _first()_ operator to avoid the need to unsubscribe.
     */
    value: Observable<T>;
  }
}

// observable.extension.ts
Object.defineProperty(Observable.prototype, 'value', {
  get <T>(this: Observable<T>): Observable<T> {
    return this.pipe(
      filter(value => value !== null && value !== undefined),
      first());
  },
});

// using the extension getter example
this.myObservable$.value
  .subscribe(value => {
    // whatever code you need...
  });

回答by yaya

The best way to do this is using Behaviur Subject, here is an example:

最好的方法是使用Behaviur Subject,这是一个例子:

var sub = new rxjs.BehaviorSubject([0, 1])
sub.next([2, 3])
setTimeout(() => {sub.next([4, 5])}, 1500)
sub.subscribe(a => console.log(a)) //2, 3 (current value) -> wait 2 sec -> 4, 5

回答by Slawa

You could store the last emitted value separately from the Observable. Then read it when needed.

您可以将最后发出的值与 Observable 分开存储。然后在需要时阅读。

let lastValue: number;

const subscription = new Service().start();
subscription
    .subscribe((data) => {
        lastValue = data;
    }
);