Runnable Context Provider #13315

Closed
wants to merge 13 commits

Conversation


@Toubat Toubat commented Nov 14, 2023

(Note: still WIP; would appreciate some feedback from a maintainer.)

Description: This PR adds a new core Runnable component called RunnableContextProvider. The motivation behind this runnable is that when writing long and complex chains, developers sometimes need to pass a core piece of data across multiple stages of the chain. For example, when working with a naive RAG setup where the retriever returns context (say List[str] for simplicity), one common case is to pass the retrieved context and the original question through as part of the output (say, for the sake of doing evals or other data manipulation). The original way of achieving this might look like the code below:

from operator import itemgetter

from langchain.schema.runnable import RunnableLambda, RunnableParallel, RunnablePassthrough

# prompt and llm are assumed to be defined elsewhere
retriever = RunnableLambda(lambda _: ["doc 1 ...", ..., "doc n"])  # mock retriever

retrieval_chain = RunnableParallel({
    "context": retriever,
    "question": RunnablePassthrough(),
})

format_chain = {
    "original_context": itemgetter("context"),
    "formatted_context": itemgetter("context") | RunnableLambda(lambda x: "## Documents" + "\n\n".join(x)),
    "question": itemgetter("question"),
}

generation_chain = {
    "result": {
        "question": itemgetter("question"),
        "formatted_context": itemgetter("formatted_context"),
    } | prompt | llm,
    "context": itemgetter("original_context"),
    "question": itemgetter("question"),
}

# the plain dicts are coerced to RunnableParallel when piped onto a Runnable
rag_chain = retrieval_chain | format_chain | generation_chain

Look at how complex and unreadable the chain becomes, even for a naive RAG example. Most of the complexity comes from passing extra data around, which adds lots of itemgetter and passthrough plumbing that should be unnecessary.

RunnableContextProvider solves this issue by allowing data sharing across different stages of the chain without having to explicitly wire up the data connections. Here's the basic API usage for implementing the same naive RAG as above:

from langchain.schema.runnable import RunnableContextProvider

format_context = RunnableLambda(lambda x: "## Documents" + "\n\n".join(x))
rag_chain = RunnableContextProvider(
    lambda getter, setter: {
        "question": RunnablePassthrough() | setter("question"),
        "formatted_context": retriever | setter("context") | format_context,
    }
    | prompt
    | llm
    | {
        "result": RunnablePassthrough(),
        "context": getter("context"),
        "question": getter("question"),
    }
)
  • getter: an instance of RunnableContextGetter, which retrieves data from a key-value source that is automatically initialized in the background. It's a Runnable that outputs the value stored in the shared key-value source under the given key (the input piped into a RunnableContextGetter is ignored).
  • setter: an instance of RunnableContextSetter, which writes a value into the shared source under the given key. The value written is the output of the previously piped Runnable, and the setter passes that value through unchanged, so its output is the same as the output of the chain immediately before it. Therefore, in the case of
retriever | setter("context") | format_context

the lambda format_context would take the output from retriever as its input.

  • batching: batch and abatch work out of the box. Each individual chain call inside a batch gets its own unique source; in other words, calls within the same batch do not share a source. (A rough sketch of these getter/setter semantics follows this list.)
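To make the intended behavior concrete, here is a rough, hypothetical sketch of a getter/setter pair built from plain RunnableLambda closures over a per-invocation dict. This is not the PR's implementation, and unlike RunnableContextProvider it is not batch-safe (every call would share the same dict); it only illustrates the pass-through behavior of the setter and the input-ignoring behavior of the getter.

from langchain.schema.runnable import RunnableLambda

def make_context():
    source = {}  # shared key-value store for a single invocation

    def setter(key):
        def _set(value):
            source[key] = value  # record the upstream output under `key`
            return value         # and pass it through unchanged
        return RunnableLambda(_set)

    def getter(key):
        # ignore the piped input and return whatever was stored under `key`
        return RunnableLambda(lambda _: source[key])

    return getter, setter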

A decorator pattern is also supported:

@context_provider
def rag_chain(getter, setter):
    retriever = RunnableLambda(lambda _: ["doc 1 ...", ..., "doc n"])  # mock retriever
    format_context = RunnableLambda(lambda x: "## Documents" + "\n\n".join(x))
    
    return (
        {
            "question": RunnablePassthrough() | setter("question"),
            "formatted_context": retriever | setter("context") | format_context,
        }
        | prompt
        | llm
        | {
            "result": RunnablePassthrough(),
            "context": getter("context"),
            "question": getter("question"),
        }
    )

rag_chain.invoke(...)

Some Improvement Considerations

  • Allow the setter to set multiple keys based on the same input. Potential API usage:
retriever | setter(lambda x: {
    "list_context": x,
    "string_context": "\n\n".join(x),
    "context_size": len(x),
}) | format_context
  • Naming: should we rename getter and setter to inject and provide instead?
  • Whether we should allow each key to be set only once.
  • One caveat of RunnableContextProvider is that a getter and a setter on the same key cannot both appear inside the same RunnableParallel, because the order of execution is not guaranteed (see the sketch after this list).
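To illustrate that caveat, here is a hypothetical example in the proposed API (reusing retriever from above) where a getter and a setter for the same key sit in the same parallel map; because the branches may run in any order, the getter could execute before the setter has written the value.

unsafe_chain = RunnableContextProvider(
    lambda getter, setter: {
        "docs": retriever | setter("context"),
        "copy": getter("context"),  # may run before setter("context") has fired
    }
)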

Issue: None.
Dependencies: None.
Tag Maintainers: @nfcampos
Twitter handle: [will add in the future]


@Toubat Toubat marked this pull request as ready for review November 14, 2023 00:18
@dosubot dosubot bot added the Ɑ: models (Related to LLMs or chat model modules) and 🤖:enhancement (A large net-new component, integration, or chain) labels Nov 14, 2023
@Toubat Toubat changed the title Context provider Runnable Context provider Nov 14, 2023
@Toubat Toubat changed the title Runnable Context provider Runnable Context Provider Nov 14, 2023
@baskaryan
Collaborator

Hey @Toubat, appreciate the PR! We actually had something like this a while ago but removed it because it seemed redundant / wasn't really being used: #12133

I think the RunnablePassthrough.assign method has really helped with cases like this as well. What do you think of something like this for the example you provided?

from langchain.prompts import PromptTemplate
from langchain.chat_models import ChatOpenAI
from langchain.schema import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough

prompt = PromptTemplate.from_template("{context} {question}")
llm = ChatOpenAI()


retriever = (lambda _: ["doc 1 ...", "doc n"])  # mock retriever

def _format(input):
    return "## Documents" + "\n\n".join(x for x in input["context"])

answer_chain = (
    RunnablePassthrough.assign(context=_format) | prompt | llm | StrOutputParser()
)
retrieval_chain =  (
    {"context": retriever, "question": RunnablePassthrough(),}
    | RunnablePassthrough.assign(answer=answer_chain)
)

retrieval_chain.invoke("say foo")


@Toubat
Author

Toubat commented Nov 14, 2023

Hey @baskaryan, thanks for providing more insight on this! I believe there is some overlap between the current PR and the PutLocalVar and GetLocalVar runnables. Regarding RunnablePassthrough.assign, it works pretty well in cases where the previously piped chain is a map (so that we can use RunnablePassthrough.assign to merge the key-value pairs).

What I'm more interested in is whether there is a way to connect data from a very early stage of the chain pipeline to the very last stage (without having to pass the data through the intermediate stages). In addition, to use RunnablePassthrough.assign I believe we have to connect it to a map earlier in the chain, which might not be generic enough for some cases. A simple example to illustrate:

Suppose I have a fairly long chain like the one below:

core_data = lambda x: get_core_data(x)

chain = (
    chain_op_1
    | chain_op_2
    | core_data
    | chain_op_3
    | chain_op_4
    | ...
    | chain_op_n
    | {"result": RunnablePassthrough()}
)

Now, for some reason I want to update the chain to keep track of inner data and store it as part of the output. Suppose we want to get the core_data output into the final answer:

{ "result": RunnablePassthrough(), "core_data": <want to get core_data here> }

I can see how to do this using RunnablePassthrough.assign with

core_data = lambda x: get_core_data(x)

chain = (
    {
        "core_data": chain_op_1 | chain_op_2 | core_data
    }
    | RunnablePassthrough.assign(
        result=itemgetter("core_data") | chain_op_3 | chain_op_4 | ... | chain_op_n
    )
)

The problem I have is that the mental model for the RunnablePassthrough.assign implementation above isn't quite straightforward, partly because:

  1. The final result dict schema isn't written explicitly by me, which made me struggle for a while to figure out what the output dict would look like. This issue gets worse as we stack more and more RunnablePassthrough.assign in a row, like
RunnablePassthrough.assign(a=...) | RunnablePassthrough.assign(b=...) | RunnablePassthrough.assign(c=...)
  (a small illustration of how the keys accumulate follows below)
  2. Even adding a simple data passage across the pipeline requires rewriting the chain, which wouldn't hurt for small chains but might be a huge refactor for longer and more complex ones. The same applies to
retrieval_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | RunnablePassthrough.assign(answer=answer_chain)
)

At first glance, I wouldn't know that the final answer should be a dict of {"context": ..., "question": ..., "answer": ...}.
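As a small, hypothetical illustration of the stacking issue (toy lambdas only, no real chain ops), the output dict silently grows with every assign, and its final shape has to be inferred rather than read off the code:

from langchain.schema.runnable import RunnablePassthrough

chain = (
    RunnablePassthrough.assign(a=lambda x: 1)
    | RunnablePassthrough.assign(b=lambda x: x["a"] + 1)
    | RunnablePassthrough.assign(c=lambda x: x["b"] + 1)
)
chain.invoke({"start": 0})
# -> {"start": 0, "a": 1, "b": 2, "c": 3}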

With RunnableContextProvider, the same example would become:

core_data = lambda x: get_core_data(x)

@context_provider
def chain(getter, setter):
    return (
        chain_op_1
        | chain_op_2
        | core_data
        | setter("core_data")
        | chain_op_3
        | chain_op_4
        | ...
        | chain_op_n
        | {"result": RunnablePassthrough(), "core_data": getter("core_data")}
    )

This makes minimal changes to the original chain structure and makes it clearer what exactly I want to accomplish. I believe the benefit of a data injection mechanism like RunnableContextProvider is that it enables data sharing without the need to make significant modifications to the original chaining structure. Ideally, data sharing across different parts of the pipeline should be plug-and-play.

@nfcampos
Collaborator

Hi @Toubat, I've opened a PR with an API heavily inspired by yours. Can you have a look and let me know your thoughts? #14046 (The biggest reason for the rewrite is to have first-class support for streaming, with getters waiting on setters.)
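Conceptually (this is a rough sketch of the idea, not the code in #14046), "getters waiting on setters" means a getter awaits a signal that the setter has produced a value, so a downstream branch can start streaming before the upstream branch has fully finished:

import asyncio

class ContextValue:
    # Conceptual only: the getter blocks until the setter has stored a value.
    def __init__(self):
        self._event = asyncio.Event()
        self._value = None

    def set(self, value):
        self._value = value
        self._event.set()

    async def get(self):
        await self._event.wait()
        return self._value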

@nfcampos
Collaborator

nfcampos commented Dec 1, 2023

Closing in favour of #14046

@nfcampos nfcampos closed this Dec 1, 2023