Javascript 如何让一个 Observable 序列在发射之前等待另一个完成?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/30519645/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-23 05:19:07  来源:igfitidea点击:

How to make one Observable sequence wait for another to complete before emitting?

javascriptobservablerxjs

提问by Stephen

Say I have an Observable, like so:

说我有一个Observable,像这样:

var one = someObservable.take(1);

one.subscribe(function(){ /* do something */ });

Then, I have a second Observable:

然后,我有第二个Observable

var two = someOtherObservable.take(1);

Now, I want to subscribe()to two, but I want to make sure that onehas completed before the twosubscriber is fired.

现在,我想subscribe()two的,但我想肯定使该one已完成之前,two用户被激发。

What kind of buffering method can I use on twoto make the second one wait for the first one to be completed?

我可以使用什么样的缓冲方法two让第二个等待第一个完成?

I suppose I am looking to pause twountil oneis complete.

我想我想暂停two直到one完成。

采纳答案by paulpdaniels

A couple ways I can think of

我能想到的几种方法

import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'

//Method one

var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});

//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));

回答by Simon_Weaver

skipUntil() with last()

skipUntil() 和 last()

skipUntil : ignore emitted items until another observable has emitted

skipUntil :忽略发出的项目,直到另一个 observable 发出

last: emit last value from a sequence(i.e. wait until it completes then emit)

last:从序列中发出最后一个值(即等待它完成然后发出)

Note that anything emitted from the observable passed to skipUntilwill cancel the skipping, which is why we need to add last()- to wait for the stream to complete.

请注意,从传递给的 observable 发出的任何内容都skipUntil将取消跳过,这就是我们需要添加last()- 以等待流完成的原因。

main$.skipUntil(sequence2$.pipe(last()))

Official: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil

官方:https: //rxjs-dev.firebaseapp.com/api/operators/skipUntil



Possible issue: Note that last()by itself will errorif nothing is emitted. The last()operator does have a defaultparameter but only when used in conjunction with a predicate. I think if this situation is a problem for you (if sequence2$may complete without emitting) then one of these should work (currently untested):

可能的问题:请注意,如果没有发出任何内容last(),它本身就会出错。该last()运营商确实有default配合使用的谓词参数,但只有当。我认为如果这种情况对你来说是个问题(如果sequence2$可以完成而不发出)那么其中一个应该可以工作(目前未经测试):

main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))

Note that undefinedis a valid item to be emitted, but could actually be any value. Also note that this is the pipe attached to sequence2$and not the main$pipe.

请注意,这undefined是要发出的有效项目,但实际上可以是任何值。另请注意,这是连接到sequence2$main$管道而不是管道。

回答by Nikos Tsokos

If you want to make sure that the order of execution is retained you can use flatMap as the following example

如果要确保保留执行顺序,可以使用 flatMap 作为以下示例

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));

first
  .flatMap(() => second)
  .flatMap(() => third)
  .subscribe(()=> console.log('finished'));

The outcome would be:

结果将是:

"1"
"11"
"111"
"finished"

回答by Joe King

Here is yet another possibility taking advantage of switchMap's result selector

这是利用 switchMap 的结果选择器的另一种可能性

var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
    /** Wait for first Observable */
    () => one$,
    /** Only return the value we're actually interested in */
    (value2, value1) => value2
  )
  .subscribe((value2) => {
    /* do something */ 
  });

Since the switchMap's result selector has been depreciated, here is an updated version

由于 switchMap 的结果选择器已经贬值,这里是一个更新的版本

const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
  take(1),
  switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => {
  /* do something */ 
});

回答by Anton

If the second observable is hot, there is another wayto do pause/resume:

如果第二个 observable 是hot,则有另一种方法来执行pause/resume

var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);

source1.doOnCompleted(function () { 
  /* resume paused source2 */ 
  pauser.onNext(true);
}).subscribe(function(){
  // do something
});

source2.subscribe(function(){
  // start to recieve data 
});

Also you can use buffered version pausableBufferedto keep data during pause is on.

您也可以使用缓冲版本pausableBuffered在暂停期间保留数据。

回答by Andrei T?tar

Here's a reusable way of doing it (it's typescript but you can adapt it to js):

