Skip to content

Commit

Permalink
initial functionality for a custom stream handler
Browse files Browse the repository at this point in the history
  • Loading branch information
OvidijusParsiunas committed Sep 12, 2023
1 parent d7c30fc commit bd971f1
Show file tree
Hide file tree
Showing 11 changed files with 1,246 additions and 1,128 deletions.
2,271 changes: 1,161 additions & 1,110 deletions component/custom-elements.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion component/src/services/serviceIO.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import {ValidateMessageBeforeSending} from '../types/validateMessageBeforeSendin
import {Messages} from '../views/chat/messages/messages';
import {InterfacesUnion} from '../types/utilityTypes';
import {FILE_TYPES} from '../types/fileTypes';
import {Request} from '../types/request';
import {StreamEvents} from '../types/handler';
import {Response} from '../types/response';
import {Request} from '../types/request';
import {DeepChat} from '../deepChat';
import {Demo} from '../types/demo';

Expand All @@ -23,6 +24,7 @@ export interface StreamHandlers {
onOpen: () => void;
onClose: () => void;
abortStream: AbortController;
stopClicked: StreamEvents['stopClicked']; // custom stream handler as can't listen to abort when user overwrites it
simulationInterim?: number;
}

Expand Down
14 changes: 14 additions & 0 deletions component/src/types/handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import {Response} from './response';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type Handler = (body: any) => Promise<Response>;

