Javascript RxJS MergeMap 是如何工作的?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/42120680/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-23 00:55:36  来源:igfitidea点击:

How Does RxJS MergeMap Work?

javascriptangularrxjsobservable

提问by VSO

I don't understand the purpose of mergeMapat all. I have heard two "explanations:

我根本不明白这样做的目的mergeMap。我听过两个“解释:

  1. "It's like SelectAll" in LINQ - nope.
  2. "Well, it's a combination of RxJS mergeand map" - nope (or I can't replicate this).
  1. “这就像 LINQ 中的 SelectAll” - 不。
  2. “嗯,它是 RxJSmergemap”的组合- 不(或者我无法复制它)。

Consider the following code:

考虑以下代码:

    var obs1 = new Rx.Observable.interval(1000);
    var obs2 = new Rx.Observable.interval(1000);

    //Just a merge and a map, works fine
    obs1.merge(obs2).map(x=> x+'a').subscribe(
      next => console.log(next)
    )

    //Who know what - seems to do the same thing as a plain map on 1 observable
    obs1.mergeMap(val => Rx.Observable.of(val + `B`))
        .subscribe(
          next => console.log(next)
        )

JS Bin

JS斌

The last piece labelled "Who knows what" does nothing more than a map on obs1- what's the point?

最后一篇标有“谁知道什么”的文章只不过是一张地图obs1——有什么意义?

What does mergeMapactually do? What is an example of a valid use case? (Preferably with some code)

什么是mergeMap真正做到?什么是有效用例的示例?(最好有一些代码)

Articles that didn't help me at all (mergeMap code from above is from one of these): 1, 2

根本没有帮助我的文章(上面的合并地图代码来自其中之一):1, 2

回答by artur grzesiak

tl;dr;mergeMapis way more powerful than map. Understanding mergeMapis the necessary condition to access full power of Rx.

tl;博士; mergeMapmap. 理解mergeMap是获得 Rx 全部功能的必要条件。



similarities

相似之处

  • both mergeMapand mapacts on a single stream (vs. zip, combineLatest)

  • both mergeMapand mapcan transform elements of a stream (vs. filter, delay)

  • 二者mergeMapmap作用于单流(相对于zipcombineLatest

  • 二者mergeMapmap可以改变一个流的元件(相对于filterdelay

differences

差异

map

地图

  • cannot change size of the source stream (assumption: mapitself does not throw); for each element from source exactly one mappedelement is emitted; mapcannot ignore elements (like for example filter);

  • in case of the default scheduler the transformation happens synchronously; to be 100% clear: the source stream may deliver its elements asynchronously, but each next element is immediately mappedand re-emitted further; mapcannot shift elements in time like for example delay

  • no restrictions on return values

  • id: x => x

  • 不能改变源流的大小(假设:map本身没有throw);对于源中的每个元素,mapped只发出一个元素;map不能忽略元素(例如filter);

  • 在默认调度程序的情况下,转换是同步发生的;100% 明确:源流可能会异步传递其元素,但每个下一个元素都会立即mapped重新发送;map无法及时移动元素,例如delay

  • 对返回值没有限制

  • idx => x

mergeMap

合并映射

  • can change size of the source stream; for each element there might be arbitrary number (0, 1 or many) of new elements created/emitted

  • it offers full control over asynchronicity - both when new elements are created/emitted and how many elements from the source stream should be processed concurrently; for example assume source stream emitted 10 elements but maxConcurrencyis set to 2 then two first elements will be processed immediately and the rest 8 buffered; once one of the processed completed the next element from source stream will be processed and so on - it is bit tricky, but take a look at the example below

  • all other operators can be implemented with just mergeMapand Observableconstructor

  • may be used for recursive async operations

  • return values has to be of Observable type (or Rx has to know how to create observable out of it - e.g. promise, array)

  • id: x => Rx.Observable.of(x)

  • 可以改变源流的大小;对于每个元素,可能会创建/发出任意数量(0、1 或许多)的新元素

  • 它提供了对异步性的完全控制——无论是何时创建/发出新元素,以及应同时处理多少来自源流的元素;例如,假设源流发出 10 个元素,但maxConcurrency设置为 2,则将立即处理两个第一个元素,其余 8 个元素将被缓冲;一旦其中一个被处理complete,源流中的下一个元素将被处理等等 - 这有点棘手,但看看下面的例子

  • 所有其他运算符都可以用 justmergeMapObservable构造函数来实现

  • 可用于递归异步操作

  • 返回值必须是 Observable 类型(或者 Rx 必须知道如何从中创建可观察的 - 例如承诺,数组)

  • idx => Rx.Observable.of(x)

array analogy

数组类比

let array = [1,2,3]
fn             map                    mergeMap
x => x*x       [1,4,9]                error /*expects array as return value*/
x => [x,x*x]   [[1,1],[2,4],[3,9]]    [1,1,2,4,3,9]

The analogy does not show full picture and it basically corresponds to .mergeMapwith maxConcurrencyset to 1. In such a case elements will be ordered as above, but in general case it does not have to be so. The only guarantee we have is that emission of new elements will be order by their position in the underlying stream. For example: [3,1,2,4,9,1]and [2,3,1,1,9,4]are valid, but [1,1,4,2,3,9]is not (since 4was emitted after 2in the underlying stream).

类比不显示完整图像,它基本上对应于.mergeMapmaxConcurrency设置为1。在这种情况下的元素将被排序为以上,但在一般情况下,它不必须是如此。我们唯一的保证是新元素的发射将按照它们在底层流中的位置进行排序。例如:[3,1,2,4,9,1]and[2,3,1,1,9,4]是有效的,但[1,1,4,2,3,9]不是(因为42在底层流中发出的)。

A couple of examples using mergeMap:

几个使用的例子mergeMap

// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
  return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}

Rx.Observable.range(1, 3)
  .mapWithMergeMap(x => x * x)
  .subscribe(x => console.log('mapWithMergeMap', x))

// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
  return this.mergeMap(x =>
    filterFn(x) ?
    Rx.Observable.of(x) :
    Rx.Observable.empty()); // return no element
}

Rx.Observable.range(1, 3)
  .filterWithMergeMap(x => x === 3)
  .subscribe(x => console.log('filterWithMergeMap', x))

// implement .delay with .mergeMap 
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
  return this.mergeMap(x =>
    Rx.Observable.create(obs => {
      // setTimeout is naive - one should use scheduler instead
      const token = setTimeout(() => {
        obs.next(x);
        obs.complete();
      }, delayMs)
      return () => clearTimeout(token);
    }))
}

