diff --git a/package/collaboration/pipe/pipe.ts b/package/collaboration/pipe/pipe.ts index bf7ce3d..e97ad7b 100644 --- a/package/collaboration/pipe/pipe.ts +++ b/package/collaboration/pipe/pipe.ts @@ -1,33 +1,45 @@ type AsyncOrSync = Promise | T; +// 使用一个简单的事件广播器 +class EventEmitter { + private events: Record = {}; + + on(event: string, listener: any) { + if (!this.events[event]) { + this.events[event] = []; + } + this.events[event].push(listener); + } + + emit(event: string, ...args: any[]) { + this.events[event]?.forEach((listener) => listener(...args)); + } +} + interface PipeOptions { id: string; description: string; dependencies?: string[]; onData?: (input: T, context: ChainPipeContext) => AsyncOrSync; - onResult?: (result: R) => AsyncOrSync; onError?: (error: any) => void; - onStepComplete?: ( - result: R, - step: number, - totalSteps: number, - allResults: Map, - ) => void; } interface ChainPipeContext { stepResults: Map; + emitter: EventEmitter; } interface ChainPipeOptions { onProgress?: (completed: number, total: number) => void; } -class Pipe { +class Pipe extends EventEmitter { constructor( private callback: (input: T, context: ChainPipeContext) => AsyncOrSync, public options: PipeOptions, - ) {} + ) { + super(); + } async execute( input: T, @@ -47,38 +59,27 @@ class Pipe { const processedInput = this.options.onData ? await Promise.resolve(this.options.onData(input, context)) : input; + const result = await Promise.resolve( this.callback(processedInput, context), ); - const finalResult = this.options.onResult - ? await Promise.resolve(this.options.onResult(result)) - : result; - context.stepResults.set(this.options.id, finalResult); - this.options.onStepComplete?.( - finalResult, + context.stepResults.set(this.options.id, result); + context.emitter.emit( + "stepComplete", + result, step, totalSteps, context.stepResults, ); - return finalResult; + return result; } catch (error) { this.options.onError?.(error); + context.emitter.emit("error", error); throw new Error(`Pipe ${this.options.id} failed: ${String(error)}`); } } - - static fromConfig( - config: PipeOptions, - callback: (input: T, context: ChainPipeContext) => AsyncOrSync, - ): Pipe { - return new Pipe(callback, config); - } - - toConfig(): PipeOptions { - return this.options; - } } async function chainPipes( @@ -86,7 +87,10 @@ async function chainPipes( input: any, options?: ChainPipeOptions, ): Promise { - const context: ChainPipeContext = { stepResults: new Map() }; + const context: ChainPipeContext = { + stepResults: new Map(), + emitter: new EventEmitter(), + }; const total = pipes.length; for (let i = 0; i < total; i++) { @@ -104,34 +108,37 @@ async function chainPipes( return context.stepResults; } -// Example usage -const pipe1 = Pipe.fromConfig( - { id: "double", description: "Doubles the input" }, - (input: number) => input * 2, +const pipe1 = new Pipe( + (input) => { + return input + 1; + }, + { id: "pipe1", description: "Increment" }, ); -const pipe2 = Pipe.fromConfig( - { - id: "toString", - description: "Converts to string", - dependencies: ["double"], +const pipe2 = new Pipe( + (input) => { + return input * 2; }, - (input: number, context) => - `Number: ${context.stepResults.get("double") || input}`, + { id: "pipe2", description: "Multiply by 2" }, ); -const pipe3 = Pipe.fromConfig( - { id: "toUpperCase", description: "To uppercase" }, - (input: string) => input.toUpperCase(), -); +// 订阅事件 +pipe1.on("stepComplete", (result: any, step: any, totalSteps: any) => { + console.log(`Step ${step}/${totalSteps} completed with result ${result}`); +}); + +pipe2.on("stepComplete", (result: any, step: any, totalSteps: any) => { + console.log(`Step ${step}/${totalSteps} completed with result ${result}`); +}); -chainPipes([pipe1, pipe2, pipe3], 1, { - onProgress: (completed, total) => - console.log(`Progress: ${completed}/${total}`), -}) - .then((results) => { - console.log("All pipes executed successfully", results); - }) - .catch((error) => { - console.error("An error occurred:", error); +const run = async () => { + const results = await chainPipes([pipe1, pipe2], 1, { + onProgress: (completed, total) => { + console.log(`${completed}/${total} steps completed.`); + }, }); + + console.log("All pipes completed:", results); +}; + +run().catch((error) => console.error(error));