diff --git a/langchain-core/src/callbacks/manager.ts b/langchain-core/src/callbacks/manager.ts index 34fe086636a6..baeccb445618 100644 --- a/langchain-core/src/callbacks/manager.ts +++ b/langchain-core/src/callbacks/manager.ts @@ -61,6 +61,13 @@ export interface BaseCallbackConfig { * Tags are passed to all callbacks, metadata is passed to handle*Start callbacks. */ callbacks?: Callbacks; + + /** + * Runtime values for attributes previously made configurable on this Runnable, + * or sub-Runnables. + */ + // eslint-disable-next-line @typescript-eslint/no-explicit-any + configurable?: Record; } export function parseCallbackConfigArg( @@ -484,9 +491,9 @@ export class CallbackManager extends BaseCallbackManager implements BaseCallbackManagerMethods { - handlers: BaseCallbackHandler[]; + handlers: BaseCallbackHandler[] = []; - inheritableHandlers: BaseCallbackHandler[]; + inheritableHandlers: BaseCallbackHandler[] = []; tags: string[] = []; @@ -500,10 +507,26 @@ export class CallbackManager private readonly _parentRunId?: string; - constructor(parentRunId?: string) { + constructor( + parentRunId?: string, + options?: { + handlers?: BaseCallbackHandler[]; + inheritableHandlers?: BaseCallbackHandler[]; + tags?: string[]; + inheritableTags?: string[]; + metadata?: Record; + inheritableMetadata?: Record; + } + ) { super(); - this.handlers = []; - this.inheritableHandlers = []; + this.handlers = options?.handlers ?? this.handlers; + this.inheritableHandlers = + options?.inheritableHandlers ?? this.inheritableHandlers; + this.tags = options?.tags ?? this.tags; + this.inheritableTags = options?.inheritableTags ?? this.inheritableTags; + this.metadata = options?.metadata ?? this.metadata; + this.inheritableMetadata = + options?.inheritableMetadata ?? this.inheritableMetadata; this._parentRunId = parentRunId; } diff --git a/langchain-core/src/chat_history.ts b/langchain-core/src/chat_history.ts index 6841d19e865e..979323008b3b 100644 --- a/langchain-core/src/chat_history.ts +++ b/langchain-core/src/chat_history.ts @@ -32,3 +32,33 @@ export abstract class BaseListChatMessageHistory extends Serializable { return this.addMessage(new AIMessage(message)); } } + +export class FakeChatMessageHistory extends BaseChatMessageHistory { + lc_namespace = ["langchain", "core", "message", "fake"]; + + messages: Array = []; + + constructor() { + super(); + } + + async getMessages(): Promise { + return this.messages; + } + + async addMessage(message: BaseMessage): Promise { + this.messages.push(message); + } + + async addUserMessage(message: string): Promise { + this.messages.push(new HumanMessage(message)); + } + + async addAIChatMessage(message: string): Promise { + this.messages.push(new AIMessage(message)); + } + + async clear(): Promise { + this.messages = []; + } +} diff --git a/langchain-core/src/runnables/base.ts b/langchain-core/src/runnables/base.ts index 0403b91486bb..697bc59f095a 100644 --- a/langchain-core/src/runnables/base.ts +++ b/langchain-core/src/runnables/base.ts @@ -12,11 +12,21 @@ import { } from "../tracers/log_stream.js"; import { Serializable } from "../load/serializable.js"; import { IterableReadableStream } from "../utils/stream.js"; -import { RunnableConfig, getCallbackMangerForConfig } from "./config.js"; +import { + RunnableConfig, + getCallbackMangerForConfig, + mergeConfigs, +} from "./config.js"; import { AsyncCaller } from "../utils/async_caller.js"; +import { Run } from "../tracers/base.js"; +import { RootListenersTracer } from "../tracers/root_listener.js"; export type RunnableFunc = ( - input: RunInput + input: RunInput, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + options?: Record & { + config?: RunnableConfig; + } ) => RunOutput | Promise; // eslint-disable-next-line @typescript-eslint/no-explicit-any @@ -549,6 +559,45 @@ export abstract class Runnable< static isRunnable(thing: any): thing is Runnable { return thing ? thing.lc_runnable : false; } + + /** + * Bind lifecycle listeners to a Runnable, returning a new Runnable. + * The Run object contains information about the run, including its id, + * type, input, output, error, startTime, endTime, and any tags or metadata + * added to the run. + * + * @param {Object} params - The object containing the callback functions. + * @param {(run: Run) => void} params.onStart - Called before the runnable starts running, with the Run object. + * @param {(run: Run) => void} params.onEnd - Called after the runnable finishes running, with the Run object. + * @param {(run: Run) => void} params.onError - Called if the runnable throws an error, with the Run object. + */ + withListeners({ + onStart, + onEnd, + onError, + }: { + onStart?: (run: Run, config?: RunnableConfig) => void | Promise; + onEnd?: (run: Run, config?: RunnableConfig) => void | Promise; + onError?: (run: Run, config?: RunnableConfig) => void | Promise; + }): Runnable { + // eslint-disable-next-line @typescript-eslint/no-use-before-define + return new RunnableBinding({ + bound: this, + config: {}, + configFactories: [ + (config) => ({ + callbacks: [ + new RootListenersTracer({ + config, + onStart, + onEnd, + onError, + }), + ], + }), + ], + }); + } } export type RunnableBindingArgs< @@ -557,8 +606,9 @@ export type RunnableBindingArgs< CallOptions extends RunnableConfig > = { bound: Runnable; - kwargs: Partial; + kwargs?: Partial; config: RunnableConfig; + configFactories?: Array<(config: RunnableConfig) => RunnableConfig>; }; /** @@ -581,31 +631,35 @@ export class RunnableBinding< config: RunnableConfig; - protected kwargs: Partial; + protected kwargs?: Partial; + + configFactories?: Array< + (config: RunnableConfig) => RunnableConfig | Promise + >; constructor(fields: RunnableBindingArgs) { super(fields); this.bound = fields.bound; this.kwargs = fields.kwargs; this.config = fields.config; + this.configFactories = fields.configFactories; } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - _mergeConfig(options?: Record) { + async _mergeConfig( // eslint-disable-next-line @typescript-eslint/no-explicit-any - const copy: Record = { ...this.config }; - if (options) { - for (const key of Object.keys(options)) { - if (key === "metadata") { - copy[key] = { ...copy[key], ...options[key] }; - } else if (key === "tags") { - copy[key] = (copy[key] ?? []).concat(options[key] ?? []); - } else { - copy[key] = options[key] ?? copy[key]; - } - } - } - return copy as Partial; + options?: Record + ): Promise> { + const config = mergeConfigs(this.config, options); + return mergeConfigs( + config, + ...(this.configFactories + ? await Promise.all( + this.configFactories.map( + async (configFactory) => await configFactory(config) + ) + ) + : []) + ); } bind( @@ -645,7 +699,7 @@ export class RunnableBinding< ): Promise { return this.bound.invoke( input, - this._mergeConfig({ ...options, ...this.kwargs }) + await this._mergeConfig({ ...options, ...this.kwargs }) ); } @@ -673,13 +727,15 @@ export class RunnableBinding< batchOptions?: RunnableBatchOptions ): Promise<(RunOutput | Error)[]> { const mergedOptions = Array.isArray(options) - ? options.map((individualOption) => - this._mergeConfig({ - ...individualOption, - ...this.kwargs, - }) + ? await Promise.all( + options.map(async (individualOption) => + this._mergeConfig({ + ...individualOption, + ...this.kwargs, + }) + ) ) - : this._mergeConfig({ ...options, ...this.kwargs }); + : await this._mergeConfig({ ...options, ...this.kwargs }); return this.bound.batch(inputs, mergedOptions, batchOptions); } @@ -689,7 +745,7 @@ export class RunnableBinding< ) { yield* this.bound._streamIterator( input, - this._mergeConfig({ ...options, ...this.kwargs }) + await this._mergeConfig({ ...options, ...this.kwargs }) ); } @@ -699,7 +755,7 @@ export class RunnableBinding< ): Promise> { return this.bound.stream( input, - this._mergeConfig({ ...options, ...this.kwargs }) + await this._mergeConfig({ ...options, ...this.kwargs }) ); } @@ -710,7 +766,7 @@ export class RunnableBinding< ): AsyncGenerator { yield* this.bound.transform( generator, - this._mergeConfig({ ...options, ...this.kwargs }) + await this._mergeConfig({ ...options, ...this.kwargs }) ); } @@ -721,6 +777,45 @@ export class RunnableBinding< ): thing is RunnableBinding { return thing.bound && Runnable.isRunnable(thing.bound); } + + /** + * Bind lifecycle listeners to a Runnable, returning a new Runnable. + * The Run object contains information about the run, including its id, + * type, input, output, error, startTime, endTime, and any tags or metadata + * added to the run. + * + * @param {Object} params - The object containing the callback functions. + * @param {(run: Run) => void} params.onStart - Called before the runnable starts running, with the Run object. + * @param {(run: Run) => void} params.onEnd - Called after the runnable finishes running, with the Run object. + * @param {(run: Run) => void} params.onError - Called if the runnable throws an error, with the Run object. + */ + withListeners({ + onStart, + onEnd, + onError, + }: { + onStart?: (run: Run, config?: RunnableConfig) => void | Promise; + onEnd?: (run: Run, config?: RunnableConfig) => void | Promise; + onError?: (run: Run, config?: RunnableConfig) => void | Promise; + }): Runnable { + return new RunnableBinding({ + bound: this.bound, + kwargs: this.kwargs, + config: this.config, + configFactories: [ + (config) => ({ + callbacks: [ + new RootListenersTracer({ + config, + onStart, + onEnd, + onError, + }), + ], + }), + ], + }); + } } /** @@ -789,6 +884,32 @@ export class RunnableEach< this._patchConfig(config, runManager?.getChild()) ); } + + /** + * Bind lifecycle listeners to a Runnable, returning a new Runnable. + * The Run object contains information about the run, including its id, + * type, input, output, error, startTime, endTime, and any tags or metadata + * added to the run. + * + * @param {Object} params - The object containing the callback functions. + * @param {(run: Run) => void} params.onStart - Called before the runnable starts running, with the Run object. + * @param {(run: Run) => void} params.onEnd - Called after the runnable finishes running, with the Run object. + * @param {(run: Run) => void} params.onError - Called if the runnable throws an error, with the Run object. + */ + withListeners({ + onStart, + onEnd, + onError, + }: { + onStart?: (run: Run, config?: RunnableConfig) => void | Promise; + onEnd?: (run: Run, config?: RunnableConfig) => void | Promise; + onError?: (run: Run, config?: RunnableConfig) => void | Promise; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + }): Runnable { + return new RunnableEach({ + bound: this.bound.withListeners({ onStart, onEnd, onError }), + }); + } } /** @@ -1382,7 +1503,7 @@ export class RunnableLambda extends Runnable< config?: Partial, runManager?: CallbackManagerForChainRun ) { - let output = await this.func(input); + let output = await this.func(input, { config }); if (output && Runnable.isRunnable(output)) { output = await output.invoke( input, diff --git a/langchain-core/src/runnables/config.ts b/langchain-core/src/runnables/config.ts index d70cbeb328bc..e604071a7b09 100644 --- a/langchain-core/src/runnables/config.ts +++ b/langchain-core/src/runnables/config.ts @@ -14,3 +14,81 @@ export async function getCallbackMangerForConfig(config?: RunnableConfig) { config?.metadata ); } + +export function mergeConfigs( + config: RunnableConfig, + // eslint-disable-next-line @typescript-eslint/no-explicit-any + options?: Record +): Partial { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const copy: Record = { ...config }; + if (options) { + for (const key of Object.keys(options)) { + if (key === "metadata") { + copy[key] = { ...copy[key], ...options[key] }; + } else if (key === "tags") { + copy[key] = (copy[key] ?? []).concat(options[key] ?? []); + } else if (key === "callbacks") { + const baseCallbacks = copy.callbacks; + const providedCallbacks = options.callbacks ?? config.callbacks; + // callbacks can be either undefined, Array or manager + // so merging two callbacks values has 6 cases + if (Array.isArray(providedCallbacks)) { + if (!baseCallbacks) { + copy.callbacks = providedCallbacks; + } else if (Array.isArray(baseCallbacks)) { + copy.callbacks = baseCallbacks.concat(providedCallbacks); + } else { + // baseCallbacks is a manager + const manager = baseCallbacks.copy(); + for (const callback of providedCallbacks) { + manager.addHandler(callback, true); + } + copy.callbacks = manager; + } + } else if (providedCallbacks) { + // providedCallbacks is a manager + if (!baseCallbacks) { + copy.callbacks = providedCallbacks; + } else if (Array.isArray(baseCallbacks)) { + const manager = providedCallbacks.copy(); + for (const callback of baseCallbacks) { + manager.addHandler(callback, true); + } + copy.callbacks = manager; + } else { + // baseCallbacks is also a manager + copy.callbacks = new CallbackManager( + providedCallbacks.parentRunId, + { + handlers: baseCallbacks.handlers.concat( + providedCallbacks.handlers + ), + inheritableHandlers: baseCallbacks.inheritableHandlers.concat( + providedCallbacks.inheritableHandlers + ), + tags: Array.from( + new Set(baseCallbacks.tags.concat(providedCallbacks.tags)) + ), + inheritableTags: Array.from( + new Set( + baseCallbacks.inheritableTags.concat( + providedCallbacks.inheritableTags + ) + ) + ), + metadata: { + ...baseCallbacks.metadata, + ...providedCallbacks.metadata, + }, + } + ); + } + } + } else { + copy[key] = options[key] ?? copy[key]; + } + } + } + return copy as Partial; +} diff --git a/langchain-core/src/runnables/history.ts b/langchain-core/src/runnables/history.ts new file mode 100644 index 000000000000..662a1cbb4de6 --- /dev/null +++ b/langchain-core/src/runnables/history.ts @@ -0,0 +1,190 @@ +import { BaseCallbackConfig } from "../callbacks/manager.js"; +import { BaseChatMessageHistory } from "../chat_history.js"; +import { + AIMessage, + BaseMessage, + HumanMessage, + isBaseMessage, +} from "../messages/index.js"; +import { Run } from "../tracers/base.js"; +import { + Runnable, + RunnableBinding, + type RunnableBindingArgs, + RunnableLambda, +} from "./base.js"; +import { RunnableConfig } from "./config.js"; +import { RunnablePassthrough } from "./passthrough.js"; + +type GetSessionHistoryCallable = ( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ...args: Array +) => Promise; + +export class RunnableWithMessageHistory< + RunInput, + RunOutput +> extends RunnableBinding { + runnable: Runnable; + + inputMessagesKey?: string; + + outputMessagesKey?: string; + + historyMessagesKey?: string; + + getMessageHistory: GetSessionHistoryCallable; + + constructor( + fields: Omit< + RunnableBindingArgs, + "bound" + > & { + runnable: Runnable; + getMessageHistory: GetSessionHistoryCallable; + inputMessagesKey?: string; + outputMessagesKey?: string; + historyMessagesKey?: string; + } + ) { + let historyChain: Runnable = new RunnableLambda({ + func: (input, options) => this._enterHistory(input, options ?? {}), + }).withConfig({ runName: "loadHistory" }); + + const messagesKey = fields.historyMessagesKey ?? fields.inputMessagesKey; + if (messagesKey) { + historyChain = RunnablePassthrough.assign({ + [messagesKey]: historyChain, + }).withConfig({ runName: "insertHistory" }); + } + + const bound = historyChain + .pipe( + fields.runnable.withListeners({ + onEnd: (run, config) => this._exitHistory(run, config ?? {}), + }) + ) + .withConfig({ runName: "RunnableWithMessageHistory" }); + + super({ + ...fields, + bound, + }); + this.runnable = fields.runnable; + this.getMessageHistory = fields.getMessageHistory; + this.inputMessagesKey = fields.inputMessagesKey; + this.outputMessagesKey = fields.outputMessagesKey; + this.historyMessagesKey = fields.historyMessagesKey; + } + + _getInputMessages( + inputValue: string | BaseMessage | Array + ): Array { + if (typeof inputValue === "string") { + return [new HumanMessage(inputValue)]; + } else if (Array.isArray(inputValue)) { + return inputValue; + } else { + return [inputValue]; + } + } + + _getOutputMessages( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + outputValue: string | BaseMessage | Array | Record + ): Array { + let newOutputValue = outputValue; + if ( + !Array.isArray(outputValue) && + !isBaseMessage(outputValue) && + typeof outputValue !== "string" + ) { + newOutputValue = outputValue[this.outputMessagesKey ?? "output"]; + } + + if (typeof newOutputValue === "string") { + return [new AIMessage(newOutputValue)]; + } else if (Array.isArray(newOutputValue)) { + return newOutputValue; + } else if (isBaseMessage(newOutputValue)) { + return [newOutputValue]; + } + throw new Error( + `Expected a string, BaseMessage, or array of BaseMessages. Received: ${JSON.stringify( + newOutputValue, + null, + 2 + )}` + ); + } + + _enterHistory( + // eslint-disable-next-line @typescript-eslint/no-explicit-any + input: any, + kwargs?: { config?: RunnableConfig } + ): Array { + const history = kwargs?.config?.configurable?.messageHistory; + + if (this.historyMessagesKey) { + return history.messages; + } + + const inputVal = + input || + (this.inputMessagesKey ? input[this.inputMessagesKey] : undefined); + const historyMessages = history ? history.messages : []; + const returnType = [ + ...historyMessages, + ...this._getInputMessages(inputVal), + ]; + return returnType; + } + + async _exitHistory(run: Run, config: BaseCallbackConfig): Promise { + const history = config.configurable?.messageHistory; + + // Get input messages + const { inputs } = run; + const inputValue = inputs[this.inputMessagesKey ?? "input"]; + const inputMessages = this._getInputMessages(inputValue); + // Get output messages + const outputValue = run.outputs; + if (!outputValue) { + throw new Error( + `Output values from 'Run' undefined. Run: ${JSON.stringify( + run, + null, + 2 + )}` + ); + } + const outputMessages = this._getOutputMessages(outputValue); + + for await (const message of [...inputMessages, ...outputMessages]) { + await history.addMessage(message); + } + } + + async _mergeConfig(...configs: Array) { + const config = await super._mergeConfig(...configs); + // Extract sessionId + if (!config.configurable || !config.configurable.sessionId) { + const exampleInput = { + [this.inputMessagesKey ?? "input"]: "foo", + }; + const exampleConfig = { configurable: { sessionId: "123" } }; + throw new Error( + `sessionId is required. Pass it in as part of the config argument to .invoke() or .stream()\n` + + `eg. chain.invoke(${JSON.stringify(exampleInput)}, ${JSON.stringify( + exampleConfig + )})` + ); + } + // attach messageHistory + const { sessionId } = config.configurable; + config.configurable.messageHistory = await this.getMessageHistory( + sessionId + ); + return config; + } +} diff --git a/langchain-core/src/runnables/tests/runnable.test.ts b/langchain-core/src/runnables/tests/runnable.test.ts index 702e2a309193..149a39112c4f 100644 --- a/langchain-core/src/runnables/tests/runnable.test.ts +++ b/langchain-core/src/runnables/tests/runnable.test.ts @@ -1,16 +1,18 @@ /* eslint-disable no-promise-executor-return */ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { test } from "@jest/globals"; +import { Run } from "langsmith"; +import { jest } from "@jest/globals"; +import { createChatMessageChunkEncoderStream } from "../../language_models/chat_models.js"; +import { BaseMessage } from "../../messages/index.js"; +import { OutputParserException } from "../../output_parsers/base.js"; +import { StringOutputParser } from "../../output_parsers/string.js"; import { ChatPromptTemplate, - HumanMessagePromptTemplate, - PromptTemplate, SystemMessagePromptTemplate, -} from "../../prompts/index.js"; -import { Document } from "../../documents/document.js"; -import { createChatMessageChunkEncoderStream } from "../../language_models/chat_models.js"; -import { StringOutputParser } from "../../output_parsers/string.js"; + HumanMessagePromptTemplate, +} from "../../prompts/chat.js"; +import { PromptTemplate } from "../../prompts/prompt.js"; import { FakeLLM, FakeChatModel, @@ -18,11 +20,11 @@ import { FakeStreamingLLM, FakeSplitIntoListParser, FakeRunnable, + FakeListChatModel, } from "../../utils/testing/index.js"; import { RunnableSequence, RunnableMap, RunnableLambda } from "../base.js"; import { RouterRunnable } from "../router.js"; -import { OutputParserException } from "../../output_parsers/base.js"; -import { BaseMessage } from "../../messages/index.js"; +import { Document } from "../../documents/document.js"; test("Test batch", async () => { const llm = new FakeLLM({}); @@ -215,6 +217,73 @@ test("Runnable withConfig", async () => { expect(chunks[0]?.metadata).toEqual({ a: "updated", b: "c" }); }); +test("Listeners work", async () => { + const prompt = ChatPromptTemplate.fromMessages([ + SystemMessagePromptTemplate.fromTemplate("You are a nice assistant."), + ["human", "{question}"], + ]); + const model = new FakeListChatModel({ + responses: ["foo"], + }); + const chain = prompt.pipe(model); + + const mockStart = jest.fn(); + const mockEnd = jest.fn(); + + await chain + .withListeners({ + onStart: (run: Run) => { + mockStart(run); + }, + onEnd: (run: Run) => { + mockEnd(run); + }, + }) + .invoke({ question: "What is the meaning of life?" }); + + expect(mockStart).toHaveBeenCalledTimes(1); + expect((mockStart.mock.calls[0][0] as { name: string }).name).toBe( + "RunnableSequence" + ); + expect(mockEnd).toHaveBeenCalledTimes(1); +}); + +test("Listeners work with async handlers", async () => { + const prompt = ChatPromptTemplate.fromMessages([ + SystemMessagePromptTemplate.fromTemplate("You are a nice assistant."), + ["human", "{question}"], + ]); + const model = new FakeListChatModel({ + responses: ["foo"], + }); + const chain = prompt.pipe(model); + + const mockStart = jest.fn(); + const mockEnd = jest.fn(); + + await chain + .withListeners({ + onStart: async (run: Run) => { + const promise = new Promise((resolve) => setTimeout(resolve, 2000)); + await promise; + mockStart(run); + }, + // eslint-disable-next-line @typescript-eslint/no-misused-promises + onEnd: async (run: Run) => { + const promise = new Promise((resolve) => setTimeout(resolve, 2000)); + await promise; + mockEnd(run); + }, + }) + .invoke({ question: "What is the meaning of life?" }); + + expect(mockStart).toHaveBeenCalledTimes(1); + expect((mockStart.mock.calls[0][0] as { name: string }).name).toBe( + "RunnableSequence" + ); + expect(mockEnd).toHaveBeenCalledTimes(1); +}); + test("Create a runnable sequence and run it", async () => { const promptTemplate = PromptTemplate.fromTemplate("{input}"); const llm = new FakeChatModel({}); diff --git a/langchain-core/src/runnables/tests/runnable_history.test.ts b/langchain-core/src/runnables/tests/runnable_history.test.ts new file mode 100644 index 000000000000..c6e5d1822e63 --- /dev/null +++ b/langchain-core/src/runnables/tests/runnable_history.test.ts @@ -0,0 +1,47 @@ +import { BaseMessage, HumanMessage } from "../../messages/index.js"; +import { RunnableLambda } from "../base.js"; +import { RunnableConfig } from "../config.js"; +import { RunnableWithMessageHistory } from "../history.js"; +import { + BaseChatMessageHistory, + FakeChatMessageHistory, +} from "../../chat_history.js"; + +async function getGetSessionHistory(): Promise< + (sessionId: string) => Promise +> { + const chatHistoryStore: { [key: string]: BaseChatMessageHistory } = {}; + + async function getSessionHistory( + sessionId: string + ): Promise { + if (!(sessionId in chatHistoryStore)) { + chatHistoryStore[sessionId] = new FakeChatMessageHistory(); + } + return chatHistoryStore[sessionId]; + } + + return getSessionHistory; +} + +test("Runnable with message history", async () => { + const runnable = new RunnableLambda({ + func: (messages: BaseMessage[]) => + `you said: ${messages + .filter((m) => m._getType() === "human") + .map((m) => m.content) + .join("\n")}`, + }); + + const getMessageHistory = await getGetSessionHistory(); + const withHistory = new RunnableWithMessageHistory({ + runnable, + config: {}, + getMessageHistory, + }); + const config: RunnableConfig = { configurable: { sessionId: "1" } }; + let output = await withHistory.invoke([new HumanMessage("hello")], config); + expect(output).toBe("you said: hello"); + output = await withHistory.invoke([new HumanMessage("good bye")], config); + expect(output).toBe("you said: hello\ngood bye"); +}); diff --git a/langchain-core/src/tracers/root_listener.ts b/langchain-core/src/tracers/root_listener.ts new file mode 100644 index 000000000000..222c307f531f --- /dev/null +++ b/langchain-core/src/tracers/root_listener.ts @@ -0,0 +1,90 @@ +import { RunnableConfig } from "../runnables/config.js"; +import { BaseTracer, Run } from "./base.js"; + +export class RootListenersTracer extends BaseTracer { + name = "RootListenersTracer"; + + /** The Run's ID. Type UUID */ + rootId?: string; + + config: RunnableConfig; + + argOnStart?: { + (run: Run): void | Promise; + (run: Run, config: RunnableConfig): void | Promise; + }; + + argOnEnd?: { + (run: Run): void | Promise; + (run: Run, config: RunnableConfig): void | Promise; + }; + + argOnError?: { + (run: Run): void | Promise; + (run: Run, config: RunnableConfig): void | Promise; + }; + + constructor({ + config, + onStart, + onEnd, + onError, + }: { + config: RunnableConfig; + onStart?: (run: Run, config?: RunnableConfig) => void | Promise; + onEnd?: (run: Run, config?: RunnableConfig) => void | Promise; + onError?: (run: Run, config?: RunnableConfig) => void | Promise; + }) { + super(); + this.config = config; + this.argOnStart = onStart; + this.argOnEnd = onEnd; + this.argOnError = onError; + } + + /** + * This is a legacy method only called once for an entire run tree + * therefore not useful here + * @param {Run} _ Not used + */ + persistRun(_: Run): Promise { + return Promise.resolve(); + } + + async onRunCreate(run: Run) { + if (this.rootId) { + return; + } + + this.rootId = run.id; + + if (this.argOnStart) { + if (this.argOnStart.length === 1) { + await this.argOnStart(run); + } else if (this.argOnStart.length === 2) { + await this.argOnStart(run, this.config); + } + } + } + + async onRunUpdate(run: Run) { + if (run.id !== this.rootId) { + return; + } + if (!run.error) { + if (this.argOnEnd) { + if (this.argOnEnd.length === 1) { + await this.argOnEnd(run); + } else if (this.argOnEnd.length === 2) { + await this.argOnEnd(run, this.config); + } + } + } else if (this.argOnError) { + if (this.argOnError.length === 1) { + await this.argOnError(run); + } else if (this.argOnError.length === 2) { + await this.argOnError(run, this.config); + } + } + } +} diff --git a/langchain/src/agents/toolkits/conversational_retrieval/tool.ts b/langchain/src/agents/toolkits/conversational_retrieval/tool.ts index cc62eb3b28a4..36a15e2ef9a1 100644 --- a/langchain/src/agents/toolkits/conversational_retrieval/tool.ts +++ b/langchain/src/agents/toolkits/conversational_retrieval/tool.ts @@ -19,7 +19,7 @@ export function createRetrieverTool( input, runManager?.getChild("retriever") ); - return formatDocumentsAsString(docs, "\n"); + return formatDocumentsAsString(docs); }; const schema = z.object({ input: z diff --git a/langchain/src/memory/vector_store.ts b/langchain/src/memory/vector_store.ts index 8cdbb4e412fb..a0ba15b781d3 100644 --- a/langchain/src/memory/vector_store.ts +++ b/langchain/src/memory/vector_store.ts @@ -89,7 +89,7 @@ export class VectorStoreRetrieverMemory return { [this.memoryKey]: this.returnDocs ? results - : formatDocumentsAsString(results, "\n"), + : formatDocumentsAsString(results), }; } diff --git a/langchain/src/tools/webbrowser.ts b/langchain/src/tools/webbrowser.ts index c765d4c50492..55ec8249d8a1 100644 --- a/langchain/src/tools/webbrowser.ts +++ b/langchain/src/tools/webbrowser.ts @@ -267,7 +267,7 @@ export class WebBrowser extends Tool { undefined, runManager?.getChild("vectorstore") ); - context = formatDocumentsAsString(results, "\n"); + context = formatDocumentsAsString(results); } const input = `Text:${context}\n\nI need ${ diff --git a/langchain/src/util/document.ts b/langchain/src/util/document.ts index 3867af470ef9..a1ee0afc5285 100644 --- a/langchain/src/util/document.ts +++ b/langchain/src/util/document.ts @@ -7,7 +7,5 @@ import { Document } from "../document.js"; * @param documents * @returns A string of the documents page content, separated by newlines. */ -export const formatDocumentsAsString = ( - documents: Document[], - separator = "\n\n" -): string => documents.map((doc) => doc.pageContent).join(separator); +export const formatDocumentsAsString = (documents: Document[]): string => + documents.map((doc) => doc.pageContent).join("\n\n");