Skip to content

Commit

Permalink
add heartbeat mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
aarushik93 committed Nov 15, 2024
1 parent f27f596 commit cc608c4
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 7 deletions.
7 changes: 7 additions & 0 deletions autogpt_platform/backend/backend/server/conn_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ def disconnect(self, websocket: WebSocket):
for subscribers in self.subscriptions.values():
subscribers.discard(websocket)

async def handle_heartbeat(self, websocket: WebSocket):
"""Handle heartbeat messages by responding with pong"""
await websocket.send_json({
"method": Methods.HEARTBEAT,
"data": "pong"
})

async def subscribe(self, graph_id: str, websocket: WebSocket):
if graph_id not in self.subscriptions:
self.subscriptions[graph_id] = set()
Expand Down
1 change: 1 addition & 0 deletions autogpt_platform/backend/backend/server/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Methods(enum.Enum):
UNSUBSCRIBE = "unsubscribe"
EXECUTION_EVENT = "execution_event"
ERROR = "error"
HEARTBEAT = "heartbeat"


class WsMessage(pydantic.BaseModel):
Expand Down
5 changes: 5 additions & 0 deletions autogpt_platform/backend/backend/server/ws_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ async def websocket_router(
while True:
data = await websocket.receive_text()
message = WsMessage.model_validate_json(data)

if message.method == Methods.HEARTBEAT and message.data == "ping":
await manager.handle_heartbeat(websocket)
continue

if message.method == Methods.SUBSCRIBE:
await handle_subscribe(websocket, manager, message)

Expand Down
62 changes: 55 additions & 7 deletions autogpt_platform/frontend/src/lib/autogpt-server-api/baseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ export default class BaseAutoGPTServerAPI {
private wsConnecting: Promise<void> | null = null;
private wsMessageHandlers: Record<string, Set<(data: any) => void>> = {};
private supabaseClient: SupabaseClient | null = null;
heartbeatInterval: number | null = null;
readonly HEARTBEAT_INTERVAL = 30000; // 30 seconds
readonly HEARTBEAT_TIMEOUT = 10000; // 10 seconds
heartbeatTimeoutId: number | null = null;

constructor(
baseUrl: string = process.env.NEXT_PUBLIC_AGPT_SERVER_URL ||
Expand Down Expand Up @@ -336,38 +340,80 @@ export default class BaseAutoGPTServerAPI {
}
}

async connectWebSocket(): Promise<void> {
startHeartbeat() {
this.stopHeartbeat();
this.heartbeatInterval = setInterval(() => {
if (this.webSocket?.readyState === WebSocket.OPEN) {
this.sendWebSocketMessage("heartbeat", "ping");

this.heartbeatTimeoutId = setTimeout(() => {
console.warn("Heartbeat timeout - reconnecting");
this.webSocket?.close();
this.connectWebSocket();
}, this.HEARTBEAT_TIMEOUT);
}
}, this.HEARTBEAT_INTERVAL);
}

stopHeartbeat() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
if (this.heartbeatTimeoutId) {
clearTimeout(this.heartbeatTimeoutId);
this.heartbeatTimeoutId = null;
}
}

handleHeartbeatResponse() {
if (this.heartbeatTimeoutId) {
clearTimeout(this.heartbeatTimeoutId);
this.heartbeatTimeoutId = null;
}
}

async connectWebSocket(): Promise<void> {
this.wsConnecting ??= new Promise(async (resolve, reject) => {
try {
const token =
(await this.supabaseClient?.auth.getSession())?.data.session
?.access_token || "";

const token = (await this.supabaseClient?.auth.getSession())?.data.session?.access_token || "";
const wsUrlWithToken = `${this.wsUrl}?token=${token}`;
this.webSocket = new WebSocket(wsUrlWithToken);

this.webSocket.onopen = () => {
console.debug("WebSocket connection established");
this.startHeartbeat(); // Start heartbeat when connection opens
resolve();
};

this.webSocket.onclose = (event) => {
console.debug("WebSocket connection closed", event);
this.stopHeartbeat(); // Stop heartbeat when connection closes
this.webSocket = null;
// Attempt to reconnect after a delay
setTimeout(() => this.connectWebSocket(), 1000);
};

this.webSocket.onerror = (error) => {
console.error("WebSocket error:", error);
this.stopHeartbeat(); // Stop heartbeat on error
reject(error);
};

this.webSocket.onmessage = (event) => {
const message: WebsocketMessage = JSON.parse(event.data);
if (message.method == "execution_event") {

// Handle heartbeat response
if (message.method === "heartbeat" && message.data === "pong") {
this.handleHeartbeatResponse();
return;
}

if (message.method === "execution_event") {
message.data = parseNodeExecutionResultTimestamps(message.data);
}
this.wsMessageHandlers[message.method]?.forEach((handler) =>
handler(message.data),
handler(message.data)
);
};
} catch (error) {
Expand All @@ -379,6 +425,7 @@ export default class BaseAutoGPTServerAPI {
}

disconnectWebSocket() {
this.stopHeartbeat(); // Stop heartbeat when disconnecting
if (this.webSocket && this.webSocket.readyState === WebSocket.OPEN) {
this.webSocket.close();
}
Expand Down Expand Up @@ -435,6 +482,7 @@ type GraphCreateRequestBody =
type WebsocketMessageTypeMap = {
subscribe: { graph_id: string };
execution_event: NodeExecutionResult;
heartbeat: "ping" | "pong";
};

type WebsocketMessage = {
Expand Down

0 comments on commit cc608c4

Please sign in to comment.