From ea6cf404b9f730278d49031580fbae30091e9377 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 3 Nov 2022 14:51:40 -0700 Subject: [PATCH] Implement sibling protocol for Typescript. Though there are various worker thread libraries in Node.js, the lack of a shared memory model makes it difficult to share caches, negating most of the benefit of trying to run multiple threads in the same process. In addition, having multiple independent workers connect to the control service is simpler (and possibly more efficient) than adding a proxying service within the javascript process. --- sdks/typescript/boot.go | 11 ++++++++++- .../src/apache_beam/internal/environments.ts | 8 ++++++-- sdks/typescript/src/apache_beam/runners/dataflow.ts | 4 +++- sdks/typescript/test/io_test.ts | 5 ++++- 4 files changed, 23 insertions(+), 5 deletions(-) diff --git a/sdks/typescript/boot.go b/sdks/typescript/boot.go index 7bace0b1b716..60a39b770d00 100644 --- a/sdks/typescript/boot.go +++ b/sdks/typescript/boot.go @@ -169,5 +169,14 @@ func main() { args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl()) } - log.Fatalf("User program exited: %v", execx.Execute("npx", args...)) + workerIds := append([]string{*workerId}, info.GetSiblingWorkerIds()...) + var wg sync.WaitGroup + wg.Add(len(workerIds)) + for _, workerId := range workerIds { + go func(workerId string) { + log.Printf("Executing: python %v", strings.Join(args, " ")) + log.Fatalf("User program exited: %v", execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "npx", args...)) + }(workerId) + } + wg.Wait() } diff --git a/sdks/typescript/src/apache_beam/internal/environments.ts b/sdks/typescript/src/apache_beam/internal/environments.ts index 4ffc45553972..4e45800839ea 100644 --- a/sdks/typescript/src/apache_beam/internal/environments.ts +++ b/sdks/typescript/src/apache_beam/internal/environments.ts @@ -21,8 +21,12 @@ import * as runnerApi from "../proto/beam_runner_api"; export const TYPESCRIPT_DEFAULT_ENVIRONMENT_URN = "js_default"; function javascriptCapabilities(): string[] { - // XXX This is needed for sessions to work... - return ["beam:coder:interval_window:v1"]; // TODO: Cleanup. Actually populate. + // TODO: Cleanup. Actually populate. + return [ + // This is needed for sessions to work... + "beam:coder:interval_window:v1", + "beam:protocol:sibling_workers:v1", + ]; } export function defaultJsEnvironment() { diff --git a/sdks/typescript/src/apache_beam/runners/dataflow.ts b/sdks/typescript/src/apache_beam/runners/dataflow.ts index 66ff4b46229f..6c9f459a1e26 100644 --- a/sdks/typescript/src/apache_beam/runners/dataflow.ts +++ b/sdks/typescript/src/apache_beam/runners/dataflow.ts @@ -32,13 +32,15 @@ export function dataflowRunner(runnerOptions: { pipeline: Pipeline, options: Object = {} ): Promise { + var augmentedOptions = { experiments: [] as string[], ...options }; + augmentedOptions.experiments.push("use_sibling_sdk_workers"); return new PortableRunner( runnerOptions as any, PythonService.forModule( "apache_beam.runners.dataflow.dataflow_job_service", ["--port", "{{PORT}}"] ) - ).runPipeline(pipeline, options); + ).runPipeline(pipeline, augmentedOptions); } })(); } diff --git a/sdks/typescript/test/io_test.ts b/sdks/typescript/test/io_test.ts index a6fd2743aaa4..d7bf8f2f96a7 100644 --- a/sdks/typescript/test/io_test.ts +++ b/sdks/typescript/test/io_test.ts @@ -60,7 +60,10 @@ before(() => { after(() => subprocessCache.stopAll()); function xlang_it(name, fn) { - return (process.env.BEAM_SERVICE_OVERRIDES ? it : it.skip)(name + ' @xlang', fn); + return (process.env.BEAM_SERVICE_OVERRIDES ? it : it.skip)( + name + " @xlang", + fn + ); } // These depends on fixes that will be released in 2.40.