Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: inline BeginTransaction with first statement #1692

Merged
merged 28 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3d06c45
feat: inline BeginTransaction with first statement
ko3a4ok Aug 6, 2022
8d5c6d6
fix: prevent multiple begin transactions happen during a race conditi…
ko3a4ok Aug 12, 2022
15944f5
Merge branch 'googleapis:main' into inline-begin-txns
ko3a4ok Aug 24, 2022
dd9891e
fix: minor
ko3a4ok Aug 26, 2022
6b57710
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 14, 2022
976ee59
Merge branch 'googleapis:main' into inline-begin-txns
ko3a4ok Sep 14, 2022
154872b
fix: call explicit begin transaction only after the first attempt.
ko3a4ok Sep 14, 2022
2caaa0f
feat: add inline begin transaction for batch DML requests
ko3a4ok Sep 20, 2022
9929be5
Merge branch 'main' into inline-begin-txns
ko3a4ok Sep 20, 2022
f65e2d1
fix: lint
ko3a4ok Sep 20, 2022
6d83a94
fix: explicit begin transaction for blind commit if a transaction run…
ko3a4ok Sep 23, 2022
7eb3831
Merge branch 'googleapis:main' into inline-begin-txns
ko3a4ok Sep 23, 2022
f9aa221
Merge branch 'googleapis:main' into inline-begin-txns
ko3a4ok Oct 12, 2022
eeef8cf
feat: adding more unit tests
ko3a4ok Oct 12, 2022
4fff842
fix: explicit begin transaction for unknown error
ko3a4ok Oct 17, 2022
cfa405e
Merge branch 'main' into inline-begin-txns
surbhigarg92 Oct 18, 2022
53f1e1c
fix: format
ko3a4ok Oct 17, 2022
e146e89
Merge remote-tracking branch 'origin/inline-begin-txns' into inline-b…
ko3a4ok Oct 21, 2022
1b82e78
Merge branch 'main' into inline-begin-txns
surbhigarg92 Oct 31, 2022
e5585ff
fix: tests after merge
ko3a4ok Oct 31, 2022
241394d
Lint fix
surbhigarg92 Nov 2, 2022
fa3c024
fix: lint
ko3a4ok Oct 31, 2022
c2b2ac6
Merge remote-tracking branch 'origin/inline-begin-txns' into inline-b…
ko3a4ok Nov 2, 2022
2f226e1
fix: samples test
ko3a4ok Nov 2, 2022
6d94a86
Merge remote-tracking branch 'origin/inline-begin-txns' into inline-b…
ko3a4ok Nov 4, 2022
b0bccf5
Merge branch 'main' into inline-begin-txns
surbhigarg92 Nov 4, 2022
346abea
fix: system test
ko3a4ok Nov 5, 2022
311a80f
fix: system emulator test
surbhigarg92 Nov 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/session-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,6 @@ export class SessionPool extends EventEmitter implements SessionPoolInterface {
const transaction = session.transaction(
(session.parent as Database).queryOptions_
);
await transaction.begin();
session.txn = transaction;
}

