Node.js 将相同的可读流传送到多个(可写)目标中

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/19553837/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-02 16:04:11  来源:igfitidea点击:

Node.js Piping the same readable stream into multiple (writable) targets

javascriptnode.jsstreampipenode.js-stream

提问by Maroshii

I need to run two commands in series that need to read data from the same stream. After piping a stream into another the buffer is emptied so i can't read data from that stream again so this doesn't work:

我需要连续运行两个需要从同一个流中读取数据的命令。将流传输到另一个流后,缓冲区被清空,因此我无法再次从该流中读取数据,因此这不起作用:

var spawn = require('child_process').spawn;
var fs = require('fs');
var request = require('request');

var inputStream = request('http://placehold.it/640x360');
var identify = spawn('identify',['-']);

inputStream.pipe(identify.stdin);

var chunks = [];
identify.stdout.on('data',function(chunk) {
  chunks.push(chunk);
});

identify.stdout.on('end',function() {
  var size = getSize(Buffer.concat(chunks)); //width
  var convert = spawn('convert',['-','-scale',size * 0.5,'png:-']);
  inputStream.pipe(convert.stdin);
  convert.stdout.pipe(fs.createWriteStream('half.png'));
});

function getSize(buffer){
  return parseInt(buffer.toString().split(' ')[2].split('x')[0]);
}

Requestcomplains about this

请求对此进行投诉

Error: You cannot pipe after data has been emitted from the response.

and changing the inputStreamto fs.createWriteStreamyields the same issue of course. I don't want to write into a file but reusein some way the stream that requestproduces (or any other for that matter).

并将inputStream更改fs.createWriteStream为当然会产生相同的问题。我不想写入文件,而是以某种方式重用请求生成的流(或任何其他与此相关的流)。

Is there a way to reuse a readable stream once it finishes piping? What would be the best way to accomplish something like the above example?

一旦完成管道,有没有办法重用可读流?完成上述示例的最佳方法是什么?

回答by user568109

You have to create duplicate of the stream by piping it to two streams. You can create a simple stream with a PassThrough stream, it simply passes the input to the output.

您必须通过管道将流传输到两个流来创建流的副本。您可以使用 PassThrough 流创建一个简单的流,它只是将输入传递给输出。

const spawn = require('child_process').spawn;
const PassThrough = require('stream').PassThrough;

const a = spawn('echo', ['hi user']);
const b = new PassThrough();
const c = new PassThrough();

a.stdout.pipe(b);
a.stdout.pipe(c);

let count = 0;
b.on('data', function (chunk) {
  count += chunk.length;
});
b.on('end', function () {
  console.log(count);
  c.pipe(process.stdout);
});

Output:

输出:

8
hi user

回答by artikas

The first answer only works if streams take roughly the same amount of time to process data. If one takes significantly longer, the faster one will request new data, consequently overwriting the data still being used by the slower one (I had this problem after trying to solve it using a duplicate stream).

第一个答案仅适用于流处理数据所需的时间大致相同的情况。如果花费的时间明显更长,则请求新数据的速度越快,因此覆盖速度较慢的数据仍在使用的数据(在尝试使用重复流解决此问题后,我遇到了这个问题)。

The following pattern worked very well for me. It uses a library based on Stream2 streams, Streamz, and Promises to synchronize async streams via a callback. Using the familiar example from the first answer:

以下模式对我来说非常有效。它使用基于 Stream2 流、Streamz 和 Promises 的库通过回调同步异步流。使用第一个答案中熟悉的示例:

spawn = require('child_process').spawn;
pass = require('stream').PassThrough;
streamz = require('streamz').PassThrough;
var Promise = require('bluebird');

a = spawn('echo', ['hi user']);
b = new pass;
c = new pass;   

a.stdout.pipe(streamz(combineStreamOperations)); 

