Skip to content

Commit

Permalink
Merge pull request #23975 Implement sibling protocol for Typescript.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Dec 29, 2022
2 parents 598324a + ea6cf40 commit c5c5b35
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
11 changes: 10 additions & 1 deletion sdks/typescript/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,5 +169,14 @@ func main() {
args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl())
}

log.Fatalf("User program exited: %v", execx.Execute("npx", args...))
workerIds := append([]string{*workerId}, info.GetSiblingWorkerIds()...)
var wg sync.WaitGroup
wg.Add(len(workerIds))
for _, workerId := range workerIds {
go func(workerId string) {
log.Printf("Executing: python %v", strings.Join(args, " "))
log.Fatalf("User program exited: %v", execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "npx", args...))
}(workerId)
}
wg.Wait()
}
8 changes: 6 additions & 2 deletions sdks/typescript/src/apache_beam/internal/environments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ import * as runnerApi from "../proto/beam_runner_api";
export const TYPESCRIPT_DEFAULT_ENVIRONMENT_URN = "js_default";

function javascriptCapabilities(): string[] {
// XXX This is needed for sessions to work...
return ["beam:coder:interval_window:v1"]; // TODO: Cleanup. Actually populate.
// TODO: Cleanup. Actually populate.
return [
// This is needed for sessions to work...
"beam:coder:interval_window:v1",
"beam:protocol:sibling_workers:v1",
];
}

export function defaultJsEnvironment() {
Expand Down
4 changes: 3 additions & 1 deletion sdks/typescript/src/apache_beam/runners/dataflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ export function dataflowRunner(runnerOptions: {
pipeline: Pipeline,
options: Object = {}
): Promise<PipelineResult> {
var augmentedOptions = { experiments: [] as string[], ...options };
augmentedOptions.experiments.push("use_sibling_sdk_workers");
return new PortableRunner(
runnerOptions as any,
PythonService.forModule(
"apache_beam.runners.dataflow.dataflow_job_service",
["--port", "{{PORT}}"]
)
).runPipeline(pipeline, options);
).runPipeline(pipeline, augmentedOptions);
}
})();
}

0 comments on commit c5c5b35

Please sign in to comment.