Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): add support for batchWrite #2054

Merged
merged 22 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-spanner/tre
| Backups-restore | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups-restore.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups-restore.js,samples/README.md) |
| Backups-update | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups-update.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups-update.js,samples/README.md) |
| Backups | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups.js,samples/README.md) |
| Batch Write | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch-write.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/batch-write.js,samples/README.md) |
| Batch | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/batch.js,samples/README.md) |
| CRUD | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/crud.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/crud.js,samples/README.md) |
| Creates a new database with a specific default leader | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/database-create-with-default-leader.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/database-create-with-default-leader.js,samples/README.md) |
Expand Down
18 changes: 18 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ and automatic, synchronous replication for high availability.
* [Backups-restore](#backups-restore)
* [Backups-update](#backups-update)
* [Backups](#backups)
* [Batch Write](#batch-write)
* [Batch](#batch)
* [CRUD](#crud)
* [Creates a new database with a specific default leader](#creates-a-new-database-with-a-specific-default-leader)
Expand Down Expand Up @@ -354,6 +355,23 @@ __Usage:__



### Batch Write

View the [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch-write.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/batch-write.js,samples/README.md)

__Usage:__


`node batch-write.js <INSTANCE_ID> <DATABASE_ID> <PROJECT_ID>`


-----




### Batch

View the [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch.js).
Expand Down
110 changes: 110 additions & 0 deletions samples/batch-write.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* Copyright 2024 Google LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// sample-metadata:
// title: Batch Write
// usage: node batch-write.js <INSTANCE_ID> <DATABASE_ID> <PROJECT_ID>

'use strict';

async function main(
instanceId = 'my-instance',
databaseId = 'my-database',
projectId = 'my-project-id'
) {
// [START spanner_batch_write_at_least_once]

// Imports the Google Cloud client library
const {Spanner, MutationGroup} = require('@google-cloud/spanner');

/**
* TODO(developer): Uncomment the following lines before running the sample.
*/
// const instanceId = 'my-instance';
// const databaseId = 'my-database';
// const projectId = 'my-project-id';

// Creates a client
const spanner = new Spanner({
projectId: projectId,
});

// Gets a reference to a Cloud Spanner instance and database
const instance = spanner.instance(instanceId);
const database = instance.database(databaseId);

// Create Mutation Groups
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
const mutationGroup1 = new MutationGroup();
mutationGroup1.insert('Singers', {
SingerId: 1,
FirstName: 'Scarlet',
LastName: 'Terry',
});

const mutationGroup2 = new MutationGroup();
mutationGroup2.insert('Singers', {
SingerId: 2,
FirstName: 'Marc',
});
mutationGroup2.insert('Singers', {
SingerId: 3,
FirstName: 'Catalina',
LastName: 'Smith',
});
mutationGroup2.insert('Albums', {
AlbumId: 1,
SingerId: 2,
AlbumTitle: 'Total Junk',
});
mutationGroup2.insert('Albums', {
AlbumId: 2,
SingerId: 3,
AlbumTitle: 'Go, Go, Go',
});

const options = {
transactionTag: 'batch-write-tag',
};

try {
database
.batchWrite([mutationGroup1, mutationGroup2], options)
.on('error', console.error)
.on('data', response => {
console.log(
`Mutation group indexes ${
response.indexes
} have been applied with commit timestamp ${Spanner.timestamp(
response.commitTimestamp
).toJSON()}`
);
})
.on('end', () => {
console.log('Mutations applied successfully');
});
} catch (err) {
console.log(
`Mutation group indexes ${err.indexes} could not be applied with error code ${err.status.code} and error message ${err.status.message}`
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
);
}
// [END spanner_batch_write_at_least_once]
}

process.on('unhandledRejection', err => {
console.error(err.message);
process.exitCode = 1;
});

main(...process.argv.slice(2));
13 changes: 13 additions & 0 deletions samples/system-test/spanner.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const requestTagCommand = 'node request-tag.js';
const timestampCmd = 'node timestamp.js';
const structCmd = 'node struct.js';
const dmlCmd = 'node dml.js';
const batchWriteCmd = 'node batch-write.js';
const datatypesCmd = 'node datatypes.js';
const backupsCmd = 'node backups.js';
const instanceCmd = 'node instance.js';
Expand Down Expand Up @@ -967,6 +968,18 @@ describe('Autogenerated Admin Clients', () => {
assert.match(output, new RegExp('Virginia Watson'));
});

// batch_write
it('should perform CRUD operations using batch write', async () => {
const output = execSync(
`${batchWriteCmd} ${INSTANCE_ID} ${DATABASE_ID} ${PROJECT_ID}`
).toString();
assert.match(
output,
/Mutation group indexes [\d,]+ have been applied with commit timestamp \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z/
);
assert.include(output, 'Mutations applied successfully');
});

// create_table_with_datatypes
it('should create Venues example table with supported datatype columns', async () => {
const output = execSync(
Expand Down
108 changes: 108 additions & 0 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@
} from './session-pool';
import {CreateTableCallback, CreateTableResponse, Table} from './table';
import {
BatchWriteOptions,
ExecuteSqlRequest,
MutationGroup,
RunCallback,
RunResponse,
RunUpdateCallback,
Expand Down Expand Up @@ -1531,7 +1533,7 @@
): void;
async getDatabaseDialect(
optionsOrCallback?: CallOptions | GetDatabaseDialectCallback,
cb?: GetDatabaseDialectCallback

Check warning on line 1536 in src/database.ts

View workflow job for this annotation

GitHub Actions / lint

'cb' is defined but never used
): Promise<
| EnumKey<typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect>
| undefined
Expand Down Expand Up @@ -3210,6 +3212,110 @@
}
}
}

/**
* Get a list of {@link BatchWriteResponse} as a readable object stream.
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
*
* @method Spanner#batchWrite
*
* @param {MutationGroup[]} [mutationGroups] The group of mutations to be applied.
* @param {BatchWriteOptions} [options] Options object for batch write request.
*
* @returns {ReadableStream} An object stream which emits
* {@link protos.google.spanner.v1.BatchWriteResponse|BatchWriteResponse}
* on 'data' event.
*
* @example
* ```
* const {Spanner} = require('@google-cloud/spanner');
* const spanner = new Spanner();
*
* const instance = spanner.instance('my-instance');
* const database = instance.database('my-database');
* const mutationGroup = new MutationGroup();
* mutationGroup.insert('Singers', {
* SingerId: '1',
* FirstName: 'Marc',
* LastName: 'Richards',
* });
*
* database.batchWrite([mutationGroup])
* .on('error', console.error)
* .on('data', response => {
* console.log('response: ', response);
* })
* .on('end', () => {
* console.log('Request completed successfully');
olavloite marked this conversation as resolved.
Show resolved Hide resolved
* });
*
* //-
* // If you anticipate many results, you can end a stream early to prevent
* // unnecessary processing and API requests.
* //-
* database.batchWrite()
* .on('data', response => {
* this.end();
* });
* ```
*/
batchWrite(
alkatrivedi marked this conversation as resolved.
Show resolved Hide resolved
mutationGroups: MutationGroup[],
options?: BatchWriteOptions
): NodeJS.ReadableStream {
const proxyStream: Transform = through.obj();

this.pool_.getSession((err, session) => {
if (err) {
proxyStream.destroy(err);
return;
}
const gaxOpts = extend(true, {}, options?.gaxOptions);
const reqOpts = Object.assign(
{} as spannerClient.spanner.v1.BatchWriteRequest,
{
session: session!.formattedName_!,
mutationGroups: mutationGroups.map(mg => mg.proto()),
requestOptions: options?.requestOptions,
}
);
let dataReceived = false;
let dataStream = this.requestStream({
client: 'SpannerClient',
method: 'batchWrite',
reqOpts,
gaxOpts,
headers: this.resourceHeader_,
});
dataStream
.once('data', () => (dataReceived = true))
.once('error', err => {
if (
!dataReceived &&
isSessionNotFoundError(err as grpc.ServiceError)
) {
// If there's a 'Session not found' error and we have not yet received
// any data, we can safely retry the writes on a new session.
// Register the error on the session so the pool can discard it.
if (session) {
session.lastError = err as grpc.ServiceError;
}
// Remove the current data stream from the end user stream.
dataStream.unpipe(proxyStream);
dataStream.end();
// Create a new stream and add it to the end user stream.
dataStream = this.batchWrite(mutationGroups, options);
dataStream.pipe(proxyStream);
} else {
proxyStream.destroy(err);
}
})
.once('end', () => this.pool_.release(session!))
.pipe(proxyStream);
});

return proxyStream as NodeJS.ReadableStream;
}

/**
* Create a Session object.
*
Expand Down Expand Up @@ -3515,6 +3621,7 @@
promisifyAll(Database, {
exclude: [
'batchTransaction',
'batchWrite',
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved
'getRestoreInfo',
'getState',
'getDatabaseDialect',
Expand All @@ -3536,6 +3643,7 @@
'create',
'batchCreateSessions',
'batchTransaction',
'batchWrite',
surbhigarg92 marked this conversation as resolved.
Show resolved Hide resolved
'close',
'createBatchTransaction',
'createSession',
Expand Down
16 changes: 15 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,12 @@ import {
import {Session} from './session';
import {SessionPool} from './session-pool';
import {Table} from './table';
import {PartitionedDml, Snapshot, Transaction} from './transaction';
import {
MutationGroup,
PartitionedDml,
Snapshot,
Transaction,
} from './transaction';
import grpcGcpModule = require('grpc-gcp');
const grpcGcp = grpcGcpModule(grpc);
import * as v1 from './v1';
Expand Down Expand Up @@ -2011,6 +2016,15 @@ export {Snapshot};
*/
export {Transaction};

/**
* {@link MutationGroup} class.
*
* @name Spanner.MutationGroup
* @see MutationGroup
* @type {Constructor}
*/
export {MutationGroup};

/**
* @type {object}
* @property {constructor} DatabaseAdminClient
Expand Down
Loading
Loading