How to listen for changes to a MongoDB collection?
Note: this page reproduces a popular Stack Overflow question and its answers under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors on Stack Overflow (not me).
Original question: http://stackoverflow.com/questions/9691316/
Question by Andrew
I'm creating a sort of background job queue system with MongoDB as the data store. How can I "listen" for inserts to a MongoDB collection before spawning workers to process the job? Do I need to poll every few seconds to see if there are any changes since last time, or is there a way my script can wait for inserts to occur? This is a PHP project that I am working on, but feel free to answer in Ruby or in a language-agnostic way.
Accepted answer by Andrew
MongoDB has what are called capped collections and tailable cursors, which allow MongoDB to push data to listeners.
A capped collection is essentially a fixed-size collection meant for insert-only use: documents are kept in insertion order, and once the collection is full the oldest ones are overwritten. Here's what creating one looks like:
db.createCollection("messages", { capped: true, size: 100000000 })
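To make the fixed-size behaviour concrete, here is a toy in-memory model in Python. It is not MongoDB or driver code, and it rests on one simplifying assumption: it caps by document count, while the size option above caps by bytes (MongoDB also accepts an optional max document count). CappedModel is a hypothetical name.

```python
from collections import deque


class CappedModel:
    """Toy, in-memory model of a capped collection (not MongoDB code).

    Real capped collections are bounded by bytes (the `size` option) and,
    optionally, a document count (`max`); this model bounds by count only.
    """

    def __init__(self, max_docs):
        # deque(maxlen=...) drops the oldest item when a new one is appended
        self._docs = deque(maxlen=max_docs)

    def insert(self, doc):
        self._docs.append(doc)

    def find(self):
        # Natural order of a capped collection is insertion order
        return list(self._docs)


messages = CappedModel(max_docs=3)
for i in range(5):
    messages.insert({"n": i})
print(messages.find())  # [{'n': 2}, {'n': 3}, {'n': 4}]
```

The two oldest documents were overwritten without any delete call, which is what makes a capped collection behave like a rolling log rather than an ever-growing table.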
MongoDB Tailable cursors (original post by Jonathan H. Wage)
Ruby
coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end
PHP
$mongo = new Mongo();
$db = $mongo->selectDB('my_db');
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) {
    if ($cursor->hasNext()) {
        $doc = $cursor->getNext();
        print_r($doc);
    } else {
        sleep(1);
    }
}
Python (by Robert Stewart)
import time
from pymongo import MongoClient, CursorType

db = MongoClient().my_db
coll = db.my_collection  # must be a capped collection for tailing to work
cursor = coll.find(cursor_type=CursorType.TAILABLE_AWAIT)
while cursor.alive:
    try:
        doc = cursor.next()
        print(doc)
    except StopIteration:
        time.sleep(1)
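The key property of a tailable cursor is that it keeps its position: running out of documents (the StopIteration in the loop above) means "nothing new yet", not "finished". Here is a toy Python model of just that behaviour, with a plain list standing in for the capped collection. TailableModel is a hypothetical name, not a pymongo class.

```python
class TailableModel:
    """Toy model of a tailable cursor over an append-only list (not pymongo).

    It mimics one property: the cursor remembers where it stopped, so running
    out of data means "nothing new yet" rather than "done".
    """

    def __init__(self, log):
        self._log = log  # shared, append-only list standing in for the collection
        self._pos = 0    # index of the next document this cursor has not seen

    def next(self):
        if self._pos >= len(self._log):
            # The same signal the polling loop catches before sleeping
            raise StopIteration
        doc = self._log[self._pos]
        self._pos += 1
        return doc


jobs = []
cursor = TailableModel(jobs)
jobs.append({"job": 1})
print(cursor.next())  # {'job': 1}
try:
    cursor.next()
except StopIteration:
    print("no new jobs yet")  # a real worker would sleep(1) and retry
jobs.append({"job": 2})
print(cursor.next())  # {'job': 2}
```

A normal cursor would be exhausted after the first document; the tailable one simply resumes from its saved position when job 2 arrives.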
Perl (by Max)
use 5.010;
use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)
{
    if (defined(my $doc = $cursor->next))
    {
        say $doc;
    }
    else
    {
        sleep 1;
    }
}
Additional Resources:
- A Ruby/Node.js tutorial that walks you through creating an application that listens for inserts into a MongoDB capped collection.
- An article talking about tailable cursors in more detail.
- PHP, Ruby, Python, and Perl examples of using tailable cursors.
Answer by Gates VP
What you are thinking of sounds a lot like triggers. MongoDB does not have any support for triggers, however some people have "rolled their own" using some tricks. The key here is the oplog.
When you run MongoDB in a replica set, all write operations are logged to an operations log (known as the oplog). The oplog is basically just a running list of the modifications made to the data. Replica set members work by listening to changes on this oplog and then applying those changes locally.
Does this sound familiar?
I cannot detail the whole process here, it is several pages of documentation, but the tools you need are available.
First, some write-ups on the oplog:
- Brief description
- Layout of the local collection (which contains the oplog)
You will also want to leverage tailable cursors. These will provide you with a way to listen for changes instead of polling for them. Note that replication uses tailable cursors, so this is a supported feature.
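Conceptually, what a replica set member does with the oplog is "read entries in order and apply each one". Here is a toy replay in Python, assuming a simplified entry shape (ns, op, and the o/o2 fields); real entries carry more fields, and the update format differs between server versions. apply_oplog is a hypothetical helper.

```python
def apply_oplog(entries, collections=None):
    """Replay simplified oplog entries into in-memory dicts keyed by _id.

    Simplifying assumptions (not the full oplog format):
      'ns' is "db.collection"
      op 'i': insert; the new document is in 'o'
      op 'u': update; target _id in 'o2', and 'o' is treated here as a
              flat dict of fields to overwrite
      op 'd': delete; the target _id is in 'o'
    """
    collections = {} if collections is None else collections
    for entry in entries:
        op = entry["op"]
        if op not in ("i", "u", "d"):
            continue  # skip commands ('c'), no-ops ('n'), etc.
        coll = collections.setdefault(entry["ns"], {})
        if op == "i":
            coll[entry["o"]["_id"]] = dict(entry["o"])
        elif op == "u":
            coll[entry["o2"]["_id"]].update(entry["o"])
        else:  # op == "d"
            coll.pop(entry["o"]["_id"], None)
    return collections


oplog = [
    {"op": "i", "ns": "app.jobs", "o": {"_id": 1, "state": "new"}},
    {"op": "u", "ns": "app.jobs", "o2": {"_id": 1}, "o": {"state": "done"}},
    {"op": "i", "ns": "app.jobs", "o": {"_id": 2, "state": "new"}},
    {"op": "d", "ns": "app.jobs", "o": {"_id": 2}},
]
print(apply_oplog(oplog))  # {'app.jobs': {1: {'_id': 1, 'state': 'done'}}}
```

A "roll your own trigger" is the same loop with the apply step replaced by your own reaction to each entry.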
Answer by Mitar
Since MongoDB 3.6, there is a notifications API called Change Streams that you can use for this. See this blog post for an example. Example from it:
from pymongo import MongoClient

client = MongoClient()  # change streams need a replica set (or sharded cluster)
cursor = client.my_db.my_collection.watch([
    {'$match': {
        'operationType': {'$in': ['insert', 'replace']}
    }},
    {'$match': {
        'fullDocument.n': {'$gte': 1}
    }}
])
# Loops forever.
for change in cursor:
    print(change['fullDocument'])
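For a job queue like the one in the question, a worker also needs to pick up where it left off after a restart. A sketch of that, assuming pymongo 3.9+ (for ChangeStream.resume_token) and a replica set; consume_inserts is a hypothetical helper, and handle, load_token and save_token are hypothetical hooks you would supply (for example, storing the token in a small checkpoint collection).

```python
def consume_inserts(collection, handle, load_token, save_token):
    """Process inserts from a change stream, resuming where the last run stopped.

    Assumes pymongo >= 3.9 (ChangeStream.resume_token) and a replica set.
    handle, load_token and save_token are caller-supplied hooks.
    """
    pipeline = [{"$match": {"operationType": "insert"}}]
    # resume_after=None simply starts from "now"
    with collection.watch(pipeline, resume_after=load_token()) as stream:
        for change in stream:  # blocks until the next matching event
            handle(change["fullDocument"])
            # Checkpoint only after the job is handled, so a crash re-delivers it
            save_token(stream.resume_token)
```

For example, consume_inserts(MongoClient().my_db.jobs, print, lambda: None, lambda token: None) would just print new jobs from now on. Because the token is saved only after handle() returns, a crash between the two re-delivers that event, so jobs should be safe to run twice. A resume token also only works while the corresponding entry is still in the oplog.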
Answer by Rio Weber
Check out Change Streams:
January 10, 2018 - Release 3.6
*EDIT: I wrote an article about how to do this: https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76
https://docs.mongodb.com/v3.6/changeStreams/
It's new in MongoDB 3.6: https://docs.mongodb.com/manual/release-notes/3.6/ (2018/01/10)
$ mongod --version
db version v3.6.2
In order to use change streams, the database must be a replica set.
More about replica sets: https://docs.mongodb.com/manual/replication/
By default, your database runs as a "standalone" server.
How to Convert a Standalone to a Replica Set: https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/
The following example is a practical application of how you might use this (specifically for Node.js):
/* file.js */
'use strict'
module.exports = function (
  app,
  io,
  User // Mongoose model for the watched collection
) {
  // SET WATCH ON COLLECTION (once, shared by all sockets)
  const changeStream = User.watch();

  // Socket Connection
  io.on('connection', function (socket) {
    console.log('Connection!');
  });

  // USERS - Change: registered once, so listeners don't pile up per connection
  changeStream.on('change', async function (change) {
    console.log('COLLECTION CHANGED');
    try {
      const data = await User.find({});
      // RESEND ALL USERS to every connected client
      io.emit('users', data);
    } catch (err) {
      console.error(err);
    }
  });
};
/* END - file.js */
Useful links:
https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set
https://docs.mongodb.com/manual/tutorial/change-streams-example
https://docs.mongodb.com/v3.6/tutorial/change-streams-example
http://plusnconsulting.com/post/MongoDB-Change-Streams
Answer by Robert Walters
MongoDB 3.6 now includes change streams, which are essentially an API on top of the oplog that enables trigger/notification-like use cases.
Here is a link to a Java example: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/
A NodeJS example might look something like:
const { MongoClient } = require('mongodb');

// Change streams need a replica set; the URI option for read concern is readConcernLevel
MongoClient.connect("mongodb://localhost:22000/MyStore?readConcernLevel=majority")
  .then(function (client) {
    const db = client.db('MyStore');
    const changeStream = db.collection('products').watch();
    changeStream.on('change', function (change) {
      console.log(JSON.stringify(change));
    });
  })
  .catch(console.error);
Answer by Alex
Alternatively, you could use the standard Mongo findOneAndUpdate method and, within its callback, fire an EventEmitter event (in Node) when the callback runs.
Any other parts of the application or architecture listening to this event will be notified of the update and can receive any relevant data sent along with it. This is a really simple way to achieve notifications from Mongo.
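Here is that pattern sketched in Python, with a minimal EventEmitter stand-in and a plain dict in place of the collection. Emitter and update_and_notify are hypothetical names, and the dict update behaves like an upsert for brevity (findOneAndUpdate does not upsert unless asked to).

```python
from collections import defaultdict


class Emitter:
    """Minimal stand-in for Node's EventEmitter: on() registers, emit() calls."""

    def __init__(self):
        self._listeners = defaultdict(list)

    def on(self, event, fn):
        self._listeners[event].append(fn)

    def emit(self, event, *args):
        for fn in list(self._listeners[event]):
            fn(*args)


def update_and_notify(store, emitter, doc_id, fields):
    """Hypothetical write path: apply the update, then announce it.

    `store` is a plain dict standing in for the collection; in a Node app this
    step would be the findOneAndUpdate call, with emit() in its callback.
    """
    doc = store.setdefault(doc_id, {"_id": doc_id})
    doc.update(fields)
    emitter.emit("updated", dict(doc))  # pass a copy so listeners can't mutate it
    return doc


events = Emitter()
events.on("updated", lambda doc: print("worker saw", doc))
update_and_notify({}, events, 7, {"state": "queued"})
# prints: worker saw {'_id': 7, 'state': 'queued'}
```

The trade-off: listeners only hear about writes that go through this code path in this process. Writes from another service or from the mongo shell bypass it entirely.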
Answer by John Culviner
Many of these answers will only give you new records and not updates, and/or are extremely inefficient.
The only reliable, performant way to do this is to create a tailable cursor on the oplog.rs collection in the local database to get ALL changes to MongoDB, and do with them what you will. (MongoDB itself does more or less this internally to support replication!)
Explanation of what the oplog contains: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/
Example of a Node.js library that provides an API around what is available to be done with the oplog: https://github.com/cayasso/mongo-oplog
Answer by Manish Jain
There is an awesome set of services available called MongoDB Stitch. Look into Stitch functions/triggers. Note that this is a cloud-based paid service (AWS). In your case, on an insert, you could call a custom function written in JavaScript.
Answer by Maleen Abewardana
There is a working Java example, which can be found here.
import com.mongodb.*;

MongoClient mongoClient = new MongoClient();
// getDB() returns the legacy DB/DBCollection API that DBCursor belongs to
DBCollection coll = mongoClient.getDB("local").getCollection("oplog.rs");
// Tail the oplog in natural (insertion) order; AWAITDATA lets the server wait briefly for new data
DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
        .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
System.out.println("== open cursor ==");
Runnable task = () -> {
    System.out.println("\tWaiting for events");
    while (cur.hasNext()) {
        DBObject obj = cur.next();
        System.out.println(obj);
    }
};
new Thread(task).start();
The key is the query options given here: QUERYOPTION_TAILABLE and QUERYOPTION_AWAITDATA.
You can also change the find query if you don't need to load all the data every time.
import org.bson.BsonTimestamp;

BasicDBObject query = new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); // only entries newer than this timestamp
query.put("op", "i"); // only insert operations
DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
        .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);
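The query above keeps oplog entries that are inserts and are newer than a given timestamp. Here is the same filter in plain Python, modeling BSON timestamps as (seconds, increment) tuples, which order the same way (seconds first, then increment). matching_entries is a hypothetical helper, not a driver call.

```python
def matching_entries(entries, after_ts, op="i"):
    """Pure-Python reading of the query {ts: {$gt: after_ts}, op: "i"}.

    Timestamps are (seconds, increment) tuples; Python compares tuples
    element by element, matching how BSON timestamps are ordered.
    Entries are assumed to already be in natural (oplog) order.
    """
    return [e for e in entries if e["ts"] > after_ts and e["op"] == op]


oplog = [
    {"ts": (1471952088, 1), "op": "i", "o": {"_id": "a"}},
    {"ts": (1471952088, 2), "op": "u", "o": {"x": 1}},
    {"ts": (1471952089, 1), "op": "i", "o": {"_id": "b"}},
]
print([e["o"]["_id"] for e in matching_entries(oplog, (1471952088, 1))])  # ['b']
```

In practice you would store the ts of the last entry you processed and use it as the $gt bound when the tailer restarts, so already-processed entries are skipped.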
Answer by Duong Nguyen
Alternatively, instead of watching the output, you can get notified when something new is inserted by using the middleware provided by Mongoose schemas.
You can catch the event of inserting a new document and do something after the insertion is done.