Rx.Observable.range(1, 3)
  .delayWithMergeMap(500)
  .take(2)
  .subscribe(x => console.log('delayWithMergeMap', x))

// recursive count
const count = (from, to, interval) => {
  if (from > to) return Rx.Observable.empty();
  return Rx.Observable.timer(interval)
    .mergeMap(() =>
      count(from + 1, to, interval)
      .startWith(from))
}

count(1, 3, 1000).subscribe(x => console.log('count', x))

// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
  Rx.Observable.if(
    () => from > to,
    Rx.Observable.empty(),
    Rx.Observable.timer(interval)
    .mergeMap(() => countMoreRxWay(from + 1, to, interval)
      .startWith(from)))

const maxConcurrencyExample = () =>
  Rx.Observable.range(1,7)
    .do(x => console.log('emitted', x))
    .mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
    .do(x => console.log('processed', x))
    .subscribe()

setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>

回答by Mark van Straten

.mergeMap()lets you flatten a higher-order Observable into a single stream. For instance:

.mergeMap()允许您将高阶 Observable 展平为单个流。例如:

Rx.Observable.from([1,2,3,4])
  .map(i => getFreshApiData())
  .subscribe(val => console.log('regular map result: ' + val));

//vs

Rx.Observable.from([1,2,3,4])
  .mergeMap(i => getFreshApiData())
  .subscribe(val => console.log('mergeMap result: ' + val));

function getFreshApiData() {
  return Rx.Observable.of('retrieved new data')
    .delay(1000);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

See my answer at this other question for an in-dept explanation of the .xxxMap()operators: Rxjs - How can I extract multiple values inside an array and feed them back to the observable stream synchronously

有关.xxxMap()运算符的深入解释,请参阅我在另一个问题上的回答:Rxjs - 如何提取数组中的多个值并将它们同步反馈给可观察流