Skip to content

Commit

Permalink
@foxglove/ws-protocol: Add services capability, minor cleanup (foxglo…
Browse files Browse the repository at this point in the history
…ve#347)

**Public-Facing Changes**
- Updates `@foxglove/ws-protocol` to implement service operations added
in foxglove#344


**Description**
- Updates `@foxglove/ws-protocol` to implement service operations added
in foxglove#344
- Minor cleanup in the way client messages are being handled
  • Loading branch information
achim-k authored and pezy committed Jul 27, 2023
1 parent 8f995a8 commit 9ebae06
Show file tree
Hide file tree
Showing 6 changed files with 322 additions and 54 deletions.
2 changes: 1 addition & 1 deletion typescript/ws-protocol/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@foxglove/ws-protocol",
"version": "0.3.3",
"version": "0.4.0",
"description": "Foxglove Studio WebSocket protocol",
"keywords": [
"foxglove",
Expand Down
45 changes: 45 additions & 0 deletions typescript/ws-protocol/src/FoxgloveClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import {
Parameter,
ParameterValues,
ServerMessage,
Service,
ServiceCallPayload,
ServiceCallResponse,
ServiceId,
SubscriptionId,
Time,
} from "./types";
Expand All @@ -28,9 +32,14 @@ type EventTypes = {
time: (event: Time) => void;
advertise: (newChannels: Channel[]) => void;
unadvertise: (removedChannels: ChannelId[]) => void;
advertiseServices: (newServices: Service[]) => void;
unadvertiseServices: (removedServices: ServiceId[]) => void;
parameterValues: (event: ParameterValues) => void;
serviceCallResponse: (event: ServiceCallResponse) => void;
};

const textEncoder = new TextEncoder();

export default class FoxgloveClient {
static SUPPORTED_SUBPROTOCOL = "foxglove.websocket.v1";

Expand Down Expand Up @@ -104,13 +113,25 @@ export default class FoxgloveClient {
this.emitter.emit("parameterValues", message);
return;

case "advertiseServices":
this.emitter.emit("advertiseServices", message.services);
return;

case "unadvertiseServices":
this.emitter.emit("unadvertiseServices", message.serviceIds);
return;

case BinaryOpcode.MESSAGE_DATA:
this.emitter.emit("message", message);
return;

case BinaryOpcode.TIME:
this.emitter.emit("time", message);
return;

case BinaryOpcode.SERVICE_CALL_RESPONSE:
this.emitter.emit("serviceCallResponse", message);
return;
}
this.emitter.emit(
"error",
Expand Down Expand Up @@ -173,6 +194,30 @@ export default class FoxgloveClient {
this.ws.send(payload);
}

sendCallServiceRequest(request: ServiceCallPayload): void {
const encoding = textEncoder.encode(request.encoding);
const payload = new Uint8Array(1 + 4 + 4 + 4 + encoding.length + request.data.byteLength);
const view = new DataView(payload.buffer, payload.byteOffset, payload.byteLength);
let offset = 0;
view.setUint8(offset, ClientBinaryOpcode.SERVICE_CALL_REQUEST);
offset += 1;
view.setUint32(offset, request.serviceId, true);
offset += 4;
view.setUint32(offset, request.callId, true);
offset += 4;
view.setUint32(offset, request.encoding.length, true);
offset += 4;
payload.set(encoding, offset);
offset += encoding.length;
const data = new Uint8Array(
request.data.buffer,
request.data.byteOffset,
request.data.byteLength,
);
payload.set(data, offset);
this.ws.send(payload);
}

private send(message: ClientMessage) {
this.ws.send(JSON.stringify(message));
}
Expand Down
89 changes: 88 additions & 1 deletion typescript/ws-protocol/src/FoxgloveServer.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import ConsumerQueue from "consumer-queue";
import { AddressInfo, Data, WebSocket, WebSocketServer } from "ws";

import { BinaryOpcode, ClientPublish, IWebSocket, Parameter, ServerCapability } from ".";
import {
BinaryOpcode,
ClientBinaryOpcode,
ClientPublish,
IWebSocket,
Parameter,
ServerCapability,
ServiceCallPayload,
} from ".";
import FoxgloveServer from "./FoxgloveServer";

function uint32LE(n: number): Uint8Array {
Expand Down Expand Up @@ -60,6 +68,9 @@ async function setupServerAndClient(server: FoxgloveServer) {
server.on("unsubscribeParameterUpdates", (event) =>
eventQueue.push(["unsubscribeParameterUpdates", event]),
);
server.on("serviceCallRequest", (event, clientConnection) =>
eventQueue.push(["serviceCallRequest", event, clientConnection]),
);

const nextEvent = async () => await eventQueue.pop();

Expand Down Expand Up @@ -393,4 +404,80 @@ describe("FoxgloveServer", () => {
}
close();
});

it("receives service request from client and sends response back", async () => {
const server = new FoxgloveServer({
name: "foo",
capabilities: [ServerCapability.services],
supportedEncodings: ["json"],
});
const { send, nextJsonMessage, nextBinaryMessage, nextEvent, close } =
await setupServerAndClient(server);

try {
await expect(nextJsonMessage()).resolves.toEqual({
op: "serverInfo",
name: "foo",
capabilities: ["services"],
supportedEncodings: ["json"],
});

const service = {
name: "foo",
type: "bar",
requestSchema: "schema1",
responseSchema: "FooShcame",
};
const serviceId = server.addService(service);
await expect(nextJsonMessage()).resolves.toEqual({
op: "advertiseServices",
services: [{ ...service, id: serviceId }],
});

const request: ServiceCallPayload = {
serviceId,
callId: 123,
encoding: "json",
data: new DataView(new Uint8Array([1, 2, 3]).buffer),
};

const serializedRequest = new Uint8Array([
ClientBinaryOpcode.SERVICE_CALL_REQUEST,
...uint32LE(serviceId),
...uint32LE(request.callId),
...uint32LE(request.encoding.length),
...new TextEncoder().encode(request.encoding),
...new Uint8Array(request.data.buffer, request.data.byteOffset, request.data.byteLength),
]);
send(serializedRequest);

const [eventId, receivedRequest, connection] = await nextEvent();
expect(eventId).toEqual("serviceCallRequest");
expect(receivedRequest).toEqual({ op: ClientBinaryOpcode.SERVICE_CALL_REQUEST, ...request });

const response: ServiceCallPayload = {
...request,
data: new DataView(new Uint8Array([4, 5, 6]).buffer),
};

server.sendServiceCallResponse(response, connection as IWebSocket);

await expect(nextBinaryMessage()).resolves.toEqual(
new Uint8Array([
BinaryOpcode.SERVICE_CALL_RESPONSE,
...uint32LE(response.serviceId),
...uint32LE(response.callId),
...uint32LE("json".length),
...new TextEncoder().encode("json"),
4,
5,
6,
]),
);
} catch (ex) {
close();
throw ex;
}
close();
});
});
Loading

0 comments on commit 9ebae06

Please sign in to comment.