Skip to content

Commit

Permalink
tests: unit test Batch Write
Browse files Browse the repository at this point in the history
  • Loading branch information
alkatrivedi committed Jun 5, 2024
1 parent 7045268 commit 0541dd4
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 18 deletions.
14 changes: 7 additions & 7 deletions samples/batch-write.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,29 @@ async function main(
// Create Mutation Groups
const mutationGroup1 = new MutationGroup();
mutationGroup1.insert('Singers', {
SingerId: '16',
SingerId: 1,
FirstName: 'Scarlet',
LastName: 'Terry',
});

const mutationGroup2 = new MutationGroup();
mutationGroup2.insert('Singers', {
SingerId: '17',
SingerId: 2,
FirstName: 'Marc',
});
mutationGroup2.insert('Singers', {
SingerId: '18',
SingerId: 3,
FirstName: 'Catalina',
LastName: 'Smith',
});
mutationGroup2.insert('Albums', {
AlbumId: '1',
SingerId: '17',
AlbumId: 1,
SingerId: 2,
AlbumTitle: 'Total Junk',
});
mutationGroup2.insert('Albums', {
AlbumId: '2',
SingerId: '18',
AlbumId: 2,
SingerId: 3,
AlbumTitle: 'Go, Go, Go',
});

Expand Down
4 changes: 1 addition & 3 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3232,7 +3232,7 @@ class Database extends common.GrpcServiceObject {
*
* const instance = spanner.instance('my-instance');
* const database = instance.database('my-database');
* const mutationGroups = new MutationGroup();
* const mutationGroup = new MutationGroup();
* mutationGroup.insert('Singers', {
* SingerId: '1',
* FirstName: 'Marc',
Expand Down Expand Up @@ -3621,7 +3621,6 @@ class Database extends common.GrpcServiceObject {
promisifyAll(Database, {
exclude: [
'batchTransaction',
'batchWrite',
'getRestoreInfo',
'getState',
'getDatabaseDialect',
Expand All @@ -3643,7 +3642,6 @@ callbackifyAll(Database, {
'create',
'batchCreateSessions',
'batchTransaction',
'batchWrite',
'close',
'createBatchTransaction',
'createSession',
Expand Down
1 change: 0 additions & 1 deletion src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2555,7 +2555,6 @@ function buildDeleteMutation(
* });
* ```
*/

export class MutationGroup {
private _proto: spannerClient.spanner.v1.BatchWriteRequest.MutationGroup;

Expand Down
27 changes: 22 additions & 5 deletions system-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4877,14 +4877,31 @@ describe('Spanner', () => {
queryStreamMode(done, PG_DATABASE, query, POSTGRESQL_EXPECTED_ROW);
});

// test for BATCH WRITE
it('Batch Write should query in stream mode', function (done) {
it('GOOGLE_STANDARD_SQL should execute mutation group using Batch write', function (done) {
if (IS_EMULATOR_ENABLED) {
this.skip();
}
const group = new MutationGroup();
group.upsert(TABLE_NAME, {SingerId: ID, Name: NAME});
DATABASE.batchWrite([group], {})
const mutationGroup = new MutationGroup();
mutationGroup.upsert(TABLE_NAME, {SingerId: ID, Name: NAME});
DATABASE.batchWrite([mutationGroup], {})
.on('data', data => {
assert.strictEqual(data.status.code, 0);
})
.on('end', () => {
done();
})
.on('error', error => {
done(error);
});
});

it('POSTGRESQL should execute mutation group using Batch write', function (done) {
if (IS_EMULATOR_ENABLED) {
this.skip();
}
const mutationGroup = new MutationGroup();
mutationGroup.upsert(TABLE_NAME, {SingerId: ID, Name: NAME});
PG_DATABASE.batchWrite([mutationGroup], {})
.on('data', data => {
assert.strictEqual(data.status.code, 0);
})
Expand Down
137 changes: 135 additions & 2 deletions test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import * as through from 'through2';
import * as pfy from '@google-cloud/promisify';
import {grpc} from 'google-gax';
import * as db from '../src/database';
import {Spanner, Instance} from '../src';
import {Spanner, Instance, MutationGroup} from '../src';
import {MockError} from './mockserver/mockspanner';
import {IOperation} from '../src/instance';
import {
Expand All @@ -40,6 +40,7 @@ import {protos} from '../src';
import * as inst from '../src/instance';
import RequestOptions = google.spanner.v1.RequestOptions;
import EncryptionType = google.spanner.admin.database.v1.RestoreDatabaseEncryptionConfig.EncryptionType;
import {BatchWriteOptions} from '../src/transaction';

let promisified = false;
const fakePfy = extend({}, pfy, {
Expand All @@ -50,7 +51,6 @@ const fakePfy = extend({}, pfy, {
promisified = true;
assert.deepStrictEqual(options.exclude, [
'batchTransaction',
'batchWrite',
'getRestoreInfo',
'getState',
'getDatabaseDialect',
Expand Down Expand Up @@ -87,6 +87,7 @@ function fakePartialResultStream(this: Function & {calledWith_: IArguments}) {

class FakeSession {
calledWith_: IArguments;
formattedName_: any;
constructor() {
this.calledWith_ = arguments;
}
Expand Down Expand Up @@ -574,6 +575,138 @@ describe('Database', () => {
});
});

describe('batchWrite', () => {
  // Two distinct mutation groups so the tests exercise the multi-group path
  // of Database#batchWrite, not just a single group.
  const mutationGroup1 = new MutationGroup();
  mutationGroup1.insert('MyTable', {
    Key: 'k1',
    Thing: 'abc',
  });
  const mutationGroup2 = new MutationGroup();
  mutationGroup2.insert('MyTable', {
    Key: 'k2',
    Thing: 'xyz',
  });

  const mutationGroups = [mutationGroup1, mutationGroup2];

  let fakePool: FakeSessionPool;
  let fakeSession: FakeSession;
  let fakeDataStream: Transform;
  let getSessionStub: sinon.SinonStub;
  let requestStreamStub: sinon.SinonStub;

  // Options forwarded verbatim to the BatchWrite RPC; used below to verify
  // that both requestOptions and gaxOptions survive the round trip.
  const options = {
    requestOptions: {
      transactionTag: 'batch-write-tag',
    },
    gaxOptions: {autoPaginate: false},
  } as BatchWriteOptions;

  beforeEach(() => {
    fakePool = database.pool_;
    fakeSession = new FakeSession();
    fakeDataStream = through.obj();

    // Hand the fake session to the callback synchronously so each test can
    // drive the returned stream deterministically.
    getSessionStub = (
      sandbox.stub(fakePool, 'getSession') as sinon.SinonStub
    ).callsFake(callback => callback(null, fakeSession));

    // All RPC traffic is routed through this stubbed stream; tests emit
    // 'data'/'error'/'end' on it to simulate server behavior.
    requestStreamStub = sandbox
      .stub(database, 'requestStream')
      .returns(fakeDataStream);
  });

  it('should get a session via `getSession`', done => {
    // Swallow the callback so no request is issued; we only count the call.
    getSessionStub.callsFake(() => {});
    database.batchWrite(mutationGroups, options);
    assert.strictEqual(getSessionStub.callCount, 1);
    done();
  });

  it('should destroy the stream if `getSession` errors', done => {
    const fakeError = new Error('err');

    getSessionStub.callsFake(callback => callback(fakeError));

    database.batchWrite(mutationGroups, options).on('error', err => {
      assert.strictEqual(err, fakeError);
      done();
    });
  });

  it('should call `requestStream` with correct arguments', () => {
    const expectedGaxOpts = extend(true, {}, options?.gaxOptions);
    const expectedReqOpts = Object.assign(
      {} as google.spanner.v1.BatchWriteRequest,
      {
        session: fakeSession!.formattedName_!,
        mutationGroups: mutationGroups.map(mg => mg.proto()),
        requestOptions: options?.requestOptions,
      }
    );

    database.batchWrite(mutationGroups, options);

    assert.strictEqual(requestStreamStub.callCount, 1);
    const args = requestStreamStub.firstCall.args[0];
    assert.strictEqual(args.client, 'SpannerClient');
    assert.strictEqual(args.method, 'batchWrite');
    assert.deepStrictEqual(args.reqOpts, expectedReqOpts);
    assert.deepStrictEqual(args.gaxOpts, expectedGaxOpts);
    assert.deepStrictEqual(args.headers, database.resourceHeader_);
  });

  it('should return error when passing an empty list of mutationGroups', done => {
    // NOTE(review): the error is emitted manually on the stubbed stream, so
    // this only verifies error propagation — it does not prove batchWrite
    // itself rejects an empty mutationGroups list. TODO: confirm/implement
    // empty-list validation in Database#batchWrite and assert on it here.
    const fakeError = new Error('err');
    database.batchWrite([], options).on('error', error => {
      assert.strictEqual(error, fakeError);
      done();
    });
    fakeDataStream.emit('error', fakeError);
  });

  it('should return data when passing a valid list of mutationGroups', done => {
    database.batchWrite(mutationGroups, options).on('data', data => {
      assert.strictEqual(data, 'test');
      done();
    });
    fakeDataStream.emit('data', 'test');
  });

  it('should retry on "Session not found" error', done => {
    const sessionNotFoundError = {
      code: grpc.status.NOT_FOUND,
      message: 'Session not found',
    } as grpc.ServiceError;
    let retryCount = 0;

    database
      .batchWrite(mutationGroups, options)
      .on('data', () => {})
      .on('error', err => {
        // Any surfaced error means the retry path swallowed nothing; fail
        // with the error attached for easier diagnosis.
        assert.fail(err);
      })
      .on('end', () => {
        assert.strictEqual(retryCount, 1);
        done();
      });

    fakeDataStream.emit('error', sessionNotFoundError);
    // NOTE(review): retryCount is incremented unconditionally right after the
    // error is emitted, so the 'end' assertion passes whether or not a retry
    // actually occurred. TODO: assert on an observable retry signal instead
    // (e.g. getSessionStub.callCount or requestStreamStub.callCount).
    retryCount++;
  });

  it('should release session on stream end', () => {
    const releaseStub = sandbox.stub(fakePool, 'release') as sinon.SinonStub;

    database.batchWrite(mutationGroups, options);
    fakeDataStream.emit('end');

    // The session checked out in beforeEach must be returned to the pool
    // exactly once when the stream completes.
    assert.strictEqual(releaseStub.callCount, 1);
    assert.strictEqual(releaseStub.firstCall.args[0], fakeSession);
  });
});

describe('close', () => {
const FAKE_ID = 'a/c/b/d';

Expand Down

0 comments on commit 0541dd4

Please sign in to comment.