Expand Down
82 changes: 68 additions & 14 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,8 @@ export type CommitCallback =
export class Snapshot extends EventEmitter {
protected _options!: spannerClient.spanner.v1.ITransactionOptions;
protected _seqno = 1;
protected _idWaiter: Readable;
protected _inlineBeginStarted;
id?: Uint8Array | string;
ended: boolean;
metadata?: spannerClient.spanner.v1.ITransaction;
Expand Down Expand Up @@ -289,6 +291,11 @@ export class Snapshot extends EventEmitter {
this.resourceHeader_ = {
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_,
};
this._idWaiter = new Readable({
read() {
},
});
this._inlineBeginStarted = false;
}

/**
Expand Down Expand Up @@ -378,17 +385,7 @@ export class Snapshot extends EventEmitter {
callback!(err, resp);
return;
}

const {id, readTimestamp} = resp;

this.id = id!;
this.metadata = resp;

if (readTimestamp) {
this.readTimestampProto = readTimestamp;
this.readTimestamp = new PreciseDate(readTimestamp as DateStruct);
}

this._update(resp);
callback!(null, resp);
}
);
Expand Down Expand Up @@ -573,6 +570,8 @@ export class Snapshot extends EventEmitter {

if (this.id) {
transaction.id = this.id as Uint8Array;
} else if (typeof this._options.readWrite !== 'undefined') {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this rather be else if(this._options.readWrite)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, fixed, thanks

transaction.begin = this._options;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adding a test that verifies the following:

  1. A query is the first statement in a read/write transaction. A transaction ID is successfully returned by initial request.
  2. One or more PartialResultSets are returned by the stream, with (at least) one of them returning a resume token.
  3. The stream fails halfway with an UNAVAILABLE error and the stream is restarted with a resume token.

Step 3 should use the transaction ID that was returned by step 1, and not include a BeginTransaction option.

(https://github.com/googleapis/nodejs-spanner/pull/1253/files is a very old implementation for the same. Some of the tests there might be re-usable.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added. thanks a lot for this comment: it helped to find a bug in the code

} else {
transaction.singleUse = this._options;
}
Expand Down Expand Up @@ -612,10 +611,14 @@ export class Snapshot extends EventEmitter {
});
};

return partialResultStream(makeRequest, {
return partialResultStream(this._wrapWithIdWaiter(makeRequest), {
json,
jsonOptions,
maxResumeRetries,
}).on('response', response => {
Copy link
Contributor

@surbhigarg92 surbhigarg92 Aug 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if the call fails?
As per the design we should make an explicit begin transaction call if the call fails.

cc: @olavloite

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the call fails, the transaction is created explicitly with "BeginTransaction" rpc call.
It's covered in the test: "should use beginTransaction on retry".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this done at the backend or do we explicitly call begin transaction from the client side?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the client side

if (response.metadata && response.metadata!.transaction && !this.id) {
this._update(response.metadata!.transaction);
}
});
}

Expand Down Expand Up @@ -909,6 +912,9 @@ export class Snapshot extends EventEmitter {
.on('response', response => {
if (response.metadata) {
metadata = response.metadata;
if (metadata.transaction && !this.id) {
this._update(metadata.transaction);
}
}
})
.on('data', row => rows.push(row))
Expand Down Expand Up @@ -1034,6 +1040,8 @@ export class Snapshot extends EventEmitter {
const transaction: spannerClient.spanner.v1.ITransactionSelector = {};
if (this.id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not take into account that a transaction could in theory have multiple requests in flight at the same time. If a transaction starts out with sending two SELECT statements to the backend, it might very well be that the first that is sent has not yet returned a transaction id before the second is being sent. That will cause both requests to include a BeginTransaction option.

Consider the following test case (the getRowCountFromStreamingSql function is defined in test/Spanner.ts):

    it('should only inline one begin transaction', async () => {
      const database = newTestDatabase();
      await database.runTransactionAsync(async tx => {
        const rowCount1 = getRowCountFromStreamingSql(tx, {sql: selectSql});
        const rowCount2 = getRowCountFromStreamingSql(tx, {sql: selectSql});
        await Promise.all([rowCount1, rowCount2]);
        await tx.commit();
      });
      await database.close();

      let request = spannerMock.getRequests().find(val => {
        return (val as v1.ExecuteSqlRequest).sql;
      }) as v1.ExecuteSqlRequest;
      assert.ok(request, 'no ExecuteSqlRequest found');
      assert.deepStrictEqual(request.transaction!.begin!.readWrite, {});
      assert.strictEqual(request.sql, selectSql);

      request = spannerMock
        .getRequests()
        .slice()
        .reverse()
        .find(val => {
          return (val as v1.ExecuteSqlRequest).sql;
        }) as v1.ExecuteSqlRequest;
      assert.ok(request, 'no ExecuteSqlRequest found');
      assert.strictEqual(request.sql, selectSql);
      assert.ok(request.transaction!.id, 'TransactionID is not set.');
      const beginTxnRequest = spannerMock.getRequests().find(val => {
        return (val as v1.BeginTransactionRequest).options?.readWrite;
      }) as v1.BeginTransactionRequest;
      assert.ok(!beginTxnRequest, 'beginTransaction was called');
    });

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're absolutely right. Added a lock that prevents a multiple inline begin transaction at the same time. Thanks!

transaction.id = this.id as Uint8Array;
} else if (typeof this._options.readWrite !== 'undefined') {
transaction.begin = this._options;
} else {
transaction.singleUse = this._options;
}
Expand All @@ -1059,7 +1067,7 @@ export class Snapshot extends EventEmitter {
};

const makeRequest = (resumeToken?: ResumeToken): Readable => {
if (!reqOpts) {
if (!reqOpts || (this.id && !reqOpts.transaction.id)) {
try {
sanitizeRequest();
} catch (e) {
Expand All @@ -1078,10 +1086,14 @@ export class Snapshot extends EventEmitter {
});
};

return partialResultStream(makeRequest, {
return partialResultStream(this._wrapWithIdWaiter(makeRequest), {
json,
jsonOptions,
maxResumeRetries,
}).on('response', response => {
if (response.metadata && response.metadata!.transaction && !this.id) {
this._update(response.metadata!.transaction);
}
});
}

Expand Down Expand Up @@ -1226,6 +1238,48 @@ export class Snapshot extends EventEmitter {

return {params, paramTypes};
}

/**
* Update transaction properties from the response.
*
* @private
*
* @param {spannerClient.spanner.v1.ITransaction} resp Response object.
*/
private _update(resp: spannerClient.spanner.v1.ITransaction): void {
const {id, readTimestamp} = resp;

this.id = id!;
this.metadata = resp;

if (readTimestamp) {
this.readTimestampProto = readTimestamp;
this.readTimestamp = new PreciseDate(readTimestamp as DateStruct);
}
this._idWaiter.emit('notify');
}

/**
* Wrap `makeRequest` function with the lock to make sure the inline begin
* transaction can happen only once.
*
* @param makeRequest
* @private
*/
private _wrapWithIdWaiter(
makeRequest: (resumeToken?: ResumeToken) => Readable):
(resumeToken?: ResumeToken) => Readable {
if (this.id || typeof this._options.readWrite === 'undefined') {
return makeRequest;
}
if (!this._inlineBeginStarted) {
this._inlineBeginStarted = true;
return makeRequest;
}
return (resumeToken?: ResumeToken): Readable =>
this._idWaiter.once('notify', () =>
this._idWaiter.wrap(makeRequest(resumeToken)));
}
}

