From eb2276d2b763469e23e8204933fb8dac69256bd1 Mon Sep 17 00:00:00 2001 From: Trung Vu Date: Tue, 12 Nov 2024 19:56:19 +0000 Subject: [PATCH 1/4] use event loops --- .../request_processor/base_request_processor.py | 7 +++++-- .../curator/request_processor/event_loop.py | 16 ++++++++++++++++ .../openai_batch_request_processor.py | 12 ++++++++---- 3 files changed, 29 insertions(+), 6 deletions(-) create mode 100644 src/bespokelabs/curator/request_processor/event_loop.py diff --git a/src/bespokelabs/curator/request_processor/base_request_processor.py b/src/bespokelabs/curator/request_processor/base_request_processor.py index 3583352f..8cb06529 100644 --- a/src/bespokelabs/curator/request_processor/base_request_processor.py +++ b/src/bespokelabs/curator/request_processor/base_request_processor.py @@ -13,6 +13,7 @@ from pydantic import BaseModel from bespokelabs.curator.prompter.prompt_formatter import PromptFormatter +from bespokelabs.curator.request_processor.event_loop import get_or_create_event_loop from bespokelabs.curator.request_processor.generic_request import GenericRequest from bespokelabs.curator.request_processor.generic_response import GenericResponse @@ -161,9 +162,11 @@ async def create_all_request_files(): ] await asyncio.gather(*tasks) - asyncio.run(create_all_request_files()) + loop = get_or_create_event_loop() + loop.run_until_complete(create_all_request_files()) else: - asyncio.run( + loop = get_or_create_event_loop() + loop.run_until_complete( self.acreate_request_file(dataset, prompt_formatter, requests_file) ) diff --git a/src/bespokelabs/curator/request_processor/event_loop.py b/src/bespokelabs/curator/request_processor/event_loop.py new file mode 100644 index 00000000..0cf2960c --- /dev/null +++ b/src/bespokelabs/curator/request_processor/event_loop.py @@ -0,0 +1,16 @@ +import asyncio + + +def get_or_create_event_loop(): + """ + Get the current event loop or create a new one if there isn't one. 
+ """ + try: + return asyncio.get_running_loop() + except RuntimeError as e: + # If no event loop is running, asyncio will + # raise a RuntimeError (https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_running_loop). + # In that case, we can create a new event loop. + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop diff --git a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py index 58fe21e2..a1c7bfc1 100644 --- a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py +++ b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py @@ -3,16 +3,19 @@ import logging import os from typing import Callable, Dict, Optional, TypeVar -from openai import AsyncOpenAI + import aiofiles +from openai import AsyncOpenAI +from tqdm import tqdm + from bespokelabs.curator.dataset import Dataset +from bespokelabs.curator.prompter.prompt_formatter import PromptFormatter from bespokelabs.curator.request_processor.base_request_processor import ( BaseRequestProcessor, GenericRequest, GenericResponse, ) -from bespokelabs.curator.prompter.prompt_formatter import PromptFormatter -from tqdm import tqdm +from bespokelabs.curator.request_processor.event_loop import get_or_create_event_loop T = TypeVar("T") logger = logging.getLogger(__name__) @@ -243,7 +246,8 @@ async def submit_all_batches(): # TODO(Ryan): likely can add some logic for smarter check_interval based on batch size and if the batch has started or not, fine to do a dumb ping for now batch_watcher = BatchWatcher(working_dir, check_interval=self.check_interval) - asyncio.run( + loop = get_or_create_event_loop() + loop.run_until_complete( batch_watcher.watch(prompt_formatter, self.get_generic_response, dataset) ) From 3c8b13479102b0cf2a8b6e771e05423b4b777dd7 Mon Sep 17 00:00:00 2001 From: Trung Vu Date: Tue, 12 Nov 2024 20:02:53 +0000 Subject: 
[PATCH 2/4] replace more asyncio --- .../request_processor/openai_online_request_processor.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py index 37469b9a..cb282899 100644 --- a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py +++ b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py @@ -5,21 +5,21 @@ import re import time from dataclasses import dataclass, field +from functools import partial from typing import Any, Callable, Dict, Optional, Set, Tuple, TypeVar import aiohttp import requests import tiktoken from tqdm import tqdm -from functools import partial from bespokelabs.curator.dataset import Dataset +from bespokelabs.curator.prompter.prompter import PromptFormatter from bespokelabs.curator.request_processor.base_request_processor import ( BaseRequestProcessor, GenericRequest, GenericResponse, ) -from bespokelabs.curator.prompter.prompter import PromptFormatter T = TypeVar("T") logger = logging.getLogger(__name__) @@ -187,7 +187,8 @@ def run( for requests_file, responses_file in zip( requests_files, responses_files ): - asyncio.run( + loop = get_or_create_event_loop() + loop.run_until_complete( self.process_api_requests_from_file( requests_filepath=requests_file, save_filepath=responses_file, From b045e8de4c9c4b08f69ec2bd0ddd596ccfa478a8 Mon Sep 17 00:00:00 2001 From: Trung Vu Date: Tue, 12 Nov 2024 20:04:15 +0000 Subject: [PATCH 3/4] fix missing import --- .../curator/request_processor/openai_online_request_processor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py index cb282899..789bbd44 100644 --- 
a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py +++ b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py @@ -20,6 +20,7 @@ GenericRequest, GenericResponse, ) +from bespokelabs.curator.request_processor.event_loop import get_or_create_event_loop T = TypeVar("T") logger = logging.getLogger(__name__) From 7491c169b540d1290dac7195433bc120fbfeab0e Mon Sep 17 00:00:00 2001 From: Trung Vu Date: Tue, 12 Nov 2024 20:05:22 +0000 Subject: [PATCH 4/4] black --- .../curator/request_processor/base_request_processor.py | 8 ++++++-- .../request_processor/openai_batch_request_processor.py | 8 ++++++-- .../request_processor/openai_online_request_processor.py | 4 +++- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/bespokelabs/curator/request_processor/base_request_processor.py b/src/bespokelabs/curator/request_processor/base_request_processor.py index 15e6448a..e5cb5d0d 100644 --- a/src/bespokelabs/curator/request_processor/base_request_processor.py +++ b/src/bespokelabs/curator/request_processor/base_request_processor.py @@ -13,9 +13,13 @@ from pydantic import BaseModel from bespokelabs.curator.prompter.prompt_formatter import PromptFormatter -from bespokelabs.curator.request_processor.event_loop import get_or_create_event_loop +from bespokelabs.curator.request_processor.event_loop import ( + get_or_create_event_loop, +) from bespokelabs.curator.request_processor.generic_request import GenericRequest -from bespokelabs.curator.request_processor.generic_response import GenericResponse +from bespokelabs.curator.request_processor.generic_response import ( + GenericResponse, +) logger = logging.getLogger(__name__) diff --git a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py index cb9410b3..1c21db9e 100644 --- a/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py +++ 
b/src/bespokelabs/curator/request_processor/openai_batch_request_processor.py @@ -15,7 +15,9 @@ GenericRequest, GenericResponse, ) -from bespokelabs.curator.request_processor.event_loop import get_or_create_event_loop +from bespokelabs.curator.request_processor.event_loop import ( + get_or_create_event_loop, +) T = TypeVar("T") logger = logging.getLogger(__name__) @@ -260,7 +262,9 @@ async def submit_all_batches(): loop = get_or_create_event_loop() loop.run_until_complete( - batch_watcher.watch(prompt_formatter, self.get_generic_response, dataset) + batch_watcher.watch( + prompt_formatter, self.get_generic_response, dataset + ) ) dataset = self.create_dataset_files( diff --git a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py index 789bbd44..d80c7d88 100644 --- a/src/bespokelabs/curator/request_processor/openai_online_request_processor.py +++ b/src/bespokelabs/curator/request_processor/openai_online_request_processor.py @@ -20,7 +20,9 @@ GenericRequest, GenericResponse, ) -from bespokelabs.curator.request_processor.event_loop import get_or_create_event_loop +from bespokelabs.curator.request_processor.event_loop import ( + get_or_create_event_loop, +) T = TypeVar("T") logger = logging.getLogger(__name__)