Javascript 多次订阅 Observable
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/36814995/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Multiple subscriptions to Observable
提问by cgross
I create my own Observable
and subscribed two functions to it. I would expect to have both functions executed for each element in the sequence but only the last one is.
我创建了自己的Observable
并订阅了两个函数。我希望为序列中的每个元素执行这两个函数,但只有最后一个是。
let observer = null
const notificationArrayStream = Rx.Observable.create(function (obs) {
observer = obs;
return () => {}
})
function trigger(something) {
observer.next(something)
}
notificationArrayStream.subscribe((x) => console.log('a: ' + x))
notificationArrayStream.subscribe((x) => console.log('b: ' + x))
trigger('TEST')
Expected output
预期输出
a: TEST
b: TEST
Actual output
实际产量
b: TEST
Here's the JSBin: http://jsbin.com/cahoyey/edit?js,console
这是 JSBin:http://jsbin.com/cahoyey/edit?js,console
Why is that? How can I have multiple functions subscribed to a single Observable
?
这是为什么?如何将多个功能订阅到一个Observable
?
采纳答案by user3743222
To have multiple functions subscribe to a single Observable, just subscribe them to that observable, it is that simple. And actually that's what you did.
要让多个函数订阅单个 Observable,只需将它们订阅到那个 observable,就这么简单。事实上,这就是你所做的。
BUT your code does not work because after notificationArrayStream.subscribe((x) => console.log('b: ' + x))
is executed, observer
is (x) => console.log('b: ' + x))
, so observer.next
will give you b: TEST
.
但是您的代码不起作用,因为notificationArrayStream.subscribe((x) => console.log('b: ' + x))
执行后observer
是(x) => console.log('b: ' + x))
,因此observer.next
会给您b: TEST
.
So basically it is your observable creation which is wrong. In create
you passed an observer as parameter so you can pass it values. Those values you need to generate somehow through your own logic, but as you can see your logic here is erroneous. I would recommend you use a subject if you want to push values to the observer.
所以基本上是你的可观察创造是错误的。在create
你传递了一个观察者作为参数,所以你可以传递它的值。您需要通过自己的逻辑以某种方式生成这些值,但正如您所看到的,这里的逻辑是错误的。如果您想将值推送给观察者,我建议您使用主题。
Something like:
就像是:
const notificationArrayStream = Rx.Observable.create(function (obs) {
mySubject.subscribe(obs);
return () => {}
})
function trigger(something) {
mySubject.next(something)
}
回答by Mobiletainment
Subject
主题
In your case, you could simply use a Subject
. A subjectallows you to share a single execution with multiple observerswhen using it as a proxy for a group of subscribers and a source.
在您的情况下,您可以简单地使用Subject
. 当将主体用作一组订阅者和源的代理时,主体允许您与多个观察者共享单个执行。
In essence, here's your example using a subject:
本质上,这是您使用主题的示例:
const subject = new Subject();
function trigger(something) {
subject.next(something);
}
subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));
trigger('TEST');
Result:
结果:
a: TEST
b: TEST
Pitfall: Observers arriving too late
陷阱:观察者到达太晚
Note that the timing of when you subscribe and when you broadcast the data is relevant. If you send a broadcast before subscribing, you're not getting notified by this broadcast:
请注意,订阅和广播数据的时间是相关的。如果您在订阅前发送广播,您将不会收到此广播的通知:
function trigger(something) {
subject.next(something);
}
trigger('TEST');
subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));
Result: (empty)
结果:(空)
ReplaySubject & BehaviorSubject
ReplaySubject 和 BehaviorSubject
If you want to ensure that even future subscribers get notified, you can use a ReplaySubjector a BehaviorSubjectinstead.
如果您想确保即使是未来的订阅者也能收到通知,您可以改用ReplaySubject或BehaviorSubject。
Here's an example using a ReplaySubject
(with a cache-size of 5, meaning up to 5 values from the past will be remembered, as opposed to a BehaviorSubjectwhich can remember only the last value):
这是一个使用 a 的示例ReplaySubject
(缓存大小为 5,这意味着最多可以记住过去的 5 个值,而BehaviorSubject只能记住最后一个值):
const subject = new ReplaySubject(5); // buffer size is 5
function trigger(something) {
subject.next(something);
}
trigger('TEST');
subject.subscribe((x) => console.log('a: ' + x));
subject.subscribe((x) => console.log('b: ' + x));
Result:
结果:
a: TEST
b: TEST
回答by Luca Bertolasi
Every time you subscribe, you are overriding the var observer.
每次订阅时,您都会覆盖 var观察者。
The triggerfunction only reference this one var, hence no surprise there is only one log.
该触发功能只能引用这一个变种,因此毫不奇怪,只有一个日志。
If we make the var an array it works as intended:JS Bin
如果我们将 var 设为数组,它会按预期工作:JS Bin
let obs = [];
let foo = Rx.Observable.create(function (observer) {
obs.push(observer);
});
function trigger(sth){
// console.log('trigger fn');
obs.forEach(ob => ob.next(sth));
}
foo.subscribe(function (x) {
console.log(`a:${x}`);
});
foo.subscribe(function (y) {
console.log(`b:${y}`);
});
trigger(1);
trigger(2);
trigger(3);
trigger(4);
A cleaner solution would be to use Subject, as suggested above.
更清洁的解决方案是使用Subject,如上所述。
回答by smnbbrv
Observables are not multicasting; unless you use any kind of Subject
. You can of course create Subject, pipe the Observable
output into like other answers propose.
Observable 不是多播的;除非您使用任何类型的Subject
. 您当然可以创建主题,将Observable
输出通过管道传输到其他建议的答案中。
However if you already havean Observalbe
, it is way more convenient to use share()
that turns Observable
into Subject
or shareReplay(n)
which would be equivalent for ReplaySubject(n)
:
不过,如果你已经有一个Observalbe
,它使用的方式更加方便share()
,轮流Observable
到Subject
或者shareReplay(n)
这将是等效ReplaySubject(n)
:
import {share} from 'rxjs/operators';
let observer = null
const notificationArrayStream = new Observable(obs => {
observer = obs;
}).pipe(share());
function trigger(something) {
observer.next(something)
}
notificationArrayStream.subscribe((x) => console.log('a: ' + x))
notificationArrayStream.subscribe((x) => console.log('b: ' + x))
trigger('TEST')
That's pretty much it.
差不多就是这样。
回答by golfadas
Instead of using a Subject, it is also possible to use the publishReplay() + refCount() comboto allow an observable to multicast to multiple subscribers:
除了使用 Subject,还可以使用publishReplay() + refCount() 组合来允许 observable 多播给多个订阅者:
const notificationArrayStream = Rx.Observable.create(function (obs) {
observer = obs;
return () => {}
}).pipe(publishReplay(), refCount())
回答by yfranz
You can build wrapper class Subscribable<> based on ReplaySubject. It would be cleaner than managing Subject and Observable:
您可以基于 ReplaySubject 构建包装类 Subscribable<>。它比管理 Subject 和 Observable 更干净:
export class Subscribable<T> {
private valueSource: Subject = new ReplaySubject(1);
public value: Observable;
private _value: T;
constructor() {
this.value = this.valueSource.asObservable();
}
public set(val: T) {
this.valueSource.next(val);
this._value = val;
}
public get(): T {
return this._value;
}
}
Usage:
用法:
let arrayStream : Subscribable<TYPE> = new Subscribable<TYPE>();
…
public setArrayStream (value: TYPE) {
this.set(value);
}
Handle value change:
处理值变化:
arrayStream.value.subscribe(res => { /*handle it*/ });
Original article: http://devinstance.net/articles/20170921/rxjs-subscribable
原文:http: //devinstance.net/articles/20170921/rxjs-subscribable