MongoDB:将来自多个集合的数据合并为一个......如何?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/5681851/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-09 12:04:02  来源:igfitidea点击:

MongoDB: Combine data from multiple collections into one..how?

mongodbmongodb-queryaggregation-framework

提问by user697697

How can I (in MongoDB) combine data from multiple collections into one collection?

我如何(在 MongoDB 中)将来自多个集合的数据合并到一个集合中?

Can I use map-reduce and if so then how?

我可以使用 map-reduce 吗?如果可以,那么如何使用?

I would greatly appreciate some example as I am a novice.

由于我是新手,我将不胜感激一些示例。

回答by rmarscher

Although you can't do this real-time, you can run map-reduce multiple times to merge data together by using the "reduce" out option in MongoDB 1.8+ map/reduce (see http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Outputoptions). You need to have some key in both collections that you can use as an _id.

尽管您无法实时执行此操作,但您可以使用 MongoDB 1.8+ map/reduce 中的“reduce”输出选项多次运行 map-reduce 以将数据合并在一起(请参阅http://www.mongodb.org/ display/DOCS/MapReduce#MapReduce-Outputoptions)。您需要在两个集合中都有一些可以用作 _id 的键。

For example, let's say you have a userscollection and a commentscollection and you want to have a new collection that has some user demographic info for each comment.

例如,假设您有一个users集合和一个comments集合,并且您想要一个新集合,其中包含每个评论的一些用户人口统计信息。

Let's say the userscollection has the following fields:

假设该users集合具有以下字段:

  • _id
  • firstName
  • lastName
  • country
  • gender
  • age
  • _ID
  • 国家
  • 性别
  • 年龄

And then the commentscollection has the following fields:

然后该comments集合具有以下字段:

  • _id
  • userId
  • comment
  • created
  • _ID
  • 用户身份
  • 评论
  • 创建

You would do this map/reduce:

你会做这个映射/减少:

var mapUsers, mapComments, reduce;
db.users_comments.remove();

// setup sample data - wouldn't actually use this in production
db.users.remove();
db.comments.remove();
db.users.save({firstName:"Rich",lastName:"S",gender:"M",country:"CA",age:"18"});
db.users.save({firstName:"Rob",lastName:"M",gender:"M",country:"US",age:"25"});
db.users.save({firstName:"Sarah",lastName:"T",gender:"F",country:"US",age:"13"});
var users = db.users.find();
db.comments.save({userId: users[0]._id, "comment": "Hey, what's up?", created: new ISODate()});
db.comments.save({userId: users[1]._id, "comment": "Not much", created: new ISODate()});
db.comments.save({userId: users[0]._id, "comment": "Cool", created: new ISODate()});
// end sample data setup

mapUsers = function() {
    var values = {
        country: this.country,
        gender: this.gender,
        age: this.age
    };
    emit(this._id, values);
};
mapComments = function() {
    var values = {
        commentId: this._id,
        comment: this.comment,
        created: this.created
    };
    emit(this.userId, values);
};
reduce = function(k, values) {
    var result = {}, commentFields = {
        "commentId": '', 
        "comment": '',
        "created": ''
    };
    values.forEach(function(value) {
        var field;
        if ("comment" in value) {
            if (!("comments" in result)) {
                result.comments = [];
            }
            result.comments.push(value);
        } else if ("comments" in value) {
            if (!("comments" in result)) {
                result.comments = [];
            }
            result.comments.push.apply(result.comments, value.comments);
        }
        for (field in value) {
            if (value.hasOwnProperty(field) && !(field in commentFields)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};
db.users.mapReduce(mapUsers, reduce, {"out": {"reduce": "users_comments"}});
db.comments.mapReduce(mapComments, reduce, {"out": {"reduce": "users_comments"}});
db.users_comments.find().pretty(); // see the resulting collection

At this point, you will have a new collection called users_commentsthat contains the merged data and you can now use that. These reduced collections all have _idwhich is the key you were emitting in your map functions and then all of the values are a sub-object inside the valuekey - the values aren't at the top level of these reduced documents.

此时,您将拥有一个名为的新集合users_comments,其中包含合并的数据,您现在可以使用它。这些缩减集合都具有_idwhich 是您在地图函数中发出的键,然后所有值都是value键内的子对象- 这些值不在这些缩减文档的顶层。

This is a somewhat simple example. You can repeat this with more collections as much as you want to keep building up the reduced collection. You could also do summaries and aggregations of data in the process. Likely you would define more than one reduce function as the logic for aggregating and preserving existing fields gets more complex.

这是一个有点简单的例子。你可以用更多的集合重复这个,只要你想继续建立减少的集合。您还可以在此过程中对数据进行汇总和聚合。您可能会定义多个 reduce 函数,因为聚合和保留现有字段的逻辑变得更加复杂。

You'll also note that there is now one document for each user with all of that user's comments in an array. If we were merging data that has a one-to-one relationship rather than one-to-many, it would be flat and you could simply use a reduce function like this:

您还会注意到,现在每个用户都有一个文档,该文档包含一个数组中的所有该用户的评论。如果我们合并具有一对一关系而不是一对多关系的数据,它将是扁平的,您可以简单地使用这样的 reduce 函数:

reduce = function(k, values) {
    var result = {};
    values.forEach(function(value) {
        var field;
        for (field in value) {
            if (value.hasOwnProperty(field)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};

If you want to flatten the users_commentscollection so it's one document per comment, additionally run this:

如果您想将users_comments集合展平,以便每个评论是一个文档,请另外运行以下命令:

var map, reduce;
map = function() {
    var debug = function(value) {
        var field;
        for (field in value) {
            print(field + ": " + value[field]);
        }
    };
    debug(this);
    var that = this;
    if ("comments" in this.value) {
        this.value.comments.forEach(function(value) {
            emit(value.commentId, {
                userId: that._id,
                country: that.value.country,
                age: that.value.age,
                comment: value.comment,
                created: value.created,
            });
        });
    }
};
reduce = function(k, values) {
    var result = {};
    values.forEach(function(value) {
        var field;
        for (field in value) {
            if (value.hasOwnProperty(field)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};
db.users_comments.mapReduce(map, reduce, {"out": "comments_with_demographics"});

This technique should definitely not be performed on the fly. It's suited for a cron job or something like that which updates the merged data periodically. You'll probably want to run ensureIndexon the new collection to make sure queries you perform against it run quickly (keep in mind that your data is still inside a valuekey, so if you were to index comments_with_demographicson the comment createdtime, it would be db.comments_with_demographics.ensureIndex({"value.created": 1});

这种技术绝对不应该即时执行。它适用于 cron 作业或类似定期更新合并数据的作业。您可能希望ensureIndex在新集合上运行以确保您对它执行的查询快速运行(请记住,您的数据仍在一个value键中,因此如果您要comments_with_demographics在评论created时间建立索引,它将是db.comments_with_demographics.ensureIndex({"value.created": 1});

回答by Bruno Krebs

MongoDB 3.2 now allows one to combine data from multiple collections into one through the $lookup aggregation stage. As a practical example, lets say that you have data about books split into two different collections.

MongoDB 3.2 现在允许通过$lookup 聚合阶段将来自多个集合的数据合并为一个。作为一个实际示例,假设您将有关书籍的数据分为两个不同的集合。

First collection, called books, having the following data:

第一个集合,称为books,具有以下数据:

{
    "isbn": "978-3-16-148410-0",
    "title": "Some cool book",
    "author": "John Doe"
}
{
    "isbn": "978-3-16-148999-9",
    "title": "Another awesome book",
    "author": "Jane Roe"
}

And the second collection, called books_selling_data, having the following data:

第二个集合称为books_selling_data,具有以下数据:

{
    "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
    "isbn": "978-3-16-148410-0",
    "copies_sold": 12500
}
{
    "_id": ObjectId("56e31ce076cdf52e541d9d28"),
    "isbn": "978-3-16-148999-9",
    "copies_sold": 720050
}
{
    "_id": ObjectId("56e31ce076cdf52e541d9d29"),
    "isbn": "978-3-16-148999-9",
    "copies_sold": 1000
}

To merge both collections is just a matter of using $lookup in the following way:

合并两个集合只是通过以下方式使用 $lookup 的问题:

db.books.aggregate([{
    $lookup: {
            from: "books_selling_data",
            localField: "isbn",
            foreignField: "isbn",
            as: "copies_sold"
        }
}])

After this aggregation, the bookscollection will look like the following:

在此聚合之后,books集合将如下所示:

{
    "isbn": "978-3-16-148410-0",
    "title": "Some cool book",
    "author": "John Doe",
    "copies_sold": [
        {
            "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
            "isbn": "978-3-16-148410-0",
            "copies_sold": 12500
        }
    ]
}
{
    "isbn": "978-3-16-148999-9",
    "title": "Another awesome book",
    "author": "Jane Roe",
    "copies_sold": [
        {
            "_id": ObjectId("56e31ce076cdf52e541d9d28"),
            "isbn": "978-3-16-148999-9",
            "copies_sold": 720050
        },
        {
            "_id": ObjectId("56e31ce076cdf52e541d9d28"),
            "isbn": "978-3-16-148999-9",
            "copies_sold": 1000
        }
    ]
}

It is important to note a few things:

重要的是要注意以下几点:

  1. The "from" collection, in this case books_selling_data, cannot be sharded.
  2. The "as" field will be an array, as the example above.
  3. Both "localField" and "foreignField" options on the $lookup stagewill be treated as null for matching purposes if they don't exist in their respective collections (the $lookup docshas a perfect example about that).
  1. 在这种情况下books_selling_data,“来自”集合不能被分片。
  2. “as”字段将是一个数组,如上例所示。
  3. 如果$lookup 阶段的"localField" 和 "foreignField" 选项在各自的集合中不存在,则它们都将被视为空值以进行匹配($lookup 文档有一个完美的例子)。

So, as a conclusion, if you want to consolidate both collections, having, in this case, a flat copies_sold field with the total copies sold, you will have to work a little bit more, probably using an intermediary collection that will, then, be $outto the final collection.

所以,作为一个结论,如果你想合并两个集合,在这种情况下,有一个平面的 copy_sold 字段和销售的总副本,你将不得不多做一点工作,可能使用一个中间集合,然后,是$出来到最终集合。

回答by Hieu Le

If there is no bulk insert into mongodb, we loop all objects in the small_collectionand insert them one by one into the big_collection:

如果没有批量插入到mongodb中,我们将循环中的所有对象small_collection并一一插入big_collection

db.small_collection.find().forEach(function(obj){ 
   db.big_collection.insert(obj)
});

回答by Anish Agarwal

Very basic example with $lookup.

$lookup 的非常基本的例子。

db.getCollection('users').aggregate([
    {
        $lookup: {
            from: "userinfo",
            localField: "userId",
            foreignField: "userId",
            as: "userInfoData"
        }
    },
    {
        $lookup: {
            from: "userrole",
            localField: "userId",
            foreignField: "userId",
            as: "userRoleData"
        }
    },
    { $unwind: { path: "$userInfoData", preserveNullAndEmptyArrays: true }},
    { $unwind: { path: "$userRoleData", preserveNullAndEmptyArrays: true }}
])

Here is used

这里使用

 { $unwind: { path: "$userInfoData", preserveNullAndEmptyArrays: true }}, 
 { $unwind: { path: "$userRoleData", preserveNullAndEmptyArrays: true }}

Instead of

代替

{ $unwind:"$userRoleData"} 
{ $unwind:"$userRoleData"}

Because { $unwind:"$userRoleData"}this will return empty or 0 result if no matching record found with $lookup.

因为{ $unwind:"$userRoleData"}如果 $lookup 没有找到匹配的记录,这将返回空或 0 结果。

回答by sboisse

Doing unions in MongoDB in a 'SQL UNION' fashion is possible using aggregations along with lookups, in a single query. Here is an example I have tested that works with MongoDB 4.0:

可以在单个查询中使用聚合和查找以“SQL UNION”方式在 MongoDB 中进行联合。这是我测试过的适用于 MongoDB 4.0 的示例:

// Create employees data for testing the union.
db.getCollection('employees').insert({ name: "John", type: "employee", department: "sales" });
db.getCollection('employees').insert({ name: "Martha", type: "employee", department: "accounting" });
db.getCollection('employees').insert({ name: "Amy", type: "employee", department: "warehouse" });
db.getCollection('employees').insert({ name: "Mike", type: "employee", department: "warehouse"  });

// Create freelancers data for testing the union.
db.getCollection('freelancers').insert({ name: "Stephany", type: "freelancer", department: "accounting" });
db.getCollection('freelancers').insert({ name: "Martin", type: "freelancer", department: "sales" });
db.getCollection('freelancers').insert({ name: "Doug", type: "freelancer", department: "warehouse"  });
db.getCollection('freelancers').insert({ name: "Brenda", type: "freelancer", department: "sales"  });

// Here we do a union of the employees and freelancers using a single aggregation query.
db.getCollection('freelancers').aggregate( // 1. Use any collection containing at least one document.
  [
    { $limit: 1 }, // 2. Keep only one document of the collection.
    { $project: { _id: '$$REMOVE' } }, // 3. Remove everything from the document.

    // 4. Lookup collections to union together.
    { $lookup: { from: 'employees', pipeline: [{ $match: { department: 'sales' } }], as: 'employees' } },
    { $lookup: { from: 'freelancers', pipeline: [{ $match: { department: 'sales' } }], as: 'freelancers' } },

    // 5. Union the collections together with a projection.
    { $project: { union: { $concatArrays: ["$employees", "$freelancers"] } } },

    // 6. Unwind and replace root so you end up with a result set.
    { $unwind: '$union' },
    { $replaceRoot: { newRoot: '$union' } }
  ]);

Here is the explanation of how it works:

以下是其工作原理的解释:

  1. Instantiate an aggregateout of anycollection of your database that has at least one document in it. If you can't guarantee any collection of your database will not be empty, you can workaround this issue by creating in your database some sort of 'dummy' collection containing a single empty document in it that will be there specifically for doing union queries.

  2. Make the first stage of your pipeline to be { $limit: 1 }. This will strip all the documents of the collection except the first one.

  3. Strip all the fields of the remaining document by using a $projectstage:

    { $project: { _id: '$$REMOVE' } }
    
  4. Your aggregate now contains a single, empty document. It's time to add lookups for each collection you want to union together. You may use the pipelinefield to do some specific filtering, or leave localFieldand foreignFieldas null to match the whole collection.

    { $lookup: { from: 'collectionToUnion1', pipeline: [...], as: 'Collection1' } },
    { $lookup: { from: 'collectionToUnion2', pipeline: [...], as: 'Collection2' } },
    { $lookup: { from: 'collectionToUnion3', pipeline: [...], as: 'Collection3' } }
    
  5. You now have an aggregate containing a single document that contains 3 arrays like this:

    {
        Collection1: [...],
        Collection2: [...],
        Collection3: [...]
    }
    

    You can then merge them together into a single array using a $projectstage along with the $concatArraysaggregation operator:

    {
      "$project" :
      {
        "Union" : { $concatArrays: ["$Collection1", "$Collection2", "$Collection3"] }
      }
    }
    
  6. You now have an aggregate containing a single document, into which is located an array that contains your union of collections. What remains to be done is to add an $unwindand a $replaceRootstage to split your array into separate documents:

    { $unwind: "$Union" },
    { $replaceRoot: { newRoot: "$Union" } }
    
  7. Voilà. You now have a result set containing the collections you wanted to union together. You can then add more stages to filter it further, sort it, apply skip() and limit(). Pretty much anything you want.

  1. aggregate从您的数据库中至少包含一个文档的任何集合中实例化一个。如果您不能保证您的数据库的任何集合不会为空,您可以通过在您的数据库中创建某种“虚拟”集合来解决此问题,该集合中包含一个专门用于执行联合查询的空文档。

  2. 使您的管道的第一阶段成为{ $limit: 1 }. 这将删除集合中除第一个之外的所有文档。

  3. 使用$project阶段去除剩余文档的所有字段:

    { $project: { _id: '$$REMOVE' } }
    
  4. 您的聚合现在包含一个单独的空文档。是时候为要合并的每个集合添加查找了。您可以使用该pipeline字段来进行一些特定的过滤,或者将localField和保留foreignField为空以匹配整个集合。

    { $lookup: { from: 'collectionToUnion1', pipeline: [...], as: 'Collection1' } },
    { $lookup: { from: 'collectionToUnion2', pipeline: [...], as: 'Collection2' } },
    { $lookup: { from: 'collectionToUnion3', pipeline: [...], as: 'Collection3' } }
    
  5. 您现在有一个包含单个文档的聚合,该文档包含 3 个数组,如下所示:

    {
        Collection1: [...],
        Collection2: [...],
        Collection3: [...]
    }
    

    然后,您可以使用$project阶段和$concatArrays聚合运算符将它们合并到一个数组中:

    {
      "$project" :
      {
        "Union" : { $concatArrays: ["$Collection1", "$Collection2", "$Collection3"] }
      }
    }
    
  6. 您现在有一个包含单个文档的聚合,其中包含一个包含集合并集的数组。剩下要做的是添加一个$unwind和一个$replaceRoot阶段来将您的数组拆分为单独的文档:

    { $unwind: "$Union" },
    { $replaceRoot: { newRoot: "$Union" } }
    
  7. 瞧。您现在有一个结果集,其中包含您想要联合在一起的集合。然后,您可以添加更多阶段以进一步过滤、排序、应用 skip() 和 limit()。几乎任何你想要的。

回答by KARTHIKEYAN.A

use multiple $lookupfor multiple collections in aggregation

对聚合中的多个集合使用多个$lookup

query:

询问:

db.getCollection('servicelocations').aggregate([
  {
    $match: {
      serviceLocationId: {
        $in: ["36728"]
      }
    }
  },
  {
    $lookup: {
      from: "orders",
      localField: "serviceLocationId",
      foreignField: "serviceLocationId",
      as: "orders"
    }
  },
  {
    $lookup: {
      from: "timewindowtypes",
      localField: "timeWindow.timeWindowTypeId",
      foreignField: "timeWindowTypeId",
      as: "timeWindow"
    }
  },
  {
    $lookup: {
      from: "servicetimetypes",
      localField: "serviceTimeTypeId",
      foreignField: "serviceTimeTypeId",
      as: "serviceTime"
    }
  },
  {
    $unwind: "$orders"
  },
  {
    $unwind: "$serviceTime"
  },
  {
    $limit: 14
  }
])

result:

结果:

{
    "_id" : ObjectId("59c3ac4bb7799c90ebb3279b"),
    "serviceLocationId" : "36728",
    "regionId" : 1.0,
    "zoneId" : "DXBZONE1",
    "description" : "AL HALLAB REST EMIRATES MALL",
    "locationPriority" : 1.0,
    "accountTypeId" : 1.0,
    "locationType" : "SERVICELOCATION",
    "location" : {
        "makani" : "",
        "lat" : 25.119035,
        "lng" : 55.198694
    },
    "deliveryDays" : "MTWRFSU",
    "timeWindow" : [ 
        {
            "_id" : ObjectId("59c3b0a3b7799c90ebb32cde"),
            "timeWindowTypeId" : "1",
            "Description" : "MORNING",
            "timeWindow" : {
                "openTime" : "06:00",
                "closeTime" : "08:00"
            },
            "accountId" : 1.0
        }, 
        {
            "_id" : ObjectId("59c3b0a3b7799c90ebb32cdf"),
            "timeWindowTypeId" : "1",
            "Description" : "MORNING",
            "timeWindow" : {
                "openTime" : "09:00",
                "closeTime" : "10:00"
            },
            "accountId" : 1.0
        }, 
        {
            "_id" : ObjectId("59c3b0a3b7799c90ebb32ce0"),
            "timeWindowTypeId" : "1",
            "Description" : "MORNING",
            "timeWindow" : {
                "openTime" : "10:30",
                "closeTime" : "11:30"
            },
            "accountId" : 1.0
        }
    ],
    "address1" : "",
    "address2" : "",
    "phone" : "",
    "city" : "",
    "county" : "",
    "state" : "",
    "country" : "",
    "zipcode" : "",
    "imageUrl" : "",
    "contact" : {
        "name" : "",
        "email" : ""
    },
    "status" : "ACTIVE",
    "createdBy" : "",
    "updatedBy" : "",
    "updateDate" : "",
    "accountId" : 1.0,
    "serviceTimeTypeId" : "1",
    "orders" : [ 
        {
            "_id" : ObjectId("59c3b291f251c77f15790f92"),
            "orderId" : "AQ18O1704264",
            "serviceLocationId" : "36728",
            "orderNo" : "AQ18O1704264",
            "orderDate" : "18-Sep-17",
            "description" : "AQ18O1704264",
            "serviceType" : "Delivery",
            "orderSource" : "Import",
            "takenBy" : "KARIM",
            "plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
            "plannedDeliveryTime" : "",
            "actualDeliveryDate" : "",
            "actualDeliveryTime" : "",
            "deliveredBy" : "",
            "size1" : 296.0,
            "size2" : 3573.355,
            "size3" : 240.811,
            "jobPriority" : 1.0,
            "cancelReason" : "",
            "cancelDate" : "",
            "cancelBy" : "",
            "reasonCode" : "",
            "reasonText" : "",
            "status" : "",
            "lineItems" : [ 
                {
                    "ItemId" : "BNWB020",
                    "size1" : 15.0,
                    "size2" : 78.6,
                    "size3" : 6.0
                }, 
                {
                    "ItemId" : "BNWB021",
                    "size1" : 20.0,
                    "size2" : 252.0,
                    "size3" : 11.538
                }, 
                {
                    "ItemId" : "BNWB023",
                    "size1" : 15.0,
                    "size2" : 285.0,
                    "size3" : 16.071
                }, 
                {
                    "ItemId" : "CPMW112",
                    "size1" : 3.0,
                    "size2" : 25.38,
                    "size3" : 1.731
                }, 
                {
                    "ItemId" : "MMGW001",
                    "size1" : 25.0,
                    "size2" : 464.375,
                    "size3" : 46.875
                }, 
                {
                    "ItemId" : "MMNB218",
                    "size1" : 50.0,
                    "size2" : 920.0,
                    "size3" : 60.0
                }, 
                {
                    "ItemId" : "MMNB219",
                    "size1" : 50.0,
                    "size2" : 630.0,
                    "size3" : 40.0
                }, 
                {
                    "ItemId" : "MMNB220",
                    "size1" : 50.0,
                    "size2" : 416.0,
                    "size3" : 28.846
                }, 
                {
                    "ItemId" : "MMNB270",
                    "size1" : 50.0,
                    "size2" : 262.0,
                    "size3" : 20.0
                }, 
                {
                    "ItemId" : "MMNB302",
                    "size1" : 15.0,
                    "size2" : 195.0,
                    "size3" : 6.0
                }, 
                {
                    "ItemId" : "MMNB373",
                    "size1" : 3.0,
                    "size2" : 45.0,
                    "size3" : 3.75
                }
            ],
            "accountId" : 1.0
        }, 
        {
            "_id" : ObjectId("59c3b291f251c77f15790f9d"),
            "orderId" : "AQ137O1701240",
            "serviceLocationId" : "36728",
            "orderNo" : "AQ137O1701240",
            "orderDate" : "18-Sep-17",
            "description" : "AQ137O1701240",
            "serviceType" : "Delivery",
            "orderSource" : "Import",
            "takenBy" : "KARIM",
            "plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
            "plannedDeliveryTime" : "",
            "actualDeliveryDate" : "",
            "actualDeliveryTime" : "",
            "deliveredBy" : "",
            "size1" : 28.0,
            "size2" : 520.11,
            "size3" : 52.5,
            "jobPriority" : 1.0,
            "cancelReason" : "",
            "cancelDate" : "",
            "cancelBy" : "",
            "reasonCode" : "",
            "reasonText" : "",
            "status" : "",
            "lineItems" : [ 
                {
                    "ItemId" : "MMGW001",
                    "size1" : 25.0,
                    "size2" : 464.38,
                    "size3" : 46.875
                }, 
                {
                    "ItemId" : "MMGW001-F1",
                    "size1" : 3.0,
                    "size2" : 55.73,
                    "size3" : 5.625
                }
            ],
            "accountId" : 1.0
        }, 
        {
            "_id" : ObjectId("59c3b291f251c77f15790fd8"),
            "orderId" : "AQ110O1705036",
            "serviceLocationId" : "36728",
            "orderNo" : "AQ110O1705036",
            "orderDate" : "18-Sep-17",
            "description" : "AQ110O1705036",
            "serviceType" : "Delivery",
            "orderSource" : "Import",
            "takenBy" : "KARIM",
            "plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
            "plannedDeliveryTime" : "",
            "actualDeliveryDate" : "",
            "actualDeliveryTime" : "",
            "deliveredBy" : "",
            "size1" : 60.0,
            "size2" : 1046.0,
            "size3" : 68.0,
            "jobPriority" : 1.0,
            "cancelReason" : "",
            "cancelDate" : "",
            "cancelBy" : "",
            "reasonCode" : "",
            "reasonText" : "",
            "status" : "",
            "lineItems" : [ 
                {
                    "ItemId" : "MMNB218",
                    "size1" : 50.0,
                    "size2" : 920.0,
                    "size3" : 60.0
                }, 
                {
                    "ItemId" : "MMNB219",
                    "size1" : 10.0,
                    "size2" : 126.0,
                    "size3" : 8.0
                }
            ],
            "accountId" : 1.0
        }
    ],
    "serviceTime" : {
        "_id" : ObjectId("59c3b07cb7799c90ebb32cdc"),
        "serviceTimeTypeId" : "1",
        "serviceTimeType" : "nohelper",
        "description" : "",
        "fixedTime" : 30.0,
        "variableTime" : 0.0,
        "accountId" : 1.0
    }
}

回答by shauli

Mongorestore has this feature of appending on top of whatever is already in the database, so this behavior could be used for combining two collections:

Mongorestore 具有附加在数据库中已有内容之上的功能,因此此行为可用于组合两个集合:

  1. mongodump collection1
  2. collection2.rename(collection1)
  3. mongorestore
  1. mongodump 集合1
  2. collection2.rename(collection1)
  3. mongorestore

Didn't try it yet, but it might perform faster than the map/reduce approach.

还没有尝试过,但它可能比 map/reduce 方法执行得更快。

回答by Xavier Guihot

Starting Mongo 4.4, we can achieve this join within an aggregation pipeline by coupling the new $unionWithaggregation stage with $group's new $accumulatoroperator:

开始Mongo 4.4,我们可以通过将新的$unionWith聚合阶段与$group的新$accumulator运算符耦合来在聚合管道中实现这种连接:

// > db.users.find()
//   [{ user: 1, name: "x" }, { user: 2, name: "y" }]
// > db.books.find()
//   [{ user: 1, book: "a" }, { user: 1, book: "b" }, { user: 2, book: "c" }]
// > db.movies.find()
//   [{ user: 1, movie: "g" }, { user: 2, movie: "h" }, { user: 2, movie: "i" }]
db.users.aggregate([
  { $unionWith: "books"  },
  { $unionWith: "movies" },
  { $group: {
    _id: "$user",
    user: {
      $accumulator: {
        accumulateArgs: ["$name", "$book", "$movie"],
        init: function() { return { books: [], movies: [] } },
        accumulate: function(user, name, book, movie) {
          if (name) user.name = name;
          if (book) user.books.push(book);
          if (movie) user.movies.push(movie);
          return user;
        },
        merge: function(userV1, userV2) {
          if (userV2.name) userV1.name = userV2.name;
          userV1.books.concat(userV2.books);
          userV1.movies.concat(userV2.movies);
          return userV1;
        },
        lang: "js"
      }
    }
  }}
])
// { _id: 1, user: { books: ["a", "b"], movies: ["g"], name: "x" } }
// { _id: 2, user: { books: ["c"], movies: ["h", "i"], name: "y" } }
  • $unionWithcombines records from the given collection within documents already in the aggregation pipeline. After the 2 union stages, we thus have all users, books and movies records within the pipeline.

  • We then $grouprecords by $userand accumulate items using the $accumulatoroperator allowing custom accumulations of documents as they get grouped:

    • the fields we're interested in accumulating are defined with accumulateArgs.
    • initdefines the state that will be accumulated as we group elements.
    • the accumulatefunction allows performing a custom action with a record being grouped in order to build the accumulated state. For instance, if the item being grouped has the bookfield defined, then we update the bookspart of the state.
    • mergeis used to merge two internal states. It's only used for aggregations running on sharded clusters or when the operation exceeds memory limits.
  • $unionWith在聚合管道中已有的文档中组合来自给定集合的记录。在 2 个联合阶段之后,我们就拥有了管道中的所有用户、书籍和电影记录。

  • 然后我们使用操作符$group记录$user和累积项目,$accumulator允许自定义累积文档,因为它们被分组:

    • 我们有兴趣累积的字段是用 定义的accumulateArgs
    • init定义在我们对元素进行分组时将累积的状态。
    • accumulate函数允许使用分组记录执行自定义操作,以构建累积状态。例如,如果被分组的项目book定义了字段,那么我们更新books状态的一部分。
    • merge用于合并两个内部状态。它仅用于在分片集群上运行的聚合或当操作超过内存限制时。

回答by Shangab

Yes you can: Take this utility function that I have written today:

是的,你可以:拿我今天写的这个实用函数:

function shangMergeCol() {
  tcol= db.getCollection(arguments[0]);
  for (var i=1; i<arguments.length; i++){
    scol= db.getCollection(arguments[i]);
    scol.find().forEach(
        function (d) {
            tcol.insert(d);
        }
    )
  }
}

You can pass to this function any number of collections, the first one is going to be the target one. All the rest collections are sources to be transferred to the target one.

您可以向此函数传递任意数量的集合,第一个将成为目标集合。所有其余的集合都是要转移到目标集合的源。

回答by Vipul Mehta

Code snippet. Courtesy-Multiple posts on stack overflow including this one.

代码片段。礼貌-堆栈溢出的多个帖子,包括这个帖子。

 db.cust.drop();
 db.zip.drop();
 db.cust.insert({cust_id:1, zip_id: 101});
 db.cust.insert({cust_id:2, zip_id: 101});
 db.cust.insert({cust_id:3, zip_id: 101});
 db.cust.insert({cust_id:4, zip_id: 102});
 db.cust.insert({cust_id:5, zip_id: 102});

 db.zip.insert({zip_id:101, zip_cd:'AAA'});
 db.zip.insert({zip_id:102, zip_cd:'BBB'});
 db.zip.insert({zip_id:103, zip_cd:'CCC'});

mapCust = function() {
    var values = {
        cust_id: this.cust_id
    };
    emit(this.zip_id, values);
};

mapZip = function() {
    var values = {
    zip_cd: this.zip_cd
    };
    emit(this.zip_id, values);
};

reduceCustZip =  function(k, values) {
    var result = {};
    values.forEach(function(value) {
    var field;
        if ("cust_id" in value) {
            if (!("cust_ids" in result)) {
                result.cust_ids = [];
            }
            result.cust_ids.push(value);
        } else {
    for (field in value) {
        if (value.hasOwnProperty(field) ) {
                result[field] = value[field];
        }
         };  
       }
      });
       return result;
};


db.cust_zip.drop();
db.cust.mapReduce(mapCust, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.zip.mapReduce(mapZip, reduceCustZip, {"out": {"reduce": "cust_zip"}});
db.cust_zip.find();


mapCZ = function() {
    var that = this;
    if ("cust_ids" in this.value) {
        this.value.cust_ids.forEach(function(value) {
            emit(value.cust_id, {
                zip_id: that._id,
                zip_cd: that.value.zip_cd
            });
        });
    }
};

reduceCZ = function(k, values) {
    var result = {};
    values.forEach(function(value) {
        var field;
        for (field in value) {
            if (value.hasOwnProperty(field)) {
                result[field] = value[field];
            }
        }
    });
    return result;
};
db.cust_zip_joined.drop();
db.cust_zip.mapReduce(mapCZ, reduceCZ, {"out": "cust_zip_joined"}); 
db.cust_zip_joined.find().pretty();


var flattenMRCollection=function(dbName,collectionName) {
    var collection=db.getSiblingDB(dbName)[collectionName];

    var i=0;
    var bulk=collection.initializeUnorderedBulkOp();
    collection.find({ value: { $exists: true } }).addOption(16).forEach(function(result) {
        print((++i));
        //collection.update({_id: result._id},result.value);

        bulk.find({_id: result._id}).replaceOne(result.value);

        if(i%1000==0)
        {
            print("Executing bulk...");
            bulk.execute();
            bulk=collection.initializeUnorderedBulkOp();
        }
    });
    bulk.execute();
};


flattenMRCollection("mydb","cust_zip_joined");
db.cust_zip_joined.find().pretty();