Javascript 多次订阅 Observable

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/36814995/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-23 19:35:35  来源:igfitidea点击:

Multiple subscriptions to Observable

javascriptrxjs

提问by cgross

I create my own Observableand subscribed two functions to it. I would expect to have both functions executed for each element in the sequence but only the last one is.

我创建了自己的Observable并订阅了两个函数。我希望为序列中的每个元素执行这两个函数,但只有最后一个是。

let observer = null
const notificationArrayStream = Rx.Observable.create(function (obs) {
  observer = obs;
  return () => {}
})

function trigger(something) {
  observer.next(something)
}

notificationArrayStream.subscribe((x) => console.log('a: ' + x))
notificationArrayStream.subscribe((x) => console.log('b: ' + x))

trigger('TEST')

Expected output

预期输出

a: TEST
b: TEST

Actual output

实际产量

b: TEST

Here's the JSBin: http://jsbin.com/cahoyey/edit?js,console

这是 JSBin:http://jsbin.com/cahoyey/edit?js,console

Why is that? How can I have multiple functions subscribed to a single Observable?

这是为什么?如何将多个功能订阅到一个Observable

采纳答案by user3743222

To have multiple functions subscribe to a single Observable, just subscribe them to that observable, it is that simple. And actually that's what you did.

要让多个函数订阅单个 Observable,只需将它们订阅到那个 observable,就这么简单。事实上,这就是你所做的。

BUT your code does not work because after notificationArrayStream.subscribe((x) => console.log('b: ' + x))is executed, observeris (x) => console.log('b: ' + x)), so observer.nextwill give you b: TEST.

但是您的代码不起作用,因为notificationArrayStream.subscribe((x) => console.log('b: ' + x))执行后observer(x) => console.log('b: ' + x)),因此observer.next会给您b: TEST.

So basically it is your observable creation which is wrong. In createyou passed an observer as parameter so you can pass it values. Those values you need to generate somehow through your own logic, but as you can see your logic here is erroneous. I would recommend you use a subject if you want to push values to the observer.

所以基本上是你的可观察创造是错误的。在create你传递了一个观察者作为参数,所以你可以传递它的值。您需要通过自己的逻辑以某种方式生成这些值,但正如您所看到的,这里的逻辑是错误的。如果您想将值推送给观察者,我建议您使用主题。

Something like:

就像是:

const notificationArrayStream = Rx.Observable.create(function (obs) {
  mySubject.subscribe(obs);
  return () => {}
})

function trigger(something) {
  mySubject.next(something)
}

回答by Mobiletainment

Subject

主题

In your case, you could simply use a Subject. A subjectallows you to share a single execution with multiple observerswhen using it as a proxy for a group of subscribers and a source.

在您的情况下,您可以简单地使用Subject. 当将主体用作一组订阅者和源的代理时,主体允许您与多个观察者共享单个执行

In essence, here's your example using a subject:

本质上,这是您使用主题的示例:

const subject = new Subject();

function trigger(something) {
    subject.next(something);
}

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

trigger('TEST');

Result:

结果:

a: TEST
b: TEST


Pitfall: Observers arriving too late

陷阱:观察者到达太晚

Note that the timing of when you subscribe and when you broadcast the data is relevant. If you send a broadcast before subscribing, you're not getting notified by this broadcast:

请注意,订阅和广播数据的时间是相关的。如果您在订阅前发送广播,您将不会收到此广播的通知:

function trigger(something) {
    subject.next(something);
}

trigger('TEST');

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

Result: (empty)

结果:(空)



ReplaySubject & BehaviorSubject

ReplaySubject 和 BehaviorSubject

If you want to ensure that even future subscribers get notified, you can use a ReplaySubjector a BehaviorSubjectinstead.

如果您想确保即使是未来的订阅者也能收到通知,您可以改用ReplaySubjectBehaviorSubject

