From ffbb3362d659c6472d70b616b9b48510a0d8f975 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 4 Nov 2022 15:52:27 -0700 Subject: [PATCH 1/6] Work around coders bug on Dataflow. --- .../src/apache_beam/worker/worker.ts | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/sdks/typescript/src/apache_beam/worker/worker.ts b/sdks/typescript/src/apache_beam/worker/worker.ts index d670601b059f..2c2eb2430bac 100644 --- a/sdks/typescript/src/apache_beam/worker/worker.ts +++ b/sdks/typescript/src/apache_beam/worker/worker.ts @@ -190,7 +190,10 @@ export class Worker { }, }); } else { - this.processBundleDescriptors.set(descriptorId, value); + this.processBundleDescriptors.set( + descriptorId, + maybeStripDataflowWindowedWrappings(value) + ); this.process(request); } } @@ -440,3 +443,23 @@ function isPrimitive(transform: PTransform): boolean { ); } } + +function maybeStripDataflowWindowedWrappings( + descriptor: ProcessBundleDescriptor +): ProcessBundleDescriptor { + for (const pcoll of Object.values(descriptor.pcollections)) { + const coder = descriptor.coders[pcoll.coderId]; + if ( + coder.spec?.urn == "beam:coder:windowed_value:v1" || + coder.spec?.urn == "beam:coder:param_windowed_value:v1" + ) { + // Dataflow sets PCollection coder_id to a coder of WindowedValues rather + // than the coder of the element type alone. + // Until Dataflow is fixed, we must assume that the element type is not + // actually a windowed value, but that this wrapping was extraneously + // added. + pcoll.coderId = coder.componentCoderIds[0]; + } + } + return descriptor; +} From 34f6ae6ae5f0584275f936f8898855d84151ada4 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 4 Nov 2022 15:56:26 -0700 Subject: [PATCH 2/6] Add a test runner for running multiple pipelines in parallel. This runner combines multiple distinct pipelines into a single "forest" pipeline, which is then executed.
This is useful for runners, such as dataflow, that have a high per-pipeline overhead. --- .../src/apache_beam/internal/pipeline.ts | 12 +- .../src/apache_beam/runners/dataflow.ts | 2 +- .../src/apache_beam/runners/direct_runner.ts | 7 +- .../src/apache_beam/runners/flink.ts | 2 +- .../runners/portable_runner/runner.ts | 4 +- .../src/apache_beam/runners/runner.ts | 11 +- .../src/apache_beam/runners/universal.ts | 2 +- .../testing/multi_pipeline_runner.ts | 138 ++++++++++++++++++ .../src/apache_beam/worker/operators.ts | 2 +- sdks/typescript/test/primitives_test.ts | 110 +++++++------- 10 files changed, 217 insertions(+), 73 deletions(-) create mode 100644 sdks/typescript/src/apache_beam/testing/multi_pipeline_runner.ts diff --git a/sdks/typescript/src/apache_beam/internal/pipeline.ts b/sdks/typescript/src/apache_beam/internal/pipeline.ts index 0dcd80340629..fc33fdb25072 100644 --- a/sdks/typescript/src/apache_beam/internal/pipeline.ts +++ b/sdks/typescript/src/apache_beam/internal/pipeline.ts @@ -39,7 +39,7 @@ export class PipelineContext { private coders: { [key: string]: Coder } = {}; - constructor(public components: Components) {} + constructor(public components: Components, private componentPrefix: string) {} getCoder(coderId: string): Coder { const this_ = this; @@ -102,7 +102,7 @@ export class PipelineContext { } createUniqueName(prefix: string): string { - return prefix + "_" + this.counter++; + return this.componentPrefix + prefix + "_" + this.counter++; } } @@ -119,15 +119,15 @@ export class Pipeline { private proto: runnerApi.Pipeline; private globalWindowing: string; - constructor() { - this.defaultEnvironment = "jsEnvironment"; - this.globalWindowing = "globalWindowing"; + constructor(componentPrefix: string = "") { + this.defaultEnvironment = componentPrefix + "jsEnvironment"; + this.globalWindowing = componentPrefix + "globalWindowing"; this.proto = runnerApi.Pipeline.create({ components: runnerApi.Components.create({}), }); 
this.proto.components!.environments[this.defaultEnvironment] = environments.defaultJsEnvironment(); - this.context = new PipelineContext(this.proto.components!); + this.context = new PipelineContext(this.proto.components!, componentPrefix); this.proto.components!.windowingStrategies[this.globalWindowing] = createWindowingStrategyProto(this, globalWindows()); } diff --git a/sdks/typescript/src/apache_beam/runners/dataflow.ts b/sdks/typescript/src/apache_beam/runners/dataflow.ts index 66ff4b46229f..1bdd250cb1e6 100644 --- a/sdks/typescript/src/apache_beam/runners/dataflow.ts +++ b/sdks/typescript/src/apache_beam/runners/dataflow.ts @@ -16,7 +16,7 @@ * limitations under the License. */ -import { Pipeline } from "../internal/pipeline"; +import { Pipeline } from "../proto/beam_runner_api"; import { PipelineResult, Runner } from "./runner"; import { PortableRunner } from "./portable_runner/runner"; import { PythonService } from "../utils/service"; diff --git a/sdks/typescript/src/apache_beam/runners/direct_runner.ts b/sdks/typescript/src/apache_beam/runners/direct_runner.ts index 1b6d00515aa7..f4544aef40e5 100644 --- a/sdks/typescript/src/apache_beam/runners/direct_runner.ts +++ b/sdks/typescript/src/apache_beam/runners/direct_runner.ts @@ -68,8 +68,7 @@ export class DirectRunner extends Runner { return [...this.unsupportedFeaturesIter(pipeline, options)]; } - *unsupportedFeaturesIter(pipeline, options: Object = {}) { - const proto: runnerApi.Pipeline = pipeline.proto; + *unsupportedFeaturesIter(proto : runnerApi.Pipeline, options: Object = {}) { for (const requirement of proto.requirements) { if (!SUPPORTED_REQUIREMENTS.includes(requirement)) { yield requirement; @@ -103,13 +102,13 @@ export class DirectRunner extends Runner { } } - async runPipeline(p): Promise { + async runPipeline(p : runnerApi.Pipeline): Promise { const stateProvider = new InMemoryStateProvider(); const stateCacheRef = uuid.v4(); DirectRunner.inMemoryStatesRefs.set(stateCacheRef, stateProvider); try 
{ - const proto = rewriteSideInputs(p.proto, stateCacheRef); + const proto = rewriteSideInputs(p, stateCacheRef); const descriptor: ProcessBundleDescriptor = { id: "", transforms: proto.components!.transforms, diff --git a/sdks/typescript/src/apache_beam/runners/flink.ts b/sdks/typescript/src/apache_beam/runners/flink.ts index 4acb68e642fa..878112162fdc 100644 --- a/sdks/typescript/src/apache_beam/runners/flink.ts +++ b/sdks/typescript/src/apache_beam/runners/flink.ts @@ -20,7 +20,7 @@ const fs = require("fs"); const os = require("os"); const path = require("path"); -import { Pipeline } from "../internal/pipeline"; +import { Pipeline } from "../proto/beam_runner_api"; import { PipelineResult, Runner } from "./runner"; import { PortableRunner } from "./portable_runner/runner"; import { JavaJarService } from "../utils/service"; diff --git a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts index b3819bea6228..3220852ac740 100644 --- a/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts +++ b/sdks/typescript/src/apache_beam/runners/portable_runner/runner.ts @@ -179,10 +179,10 @@ export class PortableRunner extends Runner { } async runPipeline( - pipeline: Pipeline, + pipeline: runnerApiProto.Pipeline, options?: PipelineOptions ): Promise { - return this.runPipelineWithProto(pipeline.getProto(), options); + return this.runPipelineWithProto(pipeline, options); } async runPipelineWithProto( diff --git a/sdks/typescript/src/apache_beam/runners/runner.ts b/sdks/typescript/src/apache_beam/runners/runner.ts index 7f0d716ef7f0..935b325b38a6 100644 --- a/sdks/typescript/src/apache_beam/runners/runner.ts +++ b/sdks/typescript/src/apache_beam/runners/runner.ts @@ -17,6 +17,7 @@ */ import { JobState_Enum } from "../proto/beam_job_api"; +import * as runnerApi from "../proto/beam_runner_api"; import { MonitoringInfo } from "../proto/metrics"; import { Pipeline } from 
"../internal/pipeline"; import { Root, PValue } from "../pvalue"; @@ -88,9 +89,7 @@ export abstract class Runner { pipeline: (root: Root) => PValue | Promise>, options?: PipelineOptions ): Promise { - const p = new Pipeline(); - await pipeline(new Root(p)); - const pipelineResult = await this.runPipeline(p, options); + const pipelineResult = await this.runAsync(pipeline, options); const finalState = await pipelineResult.waitUntilFinish(); if (finalState != JobState_Enum.DONE) { // TODO: Grab the last/most severe error message? @@ -110,11 +109,11 @@ export abstract class Runner { ): Promise { const p = new Pipeline(); await pipeline(new Root(p)); - return this.runPipeline(p); + return this.runPipeline(p.getProto()); } abstract runPipeline( - pipeline: Pipeline, + pipeline: runnerApi.Pipeline, options?: PipelineOptions ): Promise; } @@ -122,7 +121,7 @@ export abstract class Runner { export function defaultRunner(defaultOptions: Object): Runner { return new (class extends Runner { async runPipeline( - pipeline: Pipeline, + pipeline: runnerApi.Pipeline, options: Object = {} ): Promise { const directRunner = diff --git a/sdks/typescript/src/apache_beam/runners/universal.ts b/sdks/typescript/src/apache_beam/runners/universal.ts index f0f382c39d58..2a30570adf64 100644 --- a/sdks/typescript/src/apache_beam/runners/universal.ts +++ b/sdks/typescript/src/apache_beam/runners/universal.ts @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -import { Pipeline } from "../internal/pipeline"; +import { Pipeline } from "../proto/beam_runner_api"; import { PipelineResult, Runner } from "./runner"; import { PortableRunner } from "./portable_runner/runner"; import { PythonService } from "../utils/service"; diff --git a/sdks/typescript/src/apache_beam/testing/multi_pipeline_runner.ts b/sdks/typescript/src/apache_beam/testing/multi_pipeline_runner.ts new file mode 100644 index 000000000000..e6932ad46769 --- /dev/null +++ b/sdks/typescript/src/apache_beam/testing/multi_pipeline_runner.ts @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import * as runnerApi from "../proto/beam_runner_api"; +import * as jobApi from "../proto/beam_job_api"; +import { withName } from "../transforms"; +import { PipelineOptions } from "../options/pipeline_options"; +import { Pipeline } from "../internal/pipeline"; +import { PValue, Root } from "../pvalue"; +import { PipelineResult, Runner } from "../runners/runner"; + +class FakePipelineResult extends PipelineResult { + async waitUntilFinish(duration?: number): Promise { + return jobApi.JobState_Enum.DONE; + } +} + +export class MultiPipelineRunner extends Runner { + allPipelines?: runnerApi.Pipeline; + counter: number = 0; + + constructor( + private underlying: Runner, + private options: PipelineOptions = {} + ) { + super(); + } + + async runAsync( + pipeline: (root: Root) => PValue | Promise>, + options?: PipelineOptions + ): Promise { + const uniqueName = this.getPrefix(); + const p = new Pipeline(uniqueName); + await new Root(p).applyAsync( + withName(uniqueName, async (root) => { + await pipeline(root); + }) + ); + return this.runPipeline(p.getProto()); + } + + async runPipeline( + pipeline: runnerApi.Pipeline, + options?: PipelineOptions + ): Promise { + if (options) { + throw new Error("Per-pipeline options not supported."); + } + this.mergePipeline(pipeline); + return new FakePipelineResult(); + } + + async reallyRunPipelines() { + if (this.allPipelines === undefined) { + return new FakePipelineResult(); + } + console.log(this.allPipelines); + const pipelineResult = await this.underlying.runPipeline( + this.allPipelines, + this.options + ); + const finalState = await pipelineResult.waitUntilFinish(); + if (finalState != jobApi.JobState_Enum.DONE) { + // TODO: Grab the last/most severe error message? 
+ throw new Error( + "Job finished in state " + jobApi.JobState_Enum[finalState] + ); + } + this.allPipelines = undefined; + return pipelineResult; + } + + getPrefix(): string { + try { + return "namespace_" + this.counter + "_"; + } finally { + this.counter += 1; + } + } + + mergePipeline(pipeline: runnerApi.Pipeline) { + if (this.allPipelines === undefined) { + this.allPipelines = runnerApi.Pipeline.create({ + components: runnerApi.Components.create({}), + }); + } + function mergeComponents(src, dest) { + for (const [id, proto] of Object.entries(src)) { + if (dest[id] === undefined) { + dest[id] = proto; + } else if (dest[id] != proto) { + require('assert').deepEqual(dest[id], proto); + throw new Error("Expected distinct components: " + id); + } + } + } + mergeComponents( + pipeline.components?.transforms, + this.allPipelines.components?.transforms + ); + mergeComponents( + pipeline.components?.pcollections, + this.allPipelines.components?.pcollections + ); + mergeComponents( + pipeline.components?.coders, + this.allPipelines.components?.coders + ); + mergeComponents( + pipeline.components?.windowingStrategies, + this.allPipelines.components?.windowingStrategies + ); + mergeComponents( + pipeline.components?.environments, + this.allPipelines.components?.environments + ); + this.allPipelines.requirements = + [...new Set([...this.allPipelines.rootTransformIds, ...pipeline.requirements])]; + this.allPipelines.rootTransformIds = + [...this.allPipelines.rootTransformIds, ...pipeline.rootTransformIds]; + } +} diff --git a/sdks/typescript/src/apache_beam/worker/operators.ts b/sdks/typescript/src/apache_beam/worker/operators.ts index bf319c96202f..e04797cdd44b 100644 --- a/sdks/typescript/src/apache_beam/worker/operators.ts +++ b/sdks/typescript/src/apache_beam/worker/operators.ts @@ -113,7 +113,7 @@ export class OperatorContext { public loggingStageInfo: LoggingStageInfo, public metricsContainer: MetricsContainer ) { - this.pipelineContext = new 
PipelineContext(descriptor); + this.pipelineContext = new PipelineContext(descriptor, ""); } } diff --git a/sdks/typescript/test/primitives_test.ts b/sdks/typescript/test/primitives_test.ts index 8259754ede14..832fec2a4e33 100644 --- a/sdks/typescript/test/primitives_test.ts +++ b/sdks/typescript/test/primitives_test.ts @@ -36,6 +36,7 @@ import * as windowings from "../src/apache_beam/transforms/windowings"; import * as pardo from "../src/apache_beam/transforms/pardo"; import { withName } from "../src/apache_beam/transforms"; import * as service from "../src/apache_beam/utils/service"; +import { MultiPipelineRunner } from "../src/apache_beam/testing/multi_pipeline_runner"; let subprocessCache; before(async function () { @@ -52,23 +53,23 @@ before(async function () { after(() => subprocessCache.stopAll()); export function suite(runner: beam.Runner = new DirectRunner()) { - describe("testing.assertDeepEqual", function () { - // The tests below won't catch failures if this doesn't fail. - it("fails on bad assert", async function () { - // TODO: There's probably a more idiomatic way to test failures. - var seenError = false; - try { - await runner.run((root) => { - const pcolls = root - .apply(beam.create([1, 2, 3])) - .apply(testing.assertDeepEqual([1, 2])); - }); - } catch (Error) { - seenError = true; - } - assert.equal(true, seenError); - }); - }); +// describe("testing.assertDeepEqual", function () { +// // The tests below won't catch failures if this doesn't fail. +// it("fails on bad assert", async function () { +// // TODO: There's probably a more idiomatic way to test failures. 
+// var seenError = false; +// try { +// await runner.run((root) => { +// const pcolls = root +// .apply(beam.create([1, 2, 3])) +// .apply(testing.assertDeepEqual([1, 2])); +// }); +// } catch (Error) { +// seenError = true; +// } +// assert.equal(true, seenError); +// }); +// }); describe("runs basic transforms", function () { it("runs a map", async function () { @@ -108,36 +109,36 @@ export function suite(runner: beam.Runner = new DirectRunner()) { }); }); - it("runs a map with counters", async function () { - const result = await new DirectRunner().run((root) => { - root - .apply(beam.create([1, 2, 3])) - .map( - withName( - "mapWithCounter", - pardo.withContext( - (x: number, context) => { - context.myCounter.increment(x); - context.myDist.update(x); - return x * x; - }, - { - myCounter: pardo.counter("myCounter"), - myDist: pardo.distribution("myDist"), - } - ) - ) - ) - .apply(testing.assertDeepEqual([1, 4, 9])); - }); - assert.deepEqual((await result.counters()).myCounter, 1 + 2 + 3); - assert.deepEqual((await result.distributions()).myDist, { - count: 3, - sum: 6, - min: 1, - max: 3, - }); - }); +// it("runs a map with counters", async function () { +// const result = await new DirectRunner().run((root) => { +// root +// .apply(beam.create([1, 2, 3])) +// .map( +// withName( +// "mapWithCounter", +// pardo.withContext( +// (x: number, context) => { +// context.myCounter.increment(x); +// context.myDist.update(x); +// return x * x; +// }, +// { +// myCounter: pardo.counter("myCounter"), +// myDist: pardo.distribution("myDist"), +// } +// ) +// ) +// ) +// .apply(testing.assertDeepEqual([1, 4, 9])); +// }); +// assert.deepEqual((await result.counters()).myCounter, 1 + 2 + 3); +// assert.deepEqual((await result.distributions()).myDist, { +// count: 3, +// sum: 6, +// min: 1, +// max: 3, +// }); +// }); it("runs a map with singleton side input", async function () { await runner.run((root) => { @@ -284,9 +285,16 @@ export function suite(runner: beam.Runner = 
new DirectRunner()) { describe("primitives module", function () { describe("direct runner", suite.bind(this)); - if (process.env.BEAM_SERVICE_OVERRIDES) { - describe("portable runner @ulr", () => { - suite.bind(this)(loopbackRunner()); + // if (process.env.BEAM_SERVICE_OVERRIDES) { + // describe("portable runner @ulr", () => { + // suite.bind(this)(loopbackRunner()); + // }); + // } + describe("all at once", async () => { + const runner = new MultiPipelineRunner(new DirectRunner()); + suite.bind(this)(runner); + after(async () => { + console.log(await runner.reallyRunPipelines()); }); - } + }); }); From b692d6f2a8ba335a359fd0b2de724a0617bf9294 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 4 Nov 2022 17:55:17 -0700 Subject: [PATCH 3/6] Run tests as single Dataflow pipeline. --- .../runners/dataflow/dataflow_runner.py | 4 + .../portability/fn_api_runner/fn_runner.py | 4 + .../runners/portability/local_job_service.py | 8 +- .../src/apache_beam/runners/direct_runner.ts | 4 +- .../testing/multi_pipeline_runner.ts | 35 +++-- sdks/typescript/test/primitives_test.ts | 133 ++++++++++-------- 6 files changed, 118 insertions(+), 70 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index d581c48cee13..aecbc29425c2 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -1501,6 +1501,10 @@ def job_id(self): def metrics(self): return self.metric_results + def monitoring_infos(self): + logging.warning('Monitoring infos not yet supported for Dataflow runner.') + return [] + @property def has_job(self): return self._job is not None diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 89f7abcc25c7..f803a96a3b15 100644 --- 
a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -1548,3 +1548,7 @@ def monitoring_metrics(self): self._monitoring_metrics = FnApiMetrics( self._monitoring_infos_by_stage, user_metrics_only=False) return self._monitoring_metrics + + def monitoring_infos(self): + for ms in self._monitoring_infos_by_stage.values(): + yield from ms diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py index 86d14c771d55..7f1908a7e7e1 100644 --- a/sdks/python/apache_beam/runners/portability/local_job_service.py +++ b/sdks/python/apache_beam/runners/portability/local_job_service.py @@ -159,10 +159,10 @@ def GetJobMetrics(self, request, context=None): raise LookupError("Job {} does not exist".format(request.job_id)) result = self._jobs[request.job_id].result - monitoring_info_list = [] - if result is not None: - for mi in result._monitoring_infos_by_stage.values(): - monitoring_info_list.extend(mi) + if result is None: + monitoring_info_list = [] + else: + monitoring_info_list = result.monitoring_infos() # Filter out system metrics user_monitoring_info_list = [ diff --git a/sdks/typescript/src/apache_beam/runners/direct_runner.ts b/sdks/typescript/src/apache_beam/runners/direct_runner.ts index f4544aef40e5..39ff2dc5f8f7 100644 --- a/sdks/typescript/src/apache_beam/runners/direct_runner.ts +++ b/sdks/typescript/src/apache_beam/runners/direct_runner.ts @@ -68,7 +68,7 @@ export class DirectRunner extends Runner { return [...this.unsupportedFeaturesIter(pipeline, options)]; } - *unsupportedFeaturesIter(proto : runnerApi.Pipeline, options: Object = {}) { + *unsupportedFeaturesIter(proto: runnerApi.Pipeline, options: Object = {}) { for (const requirement of proto.requirements) { if (!SUPPORTED_REQUIREMENTS.includes(requirement)) { yield requirement; @@ -102,7 +102,7 @@ export class DirectRunner extends 
Runner { } } - async runPipeline(p : runnerApi.Pipeline): Promise { + async runPipeline(p: runnerApi.Pipeline): Promise { const stateProvider = new InMemoryStateProvider(); const stateCacheRef = uuid.v4(); DirectRunner.inMemoryStatesRefs.set(stateCacheRef, stateProvider); diff --git a/sdks/typescript/src/apache_beam/testing/multi_pipeline_runner.ts b/sdks/typescript/src/apache_beam/testing/multi_pipeline_runner.ts index e6932ad46769..0272507e0caf 100644 --- a/sdks/typescript/src/apache_beam/testing/multi_pipeline_runner.ts +++ b/sdks/typescript/src/apache_beam/testing/multi_pipeline_runner.ts @@ -33,6 +33,8 @@ class FakePipelineResult extends PipelineResult { export class MultiPipelineRunner extends Runner { allPipelines?: runnerApi.Pipeline; counter: number = 0; + nextTestName?: string; + usedTestNames: Set = new Set(); constructor( private underlying: Runner, @@ -41,17 +43,31 @@ export class MultiPipelineRunner extends Runner { super(); } + setNextTestName(name: string) { + var counter = 1; + var nextTestName = name; + while (this.usedTestNames.has(nextTestName)) { + counter++; + nextTestName = name + counter; + } + this.nextTestName = nextTestName; + } + async runAsync( pipeline: (root: Root) => PValue | Promise>, options?: PipelineOptions ): Promise { - const uniqueName = this.getPrefix(); - const p = new Pipeline(uniqueName); + if (this.nextTestName === undefined) { + this.setNextTestName("pipeline"); + } + this.usedTestNames.add(this.nextTestName!); + const p = new Pipeline(this.getPrefix()); await new Root(p).applyAsync( - withName(uniqueName, async (root) => { + withName(this.nextTestName!, async (root) => { await pipeline(root); }) ); + this.nextTestName = undefined; return this.runPipeline(p.getProto()); } @@ -105,7 +121,7 @@ export class MultiPipelineRunner extends Runner { if (dest[id] === undefined) { dest[id] = proto; } else if (dest[id] != proto) { - require('assert').deepEqual(dest[id], proto); + require("assert").deepEqual(dest[id], proto); throw 
new Error("Expected distinct components: " + id); } } @@ -130,9 +146,12 @@ export class MultiPipelineRunner extends Runner { pipeline.components?.environments, this.allPipelines.components?.environments ); - this.allPipelines.requirements = - [...new Set([...this.allPipelines.rootTransformIds, ...pipeline.requirements])]; - this.allPipelines.rootTransformIds = - [...this.allPipelines.rootTransformIds, ...pipeline.rootTransformIds]; + this.allPipelines.requirements = [ + ...new Set([...this.allPipelines.requirements, ...pipeline.requirements]), + ]; + this.allPipelines.rootTransformIds = [ + ...this.allPipelines.rootTransformIds, + ...pipeline.rootTransformIds, + ]; } } diff --git a/sdks/typescript/test/primitives_test.ts b/sdks/typescript/test/primitives_test.ts index 832fec2a4e33..29fe978fd67f 100644 --- a/sdks/typescript/test/primitives_test.ts +++ b/sdks/typescript/test/primitives_test.ts @@ -53,23 +53,23 @@ before(async function () { after(() => subprocessCache.stopAll()); export function suite(runner: beam.Runner = new DirectRunner()) { -// describe("testing.assertDeepEqual", function () { -// // The tests below won't catch failures if this doesn't fail. -// it("fails on bad assert", async function () { -// // TODO: There's probably a more idiomatic way to test failures. -// var seenError = false; -// try { -// await runner.run((root) => { -// const pcolls = root -// .apply(beam.create([1, 2, 3])) -// .apply(testing.assertDeepEqual([1, 2])); -// }); -// } catch (Error) { -// seenError = true; -// } -// assert.equal(true, seenError); -// }); -// }); + describe("testing.assertDeepEqual", function () { + // The tests below won't catch failures if this doesn't fail. + it("fails on bad assert", async function () { + // TODO: There's probably a more idiomatic way to test failures. 
+ var seenError = false; + try { + await runner.run((root) => { + const pcolls = root + .apply(beam.create([1, 2, 3])) + .apply(testing.assertDeepEqual([1, 2])); + }); + } catch (Error) { + seenError = true; + } + assert.equal(true, seenError); + }); + }); describe("runs basic transforms", function () { it("runs a map", async function () { @@ -109,36 +109,36 @@ export function suite(runner: beam.Runner = new DirectRunner()) { }); }); -// it("runs a map with counters", async function () { -// const result = await new DirectRunner().run((root) => { -// root -// .apply(beam.create([1, 2, 3])) -// .map( -// withName( -// "mapWithCounter", -// pardo.withContext( -// (x: number, context) => { -// context.myCounter.increment(x); -// context.myDist.update(x); -// return x * x; -// }, -// { -// myCounter: pardo.counter("myCounter"), -// myDist: pardo.distribution("myDist"), -// } -// ) -// ) -// ) -// .apply(testing.assertDeepEqual([1, 4, 9])); -// }); -// assert.deepEqual((await result.counters()).myCounter, 1 + 2 + 3); -// assert.deepEqual((await result.distributions()).myDist, { -// count: 3, -// sum: 6, -// min: 1, -// max: 3, -// }); -// }); + it("runs a map with counters", async function () { + const result = await new DirectRunner().run((root) => { + root + .apply(beam.create([1, 2, 3])) + .map( + withName( + "mapWithCounter", + pardo.withContext( + (x: number, context) => { + context.myCounter.increment(x); + context.myDist.update(x); + return x * x; + }, + { + myCounter: pardo.counter("myCounter"), + myDist: pardo.distribution("myDist"), + } + ) + ) + ) + .apply(testing.assertDeepEqual([1, 4, 9])); + }); + assert.deepEqual((await result.counters()).myCounter, 1 + 2 + 3); + assert.deepEqual((await result.distributions()).myDist, { + count: 3, + sum: 6, + min: 1, + max: 3, + }); + }); it("runs a map with singleton side input", async function () { await runner.run((root) => { @@ -285,16 +285,37 @@ export function suite(runner: beam.Runner = new DirectRunner()) { 
describe("primitives module", function () { describe("direct runner", suite.bind(this)); - // if (process.env.BEAM_SERVICE_OVERRIDES) { - // describe("portable runner @ulr", () => { - // suite.bind(this)(loopbackRunner()); - // }); - // } - describe("all at once", async () => { - const runner = new MultiPipelineRunner(new DirectRunner()); - suite.bind(this)(runner); - after(async () => { + + if (process.env.BEAM_SERVICE_OVERRIDES) { + describe("portable runner @ulr", () => { + suite.bind(this)(loopbackRunner()); + }); + } + + describe("multi-pipeline @dataflow", async () => { + const runner = new MultiPipelineRunner( + require("../src/apache_beam/runners/dataflow").dataflowRunner({ + project: "apache-beam-testing", + tempLocation: "gs://temp-storage-for-end-to-end-tests/temp-it", + region: "us-central1", + }) + ); + + beforeEach(function () { + if ( + this.test!.title.includes("fails") || + this.test!.title.includes("counter") + ) { + this.skip(); + } + runner.setNextTestName(this.test!.title.match(/([^"]+)"$/)![1]); + }); + + after(async function () { + this.timeout(10 * 60 * 1000 /* 10 min */); console.log(await runner.reallyRunPipelines()); }); + + suite.bind(this)(runner); }); }); From 26d4021a36c8ee046418cd24e36f4d61b4699b7b Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 7 Nov 2022 11:36:59 -0800 Subject: [PATCH 4/6] Github hook for dataflow pipelines. 
--- .github/workflows/typescript_tests.yml | 46 +++++++++++++++++++ sdks/typescript/test/primitives_test.ts | 61 ++++++++++++++++--------- 2 files changed, 86 insertions(+), 21 deletions(-) diff --git a/.github/workflows/typescript_tests.yml b/.github/workflows/typescript_tests.yml index 41b1ad4019e8..0ad75902f9bb 100644 --- a/.github/workflows/typescript_tests.yml +++ b/.github/workflows/typescript_tests.yml @@ -103,3 +103,49 @@ jobs: working-directory: ./sdks/typescript env: BEAM_SERVICE_OVERRIDES: '{"python:*": "python"}' + + typescript_dataflow_tests: + name: 'TypeScript Dataflow Tests' + runs-on: ubuntu-latest + strategy: + fail-fast: false + steps: + - name: Checkout code + uses: actions/checkout@v3 + with: + persist-credentials: false + submodules: recursive + - name: Install node + uses: actions/setup-node@v3 + with: + node-version: '16' + - name: Install python + uses: actions/setup-python@v4 + with: + python-version: 3.8 + - name: Setup Beam Python + working-directory: ./sdks/python + run: | + pip install pip setuptools --upgrade + pip install -r build-requirements.txt + pip install 'pandas>=1.0,<1.5' + python setup.py develop + pip install -e ".[gcp]" + - name: Authenticate on GCP + uses: google-github-actions/setup-gcloud@v0 + with: + service_account_email: ${{ secrets.GCP_SA_EMAIL }} + service_account_key: ${{ secrets.GCP_SA_KEY }} + project_id: ${{ secrets.GCP_PROJECT_ID }} + export_default_credentials: true + - run: npm ci + working-directory: ./sdks/typescript + - run: npm run build + working-directory: ./sdks/typescript + - run: npm test -- --grep "@dataflow" + working-directory: ./sdks/typescript + env: + BEAM_SERVICE_OVERRIDES: '{"python:*": "python"}' + GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }} + GCP_REGION: ${{ secrets.GCP_REGION }} + GCP_TESTING_BUCKET: ${{ secrets.GCP_TESTING_BUCKET }} diff --git a/sdks/typescript/test/primitives_test.ts b/sdks/typescript/test/primitives_test.ts index 29fe978fd67f..ab8edd0bb299 100644 --- 
a/sdks/typescript/test/primitives_test.ts +++ b/sdks/typescript/test/primitives_test.ts @@ -290,32 +290,51 @@ describe("primitives module", function () { describe("portable runner @ulr", () => { suite.bind(this)(loopbackRunner()); }); + } else { + it("Portable tests not run because BEAM_SERVICE_OVERRIDES not set.", function () { + this.skip(); + }); } describe("multi-pipeline @dataflow", async () => { - const runner = new MultiPipelineRunner( - require("../src/apache_beam/runners/dataflow").dataflowRunner({ - project: "apache-beam-testing", - tempLocation: "gs://temp-storage-for-end-to-end-tests/temp-it", - region: "us-central1", - }) - ); - - beforeEach(function () { - if ( - this.test!.title.includes("fails") || - this.test!.title.includes("counter") - ) { - this.skip(); + if (process.env.GCP_PROJECT_ID) { + if (!process.env.BEAM_SERVICE_OVERRIDES) { + throw new Error("Please specify BEAM_SERVICE_OVERRIDES env var."); } - runner.setNextTestName(this.test!.title.match(/([^"]+)"$/)![1]); - }); + if (!process.env.GCP_TESTING_BUCKET) { + throw new Error("Please specify GCP_TESTING_BUCKET env var."); + } + if (!process.env.GCP_REGION) { + throw new Error("Please specify GCP_REGION env var."); + } + const runner = new MultiPipelineRunner( + require("../src/apache_beam/runners/dataflow").dataflowRunner({ + project: process.env.GCP_PROJECT_ID, + tempLocation: process.env.GCP_TESTING_BUCKET, + region: process.env.GCP_REGION, + }) + ); - after(async function () { - this.timeout(10 * 60 * 1000 /* 10 min */); - console.log(await runner.reallyRunPipelines()); - }); + beforeEach(function () { + if ( + this.test!.title.includes("fails") || + this.test!.title.includes("counter") + ) { + this.skip(); + } + runner.setNextTestName(this.test!.title.match(/([^"]+)"$/)![1]); + }); - suite.bind(this)(runner); + after(async function () { + this.timeout(10 * 60 * 1000 /* 10 min */); + console.log(await runner.reallyRunPipelines()); + }); + + suite.bind(this)(runner); + } else { +
it("Dataflow tests not run because GCP_PROJECT_ID not set.", function () { + this.skip(); + }); + } }); }); From 15c6da28d6232709c4b4a896a8621e54361da4c1 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 8 Nov 2022 23:55:30 -0800 Subject: [PATCH 5/6] Guard dataflow run against GCP credentials. --- .github/workflows/typescript_tests.yml | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/.github/workflows/typescript_tests.yml b/.github/workflows/typescript_tests.yml index 0ad75902f9bb..47759c67f1a6 100644 --- a/.github/workflows/typescript_tests.yml +++ b/.github/workflows/typescript_tests.yml @@ -104,9 +104,30 @@ jobs: env: BEAM_SERVICE_OVERRIDES: '{"python:*": "python"}' + check_gcp_variables: + timeout-minutes: 5 + name: "Check GCP variables" + runs-on: ubuntu-latest + outputs: + gcp-variables-set: ${{ steps.check_gcp_variables.outputs.gcp-variables-set }} + steps: + - uses: actions/checkout@v3 + - name: "Check are GCP variables set" + run: "./scripts/ci/ci_check_are_gcp_variables_set.sh" + id: check_gcp_variables + env: + GCP_PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }} + GCP_REGION: ${{ secrets.GCP_REGION }} + GCP_SA_EMAIL: ${{ secrets.GCP_SA_EMAIL }} + GCP_SA_KEY: ${{ secrets.GCP_SA_KEY }} + GCP_TESTING_BUCKET: ${{ secrets.GCP_TESTING_BUCKET }} + GCP_PYTHON_WHEELS_BUCKET: "not-needed-here" + typescript_dataflow_tests: name: 'TypeScript Dataflow Tests' runs-on: ubuntu-latest + needs: + - check_gcp_variables strategy: fail-fast: false steps: From 177f9b9b3884a098ef66370a196a3b2f7cc66dbe Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Wed, 9 Nov 2022 13:36:59 -0800 Subject: [PATCH 6/6] Guard running of precommit against having variables set. 
--- .github/workflows/typescript_tests.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/typescript_tests.yml b/.github/workflows/typescript_tests.yml index 47759c67f1a6..3d51ecb2675e 100644 --- a/.github/workflows/typescript_tests.yml +++ b/.github/workflows/typescript_tests.yml @@ -128,6 +128,7 @@ jobs: runs-on: ubuntu-latest needs: - check_gcp_variables + if: needs.check_gcp_variables.outputs.gcp-variables-set == 'true' strategy: fail-fast: false steps: