Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for blind writes #2065

Merged
merged 29 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7ca36ee
sample: blind write
alkatrivedi Jun 10, 2024
8bded80
sample: blind write
alkatrivedi Jun 10, 2024
16330f0
refactor
alkatrivedi Jun 12, 2024
c12fa1d
Merge branch 'googleapis:main' into blind-writes
alkatrivedi Jun 18, 2024
01098c1
add class mutation
alkatrivedi Jun 18, 2024
e848eaf
add class mutation
alkatrivedi Jun 18, 2024
fb6c892
Merge branch 'googleapis:main' into blind-writes
alkatrivedi Jun 20, 2024
934863d
feat: blind-writes
alkatrivedi Jun 20, 2024
acc2956
Merge branch 'main' into blind-writes
alkatrivedi Jun 21, 2024
3a5d4f6
refactor
alkatrivedi Jun 21, 2024
431a5bd
fix: lint errors
alkatrivedi Jun 21, 2024
47545d9
refactor
alkatrivedi Jun 21, 2024
2fd72f9
fix: lint errors
alkatrivedi Jun 21, 2024
8a4f973
fix: header checks
alkatrivedi Jun 21, 2024
21159ce
refactor the blind write
alkatrivedi Jun 21, 2024
c154d12
Merge branch 'main' into blind-writes
alkatrivedi Jun 24, 2024
74a0188
feat: add support for blind writes
release-please[bot] Jun 24, 2024
aea887b
Merge branch 'googleapis:main' into blind-writes
alkatrivedi Jul 18, 2024
c4a2ba0
fix: presubmit error
alkatrivedi Jul 18, 2024
0a1cb15
Merge branch 'main' into blind-writes
surbhigarg92 Jul 19, 2024
9d29938
Merge branch 'main' into blind-writes
surbhigarg92 Jul 19, 2024
6a38244
Merge branch 'main' into blind-writes
alkatrivedi Jul 22, 2024
5596859
refactor: docs of the method writeAtLeastOnce
alkatrivedi Jul 24, 2024
d234007
test: unit test using mockspanner
alkatrivedi Jul 25, 2024
cb58537
fix: lint errors
alkatrivedi Jul 25, 2024
ea778ed
docs refactor
alkatrivedi Jul 25, 2024
24f38be
Merge branch 'main' into blind-writes
alkatrivedi Jul 26, 2024
72940ca
refactor
alkatrivedi Jul 26, 2024
2b204fe
refactor
alkatrivedi Jul 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ import {
import {CreateTableCallback, CreateTableResponse, Table} from './table';
import {
BatchWriteOptions,
CommitCallback,
CommitResponse,
ExecuteSqlRequest,
MutationGroup,
Mutation,
RunCallback,
RunResponse,
RunUpdateCallback,
Expand Down Expand Up @@ -3346,6 +3349,91 @@ class Database extends common.GrpcServiceObject {
return proxyStream as NodeJS.ReadableStream;
}

/**
* Apply Blind Write of the Mutations
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
*
* writeAtLeastOnce(Blind Write) requests are not replay protected, meaning that it may apply mutations more
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
* than once, if the mutations are not idempotent, this may lead to a failure being
* reported when the mutation was applied once. Replays of non-idempotent mutations may
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
* have undesirable effects. For example, replays of an insert mutation may produce an
* already exists error. For this reason, most users of the library will prefer to use
* {@link runTransaction} instead.
*
* However, {@link writeAtLeastOnce()} requires only a single RPC, whereas {@link runTransaction()}
* requires two RPCs (one of which may be performed in advance), and so this method may be
* appropriate for latency sensitive and/or high throughput blind writing.
*
* We recommend structuring your mutation groups to be idempotent to avoid this issue.
*
* @param {Mutation} [mutation] Mutations to be applied.
* @param {CallOptions} [options] Options object for blind write request.
* @param {CommitCallback} [callback] Callback function for blind write request.
*
* @returns {Promise}
*
* @example
* ```
* const {Spanner} = require('@google-cloud/spanner');
* const spanner = new Spanner();
*
* const instance = spanner.instance('my-instance');
* const database = instance.database('my-database');
* const mutation = new Mutation();
* mutation.insert('Singers', {
* SingerId: '1',
* FirstName: 'Marc',
* LastName: 'Richards',
* });
* mutation.update('Singers', {
* SingerId: '1',
* FirstName: 'John',
* LastName: 'Richards',
* });
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
*
* try {
* const [response, err] = await database.writeAtLeastOnce(mutation, {});
* console.log(response.commitTimestamp);
* } catch(err) {
* console.log("Error: ", err);
* }
* ```
*/
writeAtLeastOnce(mutation: Mutation): Promise<CommitResponse>;
writeAtLeastOnce(
mutation: Mutation,
options: CallOptions
): Promise<CommitResponse>;
writeAtLeastOnce(mutation: Mutation, callback: CommitCallback): void;
writeAtLeastOnce(
mutation: Mutation,
optionsOrCallback?: CallOptions | CommitCallback,
callback?: CommitCallback
): void | Promise<CommitResponse> {
const cb =
typeof optionsOrCallback === 'function'
? (optionsOrCallback as CommitCallback)
: callback;
const options =
typeof optionsOrCallback === 'object' && optionsOrCallback
? (optionsOrCallback as CallOptions)
: {};
this.pool_.getSession((err, session?, transaction?) => {
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
cb
? this.writeAtLeastOnce(mutation, cb)
: this.writeAtLeastOnce(mutation, options);
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
return;
}
if (err) {
cb!(err as grpc.ServiceError);
return;
}
this._releaseOnEnd(session!, transaction!);
transaction?.setQueuedMutations(mutation.proto());
return transaction?.commit(options, cb!);
});
}

/**
* Create a Session object.
*
Expand Down Expand Up @@ -3674,6 +3762,7 @@ callbackifyAll(Database, {
'batchCreateSessions',
'batchTransaction',
'batchWriteAtLeastOnce',
'writeAtLeastOnce',
'close',
'createBatchTransaction',
'createSession',
Expand Down
115 changes: 115 additions & 0 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,17 @@ export class Transaction extends Dml {
return undefined;
}

/**
* This method updates the _queuedMutations property of the transaction.
*
* @public
*
* @param {spannerClient.spanner.v1.Mutation[]} [mutation]
*/
setQueuedMutations(mutation: spannerClient.spanner.v1.Mutation[]): void {
this._queuedMutations = mutation;
}

/**
* @typedef {object} CommitOptions
* @property {google.spanner.v1.IRequestOptions} requestOptions The request options to include
Expand Down Expand Up @@ -2530,6 +2541,110 @@ function buildDeleteMutation(
return mutation as spannerClient.spanner.v1.Mutation;
}

/**
* Mutation represent a set of changes to be applied atomically to a cloud spanner
* database with a {@link Transaction}.
* Mutations are used to insert, update, upsert(insert or update), replace, or
* delete rows within tables.
*
* Mutations are added to a {@link Transaction} and are not executed until the
* transaction is committed via {@link Transaction#commit}.
*
* If the transaction is rolled back or encounters an error, the mutations are
* discarded.
*
* @example
* ```
* const {Spanner, Mutation} = require('@google-cloud/spanner');
* const spanner = new Spanner();
*
* const instance = spanner.instance('my-instance');
* const database = instance.database('my-database');
*
* const mutation = new Mutation();
* mutation.insert('Singers', {SingerId: '123', FirstName: 'David'});
* mutation.update('Singers', {SingerId: '123', FirstName: 'Marc'});
*
* try {
* database.writeAtLeastOnce(mutation, (err, res) => {
* console.log("RESPONSE: ", res);
* });
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
* } catch(err) {
* console.log("ERROR: ", err);
* }
* ```
*/
export class Mutation {
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
/**
* An array to store the mutations.
*/
private _queuedMutations: spannerClient.spanner.v1.Mutation[];

/**
* Creates a new Mutation object.
*/
constructor() {
this._queuedMutations = [];
}

/**
* Adds an insert operation to the mutation set.
* @param {string} table. The name of the table to insert into.
* @param {object|object[]} rows. A single row object or an array of row objects to insert.
*/
insert(table: string, rows: object | object[]): void {
this._queuedMutations.push(buildMutation('insert', table, rows));
}

/**
* Adds an update operation to the mutation set.
* @param {string} table. The name of the table to update.
* @param {object|object[]} rows. A single row object or an array of row objects to update.
* Each row object must contain the primary key values to indentify the row to update.
*/
update(table: string, rows: object | object[]): void {
this._queuedMutations.push(buildMutation('update', table, rows));
}

/**
* Adds an upsert operation to the mutation set.
* An upsert will insert a new row if it does not exist or update an existing row if it does.
* @param {string} table. The name of the table to upsert.
* @param {object|object[]} rows. A single row object or an array of row objects to upsert.
*/
upsert(table: string, rows: object | object[]): void {
this._queuedMutations.push(buildMutation('insertOrUpdate', table, rows));
}

/**
* Adds a replace operation to the mutation set.
* A replace operation deletes the existing row (if it exists) and inserts the new row.
* @param {string} table. The name of the table to replace.
* @param {object|object[]} rows. A single row object or an array of row objects to replace.
*/
replace(table: string, rows: object | object[]): void {
this._queuedMutations.push(buildMutation('replace', table, rows));
}

/**
* Adds a deleteRows operation to the mutation set.
* This operation deletes rows from the specified table based on their primary keys.
* @param {string} table. The name of the table to deleteRows from.
* @param {key[]} key. An array of key objects, each represeting the primary key of a row to delete.
*/
deleteRows(table: string, keys: Key[]): void {
this._queuedMutations.push(buildDeleteMutation(table, keys));
}

/**
* Returns the internal representation of the queued mutations as a protobuf message.
* @returns {spannerClient.spanner.v1.Mutation[]}. The protobuf message representing the mutations.
*/
proto(): spannerClient.spanner.v1.Mutation[] {
return this._queuedMutations;
}
}

/**
* A group of mutations to be committed together.
* Related mutations should be placed in a group.
Expand Down
105 changes: 104 additions & 1 deletion test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@ import {protos} from '../src';
import * as inst from '../src/instance';
import RequestOptions = google.spanner.v1.RequestOptions;
import EncryptionType = google.spanner.admin.database.v1.RestoreDatabaseEncryptionConfig.EncryptionType;
import {BatchWriteOptions} from '../src/transaction';
import {
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
BatchWriteOptions,
CommitCallback,
CommitOptions,
Mutation,
} from '../src/transaction';
import {error} from 'is';

let promisified = false;
const fakePfy = extend({}, pfy, {
Expand Down Expand Up @@ -125,17 +131,31 @@ class FakeTable {
class FakeTransaction extends EventEmitter {
calledWith_: IArguments;
_options!: google.spanner.v1.ITransactionOptions;
private _queuedMutations: google.spanner.v1.Mutation[];
constructor(options) {
super();
this._options = options;
this.calledWith_ = arguments;
this._queuedMutations = [];
}
begin() {}
end() {}
runStream(): Transform {
return through.obj();
}
runUpdate() {}
setQueuedMutations(mutation) {
this._queuedMutations = mutation;
}
commit(
options?: CommitOptions,
callback?: CommitCallback
): void | Promise<google.spanner.v1.ICommitResponse> {
if (callback) {
callback(null, {commitTimestamp: {seconds: 1, nanos: 0}});
}
return Promise.resolve({commitTimestamp: {seconds: 1, nanos: 0}});
}
}

let fakeTransactionRunner: FakeTransactionRunner;
Expand Down Expand Up @@ -762,6 +782,89 @@ describe('Database', () => {
});
});

describe('writeAtLeastOnce', () => {
const mutation = new Mutation();
mutation.insert('MyTable', {
Key: 'k3',
Thing: 'xyz',
});

const SESSION = new FakeSession();
const RESPONSE = {commitTimestamp: {seconds: 1, nanos: 0}};
const TRANSACTION = new FakeTransaction(
{} as google.spanner.v1.TransactionOptions.ReadWrite
);

let pool: FakeSessionPool;

beforeEach(() => {
pool = database.pool_;
(sandbox.stub(pool, 'getSession') as sinon.SinonStub).callsFake(
callback => {
callback(null, SESSION, TRANSACTION);
}
);
});

it('should return any errors getting a session', done => {
const fakeErr = new Error('err');

(pool.getSession as sinon.SinonStub).callsFake(callback =>
callback(fakeErr, null, null)
);

database.writeAtLeastOnce(mutation, err => {
assert.deepStrictEqual(err, fakeErr);
done();
});
});

it('should return successful CommitResponse when passing an empty mutation', done => {
const fakeMutation = new Mutation();
try {
database.writeAtLeastOnce(fakeMutation, (err, response) => {
assert.ifError(err);
assert.deepStrictEqual(
response.commitTimestamp,
RESPONSE.commitTimestamp
);
});
done();
} catch (error) {
assert(error instanceof Error);
}
});

it('should return an error when passing null mutation', done => {
const fakeError = new Error('err');
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
try {
database.writeAtLeastOnce(null, (err, res) => {});
} catch (err) {
(err as grpc.ServiceError).message.includes(
"Cannot read properties of null (reading 'proto')"
);
done();
}
});

it('should return CommitResponse on successful write using Callback', done => {
database.writeAtLeastOnce(mutation, (err, res) => {
assert.deepStrictEqual(err, null);
assert.deepStrictEqual(res, RESPONSE);
done();
});
});

it('should return CommitResponse on successful write using await', async () => {
sinon.stub(database, 'writeAtLeastOnce').resolves([RESPONSE]);
const [response, err] = await database.writeAtLeastOnce(mutation, {});
assert.deepStrictEqual(
response.commitTimestamp,
RESPONSE.commitTimestamp
);
});
});

describe('close', () => {
const FAKE_ID = 'a/c/b/d';

Expand Down
Loading