Disclaimer: this page is a Chinese-English parallel translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow. Original URL: http://stackoverflow.com/questions/18932488/

Time: 2020-10-27 13:43:53  Source: igfitidea

How to use drain event of stream.Writable in Node.js

javascript node.js stream

Asked by sachin

In Node.js I'm using the fs.createWriteStream method to append data to a local file. The Node documentation mentions the drain event when using fs.createWriteStream, but I don't understand it.

var stream = fs.createWriteStream('fileName.txt');
var result = stream.write(data);

In the code above, how can I use the drain event? Is the event used properly below?

var data = 'this is my data';
if (!streamExists) {
  var stream = fs.createWriteStream('fileName.txt');
}

var result = stream.write(data);
if (!result) {
  stream.once('drain', function() {
    stream.write(data);
  });
}

Answered by hexacyanide

The drain event is for when a writable stream's internal buffer has been emptied.

This can only happen once the size of the internal buffer has reached or exceeded the stream's highWaterMark property, which is the threshold of buffered data (in bytes) at which write() starts returning false to signal that the caller should stop reading from the data source.

This typically happens in setups where data is read from one stream faster than it can be written to another resource. For example, take two streams:

var fs = require('fs');

var read = fs.createReadStream('./read');
var write = fs.createWriteStream('./write');

Now imagine that the file read is on an SSD that can be read at 500MB/s, and write is on an HDD that can only be written at 150MB/s. The write stream will not be able to keep up and will start storing data in its internal buffer. Once the buffer has reached the highWaterMark, which was 16KB by default at the time of this answer, writes will start returning false, and the stream will internally queue a drain. Once the internal buffer's length is 0, the drain event is fired.
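The behaviour described above can be reproduced without real disks. The following is a minimal sketch, not part of the original answer: it assumes a hypothetical in-memory Writable (createSlowSink) with a deliberately tiny highWaterMark of 4 bytes and a write implementation that only completes on a later tick.

```javascript
const { Writable } = require('stream');

// Hypothetical slow destination: each chunk is "flushed" on a later tick,
// so chunks pile up in the internal buffer in the meantime.
function createSlowSink() {
  return new Writable({
    highWaterMark: 4, // tiny threshold (in bytes) so the effect is easy to see
    write(chunk, encoding, callback) {
      setImmediate(callback);
    },
  });
}

const slowSink = createSlowSink();
slowSink.on('drain', () => {
  console.log('drain fired, buffered bytes:', slowSink.writableLength);
});

// 2 bytes buffered: still below the 4-byte threshold, so write() returns true.
const first = slowSink.write('ab');
// 4 bytes buffered: the threshold is reached, so write() returns false.
const second = slowSink.write('cd');

console.log(first, second);
```

The drain listener only runs after both chunks have been handed to the sink and the buffer is empty again, which is the point at which it is safe to resume writing.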

This is how a drain works:

if (state.length === 0 && state.needDrain) {
  state.needDrain = false;
  stream.emit('drain');
}

And these are the prerequisites for a drain, which are part of the writeOrBuffer function:

var ret = state.length < state.highWaterMark;
state.needDrain = !ret;
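
The state object in these excerpts is internal to Node.js and is not meant to be read from application code. As a rough sketch, assuming a reasonably recent Node.js release, the same quantities can be observed through the public writableLength, writableHighWaterMark, and writableNeedDrain properties. The in-memory sink and the snapshot helper below are hypothetical and exist only for illustration.

```javascript
const { Writable } = require('stream');

// Hypothetical sink whose writes complete on a later tick.
const sink = new Writable({
  highWaterMark: 8,
  write(chunk, encoding, callback) {
    setImmediate(callback);
  },
});

// Collect the public counterparts of state.length, state.highWaterMark
// and state.needDrain.
function snapshot(stream) {
  return {
    length: stream.writableLength,
    highWaterMark: stream.writableHighWaterMark,
    needDrain: stream.writableNeedDrain,
  };
}

sink.write('12345'); // 5 of 8 bytes buffered
const below = snapshot(sink);
sink.write('678'); // 8 of 8 bytes buffered: threshold reached
const atLimit = snapshot(sink);

console.log(below, atLimit);
```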


To see how the drain event is used, take the example from the Node.js documentation.

function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}

The function's objective is to write 1,000,000 times to a writable stream. A variable ok is set to true, and the loop keeps running only while ok is true. On each iteration, ok is set to the return value of stream.write(), which is false if a drain is required. If ok becomes false, the loop stops, a one-time handler for the drain event is registered, and when the event fires the writing resumes.

Regarding your code specifically, you don't need to use the drain event because you are writing only once right after opening your stream. Since you have not yet written anything to the stream, the internal buffer is empty, and you would have to be writing at least 16KB in chunks in order for the drain event to fire. The drain event is for writing many times with more data than the highWaterMark setting of your writable stream.

Answered by Laurent Perrin

Imagine you're connecting 2 streams with very different bandwidths, say, uploading a local file to a slow server. The (fast) file stream will emit data faster than the (slow) socket stream can consume it.

In this situation, node.js will keep data in memory until the slow stream gets a chance to process it. This can get problematic if the file is very large.

To avoid this, Stream.write returns false when the underlying system buffer is full. If you stop writing, the stream will later emit a drain event to indicate that the system buffer has emptied and it is appropriate to write again.

You can call pause/resume on the readable stream to control its bandwidth.
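
A manual version of this idea might look like the sketch below. It is an illustration rather than the answerer's code: copyWithBackpressure is a hypothetical helper, and the in-memory streams stand in for the fast file stream and the slow socket.

```javascript
const { Readable, Writable } = require('stream');

// Copy `source` into `destination`, pausing the source whenever
// destination.write() reports that its buffer is full.
function copyWithBackpressure(source, destination) {
  return new Promise((resolve, reject) => {
    source.on('data', (chunk) => {
      if (!destination.write(chunk)) {
        source.pause();
        destination.once('drain', () => source.resume());
      }
    });
    source.on('end', () => destination.end());
    source.on('error', reject);
    destination.on('error', reject);
    destination.on('finish', resolve);
  });
}

// Usage with hypothetical in-memory streams.
const received = [];
const fastSource = Readable.from(['a', 'b', 'c', 'd']);
const slowDestination = new Writable({
  highWaterMark: 1, // every single-byte chunk fills the buffer
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 5); // simulate a slow consumer
  },
});

const copyDone = copyWithBackpressure(fastSource, slowDestination);
copyDone.then(() => console.log(received.join('')));
```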

Better: you can use readable.pipe(writable), which will do this for you.
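
As a sketch of the pipe() approach, the example below copies a scratch file created only for the demonstration. The copyFile helper, the temporary directory, and the file names are assumptions made for illustration.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical scratch files for the demonstration.
const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'pipe-demo-'));
const sourcePath = path.join(dir, 'read');
const targetPath = path.join(dir, 'write');
fs.writeFileSync(sourcePath, 'x'.repeat(1024 * 1024)); // 1 MiB of data

function copyFile(from, to) {
  return new Promise((resolve, reject) => {
    const readable = fs.createReadStream(from);
    const writable = fs.createWriteStream(to);
    // pipe() does not forward errors, so both streams get their own listener.
    readable.on('error', reject);
    writable.on('error', reject);
    writable.on('finish', resolve);
    // pipe() pauses `readable` when write() returns false and resumes on 'drain'.
    readable.pipe(writable);
  });
}

const piped = copyFile(sourcePath, targetPath);
piped.then(() => console.log('copied', fs.statSync(targetPath).size, 'bytes'));
```

Where error propagation and cleanup matter, stream.pipeline() from the same module is worth considering as an alternative to a bare pipe() call.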

EDIT: There's a bug in your code: regardless of what write returns, your data has been written. You don't need to retry it. In your case, you're writing data twice.

Something like this would work:

var packets = […],
    current = -1;

function niceWrite() {
  current += 1;

  if (current === packets.length)
    return stream.end();

  var nextPacket = packets[current],
      canContinue = stream.write(nextPacket);

  // wait until stream drains to continue
  if (!canContinue)
    stream.once('drain', niceWrite);
  else
    niceWrite();
}

Answered by Steven Kaspar

Here is a version with async/await:

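const fs = require('fs')

const write = (writer, data) => {
  return new Promise((resolve) => {
    if (!writer.write(data)) {
      // buffer is full: resolve once the stream has drained
      writer.once('drain', resolve)
    }
    else {
      resolve()
    }
  })
}

// usage
const run = async () => {
  const write_stream = fs.createWriteStream('...')
  const max = 1000000
  let current = 0
  while (current <= max) {
    // write() rejects plain numbers, so convert the counter to a string
    await write(write_stream, String(current++))
  }
  // signal that no more data will be written
  write_stream.end()
}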

https://gist.github.com/stevenkaspar/509f792cbf1194f9fb05e7d60a1fbc73

Answered by Michael_Scharf

This is a speed-optimized version using Promises (async/await). The caller has to check whether it gets a promise back, and only in that case does await have to be called. Awaiting on every call can slow the program down by a factor of 3...

const fs = require('fs')

const write = (writer, data) => {
    // return a promise only when we get a drain
    if (!writer.write(data)) {
        return new Promise((resolve) => {
            writer.once('drain', resolve)
        })
    }
}

// usage
const run = async () => {
    const write_stream = fs.createWriteStream('...')
    const max = 1000000
    let current = 0
    while (current <= max) {
        // write() rejects plain numbers, so convert the counter to a string
        const promise = write(write_stream, String(current++))
        // since drain happens rarely, awaiting each write call is really slow.
        if (promise) {
            // the buffer is full, therefore we wait for the drain event
            await promise
        }
    }
    // signal that no more data will be written
    write_stream.end()
}
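
Both promise-based versions listen only for drain, so the promise may never settle if the stream fails while the caller is waiting. One possible variation, sketched here and not taken from either answer, uses events.once, which resolves on the named event and rejects if the emitter emits error in the meantime. The writeAll helper and the in-memory sink are hypothetical.

```javascript
const { once } = require('events');
const { Writable } = require('stream');

// Write every chunk in order, waiting for 'drain' only when write() asks for it.
async function writeAll(writer, chunks) {
  for (const chunk of chunks) {
    if (!writer.write(chunk)) {
      // once() resolves on 'drain' and rejects if the stream emits 'error'.
      await once(writer, 'drain');
    }
  }
  // Register the 'finish' listener before calling end() so it cannot be missed.
  const finished = once(writer, 'finish');
  writer.end();
  await finished;
}

// Usage with a hypothetical in-memory sink instead of a file.
const collected = [];
const memorySink = new Writable({
  highWaterMark: 16,
  write(chunk, encoding, callback) {
    collected.push(chunk.toString());
    setImmediate(callback);
  },
});

const lines = Array.from({ length: 100 }, (_, i) => `${i}\n`);
const allWritten = writeAll(memorySink, lines);
allWritten.then(() => console.log('wrote', collected.length, 'chunks'));
```

Like the speed-optimized version above, this only awaits when write() returns false, so the common case stays synchronous.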