Here's an example using a ReplaySubject(with a cache-size of 5, meaning up to 5 values from the past will be remembered, as opposed to a BehaviorSubjectwhich can remember only the last value):

这是一个使用 a 的示例ReplaySubject(缓存大小为 5,这意味着最多可以记住过去的 5 个值,而BehaviorSubject只能记住最后一个值):

const subject = new ReplaySubject(5); // buffer size is 5

function trigger(something) {
    subject.next(something);
}

trigger('TEST');

subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));

Result:

结果:

a: TEST
b: TEST

回答by Luca Bertolasi

Every time you subscribe, you are overriding the var observer.

每次订阅时,您都会覆盖 var观察者

The triggerfunction only reference this one var, hence no surprise there is only one log.

触发功能只能引用这一个变种,因此毫不奇怪,只有一个日志。

If we make the var an array it works as intended:JS Bin

如果我们将 var 设为数组,它会按预期工作:JS Bin

let obs = [];

let foo = Rx.Observable.create(function (observer) {
  obs.push(observer);
});

function trigger(sth){
//   console.log('trigger fn');
  obs.forEach(ob => ob.next(sth));
}

foo.subscribe(function (x) {
  console.log(`a:${x}`);
});
foo.subscribe(function (y) {
  console.log(`b:${y}`);
});

trigger(1);
trigger(2);
trigger(3);
trigger(4);

A cleaner solution would be to use Subject, as suggested above.

更清洁的解决方案是使用Subject,如上所述。

回答by smnbbrv

Observables are not multicasting; unless you use any kind of Subject. You can of course create Subject, pipe the Observableoutput into like other answers propose.

Observable 不是多播的;除非您使用任何类型的Subject. 您当然可以创建主题,将Observable输出通过管道传输到其他建议的答案中。

However if you already havean Observalbe, it is way more convenient to use share()that turns Observableinto Subjector shareReplay(n)which would be equivalent for ReplaySubject(n):

不过,如果你已经有一个Observalbe,它使用的方式更加方便share(),轮流ObservableSubject或者shareReplay(n)这将是等效ReplaySubject(n)

import {share} from 'rxjs/operators';

let observer = null

const notificationArrayStream = new Observable(obs => {
  observer = obs;
}).pipe(share());

function trigger(something) {
  observer.next(something)
}

notificationArrayStream.subscribe((x) => console.log('a: ' + x))
notificationArrayStream.subscribe((x) => console.log('b: ' + x))

trigger('TEST')

That's pretty much it.

差不多就是这样。

回答by golfadas

Instead of using a Subject, it is also possible to use the publishReplay() + refCount() comboto allow an observable to multicast to multiple subscribers:

除了使用 Subject,还可以使用publishReplay() + refCount() 组合来允许 observable 多播给多个订阅者:

const notificationArrayStream = Rx.Observable.create(function (obs) {
  observer = obs;
  return () => {}
}).pipe(publishReplay(), refCount())

回答by yfranz

You can build wrapper class Subscribable<> based on ReplaySubject. It would be cleaner than managing Subject and Observable:

您可以基于 ReplaySubject 构建包装类 Subscribable<>。它比管理 Subject 和 Observable 更干净:

export class Subscribable<T> {

    private valueSource: Subject = new ReplaySubject(1);
    public value: Observable;
    private _value: T;

    constructor() {
        this.value = this.valueSource.asObservable();
    }

    public set(val: T) {
        this.valueSource.next(val);
        this._value = val;
    }

    public get(): T {
        return this._value;
    }
}

Usage:

用法:

let arrayStream : Subscribable<TYPE> = new Subscribable<TYPE>();

…
public setArrayStream (value: TYPE) {
    this.set(value);
}

Handle value change:

处理值变化:

arrayStream.value.subscribe(res => { /*handle it*/ });

Original article: http://devinstance.net/articles/20170921/rxjs-subscribable

原文:http: //devinstance.net/articles/20170921/rxjs-subscribable