diff --git a/sdks/typescript/boot.go b/sdks/typescript/boot.go index 7bace0b1b716..60a39b770d00 100644 --- a/sdks/typescript/boot.go +++ b/sdks/typescript/boot.go @@ -169,5 +169,14 @@ func main() { args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl()) } - log.Fatalf("User program exited: %v", execx.Execute("npx", args...)) + workerIds := append([]string{*workerId}, info.GetSiblingWorkerIds()...) + var wg sync.WaitGroup + wg.Add(len(workerIds)) + for _, workerId := range workerIds { + go func(workerId string) { + log.Printf("Executing: python %v", strings.Join(args, " ")) + log.Fatalf("User program exited: %v", execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "npx", args...)) + }(workerId) + } + wg.Wait() } diff --git a/sdks/typescript/src/apache_beam/internal/environments.ts b/sdks/typescript/src/apache_beam/internal/environments.ts index 4ffc45553972..4e45800839ea 100644 --- a/sdks/typescript/src/apache_beam/internal/environments.ts +++ b/sdks/typescript/src/apache_beam/internal/environments.ts @@ -21,8 +21,12 @@ import * as runnerApi from "../proto/beam_runner_api"; export const TYPESCRIPT_DEFAULT_ENVIRONMENT_URN = "js_default"; function javascriptCapabilities(): string[] { - // XXX This is needed for sessions to work... - return ["beam:coder:interval_window:v1"]; // TODO: Cleanup. Actually populate. + // TODO: Cleanup. Actually populate. + return [ + // This is needed for sessions to work... + "beam:coder:interval_window:v1", + "beam:protocol:sibling_workers:v1", + ]; } export function defaultJsEnvironment() { diff --git a/sdks/typescript/src/apache_beam/runners/dataflow.ts b/sdks/typescript/src/apache_beam/runners/dataflow.ts index 1bdd250cb1e6..950e630d82d9 100644 --- a/sdks/typescript/src/apache_beam/runners/dataflow.ts +++ b/sdks/typescript/src/apache_beam/runners/dataflow.ts @@ -32,13 +32,15 @@ export function dataflowRunner(runnerOptions: { pipeline: Pipeline, options: Object = {} ): Promise { + var augmentedOptions = { experiments: [] as string[], ...options }; + augmentedOptions.experiments.push("use_sibling_sdk_workers"); return new PortableRunner( runnerOptions as any, PythonService.forModule( "apache_beam.runners.dataflow.dataflow_job_service", ["--port", "{{PORT}}"] ) - ).runPipeline(pipeline, options); + ).runPipeline(pipeline, augmentedOptions); } })(); }