node.js 将流通过管道传输到 s3.upload()
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/37336050/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Pipe a stream to s3.upload()
提问by womp
I'm currently making use of a node.js plugin called s3-upload-streamto stream very large files to Amazon S3. It uses the multipart API and for the most part it works very well.
我目前正在使用名为s3-upload-stream的 node.js 插件将非常大的文件流式传输到 Amazon S3。它使用多部分 API,并且在大多数情况下它运行良好。
However, this module is showing its age and I've already had to make modifications to it (the author has deprecated it as well). Today I ran into another issue with Amazon, and I would really like to take the author's recommendation and start using the official aws-sdk to accomplish my uploads.
然而,这个模块显示了它的年龄,我已经不得不对其进行修改(作者也弃用了它)。今天又遇到了亚马逊的另一个问题,我真的很想接受作者的建议,开始使用官方的aws-sdk来完成我的上传。
BUT.
但。
The official SDK does not seem to support piping to s3.upload(). The nature of s3.upload is that you have to pass the readable stream as an argument to the S3 constructor.
官方 SDK 似乎不支持管道到s3.upload(). s3.upload 的本质是您必须将可读流作为参数传递给 S3 构造函数。
I have roughly 120+ user code modules that do various file processing, and they are agnostic to the final destination of their output. The engine hands them a pipeable writeable output stream, and they pipe to it. I cannot hand them an AWS.S3object and ask them to call upload()on it without adding code to all the modules. The reason I used s3-upload-streamwas because it supported piping.
我有大约 120 多个用户代码模块可以进行各种文件处理,它们与输出的最终目的地无关。引擎交给他们一个可管道化的可写输出流,然后他们通过管道传输给它。我不能给他们一个AWS.S3对象并要求他们upload()在不向所有模块添加代码的情况下调用它。我使用的原因s3-upload-stream是因为它支持管道。
Is there a way to make aws-sdk s3.upload()something I can pipe the stream to?
有没有办法让 aws-sdk 成为s3.upload()我可以通过管道传输的东西?
回答by Casey Benko
Wrap the S3 upload()function with the node.js stream.PassThrough()stream.
upload()用 node.jsstream.PassThrough()流包装 S3函数。
Here's an example:
下面是一个例子:
inputStream
.pipe(uploadFromStream(s3));
function uploadFromStream(s3) {
var pass = new stream.PassThrough();
var params = {Bucket: BUCKET, Key: KEY, Body: pass};
s3.upload(params, function(err, data) {
console.log(err, data);
});
return pass;
}
回答by Ahmet Cetin
A bit late answer, it might help someone else hopefully. You can return both writeable stream and the promise, so you can get response data when upload finishes.
回答有点晚,希望它可以帮助其他人。您可以同时返回可写流和承诺,以便在上传完成时获得响应数据。
const AWS = require('aws-sdk');
const stream = require('stream');
const uploadStream = ({ Bucket, Key }) => {
const s3 = new AWS.S3();
const pass = new stream.PassThrough();
return {
writeStream: pass,
promise: s3.upload({ Bucket, Key, Body: pass }).promise(),
};
}
And you can use the function as follows:
您可以按如下方式使用该功能:
const { writeStream, promise } = uploadStream({Bucket: 'yourbucket', Key: 'yourfile.mp4'});
const readStream = fs.createReadStream('/path/to/yourfile.mp4');
const pipeline = readStream.pipe(writeStream);
Now you can either check promise:
现在您可以检查承诺:
promise.then(() => {
console.log('upload completed successfully');
}).catch((err) = > {
console.log('upload failed.', err.message);
});
Or as stream.pipe() returns stream.Writable, the destination (writeStream variable above), allowing for a chain of pipes, we can also use its events:
或者当 stream.pipe() 返回 stream.Writable 时,目标(上面的 writeStream 变量),允许管道链,我们也可以使用它的事件:
pipeline.on('close', () => {
console.log('upload successful');
});
pipeline.on('error', (err) => {
console.log('upload failed', err.message)
});
回答by tsuz
In the accepted answer, the function ends before the upload is complete, and thus, it's incorrect. The code below pipes correctly from a readable stream.
在接受的答案中,该功能在上传完成之前结束,因此是不正确的。下面的代码从可读流中正确地进行管道传输。
async function uploadReadableStream(stream) {
const params = {Bucket: bucket, Key: key, Body: stream};
return s3.upload(params).promise();
}
async function upload() {
const readable = getSomeReadableStream();
const results = await uploadReadableStream(readable);
console.log('upload complete', results);
}
You can also go a step further and output progress info using ManagedUploadas such:
您还可以更进一步并使用以下方式输出进度信息ManagedUpload:
const manager = s3.upload(params);
manager.on('httpUploadProgress', (progress) => {
console.log('progress', progress) // { loaded: 4915, total: 192915, part: 1, key: 'foo.jpg' }
});
回答by cortopy
None of the answers worked for me because I wanted to:
没有一个答案对我有用,因为我想:
- Pipe into
s3.upload() - Pipe the result of
s3.upload()into another stream
- 管道进入
s3.upload() - 将 的结果通过管道
s3.upload()输送到另一个流中
The accepted answer doesn't do the latter. The others rely on the promise api, which is cumbersome to work when working with stream pipes.
接受的答案不做后者。其他的依赖于promise api,这在使用流管道时很麻烦。
This is my modification of the accepted answer.
这是我对已接受答案的修改。
const s3 = new S3();
function writeToS3({Key, Bucket}) {
const Body = new stream.PassThrough();
s3.upload({
Body,
Key,
Bucket: process.env.adpBucket
})
.on('httpUploadProgress', progress => {
console.log('progress', progress);
})
.send((err, data) => {
if (err) {
Body.destroy(err);
} else {
console.log(`File uploaded and available at ${data.Location}`);
Body.destroy();
}
});
return Body;
}
const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});
pipeline.on('close', () => {
// upload finished, do something else
})
pipeline.on('error', () => {
// upload wasn't successful. Handle it
})
回答by dzole vladimirov
Type Script solution:
This example uses:
Type Script 解决方案:
此示例使用:
import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";
And async function:
和异步功能:
public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> {
const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
const passT = new stream.PassThrough();
return {
writeStream: passT,
promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
};
};
const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
fsExtra.createReadStream(filePath).pipe(writeStream); // NOTE: Addition You can compress to zip by .pipe(zlib.createGzip()).pipe(writeStream)
let output = true;
await promise.catch((reason)=> { output = false; console.log(reason);});
return output;
}
Call this method somewhere like:
在某处调用此方法,例如:
let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);
回答by Tim
For those complaining that the when they use the s3 api upload function and a zero byte file ends up on s3 (@Radar155 and @gabo) - I also had this problem.
对于那些抱怨他们何时使用 s3 api 上传功能和零字节文件在 s3(@Radar155 和 @gabo)上结束的人 - 我也遇到了这个问题。
Create a second PassThrough stream and just pipe all data from the first to the second and pass the reference to that second to s3. You can do this in a couple of different ways - possibly a dirty way is to listen for the "data" event on the first stream and then write that same data to the second stream - the similarly for the "end" event - just call the end function on the second stream. I've no idea whether this is a bug in the aws api, the version of node or some other issue - but it worked around the issue for me.
创建第二个 PassThrough 流并将所有数据从第一个传递到第二个并将对第二个的引用传递给 s3。您可以通过几种不同的方式执行此操作 - 可能一种肮脏的方式是在第一个流上侦听“数据”事件,然后将相同的数据写入第二个流 - 与“结束”事件类似 - 只需调用第二个流上的结束函数。我不知道这是 aws api 中的错误、节点版本还是其他一些问题 - 但它为我解决了这个问题。
Here is how it might look:
这是它的外观:
var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();
var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
destStream.write(chunk);
});
srcStream.on('end', function () {
dataStream.end();
});
回答by varun bhaya
The thing here to note in the most accepted answer above is that: You need to return the pass in the function if you are using pipe like,
在上面最被接受的答案中要注意的是:如果您使用管道,则需要返回函数中的传递,
fs.createReadStream(<filePath>).pipe(anyUploadFunction())
fs.createReadStream(<filePath>).pipe(anyUploadFunction())
function anyUploadFunction () {
let pass = new stream.PassThrough();
return pass // <- Returning this pass is important for the stream to understand where it needs to write to.
}
Otherwise it will silently move onto next without throwing an error or will throw an error of TypeError: dest.on is not a functiondepending upon how you have written the function
否则它会默默地移动到下一个而不会抛出错误,或者会抛出错误,TypeError: dest.on is not a function具体取决于您如何编写函数
回答by mattdlockyer
If it helps anyone I was able to stream from the client to s3 successfully:
如果它可以帮助我成功地从客户端流式传输到 s3 的任何人:
https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a
https://gist.github.com/mattlockyer/532291b6194f6d9ca40cb82564db9d2a
The serverside code assumes reqis a stream object, in my case it was sent from the client with file info set in the headers.
服务器端代码假定req是一个流对象,在我的情况下,它是从客户端发送的,文件信息设置在标头中。
const fileUploadStream = (req, res) => {
//get "body" args from header
const { id, fn } = JSON.parse(req.get('body'));
const Key = id + '/' + fn; //upload to s3 folder "id" with filename === fn
const params = {
Key,
Bucket: bucketName, //set somewhere
Body: req, //req is a stream
};
s3.upload(params, (err, data) => {
if (err) {
res.send('Error Uploading Data: ' + JSON.stringify(err) + '\n' + JSON.stringify(err.stack));
} else {
res.send(Key);
}
});
};
Yes it breaks convention but if you look at the gist it's much cleaner than anything else I found using multer, busboy etc...
是的,它打破了惯例,但如果你看一下要点,它比我使用 multer、busboy 等发现的任何其他东西都要干净得多......
+1 for pragmatism and thanks to @SalehenRahman for his help.
+1 务实,感谢@SalehenRahman 的帮助。
回答by TestWell
I'm using KnexJS and had a problem using their streaming API. I finally fixed it, hopefully the following will help someone.
我正在使用 KnexJS,并且在使用他们的流 API 时遇到了问题。我终于修复了它,希望以下内容能对某人有所帮助。
const knexStream = knex.select('*').from('my_table').stream();
const passThroughStream = new stream.PassThrough();
knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n'));
knexStream.on('end', () => passThroughStream.end());
const uploadResult = await s3
.upload({
Bucket: 'my-bucket',
Key: 'stream-test.txt',
Body: passThroughStream
})
.promise();

