Notice: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). StackOverflow original: http://stackoverflow.com/questions/18119387/

Time: 2020-09-02 15:14:04  Source: igfitidea

Iterating over a mongodb cursor serially (waiting for callbacks before moving to next document)

node.js, mongodb, mongoskin, async.js

Asked by UpTheCreek

Using mongoskin, I can do a query like this, which will return a cursor:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {

      });
});

However, I'd like to call some async functions for each document, and only move on to the next item on the cursor after this has called back (similar to the eachSeries structure in the async.js module). E.g:

myCollection.find({}, function(err, resultCursor) {
      resultCursor.each(function(err, result) {

            externalAsyncFunction(result, function(err) {
               //externalAsyncFunction completed - now want to move to next doc
            });

      });
});

How could I do this?

Thanks

UPDATE:

I don't want to use toArray() as this is a large batch operation, and the results might not fit in memory in one go.

Accepted answer by Timothy Strimple

If you don't want to load all of the results into memory using toArray, you can iterate using the cursor with something like the following.

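myCollection.find({}, function(err, resultCursor) {
  function processItem(err, item) {
    if (err) {
      return console.error(err); // stop iterating on a cursor error
    }
    if (item === null) {
      return; // All done!
    }

    externalAsyncFunction(item, function(err) {
      // Fetch the next document only after the async work has called back
      resultCursor.nextObject(processItem);
    });

  }

  resultCursor.nextObject(processItem);
});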

Answered by

A more modern approach that uses async/await:

const cursor = db.collection("foo").find({});
while(await cursor.hasNext()) {
  const doc = await cursor.next();
  // process doc here
}

Notes:

  • This may be even simpler to do when async iterators arrive.
  • You'll probably want to add try/catch for error checking.
  • The containing function should be async, or the code should be wrapped in (async function() { ... })(), since it uses await.
  • If you want, add await new Promise(resolve => setTimeout(resolve, 1000)); (pause for 1 second) at the end of the while loop to show that it does process docs one after the other.
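
The notes above can be combined into one runnable sketch. This is only an illustration under stated assumptions: makeFakeCursor is a hypothetical in-memory stand-in that mimics just hasNext(), next() and close() so the example runs without a database, and processSerially and handleDoc are names introduced here, not part of any driver.

```javascript
// Hypothetical stand-in for a MongoDB cursor so the sketch runs without a database.
// It mimics only hasNext(), next() and close().
function makeFakeCursor(docs) {
  let index = 0;
  return {
    hasNext: async () => index < docs.length,
    next: async () => (index < docs.length ? docs[index++] : null),
    close: async () => {},
  };
}

// Processes documents strictly one after another.
// `handleDoc` is an assumed async callback supplied by the caller.
async function processSerially(cursor, handleDoc) {
  let count = 0;
  try {
    while (await cursor.hasNext()) {
      const doc = await cursor.next();
      await handleDoc(doc); // the next document is not fetched until this settles
      count++;
    }
  } catch (err) {
    console.error('Stopped after', count, 'documents:', err.message);
    throw err;
  } finally {
    await cursor.close(); // release the cursor whether or not an error occurred
  }
  return count;
}

// Usage: wrapped in an async IIFE because it uses await.
(async function () {
  const cursor = makeFakeCursor([{ _id: 1 }, { _id: 2 }, { _id: 3 }]);
  const count = await processSerially(cursor, async (doc) => {
    await new Promise((resolve) => setTimeout(resolve, 10)); // simulate slow work
    console.log('processed', doc._id);
  });
  console.log('done,', count, 'documents');
})().catch(console.error);
```

With a real cursor, the same processSerially function should work as long as the cursor exposes promise-returning hasNext(), next() and close() methods.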

Answered by Daphoque

This works with large datasets by using setImmediate:

var cursor = collection.find({ /* filter... */ }).cursor();

cursor.nextObject(function fn(err, item) {
    if (err || !item) return;

    setImmediate(fnAction, item, arg1, arg2, function() {
        cursor.nextObject(fn);
    });
});

function fnAction(item, arg1, arg2, callback) {
    // Here you can do whatever you want to do with your item.
    return callback();
}

Answered by Jaydeep Solanki

Since Node.js v10.3 you can use an async iterator:

const cursor = db.collection('foo').find({});
for await (const doc of cursor) {
  // do your thing
  // you can even use `await myAsyncOperation()` here
}

Jake Archibald wrote a great blog post about async iterators, which I came to know about after reading @user993683's answer.
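
To see that for await really does process documents one at a time, here is a small sketch that runs without a database. The async generator fakeCursor is a hypothetical stand-in for the cursor (for await accepts any async iterable), and slowOperation and run are names made up for this illustration.

```javascript
// Hypothetical stand-in for a cursor: any async iterable works with `for await`.
async function* fakeCursor(docs) {
  for (const doc of docs) {
    yield doc;
  }
}

// Assumed example of per-document async work; resolves after a short delay.
function slowOperation(doc, log) {
  return new Promise((resolve) => {
    setTimeout(() => {
      log.push('end ' + doc._id);
      resolve();
    }, 10);
  });
}

async function run(docs) {
  const log = [];
  for await (const doc of fakeCursor(docs)) {
    log.push('start ' + doc._id);
    await slowOperation(doc, log); // the next doc is not pulled until this resolves
  }
  return log;
}

run([{ _id: 1 }, { _id: 2 }]).then((log) => console.log(log));
// prints [ 'start 1', 'end 1', 'start 2', 'end 2' ]
```

The start/end pairs never interleave, because each loop iteration awaits its work before the iterator is asked for the next document.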

Answered by user3392439

If someone is looking for a Promise way of doing this (as opposed to using callbacks of nextObject), here it is. I am using Node v4.2.2 and mongo driver v2.1.7. This is kind of an asyncSeries version of Cursor.forEach():

function forEachSeries(cursor, iterator) {
  return new Promise(function(resolve, reject) {
    var count = 0;
    function processDoc(doc) {
      if (doc != null) {
        count++;
        return iterator(doc).then(function() {
          return cursor.next().then(processDoc);
        });
      } else {
        resolve(count);
      }
    }
    cursor.next().then(processDoc).catch(reject); // propagate errors from the cursor or the iterator
  });
}

To use this, pass the cursor and an iterator that operates on each document asynchronously (like you would for Cursor.forEach). The iterator needs to return a promise, like most mongodb native driver functions do.

Say, you want to update all documents in the collection test. This is how you would do it:

var theDb;
MongoClient.connect(dbUrl).then(function(db) {
  theDb = db;     // save it, we'll need to close the connection when done.
  var cur = db.collection('test').find();

  return forEachSeries(cur, function(doc) {    // this is the iterator
    return db.collection('test').updateOne(
      {_id: doc._id},
      {$set: {updated: true}}       // or whatever else you need to change
    );
    // updateOne returns a promise, if not supplied a callback. Just return it.
  });
})
.then(function(count) {
  console.log("All Done. Processed", count, "records");
  theDb.close();
})
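
On Node versions that support async/await, the same helper can be sketched without manual promise chaining. This is an illustrative variant, not the code this answer was tested with: forEachSeriesAsync and makeFakeCursor are hypothetical names, and the sketch assumes cursor.next() returns a promise that resolves to null once the cursor is exhausted.

```javascript
// Sketch: same contract as forEachSeries above, written with async/await.
// Assumes cursor.next() returns a promise that resolves to null when exhausted.
async function forEachSeriesAsync(cursor, iterator) {
  let count = 0;
  let doc;
  while ((doc = await cursor.next()) != null) {
    await iterator(doc); // wait for this document before fetching the next
    count++;
  }
  return count; // any rejection from next() or iterator() rejects the returned promise
}

// Hypothetical in-memory cursor so the sketch runs without a database.
function makeFakeCursor(docs) {
  let index = 0;
  return {
    next: () => Promise.resolve(index < docs.length ? docs[index++] : null),
  };
}

forEachSeriesAsync(makeFakeCursor([{ _id: 1 }, { _id: 2 }]), function (doc) {
  console.log('visited', doc._id);
  return Promise.resolve();
}).then(function (count) {
  console.log('All Done. Processed', count, 'records');
});
```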

Answered by Antoine Desbois

You can do something like this using the async lib. The key point here is to check if the current doc is null. If it is, it means you are finished.

async.series([
        function (cb) {
            cursor.each(function (err, doc) {
                if (err) {
                    cb(err);
                } else if (doc === null) {
                    cb();
                } else {
                    console.log(doc);
                    array.push(doc);
                }
            });
        }
    ], function (err) {
        callback(err, array);
    });

Answered by Salman

You can get the result in an Array and iterate using a recursive function, something like this.

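myCollection.find({}).toArray(function (err, items) {
    if (err || items.length === 0) return; // nothing to process
    var index = 0; // position of the item currently being processed
    var fn = function () {
        externalAsyncFunction(items[index], function () {
            index += 1;
            if (index < items.length) fn(); // move on only after the callback fires
        });
    };

    fn();
});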

Edit:

This is only applicable to small datasets; for larger ones you should use cursors, as mentioned in the other answers.

Answered by Gerard Carbó

You could use a Future:

myCollection.find({}, function(err, resultCursor) {
    resultCursor.count(Meteor.bindEnvironment(function(err,count){
        for(var i=0;i<count;i++)
        {
            var itemFuture=new Future();

            resultCursor.nextObject(function(err,item){
                itemFuture.result(item);
            });

            var item=itemFuture.wait();
            //do what you want with the item, 
            //and continue with the loop if so

        }
    }));
});

Answered by Leo

You could use simple setTimeout calls. This is an example in TypeScript running on Node.js (I am using promises via the 'when' module, but it can be done without them as well):

        import mongodb = require("mongodb");

        var dbServer = new mongodb.Server('localhost', 27017, {auto_reconnect: true}, {});
        var db =  new mongodb.Db('myDb', dbServer);

        var util = require('util');
        var when = require('when'); //npm install when

        var dbDefer = when.defer();
        db.open(function() {
            console.log('db opened...');
            dbDefer.resolve(db);
        });

        dbDefer.promise.then(function(db : mongodb.Db){
            db.collection('myCollection', function (error, dataCol){
                if(error) {
                    console.error(error); return;
                }

                var doneReading = when.defer();

                var processOneRecordAsync = function(record) : When.Promise{
                    var result = when.defer();

                    setTimeout (function() {
                        //simulate a variable-length operation
                        console.log(util.inspect(record));
                        result.resolve('record processed');
                    }, Math.random()*5);

                    return result.promise;
                }

                var runCursor = function (cursor : MongoCursor){
                    cursor.next(function(error : any, record : any){
                        if (error){
                            console.log('an error occurred: ' + error);
                            return;
                        }
                        if (record){
                            processOneRecordAsync(record).then(function(r){
                                setTimeout(function() {runCursor(cursor)}, 1);
                            });
                        }
                        else{
                            //cursor up
                            doneReading.resolve('done reading data.');
                        }
                    });
                }

                dataCol.find({}, function(error, cursor : MongoCursor){
                    if (!error)
                    {
                        setTimeout(function() {runCursor(cursor)}, 1);
                    }
                });

                doneReading.promise.then(function(message : string){
                    //message='done reading data'
                    console.log(message);
                });
            });
        });