Skip to content

Commit

Permalink
[protocol] connection: buffer messages until reconnect
Browse files Browse the repository at this point in the history
This avoids Error("Connection got disposed") errors which gobble up to the frontend and block users.
  • Loading branch information
geropl committed Sep 22, 2021
1 parent b031cf3 commit 4f14526
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 8 deletions.
2 changes: 1 addition & 1 deletion components/dashboard/src/Login.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ export function Login() {
}, [])

const authorizeSuccessful = async (payload?: string) => {
updateUser();
updateUser().catch(console.error);
// Check for a valid returnTo in payload
const safeReturnTo = getSafeURLRedirect(payload);
if (safeReturnTo) {
Expand Down
170 changes: 163 additions & 7 deletions components/gitpod-protocol/src/messaging/browser/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*/

import { listen as doListen, Logger, ConsoleLogger } from "vscode-ws-jsonrpc";
import { Logger, ConsoleLogger, toSocket, IWebSocket } from "vscode-ws-jsonrpc";
import { MessageConnection, createMessageConnection } from "vscode-jsonrpc";
import { AbstractMessageWriter } from "vscode-jsonrpc/lib/messageWriter";
import { AbstractMessageReader } from "vscode-jsonrpc/lib/messageReader";
import { JsonRpcProxyFactory, JsonRpcProxy } from "../proxy-factory";
import { ConnectionHandler } from "../handler";
import ReconnectingWebSocket from 'reconnecting-websocket';
import ReconnectingWebSocket, { Event } from 'reconnecting-websocket';

export interface WebSocketOptions {
onerror?: (event: Event) => void;
Expand Down Expand Up @@ -64,11 +67,11 @@ export class WebSocketConnectionProvider {
logger.error(JSON.stringify(error));
});
}
doListen({
webSocket,
onConnection: connection => handler.onConnection(connection),
logger
});
doListen(
webSocket as any as ReconnectingWebSocket,
connection => handler.onConnection(connection),
logger,
);
return webSocket;
}

Expand All @@ -91,3 +94,156 @@ export class WebSocketConnectionProvider {
}

}

// The following was extracted from vscode-ws-jsonrpc to make these changes:
// - switch from WebSocket to ReconnectingWebSocket
// - webSocket.onopen: making sure it's only ever called once so we're re-using MessageConnection
// - WebSocketMessageWriter: buffer and re-try messages instead of throwing an error immidiately
// - WebSocketMessageReader: don't close MessageConnection on 'socket.onclose'
function doListen(resocket: ReconnectingWebSocket, onConnection: (connection: MessageConnection) => void, logger: Logger) {
let alreadyOpened = false;
resocket.onopen = () => {
if (alreadyOpened) {
return;
}
alreadyOpened = true;

const connection = createWebSocketConnection(resocket, logger);
onConnection(connection);
};
}

function createWebSocketConnection(resocket: ReconnectingWebSocket, logger: Logger) {
const socket = toSocket(resocket as any);
const messageReader = new NonClosingWebSocketMessageReader(socket);
const messageWriter = new BufferingWebSocketMessageWriter(resocket, logger);
const connection = createMessageConnection(messageReader, messageWriter, logger);
connection.onClose(() => connection.dispose());
return connection;
}

/**
* This takes vscode-ws-jsonrpc/lib/socket/writer/WebSocketMessageWriter and adds a buffer
*/
class BufferingWebSocketMessageWriter extends AbstractMessageWriter {
protected readonly socket: ReconnectingWebSocket;
protected readonly logger: Logger;
protected errorCount: number = 0;

protected buffer: any[] = [];

constructor(socket: ReconnectingWebSocket, logger: Logger) {
super();
this.socket = socket;
this.logger = logger;

socket.addEventListener("open", (event: Event) => this.flushBuffer());
}

write(msg: any) {
if (this.socket.readyState !== ReconnectingWebSocket.OPEN) {
this.bufferMsg(msg);
return;
}

try {
const content = JSON.stringify(msg);
this.socket.send(content);
} catch (e) {
this.errorCount++;
this.fireError(e, msg, this.errorCount);

this.bufferMsg(msg);
}
}

protected flushBuffer() {
if (this.buffer.length === 0) {
return
}

const buffer = [...this.buffer];
this.buffer = [];
for (const msg of buffer) {
this.write(msg);
}
this.logger.info(`flushed buffer (${this.buffer.length})`)
}

protected bufferMsg(msg: any) {
this.buffer.push(msg);
this.logger.info(`buffered message (${this.buffer.length})`);
}
}


/**
* This takes vscode-ws-jsonrpc/lib/socket/reader/WebSocketMessageReader and removes the "onClose -> fireClose" connection
*/
class NonClosingWebSocketMessageReader extends AbstractMessageReader {
protected readonly socket: IWebSocket;
protected readonly events: any[] = [];
protected state: 'initial' | 'listening' | 'closed' = 'initial';
protected callback: (message: any) => void = () => {};

constructor(socket: IWebSocket) {
super();
this.socket = socket;
this.socket.onMessage(message => this.readMessage(message));
this.socket.onError(error => this.fireError(error));
this.socket.onClose((code, reason) => {
if (code !== 1000) {
const error = {
name: '' + code,
message: `Error during socket reconnect: code = ${code}, reason = ${reason}`
};
this.fireError(error);
}
// this.fireClose(); // <-- reason for this class to be copied over
});
}
listen(callback: (message: any) => void) {
if (this.state === 'initial') {
this.state = 'listening';
this.callback = callback;
while (this.events.length !== 0) {
const event = this.events.pop();
if (event.message) {
this.readMessage(event.message);
}
else if (event.error) {
this.fireError(event.error);
}
else {
this.fireClose();
}
}
}
}
readMessage(message: any) {
if (this.state === 'initial') {
this.events.splice(0, 0, { message });
}
else if (this.state === 'listening') {
const data = JSON.parse(message);
this.callback(data);
}
}
fireError(error: any) {
if (this.state === 'initial') {
this.events.splice(0, 0, { error });
}
else if (this.state === 'listening') {
super.fireError(error);
}
}
fireClose() {
if (this.state === 'initial') {
this.events.splice(0, 0, {});
}
else if (this.state === 'listening') {
super.fireClose();
}
this.state = 'closed';
}
}

0 comments on commit 4f14526

Please sign in to comment.