From 9ebae06b27aff62f02bce7901f97ede13aafa1fb Mon Sep 17 00:00:00 2001 From: Hans-Joachim Krauch Date: Fri, 27 Jan 2023 17:28:17 -0300 Subject: [PATCH] @foxglove/ws-protocol: Add services capability, minor cleanup (#347) **Public-Facing Changes** - Updates `@foxglove/ws-protocol` to implement service operations added in #344 **Description** - Updates `@foxglove/ws-protocol` to implement service operations added in #344 - Minor cleanup in the way client messages are being handled --- typescript/ws-protocol/package.json | 2 +- typescript/ws-protocol/src/FoxgloveClient.ts | 45 ++++++ .../ws-protocol/src/FoxgloveServer.test.ts | 89 ++++++++++- typescript/ws-protocol/src/FoxgloveServer.ts | 146 ++++++++++++------ typescript/ws-protocol/src/parse.ts | 48 +++++- typescript/ws-protocol/src/types.ts | 46 +++++- 6 files changed, 322 insertions(+), 54 deletions(-) diff --git a/typescript/ws-protocol/package.json b/typescript/ws-protocol/package.json index 13cf1016..08d82151 100644 --- a/typescript/ws-protocol/package.json +++ b/typescript/ws-protocol/package.json @@ -1,6 +1,6 @@ { "name": "@foxglove/ws-protocol", - "version": "0.3.3", + "version": "0.4.0", "description": "Foxglove Studio WebSocket protocol", "keywords": [ "foxglove", diff --git a/typescript/ws-protocol/src/FoxgloveClient.ts b/typescript/ws-protocol/src/FoxgloveClient.ts index b5e04d5f..1d7cb840 100644 --- a/typescript/ws-protocol/src/FoxgloveClient.ts +++ b/typescript/ws-protocol/src/FoxgloveClient.ts @@ -13,6 +13,10 @@ import { Parameter, ParameterValues, ServerMessage, + Service, + ServiceCallPayload, + ServiceCallResponse, + ServiceId, SubscriptionId, Time, } from "./types"; @@ -28,9 +32,14 @@ type EventTypes = { time: (event: Time) => void; advertise: (newChannels: Channel[]) => void; unadvertise: (removedChannels: ChannelId[]) => void; + advertiseServices: (newServices: Service[]) => void; + unadvertiseServices: (removedServices: ServiceId[]) => void; parameterValues: (event: ParameterValues) => void; + serviceCallResponse: (event: ServiceCallResponse) => void; }; +const textEncoder = new TextEncoder(); + export default class FoxgloveClient { static SUPPORTED_SUBPROTOCOL = "foxglove.websocket.v1"; @@ -104,6 +113,14 @@ export default class FoxgloveClient { this.emitter.emit("parameterValues", message); return; + case "advertiseServices": + this.emitter.emit("advertiseServices", message.services); + return; + + case "unadvertiseServices": + this.emitter.emit("unadvertiseServices", message.serviceIds); + return; + case BinaryOpcode.MESSAGE_DATA: this.emitter.emit("message", message); return; @@ -111,6 +128,10 @@ export default class FoxgloveClient { case BinaryOpcode.TIME: this.emitter.emit("time", message); return; + + case BinaryOpcode.SERVICE_CALL_RESPONSE: + this.emitter.emit("serviceCallResponse", message); + return; } this.emitter.emit( "error", @@ -173,6 +194,30 @@ export default class FoxgloveClient { this.ws.send(payload); } + sendCallServiceRequest(request: ServiceCallPayload): void { + const encoding = textEncoder.encode(request.encoding); + const payload = new Uint8Array(1 + 4 + 4 + 4 + encoding.length + request.data.byteLength); + const view = new DataView(payload.buffer, payload.byteOffset, payload.byteLength); + let offset = 0; + view.setUint8(offset, ClientBinaryOpcode.SERVICE_CALL_REQUEST); + offset += 1; + view.setUint32(offset, request.serviceId, true); + offset += 4; + view.setUint32(offset, request.callId, true); + offset += 4; + view.setUint32(offset, request.encoding.length, true); + offset += 4; + payload.set(encoding, offset); + offset += encoding.length; + const data = new Uint8Array( + request.data.buffer, + request.data.byteOffset, + request.data.byteLength, + ); + payload.set(data, offset); + this.ws.send(payload); + } + private send(message: ClientMessage) { this.ws.send(JSON.stringify(message)); } diff --git a/typescript/ws-protocol/src/FoxgloveServer.test.ts b/typescript/ws-protocol/src/FoxgloveServer.test.ts index 33c15d8b..057ac06b 100644 --- a/typescript/ws-protocol/src/FoxgloveServer.test.ts +++ b/typescript/ws-protocol/src/FoxgloveServer.test.ts @@ -1,7 +1,15 @@ import ConsumerQueue from "consumer-queue"; import { AddressInfo, Data, WebSocket, WebSocketServer } from "ws"; -import { BinaryOpcode, ClientPublish, IWebSocket, Parameter, ServerCapability } from "."; +import { + BinaryOpcode, + ClientBinaryOpcode, + ClientPublish, + IWebSocket, + Parameter, + ServerCapability, + ServiceCallPayload, +} from "."; import FoxgloveServer from "./FoxgloveServer"; function uint32LE(n: number): Uint8Array { @@ -60,6 +68,9 @@ async function setupServerAndClient(server: FoxgloveServer) { server.on("unsubscribeParameterUpdates", (event) => eventQueue.push(["unsubscribeParameterUpdates", event]), ); + server.on("serviceCallRequest", (event, clientConnection) => + eventQueue.push(["serviceCallRequest", event, clientConnection]), + ); const nextEvent = async () => await eventQueue.pop(); @@ -393,4 +404,80 @@ describe("FoxgloveServer", () => { } close(); }); + + it("receives service request from client and sends response back", async () => { + const server = new FoxgloveServer({ + name: "foo", + capabilities: [ServerCapability.services], + supportedEncodings: ["json"], + }); + const { send, nextJsonMessage, nextBinaryMessage, nextEvent, close } = + await setupServerAndClient(server); + + try { + await expect(nextJsonMessage()).resolves.toEqual({ + op: "serverInfo", + name: "foo", + capabilities: ["services"], + supportedEncodings: ["json"], + }); + + const service = { + name: "foo", + type: "bar", + requestSchema: "schema1", + responseSchema: "FooShcame", + }; + const serviceId = server.addService(service); + await expect(nextJsonMessage()).resolves.toEqual({ + op: "advertiseServices", + services: [{ ...service, id: serviceId }], + }); + + const request: ServiceCallPayload = { + serviceId, + callId: 123, + encoding: "json", + data: new DataView(new Uint8Array([1, 2, 3]).buffer), + }; + + const serializedRequest = new Uint8Array([ + ClientBinaryOpcode.SERVICE_CALL_REQUEST, + ...uint32LE(serviceId), + ...uint32LE(request.callId), + ...uint32LE(request.encoding.length), + ...new TextEncoder().encode(request.encoding), + ...new Uint8Array(request.data.buffer, request.data.byteOffset, request.data.byteLength), + ]); + send(serializedRequest); + + const [eventId, receivedRequest, connection] = await nextEvent(); + expect(eventId).toEqual("serviceCallRequest"); + expect(receivedRequest).toEqual({ op: ClientBinaryOpcode.SERVICE_CALL_REQUEST, ...request }); + + const response: ServiceCallPayload = { + ...request, + data: new DataView(new Uint8Array([4, 5, 6]).buffer), + }; + + server.sendServiceCallResponse(response, connection as IWebSocket); + + await expect(nextBinaryMessage()).resolves.toEqual( + new Uint8Array([ + BinaryOpcode.SERVICE_CALL_RESPONSE, + ...uint32LE(response.serviceId), + ...uint32LE(response.callId), + ...uint32LE("json".length), + ...new TextEncoder().encode("json"), + 4, + 5, + 6, + ]), + ); + } catch (ex) { + close(); + throw ex; + } + close(); + }); }); diff --git a/typescript/ws-protocol/src/FoxgloveServer.ts b/typescript/ws-protocol/src/FoxgloveServer.ts index 08a4d78f..04378caa 100644 --- a/typescript/ws-protocol/src/FoxgloveServer.ts +++ b/typescript/ws-protocol/src/FoxgloveServer.ts @@ -2,6 +2,7 @@ import createDebug from "debug"; import EventEmitter from "eventemitter3"; import { ChannelId, StatusLevel } from "."; +import { parseClientMessage } from "./parse"; import { BinaryOpcode, Channel, @@ -14,6 +15,10 @@ import { Parameter, ServerCapability, ServerMessage, + Service, + ServiceCallPayload, + ServiceCallRequest, + ServiceId, SubscriptionId, } from "./types"; @@ -55,6 +60,8 @@ type EventTypes = { subscribeParameterUpdates: (parameterNames: string[]) => void; /** Request to unsubscribe from parameter value updates has been received. */ unsubscribeParameterUpdates: (parameterNames: string[]) => void; + /** Service call request has been received. */ + serviceCallRequest: (request: ServiceCallRequest, clientConnection: IWebSocket) => void; }; const log = createDebug("foxglove:server"); @@ -67,10 +74,12 @@ const REQUIRED_CAPABILITY_BY_OPERATION: Record< unsubscribe: undefined, advertise: ServerCapability.clientPublish, unadvertise: ServerCapability.clientPublish, + [ClientBinaryOpcode.MESSAGE_DATA]: ServerCapability.clientPublish, getParameters: ServerCapability.parameters, setParameters: ServerCapability.parameters, subscribeParameterUpdates: ServerCapability.parametersSubscribe, unsubscribeParameterUpdates: ServerCapability.parametersSubscribe, + [ClientBinaryOpcode.SERVICE_CALL_REQUEST]: ServerCapability.services, }; export default class FoxgloveServer { @@ -83,6 +92,8 @@ export default class FoxgloveServer { private clients = new Map(); private nextChannelId: ChannelId = 0; private channels = new Map(); + private nextServiceId: ServiceId = 0; + private services = new Map(); constructor({ name, @@ -157,6 +168,32 @@ export default class FoxgloveServer { } } + /** + * Advertise a new service and inform any connected clients. + * @returns The id of the new service + */ + addService(service: Omit): ServiceId { + const newId = ++this.nextServiceId; + const newService: Service = { ...service, id: newId }; + this.services.set(newId, newService); + for (const client of this.clients.values()) { + this.send(client.connection, { op: "advertiseServices", services: [newService] }); + } + return newId; + } + + /** + * Remove a previously advertised service and inform any connected clients. + */ + removeService(serviceId: ServiceId): void { + if (!this.services.delete(serviceId)) { + throw new Error(`Service ${serviceId} does not exist`); + } + for (const client of this.clients.values()) { + this.send(client.connection, { op: "unadvertiseServices", serviceIds: [serviceId] }); + } + } + /** * Emit a message payload to any clients subscribed to `chanId`. */ @@ -189,6 +226,34 @@ export default class FoxgloveServer { } } + /** + * Send a service call response to the client + * @param response Response to send to the client + * @param connection Connection of the client that called the service + */ + sendServiceCallResponse(response: ServiceCallPayload, connection: IWebSocket): void { + const utf8Encode = new TextEncoder(); + const encoding = utf8Encode.encode(response.encoding); + const payload = new Uint8Array(1 + 4 + 4 + 4 + encoding.length + response.data.byteLength); + const view = new DataView(payload.buffer, payload.byteOffset, payload.byteLength); + let offset = 0; + view.setUint8(offset, BinaryOpcode.SERVICE_CALL_RESPONSE); + offset += 1; + view.setUint32(offset, response.serviceId, true); + offset += 4; + view.setUint32(offset, response.callId, true); + offset += 4; + view.setUint32(offset, response.encoding.length, true); + offset += 4; + payload.set(encoding, offset); + offset += encoding.length; + payload.set( + new Uint8Array(response.data.buffer, response.data.byteOffset, response.data.byteLength), + offset, + ); + connection.send(payload); + } + /** * Publish parameter values. * @param parameters Parameter values @@ -262,6 +327,12 @@ export default class FoxgloveServer { if (this.channels.size > 0) { this.send(connection, { op: "advertise", channels: Array.from(this.channels.values()) }); } + if (this.services.size > 0) { + this.send(connection, { + op: "advertiseServices", + services: Array.from(this.services.values()), + }); + } connection.onclose = (event: CloseEvent) => { log( @@ -281,40 +352,18 @@ export default class FoxgloveServer { }; connection.onmessage = (event: MessageEvent) => { - if (typeof event.data === "string") { - // TEXT (JSON) message handling - let message: unknown; - try { - message = JSON.parse(event.data) as unknown; - } catch (error) { - this.emitter.emit( - "error", - new Error(`Invalid JSON message from ${name}: ${(error as Error).message}`), - ); - return; - } - - if (typeof message !== "object" || message == undefined) { - this.emitter.emit("error", new Error(`Expected JSON object, got ${typeof message}`)); - return; + let message: ClientMessage; + try { + if (event.data instanceof ArrayBuffer) { + message = parseClientMessage(event.data); + } else { + message = JSON.parse(event.data) as ClientMessage; } - try { - this.handleClientMessage(client, message as ClientMessage); - } catch (error) { - this.emitter.emit("error", error as Error); - } - } else if (event.data instanceof ArrayBuffer) { - // BINARY message handling - const message = new DataView(event.data); - - try { - this.handleClientBinaryMessage(client, message); - } catch (error) { - this.emitter.emit("error", error as Error); - } - } else { - this.emitter.emit("error", new Error(`Unexpected message type ${typeof event.data}`)); + this.handleClientMessage(client, message); + } catch (error) { + this.emitter.emit("error", error as Error); + return; } }; } @@ -482,32 +531,29 @@ export default class FoxgloveServer { } break; - default: - throw new Error(`Unrecognized client opcode: ${(message as { op: string }).op}`); - } - } - - private handleClientBinaryMessage(client: ClientInfo, message: DataView): void { - if (message.byteLength < 5) { - throw new Error(`Invalid binary message length ${message.byteLength}`); - } - - const opcode = message.getUint8(0); - switch (opcode) { case ClientBinaryOpcode.MESSAGE_DATA: { - const channelId = message.getUint32(1, true); - const channel = client.advertisements.get(channelId); + const channel = client.advertisements.get(message.channelId); if (!channel) { - throw new Error(`Client sent message data for unknown channel ${channelId}`); + throw new Error(`Client sent message data for unknown channel ${message.channelId}`); } - - const data = new DataView(message.buffer, message.byteOffset + 5, message.byteLength - 5); + const data = message.data; this.emitter.emit("message", { client, channel, data }); break; } + case ClientBinaryOpcode.SERVICE_CALL_REQUEST: { + const service = this.services.get(message.serviceId); + if (!service) { + throw new Error( + `Client sent service call request for unknown service ${message.serviceId}`, + ); + } + this.emitter.emit("serviceCallRequest", message, client.connection); + break; + } + default: - throw new Error(`Unrecognized client binary opcode: ${opcode}`); + throw new Error(`Unrecognized client opcode: ${(message as { op: string }).op}`); } } diff --git a/typescript/ws-protocol/src/parse.ts b/typescript/ws-protocol/src/parse.ts index d785f7e8..73672a73 100644 --- a/typescript/ws-protocol/src/parse.ts +++ b/typescript/ws-protocol/src/parse.ts @@ -1,4 +1,6 @@ -import { BinaryOpcode, ServerMessage } from "./types"; +import { BinaryOpcode, ClientBinaryOpcode, ClientMessage, ServerMessage } from "./types"; + +const textDecoder = new TextDecoder(); export function parseServerMessage(buffer: ArrayBuffer): ServerMessage { const view = new DataView(buffer); @@ -20,6 +22,50 @@ export function parseServerMessage(buffer: ArrayBuffer): ServerMessage { const timestamp = view.getBigUint64(offset, true); return { op, timestamp }; } + case BinaryOpcode.SERVICE_CALL_RESPONSE: { + const serviceId = view.getUint32(offset, true); + offset += 4; + const callId = view.getUint32(offset, true); + offset += 4; + const encodingLength = view.getUint32(offset, true); + offset += 4; + const encodingBytes = new DataView(buffer, offset, encodingLength); + const encoding = textDecoder.decode(encodingBytes); + offset += encodingLength; + const data = new DataView(buffer, offset, buffer.byteLength - offset); + return { op, serviceId, callId, encoding, data }; + } } throw new Error(`Unrecognized server opcode in binary message: ${op.toString(16)}`); } + +export function parseClientMessage(buffer: ArrayBuffer): ClientMessage { + const view = new DataView(buffer); + + let offset = 0; + const op = view.getUint8(offset); + offset += 1; + + switch (op as ClientBinaryOpcode) { + case ClientBinaryOpcode.MESSAGE_DATA: { + const channelId = view.getUint32(offset, true); + offset += 4; + const data = new DataView(buffer, offset, buffer.byteLength - offset); + return { op, channelId, data }; + } + case ClientBinaryOpcode.SERVICE_CALL_REQUEST: { + const serviceId = view.getUint32(offset, true); + offset += 4; + const callId = view.getUint32(offset, true); + offset += 4; + const encodingLength = view.getUint32(offset, true); + offset += 4; + const encodingBytes = new DataView(buffer, offset, encodingLength); + const encoding = textDecoder.decode(encodingBytes); + offset += encodingLength; + const data = new DataView(buffer, offset, buffer.byteLength - offset); + return { op, serviceId, callId, encoding, data }; + } + } + throw new Error(`Unrecognized client opcode in binary message: ${op.toString(16)}`); +} diff --git a/typescript/ws-protocol/src/types.ts b/typescript/ws-protocol/src/types.ts index 6e582724..4a182be1 100644 --- a/typescript/ws-protocol/src/types.ts +++ b/typescript/ws-protocol/src/types.ts @@ -1,9 +1,11 @@ export enum BinaryOpcode { MESSAGE_DATA = 1, TIME = 2, + SERVICE_CALL_RESPONSE = 3, } export enum ClientBinaryOpcode { MESSAGE_DATA = 1, + SERVICE_CALL_REQUEST = 2, } export enum StatusLevel { INFO = 0, @@ -15,11 +17,13 @@ export enum ServerCapability { time = "time", parameters = "parameters", parametersSubscribe = "parametersSubscribe", + services = "services", } export type ChannelId = number; export type ClientChannelId = number; export type SubscriptionId = number; +export type ServiceId = number; export type Channel = { id: ChannelId; @@ -28,6 +32,13 @@ export type Channel = { schemaName: string; schema: string; }; +export type Service = { + id: number; + name: string; + type: string; + requestSchema: string; + responseSchema: string; +}; export type Subscribe = { op: "subscribe"; @@ -56,6 +67,23 @@ export type ClientUnadvertise = { channelIds: ClientChannelId[]; }; +export type ClientMessageData = { + op: ClientBinaryOpcode.MESSAGE_DATA; + channelId: ClientChannelId; + data: DataView; +}; + +export type ServiceCallPayload = { + serviceId: ServiceId; + callId: number; + encoding: string; + data: DataView; +}; + +export type ServiceCallRequest = ServiceCallPayload & { + op: ClientBinaryOpcode.SERVICE_CALL_REQUEST; +}; + export type ClientMessage = | Subscribe | Unsubscribe @@ -64,7 +92,9 @@ export type ClientMessage = | GetParameters | SetParameters | SubscribeParameterUpdates - | UnsubscribeParameterUpdates; + | UnsubscribeParameterUpdates + | ClientMessageData + | ServiceCallRequest; export type ServerInfo = { op: "serverInfo"; @@ -109,6 +139,14 @@ export type UnsubscribeParameterUpdates = { op: "unsubscribeParameterUpdates"; parameterNames: string[]; }; +export type AdvertiseServices = { + op: "advertiseServices"; + services: Service[]; +}; +export type UnadvertiseServices = { + op: "unadvertiseServices"; + serviceIds: ServiceId[]; +}; export type MessageData = { op: BinaryOpcode.MESSAGE_DATA; subscriptionId: SubscriptionId; @@ -119,6 +157,9 @@ export type Time = { op: BinaryOpcode.TIME; timestamp: bigint; }; +export type ServiceCallResponse = ServiceCallPayload & { + op: BinaryOpcode.SERVICE_CALL_RESPONSE; +}; export type ClientPublish = { channel: ClientChannel; data: DataView; @@ -133,8 +174,11 @@ export type ServerMessage = | StatusMessage | Advertise | Unadvertise + | AdvertiseServices + | UnadvertiseServices | MessageData | Time + | ServiceCallResponse | ParameterValues; /**