diff --git a/src/database.ts b/src/database.ts index a8c1cb079..7c0f0addb 100644 --- a/src/database.ts +++ b/src/database.ts @@ -3212,8 +3212,26 @@ class Database extends common.GrpcServiceObject { ? (optionsOrRunFn as RunTransactionOptions) : {}; + const retry = (span: Span) => { + this.runTransaction(options, (err, txn) => { + if (err) { + setSpanError(span, err); + runFn!(err, null); + return; + } + + txn!.once('end', () => { + span.end(); + }); + txn!.once('error', () => { + span.end(); + }); + runFn!(null, txn!); + }); + }; + startTrace('Database.runTransaction', this._traceConfig, span => { - this.pool_.getSession(async (err, session?, transaction?) => { + this.pool_.getSession((err, session?, transaction?) => { if (err) { setSpanError(span, err); } @@ -3222,11 +3240,7 @@ class Database extends common.GrpcServiceObject { span.addEvent('No session available', { 'session.id': session?.id, }); - // In this case we are invoking runTransaction afresh - // hence we have to wait for this call to complete before - // ending the span. - await this.runTransaction(options, runFn!); - span.end(); + retry(span); return; } @@ -3246,6 +3260,18 @@ class Database extends common.GrpcServiceObject { transaction!.excludeTxnFromChangeStreams(); } + // Our span should only be ended if the + // transaction either errored or was ended. + transaction!.once('error', err => { + setSpanError(span, err!); + span.end(); + }); + + transaction!.once('end', err => { + setSpanError(span, err!); + span.end(); + }); + const release = () => { this.pool_.release(session!); }; @@ -3253,17 +3279,7 @@ class Database extends common.GrpcServiceObject { const runner = new TransactionRunner( session!, transaction!, - async (err, resp) => { - if (err) { - setSpanError(span, err!); - } - // It is paramount that we await - // the caller to return before - // exiting this function otherwise the span - // order will not be correct. - await runFn!(err, resp); - span.end(); - }, + runFn!, options ); @@ -3275,13 +3291,12 @@ class Database extends common.GrpcServiceObject { 'session.id': session?.id, }); release(); - await this.runTransaction(options, runFn!); + retry(span); } else { setImmediate(runFn!, err); release(); + span.end(); } - - span.end(); }); }); });