From b24696d51ccf6dc3e1ba786165bec9e1c994244c Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Sat, 17 Aug 2024 14:58:12 -0400 Subject: [PATCH 01/10] feat: upgrade mongodb -> 6.8.0, handle throwing error on closed cursor in Mongoose --- lib/cursor/queryCursor.js | 5 +++++ package.json | 2 +- test/query.cursor.test.js | 3 ++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/lib/cursor/queryCursor.js b/lib/cursor/queryCursor.js index 2c908de50d0..49359e40dfe 100644 --- a/lib/cursor/queryCursor.js +++ b/lib/cursor/queryCursor.js @@ -43,6 +43,7 @@ function QueryCursor(query) { this.cursor = null; this.skipped = false; this.query = query; + this._closeCalled = false; const model = query.model; this._mongooseOptions = {}; this._transforms = []; @@ -229,6 +230,7 @@ QueryCursor.prototype.close = async function close() { } try { await this.cursor.close(); + this._closeCalled = true; this.emit('close'); } catch (error) { this.listeners('error').length > 0 && this.emit('error', error); @@ -266,6 +268,9 @@ QueryCursor.prototype.next = async function next() { if (typeof arguments[0] === 'function') { throw new MongooseError('QueryCursor.prototype.next() no longer accepts a callback'); } + if (this._closeCalled) { + throw new MongooseError('Cannot call `next()` on a closed cursor'); + } return new Promise((resolve, reject) => { _next(this, function(error, doc) { if (error) { diff --git a/package.json b/package.json index 013c7325c21..9c9097adc18 100644 --- a/package.json +++ b/package.json @@ -21,7 +21,7 @@ "dependencies": { "bson": "^6.7.0", "kareem": "2.6.3", - "mongodb": "6.7.0", + "mongodb": "6.8.0", "mpath": "0.9.0", "mquery": "5.0.0", "ms": "2.1.3", diff --git a/test/query.cursor.test.js b/test/query.cursor.test.js index a5f7afc2027..d80264c5f2d 100644 --- a/test/query.cursor.test.js +++ b/test/query.cursor.test.js @@ -415,7 +415,8 @@ describe('QueryCursor', function() { await cursor.next(); assert.ok(false); } catch (error) { - assert.equal(error.name, 'MongoCursorExhaustedError'); + assert.equal(error.name, 'MongooseError'); + assert.ok(error.message.includes('closed cursor'), error.message); } }); }); From 3bc8123bfab36d5ee528ce491362255d6c3b96a0 Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Sat, 17 Aug 2024 15:03:43 -0400 Subject: [PATCH 02/10] rename closeCalled -> closed for more consistent semantics --- lib/cursor/queryCursor.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/cursor/queryCursor.js b/lib/cursor/queryCursor.js index 49359e40dfe..6f00a316794 100644 --- a/lib/cursor/queryCursor.js +++ b/lib/cursor/queryCursor.js @@ -43,7 +43,7 @@ function QueryCursor(query) { this.cursor = null; this.skipped = false; this.query = query; - this._closeCalled = false; + this._closed = false; const model = query.model; this._mongooseOptions = {}; this._transforms = []; @@ -230,7 +230,7 @@ QueryCursor.prototype.close = async function close() { } try { await this.cursor.close(); - this._closeCalled = true; + this._closed = true; this.emit('close'); } catch (error) { this.listeners('error').length > 0 && this.emit('error', error); @@ -268,7 +268,7 @@ QueryCursor.prototype.next = async function next() { if (typeof arguments[0] === 'function') { throw new MongooseError('QueryCursor.prototype.next() no longer accepts a callback'); } - if (this._closeCalled) { + if (this._closed) { throw new MongooseError('Cannot call `next()` on a closed cursor'); } return new Promise((resolve, reject) => { From 91ebc9b8acb4d490e5147f6e0e38655ca9878f1e Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Tue, 20 Aug 2024 16:18:35 -0400 Subject: [PATCH 03/10] fix(ChangeStream): rely on promises over events, make tests more durable --- lib/cursor/changeStream.js | 52 +++++++++++++++++++++++--------------- test/connection.test.js | 2 ++ test/model.test.js | 37 +++++++++++++++++++++------ test/model.watch.test.js | 5 ++-- 4 files changed, 65 insertions(+), 31 deletions(-) diff --git a/lib/cursor/changeStream.js b/lib/cursor/changeStream.js index ec5cac5705f..55cdecfcdc2 100644 --- a/lib/cursor/changeStream.js +++ b/lib/cursor/changeStream.js @@ -33,15 +33,18 @@ class ChangeStream extends EventEmitter { ); } - // This wrapper is necessary because of buffering. - changeStreamThunk((err, driverChangeStream) => { - if (err != null) { - this.emit('error', err); - return; - } + this.$driverChangeStreamPromise = new Promise((resolve, reject) => { + // This wrapper is necessary because of buffering. + changeStreamThunk((err, driverChangeStream) => { + if (err != null) { + this.emit('error', err); + return reject(err); + } - this.driverChangeStream = driverChangeStream; - this.emit('ready'); + this.driverChangeStream = driverChangeStream; + this.emit('ready'); + resolve(); + }); }); } @@ -53,20 +56,23 @@ class ChangeStream extends EventEmitter { this.bindedEvents = true; if (this.driverChangeStream == null) { - this.once('ready', () => { - this.driverChangeStream.on('close', () => { - this.closed = true; - }); + this.$driverChangeStreamPromise.then( + () => { + this.driverChangeStream.on('close', () => { + this.closed = true; + }); - driverChangeStreamEvents.forEach(ev => { - this.driverChangeStream.on(ev, data => { - if (data != null && data.fullDocument != null && this.options && this.options.hydrate) { - data.fullDocument = this.options.model.hydrate(data.fullDocument); - } - this.emit(ev, data); + driverChangeStreamEvents.forEach(ev => { + this.driverChangeStream.on(ev, data => { + if (data != null && data.fullDocument != null && this.options && this.options.hydrate) { + data.fullDocument = this.options.model.hydrate(data.fullDocument); + } + this.emit(ev, data); + }); }); - }); - }); + }, + () => {} // No need to register events if opening change stream failed + ); return; } @@ -142,8 +148,12 @@ class ChangeStream extends EventEmitter { this.closed = true; if (this.driverChangeStream) { return this.driverChangeStream.close(); + } else { + return this.$driverChangeStreamPromise.then( + () => this.driverChangeStream.close(), + () => {} // No need to close if opening the change stream failed + ); } - return Promise.resolve(); } } diff --git a/test/connection.test.js b/test/connection.test.js index d395be5511b..03f87b40f3d 100644 --- a/test/connection.test.js +++ b/test/connection.test.js @@ -1032,6 +1032,8 @@ describe('connections:', function() { await nextChange; assert.equal(changes.length, 1); assert.equal(changes[0].operationType, 'insert'); + + await changeStream.close(); await conn.close(); }); diff --git a/test/model.test.js b/test/model.test.js index b73757e4721..52d04a92b9a 100644 --- a/test/model.test.js +++ b/test/model.test.js @@ -7,6 +7,7 @@ const sinon = require('sinon'); const start = require('./common'); const assert = require('assert'); +const { once } = require('events'); const random = require('./util').random; const util = require('./util'); @@ -3560,14 +3561,21 @@ describe('Model', function() { it('fullDocument (gh-11936)', async function() { const MyModel = db.model('Test', new Schema({ name: String })); + const doc = await MyModel.create({ name: 'Ned Stark' }); const changeStream = await MyModel.watch([], { fullDocument: 'updateLookup', hydrate: true }); + await changeStream.$driverChangeStreamPromise; - const doc = await MyModel.create({ name: 'Ned Stark' }); - - const p = changeStream.next(); + const p = new Promise((resolve) => { + changeStream.once('change', change => { + resolve(change); + }); + }); + // Need to wait for resume token to be set after the event listener, + // otherwise change stream might not pick up the update. + await once(changeStream.driverChangeStream, 'resumeTokenChanged'); await MyModel.updateOne({ _id: doc._id }, { name: 'Tony Stark' }); const changeData = await p; @@ -3576,6 +3584,8 @@ describe('Model', function() { doc._id.toHexString()); assert.ok(changeData.fullDocument.$__); assert.equal(changeData.fullDocument.get('name'), 'Tony Stark'); + + await changeStream.close(); }); it('fullDocument with immediate watcher and hydrate (gh-14049)', async function() { @@ -3583,15 +3593,22 @@ describe('Model', function() { const doc = await MyModel.create({ name: 'Ned Stark' }); + let changeStream = null; const p = new Promise((resolve) => { - MyModel.watch([], { + changeStream = MyModel.watch([], { fullDocument: 'updateLookup', hydrate: true - }).on('change', change => { + }); + + changeStream.on('change', change => { resolve(change); }); }); + // Need to wait for cursor to be initialized and for resume token to + // be set, otherwise change stream might not pick up the update. + await changeStream.$driverChangeStreamPromise; + await once(changeStream.driverChangeStream, 'resumeTokenChanged'); await MyModel.updateOne({ _id: doc._id }, { name: 'Tony Stark' }); const changeData = await p; @@ -3600,6 +3617,8 @@ describe('Model', function() { doc._id.toHexString()); assert.ok(changeData.fullDocument.$__); assert.equal(changeData.fullDocument.get('name'), 'Tony Stark'); + + await changeStream.close(); }); it('respects discriminators (gh-11007)', async function() { @@ -3654,11 +3673,12 @@ describe('Model', function() { setTimeout(resolve, 500, false); }); - changeStream.close(); - await db; + const close = changeStream.close(); + await db.asPromise(); const readyCalled = await ready; assert.strictEqual(readyCalled, false); + await close; await db.close(); }); @@ -3675,10 +3695,11 @@ describe('Model', function() { await MyModel.create({ name: 'Hodor' }); - changeStream.close(); + const close = changeStream.close(); const closedData = await closed; assert.strictEqual(closedData, true); + await close; await db.close(); }); diff --git a/test/model.watch.test.js b/test/model.watch.test.js index 84d41dc5b14..612bb84a75f 100644 --- a/test/model.watch.test.js +++ b/test/model.watch.test.js @@ -37,12 +37,13 @@ describe('model: watch: ', function() { const changeData = await changed; assert.equal(changeData.operationType, 'insert'); assert.equal(changeData.fullDocument.name, 'Ned Stark'); + await changeStream.close(); }); it('watch() close() prevents buffered watch op from running (gh-7022)', async function() { const MyModel = db.model('Test', new Schema({})); const changeStream = MyModel.watch(); - const ready = new global.Promise(resolve => { + const ready = new Promise(resolve => { changeStream.once('data', () => { resolve(true); }); @@ -64,7 +65,7 @@ describe('model: watch: ', function() { await MyModel.init(); const changeStream = MyModel.watch(); - const closed = new global.Promise(resolve => { + const closed = new Promise(resolve => { changeStream.once('close', () => resolve(true)); }); From 56f4adbd118f7f8a5f1d76f13fd16551e39689da Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Tue, 20 Aug 2024 16:23:27 -0400 Subject: [PATCH 04/10] bump max # of instantiations for now to fix tests --- scripts/tsc-diagnostics-check.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/tsc-diagnostics-check.js b/scripts/tsc-diagnostics-check.js index 3e1f6c66282..c23da8c1f55 100644 --- a/scripts/tsc-diagnostics-check.js +++ b/scripts/tsc-diagnostics-check.js @@ -3,7 +3,7 @@ const fs = require('fs'); const stdin = fs.readFileSync(0).toString('utf8'); -const maxInstantiations = isNaN(process.argv[2]) ? 127500 : parseInt(process.argv[2], 10); +const maxInstantiations = isNaN(process.argv[2]) ? 135000 : parseInt(process.argv[2], 10); console.log(stdin); From a8bb7d6339062c6a90780876beb37156347adae2 Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Tue, 20 Aug 2024 16:44:20 -0400 Subject: [PATCH 05/10] alternative approach to avoid change stream test failures --- lib/cursor/changeStream.js | 4 ++++ test/model.test.js | 3 +-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/cursor/changeStream.js b/lib/cursor/changeStream.js index 55cdecfcdc2..dec8508d58f 100644 --- a/lib/cursor/changeStream.js +++ b/lib/cursor/changeStream.js @@ -86,6 +86,10 @@ class ChangeStream extends EventEmitter { if (data != null && data.fullDocument != null && this.options && this.options.hydrate) { data.fullDocument = this.options.model.hydrate(data.fullDocument); } + if (ev === 'error' && this.closed) { + // If we've already closed the stream, no need to emit further error events + return; + } this.emit(ev, data); }); }); diff --git a/test/model.test.js b/test/model.test.js index 52d04a92b9a..a84b874b669 100644 --- a/test/model.test.js +++ b/test/model.test.js @@ -3695,11 +3695,10 @@ describe('Model', function() { await MyModel.create({ name: 'Hodor' }); - const close = changeStream.close(); + changeStream.close(); const closedData = await closed; assert.strictEqual(closedData, true); - await close; await db.close(); }); From 1b5fecc64fa51750ec69c34d32f8c7f0b140f93c Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Wed, 21 Aug 2024 07:30:14 -0400 Subject: [PATCH 06/10] alternative fix for change stream tests re: #14177 --- lib/cursor/changeStream.js | 4 ---- test/model.test.js | 7 +++++++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/cursor/changeStream.js b/lib/cursor/changeStream.js index dec8508d58f..55cdecfcdc2 100644 --- a/lib/cursor/changeStream.js +++ b/lib/cursor/changeStream.js @@ -86,10 +86,6 @@ class ChangeStream extends EventEmitter { if (data != null && data.fullDocument != null && this.options && this.options.hydrate) { data.fullDocument = this.options.model.hydrate(data.fullDocument); } - if (ev === 'error' && this.closed) { - // If we've already closed the stream, no need to emit further error events - return; - } this.emit(ev, data); }); }); diff --git a/test/model.test.js b/test/model.test.js index a84b874b669..e03cd5bd13b 100644 --- a/test/model.test.js +++ b/test/model.test.js @@ -3680,6 +3680,9 @@ describe('Model', function() { await close; await db.close(); + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); }); it('watch() close() closes the stream (gh-7022)', async function() { @@ -3700,6 +3703,10 @@ describe('Model', function() { assert.strictEqual(closedData, true); await db.close(); + + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); }); it('bubbles up resumeTokenChanged events (gh-14349)', async function() { From e0d598423ea216605d3d29a11d451c6797864d68 Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Wed, 21 Aug 2024 11:47:16 -0400 Subject: [PATCH 07/10] listen for error events on change stream to avoid change stream tests flaking --- test/model.watch.test.js | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/model.watch.test.js b/test/model.watch.test.js index 612bb84a75f..099101e764f 100644 --- a/test/model.watch.test.js +++ b/test/model.watch.test.js @@ -56,6 +56,9 @@ describe('model: watch: ', function() { assert.strictEqual(readyCalled, false); await close; + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); }); it('watch() close() closes the stream (gh-7022)', async function() { @@ -75,6 +78,9 @@ describe('model: watch: ', function() { const closedData = await closed; assert.strictEqual(closedData, true); + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); }); }); }); From b08c93e5789ba5a0c3d5b12728c6bc9af1143ce6 Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Wed, 21 Aug 2024 11:57:43 -0400 Subject: [PATCH 08/10] shuffle around error handler re: ChangeStream is closed errors --- test/model.test.js | 8 ++++---- test/model.watch.test.js | 13 +++++++------ 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/test/model.test.js b/test/model.test.js index e03cd5bd13b..d2060fd9d00 100644 --- a/test/model.test.js +++ b/test/model.test.js @@ -3678,11 +3678,11 @@ describe('Model', function() { const readyCalled = await ready; assert.strictEqual(readyCalled, false); - await close; - await db.close(); // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream // may still poll after close. changeStream.on('error', () => {}); + await close; + await db.close(); }); it('watch() close() closes the stream (gh-7022)', async function() { @@ -3702,11 +3702,11 @@ describe('Model', function() { const closedData = await closed; assert.strictEqual(closedData, true); - await db.close(); - // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream // may still poll after close. changeStream.on('error', () => {}); + + await db.close(); }); it('bubbles up resumeTokenChanged events (gh-14349)', async function() { diff --git a/test/model.watch.test.js b/test/model.watch.test.js index 099101e764f..3e2ad733a24 100644 --- a/test/model.watch.test.js +++ b/test/model.watch.test.js @@ -50,15 +50,15 @@ describe('model: watch: ', function() { setTimeout(resolve, 500, false); }); + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); const close = changeStream.close(); await db.asPromise(); const readyCalled = await ready; assert.strictEqual(readyCalled, false); await close; - // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream - // may still poll after close. - changeStream.on('error', () => {}); }); it('watch() close() closes the stream (gh-7022)', async function() { @@ -74,13 +74,14 @@ describe('model: watch: ', function() { await MyModel.create({ name: 'Hodor' }); + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); + await changeStream.close(); const closedData = await closed; assert.strictEqual(closedData, true); - // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream - // may still poll after close. - changeStream.on('error', () => {}); }); }); }); From 8c6bb8b2ce3e4522d758a8fd99f715d164fb864e Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Wed, 21 Aug 2024 13:12:22 -0400 Subject: [PATCH 09/10] shuffle around some more error handlers --- test/model.test.js | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/test/model.test.js b/test/model.test.js index d2060fd9d00..9524e2cdbe7 100644 --- a/test/model.test.js +++ b/test/model.test.js @@ -3673,14 +3673,15 @@ describe('Model', function() { setTimeout(resolve, 500, false); }); + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); + const close = changeStream.close(); await db.asPromise(); const readyCalled = await ready; assert.strictEqual(readyCalled, false); - // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream - // may still poll after close. - changeStream.on('error', () => {}); await close; await db.close(); }); @@ -3698,14 +3699,14 @@ describe('Model', function() { await MyModel.create({ name: 'Hodor' }); - changeStream.close(); - const closedData = await closed; - assert.strictEqual(closedData, true); - // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream // may still poll after close. changeStream.on('error', () => {}); + changeStream.close(); + const closedData = await closed; + assert.strictEqual(closedData, true); + await db.close(); }); From a725a7565c03935d6788284dca016f37718b51dc Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Wed, 21 Aug 2024 13:21:26 -0400 Subject: [PATCH 10/10] add some more error handling to hopefully prevent flaking change stream tests --- test/model.test.js | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/model.test.js b/test/model.test.js index 9524e2cdbe7..f2d88586e63 100644 --- a/test/model.test.js +++ b/test/model.test.js @@ -3509,6 +3509,9 @@ describe('Model', function() { } changeStream.removeListener('change', listener); listener = null; + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); changeStream.close(); changeStream = null; }); @@ -3658,6 +3661,9 @@ describe('Model', function() { assert.equal(changeData.operationType, 'insert'); assert.equal(changeData.fullDocument.name, 'Ned Stark'); + // Change stream may still emit "MongoAPIError: ChangeStream is closed" because change stream + // may still poll after close. + changeStream.on('error', () => {}); await changeStream.close(); await db.close(); });