feat(spanner): add support for batchWrite (googleapis#2054)
This PR adds the Batch Write API (`batchWriteAtLeastOnce`) implementation to the Spanner Node.js client.
alkatrivedi authored Jun 20, 2024
1 parent 47f8928 commit 06aab6e
Showing 9 changed files with 708 additions and 35 deletions.
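
In short, the commit adds a `Database#batchWriteAtLeastOnce` method that applies an array of `MutationGroup` objects with at-least-once semantics and streams a per-group result back to the caller. Below is a condensed sketch of that surface, assembled from the sample and docs in this diff; the project, instance, database, and table names are placeholders.

const {Spanner, MutationGroup} = require('@google-cloud/spanner');

// Placeholders; replace with real resource names.
const spanner = new Spanner({projectId: 'my-project-id'});
const database = spanner.instance('my-instance').database('my-database');

// Each MutationGroup is committed atomically; separate groups may be
// committed independently and possibly more than once (at-least-once).
const group = new MutationGroup();
group.insert('Singers', {SingerId: 1, FirstName: 'Scarlet', LastName: 'Terry'});

database
  .batchWriteAtLeastOnce([group])
  .on('error', console.error)
  .on('data', response => {
    // status.code === 0 means the group(s) listed in response.indexes committed.
    console.log(response.indexes, response.status, response.commitTimestamp);
  })
  .on('end', () => console.log('Request completed successfully'));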
1 change: 1 addition & 0 deletions README.md
@@ -102,6 +102,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-spanner/tre
| Backups-restore | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups-restore.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups-restore.js,samples/README.md) |
| Backups-update | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups-update.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups-update.js,samples/README.md) |
| Backups | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups.js,samples/README.md) |
| Batch Write | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch-write.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/batch-write.js,samples/README.md) |
| Batch | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/batch.js,samples/README.md) |
| CRUD | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/crud.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/crud.js,samples/README.md) |
| Creates a new database with a specific default leader | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/database-create-with-default-leader.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/database-create-with-default-leader.js,samples/README.md) |
18 changes: 18 additions & 0 deletions samples/README.md
@@ -27,6 +27,7 @@ and automatic, synchronous replication for high availability.
* [Backups-restore](#backups-restore)
* [Backups-update](#backups-update)
* [Backups](#backups)
* [Batch Write](#batch-write)
* [Batch](#batch)
* [CRUD](#crud)
* [Creates a new database with a specific default leader](#creates-a-new-database-with-a-specific-default-leader)
@@ -354,6 +355,23 @@ __Usage:__



### Batch Write

View the [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch-write.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/batch-write.js,samples/README.md)

__Usage:__


`node batch-write.js <INSTANCE_ID> <DATABASE_ID> <PROJECT_ID>`


-----




### Batch

View the [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch.js).
123 changes: 123 additions & 0 deletions samples/batch-write.js
@@ -0,0 +1,123 @@
/**
* Copyright 2024 Google LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// sample-metadata:
// title: Batch Write
// usage: node batch-write.js <INSTANCE_ID> <DATABASE_ID> <PROJECT_ID>

'use strict';

async function main(
instanceId = 'my-instance',
databaseId = 'my-database',
projectId = 'my-project-id'
) {
// [START spanner_batch_write_at_least_once]

// Imports the Google Cloud client library
const {Spanner, MutationGroup} = require('@google-cloud/spanner');

/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const projectId = 'my-project-id';

// Creates a client
const spanner = new Spanner({
projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Create Mutation Groups
/**
* Related mutations should be placed in the same group; for example, insert mutations for both a parent row and its child row.
* Please see {@link https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.BatchWriteRequest.MutationGroup}
* for more details and examples.
*/
const mutationGroup1 = new MutationGroup();
mutationGroup1.insert('Singers', {
SingerId: 1,
FirstName: 'Scarlet',
LastName: 'Terry',
});

const mutationGroup2 = new MutationGroup();
mutationGroup2.insert('Singers', {
SingerId: 2,
FirstName: 'Marc',
});
mutationGroup2.insert('Singers', {
SingerId: 3,
FirstName: 'Catalina',
LastName: 'Smith',
});
mutationGroup2.insert('Albums', {
AlbumId: 1,
SingerId: 2,
AlbumTitle: 'Total Junk',
});
mutationGroup2.insert('Albums', {
AlbumId: 2,
SingerId: 3,
AlbumTitle: 'Go, Go, Go',
});

const options = {
requestOptions: {transactionTag: 'batch-write-tag'},
};

try {
database
.batchWriteAtLeastOnce([mutationGroup1, mutationGroup2], options)
.on('error', console.error)
.on('data', response => {
// Check the status code of each response to determine whether the mutation group(s) were applied successfully.
if (response.status.code === 0) {
console.log(
`Mutation group indexes ${
response.indexes
}, have been applied with commit timestamp ${Spanner.timestamp(
response.commitTimestamp
).toJSON()}`
);
}
// Mutation groups that fail to commit trigger a response with a non-zero status code.
else {
console.log(
`Mutation group indexes ${response.indexes}, could not be applied with error code ${response.status.code}, and error message ${response.status.message}`
);
}
})
.on('end', () => {
console.log('Request completed successfully');
});
} catch (err) {
console.log(err);
}
// [END spanner_batch_write_at_least_once]
}

process.on('unhandledRejection', err => {
console.error(err.message);
process.exitCode = 1;
});

main(...process.argv.slice(2));
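
Because groups can fail independently while others commit, a caller may want to gather the indexes of the groups that did not apply and decide whether to re-send them. The sketch below uses the response fields handled in the sample above; the helper name and the idea of retrying are illustrative, not part of this commit.

// Returns the indexes of mutation groups whose responses carried a non-zero
// status code; the caller can then re-apply only those groups (they should be
// idempotent, since batch write is at-least-once).
function collectFailedGroupIndexes(database, mutationGroups, options) {
  return new Promise((resolve, reject) => {
    const failed = new Set();
    database
      .batchWriteAtLeastOnce(mutationGroups, options)
      .on('error', reject)
      .on('data', response => {
        if (response.status.code !== 0) {
          // A single response can report several group indexes at once.
          response.indexes.forEach(index => failed.add(index));
        }
      })
      .on('end', () => resolve([...failed]));
  });
}

// Usage sketch: const failed = await collectFailedGroupIndexes(database, [mutationGroup1, mutationGroup2]);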
22 changes: 22 additions & 0 deletions samples/system-test/spanner.test.js
@@ -40,6 +40,7 @@ const requestTagCommand = 'node request-tag.js';
const timestampCmd = 'node timestamp.js';
const structCmd = 'node struct.js';
const dmlCmd = 'node dml.js';
const batchWriteCmd = 'node batch-write.js';
const datatypesCmd = 'node datatypes.js';
const backupsCmd = 'node backups.js';
const instanceCmd = 'node instance.js';
@@ -967,6 +968,27 @@ describe('Autogenerated Admin Clients', () => {
assert.match(output, new RegExp('Virginia Watson'));
});

// batch_write
it('should insert records using batch write', async () => {
const output = execSync(
`${batchWriteCmd} ${INSTANCE_ID} ${DATABASE_ID} ${PROJECT_ID}`
).toString();

const successRegex =
/Mutation group indexes [\d,]+ have been applied with commit timestamp \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z/;
const failureRegex =
/Mutation group indexes [\d,]+, could not be applied with error code \d+, and error message .+/;

const successMatch = successRegex.test(output);
const errorMatch = failureRegex.test(output);

if (successMatch || errorMatch) {
assert.include(output, 'Request completed successfully');
} else {
assert.ifError(output);
}
});

// create_table_with_datatypes
it('should create Venues example table with supported datatype columns', async () => {
const output = execSync(
121 changes: 121 additions & 0 deletions src/database.ts
@@ -60,7 +60,9 @@ import {
} from './session-pool';
import {CreateTableCallback, CreateTableResponse, Table} from './table';
import {
BatchWriteOptions,
ExecuteSqlRequest,
MutationGroup,
RunCallback,
RunResponse,
RunUpdateCallback,
@@ -3210,6 +3212,123 @@ }
}
}
}

/**
* Write a batch of mutations to Spanner.
*
* All mutations in a group are committed atomically. However, mutations across
* groups can be committed non-atomically, in an unspecified order, and thus they
* must be independent of each other. Partial failure is possible, i.e., some groups
* may have been committed successfully, while some may have failed. The results of
* individual batches are streamed into the response as the batches are applied.
*
* batchWriteAtLeastOnce requests are not replay protected, meaning that each mutation group may
* be applied more than once. Replays of non-idempotent mutations may have undesirable
* effects. For example, replaying an insert mutation may produce an "already exists"
* error, or, if you use generated or commit-timestamp-based keys, it may result in
* additional rows being added to the mutation's table. We recommend structuring your
* mutation groups to be idempotent to avoid this issue.
*
* @method Database#batchWriteAtLeastOnce
*
* @param {MutationGroup[]} mutationGroups The groups of mutations to be applied.
* @param {BatchWriteOptions} [options] Options object for batch write request.
*
* @returns {ReadableStream} An object stream which emits
* {@link protos.google.spanner.v1.BatchWriteResponse|BatchWriteResponse}
* on 'data' event.
*
* @example
* ```
* const {Spanner, MutationGroup} = require('@google-cloud/spanner');
* const spanner = new Spanner();
*
* const instance = spanner.instance('my-instance');
* const database = instance.database('my-database');
* const mutationGroup = new MutationGroup();
* mutationGroup.insert('Singers', {
* SingerId: '1',
* FirstName: 'Marc',
* LastName: 'Richards',
* });
*
* database.batchWriteAtLeastOnce([mutationGroup])
* .on('error', console.error)
* .on('data', response => {
* console.log('response: ', response);
* })
* .on('end', () => {
* console.log('Request completed successfully');
* });
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* //-
* database.batchWriteAtLeastOnce([mutationGroup])
* .on('data', function (response) {
* this.end();
* });
* ```
*/
batchWriteAtLeastOnce(
mutationGroups: MutationGroup[],
options?: BatchWriteOptions
): NodeJS.ReadableStream {
const proxyStream: Transform = through.obj();

this.pool_.getSession((err, session) => {
if (err) {
proxyStream.destroy(err);
return;
}
const gaxOpts = extend(true, {}, options?.gaxOptions);
const reqOpts = Object.assign(
{} as spannerClient.spanner.v1.BatchWriteRequest,
{
session: session!.formattedName_!,
mutationGroups: mutationGroups.map(mg => mg.proto()),
requestOptions: options?.requestOptions,
}
);
let dataReceived = false;
let dataStream = this.requestStream({
client: 'SpannerClient',
method: 'batchWrite',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
});
dataStream
.once('data', () => (dataReceived = true))
.once('error', err => {
if (
!dataReceived &&
isSessionNotFoundError(err as grpc.ServiceError)
) {
// If there's a 'Session not found' error and we have not yet received
// any data, we can safely retry the writes on a new session.
// Register the error on the session so the pool can discard it.
if (session) {
session.lastError = err as grpc.ServiceError;
}
// Remove the current data stream from the end user stream.
dataStream.unpipe(proxyStream);
dataStream.end();
// Create a new stream and add it to the end user stream.
dataStream = this.batchWriteAtLeastOnce(mutationGroups, options);
dataStream.pipe(proxyStream);
} else {
proxyStream.destroy(err);
}
})
.once('end', () => this.pool_.release(session!))
.pipe(proxyStream);
});

return proxyStream as NodeJS.ReadableStream;
}

/**
* Create a Session object.
*
@@ -3515,6 +3634,7 @@ class Database extends common.GrpcServiceObject {
promisifyAll(Database, {
exclude: [
'batchTransaction',
'batchWriteAtLeastOnce',
'getRestoreInfo',
'getState',
'getDatabaseDialect',
@@ -3536,6 +3656,7 @@ callbackifyAll(Database, {
'create',
'batchCreateSessions',
'batchTransaction',
'batchWriteAtLeastOnce',
'close',
'createBatchTransaction',
'createSession',
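
The options argument accepted by batchWriteAtLeastOnce above is a BatchWriteOptions object: its `requestOptions` field is forwarded to the BatchWrite RPC and its `gaxOptions` field is forwarded to the underlying gax call. A sketch of passing both follows; the concrete values, and the use of the standard gax `timeout` field, are illustrative only.

const {Spanner, MutationGroup} = require('@google-cloud/spanner');

const database = new Spanner({projectId: 'my-project-id'})
  .instance('my-instance')
  .database('my-database');

const group = new MutationGroup();
group.insert('Singers', {SingerId: 1, FirstName: 'Marc', LastName: 'Richards'});

const options = {
  // Forwarded as the RPC's requestOptions (e.g. a transaction tag for tracing).
  requestOptions: {transactionTag: 'batch-write-tag'},
  // Forwarded to the gax call; timeout is in milliseconds.
  gaxOptions: {timeout: 60000},
};

database
  .batchWriteAtLeastOnce([group], options)
  .on('error', console.error)
  .on('data', response => console.log(response.status))
  .on('end', () => console.log('Request completed successfully'));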
16 changes: 15 additions & 1 deletion src/index.ts
@@ -70,7 +70,12 @@ import {
import {Session} from './session';
import {SessionPool} from './session-pool';
import {Table} from './table';
import {PartitionedDml, Snapshot, Transaction} from './transaction';
import {
MutationGroup,
PartitionedDml,
Snapshot,
Transaction,
} from './transaction';
import grpcGcpModule = require('grpc-gcp');
const grpcGcp = grpcGcpModule(grpc);
import * as v1 from './v1';
@@ -2011,6 +2016,15 @@ export {Snapshot};
*/
export {Transaction};

/**
* {@link MutationGroup} class.
*
* @name Spanner.MutationGroup
* @see MutationGroup
* @type {Constructor}
*/
export {MutationGroup};

/**
* @type {object}
* @property {constructor} DatabaseAdminClient