Skip to content

Commit

Permalink
refactor: Updated shim.recordConsume to use shim.record and added…
Browse files Browse the repository at this point in the history
… ability to invoke an after hook with callback args (#2207)
  • Loading branch information
bizob2828 authored May 24, 2024
1 parent 330cc4b commit 4f48fc3
Show file tree
Hide file tree
Showing 21 changed files with 292 additions and 332 deletions.
18 changes: 9 additions & 9 deletions lib/instrumentation/amqplib/amqplib.js
Original file line number Diff line number Diff line change
Expand Up @@ -266,18 +266,20 @@ function wrapModel(shim, Model, promiseMode) {
destinationName: shim.FIRST,
callback: setCallback(shim, promiseMode),
promise: promiseMode,
messageHandler: function handleConsumedMessage(shim, fn, name, message) {
after: function handleConsumedMessage({ shim, result, args, segment }) {
if (!shim.agent.config.message_tracer.segment_parameters.enabled) {
shim.logger.trace('Not capturing segment parameters')
return
}

// the message is the param when using the promised based model
message = promiseMode ? message : message[1]
const message = promiseMode ? result : args?.[1]
if (!message) {
shim.logger.trace('No results from consume.')
return null
}
const parameters = getParametersFromMessage(message)

const headers = message?.properties?.headers

return { parameters, headers }
shim.copySegmentParameters(segment, parameters)
}
})
)
Expand Down Expand Up @@ -312,12 +314,10 @@ function wrapModel(shim, Model, promiseMode) {
* Extracts the appropriate messageHandler parameters for the consume method.
*
* @param {Shim} shim instance of shim
* @param {object} _consumer not used
* @param {string} _name not used
* @param {Array} args arguments passed to the consume method
* @returns {object} message params
*/
function describeMessage(shim, _consumer, _name, args) {
function describeMessage(shim, args) {
const [message] = args

if (!message?.properties) {
Expand Down
3 changes: 1 addition & 2 deletions lib/instrumentation/aws-sdk/v3/bedrock.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,7 @@ function getBedrockSpec({ commandName }, shim, _original, _name, args) {
return new RecorderSpec({
promise: true,
name: `Llm/${modelType}/Bedrock/${commandName}`,
// eslint-disable-next-line max-params
after: (shim, _fn, _fnName, err, response, segment) => {
after: ({ shim, error: err, result: response, segment }) => {
const passThroughParams = {
shim,
err,
Expand Down
2 changes: 1 addition & 1 deletion lib/instrumentation/core/inspector.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ function initialize(agent, inspector, name, shim) {
shim.wrap(sessionProto, 'post', function wrapPost(shim, fn) {
return function wrappedPost() {
const args = shim.argsToArray.apply(shim, arguments)
shim.bindCallbackSegment(args, shim.LAST)
shim.bindCallbackSegment(null, args, shim.LAST)
return fn.apply(this, args)
}
})
Expand Down
6 changes: 2 additions & 4 deletions lib/instrumentation/langchain/runnable.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ function instrumentInvokeChain({ langchain, shim }) {
return new RecorderSpec({
name: `${LANGCHAIN.CHAIN}/${fnName}`,
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, output, segment) {
after({ error: err, result: output, segment }) {
recordChatCompletionEvents({
segment,
messages: [output],
Expand Down Expand Up @@ -97,8 +96,7 @@ function instrumentStream({ langchain, shim }) {
return new RecorderSpec({
name: `${LANGCHAIN.CHAIN}/${fnName}`,
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, output, segment) {
after({ error: err, result: output, segment }) {
// Input error occurred which means a stream was not created.
// Skip instrumenting streaming and create Llm Events from
// the data we have
Expand Down
3 changes: 1 addition & 2 deletions lib/instrumentation/langchain/tools.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ module.exports = function initialize(shim, tools) {
return new RecorderSpec({
name: `${LANGCHAIN.TOOL}/${name}`,
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, output, segment) {
after({ error: err, result: output, segment }) {
const metadata = mergeMetadata(instanceMeta, paramsMeta)
const tags = mergeTags(instanceTags, paramsTags)
segment.end()
Expand Down
3 changes: 1 addition & 2 deletions lib/instrumentation/langchain/vectorstore.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ module.exports = function initialize(shim, vectorstores) {
return new RecorderSpec({
name: `${LANGCHAIN.VECTORSTORE}/${fnName}`,
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, output, segment) {
after({ error: err, result: output, segment }) {
if (!output) {
// If we get an error, it is possible that `output = null`.
// In that case, we define it to be an empty array.
Expand Down
2 changes: 1 addition & 1 deletion lib/instrumentation/memcached.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ module.exports = function initialize(agent, memcached, moduleName, shim) {
return new OperationSpec({
name: metacall.type || 'Unknown',
callback: function wrapCallback(shim, fn, fnName, opSegment) {
shim.bindCallbackSegment(metacall, 'callback', opSegment)
shim.bindCallbackSegment(null, metacall, 'callback', opSegment)
},
parameters
})
Expand Down
6 changes: 2 additions & 4 deletions lib/instrumentation/openai.js
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,7 @@ module.exports = function initialize(agent, openai, moduleName, shim) {
return new RecorderSpec({
name: OPENAI.COMPLETION,
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, response, segment) {
after({ error: err, result: response, segment }) {
if (request.stream) {
instrumentStream({ agent, shim, request, response, segment })
} else {
Expand Down Expand Up @@ -294,8 +293,7 @@ module.exports = function initialize(agent, openai, moduleName, shim) {
return new RecorderSpec({
name: OPENAI.EMBEDDING,
promise: true,
// eslint-disable-next-line max-params
after(_shim, _fn, _name, err, response, segment) {
after({ error: err, result: response, segment }) {
addLlmMeta({ agent, segment })

if (!response) {
Expand Down
6 changes: 3 additions & 3 deletions lib/instrumentation/redis.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ function registerInternalSendCommand(shim, proto) {
parameters,
callback: function bindCallback(shim, _f, _n, segment) {
if (shim.isFunction(commandObject.callback)) {
shim.bindCallbackSegment(commandObject, 'callback', segment)
shim.bindCallbackSegment(null, commandObject, 'callback', segment)
} else {
const self = this
commandObject.callback = shim.bindSegment(
Expand Down Expand Up @@ -87,9 +87,9 @@ function registerSendCommand(shim, proto) {
callback: function bindCallback(shim, _f, _n, segment) {
const last = args[args.length - 1]
if (shim.isFunction(last)) {
shim.bindCallbackSegment(args, shim.LAST, segment)
shim.bindCallbackSegment(null, args, shim.LAST, segment)
} else if (shim.isArray(last) && shim.isFunction(last[last.length - 1])) {
shim.bindCallbackSegment(last, shim.LAST, segment)
shim.bindCallbackSegment(null, last, shim.LAST, segment)
}
}
})
Expand Down
2 changes: 1 addition & 1 deletion lib/instrumentation/superagent.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ function wrapCallback(shim, callback) {
return function wrappedCallback() {
const segment = shim.getSegment(this)
if (segment && segment.transaction.isActive()) {
shim.bindCallbackSegment(this, '_callback', segment)
shim.bindCallbackSegment(null, this, '_callback', segment)
}
return callback.apply(this, arguments)
}
Expand Down
128 changes: 9 additions & 119 deletions lib/shim/message-shim/consume.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/

'use strict'
const TraceSegment = require('../../transaction/trace/segment')
const genericRecorder = require('../../metrics/recorders/generic')
const { _nameMessageSegment } = require('./common')
const specs = require('../specs')
Expand All @@ -25,7 +24,7 @@ module.exports = createRecorder
function updateSpecFromArgs({ shim, fn, fnName, args, spec }) {
let msgDesc = null
if (shim.isFunction(spec)) {
msgDesc = spec.call(this, shim, fn, fnName, args)
msgDesc = spec(shim, fn, fnName, args)
} else {
msgDesc = spec
const destIdx = shim.normalizeIndex(args.length, spec.destinationName)
Expand All @@ -37,130 +36,21 @@ function updateSpecFromArgs({ shim, fn, fnName, args, spec }) {
return msgDesc
}

/**
* Binds the consumer callback to the active segment.
*
* @private
* @param {object} params to function
* @param {MessageShim} params.shim instance of shim
* @param {Array} params.args arguments passed to original consume function
* @param {specs.MessageSpec} params.msgDesc spec for the wrapped consume function
* @param {TraceSegment} params.segment active segment to bind callback
* @param {boolean} params.getParams flag to copy message parameters to segment
* @param {Function} params.resHandler function to handle response from callback to obtain the message parameters
*/
function bindCallback({ shim, args, msgDesc, segment, getParams, resHandler }) {
const cbIdx = shim.normalizeIndex(args.length, msgDesc.callback)
if (cbIdx !== null) {
shim.bindCallbackSegment(args, cbIdx, segment)

// If we have a callback and a results handler, then wrap the callback so
// we can call the results handler and get the message properties.
if (resHandler) {
shim.wrap(args, cbIdx, function wrapCb(shim, cb, cbName) {
if (shim.isFunction(cb)) {
return function cbWrapper() {
const cbArgs = shim.argsToArray.apply(shim, arguments)
const msgProps = resHandler.call(this, shim, cb, cbName, cbArgs)
if (getParams && msgProps && msgProps.parameters) {
shim.copySegmentParameters(segment, msgProps.parameters)
}

return cb.apply(this, arguments)
}
}
})
}
}
}

/**
* Binds the consumer function to the async context and checks return to possibly
* bind the promise
*
* @private
* @param {object} params to function
* @param {MessageShim} params.shim instance of shim
* @param {Function} params.fn consumer function
* @param {string} params.fnName name of function
* @param {Array} params.args arguments passed to original consume function
* @param {specs.MessageSpec} params.msgDesc spec for the wrapped consume function
* @param {TraceSegment} params.segment active segment to bind callback
* @param {boolean} params.getParams flag to copy message parameters to segment
* @param {Function} params.resHandler function to handle response from callback to obtain the message parameters
* @returns {Promise|*} response from consume function
*/
function bindConsumer({ shim, fn, fnName, args, msgDesc, segment, getParams, resHandler }) {
// Call the method in the context of our segment.
let ret = shim.applySegment(fn, segment, true, this, args)

if (ret && msgDesc.promise && shim.isPromise(ret)) {
ret = shim.bindPromise(ret, segment)

// Intercept the promise to handle the result.
if (resHandler) {
ret = ret.then(function interceptValue(res) {
const msgProps = resHandler.call(this, shim, fn, fnName, res)
if (getParams && msgProps && msgProps.parameters) {
shim.copySegmentParameters(segment, msgProps.parameters)
}
return res
})
}
}

return ret
}

/**
*
* @private
* @param {object} params to function
* @param {MessageShim} params.shim instance of shim
* @param {Function} params.fn function that is being wrapped
* @param {string} params.fnName name of function
* @param params.args
* @param {specs.MessageSpec} params.spec spec for the wrapped consume function
* @returns {Function} recorder for consume function
* @returns {specs.MessageSpec} updated spec with logic to name segment and apply the genericRecorder
*/
function createRecorder({ shim, fn, fnName, spec }) {
return function consumeRecorder() {
const parent = shim.getSegment()
if (!parent || !parent.transaction.isActive()) {
shim.logger.trace('Not recording consume, no active transaction.')
return fn.apply(this, arguments)
}

// Process the message args.
const args = shim.argsToArray.apply(shim, arguments)
const msgDesc = updateSpecFromArgs.call(this, { shim, fn, fnName, args, spec })

// Make the segment if we can.
if (!msgDesc) {
shim.logger.trace('Not recording consume, no message descriptor.')
return fn.apply(this, args)
}

const name = _nameMessageSegment(shim, msgDesc, shim._metrics.CONSUME)

// Adds details needed by createSegment when used with a spec
msgDesc.name = name
msgDesc.recorder = genericRecorder
msgDesc.parent = parent

const segment = shim.createSegment(msgDesc)
const getParams = shim.agent.config.message_tracer.segment_parameters.enabled
const resHandler = shim.isFunction(msgDesc.messageHandler) ? msgDesc.messageHandler : null

bindCallback({ shim, args, msgDesc, segment, getParams, resHandler })
return bindConsumer.call(this, {
shim,
fn,
fnName,
args,
msgDesc,
segment,
getParams,
resHandler
})
}
function createRecorder({ spec, shim, fn, fnName, args }) {
const msgDesc = updateSpecFromArgs({ shim, fn, fnName, args, spec })
// Adds details needed by createSegment when used with a spec
msgDesc.name = _nameMessageSegment(shim, msgDesc, shim._metrics.CONSUME)
msgDesc.recorder = genericRecorder
return msgDesc
}
13 changes: 2 additions & 11 deletions lib/shim/message-shim/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -287,17 +287,8 @@ function recordConsume(nodule, properties, spec) {
properties = null
}

// This is using wrap instead of record because the spec allows for a messageHandler
// which is being used to handle the result of the callback or promise of the
// original wrapped consume function.
// TODO: https://github.com/newrelic/node-newrelic/issues/981
return this.wrap(nodule, properties, function wrapConsume(shim, fn, fnName) {
if (!shim.isFunction(fn)) {
shim.logger.debug('Not wrapping %s (%s) as consume', fn, fnName)
return fn
}

return createRecorder({ shim, fn, fnName, spec })
return this.record(nodule, properties, function wrapConsume(shim, fn, fnName, args) {
return createRecorder({ spec, shim, fn, fnName, args })
})
}

Expand Down
9 changes: 4 additions & 5 deletions lib/shim/message-shim/subscribe-consume.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ function makeWrapConsumer({ spec, queue, destinationName, destNameIsArg }) {
spec.queue = queue
}

return function wrapConsumer(shim, consumer, cName) {
return function wrapConsumer(shim, consumer) {
if (!shim.isFunction(consumer)) {
return consumer
}

const consumerWrapper = createConsumerWrapper({ shim, consumer, cName, spec })
const consumerWrapper = createConsumerWrapper({ shim, consumer, spec })
return shim.bindCreateTransaction(
consumerWrapper,
new specs.TransactionSpec({
Expand All @@ -108,10 +108,9 @@ function makeWrapConsumer({ spec, queue, destinationName, destNameIsArg }) {
* @param {MessageShim} params.shim instance of shim
* @param {specs.MessageSubscribeSpec} params.spec spec for function
* @param {Function} params.consumer function for consuming message
* @param {string} params.cName name of consumer function
* @returns {Function} handler for the transaction being created
*/
function createConsumerWrapper({ shim, spec, consumer, cName }) {
function createConsumerWrapper({ shim, spec, consumer }) {
return function createConsumeTrans() {
// If there is no transaction or we're in a pre-existing transaction,
// then don't do anything. Note that the latter should never happen.
Expand All @@ -123,7 +122,7 @@ function createConsumerWrapper({ shim, spec, consumer, cName }) {
return consumer.apply(this, args)
}

const msgDesc = spec.messageHandler.call(this, shim, consumer, cName, args)
const msgDesc = spec.messageHandler.call(this, shim, args)

// If message could not be handled, immediately kill this transaction.
if (!msgDesc) {
Expand Down
Loading

0 comments on commit 4f48fc3

Please sign in to comment.