Skip to content

Commit

Permalink
feat: (observability) Add support for OpenTelemetry traces and allow …
Browse files Browse the repository at this point in the history
…observability options to be passed. (#2131)

This change plumbs ObservabilityConfig into Spanner, Instance and Database so that
any subsequent traces will use it when beginning spans and later on for metrics.

Updates #2079
  • Loading branch information
odeke-em authored Oct 3, 2024
1 parent 2fd63ac commit 5237e11
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 43 deletions.
2 changes: 1 addition & 1 deletion observability-test/batch-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ describe('BatchTransaction', () => {
batchTransaction = new BatchTransaction(SESSION as {} as Session);
batchTransaction.session = SESSION as {} as Session;
batchTransaction.id = ID;
batchTransaction.observabilityOptions = {tracerProvider: provider};
batchTransaction._observabilityOptions = {tracerProvider: provider};
REQUEST.callsFake((_, callback) => callback(null, RESPONSE));
});

Expand Down
2 changes: 1 addition & 1 deletion observability-test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ describe('Database', () => {
database = new Database(INSTANCE, NAME, POOL_OPTIONS);
database.parent = INSTANCE;
database.databaseRole = 'parent_role';
database.observabilityConfig = {
database._observabilityOptions = {
tracerProvider: provider,
enableExtendedTracing: false,
};
Expand Down
171 changes: 157 additions & 14 deletions observability-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import * as assert from 'assert';
import {grpc} from 'google-gax';
import {google} from '../protos/protos';
import {Database, Spanner} from '../src';
import {Database, Instance, Spanner} from '../src';
import {MutationSet} from '../src/transaction';
import protobuf = google.spanner.v1;
import * as mock from '../test/mockserver/mockspanner';
Expand All @@ -35,6 +35,8 @@ const {
AsyncHooksContextManager,
} = require('@opentelemetry/context-async-hooks');

const {ObservabilityOptions} = require('../src/instrument');

/** A simple result set for SELECT 1. */
function createSelect1ResultSet(): protobuf.ResultSet {
const fields = [
Expand All @@ -60,7 +62,9 @@ interface setupResults {
spannerMock: mock.MockSpanner;
}

async function setup(): Promise<setupResults> {
async function setup(
observabilityOptions?: typeof ObservabilityOptions
): Promise<setupResults> {
const server = new grpc.Server();

const spannerMock = mock.createMockSpanner(server);
Expand Down Expand Up @@ -97,6 +101,7 @@ async function setup(): Promise<setupResults> {
servicePath: 'localhost',
port,
sslCreds: grpc.credentials.createInsecure(),
observabilityOptions: observabilityOptions,
});

return Promise.resolve({
Expand All @@ -122,7 +127,16 @@ describe('EndToEnd', () => {
});

beforeEach(async () => {
const setupResult = await setup();
traceExporter = new InMemorySpanExporter();
const sampler = new AlwaysOnSampler();
const provider = new NodeTracerProvider({
sampler: sampler,
exporter: traceExporter,
});
const setupResult = await setup({
tracerProvider: provider,
enableExtendedTracing: false,
});
spanner = setupResult.spanner;
server = setupResult.server;
spannerMock = setupResult.spannerMock;
Expand All @@ -138,21 +152,10 @@ describe('EndToEnd', () => {
mock.StatementResult.updateCount(1)
);

traceExporter = new InMemorySpanExporter();
const sampler = new AlwaysOnSampler();

const provider = new NodeTracerProvider({
sampler: sampler,
exporter: traceExporter,
});
provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter));

const instance = spanner.instance('instance');
database = instance.database('database');
database.observabilityConfig = {
tracerProvider: provider,
enableExtendedTracing: false,
};
});

afterEach(() => {
Expand Down Expand Up @@ -440,3 +443,143 @@ describe('EndToEnd', () => {
});
});
});

describe('ObservabilityOptions injection and propagation', async () => {
const globalTraceExporter = new InMemorySpanExporter();
const globalTracerProvider = new NodeTracerProvider({
sampler: new AlwaysOnSampler(),
exporter: globalTraceExporter,
});
globalTracerProvider.addSpanProcessor(
new SimpleSpanProcessor(globalTraceExporter)
);
globalTracerProvider.register();

const injectedTraceExporter = new InMemorySpanExporter();
const injectedTracerProvider = new NodeTracerProvider({
sampler: new AlwaysOnSampler(),
exporter: injectedTraceExporter,
});
injectedTracerProvider.addSpanProcessor(
new SimpleSpanProcessor(injectedTraceExporter)
);

const observabilityOptions: typeof ObservabilityOptions = {
tracerProvider: injectedTracerProvider,
enableExtendedTracing: true,
};

const setupResult = await setup(observabilityOptions);
const spanner = setupResult.spanner;
const server = setupResult.server;
const spannerMock = setupResult.spannerMock;

after(async () => {
globalTraceExporter.reset();
injectedTraceExporter.reset();
await globalTracerProvider.shutdown();
await injectedTracerProvider.shutdown();
spannerMock.resetRequests();
spanner.close();
server.tryShutdown(() => {});
});

it('Passed into Spanner, Instance and Database', done => {
// Ensure that the same observability configuration is set on the Spanner client.
assert.deepStrictEqual(spanner._observabilityOptions, observabilityOptions);

// Acquire a handle to the Instance through spanner.instance.
const instanceByHandle = spanner.instance('instance');
assert.deepStrictEqual(
instanceByHandle._observabilityOptions,
observabilityOptions
);

// Create the Instance by means of a constructor directly.
const instanceByConstructor = new Instance(spanner, 'myInstance');
assert.deepStrictEqual(
instanceByConstructor._observabilityOptions,
observabilityOptions
);

// Acquire a handle to the Database through instance.database.
const databaseByHandle = instanceByHandle.database('database');
assert.deepStrictEqual(
databaseByHandle._observabilityOptions,
observabilityOptions
);

// Create the Database by means of a constructor directly.
const databaseByConstructor = new Database(
instanceByConstructor,
'myDatabase'
);
assert.deepStrictEqual(
databaseByConstructor._observabilityOptions,
observabilityOptions
);

done();
});

it('Propagates spans to the injected not global TracerProvider', done => {
const instance = spanner.instance('instance');
const database = instance.database('database');

database.run('SELECT 1', (err, rows) => {
assert.ifError(err);

injectedTraceExporter.forceFlush();
globalTraceExporter.forceFlush();
const spansFromInjected = injectedTraceExporter.getFinishedSpans();
const spansFromGlobal = globalTraceExporter.getFinishedSpans();

assert.strictEqual(
spansFromGlobal.length,
0,
'Expecting no spans from the global exporter'
);
assert.strictEqual(
spansFromInjected.length > 0,
true,
'Expecting spans from the injected exporter'
);

spansFromInjected.sort((spanA, spanB) => {
spanA.startTime < spanB.startTime;
});
const actualSpanNames: string[] = [];
const actualEventNames: string[] = [];
spansFromInjected.forEach(span => {
actualSpanNames.push(span.name);
span.events.forEach(event => {
actualEventNames.push(event.name);
});
});

const expectedSpanNames = [
'CloudSpanner.Database.runStream',
'CloudSpanner.Database.run',
];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

const expectedEventNames = [
'Acquiring session',
'Waiting for a session to become available',
'Acquired session',
'Using Session',
];
assert.deepStrictEqual(
actualEventNames,
expectedEventNames,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);

done();
});
});
});
2 changes: 1 addition & 1 deletion observability-test/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ describe('Table', () => {
extend(Table, TableCached);
table = new Table(DATABASE, NAME);
transaction = new FakeTransaction();
table.observabilityOptions = {tracerProvider: provider};
table._observabilityOptions = {tracerProvider: provider};
});

afterEach(() => {
Expand Down
6 changes: 3 additions & 3 deletions src/batch-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class BatchTransaction extends Snapshot {

const traceConfig: traceConfig = {
sql: query,
opts: this.observabilityOptions,
opts: this._observabilityOptions,
};
return startTrace(
'BatchTransaction.createQueryPartitions',
Expand Down Expand Up @@ -182,7 +182,7 @@ class BatchTransaction extends Snapshot {
*/
createPartitions_(config, callback) {
const traceConfig: traceConfig = {
opts: this.observabilityOptions,
opts: this._observabilityOptions,
};

return startTrace(
Expand Down Expand Up @@ -259,7 +259,7 @@ class BatchTransaction extends Snapshot {
*/
createReadPartitions(options, callback) {
const traceConfig: traceConfig = {
opts: this.observabilityOptions,
opts: this._observabilityOptions,
};

return startTrace(
Expand Down
26 changes: 14 additions & 12 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ class Database extends common.GrpcServiceObject {
databaseDialect?: EnumKey<
typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect
> | null;
observabilityConfig: ObservabilityOptions | undefined;
_observabilityOptions?: ObservabilityOptions;
constructor(
instance: Instance,
name: string,
Expand Down Expand Up @@ -467,7 +467,7 @@ class Database extends common.GrpcServiceObject {
Object.assign({}, queryOptions),
Database.getEnvironmentQueryOptions()
);
this.observabilityConfig = instance.observabilityConfig;
this._observabilityOptions = instance._observabilityOptions;
}
/**
* @typedef {array} SetDatabaseMetadataResponse
Expand Down Expand Up @@ -693,7 +693,7 @@ class Database extends common.GrpcServiceObject {

const sessions = (resp!.session || []).map(metadata => {
const session = this.session(metadata.name!);
session.observabilityConfig = this.observabilityConfig;
session._observabilityOptions = this._observabilityOptions;
session.metadata = metadata;
return session;
});
Expand Down Expand Up @@ -738,6 +738,7 @@ class Database extends common.GrpcServiceObject {
const id = identifier.transaction;
const transaction = new BatchTransaction(session, options);
transaction.id = id;
transaction._observabilityOptions = this._observabilityOptions;
transaction.readTimestamp = identifier.timestamp as PreciseDate;
return transaction;
}
Expand Down Expand Up @@ -827,7 +828,7 @@ class Database extends common.GrpcServiceObject {
? (optionsOrCallback as TimestampBounds)
: {};

const q = {opts: this.observabilityConfig};
const q = {opts: this._observabilityOptions};
return startTrace('Database.createBatchTransaction', q, span => {
this.pool_.getSession((err, session) => {
if (err) {
Expand Down Expand Up @@ -1085,6 +1086,7 @@ class Database extends common.GrpcServiceObject {
/CREATE TABLE `*([^\s`(]+)/
)![1];
const table = this.table(tableName!);
table._observabilityOptions = this._observabilityOptions;
callback!(null, table, operation!, resp!);
});
}
Expand Down Expand Up @@ -1873,7 +1875,7 @@ class Database extends common.GrpcServiceObject {
delete (gaxOpts as GetSessionsOptions).pageToken;
}

const q = {opts: this.observabilityConfig};
const q = {opts: this._observabilityOptions};
return startTrace('Database.getSessions', q, span => {
this.request<
google.spanner.v1.ISession,
Expand All @@ -1895,7 +1897,7 @@ class Database extends common.GrpcServiceObject {
sessionInstances = sessions.map(metadata => {
const session = self.session(metadata.name!);
session.metadata = metadata;
session.observabilityConfig = this.observabilityConfig;
session._observabilityOptions = this._observabilityOptions;
return session;
});
}
Expand Down Expand Up @@ -2056,7 +2058,7 @@ class Database extends common.GrpcServiceObject {
? (optionsOrCallback as TimestampBounds)
: {};

const q = {opts: this.observabilityConfig};
const q = {opts: this._observabilityOptions};
return startTrace('Database.getSnapshot', q, span => {
this.pool_.getSession((err, session) => {
if (err) {
Expand Down Expand Up @@ -2157,7 +2159,7 @@ class Database extends common.GrpcServiceObject {
? (optionsOrCallback as GetTransactionOptions)
: {};

const q = {opts: this.observabilityConfig};
const q = {opts: this._observabilityOptions};
return startTrace('Database.getTransaction', q, span => {
this.pool_.getSession((err, session, transaction) => {
if (options.requestOptions) {
Expand Down Expand Up @@ -2784,7 +2786,7 @@ class Database extends common.GrpcServiceObject {
? (optionsOrCallback as TimestampBounds)
: {};

const q = {sql: query, opts: this.observabilityConfig};
const q = {sql: query, opts: this._observabilityOptions};
return startTrace('Database.run', q, span => {
this.runStream(query, options)
.on('error', err => {
Expand Down Expand Up @@ -3005,7 +3007,7 @@ class Database extends common.GrpcServiceObject {
options?: TimestampBounds
): PartialResultStream {
const proxyStream: Transform = through.obj();
const q = {sql: query, opts: this.observabilityConfig};
const q = {sql: query, opts: this._observabilityOptions};
return startTrace('Database.runStream', q, span => {
this.pool_.getSession((err, session) => {
if (err) {
Expand Down Expand Up @@ -3183,7 +3185,7 @@ class Database extends common.GrpcServiceObject {
? (optionsOrRunFn as RunTransactionOptions)
: {};

const q = {opts: this.observabilityConfig};
const q = {opts: this._observabilityOptions};
startTrace('Database.runTransaction', q, span => {
this.pool_.getSession((err, session?, transaction?) => {
if (err) {
Expand Down Expand Up @@ -3576,7 +3578,7 @@ class Database extends common.GrpcServiceObject {
? (optionsOrCallback as CallOptions)
: {};

const q = {opts: this.observabilityConfig};
const q = {opts: this._observabilityOptions};
return startTrace('Database.writeAtLeastOnce', q, span => {
this.pool_.getSession((err, session?, transaction?) => {
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
Expand Down
Loading

0 comments on commit 5237e11

Please sign in to comment.