diff --git a/samples/batch-write.js b/samples/batch-write.js index 5e61c61c5..94bdd5cab 100644 --- a/samples/batch-write.js +++ b/samples/batch-write.js @@ -48,29 +48,29 @@ async function main( // Create Mutation Groups const mutationGroup1 = new MutationGroup(); mutationGroup1.insert('Singers', { - SingerId: '16', + SingerId: 1, FirstName: 'Scarlet', LastName: 'Terry', }); const mutationGroup2 = new MutationGroup(); mutationGroup2.insert('Singers', { - SingerId: '17', + SingerId: 2, FirstName: 'Marc', }); mutationGroup2.insert('Singers', { - SingerId: '18', + SingerId: 3, FirstName: 'Catalina', LastName: 'Smith', }); mutationGroup2.insert('Albums', { - AlbumId: '1', - SingerId: '17', + AlbumId: 1, + SingerId: 2, AlbumTitle: 'Total Junk', }); mutationGroup2.insert('Albums', { - AlbumId: '2', - SingerId: '18', + AlbumId: 2, + SingerId: 3, AlbumTitle: 'Go, Go, Go', }); diff --git a/src/database.ts b/src/database.ts index 19d1f1e7e..3b6d03eb3 100644 --- a/src/database.ts +++ b/src/database.ts @@ -3232,7 +3232,7 @@ class Database extends common.GrpcServiceObject { * * const instance = spanner.instance('my-instance'); * const database = instance.database('my-database'); - * const mutationGroups = new MutationGroup(); + * const mutationGroup = new MutationGroup(); * mutationGroup.insert('Singers', { * SingerId: '1', * FirstName: 'Marc', @@ -3621,7 +3621,6 @@ class Database extends common.GrpcServiceObject { promisifyAll(Database, { exclude: [ 'batchTransaction', - 'batchWrite', 'getRestoreInfo', 'getState', 'getDatabaseDialect', @@ -3643,7 +3642,6 @@ callbackifyAll(Database, { 'create', 'batchCreateSessions', 'batchTransaction', - 'batchWrite', 'close', 'createBatchTransaction', 'createSession', diff --git a/src/transaction.ts b/src/transaction.ts index 7161aef3c..a23f9a00e 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -2555,7 +2555,6 @@ function buildDeleteMutation( * }); * ``` */ - export class MutationGroup { private _proto: spannerClient.spanner.v1.BatchWriteRequest.MutationGroup; diff --git a/system-test/spanner.ts b/system-test/spanner.ts index 3fa512a14..133a2af3e 100644 --- a/system-test/spanner.ts +++ b/system-test/spanner.ts @@ -4877,14 +4877,31 @@ describe('Spanner', () => { queryStreamMode(done, PG_DATABASE, query, POSTGRESQL_EXPECTED_ROW); }); - // test for BATCH WRITE - it('Batch Write should query in stream mode', function (done) { + it('GOOGLE_STANDARD_SQL should execute mutation group using Batch write', function (done) { if (IS_EMULATOR_ENABLED) { this.skip(); } - const group = new MutationGroup(); - group.upsert(TABLE_NAME, {SingerId: ID, Name: NAME}); - DATABASE.batchWrite([group], {}) + const mutationGroup = new MutationGroup(); + mutationGroup.upsert(TABLE_NAME, {SingerId: ID, Name: NAME}); + DATABASE.batchWrite([mutationGroup], {}) + .on('data', data => { + assert.strictEqual(data.status.code, 0); + }) + .on('end', () => { + done(); + }) + .on('error', error => { + done(error); + }); + }); + + it('POSTGRESQL should execute mutation group using Batch write', function (done) { + if (IS_EMULATOR_ENABLED) { + this.skip(); + } + const mutationGroup = new MutationGroup(); + mutationGroup.upsert(TABLE_NAME, {SingerId: ID, Name: NAME}); + PG_DATABASE.batchWrite([mutationGroup], {}) .on('data', data => { assert.strictEqual(data.status.code, 0); }) diff --git a/test/database.ts b/test/database.ts index b580f42fb..ee571e671 100644 --- a/test/database.ts +++ b/test/database.ts @@ -28,7 +28,7 @@ import * as through from 'through2'; import * as pfy from '@google-cloud/promisify'; import {grpc} from 'google-gax'; import * as db from '../src/database'; -import {Spanner, Instance} from '../src'; +import {Spanner, Instance, MutationGroup} from '../src'; import {MockError} from './mockserver/mockspanner'; import {IOperation} from '../src/instance'; import { @@ -40,6 +40,7 @@ import {protos} from '../src'; import * as inst from '../src/instance'; import RequestOptions = google.spanner.v1.RequestOptions; import EncryptionType = google.spanner.admin.database.v1.RestoreDatabaseEncryptionConfig.EncryptionType; +import {BatchWriteOptions} from '../src/transaction'; let promisified = false; const fakePfy = extend({}, pfy, { @@ -50,7 +51,6 @@ const fakePfy = extend({}, pfy, { promisified = true; assert.deepStrictEqual(options.exclude, [ 'batchTransaction', - 'batchWrite', 'getRestoreInfo', 'getState', 'getDatabaseDialect', @@ -87,6 +87,7 @@ function fakePartialResultStream(this: Function & {calledWith_: IArguments}) { class FakeSession { calledWith_: IArguments; + formattedName_: any; constructor() { this.calledWith_ = arguments; } @@ -574,6 +575,138 @@ describe('Database', () => { }); }); + describe('batchWrite', () => { + const mutationGroup1 = new MutationGroup(); + mutationGroup1.insert('MyTable', { + Key: 'k1', + Thing: 'abc', + }); + const mutationGroup2 = new MutationGroup(); + mutationGroup2.insert('MyTable', { + Key: 'k2', + Thing: 'xyz', + }); + + const mutationGroups = [mutationGroup1, mutationGroup2]; + + let fakePool: FakeSessionPool; + let fakeSession: FakeSession; + let fakeDataStream: Transform; + let getSessionStub: sinon.SinonStub; + let requestStreamStub: sinon.SinonStub; + + const options = { + requestOptions: { + transactionTag: 'batch-write-tag', + }, + gaxOptions: {autoPaginate: false}, + } as BatchWriteOptions; + + beforeEach(() => { + fakePool = database.pool_; + fakeSession = new FakeSession(); + fakeDataStream = through.obj(); + + getSessionStub = ( + sandbox.stub(fakePool, 'getSession') as sinon.SinonStub + ).callsFake(callback => callback(null, fakeSession)); + + requestStreamStub = sandbox + .stub(database, 'requestStream') + .returns(fakeDataStream); + }); + + it('should get a session via `getSession`', done => { + getSessionStub.callsFake(() => {}); + database.batchWrite(mutationGroups, options); + assert.strictEqual(getSessionStub.callCount, 1); + done(); + }); + + it('should destroy the stream if `getSession` errors', done => { + const fakeError = new Error('err'); + + getSessionStub.callsFake(callback => callback(fakeError)); + + database.batchWrite(mutationGroups, options).on('error', err => { + assert.strictEqual(err, fakeError); + done(); + }); + }); + + it('should call `requestStream` with correct arguments', () => { + const expectedGaxOpts = extend(true, {}, options?.gaxOptions); + const expectedReqOpts = Object.assign( + {} as google.spanner.v1.BatchWriteRequest, + { + session: fakeSession!.formattedName_!, + mutationGroups: mutationGroups.map(mg => mg.proto()), + requestOptions: options?.requestOptions, + } + ); + + database.batchWrite(mutationGroups, options); + + assert.strictEqual(requestStreamStub.callCount, 1); + const args = requestStreamStub.firstCall.args[0]; + assert.strictEqual(args.client, 'SpannerClient'); + assert.strictEqual(args.method, 'batchWrite'); + assert.deepStrictEqual(args.reqOpts, expectedReqOpts); + assert.deepStrictEqual(args.gaxOpts, expectedGaxOpts); + assert.deepStrictEqual(args.headers, database.resourceHeader_); + }); + + it('should return error when passing an empty list of mutationGroups', done => { + const fakeError = new Error('err'); + database.batchWrite([], options).on('error', error => { + assert.strictEqual(error, fakeError); + done(); + }); + fakeDataStream.emit('error', fakeError); + }); + + it('should return data when passing a valid list of mutationGroups', done => { + database.batchWrite(mutationGroups, options).on('data', data => { + assert.strictEqual(data, 'test'); + done(); + }); + fakeDataStream.emit('data', 'test'); + }); + + it('should retry on "Session not found" error', done => { + const sessionNotFoundError = { + code: grpc.status.NOT_FOUND, + message: 'Session not found', + } as grpc.ServiceError; + let retryCount = 0; + + database + .batchWrite(mutationGroups, options) + .on('data', () => {}) + .on('error', err => { + console.log('err: ', err); + assert.fail(err); + }) + .on('end', () => { + assert.strictEqual(retryCount, 1); + done(); + }); + + fakeDataStream.emit('error', sessionNotFoundError); + retryCount++; + }); + + it('should release session on stream end', () => { + const releaseStub = sandbox.stub(fakePool, 'release') as sinon.SinonStub; + + database.batchWrite(mutationGroups, options); + fakeDataStream.emit('end'); + + assert.strictEqual(releaseStub.callCount, 1); + assert.strictEqual(releaseStub.firstCall.args[0], fakeSession); + }); + }); + describe('close', () => { const FAKE_ID = 'a/c/b/d';