如何监听 MongoDB 集合的变化?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/9691316/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-09 12:30:57  来源:igfitidea点击:

How to listen for changes to a MongoDB collection?

mongodb

提问by Andrew

I'm creating a sort of background job queue system with MongoDB as the data store. How can I "listen" for inserts to a MongoDB collection before spawning workers to process the job? Do I need to poll every few seconds to see if there are any changes from last time, or is there a way my script can wait for inserts to occur? This is a PHP project that I am working on, but feel free to answer in Ruby or language agnostic.

我正在创建一种使用 MongoDB 作为数据存储的后台作业队列系统。在生成工作人员处理作业之前,如何“侦听”对 MongoDB 集合的插入?我是否需要每隔几秒钟轮询一次以查看是否与上次相比有任何更改,或者我的脚本是否可以等待插入发生?这是我正在开发的一个 PHP 项目,但可以随意用 Ruby 或语言不可知论回答。

采纳答案by Andrew

MongoDB has what is called capped collectionsand tailable cursorsthat allows MongoDB to push data to the listeners.

MongoDB 具有所谓的功能capped collectionstailable cursors它允许 MongoDB 将数据推送到侦听器。

A capped collectionis essentially a collection that is a fixed size and only allows insertions. Here's what it would look like to create one:

Acapped collection本质上是一个固定大小且只允许插入的集合。下面是创建一个的样子:

db.createCollection("messages", { capped: true, size: 100000000 })

MongoDB Tailable cursors (original post by Jonathan H. Wage)

MongoDB Tailable cursors(Jonathan H. Wage 的原帖

Ruby

红宝石

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}

Python(by Robert Stewart)

Python罗伯特·斯图尔特)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl(by Max)

Perl(由Max 编写

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}

Additional Resources:

其他资源:

Ruby/Node.js Tutorial which walks you through creating an application that listens to inserts in a MongoDB capped collection.

Ruby/Node.js 教程,它将引导您创建一个应用程序,该应用程序侦听 MongoDB 上限集合中的插入。

An article talking about tailable cursors in more detail.

一篇更详细地讨论可尾游标的文章。

PHP, Ruby, Python, and Perl examples of using tailable cursors.

使用可尾游标的 PHP、Ruby、Python 和 Perl 示例。

回答by Gates VP

What you are thinking of sounds a lot like triggers. MongoDB does not have any support for triggers, however some people have "rolled their own" using some tricks. The key here is the oplog.

你在想什么听起来很像触发器。MongoDB 不支持触​​发器,但是有些人使用一些技巧“推出了自己的”。这里的关键是oplog。

When you run MongoDB in a Replica Set, all of the MongoDB actions are logged to an operations log (known as the oplog). The oplog is basically just a running list of the modifications made to the data. Replicas Sets function by listening to changes on this oplog and then applying the changes locally.

当您在副本集中运行 MongoDB 时,所有 MongoDB 操作都会记录到操作日志(称为 oplog)中。oplog 基本上只是对数据所做修改的运行列表。副本集功能通过侦听此 oplog 上的更改然后在本地应用更改。

Does this sound familiar?

这听起来很熟悉吗?

I cannot detail the whole process here, it is several pages of documentation, but the tools you need are available.

我不能在这里详细说明整个过程,它是几页文档,但您需要的工具可用。

First some write-ups on the oplog - Brief description- Layout of the localcollection(which contains the oplog)

首先是关于 oplog 的一些文章 -简要说明-集合的布局local(包含 oplog)

You will also want to leverage tailable cursors. These will provide you with a way to listen for changes instead of polling for them. Note that replication uses tailable cursors, so this is a supported feature.

您还需要利用可尾游标。这些将为您提供一种侦听更改而不是轮询更改的方法。请注意,复制使用可尾游标,因此这是一项受支持的功能。

回答by Mitar

Since MongoDB 3.6 there will be a new notifications API called Change Streams which you can use for this. See this blog post for an example. Example from it:

从 MongoDB 3.6 开始,将有一个名为 Change Streams 的新通知 API,您可以使用它。有关示例,请参阅此博客文章。从它的例子:

cursor = client.my_db.my_collection.changes([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'newDocument.n': {'$gte': 1}
    }}
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])

回答by Rio Weber

Check out this: Change Streams

看看这个:改变流

January 10, 2018 - Release 3.6

2018 年 1 月 10 日 -版本 3.6

*EDIT: I wrote an article about how to do this https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

*编辑:我写了一篇关于如何做到这一点的文章https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/

https://docs.mongodb.com/v3.6/changeStreams/



It's new in mongodb 3.6https://docs.mongodb.com/manual/release-notes/3.6/2018/01/10

