-
Notifications
You must be signed in to change notification settings - Fork 103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: inline BeginTransaction with first statement #1692
Changes from all commits
3d06c45
8d5c6d6
15944f5
dd9891e
6b57710
976ee59
154872b
2caaa0f
9929be5
f65e2d1
6d83a94
7eb3831
f9aa221
eeef8cf
4fff842
cfa405e
53f1e1c
e146e89
1b82e78
e5585ff
241394d
fa3c024
c2b2ac6
2f226e1
6d94a86
b0bccf5
346abea
311a80f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -216,6 +216,9 @@ export type CommitCallback = | |
export class Snapshot extends EventEmitter { | ||
protected _options!: spannerClient.spanner.v1.ITransactionOptions; | ||
protected _seqno = 1; | ||
protected _idWaiter: Readable; | ||
protected _inlineBeginStarted; | ||
protected _useInRunner = false; | ||
id?: Uint8Array | string; | ||
ended: boolean; | ||
metadata?: spannerClient.spanner.v1.ITransaction; | ||
|
@@ -289,6 +292,10 @@ export class Snapshot extends EventEmitter { | |
this.resourceHeader_ = { | ||
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_, | ||
}; | ||
this._idWaiter = new Readable({ | ||
read() {}, | ||
}); | ||
this._inlineBeginStarted = false; | ||
} | ||
|
||
/** | ||
|
@@ -378,17 +385,7 @@ export class Snapshot extends EventEmitter { | |
callback!(err, resp); | ||
return; | ||
} | ||
|
||
const {id, readTimestamp} = resp; | ||
|
||
this.id = id!; | ||
this.metadata = resp; | ||
|
||
if (readTimestamp) { | ||
this.readTimestampProto = readTimestamp; | ||
this.readTimestamp = new PreciseDate(readTimestamp as DateStruct); | ||
} | ||
|
||
this._update(resp); | ||
callback!(null, resp); | ||
} | ||
); | ||
|
@@ -573,6 +570,8 @@ export class Snapshot extends EventEmitter { | |
|
||
if (this.id) { | ||
transaction.id = this.id as Uint8Array; | ||
} else if (this._options.readWrite) { | ||
transaction.begin = this._options; | ||
} else { | ||
transaction.singleUse = this._options; | ||
} | ||
|
@@ -603,6 +602,10 @@ export class Snapshot extends EventEmitter { | |
); | ||
|
||
const makeRequest = (resumeToken?: ResumeToken): Readable => { | ||
if (this.id && transaction.begin) { | ||
delete transaction.begin; | ||
transaction.id = this.id; | ||
} | ||
return this.requestStream({ | ||
client: 'SpannerClient', | ||
method: 'streamingRead', | ||
|
@@ -612,11 +615,21 @@ export class Snapshot extends EventEmitter { | |
}); | ||
}; | ||
|
||
return partialResultStream(makeRequest, { | ||
return partialResultStream(this._wrapWithIdWaiter(makeRequest), { | ||
json, | ||
jsonOptions, | ||
maxResumeRetries, | ||
}); | ||
}) | ||
?.on('response', response => { | ||
if (response.metadata && response.metadata!.transaction && !this.id) { | ||
this._update(response.metadata!.transaction); | ||
} | ||
}) | ||
.on('error', () => { | ||
if (!this.id && this._useInRunner) { | ||
this.begin(); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
|
@@ -909,6 +922,9 @@ export class Snapshot extends EventEmitter { | |
.on('response', response => { | ||
if (response.metadata) { | ||
metadata = response.metadata; | ||
if (metadata.transaction && !this.id) { | ||
this._update(metadata.transaction); | ||
} | ||
} | ||
}) | ||
.on('data', row => rows.push(row)) | ||
|
@@ -1034,6 +1050,8 @@ export class Snapshot extends EventEmitter { | |
const transaction: spannerClient.spanner.v1.ITransactionSelector = {}; | ||
if (this.id) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This does not take into account that a transaction could in theory have multiple requests in flight at the same time. If a transaction starts out with sending two Consider the following test case (the it('should only inline one begin transaction', async () => {
const database = newTestDatabase();
await database.runTransactionAsync(async tx => {
const rowCount1 = getRowCountFromStreamingSql(tx, {sql: selectSql});
const rowCount2 = getRowCountFromStreamingSql(tx, {sql: selectSql});
await Promise.all([rowCount1, rowCount2]);
await tx.commit();
});
await database.close();
let request = spannerMock.getRequests().find(val => {
return (val as v1.ExecuteSqlRequest).sql;
}) as v1.ExecuteSqlRequest;
assert.ok(request, 'no ExecuteSqlRequest found');
assert.deepStrictEqual(request.transaction!.begin!.readWrite, {});
assert.strictEqual(request.sql, selectSql);
request = spannerMock
.getRequests()
.slice()
.reverse()
.find(val => {
return (val as v1.ExecuteSqlRequest).sql;
}) as v1.ExecuteSqlRequest;
assert.ok(request, 'no ExecuteSqlRequest found');
assert.strictEqual(request.sql, selectSql);
assert.ok(request.transaction!.id, 'TransactionID is not set.');
const beginTxnRequest = spannerMock.getRequests().find(val => {
return (val as v1.BeginTransactionRequest).options?.readWrite;
}) as v1.BeginTransactionRequest;
assert.ok(!beginTxnRequest, 'beginTransaction was called');
}); There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're absolutely right. Added a lock that prevents a multiple inline begin transaction at the same time. Thanks! |
||
transaction.id = this.id as Uint8Array; | ||
} else if (this._options.readWrite) { | ||
transaction.begin = this._options; | ||
} else { | ||
transaction.singleUse = this._options; | ||
} | ||
|
@@ -1059,7 +1077,7 @@ export class Snapshot extends EventEmitter { | |
}; | ||
|
||
const makeRequest = (resumeToken?: ResumeToken): Readable => { | ||
if (!reqOpts) { | ||
if (!reqOpts || (this.id && !reqOpts.transaction.id)) { | ||
try { | ||
sanitizeRequest(); | ||
} catch (e) { | ||
|
@@ -1078,11 +1096,21 @@ export class Snapshot extends EventEmitter { | |
}); | ||
}; | ||
|
||
return partialResultStream(makeRequest, { | ||
return partialResultStream(this._wrapWithIdWaiter(makeRequest), { | ||
json, | ||
jsonOptions, | ||
maxResumeRetries, | ||
}); | ||
}) | ||
.on('response', response => { | ||
if (response.metadata && response.metadata!.transaction && !this.id) { | ||
this._update(response.metadata!.transaction); | ||
} | ||
}) | ||
.on('error', () => { | ||
if (!this.id && this._useInRunner) { | ||
this.begin(); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
|
@@ -1226,6 +1254,51 @@ export class Snapshot extends EventEmitter { | |
|
||
return {params, paramTypes}; | ||
} | ||
|
||
/** | ||
* Update transaction properties from the response. | ||
* | ||
* @private | ||
* | ||
* @param {spannerClient.spanner.v1.ITransaction} resp Response object. | ||
*/ | ||
protected _update(resp: spannerClient.spanner.v1.ITransaction): void { | ||
const {id, readTimestamp} = resp; | ||
|
||
this.id = id!; | ||
this.metadata = resp; | ||
|
||
if (readTimestamp) { | ||
this.readTimestampProto = readTimestamp; | ||
this.readTimestamp = new PreciseDate(readTimestamp as DateStruct); | ||
} | ||
this._idWaiter.emit('notify'); | ||
} | ||
|
||
/** | ||
* Wrap `makeRequest` function with the lock to make sure the inline begin | ||
* transaction can happen only once. | ||
* | ||
* @param makeRequest | ||
* @private | ||
*/ | ||
private _wrapWithIdWaiter( | ||
makeRequest: (resumeToken?: ResumeToken) => Readable | ||
): (resumeToken?: ResumeToken) => Readable { | ||
if (this.id || !this._options.readWrite) { | ||
return makeRequest; | ||
} | ||
if (!this._inlineBeginStarted) { | ||
this._inlineBeginStarted = true; | ||
return makeRequest; | ||
} | ||
return (resumeToken?: ResumeToken): Readable => | ||
this._idWaiter.once('notify', () => | ||
makeRequest(resumeToken) | ||
.on('data', chunk => this._idWaiter.emit('data', chunk)) | ||
.once('end', () => this._idWaiter.emit('end')) | ||
); | ||
} | ||
} | ||
|
||
/*! Developer Documentation | ||
|
@@ -1528,14 +1601,20 @@ export class Transaction extends Dml { | |
return {sql, params, paramTypes}; | ||
}); | ||
|
||
const transaction: spannerClient.spanner.v1.ITransactionSelector = {}; | ||
if (this.id) { | ||
transaction.id = this.id as Uint8Array; | ||
} else { | ||
transaction.begin = this._options; | ||
} | ||
const reqOpts: spannerClient.spanner.v1.ExecuteBatchDmlRequest = { | ||
session: this.session.formattedName_!, | ||
requestOptions: this.configureTagOptions( | ||
false, | ||
this.requestOptions?.transactionTag ?? undefined, | ||
(options as BatchUpdateOptions).requestOptions | ||
), | ||
transaction: {id: this.id!}, | ||
transaction, | ||
seqno: this._seqno++, | ||
statements, | ||
} as spannerClient.spanner.v1.ExecuteBatchDmlRequest; | ||
|
@@ -1562,6 +1641,11 @@ export class Transaction extends Dml { | |
} | ||
|
||
const {resultSets, status} = resp; | ||
for (const resultSet of resultSets) { | ||
if (!this.id && resultSet.metadata?.transaction) { | ||
this._update(resultSet.metadata.transaction); | ||
} | ||
} | ||
const rowCounts: number[] = resultSets.map(({stats}) => { | ||
return ( | ||
(stats && | ||
|
@@ -1686,8 +1770,11 @@ export class Transaction extends Dml { | |
|
||
if (this.id) { | ||
reqOpts.transactionId = this.id as Uint8Array; | ||
} else { | ||
} else if (!this._useInRunner) { | ||
reqOpts.singleUseTransaction = this._options; | ||
} else { | ||
this.begin().then(() => this.commit(options, callback)); | ||
return; | ||
} | ||
|
||
if ( | ||
|
@@ -2184,6 +2271,13 @@ export class Transaction extends Dml { | |
const unique = new Set(allKeys); | ||
return Array.from(unique).sort(); | ||
} | ||
|
||
/** | ||
* Mark transaction as started from the runner. | ||
*/ | ||
useInRunner(): void { | ||
this._useInRunner = true; | ||
} | ||
} | ||
|
||
/*! Developer Documentation | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you mind adding a test that verifies the following:
PartialResultSet
s are returned by the stream, with (at least) one of them returning a resume token.UNAVAILABLE
error and the stream is restarted with a resume token.Step 3 should use the transaction ID that was returned by step 1, and not include a
BeginTransaction
option.(https://github.com/googleapis/nodejs-spanner/pull/1253/files is a very old implementation for the same. Some of the tests there might be re-usable.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added. thanks a lot for this comment: it helped to find a bug in the code