From 8cac4733cd6745e23de70cdf036187058ae7d886 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=91=E8=BD=BB=E7=8B=82?= <1677568218@qq.com> Date: Fri, 1 Sep 2023 20:11:10 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=94=AF=E6=8C=81=E9=93=BE=E5=BC=8F?= =?UTF-8?q?=E8=B0=83=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../collaboration/pipe/__tests__/pipe.test.ts | 47 ++++++- package/collaboration/pipe/pipe.ts | 120 ++++++++++++++++-- 2 files changed, 158 insertions(+), 9 deletions(-) diff --git a/package/collaboration/pipe/__tests__/pipe.test.ts b/package/collaboration/pipe/__tests__/pipe.test.ts index c1a4660..7e95dab 100644 --- a/package/collaboration/pipe/__tests__/pipe.test.ts +++ b/package/collaboration/pipe/__tests__/pipe.test.ts @@ -1,4 +1,4 @@ -import { Pipe, Pipeline } from "@idealeap/gwt"; // 请替换成你的模块导入方式 +import { Pipe, Pipeline, SerializablePipelineOptions } from "@idealeap/gwt"; // 请替换成你的模块导入方式 test("Pipe", async () => { const pipe1 = new Pipe( @@ -33,3 +33,48 @@ test("Pipe", async () => { console.log("Final results:", results); }); }); + +test("Pipeline with JSON", async () => { + // 示例 + const jsonConfig: SerializablePipelineOptions = { + pipes: [{ id: "step1" }, { id: "step2", timeout: 1000 }], + }; + + const fnMap = { + step1: (input: string) => `${input}-step1`, + step2: (input: string) => `${input}-step2`, + }; + + const pipeline = Pipeline.fromJSON(jsonConfig, fnMap); + + // 执行 Pipeline + await pipeline.execute("我饿").then(console.log); +}); + +test("Pipeline with 链式调用", async () => { + // 示例代码 + // 示例 + const pipeline = Pipeline.create() + .addPipe( + Pipe.create((input: number) => input + 1, { + id: "step1", + }).setDescription("Increment by 1"), + ) + .addPipe( + Pipe.create((input: number) => input * 2, { + id: "step2", + }).setDescription("Multiply by 2"), + ) + .setOnProgress((completed, total) => { + console.log(`Progress: ${completed}/${total}`); + }); + + // 执行 + await pipeline.execute(1).then((result) => { + console.log("Final result:", result); + }); + + // 序列化为 JSON + const jsonConfig = JSON.stringify(pipeline.toJSON()); + console.log("Serialized config:", jsonConfig); +}); diff --git a/package/collaboration/pipe/pipe.ts b/package/collaboration/pipe/pipe.ts index e1328f3..c147213 100644 --- a/package/collaboration/pipe/pipe.ts +++ b/package/collaboration/pipe/pipe.ts @@ -45,6 +45,16 @@ export interface PipelineOptions { errProcess?: (error: any, context: PipelineContext) => MaybePromise; } +export type SerializablePipeOptions = Omit< + PipeOptions, + "preProcess" | "postProcess" | "errProcess" +>; + +export interface SerializablePipelineOptions + extends Omit { + pipes: SerializablePipeOptions[]; +} + const maybeAwait = async (input: MaybePromise) => await Promise.resolve(input); @@ -150,9 +160,54 @@ export class Pipe { } } } + + // 从一个 SerializablePipeOptions 对象创建一个 Pipe 实例 + static fromJSON( + json: SerializablePipeOptions, + callback: (input: T, context: PipelineContext) => MaybePromise, + ): Pipe { + if (!json.id) { + throw new Error("JSON configuration for Pipe must contain an 'id' field"); + } + // 这里你可以添加更多验证逻辑 + return new Pipe(callback, json as PipeOptions); + } + + // 新增一个 static 方法用于创建新实例,并支持链式调用 + static create( + callback: (input: T, context: PipelineContext) => MaybePromise, + options?: Partial>, + ): Pipe { + return new Pipe(callback, options as PipeOptions); + } + + setId(id: string): this { + this.options.id = id; + return this; + } + + setDescription(description: string): this { + this.options.description = description; + return this; + } + + setDependencies(deps: string[]): this { + this.options.dependencies = deps; + return this; + } + + enableBatching(): this { + this.options.batch = true; + return this; + } + + setRetries(retries: number): this { + this.options.retries = retries; + return this; + } } -// 主函数变成了一个类 +// 主函数 export class Pipeline { private pipes: Pipe[] = []; private options: PipelineOptions; @@ -162,14 +217,10 @@ export class Pipeline { this.options = options; } - // 添加 Pipe - addPipe(pipe: Pipe) { - this.pipes.push(pipe); - } - // 删除 Pipe - removePipe(id: string) { + removePipe(id: string): this { this.pipes = this.pipes.filter((pipe) => pipe.options.id !== id); + return this; } async execute(input: any): Promise | Map[]> { @@ -205,6 +256,59 @@ export class Pipeline { return context.stepResults; } + // 从一个 SerializablePipelineOptions 和函数映射创建一个 Pipeline + static fromJSON( + json: SerializablePipelineOptions, + fnMap: Record< + string, + (input: any, context: PipelineContext) => MaybePromise + >, + ): Pipeline { + if (!Array.isArray(json.pipes)) { + throw new Error("Invalid JSON configuration: 'pipes' must be an array"); + } + + const pipes = json.pipes.map((pipeJson: SerializablePipeOptions) => { + const fn = fnMap[pipeJson.id]; + if (!fn) { + throw new Error(`Function not found for id: ${pipeJson.id}`); + } + return Pipe.fromJSON(pipeJson, fn); + }); + + return new Pipeline(pipes, json); + } + + // 新增一个 static 方法用于创建新实例,并支持链式调用 + static create(options?: PipelineOptions): Pipeline { + return new Pipeline([], options); + } + + // 添加 Pipe 并返回 this,以支持链式调用 + addPipe(pipe: Pipe): this { + this.pipes.push(pipe); + return this; + } + + // 设置进度回调并返回 this,以支持链式调用 + setOnProgress(callback: (completed: number, total: number) => void): this { + this.options.onProgress = callback; + return this; + } + + // 序列化为 JSON 的方法 + toJSON(): SerializablePipelineOptions { + return { + pipes: this.pipes.map((pipe) => ({ + id: pipe.options.id, + description: pipe.options.description, + dependencies: pipe.options.dependencies, + retries: pipe.options.retries, + timeout: pipe.options.timeout, + batch: pipe.options.batch, + })), + }; + } } -// 请进一步完善该功能,例如。请给出完整的Ts代码和示例,任何代码都不要省略!!! +// 请进一步完善该功能,例如可以将每个 Pipe 的输出类型与下一个 Pipe 的输入类型进行匹配,以确保它们是兼容的。请给出完整的Ts代码和示例,即使没有变化,也千万不要省略之前的代码!