How to use drain event of stream.Writable in Node.js
Warning: this page reproduces a popular StackOverflow question and its answers under the CC BY-SA 4.0 license. You are free to use/share it, but you must follow the same CC BY-SA license, cite the original URL and authors, and attribute it to the original authors (not me): StackOverFlow
Original URL: http://stackoverflow.com/questions/18932488/
Question by sachin
In Node.js I'm using the `fs.createWriteStream` method to append data to a local file. The Node documentation mentions the `drain` event when using `fs.createWriteStream`, but I don't understand it.
```javascript
var fs = require('fs');

var stream = fs.createWriteStream('fileName.txt');
var result = stream.write(data); // `data` is defined elsewhere in my program
```
In the code above, how can I use the drain event? Is the event used properly below?
```javascript
var data = 'this is my data';
if (!streamExists) {
  var stream = fs.createWriteStream('fileName.txt');
}
var result = stream.write(data);
if (!result) {
  stream.once('drain', function() {
    stream.write(data);
  });
}
```
Answer by hexacyanide
The `drain` event is emitted when a writable stream's internal buffer has been emptied.
This can only happen after the internal buffer has filled up to its `highWaterMark`: the threshold (in bytes, for non-object-mode streams) at which `write()` starts returning `false` to tell the producer to stop writing until the buffer has been flushed.
This usually happens when data is read from one source faster than it can be written to another resource. For example, take two streams:
```javascript
var fs = require('fs');
var read = fs.createReadStream('./read');
var write = fs.createWriteStream('./write');
```
Now imagine that the file `read` is on an SSD that can read at 500MB/s, and `write` is on an HDD that can only write at 150MB/s. The write stream will not be able to keep up, and will start storing data in its internal buffer. Once the buffer has reached the `highWaterMark` (16KB by default in the Node.js versions of the time; newer releases use a larger default), `write()` starts returning `false` and the stream internally records that a `drain` is needed. Once the internal buffer's length is back to 0, the `drain` event is fired.
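To see the threshold without a real SSD/HDD pair, the sketch below uses a hypothetical in-memory `Writable` as the slow destination. Its `write()` implementation finishes each chunk on a later `setImmediate` tick, which stands in for a slow disk. It also uses a deliberately tiny `highWaterMark` of 10 bytes. The names `makeSlowSink` and `demo` are illustrative and not part of Node's API.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// Hypothetical slow sink: every chunk takes one setImmediate tick to "write",
// so data piles up in the internal buffer as it would on a slow disk.
function makeSlowSink(highWaterMark) {
  return new Writable({
    highWaterMark,
    write(chunk, encoding, callback) {
      setImmediate(callback); // pretend the I/O finished later
    },
  });
}

async function demo() {
  const sink = makeSlowSink(10); // tiny threshold: 10 bytes
  const results = [
    sink.write('aaaa'), // 4 buffered bytes  -> true
    sink.write('bbbb'), // 8 buffered bytes  -> true
    sink.write('cccc'), // 12 >= 10          -> false, a drain is now pending
  ];
  const bufferedBeforeDrain = sink.writableLength; // bytes not yet flushed
  await once(sink, 'drain'); // fires once all buffered bytes are flushed
  return { results, bufferedBeforeDrain, bufferedAfterDrain: sink.writableLength };
}
```

`demo()` should resolve with `results` equal to `[true, true, false]`, 12 buffered bytes before `drain`, and 0 after it.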
Internally, the writable stream emits `drain` with a check like this:
```javascript
// Excerpt from Node's Writable implementation (not standalone code)
if (state.length === 0 && state.needDrain) {
  state.needDrain = false;
  stream.emit('drain');
}
```
And these are the prerequisites for a drain, which are part of the `writeOrBuffer` function:
```javascript
// Excerpt from writeOrBuffer: `ret` is what write() returns to the caller
var ret = state.length < state.highWaterMark;
state.needDrain = !ret;
```
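The two excerpts above can be combined into a toy model. The class below is hypothetical. It is not Node's `Writable`, and it tracks byte counts instead of real chunks, but it follows the same bookkeeping. `write()` adds to `length` and reports whether `length` is still below `highWaterMark`. `flushed()` plays the role of the after-write check that emits `drain`. One deliberate difference is that it sets `needDrain` only when `ret` is `false`, so a later successful write cannot cancel a drain that someone is already waiting for. As far as I know, more recent Node.js versions do the same.

```javascript
const { EventEmitter } = require('events');

// Hypothetical toy model of a writable stream's drain bookkeeping.
class ToyWritable extends EventEmitter {
  constructor(highWaterMark) {
    super();
    this.highWaterMark = highWaterMark;
    this.length = 0;        // bytes accepted but not yet flushed
    this.needDrain = false; // has a caller been told to back off?
  }

  // Like writeOrBuffer: accept the bytes, then say whether to keep writing.
  write(byteLength) {
    this.length += byteLength;
    const ret = this.length < this.highWaterMark;
    if (!ret) this.needDrain = true; // never clear a pending drain here
    return ret;
  }

  // Like the after-write check: called when the sink finishes some bytes.
  flushed(byteLength) {
    this.length -= byteLength;
    if (this.length === 0 && this.needDrain) {
      this.needDrain = false;
      this.emit('drain');
    }
  }
}
```

With `highWaterMark` set to 8, two 4-byte writes return `true` and then `false`. `drain` fires only after both writes have been flushed, not after the first one.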
To see how the `drain` event is used, take the example from the Node.js documentation:
```javascript
function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time!
        writer.write(data, encoding, callback);
      } else {
        // see if we should continue, or wait
        // don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early!
      // write some more once it drains
      writer.once('drain', write);
    }
  }
}
```
The function's objective is to write `data` to a writable stream 1,000,000 times. A variable `ok` is set to `true`, and the loop only keeps running while `ok` is `true`. On each iteration, `ok` is set to the return value of `writer.write()`, which is `false` when the stream wants the caller to wait for a `drain`. If `ok` becomes `false`, the loop stops early and a one-time `drain` handler is registered. When that handler fires, writing resumes where it left off.
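The documentation pattern can be tried against a slow destination. The sketch below adapts it under a few assumptions. `writeManyTimes` takes the count as a parameter instead of a hardcoded 1,000,000, and it reports how many times it had to wait for `drain`. `makeCountingSink` is a hypothetical in-memory `Writable` that finishes each chunk on a later tick and counts the bytes it receives.

```javascript
const { Writable } = require('stream');

// Hypothetical in-memory sink: slow (one setImmediate per chunk) and counting.
function makeCountingSink(highWaterMark) {
  const sink = new Writable({
    highWaterMark,
    write(chunk, encoding, callback) {
      sink.received += chunk.length; // bytes that actually reached the sink
      setImmediate(callback);
    },
  });
  sink.received = 0;
  return sink;
}

// Same loop as the documentation example, with the count as a parameter and
// a counter of how often the loop had to stop and wait for 'drain'.
function writeManyTimes(writer, data, count, callback) {
  let i = count;
  let waits = 0;
  write();
  function write() {
    let ok = true;
    do {
      i -= 1;
      if (i === 0) {
        // last time: pass a callback to learn when this final chunk is flushed
        writer.write(data, () => callback(waits));
      } else {
        ok = writer.write(data);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // had to stop early; write more once the buffer drains
      waits += 1;
      writer.once('drain', write);
    }
  }
}
```

For example, writing `'abcd'` 100 times into `makeCountingSink(16)` should deliver all 400 bytes. Along the way, the loop has to pause for `drain` at least once.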
Regarding your code specifically, you don't need to use the `drain` event, because you are writing only once right after opening your stream. Since you have not yet written anything to the stream, the internal buffer is empty. You would have to write at least `highWaterMark` bytes (16KB by default at the time) for `write()` to return `false` and for the `drain` event to fire. The `drain` event is meant for writing many times, with more data than the `highWaterMark` setting of your writable stream can hold at once.
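A quick way to check the claim about a single write is sketched below. It uses a generic `Writable` with the default `highWaterMark` as a stand-in for the file stream, and its writes complete asynchronously, as file I/O does. `firstWrite` is a hypothetical helper. The default threshold is read from `writableHighWaterMark` rather than hardcoded, because it differs between Node.js versions.

```javascript
const { Writable } = require('stream');

// Stand-in for fs.createWriteStream: default highWaterMark, asynchronous writes.
function firstWrite(byteCount) {
  const sink = new Writable({
    write(chunk, encoding, callback) {
      setImmediate(callback);
    },
  });
  const ok = sink.write(Buffer.alloc(byteCount)); // the very first write
  return { ok, highWaterMark: sink.writableHighWaterMark };
}
```

The asker's `'this is my data'` is 15 bytes, so a first write of that size returns `true`. Only a first write of at least `highWaterMark` bytes returns `false`.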
Answer by Laurent Perrin
Imagine you're connecting 2 streams with very different bandwidths, say, uploading a local file to a slow server. The (fast) file stream will emit data faster than the (slow) socket stream can consume it.
In this situation, node.js will keep data in memory until the slow stream gets a chance to process it. This can get problematic if the file is very large.
To avoid this, `Stream.write` returns `false` when the stream's internal buffer is full (it has reached `highWaterMark`). If you stop writing, the stream will later emit a `drain` event to indicate that the buffer has emptied and it is appropriate to write again.
You can call `pause`/`resume` on the readable stream to control how fast it produces data.
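The pause/resume approach can be sketched as follows. `manualPipe` pauses the fast readable whenever `write()` returns `false` and resumes it on `drain`. The names `manualPipe` and `demoManualPipe`, the 50 four-byte chunks, and the 8-byte `highWaterMark` are all illustrative assumptions. An in-memory `Writable` that finishes each chunk on a later tick plays the slow server.

```javascript
const { Readable, Writable } = require('stream');

// Manual backpressure: pause the (fast) readable whenever write() returns
// false, and resume it when the (slow) writable emits 'drain'.
function manualPipe(readable, writable, onDone) {
  let pauses = 0;
  readable.on('data', (chunk) => {
    if (!writable.write(chunk)) {
      pauses += 1;
      readable.pause();
      writable.once('drain', () => readable.resume());
    }
  });
  // When the source is exhausted, end the destination; onDone runs on 'finish'.
  readable.on('end', () => writable.end(() => onDone(pauses)));
}

// Hypothetical demo: 50 four-byte chunks into a sink that asks for a pause
// once 8 bytes are buffered and finishes each chunk on a later tick.
function demoManualPipe(onDone) {
  let received = 0;
  const source = Readable.from(Array.from({ length: 50 }, () => 'abcd'));
  const slowSink = new Writable({
    highWaterMark: 8,
    write(chunk, encoding, callback) {
      received += chunk.length;
      setImmediate(callback);
    },
  });
  manualPipe(source, slowSink, (pauses) => onDone({ received, pauses }));
}
```

All 200 bytes should arrive, and the source should have been paused at least once along the way.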
Better: you can use `readable.pipe(writable)`, which will do this for you.
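With `pipe()`, the same transfer needs no manual bookkeeping. In the sketch below, `copyWithPipe` is a hypothetical helper and the in-memory slow sink is an assumption standing in for a real socket or file.

```javascript
const { Readable, Writable } = require('stream');

// pipe() does the write()/pause()/'drain'/resume() bookkeeping internally.
function copyWithPipe(chunks, highWaterMark, onFinish) {
  let received = 0;
  const slowSink = new Writable({
    highWaterMark,
    write(chunk, encoding, callback) {
      received += chunk.length;
      setImmediate(callback); // hypothetical slow destination
    },
  });
  // pipe() returns the destination and ends it when the source ends.
  Readable.from(chunks).pipe(slowSink).on('finish', () => onFinish(received));
}
```

One design caveat: `pipe()` does not forward errors from one stream to the other. `stream.pipeline()` (available since Node.js 10) also handles error propagation and cleanup, so it is often preferred in newer code.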
EDIT: There's a bug in your code: regardless of what `write` returns, your data has already been accepted by the stream (written or buffered). You don't need to retry it; in your case, you're writing `data` twice.
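The following sketch illustrates the bug. It uses hypothetical names, with an in-memory `Writable` standing in for the file. It writes the asker's string once to a stream whose `highWaterMark` is only 4 bytes. `write()` returns `false`, yet the sink has already received the chunk. The correct reaction is therefore to wait for `drain` before writing the *next* piece of data, not to write the same data again.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

async function writeOnceEvenIfFalse() {
  const received = [];
  const sink = new Writable({
    highWaterMark: 4, // tiny threshold so that a 15-byte write returns false
    write(chunk, encoding, callback) {
      received.push(chunk.toString());
      setImmediate(callback);
    },
  });
  const ok = sink.write('this is my data'); // 15 bytes >= 4, so ok === false
  await once(sink, 'drain'); // wait for the buffer to empty...
  return { ok, received };   // ...but do NOT write the same data again
}
```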
Something like this would work:
```javascript
var packets = […], // the chunks to write (elided in the original answer)
    current = -1;

function niceWrite() {
  current += 1;

  if (current === packets.length) {
    return stream.end();
  }

  var nextPacket = packets[current],
      canContinue = stream.write(nextPacket);

  // wait until stream drains to continue
  if (!canContinue) {
    stream.once('drain', niceWrite);
  } else {
    niceWrite();
  }
}
```
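The recursive `niceWrite` above relies on module-level `stream` and `packets` variables. It also calls itself synchronously for every packet that is accepted, so with many small packets it can nest many calls deep before `write()` returns `false`. The variation below is my own, not from the answer. It takes both values as parameters and loops while `write()` keeps returning `true`. `niceWriteAll` is a hypothetical name.

```javascript
// Variation on niceWrite: same backpressure rule, no recursion per packet.
function niceWriteAll(stream, packets, onDone) {
  let current = 0;
  function next() {
    while (current < packets.length) {
      const canContinue = stream.write(packets[current]);
      current += 1; // this packet is accepted; never write it again
      if (!canContinue) {
        stream.once('drain', next); // resume with the following packet
        return;
      }
    }
    stream.end(onDone); // onDone runs once everything has been flushed
  }
  next();
}
```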
Answer by Steven Kaspar
Here is a version with async/await
```javascript
const fs = require('fs')

const write = (writer, data) => {
  return new Promise((resolve) => {
    if (!writer.write(data)) {
      writer.once('drain', resolve)
    }
    else {
      resolve()
    }
  })
}

// usage
const run = async () => {
  const write_stream = fs.createWriteStream('...')
  const max = 1000000
  let current = 0
  while (current <= max) {
    // chunks must be strings or Buffers, so convert the number first
    await write(write_stream, String(current++))
  }
  write_stream.end()
}

run()
```
https://gist.github.com/stevenkaspar/509f792cbf1194f9fb05e7d60a1fbc73
Answer by Michael_Scharf
This is a speed-optimized version using Promises (async/await). The caller has to check whether it gets a promise back, and only in that case does it need to `await`. Awaiting on every call can slow the program down by a factor of 3...
```javascript
const fs = require('fs')

const write = (writer, data) => {
  // return a promise only when we have to wait for a drain
  if (!writer.write(data)) {
    return new Promise((resolve) => {
      writer.once('drain', resolve)
    })
  }
  // otherwise return undefined: the caller can keep writing immediately
}

// usage
const run = async () => {
  const write_stream = fs.createWriteStream('...')
  const max = 1000000
  let current = 0
  while (current <= max) {
    // chunks must be strings or Buffers, so convert the number first
    const promise = write(write_stream, String(current++))
    // since drain happens rarely, awaiting each write call is really slow.
    if (promise) {
      // write() returned false, so wait for the 'drain' event
      await promise
    }
  }
  write_stream.end()
}

run()
```
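The same "await only when needed" idea can be written more compactly with `events.once()`. As a bonus, `events.once()` rejects if the stream emits `error` while the code is waiting, so a failing stream does not leave the loop hanging forever. The sketch below makes a few assumptions: `writeNumbers` is a hypothetical name, and each number is written on its own line.

```javascript
const { once } = require('events');

// Write the numbers 0..max, awaiting only when write() asks us to back off.
async function writeNumbers(writer, max) {
  for (let n = 0; n <= max; n += 1) {
    if (!writer.write(`${n}\n`)) {
      await once(writer, 'drain'); // rejects if 'error' is emitted first
    }
  }
  // Listen for 'finish' before calling end() so the event cannot be missed.
  const finished = once(writer, 'finish');
  writer.end();
  await finished; // resolves after every chunk has been flushed
}
```

Usage could look like `writeNumbers(fs.createWriteStream('...'), 1000000)`, mirroring the answer's `run()` function.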