Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[V1] add error handling #11420

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 90 additions & 33 deletions vllm/v1/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import signal
import threading
import time
import traceback
from dataclasses import dataclass
from multiprocessing.connection import Connection
from multiprocessing.process import BaseProcess
from typing import List, Tuple, Type
from typing import List, Optional, Tuple, Type

import zmq
import zmq.asyncio
Expand Down Expand Up @@ -35,6 +37,29 @@
LOGGING_TIME_S = 5000


@dataclass
class EngineStartupError:
"""Container for startup errors that occur in EngineCore."""
error_type: str
error_message: str
traceback: str

def recreate_exception(self) -> Exception:
"""Recreate the original exception with the full context."""
exception_class = globals().get(self.error_type) or Exception
exc = exception_class(
f"{self.error_message}\n\n"
f"Original traceback from EngineCore:\n{self.traceback}")
return exc


@dataclass
class EngineCoreStatus:
"""Status message sent from EngineCore to main process."""
is_ready: bool
startup_error: Optional[EngineStartupError] = None


class EngineCore:
"""Inner loop of vLLM's Engine."""

Expand Down Expand Up @@ -176,35 +201,6 @@ def __init__(
with make_zmq_socket(ready_path, zmq.constants.PUSH) as ready_socket:
ready_socket.send_string(EngineCoreProc.READY_STR)

@staticmethod
def wait_for_startup(
proc: BaseProcess,
ready_path: str,
) -> None:
"""Wait until the EngineCore is ready."""

try:
sync_ctx = zmq.Context() # type: ignore[attr-defined]
socket = sync_ctx.socket(zmq.constants.PULL)
socket.connect(ready_path)

# Wait for EngineCore to send EngineCoreProc.READY_STR.
while socket.poll(timeout=POLLING_TIMEOUT_MS) == 0:
logger.debug("Waiting for EngineCoreProc to startup.")

if not proc.is_alive():
raise RuntimeError("EngineCoreProc failed to start.")

message = socket.recv_string()
assert message == EngineCoreProc.READY_STR

except BaseException as e:
logger.exception(e)
raise e

finally:
sync_ctx.destroy(linger=0)

@staticmethod
def make_engine_core_process(
vllm_config: VllmConfig,
Expand All @@ -216,26 +212,75 @@ def make_engine_core_process(
) -> EngineCoreProcHandle:
context = get_mp_context()

# Create a pipe for startup status communication
parent_conn, child_conn = context.Pipe()

process_kwargs = {
"input_path": input_path,
"output_path": output_path,
"ready_path": ready_path,
"vllm_config": vllm_config,
"executor_class": executor_class,
"usage_context": usage_context,
"status_pipe": child_conn,
}
# Run EngineCore busy loop in background process.
proc = context.Process(target=EngineCoreProc.run_engine_core,
kwargs=process_kwargs)
proc.start()

# Wait for startup
EngineCoreProc.wait_for_startup(proc, ready_path)
try:
EngineCoreProc.wait_for_startup(proc, ready_path, parent_conn)
except Exception as e:
# Clean up the process if startup failed
if proc.is_alive():
proc.terminate()
proc.join(timeout=5.0)
raise e
finally:
parent_conn.close()

return EngineCoreProcHandle(proc=proc,
ready_path=ready_path,
input_path=input_path,
output_path=output_path)

@staticmethod
def wait_for_startup(
proc: BaseProcess,
ready_path: str,
status_pipe: Connection,
) -> None:
"""Wait until the EngineCore is ready, with improved error handling."""
try:
sync_ctx = zmq.Context()
socket = sync_ctx.socket(zmq.constants.PULL)
socket.connect(ready_path)

# Wait for either a ready signal or an error
while socket.poll(timeout=POLLING_TIMEOUT_MS) == 0:
logger.debug("Waiting for EngineCoreProc to startup.")

# Check if process is still alive
if not proc.is_alive():
# Try to get error information from pipe
if status_pipe.poll():
status: EngineCoreStatus = status_pipe.recv()
if status.startup_error:
# Recreate and raise the original exception
raise status.startup_error.recreate_exception()

# Fallback error if no specific error was received
raise RuntimeError("EngineCoreProc failed to start"
"without providing error details")

message = socket.recv_string()
assert message == EngineCoreProc.READY_STR

finally:
sync_ctx.destroy(linger=0)

@staticmethod
def run_engine_core(*args, **kwargs):
"""Launch EngineCore busy loop in background process."""
Expand All @@ -244,6 +289,7 @@ def run_engine_core(*args, **kwargs):
# SystemExit exception is only raised once to allow this and worker
# processes to terminate without error
shutdown_requested = False
status_pipe = kwargs.pop("status_pipe")

# Ensure we can serialize transformer config after spawning
maybe_register_config_serialize_by_value()
Expand All @@ -261,16 +307,27 @@ def signal_handler(signum, frame):
engine_core = None
try:
engine_core = EngineCoreProc(*args, **kwargs)
# Signal successful startup
status_pipe.send(EngineCoreStatus(is_ready=True))
engine_core.run_busy_loop()

except SystemExit:
logger.debug("EngineCore interrupted.")

except BaseException as e:
except Exception as e:
# Capture the full error context
error_info = EngineStartupError(error_type=e.__class__.__name__,
error_message=str(e),
traceback=traceback.format_exc())

# Send error information to parent process
status_pipe.send(
EngineCoreStatus(is_ready=False, startup_error=error_info))
logger.exception(e)
raise e
raise

finally:
status_pipe.close()
if engine_core is not None:
engine_core.shutdown()
engine_core = None
Expand Down
Loading