Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for blind writes #2065

Merged
merged 29 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7ca36ee
sample: blind write
alkatrivedi Jun 10, 2024
8bded80
sample: blind write
alkatrivedi Jun 10, 2024
16330f0
refactor
alkatrivedi Jun 12, 2024
c12fa1d
Merge branch 'googleapis:main' into blind-writes
alkatrivedi Jun 18, 2024
01098c1
add class mutation
alkatrivedi Jun 18, 2024
e848eaf
add class mutation
alkatrivedi Jun 18, 2024
fb6c892
Merge branch 'googleapis:main' into blind-writes
alkatrivedi Jun 20, 2024
934863d
feat: blind-writes
alkatrivedi Jun 20, 2024
acc2956
Merge branch 'main' into blind-writes
alkatrivedi Jun 21, 2024
3a5d4f6
refactor
alkatrivedi Jun 21, 2024
431a5bd
fix: lint errors
alkatrivedi Jun 21, 2024
47545d9
refactor
alkatrivedi Jun 21, 2024
2fd72f9
fix: lint errors
alkatrivedi Jun 21, 2024
8a4f973
fix: header checks
alkatrivedi Jun 21, 2024
21159ce
refactor the blind write
alkatrivedi Jun 21, 2024
c154d12
Merge branch 'main' into blind-writes
alkatrivedi Jun 24, 2024
74a0188
feat: add support for blind writes
release-please[bot] Jun 24, 2024
aea887b
Merge branch 'googleapis:main' into blind-writes
alkatrivedi Jul 18, 2024
c4a2ba0
fix: presubmit error
alkatrivedi Jul 18, 2024
0a1cb15
Merge branch 'main' into blind-writes
surbhigarg92 Jul 19, 2024
9d29938
Merge branch 'main' into blind-writes
surbhigarg92 Jul 19, 2024
6a38244
Merge branch 'main' into blind-writes
alkatrivedi Jul 22, 2024
5596859
refactor: docs of the method writeAtLeastOnce
alkatrivedi Jul 24, 2024
d234007
test: unit test using mockspanner
alkatrivedi Jul 25, 2024
cb58537
fix: lint errors
alkatrivedi Jul 25, 2024
ea778ed
docs refactor
alkatrivedi Jul 25, 2024
24f38be
Merge branch 'main' into blind-writes
alkatrivedi Jul 26, 2024
72940ca
refactor
alkatrivedi Jul 26, 2024
2b204fe
refactor
alkatrivedi Jul 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ import {
import {CreateTableCallback, CreateTableResponse, Table} from './table';
import {
BatchWriteOptions,
CommitCallback,
CommitResponse,
ExecuteSqlRequest,
MutationGroup,
MutationSet,
RunCallback,
RunResponse,
RunUpdateCallback,
Expand Down Expand Up @@ -3346,6 +3349,95 @@ class Database extends common.GrpcServiceObject {
return proxyStream as NodeJS.ReadableStream;
}

/**
* Write mutations using a single RPC invocation without replay protection.
*
* writeAtLeastOnce writes mutations to Spanner using a single Commit RPC.
* These requests are not replay protected, meaning that it may apply mutations more
* than once, if the mutations are not idempotent, this may lead to a failure being
* reported when the mutation was applied once. Replays non-idempotent mutations may
* have undesirable effects. For example, replays of an insert mutation may produce an
* already exists error. For this reason, most users of the library will prefer to use
* {@link runTransaction} instead.
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
*
* However, {@link writeAtLeastOnce()} requires only a single RPC, whereas {@link runTransaction()}
* requires two RPCs (one of which may be performed in advance), and so this method may be
* appropriate for latency sensitive and/or high throughput blind writing.
*
* We recommend structuring your mutation set to be idempotent to avoid this issue.
*
* @param {MutationSet} [mutations] Set of Mutations to be applied.
* @param {CallOptions} [options] Options object for blind write request.
* @param {CommitCallback} [callback] Callback function for blind write request.
*
* @returns {Promise}
*
* @example
* ```
* const {Spanner} = require('@google-cloud/spanner');
* const spanner = new Spanner();
*
* const instance = spanner.instance('my-instance');
* const database = instance.database('my-database');
* const mutations = new MutationSet();
* mutations.upsert('Singers', {
* SingerId: 1,
* FirstName: 'Scarlet',
* LastName: 'Terry',
* });
* mutations.upsert('Singers', {
* SingerId: 2,
* FirstName: 'Marc',
* LastName: 'Richards',
* });
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
*
* try {
* const [response, err] = await database.writeAtLeastOnce(mutations, {});
* console.log(response.commitTimestamp);
* } catch(err) {
* console.log("Error: ", err);
* }
* ```
*/
writeAtLeastOnce(mutations: MutationSet): Promise<CommitResponse>;
writeAtLeastOnce(
mutations: MutationSet,
options: CallOptions
): Promise<CommitResponse>;
writeAtLeastOnce(mutations: MutationSet, callback: CommitCallback): void;
writeAtLeastOnce(
mutations: MutationSet,
options: CallOptions,
callback: CommitCallback
): void;
writeAtLeastOnce(
mutations: MutationSet,
optionsOrCallback?: CallOptions | CommitCallback,
callback?: CommitCallback
): void | Promise<CommitResponse> {
const cb =
typeof optionsOrCallback === 'function'
? (optionsOrCallback as CommitCallback)
: callback;
const options =
typeof optionsOrCallback === 'object' && optionsOrCallback
? (optionsOrCallback as CallOptions)
: {};
this.pool_.getSession((err, session?, transaction?) => {
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
this.writeAtLeastOnce(mutations, options, cb!);
return;
}
if (err) {
cb!(err as grpc.ServiceError);
return;
}
this._releaseOnEnd(session!, transaction!);
transaction?.setQueuedMutations(mutations.proto());
return transaction?.commit(options, cb!);
});
}

/**
* Create a Session object.
*
Expand Down Expand Up @@ -3674,6 +3766,7 @@ callbackifyAll(Database, {
'batchCreateSessions',
'batchTransaction',
'batchWriteAtLeastOnce',
'writeAtLeastOnce',
'close',
'createBatchTransaction',
'createSession',
Expand Down
10 changes: 10 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ import {SessionPool} from './session-pool';
import {Table} from './table';
import {
MutationGroup,
MutationSet,
PartitionedDml,
Snapshot,
Transaction,
Expand Down Expand Up @@ -2025,6 +2026,15 @@ export {Transaction};
*/
export {MutationGroup};

/**
* {@link MutationSet} class.
*
* @name Spanner.MutationSet
* @see MutationSet
* @type {Constructor}
*/
export {MutationSet};

/**
* @type {object}
* @property {constructor} DatabaseAdminClient
Expand Down
115 changes: 115 additions & 0 deletions src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1854,6 +1854,17 @@ export class Transaction extends Dml {
return undefined;
}

/**
* This method updates the _queuedMutations property of the transaction.
*
* @public
*
* @param {spannerClient.spanner.v1.Mutation[]} [mutation]
*/
setQueuedMutations(mutation: spannerClient.spanner.v1.Mutation[]): void {
this._queuedMutations = mutation;
}

/**
* @typedef {object} CommitOptions
* @property {google.spanner.v1.IRequestOptions} requestOptions The request options to include
Expand Down Expand Up @@ -2530,6 +2541,110 @@ function buildDeleteMutation(
return mutation as spannerClient.spanner.v1.Mutation;
}

/**
* MutationSet represent a set of changes to be applied atomically to a Cloud Spanner
* database with a {@link Transaction}.
* Mutations are used to insert, update, upsert(insert or update), replace, or
* delete rows within tables.
*
* Mutations are added to a {@link Transaction} and are not executed until the
* transaction is committed via {@link Transaction#commit}.
*
* If the transaction is rolled back or encounters an error, the mutations are
* discarded.
*
* @example
* ```
* const {Spanner, Mutation} = require('@google-cloud/spanner');
* const spanner = new Spanner();
*
* const instance = spanner.instance('my-instance');
* const database = instance.database('my-database');
*
* const mutations = new MutationSet();
* mutations.insert('Singers', {SingerId: '123', FirstName: 'David'});
* mutations.update('Singers', {SingerId: '123', FirstName: 'Marc'});
*
* try {
* database.writeAtLeastOnce(mutations, (err, res) => {
* console.log("RESPONSE: ", res);
* });
* } catch(err) {
* console.log("ERROR: ", err);
* }
* ```
*/
export class MutationSet {
/**
* An array to store the mutations.
*/
private _queuedMutations: spannerClient.spanner.v1.Mutation[];

/**
* Creates a new Mutation object.
*/
constructor() {
this._queuedMutations = [];
}

/**
* Adds an insert operation to the mutation set.
* @param {string} table. The name of the table to insert into.
* @param {object|object[]} rows. A single row object or an array of row objects to insert.
*/
insert(table: string, rows: object | object[]): void {
this._queuedMutations.push(buildMutation('insert', table, rows));
}

/**
* Adds an update operation to the mutation set.
* @param {string} table. The name of the table to update.
* @param {object|object[]} rows. A single row object or an array of row objects to update.
* Each row object must contain the primary key values to indentify the row to update.
*/
update(table: string, rows: object | object[]): void {
this._queuedMutations.push(buildMutation('update', table, rows));
}

/**
* Adds an upsert operation to the mutation set.
* An upsert will insert a new row if it does not exist or update an existing row if it does.
* @param {string} table. The name of the table to upsert.
* @param {object|object[]} rows. A single row object or an array of row objects to upsert.
*/
upsert(table: string, rows: object | object[]): void {
this._queuedMutations.push(buildMutation('insertOrUpdate', table, rows));
}

/**
* Adds a replace operation to the mutation set.
* A replace operation deletes the existing row (if it exists) and inserts the new row.
* @param {string} table. The name of the table to replace.
* @param {object|object[]} rows. A single row object or an array of row objects to replace.
*/
replace(table: string, rows: object | object[]): void {
this._queuedMutations.push(buildMutation('replace', table, rows));
}

/**
* Adds a deleteRows operation to the mutation set.
* This operation deletes rows from the specified table based on their primary keys.
* @param {string} table. The name of the table to deleteRows from.
* @param {key[]} key. An array of key objects, each represeting the primary key of a row to delete.
*/
deleteRows(table: string, keys: Key[]): void {
this._queuedMutations.push(buildDeleteMutation(table, keys));
}

/**
* Returns the internal representation of the queued mutations as a protobuf message.
* @returns {spannerClient.spanner.v1.Mutation[]}. The protobuf message representing the mutations.
*/
proto(): spannerClient.spanner.v1.Mutation[] {
return this._queuedMutations;
}
}

/**
* A group of mutations to be committed together.
* Related mutations should be placed in a group.
Expand Down
Loading
Loading