Skip to content

Commit

Permalink
Implement sibling protocol for Typescript.
Browse files Browse the repository at this point in the history
Though there are various worker thread libraries in
Node.js, the lack of a shared memory model makes it
difficult to share caches, negating most of the benefit
of trying to run multiple threads in the same process.
In addition, having multiple independent workers connect
to the control service is simpler (and possibly more
efficient) than adding a proxying service within the
javascript process.
  • Loading branch information
robertwb committed Nov 3, 2022
1 parent 8dd8749 commit ea6cf40
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 5 deletions.
11 changes: 10 additions & 1 deletion sdks/typescript/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,5 +169,14 @@ func main() {
args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl())
}

log.Fatalf("User program exited: %v", execx.Execute("npx", args...))
workerIds := append([]string{*workerId}, info.GetSiblingWorkerIds()...)
var wg sync.WaitGroup
wg.Add(len(workerIds))
for _, workerId := range workerIds {
go func(workerId string) {
log.Printf("Executing: python %v", strings.Join(args, " "))
log.Fatalf("User program exited: %v", execx.ExecuteEnv(map[string]string{"WORKER_ID": workerId}, "npx", args...))
}(workerId)
}
wg.Wait()
}
8 changes: 6 additions & 2 deletions sdks/typescript/src/apache_beam/internal/environments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@ import * as runnerApi from "../proto/beam_runner_api";
export const TYPESCRIPT_DEFAULT_ENVIRONMENT_URN = "js_default";

function javascriptCapabilities(): string[] {
// XXX This is needed for sessions to work...
return ["beam:coder:interval_window:v1"]; // TODO: Cleanup. Actually populate.
// TODO: Cleanup. Actually populate.
return [
// This is needed for sessions to work...
"beam:coder:interval_window:v1",
"beam:protocol:sibling_workers:v1",
];
}

export function defaultJsEnvironment() {
Expand Down
4 changes: 3 additions & 1 deletion sdks/typescript/src/apache_beam/runners/dataflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ export function dataflowRunner(runnerOptions: {
pipeline: Pipeline,
options: Object = {}
): Promise<PipelineResult> {
var augmentedOptions = { experiments: [] as string[], ...options };
augmentedOptions.experiments.push("use_sibling_sdk_workers");
return new PortableRunner(
runnerOptions as any,
PythonService.forModule(
"apache_beam.runners.dataflow.dataflow_job_service",
["--port", "{{PORT}}"]
)
).runPipeline(pipeline, options);
).runPipeline(pipeline, augmentedOptions);
}
})();
}
5 changes: 4 additions & 1 deletion sdks/typescript/test/io_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,10 @@ before(() => {
after(() => subprocessCache.stopAll());

function xlang_it(name, fn) {
return (process.env.BEAM_SERVICE_OVERRIDES ? it : it.skip)(name + ' @xlang', fn);
return (process.env.BEAM_SERVICE_OVERRIDES ? it : it.skip)(
name + " @xlang",
fn
);
}

// These depends on fixes that will be released in 2.40.
Expand Down

0 comments on commit ea6cf40

Please sign in to comment.