这是mongodb 3.6 中的新内容https://docs.mongodb.com/manual/release-notes/3.6/2018/01/10

$ mongod --version
db version v3.6.2


In order to use changeStreamsthe database must be a Replication Set

为了使用changeStreams,数据库必须是一个复制集

More about Replication Sets: https://docs.mongodb.com/manual/replication/

有关复制集的更多信息:https: //docs.mongodb.com/manual/replication/

Your Database will be a "Standalone" by default.

默认情况下,您的数据库将是“独立的”。

How to Convert a Standalone to a Replica Set: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/

如何将独立转换为副本集:https: //docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/



The following exampleis a practical application for how you might use this.
* Specifically for Node.

以下示例是您如何使用它的实际应用程序。
*专为节点。

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) {
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) {
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) {
            console.log('COLLECTION CHANGED');

            User.find({}, (err, data) => {
                if (err) throw err;

                if (data) {
                    // RESEND ALL USERS
                    socket.emit('users', data);
                }
            });
        });
    });
};
/* END - file.js */


Useful links:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example

有用的链接:
https: //docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams

https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams

回答by Robert Walters

MongoDB version 3.6 now includes change streams which is essentially an API on top of the OpLog allowing for trigger/notification-like use cases.

MongoDB 3.6 版现在包含变更流,它本质上是 OpLog 之上的 API,允许类似触发器/通知的用例。

Here is a link to a Java example: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

这是 Java 示例的链接:http: //mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

A NodeJS example might look something like:

NodeJS 示例可能如下所示:

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client){
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change){
            console.log(JSON.stringify(change));
          });
      });

回答by Alex

Alternatively, you could use the standard Mongo FindAndUpdate method, and within the callback, fire an EventEmitter event (in Node) when the callback is run.

或者,您可以使用标准的 Mongo FindAndUpdate 方法,并在回调中,在运行回调时触发 EventEmitter 事件(在 Node 中)。

Any other parts of the application or architecture listening to this event will be notified of the update, and any relevant data sent there also. This is a really simple way to achieve notifications from Mongo.

侦听此事件的应用程序或体系结构的任何其他部分将收到更新通知,以及发送到那里的任何相关数据。这是从 Mongo 获取通知的一种非常简单的方法。

回答by John Culviner

Many of these answers will only give you new records and not updates and/or are extremely ineffecient

许多这些答案只会给你新的记录而不是更新和/或效率极低

The only reliable, performant way to do this is to create a tailable cursor on local db: oplog.rs collection to get ALL changes to MongoDB and do with it what you will. (MongoDB even does this internally more or less to support replication!)

唯一可靠、高效的方法是在本地 db: oplog.rs 集合上创建一个可拖尾的游标,以获取对 MongoDB 的所有更改,并随心所欲地使用它。(MongoDB 甚至在内部或多或少地这样做是为了支持复制!)

Explanation of what the oplog contains: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/

oplog 包含的内容说明:https://www.compose.com/articles/the-mongodb-oplog-and-node-js/

Example of a Node.js library that provides an API around what is available to be done with the oplog: https://github.com/cayasso/mongo-oplog

一个 Node.js 库的例子,它提供了一个 API,围绕着可以用 oplog 完成的事情:https: //github.com/cayasso/mongo-oplog

回答by Manish Jain

There is an awesome set of services available called MongoDB Stitch. Look into stitch functions/triggers. Note this is a cloud-based paidservice (AWS). In your case, on an insert, you could call a custom function written in javascript.

有一组很棒的服务可用,称为MongoDB Stitch。查看缝合功能/触发器。请注意,这是一项基于云的付费服务 (AWS)。在您的情况下,在插入时,您可以调用用 javascript 编写的自定义函数。

enter image description here

在此处输入图片说明

回答by Maleen Abewardana

There is an working java example which can be found here.

可以在此处找到一个有效的 java 示例。

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> {
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) {
            DBObject obj = cur.next();
            System.out.println( obj );

        }
    };
    new Thread(task).start();

The key is QUERY OPTIONSgiven here.

关键是这里给出的查询选项

Also you can change find query, if you don't need to load all the data every time.

如果您不需要每次都加载所有数据,您也可以更改查找查询。

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1)));?//timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

回答by Duong Nguyen

Actually, instead of watching output, why you dont get notice when something new is inserted by using middle-ware that was provided by mongoose schema

实际上,当使用mongoose 模式提供的中间件插入新内容时,为什么您不会注意到,而不是观看输出

You can catch the event of insert a new document and do something after this insertion done

您可以捕获插入新文档的事件并在插入完成后做一些事情