这是一种可重复使用的方法(它是打字稿,但您可以将其调整为 js):

export function waitFor<T>(signal: Observable<any>) {
    return (source: Observable<T>) =>
        new Observable<T>(observer =>
            signal.pipe(first())
                .subscribe(_ =>
                    source.subscribe(observer)
                )
        );
}

and you can use it like any operator:

您可以像使用任何运算符一样使用它:

var two = someOtherObservable.pipe(waitFor(one), take(1));

It's basically an operator that defers the subscribe on the source observable until the signal observable emits the first event.

它基本上是一个操作符,它推迟对源 observable 的订阅,直到信号 observable 发出第一个事件。

回答by itay oded

well, I know this is pretty old but I think that what you might need is:

好吧,我知道这已经很老了,但我认为您可能需要的是:

var one = someObservable.take(1);

var two = someOtherObservable.pipe(
  concatMap((twoRes) => one.pipe(mapTo(twoRes))),
  take(1)
).subscribe((twoRes) => {
   // one is completed and we get two's subscription.
})

回答by c1moore

Here's yet another, but I feel more straightforward and intuitive (or at least natural if you're used to Promises), approach. Basically, you create an Observable using Observable.create()to wrap oneand twoas a single Observable. This is very similar to how Promise.all()may work.

这是另一个,但我觉得更直接和直观(或者如果你习惯了 Promises,那么至少很自然),方法。基本上,您创建一个 ObservableObservable.create()用于包装onetwo作为单个 Observable。这与如何Promise.all()工作非常相似。

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      // observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
});

So, what's going on here? First, we create a new Observable. The function passed to Observable.create(), aptly named onSubscription, is passed the observer (built from the parameters you pass to subscribe()), which is similar to resolveand rejectcombined into a single object when creating a new Promise. This is how we make the magic work.

那么,这里发生了什么?首先,我们创建一个新的 Observable。传递给 的函数Observable.create(),恰如其分地命名为onSubscription,被传递给观察者(根据您传递给 的参数构建subscribe()),这类似于创建新 Promise 时resolvereject组合成单个对象。这就是我们如何使魔法发挥作用。

In onSubscription, we subscribe to the first Observable (in the example above, this was called one). How we handle nextand erroris up to you, but the default provided in my sample should be appropriate generally speaking. However, when we receive the completeevent, which means oneis now done, we can subscribe to the next Observable; thereby firing the second Observable after the first one is complete.

在 中onSubscription,我们订阅了第一个 Observable(在上面的示例中,它被称为one)。我们如何处理next,并error是你的,但我的样品中提供的默认要适当一般来说。但是,当我们收到complete事件时,也one就是现在已经完成了,我们可以订阅下一个 Observable;从而在第一个 Observable 完成后触发第二个 Observable。

The example observer provided for the second Observable is fairly simple. Basically, secondnow acts like what you would expect twoto act like in the OP. More specifically, secondwill emit the first and only the first value emitted by someOtherObservable(because of take(1)) and then complete, assuming there is no error.

为第二个 Observable 提供的示例观察者相当简单。基本上,second现在的行为就像您期望two在 OP 中的行为一样。更具体地说,假设没有错误,second将发出由someOtherObservable(because of take(1))发出的第一个也是唯一的第一个值,然后完成。

Example

例子

Here is a full, working example you can copy/paste if you want to see my example working in real life:

这是一个完整的工作示例,如果您想在现实生活中看到我的示例,您可以复制/粘贴:

var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
}).subscribe(
  function onNext(value) {
    console.log(value);
  },
  function onError(error) {
    console.error(error);
  },
  function onComplete() {
    console.log("Done!");
  }
);

If you watch the console, the above example will print:

如果你观察控制台,上面的例子将打印:

1

6

Done!

1

6

完毕!

回答by Tktorza

You can use result emitted from previous Observable thanks to mergeMap(or his alias flatMap) operator like this:

由于mergeMap(或他的别名flatMap)运算符,您可以使用从以前的 Observable 发出的结果,如下所示:

 const one = Observable.of('https://api.github.com/users');
 const two = (c) => ajax(c);//ajax from Rxjs/dom library

 one.mergeMap(two).subscribe(c => console.log(c))