Skip to content

Commit

Permalink
Merge pull request #14846 from Automattic/vkarpov15/changestream-sync…
Browse files Browse the repository at this point in the history
…-error

fix(cursor): throw error in ChangeStream constructor if `changeStreamThunk()` throws a sync error
  • Loading branch information
vkarpov15 authored Sep 3, 2024
2 parents 8f4070a + 9fa956f commit 94e1237
Showing 1 changed file with 43 additions and 9 deletions.
52 changes: 43 additions & 9 deletions lib/cursor/changeStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

const EventEmitter = require('events').EventEmitter;
const MongooseError = require('../error/mongooseError');

/*!
* ignore
Expand All @@ -25,6 +26,7 @@ class ChangeStream extends EventEmitter {
this.bindedEvents = false;
this.pipeline = pipeline;
this.options = options;
this.errored = false;

if (options && options.hydrate && !options.model) {
throw new Error(
Expand All @@ -33,19 +35,36 @@ class ChangeStream extends EventEmitter {
);
}

let syncError = null;
this.$driverChangeStreamPromise = new Promise((resolve, reject) => {
// This wrapper is necessary because of buffering.
changeStreamThunk((err, driverChangeStream) => {
if (err != null) {
this.emit('error', err);
return reject(err);
}
try {
changeStreamThunk((err, driverChangeStream) => {
if (err != null) {
this.errored = true;
this.emit('error', err);
return reject(err);
}

this.driverChangeStream = driverChangeStream;
this.emit('ready');
resolve();
});
this.driverChangeStream = driverChangeStream;
this.emit('ready');
resolve();
});
} catch (err) {
syncError = err;
this.errored = true;
this.emit('error', err);
reject(err);
}
});

// Because a ChangeStream is an event emitter, there's no way to register an 'error' handler
// that catches errors which occur in the constructor, unless we force sync errors into async
// errors with setImmediate(). For cleaner stack trace, we just immediately throw any synchronous
// errors that occurred with changeStreamThunk().
if (syncError != null) {
throw syncError;
}
}

_bindEvents() {
Expand Down Expand Up @@ -92,10 +111,16 @@ class ChangeStream extends EventEmitter {
}

hasNext(cb) {
if (this.errored) {
throw new MongooseError('Cannot call hasNext() on errored ChangeStream');
}
return this.driverChangeStream.hasNext(cb);
}

next(cb) {
if (this.errored) {
throw new MongooseError('Cannot call next() on errored ChangeStream');
}
if (this.options && this.options.hydrate) {
if (cb != null) {
const originalCb = cb;
Expand Down Expand Up @@ -126,16 +151,25 @@ class ChangeStream extends EventEmitter {
}

addListener(event, handler) {
if (this.errored) {
throw new MongooseError('Cannot call addListener() on errored ChangeStream');
}
this._bindEvents();
return super.addListener(event, handler);
}

on(event, handler) {
if (this.errored) {
throw new MongooseError('Cannot call on() on errored ChangeStream');
}
this._bindEvents();
return super.on(event, handler);
}

once(event, handler) {
if (this.errored) {
throw new MongooseError('Cannot call once() on errored ChangeStream');
}
this._bindEvents();
return super.once(event, handler);
}
Expand Down

0 comments on commit 94e1237

Please sign in to comment.