diff --git a/README.md b/README.md index 12878220b..14f4c8c8d 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-spanner/tre | Backups-restore | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups-restore.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups-restore.js,samples/README.md) | | Backups-update | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups-update.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups-update.js,samples/README.md) | | Backups | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/backups.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/backups.js,samples/README.md) | +| Batch Write | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch-write.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/batch-write.js,samples/README.md) | | Batch | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/batch.js,samples/README.md) | | CRUD | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/crud.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/crud.js,samples/README.md) | | Creates a new database with a specific default leader | [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/database-create-with-default-leader.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/database-create-with-default-leader.js,samples/README.md) | diff --git a/samples/README.md b/samples/README.md index d60b9cdc2..17d42d085 100644 --- a/samples/README.md +++ b/samples/README.md @@ -27,6 +27,7 @@ and automatic, synchronous replication for high availability. * [Backups-restore](#backups-restore) * [Backups-update](#backups-update) * [Backups](#backups) + * [Batch Write](#batch-write) * [Batch](#batch) * [CRUD](#crud) * [Creates a new database with a specific default leader](#creates-a-new-database-with-a-specific-default-leader) @@ -354,6 +355,23 @@ __Usage:__ +### Batch Write + +View the [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch-write.js). 
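+
+The sample builds `MutationGroup` objects and applies them in a single `batchWriteAtLeastOnce` call. A minimal sketch of the same flow (the `Singers` table and its columns are illustrative sample data, not part of the API):
+
+```javascript
+const {Spanner, MutationGroup} = require('@google-cloud/spanner');
+
+const database = new Spanner({projectId: 'my-project-id'})
+  .instance('my-instance')
+  .database('my-database');
+
+const group = new MutationGroup();
+group.insert('Singers', {SingerId: 1, FirstName: 'Marc'});
+
+database
+  .batchWriteAtLeastOnce([group])
+  .on('error', console.error)
+  .on('data', response => console.log(response.status))
+  .on('end', () => console.log('Request completed successfully'));
+```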
+
+[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-spanner&page=editor&open_in_editor=samples/batch-write.js,samples/README.md)
+
+__Usage:__
+
+
+`node batch-write.js `
+
+
+-----
+
+
+
+
 ### Batch

 View the [source code](https://github.com/googleapis/nodejs-spanner/blob/main/samples/batch.js).

diff --git a/samples/batch-write.js b/samples/batch-write.js
new file mode 100644
index 000000000..8f77557f4
--- /dev/null
+++ b/samples/batch-write.js
@@ -0,0 +1,123 @@
+/**
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// sample-metadata:
+//  title: Batch Write
+//  usage: node batch-write.js
+
+'use strict';
+
+async function main(
+  instanceId = 'my-instance',
+  databaseId = 'my-database',
+  projectId = 'my-project-id'
+) {
+  // [START spanner_batch_write_at_least_once]
+
+  // Imports the Google Cloud client library
+  const {Spanner, MutationGroup} = require('@google-cloud/spanner');
+
+  /**
+   * TODO(developer): Uncomment the following lines before running the sample.
+   */
+  // const instanceId = 'my-instance';
+  // const databaseId = 'my-database';
+  // const projectId = 'my-project-id';
+
+  // Creates a client
+  const spanner = new Spanner({
+    projectId: projectId,
+  });
+
+  // Gets a reference to a Cloud Spanner instance and database
+  const instance = spanner.instance(instanceId);
+  const database = instance.database(databaseId);
+
+  // Create Mutation Groups
+  /**
+   * Related mutations, such as the inserts for a parent row and its child
+   * rows, should be placed in the same group; mutations in different groups
+   * must be independent of each other.
+   * Please see {@link https://cloud.google.com/spanner/docs/reference/rpc/google.spanner.v1#google.spanner.v1.BatchWriteRequest.MutationGroup}
+   * for more details and examples.
+   */
+  const mutationGroup1 = new MutationGroup();
+  mutationGroup1.insert('Singers', {
+    SingerId: 1,
+    FirstName: 'Scarlet',
+    LastName: 'Terry',
+  });
+
+  const mutationGroup2 = new MutationGroup();
+  mutationGroup2.insert('Singers', {
+    SingerId: 2,
+    FirstName: 'Marc',
+  });
+  mutationGroup2.insert('Singers', {
+    SingerId: 3,
+    FirstName: 'Catalina',
+    LastName: 'Smith',
+  });
+  mutationGroup2.insert('Albums', {
+    AlbumId: 1,
+    SingerId: 2,
+    AlbumTitle: 'Total Junk',
+  });
+  mutationGroup2.insert('Albums', {
+    AlbumId: 2,
+    SingerId: 3,
+    AlbumTitle: 'Go, Go, Go',
+  });
+
+  // The transaction tag must be carried in `requestOptions` so that it
+  // reaches the BatchWrite RPC (see BatchWriteOptions).
+  const options = {
+    requestOptions: {transactionTag: 'batch-write-tag'},
+  };
+
+  try {
+    database
+      .batchWriteAtLeastOnce([mutationGroup1, mutationGroup2], options)
+      .on('error', console.error)
+      .on('data', response => {
+        // Check the status code of each response to determine whether the
+        // mutation group(s) were applied successfully.
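+        // Each BatchWriteResponse reports on one or more mutation groups:
+        // `indexes` identifies the groups by their position in the request,
+        // `status` is a google.rpc.Status whose code 0 (OK) means the groups
+        // committed, and `commitTimestamp` is set on success.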
+        if (response.status.code === 0) {
+          console.log(
+            `Mutation group indexes ${
+              response.indexes
+            } have been applied with commit timestamp ${Spanner.timestamp(
+              response.commitTimestamp
+            ).toJSON()}`
+          );
+        }
+        // Mutation groups that fail to commit trigger a response with a non-zero status code.
+        else {
+          console.log(
+            `Mutation group indexes ${response.indexes} could not be applied with error code ${response.status.code} and error message ${response.status.message}`
+          );
+        }
+      })
+      .on('end', () => {
+        console.log('Request completed successfully');
+      });
+  } catch (err) {
+    console.log(err);
+  }
+  // [END spanner_batch_write_at_least_once]
+}
+
+process.on('unhandledRejection', err => {
+  console.error(err.message);
+  process.exitCode = 1;
+});
+
+main(...process.argv.slice(2));
diff --git a/samples/system-test/spanner.test.js b/samples/system-test/spanner.test.js
index 8ce55029b..a5e27a174 100644
--- a/samples/system-test/spanner.test.js
+++ b/samples/system-test/spanner.test.js
@@ -40,6 +40,7 @@ const requestTagCommand = 'node request-tag.js';
 const timestampCmd = 'node timestamp.js';
 const structCmd = 'node struct.js';
 const dmlCmd = 'node dml.js';
+const batchWriteCmd = 'node batch-write.js';
 const datatypesCmd = 'node datatypes.js';
 const backupsCmd = 'node backups.js';
 const instanceCmd = 'node instance.js';
@@ -967,6 +968,27 @@ describe('Autogenerated Admin Clients', () => {
     assert.match(output, new RegExp('Virginia Watson'));
   });
 
+  // batch_write
+  it('should apply mutation groups using batch write', async () => {
+    const output = execSync(
+      `${batchWriteCmd} ${INSTANCE_ID} ${DATABASE_ID} ${PROJECT_ID}`
+    ).toString();
+
+    const successRegex =
+      /Mutation group indexes [\d,]+ have been applied with commit timestamp \d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z/;
+    const failureRegex =
+      /Mutation group indexes [\d,]+ could not be applied with error code \d+ and error message .+/;
+
+    const successMatch = successRegex.test(output);
+    const errorMatch = failureRegex.test(output);
+
+    if (successMatch || errorMatch) {
+      assert.include(output, 'Request completed successfully');
+    } else {
+      assert.ifError(output);
+    }
+  });
+
   // create_table_with_datatypes
   it('should create Venues example table with supported datatype columns', async () => {
     const output = execSync(
diff --git a/src/database.ts b/src/database.ts
index c0b5fd12f..4e9844cc4 100644
--- a/src/database.ts
+++ b/src/database.ts
@@ -60,7 +60,9 @@ } from './session-pool';
 import {CreateTableCallback, CreateTableResponse, Table} from './table';
 import {
+  BatchWriteOptions,
   ExecuteSqlRequest,
+  MutationGroup,
   RunCallback,
   RunResponse,
   RunUpdateCallback,
 } from './transaction';
@@ -3210,6 +3212,123 @@ class Database extends common.GrpcServiceObject {
       }
     }
   }
+
+  /**
+   * Write a batch of mutations to Spanner.
+   *
+   * All mutations in a group are committed atomically. However, mutations
+   * across groups can be committed non-atomically, in an unspecified order,
+   * and so they must be independent of each other. Partial failure is
+   * possible: some groups may have been committed successfully while others
+   * have failed. The results of individual batches are streamed into the
+   * response as the batches are applied.
+   *
+   * batchWriteAtLeastOnce requests are not replay protected, meaning that each
+   * mutation group may be applied more than once. Replays of non-idempotent
+   * mutations may have undesirable effects.
+   * For example, replaying an insert mutation may produce an 'already exists'
+   * error, and if you use generated or commit-timestamp-based keys, replays
+   * may result in additional rows being added to the mutation's table. We
+   * recommend structuring your mutation groups to be idempotent to avoid
+   * this issue.
+   *
+   * @method Database#batchWriteAtLeastOnce
+   *
+   * @param {MutationGroup[]} mutationGroups The mutation groups to be applied.
+   * @param {BatchWriteOptions} [options] Options object for the batch write
+   *     request.
+   *
+   * @returns {ReadableStream} An object stream which emits
+   *     {@link protos.google.spanner.v1.BatchWriteResponse|BatchWriteResponse}
+   *     on 'data' event.
+   *
+   * @example
+   * ```
+   * const {Spanner, MutationGroup} = require('@google-cloud/spanner');
+   * const spanner = new Spanner();
+   *
+   * const instance = spanner.instance('my-instance');
+   * const database = instance.database('my-database');
+   * const mutationGroup = new MutationGroup();
+   * mutationGroup.insert('Singers', {
+   *   SingerId: '1',
+   *   FirstName: 'Marc',
+   *   LastName: 'Richards',
+   * });
+   *
+   * database.batchWriteAtLeastOnce([mutationGroup])
+   *   .on('error', console.error)
+   *   .on('data', response => {
+   *     console.log('response: ', response);
+   *   })
+   *   .on('end', () => {
+   *     console.log('Request completed successfully');
+   *   });
+   *
+   * //-
+   * // If you anticipate many results, you can end a stream early to prevent
+   * // unnecessary processing and API requests.
+   * //-
+   * database.batchWriteAtLeastOnce([mutationGroup])
+   *   .on('data', function (response) {
+   *     this.end();
+   *   });
+   * ```
+   */
+  batchWriteAtLeastOnce(
+    mutationGroups: MutationGroup[],
+    options?: BatchWriteOptions
+  ): NodeJS.ReadableStream {
+    const proxyStream: Transform = through.obj();
+
+    this.pool_.getSession((err, session) => {
+      if (err) {
+        proxyStream.destroy(err);
+        return;
+      }
+      const gaxOpts = extend(true, {}, options?.gaxOptions);
+      const reqOpts = Object.assign(
+        {} as spannerClient.spanner.v1.BatchWriteRequest,
+        {
+          session: session!.formattedName_!,
+          mutationGroups: mutationGroups.map(mg => mg.proto()),
+          requestOptions: options?.requestOptions,
+        }
+      );
+      let dataReceived = false;
+      let dataStream = this.requestStream({
+        client: 'SpannerClient',
+        method: 'batchWrite',
+        reqOpts,
+        gaxOpts,
+        headers: this.resourceHeader_,
+      });
+      dataStream
+        .once('data', () => (dataReceived = true))
+        .once('error', err => {
+          if (
+            !dataReceived &&
+            isSessionNotFoundError(err as grpc.ServiceError)
+          ) {
+            // If there's a 'Session not found' error and we have not yet received
+            // any data, we can safely retry the writes on a new session.
+            // Register the error on the session so the pool can discard it.
+            if (session) {
+              session.lastError = err as grpc.ServiceError;
+            }
+            // Remove the current data stream from the end user stream.
+            dataStream.unpipe(proxyStream);
+            dataStream.end();
+            // Create a new stream and add it to the end user stream.
+            dataStream = this.batchWriteAtLeastOnce(mutationGroups, options);
+            dataStream.pipe(proxyStream);
+          } else {
+            proxyStream.destroy(err);
+          }
+        })
+        .once('end', () => this.pool_.release(session!))
+        .pipe(proxyStream);
+    });
+
+    return proxyStream as NodeJS.ReadableStream;
+  }
+
   /**
    * Create a Session object.
    *
@@ -3515,6 +3634,7 @@ class Database extends common.GrpcServiceObject {
 promisifyAll(Database, {
   exclude: [
     'batchTransaction',
+    'batchWriteAtLeastOnce',
     'getRestoreInfo',
     'getState',
     'getDatabaseDialect',
@@ -3536,6 +3656,7 @@
     'create',
     'batchCreateSessions',
     'batchTransaction',
+    'batchWriteAtLeastOnce',
     'close',
     'createBatchTransaction',
     'createSession',
diff --git a/src/index.ts b/src/index.ts
index 71370fce2..d6107aed1 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -70,7 +70,12 @@
 import {Session} from './session';
 import {SessionPool} from './session-pool';
 import {Table} from './table';
-import {PartitionedDml, Snapshot, Transaction} from './transaction';
+import {
+  MutationGroup,
+  PartitionedDml,
+  Snapshot,
+  Transaction,
+} from './transaction';
 import grpcGcpModule = require('grpc-gcp');
 const grpcGcp = grpcGcpModule(grpc);
 import * as v1 from './v1';
@@ -2011,6 +2016,15 @@ export {Snapshot};
  */
 export {Transaction};
 
+/**
+ * {@link MutationGroup} class.
+ *
+ * @name Spanner.MutationGroup
+ * @see MutationGroup
+ * @type {Constructor}
+ */
+export {MutationGroup};
+
 /**
  * @type {object}
  * @property {constructor} DatabaseAdminClient
diff --git a/src/transaction.ts b/src/transaction.ts
index 8f0bd55e9..479262a27 100644
--- a/src/transaction.ts
+++ b/src/transaction.ts
@@ -59,6 +59,11 @@ export interface TimestampBounds {
   returnReadTimestamp?: boolean;
 }
 
+export interface BatchWriteOptions {
+  requestOptions?: Pick<IRequestOptions, 'priority' | 'transactionTag'>;
+  gaxOptions?: CallOptions;
+}
+
 export interface RequestOptions {
   json?: boolean;
   jsonOptions?: JSONOptions;
@@ -2120,14 +2125,7 @@
    * ```
    */
  deleteRows(table: string, keys: Key[]): void {
-    const keySet: spannerClient.spanner.v1.IKeySet = {
-      keys: arrify(keys).map(codec.convertToListValue),
-    };
-    const mutation: spannerClient.spanner.v1.IMutation = {
-      delete: {table, keySet},
-    };
-
-    this._queuedMutations.push(mutation as spannerClient.spanner.v1.Mutation);
+    this._queuedMutations.push(buildDeleteMutation(table, keys));
   }
 
   /**
@@ -2410,31 +2408,7 @@
     table: string,
     keyVals: object | object[]
   ): void {
-    const rows: object[] = arrify(keyVals);
-    const columns = Transaction.getUniqueKeys(rows);
-
-    const values = rows.map((row, index) => {
-      const keys = Object.keys(row);
-      const missingColumns = columns.filter(column => !keys.includes(column));
-
-      if (missingColumns.length > 0) {
-        throw new GoogleError(
-          [
-            `Row at index ${index} does not contain the correct number of columns.`,
-            `Missing columns: ${JSON.stringify(missingColumns)}`,
-          ].join('\n\n')
-        );
-      }
-
-      const values = columns.map(column => row[column]);
-      return codec.convertToListValue(values);
-    });
-
-    const mutation: spannerClient.spanner.v1.IMutation = {
-      [method]: {table, columns, values},
-    };
-
-    this._queuedMutations.push(mutation as spannerClient.spanner.v1.Mutation);
+    this._queuedMutations.push(buildMutation(method, table, keyVals));
   }
 
   /**
@@ -2495,6 +2469,130 @@ promisifyAll(Transaction, {
   exclude: ['deleteRows', 'insert', 'replace', 'update', 'upsert'],
 });
 
+/**
+ * Builds a protobuf Mutation from the given row(s).
+ *
+ * @param {string} method - CRUD method (insert, update, etc.).
+ * @param {string} table - Table to perform mutations in.
+ * @param {object | object[]} keyVals - Hash of key-value pairs representing the rows.
+ * @returns {spannerClient.spanner.v1.Mutation} - The formatted mutation.
+ * @throws {GoogleError} - If a row does not contain the correct number of columns.
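+ *
+ * @example
+ * // Illustrative only: a single-row insert such as
+ * // buildMutation('insert', 'Singers', {SingerId: '1', Name: 'Marc'})
+ * // produces a Mutation proto shaped like
+ * // {insert: {table: 'Singers', columns: ['SingerId', 'Name'],
+ * //   values: [{values: [{stringValue: '1'}, {stringValue: 'Marc'}]}]}}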
+ */
+function buildMutation(
+  method: string,
+  table: string,
+  keyVals: object | object[]
+): spannerClient.spanner.v1.Mutation {
+  const rows: object[] = arrify(keyVals);
+  const columns = Transaction.getUniqueKeys(rows);
+
+  const values = rows.map((row, index) => {
+    const keys = Object.keys(row);
+    const missingColumns = columns.filter(column => !keys.includes(column));
+
+    if (missingColumns.length > 0) {
+      throw new GoogleError(
+        [
+          `Row at index ${index} does not contain the correct number of columns.`,
+          `Missing columns: ${JSON.stringify(missingColumns)}`,
+        ].join('\n\n')
+      );
+    }
+
+    const values = columns.map(column => row[column]);
+    return codec.convertToListValue(values);
+  });
+
+  const mutation: spannerClient.spanner.v1.IMutation = {
+    [method]: {table, columns, values},
+  };
+  return mutation as spannerClient.spanner.v1.Mutation;
+}
+
+/**
+ * Builds a delete mutation.
+ *
+ * @param {string} table - The name of the table.
+ * @param {Key[]} keys - The keys for the rows to delete.
+ * @returns {spannerClient.spanner.v1.Mutation} - The formatted delete mutation.
+ */
+function buildDeleteMutation(
+  table: string,
+  keys: Key[]
+): spannerClient.spanner.v1.Mutation {
+  const keySet: spannerClient.spanner.v1.IKeySet = {
+    keys: arrify(keys).map(codec.convertToListValue),
+  };
+  const mutation: spannerClient.spanner.v1.IMutation = {
+    delete: {table, keySet},
+  };
+  return mutation as spannerClient.spanner.v1.Mutation;
+}
+
+/**
+ * A group of mutations to be applied together.
+ * Related mutations should be placed in the same group.
+ *
+ * For example, two mutations inserting rows with the same primary
+ * key prefix in both parent and child tables are related.
+ *
+ * Create an instance with `new MutationGroup()` and pass it to
+ * {@link Database#batchWriteAtLeastOnce}.
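+ * Groups passed to a single batch write call are committed independently:
+ * the mutations inside one group apply atomically, but separate groups may
+ * be applied in any order relative to each other.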
+ *
+ * @example
+ * ```
+ * const {Spanner, MutationGroup} = require('@google-cloud/spanner');
+ * const spanner = new Spanner();
+ *
+ * const instance = spanner.instance('my-instance');
+ * const database = instance.database('my-database');
+ *
+ * const mutationGroup = new MutationGroup();
+ * mutationGroup.insert('Singers', {SingerId: '123', FirstName: 'David'});
+ * mutationGroup.update('Singers', {SingerId: '123', FirstName: 'Marc'});
+ *
+ * database.batchWriteAtLeastOnce([mutationGroup], {})
+ *   .on('error', console.error)
+ *   .on('data', response => {
+ *     console.log('response: ', response);
+ *   })
+ *   .on('end', () => {
+ *     console.log('Request completed successfully');
+ *   });
+ * ```
+ */
+export class MutationGroup {
+  private _proto: spannerClient.spanner.v1.BatchWriteRequest.MutationGroup;
+
+  constructor() {
+    this._proto =
+      new spannerClient.spanner.v1.BatchWriteRequest.MutationGroup();
+  }
+
+  insert(table: string, rows: object | object[]): void {
+    this._proto.mutations.push(buildMutation('insert', table, rows));
+  }
+
+  update(table: string, rows: object | object[]): void {
+    this._proto.mutations.push(buildMutation('update', table, rows));
+  }
+
+  upsert(table: string, rows: object | object[]): void {
+    this._proto.mutations.push(buildMutation('insertOrUpdate', table, rows));
+  }
+
+  replace(table: string, rows: object | object[]): void {
+    this._proto.mutations.push(buildMutation('replace', table, rows));
+  }
+
+  deleteRows(table: string, keys: Key[]): void {
+    this._proto.mutations.push(buildDeleteMutation(table, keys));
+  }
+
+  proto(): spannerClient.spanner.v1.BatchWriteRequest.IMutationGroup {
+    return this._proto;
+  }
+}
+
 /**
  * This type of transaction is used to execute a single Partitioned DML
  * statement. Partitioned DML partitions the key space and runs the DML
diff --git a/system-test/spanner.ts b/system-test/spanner.ts
index 2a4df13aa..e647919a7 100644
--- a/system-test/spanner.ts
+++ b/system-test/spanner.ts
@@ -38,6 +38,7 @@ import {
   ReadRequest,
   ExecuteSqlRequest,
   TimestampBounds,
+  MutationGroup,
 } from '../src/transaction';
 import {Row} from '../src/partial-result-stream';
 import {GetDatabaseConfig} from '../src/database';
@@ -4876,6 +4877,94 @@
       queryStreamMode(done, PG_DATABASE, query, POSTGRESQL_EXPECTED_ROW);
     });
 
+    it('GOOGLE_STANDARD_SQL should execute mutation group using Batch write', function (done) {
+      if (IS_EMULATOR_ENABLED) {
+        this.skip();
+      }
+      const mutationGroup = new MutationGroup();
+      mutationGroup.upsert(TABLE_NAME, {SingerId: ID, Name: NAME});
+      DATABASE.batchWriteAtLeastOnce([mutationGroup], {})
+        .on('data', data => {
+          assert.strictEqual(data.status.code, 0);
+        })
+        .on('end', () => {
+          done();
+        })
+        .on('error', error => {
+          done(error);
+        });
+    });
+
+    it('GOOGLE_STANDARD_SQL should execute multiple mutation groups with success and failure using Batch write', function (done) {
+      if (IS_EMULATOR_ENABLED) {
+        this.skip();
+      }
+      const id = generateName('id');
+
+      // Valid mutation group
+      const mutationGroup1 = new MutationGroup();
+      mutationGroup1.insert(TABLE_NAME, {SingerId: id, Name: NAME});
+
+      // Invalid mutation group that duplicates the row above
+      const mutationGroup2 = new MutationGroup();
+      mutationGroup2.insert(TABLE_NAME, {SingerId: id, Name: NAME});
+
+      // Mutation group with an invalid (null) SingerId
+      const mutationGroup3 = new MutationGroup();
+      mutationGroup3.insert(TABLE_NAME, {
+        SingerId: null,
+        Name: NAME,
+      });
+
+      // Array of expected status codes
+      // Code 0 is for the mutation group with the valid id
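+      // (gRPC status codes: 0 = OK, 6 = ALREADY_EXISTS, 9 = FAILED_PRECONDITION)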
+      // Code 6 is for the mutation group with the duplicate id
+      // Code 9 is for the mutation group with the null id
+      const expectedStatusCode: number[] = [0, 6, 9];
+
+      // Array of status codes seen in the stream
+      const actualStatusCode: number[] = [];
+
+      DATABASE.batchWriteAtLeastOnce([
+        mutationGroup1,
+        mutationGroup2,
+        mutationGroup3,
+      ])
+        .on('data', data => {
+          actualStatusCode.push(data.status.code);
+        })
+        .on('error', error => {
+          done(error);
+        })
+        .on('end', () => {
+          // Make sure two mutation groups failed and one succeeded
+          assert.deepStrictEqual(
+            actualStatusCode.sort(),
+            expectedStatusCode.sort()
+          );
+          done();
+        });
+    });
+
+    it('POSTGRESQL should execute mutation group using Batch write', function (done) {
+      if (IS_EMULATOR_ENABLED) {
+        this.skip();
+      }
+      const mutationGroup = new MutationGroup();
+      mutationGroup.upsert(TABLE_NAME, {SingerId: ID, Name: NAME});
+      PG_DATABASE.batchWriteAtLeastOnce([mutationGroup], {})
+        .on('data', data => {
+          assert.strictEqual(data.status.code, 0);
+        })
+        .on('end', () => {
+          done();
+        })
+        .on('error', error => {
+          done(error);
+        });
+    });
+
     it('GOOGLE_STANDARD_SQL should allow "SELECT 1" queries', done => {
       DATABASE.run('SELECT 1', done);
     });
diff --git a/test/database.ts b/test/database.ts
index b3cdf1465..9fb89c14d 100644
--- a/test/database.ts
+++ b/test/database.ts
@@ -28,7 +28,7 @@ import * as through from 'through2';
 import * as pfy from '@google-cloud/promisify';
 import {grpc} from 'google-gax';
 import * as db from '../src/database';
-import {Spanner, Instance} from '../src';
+import {Spanner, Instance, MutationGroup} from '../src';
 import {MockError} from './mockserver/mockspanner';
 import {IOperation} from '../src/instance';
 import {
@@ -40,6 +40,7 @@ import {protos} from '../src';
 import * as inst from '../src/instance';
 import RequestOptions = google.spanner.v1.RequestOptions;
 import EncryptionType = google.spanner.admin.database.v1.RestoreDatabaseEncryptionConfig.EncryptionType;
+import {BatchWriteOptions} from '../src/transaction';
 
 let promisified = false;
 const fakePfy = extend({}, pfy, {
@@ -50,6 +51,7 @@ const fakePfy = extend({}, pfy, {
     promisified = true;
     assert.deepStrictEqual(options.exclude, [
       'batchTransaction',
+      'batchWriteAtLeastOnce',
       'getRestoreInfo',
      'getState',
       'getDatabaseDialect',
@@ -86,6 +88,7 @@ function fakePartialResultStream(this: Function & {calledWith_: IArguments}) {
 
 class FakeSession {
   calledWith_: IArguments;
+  formattedName_: any;
   constructor() {
     this.calledWith_ = arguments;
   }
@@ -573,6 +576,190 @@
     });
   });
 
+  describe('batchWrite', () => {
+    const mutationGroup1 = new MutationGroup();
+    mutationGroup1.insert('MyTable', {
+      Key: 'k1',
+      Thing: 'abc',
+    });
+    const mutationGroup2 = new MutationGroup();
+    mutationGroup2.insert('MyTable', {
+      Key: 'k2',
+      Thing: 'xyz',
+    });
+
+    const mutationGroups = [mutationGroup1, mutationGroup2];
+
+    let fakePool: FakeSessionPool;
+    let fakeSession: FakeSession;
+    let fakeDataStream: Transform;
+    let getSessionStub: sinon.SinonStub;
+    let requestStreamStub: sinon.SinonStub;
+
+    const options = {
+      requestOptions: {
+        transactionTag: 'batch-write-tag',
+      },
+      gaxOptions: {autoPaginate: false},
+    } as BatchWriteOptions;
+
+    beforeEach(() => {
+      fakePool = database.pool_;
+      fakeSession = new FakeSession();
+      fakeDataStream = through.obj();
+
+      getSessionStub = (
+        sandbox.stub(fakePool, 'getSession') as sinon.SinonStub
+      ).callsFake(callback => callback(null, fakeSession));
+
+      requestStreamStub = sandbox
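+        // Stub the streaming RPC so each test can drive 'data'/'error'/'end'
+        // events through fakeDataStream by hand.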
.stub(database, 'requestStream') + .returns(fakeDataStream); + }); + + it('should get a session via `getSession`', done => { + getSessionStub.callsFake(() => {}); + database.batchWriteAtLeastOnce(mutationGroups, options); + assert.strictEqual(getSessionStub.callCount, 1); + done(); + }); + + it('should destroy the stream if `getSession` errors', done => { + const fakeError = new Error('err'); + + getSessionStub.callsFake(callback => callback(fakeError)); + database + .batchWriteAtLeastOnce(mutationGroups, options) + .on('error', err => { + assert.strictEqual(err, fakeError); + done(); + }); + }); + + it('should call `requestStream` with correct arguments', () => { + const expectedGaxOpts = extend(true, {}, options?.gaxOptions); + const expectedReqOpts = Object.assign( + {} as google.spanner.v1.BatchWriteRequest, + { + session: fakeSession!.formattedName_!, + mutationGroups: mutationGroups.map(mg => mg.proto()), + requestOptions: options?.requestOptions, + } + ); + + database.batchWriteAtLeastOnce(mutationGroups, options); + + assert.strictEqual(requestStreamStub.callCount, 1); + const args = requestStreamStub.firstCall.args[0]; + assert.strictEqual(args.client, 'SpannerClient'); + assert.strictEqual(args.method, 'batchWrite'); + assert.deepStrictEqual(args.reqOpts, expectedReqOpts); + assert.deepStrictEqual(args.gaxOpts, expectedGaxOpts); + assert.deepStrictEqual(args.headers, database.resourceHeader_); + }); + + it('should return error when passing an empty list of mutationGroups', done => { + const fakeError = new Error('err'); + database.batchWriteAtLeastOnce([], options).on('error', error => { + assert.strictEqual(error, fakeError); + done(); + }); + fakeDataStream.emit('error', fakeError); + }); + + it('should return data when passing a valid list of mutationGroups', done => { + database + .batchWriteAtLeastOnce(mutationGroups, options) + .on('data', data => { + assert.strictEqual(data, 'test'); + done(); + }); + fakeDataStream.emit('data', 'test'); + }); + + it('should emit correct event based on valid/invalid list of mutationGroups', done => { + const fakeError = new Error('err'); + const FakeMutationGroup1 = new MutationGroup(); + FakeMutationGroup1.insert('Singers', { + SingerId: 1, + FirstName: 'Scarlet', + LastName: 'Terry', + }); + FakeMutationGroup1.insert('Singers', { + SingerId: 1000000000000000000000000000000000, + FirstName: 'Scarlet', + LastName: 'Terry', + }); + + const FakeMutationGroup2 = new MutationGroup(); + FakeMutationGroup2.insert('Singers', { + SingerId: 2, + FirstName: 'Marc', + }); + FakeMutationGroup2.insert('Singers', { + SingerId: 3, + FirstName: 'Catalina', + LastName: 'Smith', + }); + FakeMutationGroup2.insert('Albums', { + AlbumId: 1, + SingerId: 2, + AlbumTitle: 'Total Junk', + }); + FakeMutationGroup2.insert('Albums', { + AlbumId: 2, + SingerId: 3, + AlbumTitle: 'Go, Go, Go', + }); + database + .batchWriteAtLeastOnce( + [FakeMutationGroup1, FakeMutationGroup2], + options + ) + .on('data', data => { + assert.strictEqual(data, 'testData'); + }) + .on('error', err => { + assert.strictEqual(err, fakeError); + }); + fakeDataStream.emit('data', 'testData'); + fakeDataStream.emit('error', fakeError); + done(); + }); + + it('should retry on "Session not found" error', done => { + const sessionNotFoundError = { + code: grpc.status.NOT_FOUND, + message: 'Session not found', + } as grpc.ServiceError; + let retryCount = 0; + + database + .batchWriteAtLeastOnce(mutationGroups, options) + .on('data', () => {}) + .on('error', err => { + assert.fail(err); + }) + 
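+        // On 'Session not found' the first stream is swapped out for a retry
+        // stream; 'end' then fires on the user-facing stream once the retry
+        // completes.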
.on('end', () => { + assert.strictEqual(retryCount, 1); + done(); + }); + + fakeDataStream.emit('error', sessionNotFoundError); + retryCount++; + }); + + it('should release session on stream end', () => { + const releaseStub = sandbox.stub(fakePool, 'release') as sinon.SinonStub; + + database.batchWriteAtLeastOnce(mutationGroups, options); + fakeDataStream.emit('end'); + + assert.strictEqual(releaseStub.callCount, 1); + assert.strictEqual(releaseStub.firstCall.args[0], fakeSession); + }); + }); + describe('close', () => { const FAKE_ID = 'a/c/b/d';