Skip to content

Commit

Permalink
observability: trace BatchTransaction and Table
Browse files Browse the repository at this point in the history
This change is part of a series of changes to add
OpenTelemetry traces, focused on BatchTransaction and Table.

While here, made the tests for sessionPool spans much more
precise to avoid flakes.

Updates #2079
Built from PR #2087
Updates #2114
  • Loading branch information
odeke-em committed Sep 17, 2024
1 parent 3300ab5 commit e74c73e
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 94 deletions.
144 changes: 85 additions & 59 deletions src/batch-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
CLOUD_RESOURCE_HEADER,
addLeaderAwareRoutingHeader,
} from '../src/common';
import {startTrace, setSpanError} from './instrument';

export interface TransactionIdentifier {
session: string | Session;
Expand Down Expand Up @@ -136,21 +137,30 @@ class BatchTransaction extends Snapshot {
delete reqOpts.gaxOptions;
delete reqOpts.types;

const headers: {[k: string]: string} = {};
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}
return startTrace('BatchTransaction.createQueryPartitions', query, span => {
const headers: {[k: string]: string} = {};
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}

this.createPartitions_(
{
client: 'SpannerClient',
method: 'partitionQuery',
reqOpts,
gaxOpts: query.gaxOptions,
headers: headers,
},
(err, partitions, resp) => {
if (err) {
setSpanError(span, err);
}

this.createPartitions_(
{
client: 'SpannerClient',
method: 'partitionQuery',
reqOpts,
gaxOpts: query.gaxOptions,
headers: headers,
},
callback
);
span.end();
callback(err, partitions, resp);
}
);
});
}
/**
* Generic create partition method. Handles common parameters used in both
Expand All @@ -163,37 +173,43 @@ class BatchTransaction extends Snapshot {
* @param {function} callback Callback function.
*/
createPartitions_(config, callback) {
const query = extend({}, config.reqOpts, {
session: this.session.formattedName_,
transaction: {id: this.id},
});
config.reqOpts = extend({}, query);
config.headers = {
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_,
};
delete query.partitionOptions;
this.session.request(config, (err, resp) => {
if (err) {
callback(err, null, resp);
return;
}

const partitions = resp.partitions.map(partition => {
return extend({}, query, partition);
return startTrace('BatchTransaction.createPartitions', {}, span => {
const query = extend({}, config.reqOpts, {
session: this.session.formattedName_,
transaction: {id: this.id},
});
config.reqOpts = extend({}, query);
config.headers = {
[CLOUD_RESOURCE_HEADER]: (this.session.parent as Database)
.formattedName_,
};
delete query.partitionOptions;
this.session.request(config, (err, resp) => {
if (err) {
setSpanError(span, err);
span.end();
callback(err, null, resp);
return;
}

if (resp.transaction) {
const {id, readTimestamp} = resp.transaction;
const partitions = resp.partitions.map(partition => {
return extend({}, query, partition);
});

this.id = id;
if (resp.transaction) {
const {id, readTimestamp} = resp.transaction;

if (readTimestamp) {
this.readTimestampProto = readTimestamp;
this.readTimestamp = new PreciseDate(readTimestamp);
this.id = id;

if (readTimestamp) {
this.readTimestampProto = readTimestamp;
this.readTimestamp = new PreciseDate(readTimestamp);
}
}
}

callback(null, partitions, resp);
span.end();
callback(null, partitions, resp);
});
});
}
/**
Expand Down Expand Up @@ -226,29 +242,38 @@ class BatchTransaction extends Snapshot {
* @returns {Promise<CreateReadPartitionsResponse>}
*/
createReadPartitions(options, callback) {
const reqOpts = Object.assign({}, options, {
keySet: Snapshot.encodeKeySet(options),
});
return startTrace('BatchTransaction.createReadPartitions', {}, span => {
const reqOpts = Object.assign({}, options, {
keySet: Snapshot.encodeKeySet(options),
});

delete reqOpts.gaxOptions;
delete reqOpts.keys;
delete reqOpts.ranges;
delete reqOpts.gaxOptions;
delete reqOpts.keys;
delete reqOpts.ranges;

const headers: {[k: string]: string} = {};
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}
const headers: {[k: string]: string} = {};
if (this._getSpanner().routeToLeaderEnabled) {
addLeaderAwareRoutingHeader(headers);
}

this.createPartitions_(
{
client: 'SpannerClient',
method: 'partitionRead',
reqOpts,
gaxOpts: options.gaxOptions,
headers: headers,
},
(err, partitions, resp) => {
if (err) {
setSpanError(span, err);
}

this.createPartitions_(
{
client: 'SpannerClient',
method: 'partitionRead',
reqOpts,
gaxOpts: options.gaxOptions,
headers: headers,
},
callback
);
span.end();
callback(err, partitions, resp);
}
);
});
}
/**
* Executes partition.
Expand Down Expand Up @@ -322,6 +347,7 @@ class BatchTransaction extends Snapshot {
* ```
*/
executeStream(partition) {
// TODO: Instrument the streams with Otel.
if (is.string(partition.table)) {
return this.createReadStream(partition.table, partition);
}
Expand Down
46 changes: 26 additions & 20 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import {
import {google as databaseAdmin} from '../protos/protos';
import {Schema, LongRunningCallback} from './common';
import IRequestOptions = databaseAdmin.spanner.v1.IRequestOptions;
import {startTrace, setSpanError} from './instrument';

export type Key = string | string[];

Expand Down Expand Up @@ -1072,29 +1073,34 @@ class Table {
options: MutateRowsOptions | CallOptions = {},
callback: CommitCallback
): void {
const requestOptions =
'requestOptions' in options ? options.requestOptions : {};
startTrace('Table.' + method, {}, span => {
const requestOptions =
'requestOptions' in options ? options.requestOptions : {};

const excludeTxnFromChangeStreams =
'excludeTxnFromChangeStreams' in options
? options.excludeTxnFromChangeStreams
: false;
const excludeTxnFromChangeStreams =
'excludeTxnFromChangeStreams' in options
? options.excludeTxnFromChangeStreams
: false;

this.database.runTransaction(
{
requestOptions: requestOptions,
excludeTxnFromChangeStreams: excludeTxnFromChangeStreams,
},
(err, transaction) => {
if (err) {
callback(err);
return;
}
this.database.runTransaction(
{
requestOptions: requestOptions,
excludeTxnFromChangeStreams: excludeTxnFromChangeStreams,
},
(err, transaction) => {
if (err) {
setSpanError(span, err);
span.end();
callback(err);
return;
}

transaction![method](this.name, rows as Key[]);
transaction!.commit(options, callback);
}
);
span.end();
transaction![method](this.name, rows as Key[]);
transaction!.commit(options, callback);
}
);
});
}
}

Expand Down
18 changes: 3 additions & 15 deletions test/session-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1412,25 +1412,13 @@ describe('SessionPool', () => {
await sessionPool._release(session);
span.end();

const spans = exporter.getFinishedSpans();
const events = span.events;
assert.strictEqual(!events, false, 'Events must be set');
assert.strictEqual(
spans.length,
1,
'Exactly 1 span should have been exported'
);
assert.strictEqual(
spans[0].name,
`${SPAN_NAMESPACE_PREFIX}.${topLevelSpanName}`,
'Expected only the top-level created span'
);
const span0 = spans[0];
assert.strictEqual(!span0.events, false, 'Events must be set');
assert.strictEqual(
span0.events.length > 0,
events.length > 0,
true,
'Expecting at least 1 event'
);
const events = span0.events;

// Sort the events by earliest time of occurence.
events.sort((evtA, evtB) => {
Expand Down

0 comments on commit e74c73e

Please sign in to comment.