Skip to content

Commit

Permalink
feat: 支持链式调用
Browse files Browse the repository at this point in the history
  • Loading branch information
MarleneJiang committed Sep 1, 2023
1 parent faa5da9 commit 8cac473
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 9 deletions.
47 changes: 46 additions & 1 deletion package/collaboration/pipe/__tests__/pipe.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Pipe, Pipeline } from "@idealeap/gwt"; // 请替换成你的模块导入方式
import { Pipe, Pipeline, SerializablePipelineOptions } from "@idealeap/gwt"; // 请替换成你的模块导入方式

test("Pipe", async () => {
const pipe1 = new Pipe<number, number>(
Expand Down Expand Up @@ -33,3 +33,48 @@ test("Pipe", async () => {
console.log("Final results:", results);
});
});

test("Pipeline with JSON", async () => {
// 示例
const jsonConfig: SerializablePipelineOptions = {
pipes: [{ id: "step1" }, { id: "step2", timeout: 1000 }],
};

const fnMap = {
step1: (input: string) => `${input}-step1`,
step2: (input: string) => `${input}-step2`,
};

const pipeline = Pipeline.fromJSON(jsonConfig, fnMap);

// 执行 Pipeline
await pipeline.execute("我饿").then(console.log);
});

test("Pipeline with 链式调用", async () => {
// 示例代码
// 示例
const pipeline = Pipeline.create()
.addPipe(
Pipe.create((input: number) => input + 1, {
id: "step1",
}).setDescription("Increment by 1"),
)
.addPipe(
Pipe.create((input: number) => input * 2, {
id: "step2",
}).setDescription("Multiply by 2"),
)
.setOnProgress((completed, total) => {
console.log(`Progress: ${completed}/${total}`);
});

// 执行
await pipeline.execute(1).then((result) => {
console.log("Final result:", result);
});

// 序列化为 JSON
const jsonConfig = JSON.stringify(pipeline.toJSON());
console.log("Serialized config:", jsonConfig);
});
120 changes: 112 additions & 8 deletions package/collaboration/pipe/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ export interface PipelineOptions {
errProcess?: (error: any, context: PipelineContext) => MaybePromise<boolean>;
}

export type SerializablePipeOptions = Omit<
PipeOptions<any, any>,
"preProcess" | "postProcess" | "errProcess"
>;

export interface SerializablePipelineOptions
extends Omit<PipelineOptions, "emitter" | "errProcess" | "onProgress"> {
pipes: SerializablePipeOptions[];
}

const maybeAwait = async <T>(input: MaybePromise<T>) =>
await Promise.resolve(input);

Expand Down Expand Up @@ -150,9 +160,54 @@ export class Pipe<T, R> {
}
}
}

// 从一个 SerializablePipeOptions 对象创建一个 Pipe 实例
static fromJSON<T, R>(
json: SerializablePipeOptions,
callback: (input: T, context: PipelineContext) => MaybePromise<R>,
): Pipe<T, R> {
if (!json.id) {
throw new Error("JSON configuration for Pipe must contain an 'id' field");
}
// 这里你可以添加更多验证逻辑
return new Pipe(callback, json as PipeOptions<T, R>);
}

// 新增一个 static 方法用于创建新实例,并支持链式调用
static create<T, R>(
callback: (input: T, context: PipelineContext) => MaybePromise<R>,
options?: Partial<PipeOptions<T, R>>,
): Pipe<T, R> {
return new Pipe(callback, options as PipeOptions<T, R>);
}

setId(id: string): this {
this.options.id = id;
return this;
}

setDescription(description: string): this {
this.options.description = description;
return this;
}

setDependencies(deps: string[]): this {
this.options.dependencies = deps;
return this;
}

enableBatching(): this {
this.options.batch = true;
return this;
}

setRetries(retries: number): this {
this.options.retries = retries;
return this;
}
}

// 主函数变成了一个类
// 主函数
export class Pipeline {
private pipes: Pipe<any, any>[] = [];
private options: PipelineOptions;
Expand All @@ -162,14 +217,10 @@ export class Pipeline {
this.options = options;
}

// 添加 Pipe
addPipe(pipe: Pipe<any, any>) {
this.pipes.push(pipe);
}

// 删除 Pipe
removePipe(id: string) {
removePipe(id: string): this {
this.pipes = this.pipes.filter((pipe) => pipe.options.id !== id);
return this;
}

async execute(input: any): Promise<Map<string, any> | Map<string, any>[]> {
Expand Down Expand Up @@ -205,6 +256,59 @@ export class Pipeline {

return context.stepResults;
}
// 从一个 SerializablePipelineOptions 和函数映射创建一个 Pipeline
static fromJSON(
json: SerializablePipelineOptions,
fnMap: Record<
string,
(input: any, context: PipelineContext) => MaybePromise<any>
>,
): Pipeline {
if (!Array.isArray(json.pipes)) {
throw new Error("Invalid JSON configuration: 'pipes' must be an array");
}

const pipes = json.pipes.map((pipeJson: SerializablePipeOptions) => {
const fn = fnMap[pipeJson.id];
if (!fn) {
throw new Error(`Function not found for id: ${pipeJson.id}`);
}
return Pipe.fromJSON(pipeJson, fn);
});

return new Pipeline(pipes, json);
}

// 新增一个 static 方法用于创建新实例,并支持链式调用
static create(options?: PipelineOptions): Pipeline {
return new Pipeline([], options);
}

// 添加 Pipe 并返回 this,以支持链式调用
addPipe(pipe: Pipe<any, any>): this {
this.pipes.push(pipe);
return this;
}

// 设置进度回调并返回 this,以支持链式调用
setOnProgress(callback: (completed: number, total: number) => void): this {
this.options.onProgress = callback;
return this;
}

// 序列化为 JSON 的方法
toJSON(): SerializablePipelineOptions {
return {
pipes: this.pipes.map((pipe) => ({
id: pipe.options.id,
description: pipe.options.description,
dependencies: pipe.options.dependencies,
retries: pipe.options.retries,
timeout: pipe.options.timeout,
batch: pipe.options.batch,
})),
};
}
}

// 请进一步完善该功能,例如。请给出完整的Ts代码和示例,任何代码都不要省略!!
// 请进一步完善该功能,例如可以将每个 Pipe 的输出类型与下一个 Pipe 的输入类型进行匹配,以确保它们是兼容的。请给出完整的Ts代码和示例,即使没有变化,也千万不要省略之前的代码

0 comments on commit 8cac473

Please sign in to comment.