Skip to content

Commit

Permalink
Finish Water broker system rewrite
Browse files Browse the repository at this point in the history
Hopefully. Now only missing the RabbitConnection.
  • Loading branch information
wasdennnoch committed Jul 16, 2024
1 parent 3d757dd commit ed1dcf6
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 38 deletions.
28 changes: 17 additions & 11 deletions water/src/broker/BrokerClient.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { BaseIssue, BaseSchema, InferOutput } from "valibot";
import { Logger } from "../logging/Logger.js";
import type { BrokerConnection, TopicListener } from "./BrokerConnection.js";
import type { BrokerMessage } from "./BrokerMessage.js";
Expand All @@ -15,13 +16,13 @@ export abstract class BrokerClient {

public constructor(public readonly connection: BrokerConnection) { }

public consumer<T>(
public consumer<TSchema extends BaseSchema<unknown, unknown, BaseIssue<unknown>>>(
topic: string,
key: string,
schema: T, // TODO
schema: TSchema,
options: BrokerClientOptions = new BrokerClientOptions(),
callback: (msg: BrokerMessage<T>) => Promise<void>,
): ConsumerSubclient<T> {
callback: (msg: BrokerMessage<InferOutput<TSchema>>) => Promise<void>,
): ConsumerSubclient<TSchema> {
Logger.debug(BrokerClient.TAG, `Creating consumer for key '${key}' in topic '${topic}'`);
const client = new ConsumerSubclient(this.connection, this, topic, key, options, schema, callback);
this.registerSubclient(client);
Expand All @@ -31,23 +32,28 @@ export abstract class BrokerClient {
public producer<T>(
topic: string,
key: string,
schema: T, // TODO
options: BrokerClientOptions = new BrokerClientOptions(),
): ProducerSubclient<T> {
Logger.debug(BrokerClient.TAG, `Creating producer for key '${key}' in topic '${topic}'`);
const client = new ProducerSubclient(this.connection, this, topic, key, options, schema);
const client = new ProducerSubclient(this.connection, this, topic, key, options);
this.registerSubclient(client);
return client;
}

public rpc<RequestT, ResponseT>(
public rpc<
RequestTSchema extends BaseSchema<unknown, unknown, BaseIssue<unknown>>,
ResponseTSchema extends BaseSchema<unknown, unknown, BaseIssue<unknown>>,
>(
topic: string,
key: string,
requestSchema: RequestT, // TODO
responseSchema: ResponseT, // TODO
requestSchema: RequestTSchema,
responseSchema: ResponseTSchema,
options: BrokerClientOptions = new BrokerClientOptions(),
callback: (msg: RpcRequestMessage<RequestT, ResponseT>) => Promise<RpcResponse<ResponseT>>,
): RpcClient<RequestT, ResponseT> {
callback: (msg: RpcRequestMessage<
InferOutput<RequestTSchema>,
InferOutput<ResponseTSchema>
>) => Promise<RpcResponse<ResponseTSchema>>,
): RpcClient<RequestTSchema, ResponseTSchema> {
return new RpcClient(
this,
topic,
Expand Down
14 changes: 7 additions & 7 deletions water/src/broker/Subclients.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { BaseIssue, BaseSchema, InferOutput } from "valibot";
import { parse } from "valibot";
import { Logger } from "../logging/Logger.js";
import type { BrokerClient } from "./BrokerClient.js";
import type { BrokerConnection, MessageId } from "./BrokerConnection.js";
Expand Down Expand Up @@ -40,7 +42,6 @@ export class ProducerSubclient<T> extends BaseSubclient {
topic: string,
key: string,
options: BrokerClientOptions,
private readonly schema: T, // TODO eeeeeh
) {
super(connection, client, topic, key, options);
}
Expand Down Expand Up @@ -80,7 +81,7 @@ export class ProducerSubclient<T> extends BaseSubclient {

}

export class ConsumerSubclient<T> extends BaseSubclient {
export class ConsumerSubclient<TSchema extends BaseSchema<unknown, unknown, BaseIssue<unknown>>> extends BaseSubclient {

private static readonly TAG = "ConsumerSubclient";

Expand All @@ -90,8 +91,8 @@ export class ConsumerSubclient<T> extends BaseSubclient {
topic: string,
key: string,
options: BrokerClientOptions,
private readonly schema: T, // TODO eeeeeh
private readonly callback: (msg: BrokerMessage<T>) => Promise<void>,
private readonly schema: TSchema,
private readonly callback: (msg: BrokerMessage<InferOutput<TSchema>>) => Promise<void>,
) {
super(connection, client, topic, key, options);
}
Expand Down Expand Up @@ -121,9 +122,8 @@ export class ConsumerSubclient<T> extends BaseSubclient {
}
}

private parseIncoming(data: string): T {
// TODO schema validation
return JSON.parse(data);
private parseIncoming(data: string): InferOutput<TSchema> {
return parse(this.schema, JSON.parse(data));
}

}
95 changes: 78 additions & 17 deletions water/src/broker/rpc/RpcClient.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,47 @@
import type { BaseIssue, BaseSchema, InferOutput } from "valibot";
import { Logger } from "../../logging/Logger.js";
import type { BrokerClient } from "../BrokerClient.js";
import { IgnoreRpcRequest, RpcException } from "../Exceptions.js";
import { IgnoreRpcRequest, RpcException, RpcRequestTimeout } from "../Exceptions.js";
import type { BrokerClientOptions, ConsumerSubclient, ProducerSubclient } from "../Subclients.js";
import { BaseSubclient } from "../Subclients.js";
import { OnlineEmitter } from "../../util/OnlineEmitter.js";
import type { BrokerMessage } from "../BrokerMessage.js";
import { CountDownLatch } from "../../util/CountDownLatch.js";
import type { RpcRequestMessage, RpcResponse } from "./RpcMessage.js";
import { RpcResponseMessage } from "./RpcMessage.js";
import { RpcMessageHeaders } from "./RpcMessageHeaders.js";
import type { RpcStatus } from "./RpcStatus.js";

export class RpcClient<RequestT, ResponseT> extends BaseSubclient {
export class RpcClient<
RequestTSchema extends BaseSchema<unknown, unknown, BaseIssue<unknown>>,
ResponseTSchema extends BaseSchema<unknown, unknown, BaseIssue<unknown>>,
> extends BaseSubclient {

private static readonly TAG = "RpcClient";

private readonly requestProducer: ProducerSubclient<RequestT>;
private readonly requestConsumer: ConsumerSubclient<RequestT>;
private readonly responseProducer: ProducerSubclient<ResponseT>;
private readonly responseConsumer: ConsumerSubclient<ResponseT>;
private readonly requestProducer: ProducerSubclient<InferOutput<RequestTSchema>>;
private readonly requestConsumer: ConsumerSubclient<RequestTSchema>;
private readonly responseProducer: ProducerSubclient<InferOutput<ResponseTSchema>>;
private readonly responseConsumer: ConsumerSubclient<ResponseTSchema>;
private readonly responses: OnlineEmitter<BrokerMessage<InferOutput<ResponseTSchema>>> = new OnlineEmitter();

public constructor(
client: BrokerClient,
topic: string,
key: string,
options: BrokerClientOptions,
private readonly requestSchema: RequestT, // TODO eeeeeh
private readonly responseSchema: ResponseT, // TODO eeeeeh
private readonly callback: (msg: RpcRequestMessage<RequestT, ResponseT>) => Promise<RpcResponse<ResponseT>>,
requestSchema: RequestTSchema,
responseSchema: ResponseTSchema,
private readonly callback: (msg: RpcRequestMessage<
InferOutput<RequestTSchema>,
InferOutput<ResponseTSchema>
>) => Promise<RpcResponse<ResponseTSchema>>,
) {
super(client.connection, client, topic, key, options);

this.requestProducer = this.client.producer(
topic,
key,
requestSchema,
options,
);
this.requestConsumer = this.client.consumer(
Expand All @@ -41,7 +51,7 @@ export class RpcClient<RequestT, ResponseT> extends BaseSubclient {
options,
async msg => {
const sendResponse = async (
response: ResponseT | null,
response: InferOutput<ResponseTSchema> | null,
status: RpcStatus,
isException: boolean,
isUpdate: boolean,
Expand All @@ -63,7 +73,7 @@ export class RpcClient<RequestT, ResponseT> extends BaseSubclient {
await this.responseProducer.internalSend(responseMsg);
};

const rpcMessage = msg.toRpcRequestMessage<ResponseT>(async (status, data) => {
const rpcMessage = msg.toRpcRequestMessage<InferOutput<ResponseTSchema>>(async (status, data) => {
await sendResponse(data, status, false, true);
});
try {
Expand All @@ -84,7 +94,6 @@ export class RpcClient<RequestT, ResponseT> extends BaseSubclient {
this.responseProducer = this.client.producer(
this.toResponseTopic(topic),
this.toResponseKey(key),
responseSchema,
options,
);
this.responseConsumer = this.client.consumer(
Expand All @@ -93,18 +102,70 @@ export class RpcClient<RequestT, ResponseT> extends BaseSubclient {
responseSchema,
options,
async msg => {
// TODO
this.responses.emit(msg);
},
);
}

public async call(
request: RequestT,
request: InferOutput<RequestTSchema>,
services: Set<string> = new Set(),
instances: Set<string> = new Set(),
timeout: number = 10 * 1000,
): Promise<RpcResponseMessage<InferOutput<ResponseTSchema>>> {
const generator = this.stream(request, services, instances, timeout, 1);
const msg = (await generator.next()).value;
if (!msg) {
throw new Error("Unexpected end of single-response stream");
}
return msg;
}

public async *stream(
request: InferOutput<RequestTSchema>,
services: Set<string> = new Set(),
instances: Set<string> = new Set(),
timeout: number = 10 * 1000,
): Promise<RpcResponseMessage<ResponseT>> {
return this.stream(request, services, instances, timeout, 1);
maxResponses: number = Infinity,
): AsyncGenerator<RpcResponseMessage<InferOutput<ResponseTSchema>>, void> {
if (timeout === Infinity && maxResponses === Infinity) {
throw new Error("Must specify either a timeout or a max number of responses");
}
if (maxResponses !== Infinity && maxResponses <= 0) {
throw new Error("maxResponses must be at least 1");
}
let responseCounter = 0;
const timeoutLatch = maxResponses ? new CountDownLatch(maxResponses) : null;
const timeoutPromise = timeoutLatch?.await(timeout) ?? new Promise(() => { });

const messageId = await this.requestProducer.send(request, services, instances);

while (true) {
const result = await Promise.race([
this.responses.awaitValue(),
timeoutPromise,
]);
if (typeof result === "boolean") {
if (result) {
return;
} else {
throw new RpcRequestTimeout(`RPC request timed out after ${timeout} ms`);
}
}
const msg = result.toRpcResponseMessage();
if (msg.headers.inReplyTo !== messageId) {
return;
}
if (msg.headers.isException) {
throw new RpcException(msg.headers.status);
}
yield msg;
timeoutLatch?.countDown();
responseCounter++;
if (responseCounter >= maxResponses) {
return;
}
}
}

protected override doDestroy(): void {
Expand Down
12 changes: 9 additions & 3 deletions water/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
export * from "./CommonConfig.js";

export * from "./broker/BrokerClient.js";
export * from "./broker/BrokerMessage.js";
export * from "./broker/BrokerConnection.js";
export * from "./broker/BrokerMessage.js";
export * from "./broker/BrokerMessageHeaders.js";
export * from "./broker/kafka/KafkaConnection.js";
export * from "./broker/kafka/KafkaMessageHeaders.js";
export * from "./broker/Exceptions.js";
export * from "./broker/LocalConnection.js";
export * from "./broker/Subclients.js";
export * from "./broker/rpc/RpcClient.js";
export * from "./broker/rpc/RpcMessage.js";
export * from "./broker/rpc/RpcMessageHeaders.js";
export * from "./broker/rpc/RpcStatus.js";

export * from "./logging/Logger.js";

export * from "./util/CountDownLatch.js";
export * from "./util/OnlineEmitter.js";
43 changes: 43 additions & 0 deletions water/src/util/OnlineEmitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
export class OnlineEmitter<T> {

private readonly listeners: Set<(data: T) => void> = new Set();

public emit(data: T): void {
for (const listener of this.listeners) {
listener(data);
}
}

public addListener(listener: (data: T) => void): void {
this.listeners.add(listener);
}


public removeListener(listener: (data: T) => void): void {
this.listeners.delete(listener);
}

public async awaitValue(): Promise<T> {
return new Promise(resolve => {
const listener = (data: T) => {
this.removeListener(listener);
resolve(data);
};
this.addListener(listener);
});
}

public [Symbol.asyncIterator](): AsyncIterableIterator<T> {
const awaitValue = this.awaitValue.bind(this);
return {
async next(): Promise<IteratorResult<T>> {
const value = await awaitValue();
return { value, done: false };
},
[Symbol.asyncIterator](): AsyncIterableIterator<T> {
return this;
},
};
}

}

0 comments on commit ed1dcf6

Please sign in to comment.