From 8fccf2eed3e6ebacd26b5807801263fc070b76ac Mon Sep 17 00:00:00 2001 From: Aarushi <50577581+aarushik93@users.noreply.github.com> Date: Mon, 18 Nov 2024 16:33:15 +0000 Subject: [PATCH] fix(platform/builder): Add heartbeat mechanism (#8665) * add heartbeat mechanism * formatting data * import List * another import fix * wip * formatting adn linting --- .../backend/backend/server/model.py | 11 ++-- .../backend/backend/server/ws_api.py | 7 ++ .../src/lib/autogpt-server-api/baseClient.ts | 64 +++++++++++++++++-- 3 files changed, 73 insertions(+), 9 deletions(-) diff --git a/autogpt_platform/backend/backend/server/model.py b/autogpt_platform/backend/backend/server/model.py index 4891c2c9cbb9..7c554b445c53 100644 --- a/autogpt_platform/backend/backend/server/model.py +++ b/autogpt_platform/backend/backend/server/model.py @@ -1,5 +1,5 @@ import enum -import typing +from typing import Any, List, Optional, Union import pydantic @@ -12,11 +12,12 @@ class Methods(enum.Enum): UNSUBSCRIBE = "unsubscribe" EXECUTION_EVENT = "execution_event" ERROR = "error" + HEARTBEAT = "heartbeat" class WsMessage(pydantic.BaseModel): method: Methods - data: typing.Dict[str, typing.Any] | list[typing.Any] | None = None + data: Optional[Union[dict[str, Any], list[Any], str]] = None success: bool | None = None channel: str | None = None error: str | None = None @@ -40,8 +41,8 @@ class CreateGraph(pydantic.BaseModel): class CreateAPIKeyRequest(pydantic.BaseModel): name: str - permissions: typing.List[APIKeyPermission] - description: typing.Optional[str] = None + permissions: List[APIKeyPermission] + description: Optional[str] = None class CreateAPIKeyResponse(pydantic.BaseModel): @@ -54,4 +55,4 @@ class SetGraphActiveVersion(pydantic.BaseModel): class UpdatePermissionsRequest(pydantic.BaseModel): - permissions: typing.List[APIKeyPermission] + permissions: List[APIKeyPermission] diff --git a/autogpt_platform/backend/backend/server/ws_api.py b/autogpt_platform/backend/backend/server/ws_api.py index 8e3c706d80d4..421a911abdaf 100644 --- a/autogpt_platform/backend/backend/server/ws_api.py +++ b/autogpt_platform/backend/backend/server/ws_api.py @@ -138,6 +138,13 @@ async def websocket_router( while True: data = await websocket.receive_text() message = WsMessage.model_validate_json(data) + + if message.method == Methods.HEARTBEAT: + await websocket.send_json( + {"method": Methods.HEARTBEAT.value, "data": "pong", "success": True} + ) + continue + if message.method == Methods.SUBSCRIBE: await handle_subscribe(websocket, manager, message) diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts index 6eaef5c3b29c..da3e77f1c41f 100644 --- a/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts +++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts @@ -28,6 +28,10 @@ export default class BaseAutoGPTServerAPI { private wsConnecting: Promise | null = null; private wsMessageHandlers: Record void>> = {}; private supabaseClient: SupabaseClient | null = null; + heartbeatInterval: number | null = null; + readonly HEARTBEAT_INTERVAL = 30000; // 30 seconds + readonly HEARTBEAT_TIMEOUT = 10000; // 10 seconds + heartbeatTimeoutId: number | null = null; constructor( baseUrl: string = process.env.NEXT_PUBLIC_AGPT_SERVER_URL || @@ -324,34 +328,84 @@ export default class BaseAutoGPTServerAPI { } } + startHeartbeat() { + this.stopHeartbeat(); + this.heartbeatInterval = window.setInterval(() => { + if (this.webSocket?.readyState === WebSocket.OPEN) { + this.webSocket.send( + JSON.stringify({ + method: "heartbeat", + data: "ping", + success: true, + }), + ); + + this.heartbeatTimeoutId = window.setTimeout(() => { + console.log("Heartbeat timeout - reconnecting"); + this.webSocket?.close(); + this.connectWebSocket(); + }, this.HEARTBEAT_TIMEOUT); + } + }, this.HEARTBEAT_INTERVAL); + } + + stopHeartbeat() { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = null; + } + if (this.heartbeatTimeoutId) { + clearTimeout(this.heartbeatTimeoutId); + this.heartbeatTimeoutId = null; + } + } + + handleHeartbeatResponse() { + if (this.heartbeatTimeoutId) { + clearTimeout(this.heartbeatTimeoutId); + this.heartbeatTimeoutId = null; + } + } + async connectWebSocket(): Promise { this.wsConnecting ??= new Promise(async (resolve, reject) => { try { const token = (await this.supabaseClient?.auth.getSession())?.data.session ?.access_token || ""; - const wsUrlWithToken = `${this.wsUrl}?token=${token}`; this.webSocket = new WebSocket(wsUrlWithToken); this.webSocket.onopen = () => { - console.debug("WebSocket connection established"); + console.log("WebSocket connection established"); + this.startHeartbeat(); // Start heartbeat when connection opens resolve(); }; this.webSocket.onclose = (event) => { - console.debug("WebSocket connection closed", event); + console.log("WebSocket connection closed", event); + this.stopHeartbeat(); // Stop heartbeat when connection closes this.webSocket = null; + // Attempt to reconnect after a delay + setTimeout(() => this.connectWebSocket(), 1000); }; this.webSocket.onerror = (error) => { console.error("WebSocket error:", error); + this.stopHeartbeat(); // Stop heartbeat on error reject(error); }; this.webSocket.onmessage = (event) => { const message: WebsocketMessage = JSON.parse(event.data); - if (message.method == "execution_event") { + + // Handle heartbeat response + if (message.method === "heartbeat" && message.data === "pong") { + this.handleHeartbeatResponse(); + return; + } + + if (message.method === "execution_event") { message.data = parseNodeExecutionResultTimestamps(message.data); } this.wsMessageHandlers[message.method]?.forEach((handler) => @@ -367,6 +421,7 @@ export default class BaseAutoGPTServerAPI { } disconnectWebSocket() { + this.stopHeartbeat(); // Stop heartbeat when disconnecting if (this.webSocket && this.webSocket.readyState === WebSocket.OPEN) { this.webSocket.close(); } @@ -423,6 +478,7 @@ type GraphCreateRequestBody = type WebsocketMessageTypeMap = { subscribe: { graph_id: string }; execution_event: NodeExecutionResult; + heartbeat: "ping" | "pong"; }; type WebsocketMessage = {