/*! Developer Documentation
Expand Down
96 changes: 58 additions & 38 deletions test/mockserver/mockspanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import * as path from 'path';
import {google} from '../../protos/protos';
import {grpc} from 'google-gax';
import {grpc, ServiceError} from 'google-gax';
import * as protoLoader from '@grpc/proto-loader';
// eslint-disable-next-line node/no-extraneous-import
import {Metadata} from '@grpc/grpc-js';
Expand Down Expand Up @@ -317,7 +317,7 @@ export class MockSpanner {

abortTransaction(transaction: Transaction): void {
const formattedId = `${transaction.session.formattedName_}/transactions/${transaction.id}`;
if (this.transactions.has(formattedId)) {
if (this.transactions.has(formattedId) || !transaction.id) {
this.transactions.delete(formattedId);
this.transactionOptions.delete(formattedId);
this.abortedTransactions.add(formattedId);
Expand Down Expand Up @@ -534,7 +534,7 @@ export class MockSpanner {
this.pushRequest(call.request!, call.metadata);
this.simulateExecutionTime(this.executeStreamingSql.name)
.then(() => {
if (call.request!.transaction && call.request!.transaction.id) {
if (call.request!.transaction) {
const fullTransactionId = `${call.request!.session}/transactions/${
call.request!.transaction.id
}`;
Expand All @@ -549,6 +549,20 @@ export class MockSpanner {
}
const res = this.statementResults.get(call.request!.sql);
if (res) {
if (call.request!.transaction?.begin) {
const txn = this._updateTransaction(
call.request!.session,
call.request!.transaction.begin
);
if (txn instanceof Error) {
call.emit('error', txn);
call.end();
return;
}
if (res.type === StatementResultType.RESULT_SET) {
(res.resultSet as protobuf.ResultSet).metadata!.transaction = txn;
}
}
let partialResultSets;
let resumeIndex;
let streamErr;
Expand Down Expand Up @@ -817,32 +831,14 @@ export class MockSpanner {
this.pushRequest(call.request!, call.metadata);
this.simulateExecutionTime(this.beginTransaction.name)
.then(() => {
const session = this.sessions.get(call.request!.session);
if (session) {
let counter = this.transactionCounters.get(session.name);
if (!counter) {
counter = 0;
}
const id = ++counter;
this.transactionCounters.set(session.name, counter);
const transactionId = id.toString().padStart(12, '0');
const fullTransactionId =
session.name + '/transactions/' + transactionId;
const readTimestamp =
call.request!.options && call.request!.options.readOnly
? now()
: undefined;
const transaction = protobuf.Transaction.create({
id: Buffer.from(transactionId),
readTimestamp,
});
this.transactions.set(fullTransactionId, transaction);
this.transactionOptions.set(fullTransactionId, call.request!.options);
callback(null, transaction);
const res = this._updateTransaction(
call.request!.session,
call.request!.options
);
if (res instanceof Error) {
callback(res);
} else {
callback(
MockSpanner.createSessionNotFoundError(call.request!.session)
);
callback(null, res);
}
})
.catch(err => {
Expand All @@ -857,16 +853,14 @@ export class MockSpanner {
this.pushRequest(call.request!, call.metadata);
this.simulateExecutionTime(this.commit.name)
.then(() => {
if (call.request!.transactionId) {
const fullTransactionId = `${call.request!.session}/transactions/${
call.request!.transactionId
}`;
if (this.abortedTransactions.has(fullTransactionId)) {
callback(
MockSpanner.createTransactionAbortedError(`${fullTransactionId}`)
);
return;
}
const fullTransactionId = `${call.request!.session}/transactions/${
call.request!.transactionId
}`;
if (this.abortedTransactions.has(fullTransactionId)) {
callback(
MockSpanner.createTransactionAbortedError(`${fullTransactionId}`)
);
return;
}
const session = this.sessions.get(call.request!.session);
if (session) {
Expand Down Expand Up @@ -947,6 +941,32 @@ export class MockSpanner {
this.pushRequest(call.request!, call.metadata);
callback(createUnimplementedError('PartitionQuery is not yet implemented'));
}

private _updateTransaction(
sessionName: string,
options: google.spanner.v1.ITransactionOptions | null | undefined
): google.spanner.v1.Transaction | ServiceError {
const session = this.sessions.get(sessionName);
if (!session) {
return MockSpanner.createSessionNotFoundError(sessionName);
}
let counter = this.transactionCounters.get(session.name);
if (!counter) {
counter = 0;
}
const id = ++counter;
this.transactionCounters.set(session.name, counter);
const transactionId = id.toString().padStart(12, '0');
const fullTransactionId = session.name + '/transactions/' + transactionId;
const readTimestamp = options && options.readOnly ? now() : undefined;
const transaction = protobuf.Transaction.create({
id: Buffer.from(transactionId),
readTimestamp,
});
this.transactions.set(fullTransactionId, transaction);
this.transactionOptions.set(fullTransactionId, options);
return transaction;
}
}

/**
Expand Down
15 changes: 0 additions & 15 deletions test/session-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1550,21 +1550,6 @@ describe('SessionPool', () => {
});
});

describe('_prepareTransaction', () => {
it('should prepare a transaction', async () => {
const fakeSession = createSession();
const fakeTransaction = new FakeTransaction();
const beginStub = sandbox.stub(fakeTransaction, 'begin').resolves();

fakeSession.transaction = sandbox.stub().returns(fakeTransaction);

await sessionPool._prepareTransaction(fakeSession);

assert.strictEqual(fakeSession.txn, fakeTransaction);
assert.strictEqual(beginStub.callCount, 1);
});
});

describe('_release', () => {
it('should release the session', () => {
const fakeSession = createSession('id', {type: types.ReadOnly});
Expand Down
Loading