
Launching threads/async in DSPy 2.5.30+ without DSPy threading primitives #1812

Open
italianconcerto opened this issue Nov 17, 2024 · 16 comments


@italianconcerto

italianconcerto commented Nov 17, 2024

import threading
import dspy
import os

lm = dspy.LM(
    "openrouter/meta-llama/llama-3.1-70b-instruct",
    api_base="https://openrouter.ai/api/v1",
    api_key=os.environ["OPENROUTER_API_KEY"],
    temperature=0.2,
    cache=False
)
dspy.settings.configure(lm=lm)

predict = dspy.Predict("question -> answer")

def run():
    response = predict("Paris.")
    print(response)

output = run()
print(output)

If I run this I get something like:

Exception in thread Thread-1 (run):
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/[email protected]/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
    self.run()
  File "/opt/homebrew/Cellar/[email protected]/3.12.7_1/Frameworks/Python.framework/Versions/3.12/lib/python3.12/threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/johnny_silicio/test/async_dspy.py", line 17, in run
    response = predict("Paris.")
               ^^^^^^^^^^^^^^^^^
  File "/Users/johnny_silicio/test/venv/lib/python3.12/site-packages/dspy/utils/callback.py", line 198, in wrapper
    callbacks = dspy.settings.get("callbacks", []) + getattr(instance, "callbacks", [])
                ^^^^^^^^^^^^^^^^^
  File "/Users/johnny_silicio/test/venv/lib/python3.12/site-packages/dsp/utils/settings.py", line 65, in __getattr__
    if hasattr(self.config, name):
               ^^^^^^^^^^^
  File "/Users/johnny_silicio/test/venv/lib/python3.12/site-packages/dsp/utils/settings.py", line 62, in config
    return self.stack_by_thread[thread_id][-1]
           ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^
KeyError: 6185250816

It's the same with both Python 3.10 and 3.12.

@okhat
Collaborator

okhat commented Nov 17, 2024

Hey @italianconcerto ! I don't think the code you posted will result in the error you posted.

However, if you're using threading or async then it may happen. Can you share more about how you're setting up threading or async?

@okhat
Collaborator

okhat commented Nov 17, 2024

I relaxed that error, so you'll get a warning now if you upgrade to DSPy 2.5.31, which is now released.

The supported way to create threads or async threads is to use launchers in dspy, like: dspy.asyncify, dspy.Parallel, dspy.Evaluate, program.batch, etc.

For async in particular, here are new docs on deployment with async/FastAPI: https://dspy.ai/tutorials/deployment/
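The asyncify pattern mentioned above can be sketched with the standard library alone: a blocking call is handed off to a worker thread and awaited. This is only a rough analogue of what dspy.asyncify provides, and sync_program here is a hypothetical stand-in for a synchronous DSPy module call:

```python
import asyncio

def sync_program(question: str) -> str:
    # stand-in for a blocking, synchronous DSPy module call
    return f"answer to {question!r}"

async def main() -> str:
    # asyncio.to_thread offloads the blocking call to a worker thread,
    # which is roughly what an asyncify-style wrapper does under the hood
    return await asyncio.to_thread(sync_program, "What is DSPy?")

result = asyncio.run(main())
print(result)
```

In a FastAPI endpoint the awaited call plays the same role, keeping the event loop free while the synchronous program runs.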

@okhat okhat changed the title Last version of DSPy not working at all Launching threads/async in DSPy 2.5.30+ without DSPy threading primitives Nov 17, 2024
@italianconcerto
Author

I confirm that the error came after I ran that script, but I had also tried to run a multi-threaded program with dspy before it. I assume the first run impacted the second? Weird behaviour.

@italianconcerto
Author

@okhat Thanks, I couldn't find it in the docs before.

@okhat
Collaborator

okhat commented Nov 17, 2024

Thank you @italianconcerto ! Did this change resolve the issue?

@yarelm

yarelm commented Nov 19, 2024

What about situations using things like optimizers and evaluators? I wasn't able to make them work in an async environment. I kept getting the error: You seem to be creating DSPy threads in an unsupported way

@lmonson

lmonson commented Nov 19, 2024

I have a FastAPI app that makes synchronous dspy calls during requests. Effectively this:

lm = dspy.LM(name_that_varies_per_request)
with dspy.context(lm=lm):
    prediction = my_module(module_args)

But I get an assertion failure You seem to be creating DSPy threads in an unsupported way (not a warning). I'm confused how to fix this per the above without adopting async for my requests.

@okhat
Collaborator

okhat commented Nov 19, 2024

Thank you @yarelm and @lmonson ! I've reverted this change. You can pip install -U dspy after 2-3 minutes and get DSPy 2.5.32 which won't have this problem. A more fundamental fix is coming in the next 48 hours too.

@denisergashbaev

denisergashbaev commented Nov 20, 2024

@okhat Although I saw that you reverted the change, I was wondering what the recommended way (code snippet) is to launch multiple threads of a single DSPy module. Right now, I create threads via ThreadPoolExecutor, and each thread uses its own DSPy context. Something like this (which was giving the error mentioned above):

from concurrent.futures import Future, ThreadPoolExecutor, as_completed
import traceback

def function(args):
    with dspy.context(lm=gpt):
        specialist_extractor = Extractor(
            output_lang=output_lang
        ).activate_assertions()
        consultations_reformatted = specialist_extractor.forward(
            args=args,
        )
        return consultations_reformatted


with ThreadPoolExecutor() as executor:
    futures: list[Future[Extractor | None]] = [
        executor.submit(function, args), ...
    ]
    for future in as_completed(futures):
        if exception := future.exception():
            logger.error(
                "\n".join(
                    traceback.format_exception(
                        type(exception), exception, exception.__traceback__
                    )
                )
            )
            continue
        result = future.result()
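The concurrency pattern in the snippet above (submit, iterate with as_completed, log worker tracebacks) can be reduced to a runnable stdlib sketch; work here is a hypothetical stand-in for the DSPy extractor call:

```python
import logging
import traceback
from concurrent.futures import Future, ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)

def work(x: int) -> int:
    # stand-in for the Extractor call above
    if x < 0:
        raise ValueError("negative input")
    return x * 2

results: list[int] = []
with ThreadPoolExecutor() as executor:
    futures: list[Future[int]] = [executor.submit(work, x) for x in (1, 2, -3)]
    for future in as_completed(futures):
        if exception := future.exception():
            # log the worker's full traceback instead of swallowing it
            logger.error(
                "\n".join(
                    traceback.format_exception(
                        type(exception), exception, exception.__traceback__
                    )
                )
            )
            continue
        results.append(future.result())

print(sorted(results))  # → [2, 4]; the failing task was logged and skipped
```

Future.exception() returns None on success, so the walrus check cleanly separates failed workers from successful ones.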

@lw3259111

I still encounter the error "You seem to be creating DSPy threads in an unsupported way" when using the process_expert function in Storm:

        # hierarchical chat: chat with one expert. Generate question, get answer
        def process_expert(expert):
            expert_name, expert_description = expert.split(":")
            for idx in range(self.max_turn_per_experts):
                with self.logging_wrapper.log_event(
                        f"warm start, perspective guided QA: expert {expert_name}; turn {idx + 1}"
                ):
                    try:
                        with lock:
                            history = self.format_dialogue_question_history_string(
                                conversation_history
                            )
                        with dspy.settings.context(lm=self.question_asking_lm):
                            question = self.ask_question(
                                topic=topic, history=history, current_expert=expert
                            ).question
                        answer = self.answer_question_module(
                            topic=topic,
                            question=question,
                            mode="brief",
                            style="conversational",
                        )
                        conversation_turn = ConversationTurn(
                            role=expert,
                            claim_to_make=question,
                            raw_utterance=answer.response,
                            utterance_type="Support",
                            queries=answer.queries,
                            raw_retrieved_info=answer.raw_retrieved_info,
                            cited_info=answer.cited_info,
                        )
                        if self.callback_handler is not None:
                            self.callback_handler.on_warmstart_update(
                                message="\n".join(
                                    f"Finish browsing {info.url}"
                                    for info in answer.raw_retrieved_info
                                )
                            )
                        with lock:
                            conversation_history.append(conversation_turn)
                    except Exception as e:
                        print(f"Error processing expert {expert}: {e}")

        # multi-thread conversation
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=self.max_thread
        ) as executor:
            futures = [
                executor.submit(process_expert, expert)
                for expert in experts[: min(len(experts), self.max_num_experts)]
            ]
            concurrent.futures.wait(futures)


        conversation_history = [background_seeking_dialogue] + conversation_history

        return dspy.Prediction(
            conversation_history=conversation_history, experts=experts
        )
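Setting the DSPy-specific calls aside, the concurrency skeleton of the snippet above is worker threads appending to a lock-protected shared history. A minimal runnable stdlib sketch, where process_expert and the expert strings are stand-ins:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

lock = threading.Lock()
conversation_history: list[str] = []

def process_expert(expert: str) -> None:
    # stand-in for one question/answer turn with an expert
    turn = f"turn by {expert.split(':')[0]}"
    with lock:  # the shared history must be guarded across worker threads
        conversation_history.append(turn)

experts = ["Alice: historian", "Bob: chemist", "Cara: editor"]
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(process_expert, e) for e in experts]
# leaving the with-block waits for all workers to finish

print(len(conversation_history))  # → 3
```

Without the lock, concurrent appends would still mostly work in CPython, but any read-modify-write on the shared list (like formatting the history mid-conversation) would race.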

@lw3259111

I resolved the issue by first running pip uninstall dspy to remove the package, followed by pip install dspy -U to install the latest version.

@okhat
Collaborator

okhat commented Nov 21, 2024

@denisergashbaev @lmonson Right now, the right way to use threads is dspy.Parallel() or program.batch().

@okhat
Collaborator

okhat commented Nov 24, 2024

Hey @denisergashbaev @lmonson and everyone, this should now be resolved in DSPy 2.5.36!

User threads are now supported, not just threads launched using DSPy primitives!

I'll keep this open for a bit.

@yarelm

yarelm commented Dec 2, 2024

Hi @okhat !

One of the recent versions caused a regression for me. I upgraded from v2.5.32 to v2.5.41.

I'm running my app using Streamlit, which means the app is not running on the main thread (to my understanding).

On the main.py of my app, I'm calling asyncify() which is failing with the error:
asyncify can only be called from the main thread

What would you suggest?

@okhat
Collaborator

okhat commented Dec 2, 2024

@yarelm I'm sorry to hear this! Can you stick to an older version, like v2.5.28 or something? I drafted a redesign of the threading system to remove the limitation on main threads.

@MatFrancois

Hi! Is there any example or doc on how to use Parallel or batch (better than reading the source code)?
