How do I append MongoDB aggregation results to an existing collection?

Starting in MongoDB 4.2, the new $merge aggregation stage (similar to $out) merges the result of an aggregation pipeline into the specified collection.

Given this input:

db.source.insert([
  { "_id": "id_1", "a": 34 },
  { "_id": "id_3", "a": 38 },
  { "_id": "id_4", "a": 54 }
])
db.target.insert([
  { "_id": "id_1", "a": 12 },
  { "_id": "id_2", "a": 54 }
])

the $merge aggregation stage can be used like this:

db.source.aggregate([
  // { $whatever aggregation stage, for this example, we just keep records as is }
  { $merge: { into: "target" } }
])

to produce:

// > db.target.find()
{ "_id" : "id_1", "a" : 34 }
{ "_id" : "id_2", "a" : 54 }
{ "_id" : "id_3", "a" : 38 }
{ "_id" : "id_4", "a" : 54 }

Note that the $merge stage comes with options (on, whenMatched, whenNotMatched) that specify how documents produced by the pipeline are matched against, and combined with, existing documents in the target collection.

In this case (with the default options), this:

  • keeps the target collection's existing documents (this is the case of { "_id": "id_2", "a": 54 })

  • inserts documents from the output of the aggregation pipeline into the target collection when they are not already present (based on the _id - this is the case of { "_id" : "id_3", "a" : 38 })

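  • updates the target collection's documents when the aggregation pipeline produces documents that already exist in the target collection (based on the _id). The default is whenMatched: "merge", which overwrites the fields present in the new document and keeps any others; since both documents here have the same fields, { "_id": "id_1", "a": 12 } is effectively replaced by { "_id" : "id_1", "a" : 34 }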
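
To make the conflict handling explicit instead of relying on the defaults, the options can be spelled out. This is a minimal sketch, assuming the same source and target collections as in the example; the typeof db guard is only an assumption-friendly convenience so the snippet also loads outside the mongo shell.

```javascript
// Explicit form of the $merge stage. "on", "whenMatched" and "whenNotMatched"
// are documented $merge options; the values chosen here are just one combination.
const pipeline = [
  {
    $merge: {
      into: "target",
      on: "_id",                // field used to detect an existing document
      whenMatched: "replace",   // alternatives: "merge" (default), "keepExisting", "fail"
      whenNotMatched: "insert"  // alternatives: "discard", "fail"
    }
  }
];

// Only run the aggregation when a shell-provided `db` handle exists.
if (typeof db !== "undefined") {
  db.source.aggregate(pipeline);
}
```

With whenMatched: "keepExisting" the existing { "_id": "id_1", "a": 12 } would be left untouched, which is closer to a pure append.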


It's not the prettiest thing ever, but here is an alternative syntax (taken from a post-processing archive/append operation):

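db.targetCollection.insertMany(db.runCommand(
{
    aggregate: "sourceCollection",
    pipeline: 
    [
        { $skip: 0 },
        { $limit: 5 },
        { 
            $project:
            {
                myObject: "$$ROOT",
                // adding 0 ms leaves the date unchanged
                processedDate: { $add: [new ISODate(), 0] }
            }
        }
    ],
    // MongoDB 3.6+ requires the cursor option and returns documents in
    // cursor.firstBatch instead of result
    cursor: {}
}).cursor.firstBatch) // only the first batch (101 documents by default) is returned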

I'm not sure how this stacks up against the forEach variant, but I find it more intuitive to read.


The short answer is "you can't":

If the collection specified by the $out operation already exists, then upon completion of the aggregation, the $out stage atomically replaces the existing collection with the new results collection. The $out operation does not change any indexes that existed on the previous collection. If the aggregation fails, the $out operation makes no changes to the pre-existing collection.

As a workaround, you can copy the documents from the collection written by $out to a "permanent" collection just after the aggregation, in one of several ways (none of which is ideal, though):

  • copyTo() is the easiest, but mind the warning in its documentation. Don't use it for anything other than small results.
  • Use JS: db.out.find().forEach(function(doc) {db.target.insert(doc)})
  • Use mongoexport / mongoimport
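
The JS option above can be sketched with batched inserts rather than one insert per document. This is only an illustration: appendCollection and its batchSize parameter are hypothetical names, not part of MongoDB, and the db handle is passed in explicitly. It assumes the _id values in the $out collection do not collide with those already in the target, otherwise insertMany will raise a duplicate key error.

```javascript
// Hypothetical helper: copies every document from the collection named
// `outName` into the collection named `targetName`, `batchSize` at a time.
// Returns the number of documents copied.
function appendCollection(db, outName, targetName, batchSize = 1000) {
  const target = db.getCollection(targetName);
  let batch = [];
  let copied = 0;

  db.getCollection(outName).find().forEach(function (doc) {
    batch.push(doc);
    if (batch.length === batchSize) {
      target.insertMany(batch);
      copied += batch.length;
      batch = [];
    }
  });

  // Flush the last, partially filled batch.
  if (batch.length > 0) {
    target.insertMany(batch);
    copied += batch.length;
  }
  return copied;
}
```

In the mongo shell this would be called as appendCollection(db, "out", "target") right after the aggregation that ends with $out.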