Skip to content

Commit

Permalink
Merge pull request #14837 from Automattic/vkarpov15/gh-14763
Browse files Browse the repository at this point in the history
fix(model): throw error if `bulkSave()` did not insert or update any documents
  • Loading branch information
vkarpov15 authored Sep 3, 2024
2 parents 94e1237 + 5b40f1b commit c37e345
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 14 deletions.
49 changes: 35 additions & 14 deletions lib/model.js
Original file line number Diff line number Diff line change
Expand Up @@ -3368,19 +3368,27 @@ Model.bulkWrite = async function bulkWrite(ops, options) {
};

/**
* takes an array of documents, gets the changes and inserts/updates documents in the database
* according to whether or not the document is new, or whether it has changes or not.
* Takes an array of documents, gets the changes and inserts/updates documents in the database
* according to whether or not the document is new, or whether it has changes or not.
*
* `bulkSave` uses `bulkWrite` under the hood, so it's mostly useful when dealing with many documents (10K+)
*
* `bulkSave()` throws errors under the following conditions:
*
* - one of the provided documents fails validation. In this case, `bulkSave()` does not send a `bulkWrite()`, and throws the first validation error.
* - `bulkWrite()` fails (for example, due to being unable to connect to MongoDB or due to duplicate key error)
* - `bulkWrite()` did not insert or update **any** documents. In this case, `bulkSave()` will throw a DocumentNotFound error.
*
* Note that `bulkSave()` will **not** throw an error if only some of the `save()` calls succeeded.
*
* @param {Array<Document>} documents
* @param {Object} [options] options passed to the underlying `bulkWrite()`
* @param {Boolean} [options.timestamps] defaults to `null`, when set to false, mongoose will not add/update timestamps to the documents.
* @param {ClientSession} [options.session=null] The session associated with this bulk write. See [transactions docs](https://mongoosejs.com/docs/transactions.html).
* @param {String|number} [options.w=1] The [write concern](https://www.mongodb.com/docs/manual/reference/write-concern/). See [`Query#w()`](https://mongoosejs.com/docs/api/query.html#Query.prototype.w()) for more information.
* @param {number} [options.wtimeout=null] The [write concern timeout](https://www.mongodb.com/docs/manual/reference/write-concern/#wtimeout).
* @param {Boolean} [options.j=true] If false, disable [journal acknowledgement](https://www.mongodb.com/docs/manual/reference/write-concern/#j-option)
*
* @return {BulkWriteResult} the return value from `bulkWrite()`
*/
Model.bulkSave = async function bulkSave(documents, options) {
options = options || {};
Expand Down Expand Up @@ -3408,18 +3416,31 @@ Model.bulkSave = async function bulkSave(documents, options) {
(err) => ({ bulkWriteResult: null, bulkWriteError: err })
);

await Promise.all(
documents.map(async(document) => {
const documentError = bulkWriteError && bulkWriteError.writeErrors.find(writeError => {
const writeErrorDocumentId = writeError.err.op._id || writeError.err.op.q._id;
return writeErrorDocumentId.toString() === document._doc._id.toString();
});
const matchedCount = bulkWriteResult?.matchedCount ?? 0;
const insertedCount = bulkWriteResult?.insertedCount ?? 0;
if (writeOperations.length > 0 && matchedCount + insertedCount === 0 && !bulkWriteError) {
throw new DocumentNotFoundError(
writeOperations.filter(op => op.updateOne).map(op => op.updateOne.filter),
this.modelName,
writeOperations.length,
bulkWriteResult
);
}

if (documentError == null) {
await handleSuccessfulWrite(document);
}
})
);
const successfulDocuments = [];
for (let i = 0; i < documents.length; i++) {
const document = documents[i];
const documentError = bulkWriteError && bulkWriteError.writeErrors.find(writeError => {
const writeErrorDocumentId = writeError.err.op._id || writeError.err.op.q._id;
return writeErrorDocumentId.toString() === document._doc._id.toString();
});

if (documentError == null) {
successfulDocuments.push(document);
}
}

await Promise.all(successfulDocuments.map(document => handleSuccessfulWrite(document)));

if (bulkWriteError && bulkWriteError.writeErrors && bulkWriteError.writeErrors.length) {
throw bulkWriteError;
Expand Down
42 changes: 42 additions & 0 deletions test/model.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6973,6 +6973,48 @@ describe('Model', function() {
assert.ok(err == null);

});
it('should error if no documents were inserted or updated (gh-14763)', async function() {
const fooSchema = new mongoose.Schema({
bar: { type: Number }
}, { optimisticConcurrency: true });
const TestModel = db.model('Test', fooSchema);

const foo = await TestModel.create({
bar: 0
});

// update 1
foo.bar = 1;
await foo.save();

// parallel update
const fooCopy = await TestModel.findById(foo._id);
fooCopy.bar = 99;
await fooCopy.save();

foo.bar = 2;
const err = await TestModel.bulkSave([foo]).then(() => null, err => err);
assert.equal(err.name, 'DocumentNotFoundError');
assert.equal(err.numAffected, 1);
assert.ok(Array.isArray(err.filter));
});
it('should error if there is a validation error', async function() {
const fooSchema = new mongoose.Schema({
bar: { type: Number }
}, { optimisticConcurrency: true });
const TestModel = db.model('Test', fooSchema);

const docs = [
new TestModel({ bar: 42 }),
new TestModel({ bar: 'taco' })
];
const err = await TestModel.bulkSave(docs).then(() => null, err => err);
assert.equal(err.name, 'ValidationError');

// bulkSave() does not save any documents if any documents fail validation
const fromDb = await TestModel.find();
assert.equal(fromDb.length, 0);
});
it('Using bulkSave should not trigger an error (gh-11071)', async function() {

const pairSchema = mongoose.Schema({
Expand Down

0 comments on commit c37e345

Please sign in to comment.