如何在 node.js 中发出/管道数组值作为可读流?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/16848972/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-02 14:40:41  来源:igfitidea点击:

How to emit/pipe array values as a readable stream in node.js?

node.jsstreampipe

提问by TankofVines

What is the best way to create a readable stream from an array and pipe values to a writable stream? I have seen substack's exampleusing setInterval and I can implement that successfully using 0 for the interval value, but I am iterating over a lot of data and triggering gc every time is slowing things down.

创建从数组和管道值到可写流的可读流的最佳方法是什么?我已经看到使用 setInterval 的substack示例,我可以成功地使用 0 作为间隔值来实现它,但是我正在迭代大量数据并且每次触发 gc 都会减慢速度。

// Working with the setInterval wrapper
var arr = [1, 5, 3, 6, 8, 9];

function createStream () {
    var t = new stream;
    t.readable = true;
    var times = 0;
    var iv = setInterval(function () {
        t.emit('data', arr[times]);
        if (++times === arr.length) {
            t.emit('end');
            clearInterval(iv);
        }
    }
}, 0);

// Create the writable stream s
// ....

createStream().pipe(s);

What I would like to do is emit values without the setInterval. Perhaps using the async module like this:

我想做的是在没有 setInterval 的情况下发出值。也许使用这样的异步模块:

async.forEachSeries(arr, function(item, cb) {
    t.emit('data', item);
    cb();
}, function(err) {
 if (err) {
     console.log(err);
 }
 t.emit('end');
});

In this case I iterate the array and emit data, but never pipe any values. I have already seen shinout's ArrayStream, but I think that was created before v0.10 and it is a bit more overhead than I am looking for.

在这种情况下,我迭代数组并发出数据,但从不通过管道传输任何值。我已经看过 shinout 的ArrayStream,但我认为它是在 v0.10 之前创建的,它的开销比我想要的要多一些。

回答by Max Heiber

You can solve this problem by creating a readable stream and pushing values into it.

您可以通过创建可读流并将值推入其中来解决此问题。

Streams are a pain, but it's often easierto work with them directlythan to use libraries.

流是一个痛苦,但它往往更容易直接与他们的工作,而不是使用库。

Array of strings or buffers to stream

要流式传输的字符串或缓冲区数组

If you're working with an array of strings or buffers, this will work:

如果您正在使用字符串或缓冲区数组,这将起作用:

'use strict'
const Stream = require('stream')
const readable = new Stream.Readable()

readable.pipe(process.stdout)

const items = ['a', 'b', 'c']
items.forEach(item => readable.push(item))

// no more data
readable.push(null)

Notes:

笔记:

  • readable.pipe(process.stdout)does two things: puts the stream into "flowing" mode and sets up the process.stdout writable stream to receive data from readable
  • the Readable#pushmethod is for the creator of the readable stream, not the stream consumer.
  • You have to do Readable#push(null)to signal that there is no more data.
  • readable.pipe(process.stdout)做两件事:将流置于“流动”模式并设置 process.stdout 可写流以接收来自 readable
  • Readable#push方法适用于可读流的创建者,而不是流消费者。
  • 你必须做Readable#push(null)信号表明没有更多的数据。

Array of non-strings to stream

要流式传输的非字符串数组

To make a stream from an array of things that are neither strings nor buffers, you need both the readable stream and the writable stream to be in "Object Mode". In the example below, I made the following changes:

要从既不是字符串也不是缓冲区的数组创建流,您需要可读流和可写流都处于"Object Mode"。在下面的示例中,我进行了以下更改:

  • Initialize the readable stream with {objectMode: true}
  • Instead of piping to process.stdout, pipe to a simple writable stream that is in object mode.

      'use strict'
      const Stream = require('stream')
    
      const readable = new Stream.Readable({objectMode: true})
    
      const writable = new Stream.Writable({objectMode: true})
      writable._write = (object, encoding, done) => {
        console.log(object)
    
        // ready to process the next chunk
        done()
      }
    
      readable.pipe(writable)
    
      const items = [1, 2, 3]
      items.forEach(item => readable.push(item))
    
      // end the stream
      readable.push(null)
    
  • 初始化可读流 {objectMode: true}
  • 不是通过管道传输到process.stdout,而是通过管道传输到处于对象模式的简单可写流。

      'use strict'
      const Stream = require('stream')
    
      const readable = new Stream.Readable({objectMode: true})
    
      const writable = new Stream.Writable({objectMode: true})
      writable._write = (object, encoding, done) => {
        console.log(object)
    
        // ready to process the next chunk
        done()
      }
    
      readable.pipe(writable)
    
      const items = [1, 2, 3]
      items.forEach(item => readable.push(item))
    
      // end the stream
      readable.push(null)
    

Performance Note

性能说明

Where is the data coming from? If it's a streaming data source, it's better to manipulate the stream using a transform stream than to convert to/from an array.

数据来自哪里?如果它是流数据源,最好使用转换流来操作流,而不是在数组之间进行转换。

回答by Lajos

tl;dr;

tl;博士;

This is a LIFO solution. Array.prototype.pop() has similar behavior to shift but applied to the last element in an array.

这是一个 LIFO 解决方案。Array.prototype.pop() 具有与 shift 类似的行为,但应用于数组中的最后一个元素。

const items = [1,2,3]
const stream = new Readable({
  objectMode: true,
  read() {
    const item = items.pop()
    if (!item) {
      this.push(null);
      return;
    }
    this.push(item)
  },
})

回答by austin_ce

As of Node 12.3, you can use stream.Readable.from(iterable, [options])instead.

从 Node 12.3 开始,您可以stream.Readable.from(iterable, [options])改为使用。

const { Readable } = require('stream');
const readableStream = Readable.from(arr);

回答by TankofVines

I wound up using ArrayStreamfor this. It did resolve the issue with the GC being triggered too often. I was getting warnings for a recursive process.nextTick from node so modified the nextTick callbacks in ArrayStream to setImmediate and that fixed the warnings and seems to be working well.

为此我最终使用了ArrayStream。它确实解决了 GC 触发过于频繁的问题。我收到来自节点的递归 process.nextTick 警告,因此将 ArrayStream 中的 nextTick 回调修改为 setImmediate 并修复了警告,似乎运行良好。

回答by Rudolf Meijering

It's an old question, but if anyone stumbles on this, node-stream-arrayis a much simpler and more elegant implementation for Node.js >= v0.10

这是一个老问题,但如果有人偶然发现,node-stream-array是 Node.js >= v0.10 的更简单、更优雅的实现

var streamify = require('stream-array'),
  os = require('os');

streamify(['1', '2', '3', os.EOL]).pipe(process.stdout);