export interface StreamEvents {
onOpen: () => void;
onResult: (result: {text?: string; error?: string}) => void;
onClose: () => void;
stopClicked: {listener: () => void};
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type StreamHandler = (body: any, events: StreamEvents) => void;
5 changes: 3 additions & 2 deletions component/src/types/request.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import {Handler, StreamHandler} from './handler';
import {GenericObject} from './object';
import {Response} from './response';

export interface Request {
url?: string;
method?: string;
headers?: GenericObject<string>;
additionalBodyProps?: GenericObject<any>;
websocket?: boolean | string | string[];
handler?: (body: any) => Promise<Response>;
handler?: Handler;
streamHandler?: StreamHandler;
}
6 changes: 3 additions & 3 deletions component/src/utils/HTTP/HTTPRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ export class HTTPRequest {
public static async request(io: ServiceIO, body: object, messages: Messages, stringifyBody = true) {
const requestDetails: ResponseDetails = {body, headers: io.requestSettings?.headers};
const {body: interceptedBody, headers: interceptedHeaders, error} =
(await RequestUtils.processResponseInterceptor(io.deepChat, requestDetails));
(await RequestUtils.processRequestInterceptor(io.deepChat, requestDetails));
const {onFinish} = io.completionsHandlers;
if (error) return HTTPRequest.onInterceptorError(messages, error, onFinish);
// WORK - will enable this later on
// if (io.requestSettings?.handler) return CustomRequest.request(io, interceptedBody, messages);
// if (io.requestSettings?.handler) return CustomHandler.request(io, interceptedBody, messages);
if (io.requestSettings?.url === Demo.URL) return Demo.request(messages, onFinish, io.deepChat.responseInterceptor);
let responseValid = true;
fetch(io.requestSettings?.url || io.url || '', {
Expand Down Expand Up @@ -83,7 +83,7 @@ export class HTTPRequest {
public static async poll(io: ServiceIO, body: object, messages: Messages, stringifyBody = true) {
const requestDetails = {body, headers: io.requestSettings?.headers};
const {body: interceptedBody, headers, error} =
(await RequestUtils.processResponseInterceptor(io.deepChat, requestDetails));
(await RequestUtils.processRequestInterceptor(io.deepChat, requestDetails));
if (error) return HTTPRequest.onInterceptorError(messages, error);
const url = io.requestSettings?.url || io.url || '';
const method = io.requestSettings?.method || 'POST';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ import {ErrorMessages} from '../errorMessages/errorMessages';
import {Messages} from '../../views/chat/messages/messages';
import {RequestDetails} from '../../types/interceptors';
import {ServiceIO} from '../../services/serviceIO';
import {RequestUtils} from './requestUtils';
import {Response} from '../../types/response';
import {RequestUtils} from './requestUtils';
import {Stream} from './stream';

export class CustomRequest {
export class CustomHandler {
public static async request(io: ServiceIO, body: RequestDetails['body'], messages: Messages) {
io.requestSettings
.handler?.(body)
Expand All @@ -28,4 +28,40 @@ export class CustomRequest {
io.completionsHandlers.onFinish();
});
}

public static async stream(io: ServiceIO, body: RequestDetails['body'], messages: Messages) {
let isHandlerActive = true;
let isOpen = false;
let textElement: HTMLElement | null = null;
const onOpen = () => {
if (isOpen || !isHandlerActive) return;
textElement = messages.addNewStreamedMessage();
io.streamHandlers.onOpen();
isOpen = true;
};
const onClose = () => {
if (!isHandlerActive) return;
messages.finaliseStreamedMessage();
io.streamHandlers.onClose();
isHandlerActive = false;
};
const onResult = (result: {text?: string; error?: string}) => {
if (!isHandlerActive) return;
if (result.error) {
console.error(result.error);
messages.finaliseStreamedMessage();
io.streamHandlers.onClose();
messages.addNewErrorMessage('service', result.error);
isHandlerActive = false;
} else if (result.text && textElement) {
messages.updateStreamedMessage(result.text, textElement);
}
};
io.streamHandlers.abortStream.abort = () => {
messages.finaliseStreamedMessage();
io.streamHandlers.onClose();
isHandlerActive = false;
};
io.requestSettings.streamHandler?.(body, {onOpen, onResult, onClose, stopClicked: io.streamHandlers.stopClicked});
}
}
2 changes: 1 addition & 1 deletion component/src/utils/HTTP/requestUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class RequestUtils {
return response.blob();
}

public static async processResponseInterceptor(deepChat: DeepChat, requestDetails: RequestDetails): InterceptorResult {
public static async processRequestInterceptor(deepChat: DeepChat, requestDetails: RequestDetails): InterceptorResult {
const result = (await deepChat.requestInterceptor?.(requestDetails)) || requestDetails;
const resReqDetails = result as RequestDetails;
const resErrDetails = result as {error?: string};
Expand Down
8 changes: 5 additions & 3 deletions component/src/utils/HTTP/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import {ServiceIO, StreamHandlers} from '../../services/serviceIO';
import {OpenAIConverseResult} from '../../types/openAIResult';
import {ErrorMessages} from '../errorMessages/errorMessages';
import {Messages} from '../../views/chat/messages/messages';
import {Response as DResponse} from '../../types/response';
import {RequestUtils} from './requestUtils';
import {Response} from '../../types/response';
import {Demo} from '../demo/demo';

type SimulationSH = Omit<StreamHandlers, 'abortStream'> & {abortStream: {abort: () => void}};
Expand All @@ -14,9 +14,11 @@ export class Stream {
public static async request(io: ServiceIO, body: object, messages: Messages, stringifyBody = true) {
const requestDetails = {body, headers: io.requestSettings?.headers};
const {body: interceptedBody, headers: interceptedHeaders, error} =
(await RequestUtils.processResponseInterceptor(io.deepChat, requestDetails));
(await RequestUtils.processRequestInterceptor(io.deepChat, requestDetails));
const {onOpen, onClose, abortStream} = io.streamHandlers;
if (error) return Stream.onInterceptorError(messages, error, onClose);
// WORK - will enable this later on
// if (io.requestSettings?.streamHandler) return CustomHandler.stream(io, interceptedBody, messages);
if (io.requestSettings?.url === Demo.URL) return Demo.requestStream(messages, io.streamHandlers);
let textElement: HTMLElement | null = null;
fetchEventSource(io.requestSettings?.url || io.url || '', {
Expand All @@ -36,7 +38,7 @@ export class Stream {
if (JSON.stringify(message.data) !== JSON.stringify('[DONE]')) {
const response = JSON.parse(message.data) as unknown as OpenAIConverseResult;
io.extractResultData?.(response)
.then((textBody?: Response) => {
.then((textBody?: DResponse) => {
if (textBody?.text === undefined) {
// strategy - do not to stop the stream on one message failure to give other messages a change to display
console.error(`Response data: ${message.data} \n ${ErrorMessages.INVALID_STREAM_RESPONSE}`);
Expand Down
2 changes: 1 addition & 1 deletion component/src/utils/HTTP/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export class Websocket {

public static async sendWebsocket(ws: WebSocket, io: ServiceIO, body: object, messages: Messages, stringifyBody = true) {
const requestDetails = {body, headers: io.requestSettings?.headers};
const {body: interceptedBody, error} = await RequestUtils.processResponseInterceptor(io.deepChat, requestDetails);
const {body: interceptedBody, error} = await RequestUtils.processRequestInterceptor(io.deepChat, requestDetails);
if (error) return messages.addNewErrorMessage('service', error);
const processedBody = stringifyBody ? JSON.stringify(interceptedBody) : interceptedBody;
if (io.requestSettings?.url === Demo.URL) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {SUBMIT_ICON_STRING} from '../../../../../icons/submitIcon';
import {SVGIconUtils} from '../../../../../utils/svg/svgIconUtils';
import {SubmitButtonStateStyle} from './submitButtonStateStyle';
import {ServiceIO} from '../../../../../services/serviceIO';
import {StreamEvents} from '../../../../../types/handler';
import {TextInputEl} from '../../textInput/textInput';
import {Messages} from '../../../messages/messages';
import {DeepChat} from '../../../../../deepChat';
Expand All @@ -20,6 +21,7 @@ export class SubmitButton extends InputButton<Styles> {
private readonly _messages: Messages;
private readonly _inputElementRef: HTMLElement;
private readonly _abortStream: AbortController;
private readonly _stopClicked: StreamEvents['stopClicked'];
private readonly _innerElements: DefinedButtonInnerElements<Styles>;
private readonly _fileAttachments: FileAttachments;
private _isSVGLoadingIconOverriden = false;
Expand All @@ -33,6 +35,7 @@ export class SubmitButton extends InputButton<Styles> {
this._fileAttachments = fileAttachments;
this._innerElements = this.createInnerElements();
this._abortStream = new AbortController();
this._stopClicked = {listener: () => {}};
this._serviceIO = serviceIO;
this.attemptOverwriteLoadingStyle(deepChat);
this.changeToSubmitIcon();
Expand Down Expand Up @@ -98,6 +101,7 @@ export class SubmitButton extends InputButton<Styles> {
onOpen: this.changeToStopIcon.bind(this),
onClose: this.changeToSubmitIcon.bind(this),
abortStream: this._abortStream,
stopClicked: this._stopClicked,
};
const {stream} = this._serviceIO.deepChat;
if (typeof stream === 'object' && typeof stream.simulation === 'number') {
Expand Down Expand Up @@ -145,9 +149,10 @@ export class SubmitButton extends InputButton<Styles> {
if (!programmatic) this._fileAttachments?.removeAllFiles();
}

// This will not stop the stream on the server side
private stopStream() {
// This will not stop the stream on the server side
this._abortStream.abort();
this._stopClicked.listener();
this.changeToSubmitIcon();
}

Expand Down
15 changes: 11 additions & 4 deletions component/src/views/chat/messages/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,20 @@ export class Messages {
return undefined;
}

private getLastMessageElement() {
return this.elementRef.children[this.elementRef.children.length - 1];
}

private getLastMessageBubbleElement() {
return this.getLastMessageElement()?.children?.[0]?.children?.[0];
}

public isLastMessageError() {
return this.elementRef.children[this.elementRef.children.length - 1]?.children?.[0]?.children?.[0]?.classList.contains(
'error-message-text'
);
return this.getLastMessageBubbleElement()?.classList.contains('error-message-text');
}

public removeError() {
if (this.isLastMessageError()) this.elementRef.children[this.elementRef.children.length - 1].remove();
if (this.isLastMessageError()) this.getLastMessageElement().remove();
}

public addLoadingMessage() {
Expand Down Expand Up @@ -318,6 +324,7 @@ export class Messages {
}

public finaliseStreamedMessage() {
if (!this.getLastMessageBubbleElement().classList.contains('streamed-message')) return;
this.messages[this.messages.length - 1].text = this._streamedText;
this.sendClientUpdate(Messages.createMessageContent(true, this._streamedText), false);
if (this._textToSpeech) TextToSpeech.speak(this._streamedText, this._textToSpeech);
Expand Down

0 comments on commit bd971f1

Please sign in to comment.