Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core[minor]: Runnable with message history #3437

Merged
merged 39 commits into from
Dec 2, 2023
Merged
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
df9a9f1
Runnable with message history
bracesproul Nov 29, 2023
a22855e
cr
bracesproul Nov 29, 2023
66ed823
Merge branch 'main' into brace/runnable-message-history
bracesproul Nov 29, 2023
add083f
cr
bracesproul Nov 29, 2023
05b71ea
adds withListeners method to runnables/callbacks
bracesproul Nov 29, 2023
2cf91d5
added entrypoint for root listener file
bracesproul Nov 29, 2023
21ce187
cr
bracesproul Nov 29, 2023
a8b2dbd
cr
bracesproul Nov 29, 2023
48e1f7e
cr
bracesproul Nov 29, 2023
a56cd19
cr
bracesproul Nov 29, 2023
89d689d
cr
bracesproul Nov 29, 2023
9fd9eaf
Merge branch 'brace/with-listeners' of https://github.com/langchain-a…
bracesproul Nov 29, 2023
be79ae6
support async listeners
bracesproul Nov 29, 2023
ad795af
Merge branch 'brace/with-listeners' of https://github.com/langchain-a…
bracesproul Nov 29, 2023
b71e30d
allow for run or run and config as args to listener funcs
bracesproul Nov 29, 2023
b7575a4
Merge branch 'brace/with-listeners' of https://github.com/langchain-a…
bracesproul Nov 29, 2023
e8432ac
cr
bracesproul Nov 30, 2023
00ab504
chore: lint files
bracesproul Nov 30, 2023
08698aa
cr
bracesproul Nov 30, 2023
b055337
cr
bracesproul Nov 30, 2023
8e23d5d
eslint disbale any
bracesproul Nov 30, 2023
a5576a8
Merge branch 'brace/with-listeners' of https://github.com/langchain-a…
bracesproul Nov 30, 2023
485b915
Merge branch 'main' into brace/with-listeners
bracesproul Nov 30, 2023
4394aa0
Merge branch 'main' into brace/with-listeners
bracesproul Dec 1, 2023
670f4f1
update types
bracesproul Dec 1, 2023
0a09cda
Merge branch 'main' into brace/with-listeners
bracesproul Dec 1, 2023
3b76fc4
Merge branch 'main' into brace/runnable-message-history
bracesproul Dec 1, 2023
bc8cdbf
Merge branch 'main' into brace/with-listeners
bracesproul Dec 1, 2023
7eb2ef9
Merge branch 'brace/with-listeners' of https://github.com/langchain-a…
bracesproul Dec 1, 2023
cec10ac
cr
bracesproul Dec 1, 2023
c9d3402
cr
bracesproul Dec 1, 2023
12b2d9b
merge main
bracesproul Dec 1, 2023
b465628
cr
bracesproul Dec 2, 2023
6df6e5d
Merge branch 'main' into brace/runnable-message-history
bracesproul Dec 2, 2023
dcbc640
cr
bracesproul Dec 2, 2023
5e113e9
cr
bracesproul Dec 2, 2023
1708764
Merge branch 'main' into brace/runnable-message-history
bracesproul Dec 2, 2023
52ceeea
cr
bracesproul Dec 2, 2023
f3f6c79
Style
jacoblee93 Dec 2, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
support async listeners
bracesproul committed Nov 29, 2023
commit be79ae6d056e0c46d139cd2694cde6413f60c982
53 changes: 31 additions & 22 deletions langchain-core/src/runnables/base.ts
Original file line number Diff line number Diff line change
@@ -572,9 +572,9 @@ export abstract class Runnable<
onEnd,
onError,
}: {
onStart?: (run: Run) => void;
onEnd?: (run: Run) => void;
onError?: (run: Run) => void;
onStart?: (run: Run) => void | Promise<void>;
onEnd?: (run: Run) => void | Promise<void>;
onError?: (run: Run) => void | Promise<void>;
}): Runnable<RunInput, RunOutput, CallOptions> {
// eslint-disable-next-line @typescript-eslint/no-use-before-define
return new RunnableBinding<RunInput, RunOutput, CallOptions>({
@@ -628,7 +628,9 @@ export class RunnableBinding<

protected kwargs?: Partial<CallOptions>;

configFactories?: Array<(config: RunnableConfig) => RunnableConfig>;
configFactories?: Array<
(config: RunnableConfig) => RunnableConfig | Promise<RunnableConfig>
>;

constructor(fields: RunnableBindingArgs<RunInput, RunOutput, CallOptions>) {
super(fields);
@@ -639,12 +641,16 @@ export class RunnableBinding<
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
_mergeConfig(options?: Record<string, any>): Partial<CallOptions> {
async _mergeConfig(
options?: Record<string, any>
): Promise<Partial<CallOptions>> {
const config = mergeConfigs(this.config, options);
return mergeConfigs(
config,
...(this.configFactories
? this.configFactories.map((f) => f(config))
? await Promise.all(
this.configFactories.map(async (f) => await f(config))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

f -> factoryMethod or something

)
: [])
);
}
@@ -686,7 +692,7 @@ export class RunnableBinding<
): Promise<RunOutput> {
return this.bound.invoke(
input,
this._mergeConfig({ ...options, ...this.kwargs })
await this._mergeConfig({ ...options, ...this.kwargs })
);
}

@@ -714,13 +720,16 @@ export class RunnableBinding<
batchOptions?: RunnableBatchOptions
): Promise<(RunOutput | Error)[]> {
const mergedOptions = Array.isArray(options)
? options.map((individualOption) =>
this._mergeConfig({
...individualOption,
...this.kwargs,
})
? await Promise.all(
options.map(
async (individualOption) =>
await this._mergeConfig({
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No race condition right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't believe so, no.

...individualOption,
...this.kwargs,
})
)
)
: this._mergeConfig({ ...options, ...this.kwargs });
: await this._mergeConfig({ ...options, ...this.kwargs });
return this.bound.batch(inputs, mergedOptions, batchOptions);
}

@@ -730,7 +739,7 @@ export class RunnableBinding<
) {
yield* this.bound._streamIterator(
input,
this._mergeConfig({ ...options, ...this.kwargs })
await this._mergeConfig({ ...options, ...this.kwargs })
);
}

@@ -740,7 +749,7 @@ export class RunnableBinding<
): Promise<IterableReadableStream<RunOutput>> {
return this.bound.stream(
input,
this._mergeConfig({ ...options, ...this.kwargs })
await this._mergeConfig({ ...options, ...this.kwargs })
);
}

@@ -751,7 +760,7 @@ export class RunnableBinding<
): AsyncGenerator<RunOutput> {
yield* this.bound.transform(
generator,
this._mergeConfig({ ...options, ...this.kwargs })
await this._mergeConfig({ ...options, ...this.kwargs })
);
}

@@ -779,9 +788,9 @@ export class RunnableBinding<
onEnd,
onError,
}: {
onStart?: (run: Run) => void;
onEnd?: (run: Run) => void;
onError?: (run: Run) => void;
onStart?: (run: Run) => void | Promise<void>;
onEnd?: (run: Run) => void | Promise<void>;
onError?: (run: Run) => void | Promise<void>;
}): Runnable<RunInput, RunOutput, CallOptions> {
//
return new RunnableBinding<RunInput, RunOutput, CallOptions>({
@@ -886,9 +895,9 @@ export class RunnableEach<
onEnd,
onError,
}: {
onStart?: (run: Run) => void;
onEnd?: (run: Run) => void;
onError?: (run: Run) => void;
onStart?: (run: Run) => void | Promise<void>;
onEnd?: (run: Run) => void | Promise<void>;
onError?: (run: Run) => void | Promise<void>;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
}): Runnable<any, any, CallOptions> {
return new RunnableEach<RunInputItem, RunOutputItem, CallOptions>({
22 changes: 11 additions & 11 deletions langchain-core/src/tracers/root_listener.ts
Original file line number Diff line number Diff line change
@@ -6,20 +6,20 @@ export class RootListenersTracer extends BaseTracer {
/** The Run's ID. Type UUID */
rootId?: string;

argOnStart?: (run: Run) => void;
argOnStart?: (run: Run) => void | Promise<void>;

argOnEnd?: (run: Run) => void;
argOnEnd?: (run: Run) => void | Promise<void>;

argOnError?: (run: Run) => void;
argOnError?: (run: Run) => void | Promise<void>;

constructor({
onStart,
onEnd,
onError,
}: {
onStart?: (run: Run) => void;
onEnd?: (run: Run) => void;
onError?: (run: Run) => void;
onStart?: (run: Run) => void | Promise<void>;
onEnd?: (run: Run) => void | Promise<void>;
onError?: (run: Run) => void | Promise<void>;
}) {
super();
this.argOnStart = onStart;
@@ -36,28 +36,28 @@ export class RootListenersTracer extends BaseTracer {
return Promise.resolve();
}

onRunCreate(run: Run) {
async onRunCreate(run: Run) {
if (this.rootId) {
return;
}

this.rootId = run.id;

if (this.argOnStart) {
this.argOnStart(run);
await this.argOnStart(run);
}
}

onRunUpdate(run: Run) {
async onRunUpdate(run: Run) {
if (run.id !== this.rootId) {
return;
}
if (!run.error) {
if (this.argOnEnd) {
this.argOnEnd(run);
await this.argOnEnd(run);
}
} else if (this.argOnError) {
this.argOnError(run);
await this.argOnError(run);
}
}
}
46 changes: 43 additions & 3 deletions langchain/src/schema/tests/runnable.test.ts
Original file line number Diff line number Diff line change
@@ -250,7 +250,6 @@ test("Runnable withConfig", async () => {
});

test("Listeners work", async () => {
// implement
const prompt = ChatPromptTemplate.fromMessages([
SystemMessagePromptTemplate.fromTemplate("You are a nice assistant."),
["human", "{question}"],
@@ -265,8 +264,49 @@ test("Listeners work", async () => {

await chain
.withListeners({
onStart: mockStart,
onEnd: mockEnd,
onStart: (run) => {
mockStart(run);
},
onEnd: (run) => {
mockEnd(run);
},
})
.invoke({ question: "What is the meaning of life?" });

expect(mockStart).toHaveBeenCalledTimes(1);
expect((mockStart.mock.calls[0][0] as { name: string }).name).toBe(
"RunnableSequence"
);
expect(mockEnd).toHaveBeenCalledTimes(1);
});

test("Listeners work with async handlers", async () => {
const prompt = ChatPromptTemplate.fromMessages([
SystemMessagePromptTemplate.fromTemplate("You are a nice assistant."),
["human", "{question}"],
]);
const model = new FakeListChatModel({
responses: ["foo"],
});
const chain = prompt.pipe(model);

const mockStart = jest.fn();
const mockEnd = jest.fn();

await chain
.withListeners({
// eslint-disable-next-line @typescript-eslint/no-misused-promises
onStart: async (run) => {
const promise = new Promise((resolve) => setTimeout(resolve, 2000));
await promise;
mockStart(run);
},
// eslint-disable-next-line @typescript-eslint/no-misused-promises
onEnd: async (run) => {
const promise = new Promise((resolve) => setTimeout(resolve, 2000));
await promise;
mockEnd(run);
},
})
.invoke({ question: "What is the meaning of life?" });