Parsing huge logfiles in Node.js - read in line-by-line
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, cite the original source, and attribute it to the original authors (not me): StackOverflow
Original source: http://stackoverflow.com/questions/16010915/
Asked by victorhooi
I need to do some parsing of large (5-10 GB) logfiles in Javascript/Node.js (I'm using Cube).
The logline looks something like:
10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".
We need to read each line, do some parsing (e.g. strip out 5, 7 and SUCCESS), then pump this data into Cube (https://github.com/square/cube) using their JS client.
Firstly, what is the canonical way in Node to read in a file, line by line?
It seems to be a fairly common question online:
- http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js
- Read a file one line at a time in node.js?
A lot of the answers seem to point to a bunch of third-party modules:
- https://github.com/nickewing/line-reader
- https://github.com/jahewson/node-byline
- https://github.com/pkrumins/node-lazy
- https://github.com/Gagle/Node-BufferedReader
However, this seems like a fairly basic task - surely, there's a simple way within the stdlib to read in a textfile, line-by-line?
Secondly, I then need to process each line (e.g. convert the timestamp into a Date object, and extract useful fields).
What's the best way to do this, maximising throughput? Is there some way that won't block on either reading in each line, or on sending it to Cube?
Thirdly - I'm guessing that using string splits and the JS equivalent of contains (indexOf() != -1?) will be a lot faster than regexes? Has anybody had much experience in parsing massive amounts of text data in Node.js?
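To make the parsing part concrete, here is roughly the kind of per-line extraction I have in mind (a minimal sketch only; the split indices and the date handling are just illustrative for the sample line above, not a proposed solution):

var line = '10:00:43.343423 I\'m a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".';

function parseLine(line) {
    var words = line.split(' ');
    // The log line has no date part, so assume "today" when building the Date object.
    var time = words[0].split('.')[0];                         // "10:00:43"
    var timestamp = new Date(new Date().toDateString() + ' ' + time);
    return {
        timestamp: timestamp,
        cats: parseInt(words[8], 10),                          // 5
        dogs: parseInt(words[11], 10),                         // 7
        success: line.indexOf('"SUCCESS"') !== -1
    };
}

console.log(parseLine(line));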
Cheers, Victor
Answered by Gerard
I searched for a solution to parse very large files (GBs) line by line using a stream. None of the third-party libraries and examples suited my needs, since they either did not process the files line by line or read the entire file into memory.
The following solution can parse very large files, line by line, using stream & pipe. For testing I used a 2.1 GB file with 17,000,000 records. RAM usage did not exceed 60 MB.
First, install the event-stream package:
npm install event-stream
Then:
var fs = require('fs')
    , es = require('event-stream');

var lineNr = 0;

var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line) {
        // pause the readstream
        s.pause();

        lineNr += 1;

        // process line here and call s.resume() when ready
        // the function below was for logging memory usage
        logMemoryUsage(lineNr);

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(err) {
        console.log('Error while reading file.', err);
    })
    .on('end', function() {
        console.log('Read entire file.');
    })
);


Please let me know how it goes!
Answered by user568109
You can use the built-in readline package, see docs here. I use stream to create a new output stream.
var fs = require('fs'),
    readline = require('readline'),
    stream = require('stream');

var instream = fs.createReadStream('/path/to/file');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;

var rl = readline.createInterface({
    input: instream,
    output: outstream,
    terminal: false
});

rl.on('line', function(line) {
    console.log(line);
    // Do your stuff ...
    // Then write to outstream ('cubestuff' stands for whatever you want to forward to Cube)
    rl.write(cubestuff);
});
Large files will take some time to process. Do tell if it works.
Answered by ambodi
I really liked @gerard's answer, which actually deserves to be the accepted answer here. I made some improvements:
- Code is in a class (modular)
- Parsing is included
- The ability to resume is exposed to the caller, in case an asynchronous job (such as inserting into a DB or an HTTP request) is chained to reading the CSV
- Reading in chunk/batch sizes that the user can declare. I took care of encoding in the stream too, in case you have files in a different encoding.
Here's the code:
'use strict'

const fs = require('fs'),
    util = require('util'),
    stream = require('stream'),
    es = require('event-stream'),
    parse = require("csv-parse"),
    iconv = require('iconv-lite');

class CSVReader {
    constructor(filename, batchSize, columns) {
        this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8'))
        this.batchSize = batchSize || 1000
        this.lineNumber = 0
        this.data = []
        this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true}
    }

    read(callback) {
        this.reader
            .pipe(es.split())
            .pipe(es.mapSync(line => {
                ++this.lineNumber

                parse(line, this.parseOptions, (err, d) => {
                    this.data.push(d[0])
                })

                if (this.lineNumber % this.batchSize === 0) {
                    callback(this.data)
                }
            })
            .on('error', function(){
                console.log('Error while reading file.')
            })
            .on('end', function(){
                console.log('Read entire file.')
            }))
    }

    continue () {
        this.data = []
        this.reader.resume()
    }
}

module.exports = CSVReader
So basically, here is how you will use it:
let reader = new CSVReader('path_to_file.csv')
reader.read(() => reader.continue())
I tested this with a 35GB CSV file and it worked for me, and that's why I chose to build it on @gerard's answer; feedback is welcome.
Answered by Eugene Ilyushin
I used https://www.npmjs.com/package/line-by-line for reading more than 1,000,000 lines from a text file. In this case, RAM usage was about 50-60 MB.
const LineByLineReader = require('line-by-line'),
    lr = new LineByLineReader('big_file.txt');

lr.on('error', function (err) {
    // 'err' contains error object
});

lr.on('line', function (line) {
    // pause emitting of lines...
    lr.pause();

    // ...do your asynchronous line processing..
    setTimeout(function () {
        // ...and continue emitting lines.
        lr.resume();
    }, 100);
});

lr.on('end', function () {
    // All lines are read, file is closed now.
});
Answered by Kris Roofe
Apart from reading the big file line by line, you can also read it chunk by chunk. For more, refer to this article.
var fs = require('fs');

var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync('filepath', 'r');
var bytesRead = 0;
var lines = [];

while ((bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) !== 0) {
    offset += bytesRead;
    var str = chunkBuffer.slice(0, bytesRead).toString();
    var arr = str.split('\n');

    if (bytesRead === chunkSize) {
        // the last item of arr may not be a full line, leave it to the next chunk
        offset -= arr.pop().length;
    }
    lines.push(arr);
}
console.log(lines);
Answered by deemstone
I had the same problem, too. After comparing several modules that seem to have this feature, I decided to do it myself; it's simpler than I thought.
gist: https://gist.github.com/deemstone/8279565
var fetchBlock = lineByline(filepath, onEnd);
fetchBlock(function(lines, start){ ... }); //lines{array} start{int} lines[0] No.
It keeps the opened file in a closure; the returned fetchBlock() fetches a block from the file and splits it into an array of lines (handling the partial segment left over from the last fetch).
I've set the block size to 1024 for each read operation. This may have bugs, but the code logic is obvious; try it yourself.
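For readers who don't want to follow the gist link, here is a minimal sketch of the idea (this is not the gist's actual code, just an illustration of a closure that reads fixed-size blocks and hands back whole lines, carrying the partial last line over to the next read):

var fs = require('fs');

// Illustrative re-implementation of the fetchBlock idea described above.
function lineByline(filepath, onEnd) {
    var fd = fs.openSync(filepath, 'r');
    var blockSize = 1024;
    var leftover = '';          // partial line carried over from the previous block
    var lineNo = 0;

    return function fetchBlock(onLines) {
        var buffer = Buffer.alloc(blockSize);
        var bytesRead = fs.readSync(fd, buffer, 0, blockSize, null);

        if (bytesRead === 0) {  // end of file: flush the leftover and stop
            fs.closeSync(fd);
            if (leftover) onLines([leftover], lineNo);
            if (onEnd) onEnd();
            return;
        }

        var chunk = leftover + buffer.toString('utf8', 0, bytesRead);
        var lines = chunk.split('\n');
        leftover = lines.pop(); // last element may be an incomplete line

        var start = lineNo;
        lineNo += lines.length;
        onLines(lines, start);  // lines {array}, start {int}: number of the first line
    };
}

// Usage, mirroring the snippet above:
// var fetchBlock = lineByline('very-large-file.txt', function () { console.log('done'); });
// fetchBlock(function (lines, start) { console.log(start, lines.length); });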
Answered by Jaime Gómez
The Node.js Documentation offers a very elegant example using the Readline module.
Example: Read File Stream Line-by-Line
const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
    input: fs.createReadStream('sample.txt'),
    crlfDelay: Infinity
});

rl.on('line', (line) => {
    console.log(`Line from file: ${line}`);
});
Note: we use the crlfDelay option to recognize all instances of CR LF ('\r\n') as a single line break.
Answered by hereandnow78
node-byline uses streams, so I would prefer that one for your huge files.
For your date conversions I would use moment.js.
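A minimal sketch of how those two suggestions might fit together (the byline call follows the module's documented stream-wrapper usage; the moment format string and field split are assumptions based on the sample log line, untested):

var fs = require('fs');
var byline = require('byline');     // npm install byline (node-byline)
var moment = require('moment');     // npm install moment

// Wrap a plain read stream so it emits one line per 'data' event.
var lines = byline(fs.createReadStream('very-large-file.log', { encoding: 'utf8' }));

lines.on('data', function (line) {
    // Assumed format for "10:00:43.343423"; adjust the tokens to your real timestamps.
    var ts = moment(line.split(' ')[0], 'HH:mm:ss.SSSSSS').toDate();
    // ... extract the other fields and push them to Cube here ...
});

lines.on('end', function () {
    console.log('done');
});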
For maximising your throughput you could think about using a software cluster. There are some nice modules which wrap the node-native cluster module quite well. I like cluster-master from isaacs. E.g. you could create a cluster of x workers which all compute a file.
For benchmarking splits vs regexes, use benchmark.js. I haven't tested it myself yet. benchmark.js is available as a node module.
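A rough benchmark.js sketch for the split-vs-regex question, using the sample log line from the question (a sketch only; the field indices are guesses for that exact line):

var Benchmark = require('benchmark');   // npm install benchmark

var line = '10:00:43.343423 I\'m a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".';
var re = /There are (\d+) cats, and (\d+) dogs\. We are in state "(\w+)"/;

new Benchmark.Suite()
    .add('split + indexOf', function () {
        var words = line.split(' ');
        var cats = parseInt(words[8], 10);
        var dogs = parseInt(words[11], 10);
        var success = line.indexOf('"SUCCESS"') !== -1;
    })
    .add('regex', function () {
        var m = re.exec(line);
        var cats = parseInt(m[1], 10);
        var dogs = parseInt(m[2], 10);
        var success = m[3] === 'SUCCESS';
    })
    .on('cycle', function (event) {
        console.log(String(event.target));  // e.g. "regex x 1,234,567 ops/sec ..."
    })
    .run();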
Answered by Benvorth
Based on this question's answer I implemented a class you can use to read a file synchronously line-by-line with fs.readSync(). You can make it "pause" and "resume" by using a Q promise (jQuery seems to require a DOM, so it can't run with Node.js):
var fs = require('fs');
var Q = require('q');

var lr = new LineReader(filenameToLoad);
lr.open();

var promise;
workOnLine = function () {
    var line = lr.readNextLine();
    promise = complexLineTransformation(line).then(
        function() {console.log('ok');workOnLine();},
        function() {console.log('error');}
    );
}
workOnLine();

complexLineTransformation = function (line) {
    var deferred = Q.defer();
    // ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error));
    return deferred.promise;
}

function LineReader (filename) {
    this.moreLinesAvailable = true;
    this.fd = undefined;
    this.bufferSize = 1024*1024;
    this.buffer = new Buffer(this.bufferSize);
    this.leftOver = '';
    this.read = undefined;
    this.idxStart = undefined;
    this.idx = undefined;
    this.lineNumber = 0;
    this._bundleOfLines = [];

    this.open = function() {
        this.fd = fs.openSync(filename, 'r');
    };

    this.readNextLine = function () {
        if (this._bundleOfLines.length === 0) {
            this._readNextBundleOfLines();
        }
        this.lineNumber++;
        var lineToReturn = this._bundleOfLines[0];
        this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany)
        return lineToReturn;
    };

    this.getLineNumber = function() {
        return this.lineNumber;
    };

    this._readNextBundleOfLines = function() {
        var line = "";
        while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file
            this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver
            this.idxStart = 0;
            while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver
                line = this.leftOver.substring(this.idxStart, this.idx);
                this._bundleOfLines.push(line);
                this.idxStart = this.idx + 1;
            }
            this.leftOver = this.leftOver.substring(this.idxStart);
            if (line !== "") {
                break;
            }
        }
    };
}
Answered by Raza
import * as csv from 'fast-csv';
import * as fs from 'fs';

interface Row {
    [s: string]: string;
}

type RowCallBack = (data: Row, index: number) => object;

export class CSVReader {
    protected file: string;
    protected csvOptions = {
        delimiter: ',',
        headers: true,
        ignoreEmpty: true,
        trim: true
    };

    constructor(file: string, csvOptions = {}) {
        if (!fs.existsSync(file)) {
            throw new Error(`File ${file} not found.`);
        }
        this.file = file;
        this.csvOptions = Object.assign({}, this.csvOptions, csvOptions);
    }

    public read(callback: RowCallBack): Promise<Array<object>> {
        return new Promise<Array<object>>(resolve => {
            const readStream = fs.createReadStream(this.file);
            const results: Array<any> = [];
            let index = 0;
            const csvStream = csv.parse(this.csvOptions).on('data', async (data: Row) => {
                index++;
                results.push(await callback(data, index));
            }).on('error', (err: Error) => {
                console.error(err.message);
                throw err;
            }).on('end', () => {
                resolve(results);
            });
            readStream.pipe(csvStream);
        });
    }
}

import { CSVReader } from '../src/helpers/CSVReader';

(async () => {
    const reader = new CSVReader('./database/migrations/csv/users.csv');
    const users = await reader.read(async data => {
        return {
            username: data.username,
            name: data.name,
            email: data.email,
            cellPhone: data.cell_phone,
            homePhone: data.home_phone,
            roleId: data.role_id,
            description: data.description,
            state: data.state,
        };
    });
    console.log(users);
})();

