Skip to content

Commit

Permalink
feat: init pipe func
Browse files Browse the repository at this point in the history
  • Loading branch information
MarleneJiang committed Aug 31, 2023
1 parent df33d7a commit 801b0f9
Showing 1 changed file with 59 additions and 52 deletions.
111 changes: 59 additions & 52 deletions package/collaboration/pipe/pipe.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,45 @@
type AsyncOrSync<T> = Promise<T> | T;

// 使用一个简单的事件广播器
class EventEmitter {
private events: Record<string, any[]> = {};

on(event: string, listener: any) {
if (!this.events[event]) {
this.events[event] = [];
}
this.events[event].push(listener);
}

emit(event: string, ...args: any[]) {
this.events[event]?.forEach((listener) => listener(...args));
}
}

interface PipeOptions<T, R> {
id: string;
description: string;
dependencies?: string[];
onData?: (input: T, context: ChainPipeContext) => AsyncOrSync<T>;
onResult?: (result: R) => AsyncOrSync<R>;
onError?: (error: any) => void;
onStepComplete?: (
result: R,
step: number,
totalSteps: number,
allResults: Map<string, any>,
) => void;
}

interface ChainPipeContext {
stepResults: Map<string, any>;
emitter: EventEmitter;
}

interface ChainPipeOptions {
onProgress?: (completed: number, total: number) => void;
}

class Pipe<T, R> {
class Pipe<T, R> extends EventEmitter {
constructor(
private callback: (input: T, context: ChainPipeContext) => AsyncOrSync<R>,
public options: PipeOptions<T, R>,
) {}
) {
super();
}

async execute(
input: T,
Expand All @@ -47,46 +59,38 @@ class Pipe<T, R> {
const processedInput = this.options.onData
? await Promise.resolve(this.options.onData(input, context))
: input;

const result = await Promise.resolve(
this.callback(processedInput, context),
);
const finalResult = this.options.onResult
? await Promise.resolve(this.options.onResult(result))
: result;

context.stepResults.set(this.options.id, finalResult);
this.options.onStepComplete?.(
finalResult,
context.stepResults.set(this.options.id, result);
context.emitter.emit(
"stepComplete",
result,
step,
totalSteps,
context.stepResults,
);

return finalResult;
return result;
} catch (error) {
this.options.onError?.(error);
context.emitter.emit("error", error);
throw new Error(`Pipe ${this.options.id} failed: ${String(error)}`);
}
}

static fromConfig<T, R>(
config: PipeOptions<T, R>,
callback: (input: T, context: ChainPipeContext) => AsyncOrSync<R>,
): Pipe<T, R> {
return new Pipe(callback, config);
}

toConfig(): PipeOptions<T, R> {
return this.options;
}
}

async function chainPipes(
pipes: Pipe<any, any>[],
input: any,
options?: ChainPipeOptions,
): Promise<any> {
const context: ChainPipeContext = { stepResults: new Map() };
const context: ChainPipeContext = {
stepResults: new Map(),
emitter: new EventEmitter(),
};
const total = pipes.length;

for (let i = 0; i < total; i++) {
Expand All @@ -104,34 +108,37 @@ async function chainPipes(
return context.stepResults;
}

// Example usage
const pipe1 = Pipe.fromConfig(
{ id: "double", description: "Doubles the input" },
(input: number) => input * 2,
const pipe1 = new Pipe<number, number>(
(input) => {
return input + 1;
},
{ id: "pipe1", description: "Increment" },
);

const pipe2 = Pipe.fromConfig(
{
id: "toString",
description: "Converts to string",
dependencies: ["double"],
const pipe2 = new Pipe<number, number>(
(input) => {
return input * 2;
},
(input: number, context) =>
`Number: ${context.stepResults.get("double") || input}`,
{ id: "pipe2", description: "Multiply by 2" },
);

const pipe3 = Pipe.fromConfig(
{ id: "toUpperCase", description: "To uppercase" },
(input: string) => input.toUpperCase(),
);
// 订阅事件
pipe1.on("stepComplete", (result: any, step: any, totalSteps: any) => {
console.log(`Step ${step}/${totalSteps} completed with result ${result}`);
});

pipe2.on("stepComplete", (result: any, step: any, totalSteps: any) => {
console.log(`Step ${step}/${totalSteps} completed with result ${result}`);
});

chainPipes([pipe1, pipe2, pipe3], 1, {
onProgress: (completed, total) =>
console.log(`Progress: ${completed}/${total}`),
})
.then((results) => {
console.log("All pipes executed successfully", results);
})
.catch((error) => {
console.error("An error occurred:", error);
const run = async () => {
const results = await chainPipes([pipe1, pipe2], 1, {
onProgress: (completed, total) => {
console.log(`${completed}/${total} steps completed.`);
},
});

console.log("All pipes completed:", results);
};

run().catch((error) => console.error(error));

0 comments on commit 801b0f9

Please sign in to comment.