From ccdb92f78f5b66778d9b601723c07b1e4a2c0dd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannis=20Sch=C3=B6nleber?= Date: Sun, 1 Dec 2024 00:37:54 +0000 Subject: [PATCH] [Bugfix] fix race condition that leads to wrong order of token returned MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During the startup of the api server the setup function is called multiple times (every 5s). So the longer the longer the startup time (generally for larger models) the more consumers are contending for the output. This can then lead to race condition where the order of the answer token is wrong. Introduce here: https://github.com/vllm-project/vllm/pull/9973 References: https://github.com/vllm-project/vllm/issues/10376 https://github.com/vllm-project/vllm/issues/10589 https://github.com/vllm-project/vllm/pull/10782 Signed-off-by: Jannis Schönleber --- vllm/engine/multiprocessing/client.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py index 0a046c71e86e8..24cacf1b538fe 100644 --- a/vllm/engine/multiprocessing/client.py +++ b/vllm/engine/multiprocessing/client.py @@ -255,7 +255,14 @@ async def setup(self): """Setup the client before it starts sending server requests.""" # Start output_loop - self.output_loop = asyncio.create_task(self.run_output_handler_loop()) + if self.output_loop is None: + # only generate once to avoid multiple concurrent output_loops + # this will lead to race conditions and wrong orders of tokens + # returned by the engine + # setup will be called multiple times during the startup of + # the engine + self.output_loop = asyncio.create_task( + self.run_output_handler_loop()) with self.get_data_socket() as socket: # Wait until server is ready. @@ -264,8 +271,9 @@ async def setup(self): self.tracing_flag = response.tracing_enabled # Start health_loop. - self.health_loop = asyncio.create_task( - self.run_heartbeat_loop(timeout=VLLM_RPC_TIMEOUT)) + if self.health_loop is None: + self.health_loop = asyncio.create_task( + self.run_heartbeat_loop(timeout=VLLM_RPC_TIMEOUT)) def close(self): """Destroy the ZeroMQ Context."""