From 4f145267485d933677b7cc73b16b3d7607138e04 Mon Sep 17 00:00:00 2001 From: Gero Posmyk-Leinemann Date: Tue, 21 Sep 2021 10:02:54 +0000 Subject: [PATCH] [protocol] connection: buffer messages until reconnect This avoids Error("Connection got disposed") errors which gobble up to the frontend and block users. --- components/dashboard/src/Login.tsx | 2 +- .../src/messaging/browser/connection.ts | 170 +++++++++++++++++- 2 files changed, 164 insertions(+), 8 deletions(-) diff --git a/components/dashboard/src/Login.tsx b/components/dashboard/src/Login.tsx index e67bca5f2024de..71df482bdecef1 100644 --- a/components/dashboard/src/Login.tsx +++ b/components/dashboard/src/Login.tsx @@ -53,7 +53,7 @@ export function Login() { }, []) const authorizeSuccessful = async (payload?: string) => { - updateUser(); + updateUser().catch(console.error); // Check for a valid returnTo in payload const safeReturnTo = getSafeURLRedirect(payload); if (safeReturnTo) { diff --git a/components/gitpod-protocol/src/messaging/browser/connection.ts b/components/gitpod-protocol/src/messaging/browser/connection.ts index 9adc452a7795da..3b495b6123b2a2 100644 --- a/components/gitpod-protocol/src/messaging/browser/connection.ts +++ b/components/gitpod-protocol/src/messaging/browser/connection.ts @@ -5,10 +5,13 @@ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 */ -import { listen as doListen, Logger, ConsoleLogger } from "vscode-ws-jsonrpc"; +import { Logger, ConsoleLogger, toSocket, IWebSocket } from "vscode-ws-jsonrpc"; +import { MessageConnection, createMessageConnection } from "vscode-jsonrpc"; +import { AbstractMessageWriter } from "vscode-jsonrpc/lib/messageWriter"; +import { AbstractMessageReader } from "vscode-jsonrpc/lib/messageReader"; import { JsonRpcProxyFactory, JsonRpcProxy } from "../proxy-factory"; import { ConnectionHandler } from "../handler"; -import ReconnectingWebSocket from 'reconnecting-websocket'; +import ReconnectingWebSocket, { Event } from 'reconnecting-websocket'; export interface WebSocketOptions { onerror?: (event: Event) => void; @@ -64,11 +67,11 @@ export class WebSocketConnectionProvider { logger.error(JSON.stringify(error)); }); } - doListen({ - webSocket, - onConnection: connection => handler.onConnection(connection), - logger - }); + doListen( + webSocket as any as ReconnectingWebSocket, + connection => handler.onConnection(connection), + logger, + ); return webSocket; } @@ -91,3 +94,156 @@ export class WebSocketConnectionProvider { } } + +// The following was extracted from vscode-ws-jsonrpc to make these changes: +// - switch from WebSocket to ReconnectingWebSocket +// - webSocket.onopen: making sure it's only ever called once so we're re-using MessageConnection +// - WebSocketMessageWriter: buffer and re-try messages instead of throwing an error immidiately +// - WebSocketMessageReader: don't close MessageConnection on 'socket.onclose' +function doListen(resocket: ReconnectingWebSocket, onConnection: (connection: MessageConnection) => void, logger: Logger) { + let alreadyOpened = false; + resocket.onopen = () => { + if (alreadyOpened) { + return; + } + alreadyOpened = true; + + const connection = createWebSocketConnection(resocket, logger); + onConnection(connection); + }; +} + +function createWebSocketConnection(resocket: ReconnectingWebSocket, logger: Logger) { + const socket = toSocket(resocket as any); + const messageReader = new NonClosingWebSocketMessageReader(socket); + const messageWriter = new BufferingWebSocketMessageWriter(resocket, logger); + const connection = createMessageConnection(messageReader, messageWriter, logger); + connection.onClose(() => connection.dispose()); + return connection; +} + +/** + * This takes vscode-ws-jsonrpc/lib/socket/writer/WebSocketMessageWriter and adds a buffer + */ +class BufferingWebSocketMessageWriter extends AbstractMessageWriter { + protected readonly socket: ReconnectingWebSocket; + protected readonly logger: Logger; + protected errorCount: number = 0; + + protected buffer: any[] = []; + + constructor(socket: ReconnectingWebSocket, logger: Logger) { + super(); + this.socket = socket; + this.logger = logger; + + socket.addEventListener("open", (event: Event) => this.flushBuffer()); + } + + write(msg: any) { + if (this.socket.readyState !== ReconnectingWebSocket.OPEN) { + this.bufferMsg(msg); + return; + } + + try { + const content = JSON.stringify(msg); + this.socket.send(content); + } catch (e) { + this.errorCount++; + this.fireError(e, msg, this.errorCount); + + this.bufferMsg(msg); + } + } + + protected flushBuffer() { + if (this.buffer.length === 0) { + return + } + + const buffer = [...this.buffer]; + this.buffer = []; + for (const msg of buffer) { + this.write(msg); + } + this.logger.info(`flushed buffer (${this.buffer.length})`) + } + + protected bufferMsg(msg: any) { + this.buffer.push(msg); + this.logger.info(`buffered message (${this.buffer.length})`); + } +} + + +/** + * This takes vscode-ws-jsonrpc/lib/socket/reader/WebSocketMessageReader and removes the "onClose -> fireClose" connection + */ +class NonClosingWebSocketMessageReader extends AbstractMessageReader { + protected readonly socket: IWebSocket; + protected readonly events: any[] = []; + protected state: 'initial' | 'listening' | 'closed' = 'initial'; + protected callback: (message: any) => void = () => {}; + + constructor(socket: IWebSocket) { + super(); + this.socket = socket; + this.socket.onMessage(message => this.readMessage(message)); + this.socket.onError(error => this.fireError(error)); + this.socket.onClose((code, reason) => { + if (code !== 1000) { + const error = { + name: '' + code, + message: `Error during socket reconnect: code = ${code}, reason = ${reason}` + }; + this.fireError(error); + } + // this.fireClose(); // <-- reason for this class to be copied over + }); + } + listen(callback: (message: any) => void) { + if (this.state === 'initial') { + this.state = 'listening'; + this.callback = callback; + while (this.events.length !== 0) { + const event = this.events.pop(); + if (event.message) { + this.readMessage(event.message); + } + else if (event.error) { + this.fireError(event.error); + } + else { + this.fireClose(); + } + } + } + } + readMessage(message: any) { + if (this.state === 'initial') { + this.events.splice(0, 0, { message }); + } + else if (this.state === 'listening') { + const data = JSON.parse(message); + this.callback(data); + } + } + fireError(error: any) { + if (this.state === 'initial') { + this.events.splice(0, 0, { error }); + } + else if (this.state === 'listening') { + super.fireError(error); + } + } + fireClose() { + if (this.state === 'initial') { + this.events.splice(0, 0, {}); + } + else if (this.state === 'listening') { + super.fireClose(); + } + this.state = 'closed'; + } +}