javascript 在 RxJS 中按特定时间量分隔可观察值

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/21661391/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-27 21:25:27  来源:igfitidea点击:

Separate observable values by specific amount of time in RxJS

javascriptsystem.reactiverxjs

提问by Sergi Mansilla

What would be the most idiomatic way to yield values of an Observable by a specific amount of time? For example, let's say I have an Observable created from a big Array and I want to yield a value every 2 seconds. Is a combination of intervaland selectManythe best way?

在特定时间段内产生 Observable 值的最惯用的方法是什么?例如,假设我有一个从大数组创建的 Observable,我想每 2 秒产生一个值。是intervalselectMany的结合最好的方法吗?

回答by Brandon

For your specific example, the idea is to map each value from the array to an observable that will yield its result after a delay, then concatenate the resulting stream of observables:

对于您的具体示例,该想法是将数组中的每个值映射到一个可在延迟后产生其结果的 observable,然后连接生成的 observables 流:

var delayedStream = Rx.Observable
    .fromArray([1, 2, 3, 4, 5])
    .map(function (value) { return Rx.Observable.return(value).delay(2000); })
    .concatAll();

Other examples might indeed make use of timeror interval. It just depends.

其他示例可能确实使用timerinterval。这只是取决于。

For example, if your array is really really big, then the above will cause a fair amount of memory pressure (because it is creating Nobservables for a really large N). Here is an alternative that uses intervalto lazily walk the array:

例如,如果您的数组真的很大,那么上面的内容将导致相当大的内存压力(因为它正在N为一个非常大的 observable创建N)。这是interval用于懒惰地遍历数组的替代方法:

var delayedStream = Rx.Observable
    .interval(2000)
    .take(reallyBigArray.length) // end the observable after it pulses N times
    .map(function (i) { return reallyBigArray[i]; });

This one will yield the next value from the array every 2 seconds until it has iterated over the entire array.

这将每 2 秒从数组中产生下一个值,直到它遍历整个数组。

回答by farincz

I think that using zip produce better and more readable code, still using just 3 observables.

我认为使用 zip 会产生更好、更易读的代码,但仍然只使用 3 个 observable。

var items = ['A', 'B', 'C'];

Rx.Observable.zip(
  Rx.Observable.fromArray(items),
  Rx.Observable.timer(2000, 2000),  
  function(item, i) { return item;}
)

回答by cwharris

While Brandon's answer gets the gist of the idea, here's a version which yields the first item immediately, then puts time between the following items.

虽然布兰登的回答得到了这个想法的要点,但这里有一个版本,它立即产生第一个项目,然后在以下项目之间留出时间。

var delay = Rx.Observable.empty().delay(2000);

var items = Rx.Observable.fromArray([1,2,3,4,5])
  .map(function (x) {
    return Rx.Observable.return(x).concat(delay); // put some time after the item
  })
  .concatAll();

Updated for newer RxJS:

为较新的 RxJS 更新:

var delay = Rx.Observable.empty().delay(2000);

var items = Rx.Observable.fromArray([1,2,3,4,5])
  .concatMap(function (x) {
    return Rx.Observable.of(x).concat(delay); // put some time after the item
  });

回答by user3587412

For RxJS 5:

对于 RxJS 5:

Rx.Observable.from([1, 2, 3, 4, 5])
  .zip(Rx.Observable.timer(0, 2000), x => x)
  .subscribe(x => console.log(x));

回答by Juan Mendes

Since this wasn't mentioned, I think concatMapcombined with delayis pretty readable.

由于没有提到这一点,我认为concatMap结合与delay是相当可读的。

Rx.Observable.fromArray([1, 2, 3, 4, 5])
    .concatMap(x => Rx.Observable.of(x).delay(1000));

See https://codepen.io/jmendes/pen/EwaPzw

https://codepen.io/jmendes/pen/EwaPzw

回答by Robert Penner

Agree that zip is a clean approach. Here is a reusable function to generate an interval stream for an array:

同意 zip 是一种干净的方法。这是一个可重用的函数,用于为数组生成间隔流:

function yieldByInterval(items, time) {
  return Rx.Observable.from(items).zip(
    Rx.Observable.interval(time),
    function(item, index) { return item; }
  );
}

// test
yieldByInterval(['A', 'B', 'C'], 2000)
  .subscribe(console.log.bind(console));

This builds on farincz's answer, but is slightly shorter by using .zipas an instance method.

这建立在farincz 的答案之上,但通过.zip用作实例方法会稍微短一些。

Also, I used Rx.Observable.from()because Rx.Observable.fromArray()is deprecated.

另外,我使用了Rx.Observable.from()因为Rx.Observable.fromArray()弃用

回答by ZoomAll

For RxJS v6 getting the next one with a delay of 2 seconds.

对于 RxJS v6,以 2 秒的延迟获取下一个。

Example 1.concatMap:

示例 1.concatMap:

import {of} from 'rxjs';
import {concatMap, delay} from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(
    concatMap(x => of(x)
      .pipe(
        delay(2000))
    )
  )
  .subscribe({
    next(value) {
      console.log(value);
    }
  });

Example 2.map + concatAll:

示例 2.map + concatAll:

import {of} from 'rxjs';
import {concatAll, delay, map} from 'rxjs/operators';

of(1, 2, 3, 4, 5)
  .pipe(
    map(x => of(x)
      .pipe(
        delay(2000))
    ),
    concatAll()
  )
  .subscribe({
    next(value) {
      console.log(value);
    }
  });

回答by Anonymous

Building on the zip solutions by farincz and user3587412, here is how it works in RxJS v6

基于 farincz 和 user3587412 的 zip 解决方案,这是它在 RxJS v6 中的工作原理

const { zip, from, timer } = require("rxjs")
const { map } = require("rxjs/operators")

const input = [1, 2, 3, 4, 5]
const delay = 2000

zip(
    from(input),
    timer(0, delay)
).pipe(
    map(([ delayedInput, _timer ]) => delayedInput) // throw away timer index
).subscribe(
    console.log
)