From 73ae12677911e0463e648131cc43ea31df61540b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jannis=20Sch=C3=B6nleber?= Date: Sun, 1 Dec 2024 00:37:54 +0000 Subject: [PATCH] [Bugfix] fix race condition that leads to wrong order of token returned MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit During the startup of the api server the setup function is called multiple times (every 5s). So the longer the longer the startup time (generally for larger models) the more consumers are contending for the output. This can then lead to race condition where the order of the answer token is wrong. Introduce here: https://github.com/vllm-project/vllm/pull/9973 References: https://github.com/vllm-project/vllm/issues/10376 https://github.com/vllm-project/vllm/issues/10589 https://github.com/vllm-project/vllm/pull/10782 Signed-off-by: Jannis Schönleber --- vllm/engine/multiprocessing/client.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/vllm/engine/multiprocessing/client.py b/vllm/engine/multiprocessing/client.py index fe21c58c775fe..3a3d157d73b6f 100644 --- a/vllm/engine/multiprocessing/client.py +++ b/vllm/engine/multiprocessing/client.py @@ -255,7 +255,14 @@ async def setup(self): """Setup the client before it starts sending server requests.""" # Start output_loop - self.output_loop = asyncio.create_task(self.run_output_handler_loop()) + if self.output_loop is None: + # only generate once to avoid multiple concurrent output_loops + # this will lead to race conditions and wrong orders of tokens + # returned by the engine + # setup will be called multiple times during the startup of + # the engine + self.output_loop = asyncio.create_task( + self.run_output_handler_loop()) with self.get_data_socket() as socket: # Wait until server is ready. @@ -264,8 +271,9 @@ async def setup(self): self.tracing_flag = response.tracing_enabled # Start health_loop. - self.health_loop = asyncio.create_task( - self.run_heartbeat_loop(timeout=VLLM_RPC_TIMEOUT)) + if self.health_loop is None: + self.health_loop = asyncio.create_task( + self.run_heartbeat_loop(timeout=VLLM_RPC_TIMEOUT)) def close(self): """Destroy the ZeroMQ Context."""