Skip to content

Commit

Permalink
feat: 优化异步操作、解耦性、资源销毁
Browse files Browse the repository at this point in the history
  • Loading branch information
MarleneJiang committed Sep 1, 2023
1 parent 8d717a1 commit 1898c1d
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 55 deletions.
36 changes: 32 additions & 4 deletions package/collaboration/pipe/__tests__/pipe.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import {
Pipe,
PipeOptions,
ChainPipeOptions,
EventEmitter,
chainPipes,
Pipeline,
PipelineOptions,
} from "@idealeap/gwt"; // 请替换成你的模块导入方式

test("Pipe", async () => {
Expand Down Expand Up @@ -39,7 +39,7 @@ test("Pipe", async () => {
});

// 创建ChainPipeOptions
const chainPipeOptions: ChainPipeOptions = {
const PipelineOptions: PipelineOptions = {
onProgress: (completed, total) => {
console.log(`Progress: ${completed}/${total}`);
},
Expand All @@ -49,7 +49,7 @@ test("Pipe", async () => {
// 执行管道
async function run() {
try {
const result = await chainPipes([pipe1, pipe2], 5, chainPipeOptions);
const result = await Pipeline([pipe1, pipe2], 5, PipelineOptions);
console.log("Final result:", result);
} catch (error) {
console.log("Error:", error);
Expand All @@ -58,3 +58,31 @@ test("Pipe", async () => {

await run();
});

test("PipeLine", async () => {
// 创建一个Pipe实例
const pipe1 = new Pipe<number, number>(
(input) => {
return input * 2;
},
{
id: "double",
},
);

// 创建另一个Pipe实例
const pipe2 = new Pipe<number, number>(
(input) => {
return input + 1;
},
{
id: "increment",
},
);

// 使用Pipeline函数来连接这些pipes
await Pipeline([pipe1, pipe2], 1).then((results) => {
console.log(results.get("double")); // 输出应该是2
console.log(results.get("increment")); // 输出应该是3
});
});
104 changes: 53 additions & 51 deletions package/collaboration/pipe/pipe.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// 本代码由GPT4生成,具体可见https://pandora.idealeap.cn/share/33072598-a95f-4188-9003-76ccc5d964cb

// 基础类型定义和接口
export type AsyncOrSync<T> = Promise<T> | T;
// 类型和接口定义
export type MaybePromise<T> = T | Promise<T>;

export class EventEmitter {
private events: Record<string, ((args?: any) => void)[]> = {};
private events: Record<string, ((...args: any[]) => void)[]> = {};

on(event: string, listener: (...args: any) => void) {
on(event: string, listener: (...args: any[]) => void) {
if (!this.events[event]) {
this.events[event] = [];
}
Expand All @@ -22,9 +22,10 @@ export interface PipeOptions<T, R> {
id: string;
description?: string;
dependencies?: string[];
preProcess?: (input: T, context: PipelineContext) => AsyncOrSync<T>;
postProcess?: (result: R, context: PipelineContext) => AsyncOrSync<R>;
preProcess?: (input: T, context: PipelineContext) => MaybePromise<T>;
postProcess?: (result: R, context: PipelineContext) => MaybePromise<R>;
onError?: (error: any, context: PipelineContext) => void;
onDestroy?: () => void;
}

export interface PipelineContext {
Expand All @@ -35,59 +36,56 @@ export interface PipelineContext {
export interface PipelineOptions {
onProgress?: (completed: number, total: number) => void;
emitter?: EventEmitter;
onDestroy?: () => void;
}

// 主逻辑
const maybeAwait = async <T>(input: MaybePromise<T>) =>
await Promise.resolve(input);

// Pipe类定义
export class Pipe<T, R> {
constructor(
private callback: (input: T, context: PipelineContext) => AsyncOrSync<R>,
private callback: (input: T, context: PipelineContext) => MaybePromise<R>,
public options: PipeOptions<T, R>,
) {}

async execute(
private async handlePreProcess(
input: T,
context: PipelineContext,
step: number,
totalSteps: number,
): Promise<T> {
return this.options.preProcess
? await maybeAwait(this.options.preProcess(input, context))
: input;
}

private async handlePostProcess(
result: R,
context: PipelineContext,
): Promise<R> {
try {
if (this.options.dependencies) {
for (const dep of this.options.dependencies) {
if (!context.stepResults.has(dep)) {
throw new Error(`Dependency ${dep} not found`);
}
return this.options.postProcess
? await maybeAwait(this.options.postProcess(result, context))
: result;
}

async execute(input: T, context: PipelineContext): Promise<R> {
if (this.options.dependencies) {
for (const dep of this.options.dependencies) {
if (!context.stepResults.has(dep)) {
throw new Error(`Dependency ${dep} not found`);
}
}

const preProcessedInput = this.options.preProcess
? await Promise.resolve(this.options.preProcess(input, context))
: input;

const result = await Promise.resolve(
this.callback(preProcessedInput, context),
);

const postProcessedResult = this.options.postProcess
? await Promise.resolve(this.options.postProcess(result, context))
: result;

context.stepResults.set(this.options.id, postProcessedResult);
context.emitter.emit(
"stepComplete",
step,
totalSteps,
postProcessedResult,
);

return postProcessedResult;
} catch (error) {
this.options.onError?.(error, context);
context.emitter.emit("error", error);
throw error;
}
const preProcessedInput = await this.handlePreProcess(input, context);
const result = await maybeAwait(this.callback(preProcessedInput, context));
const postProcessedResult = await this.handlePostProcess(result, context);

context.stepResults.set(this.options.id, postProcessedResult);

return postProcessedResult;
}
}

// 主函数
export async function Pipeline(
pipes: Pipe<any, any>[],
input: any,
Expand All @@ -99,19 +97,23 @@ export async function Pipeline(
emitter,
};

for (let i = 0; i < pipes.length; i++) {
try {
try {
for (let i = 0; i < pipes.length; i++) {
const pipe = pipes[i];
input = await pipe.execute(input, context, i + 1, pipes.length);
input = await pipe.execute(input, context);
emitter.emit("stepComplete", i + 1, pipes.length, input);
options.onProgress?.(i + 1, pipes.length);
} catch (error) {
throw new Error(
`Chaining failed at pipe ${pipes[i].options.id}: ${String(error)}`,
);
}
emitter.emit("pipelineComplete", context.stepResults);
} catch (error) {
emitter.emit("error", error);
throw error;
} finally {
pipes.forEach((pipe) => pipe.options.onDestroy?.());
options.onDestroy?.();
}

return context.stepResults;
}

//https://pandora.idealeap.cn/share/e8d8f417-d6b2-4096-84db-e72309ee21b3
// 请进一步完善该功能,例如。请给出完整的Ts代码和示例,任何代码都不要省略!!!

0 comments on commit 1898c1d

Please sign in to comment.