function combineStreamOperations(data, next){
  Promise.join(b, c, function(b, c){ //perform n operations on the same data
  next(); //request more
}

count = 0;
b.on('data', function(chunk) { count += chunk.length; });
b.on('end', function() { console.log(count); c.pipe(process.stdout); });

回答by user3683370

What about piping into two or more streams not at the same time ?

不同时将管道输送到两个或多个流中会怎样?

For example :

例如 :

var PassThrough = require('stream').PassThrough;
var mybiraryStream = stream.start(); //never ending audio stream
var file1 = fs.createWriteStream('file1.wav',{encoding:'binary'})
var file2 = fs.createWriteStream('file2.wav',{encoding:'binary'})
var mypass = PassThrough
mybinaryStream.pipe(mypass)
mypass.pipe(file1)
setTimeout(function(){
   mypass.pipe(file2);
},2000)

The above code does not produce any errors but the file2 is empty

上面的代码没有产生任何错误但是file2是空的

回答by Zied Hamdi

I have a different solution to write to two streams simultaneously, naturally, the time to write will be the addition of the two times, but I use it to respond to a download request, where I want to keep a copy of the downloaded file on my server (actually I use a S3 backup, so I cache the most used files locally to avoid multiple file transfers)

我有一个不同的解决方案来同时写入两个流,自然,写入时间将是两次相加,但我用它来响应下载请求,我想在其中保留下载文件的副本我的服务器(实际上我使用的是 S3 备份,所以我在本地缓存了最常用的文件以避免多个文件传输)

/**
 * A utility class made to write to a file while answering a file download request
 */
class TwoOutputStreams {
  constructor(streamOne, streamTwo) {
    this.streamOne = streamOne
    this.streamTwo = streamTwo
  }

  setHeader(header, value) {
    if (this.streamOne.setHeader)
      this.streamOne.setHeader(header, value)
    if (this.streamTwo.setHeader)
      this.streamTwo.setHeader(header, value)
  }

  write(chunk) {
    this.streamOne.write(chunk)
    this.streamTwo.write(chunk)
  }

  end() {
    this.streamOne.end()
    this.streamTwo.end()
  }
}

You can then use this as a regular OutputStream

然后您可以将其用作常规的 OutputStream

const twoStreamsOut = new TwoOutputStreams(fileOut, responseStream)

and pass it to to your method as if it was a response or a fileOutputStream

并将其传递给您的方法,就好像它是响应或 fileOutputStream

回答by Juan

If you have async operations on the PassThrough streams, the answers posted here won't work. A solution that works for async operations includes buffering the stream content and then creating streams from the buffered result.

如果您对 PassThrough 流进行异步操作,则此处发布的答案将不起作用。适用于异步操作的解决方案包括缓冲流内容,然后根据缓冲结果创建流。

  1. To buffer the result you can use concat-stream

    const Promise = require('bluebird');
    const concat = require('concat-stream');
    const getBuffer = function(stream){
        return new Promise(function(resolve, reject){
            var gotBuffer = function(buffer){
                resolve(buffer);
            }
            var concatStream = concat(gotBuffer);
            stream.on('error', reject);
            stream.pipe(concatStream);
        });
    }
    
  2. To create streams from the buffer you can use:

    const { Readable } = require('stream');
    const getBufferStream = function(buffer){
        const stream = new Readable();
        stream.push(buffer);
        stream.push(null);
        return Promise.resolve(stream);
    }
    
  1. 要缓冲结果,您可以使用concat-stream

    const Promise = require('bluebird');
    const concat = require('concat-stream');
    const getBuffer = function(stream){
        return new Promise(function(resolve, reject){
            var gotBuffer = function(buffer){
                resolve(buffer);
            }
            var concatStream = concat(gotBuffer);
            stream.on('error', reject);
            stream.pipe(concatStream);
        });
    }
    
  2. 要从缓冲区创建流,您可以使用:

    const { Readable } = require('stream');
    const getBufferStream = function(buffer){
        const stream = new Readable();
        stream.push(buffer);
        stream.push(null);
        return Promise.resolve(stream);
    }
    

回答by levansuper

You can use this small npm package I created:

你可以使用我创建的这个小的 npm 包:

readable-stream-clone

readable-stream-clone

With this you can reuse readable streams as many times as you need

有了这个,您可以根据需要多次重复使用可读流

回答by Jake

For general problem, the following code works fine

对于一般问题,以下代码工作正常

var PassThrough = require('stream').PassThrough
a=PassThrough()
b1=PassThrough()
b2=PassThrough()
a.pipe(b1)
a.pipe(b2)
b1.on('data', function(data) {
  console.log('b1:', data.toString())
})
b2.on('data', function(data) {
  console.log('b2:', data.toString())
})
a.write('text')