From 2dd514eef6ae8d27d89a5ad0f244854f0b6b7160 Mon Sep 17 00:00:00 2001 From: Luke Cwik Date: Wed, 28 Dec 2022 14:30:53 -0800 Subject: [PATCH] [#24515] Ensure that portable Java pipelines on Dataflow are not able to opt out of runner v2. Fixes #24515 --- CHANGES.md | 9 +-- .../beam/runners/dataflow/DataflowRunner.java | 21 ++++++- .../runners/dataflow/DataflowRunnerTest.java | 57 +++++++++++++++++++ 3 files changed, 82 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e856ef18dab3..c8d3eb97e889 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -68,10 +68,11 @@ ## Breaking Changes -* Go pipelines, Python streaming pipelines, and portable Python batch pipelines on Dataflow are required to - use Runner V2. The `disable_runner_v2`, `disable_runner_v2_until_2023`, `disable_prime_runner_v2` - experiments will raise an error during pipeline construction. You can no longer specify the Dataflow worker - jar override. Note that non-portable Python batch jobs are not impacted. ([#24515](https://github.com/apache/beam/issues/24515)). +* Portable Java pipelines, Go pipelines, Python streaming pipelines, and portable Python batch + pipelines on Dataflow are required to use Runner V2. The `disable_runner_v2`, + `disable_runner_v2_until_2023`, `disable_prime_runner_v2` experiments will raise an error during + pipeline construction. You can no longer specify the Dataflow worker jar override. Note that + non-portable Java jobs and non-portable Python batch jobs are not impacted. ([#24515](https://github.com/apache/beam/issues/24515)). ## Deprecations diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index fa0632ebaf40..344490bc003b 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -1096,6 +1096,12 @@ public DataflowPipelineJob run(Pipeline pipeline) { } } if (useUnifiedWorker(options)) { + if (hasExperiment(options, "disable_runner_v2") + || hasExperiment(options, "disable_runner_v2_until_2023") + || hasExperiment(options, "disable_prime_runner_v2")) { + throw new IllegalArgumentException( + "Runner V2 both disabled and enabled: at least one of ['beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set."); + } List experiments = new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true if (!experiments.contains("use_runner_v2")) { @@ -1116,6 +1122,18 @@ public DataflowPipelineJob run(Pipeline pipeline) { logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline); if (shouldActAsStreaming(pipeline)) { options.setStreaming(true); + + if (useUnifiedWorker(options)) { + options.setEnableStreamingEngine(true); + List experiments = + new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true + if (!experiments.contains("enable_streaming_engine")) { + experiments.add("enable_streaming_engine"); + } + if (!experiments.contains("enable_windmill_service")) { + experiments.add("enable_windmill_service"); + } + } } if (!ExperimentalOptions.hasExperiment(options, "disable_projection_pushdown")) { @@ -2412,7 +2430,8 @@ static String getDefaultContainerVersion(DataflowPipelineOptions options) { static boolean useUnifiedWorker(DataflowPipelineOptions options) { return hasExperiment(options, "beam_fn_api") || hasExperiment(options, "use_runner_v2") - || hasExperiment(options, "use_unified_worker"); + || hasExperiment(options, "use_unified_worker") + || hasExperiment(options, "use_portable_job_submission"); } static boolean useStreamingEngine(DataflowPipelineOptions options) { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 3737ec27e02a..8da6748dd3e5 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -1745,6 +1745,63 @@ public void testSdkHarnessConfigurationPrime() throws IOException { this.verifySdkHarnessConfiguration(options); } + @Test + public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception { + for (String experiment : + ImmutableList.of( + "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) { + DataflowPipelineOptions options = buildPipelineOptions(); + ExperimentalOptions.addExperiment(options, experiment); + Pipeline p = Pipeline.create(options); + p.apply(Create.of("A")); + p.run(); + assertFalse(options.isEnableStreamingEngine()); + assertThat( + options.getExperiments(), + containsInAnyOrder( + "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")); + } + + for (String experiment : + ImmutableList.of( + "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) { + DataflowPipelineOptions options = buildPipelineOptions(); + options.setStreaming(true); + ExperimentalOptions.addExperiment(options, experiment); + Pipeline p = Pipeline.create(options); + p.apply(Create.of("A")); + p.run(); + assertTrue(options.isEnableStreamingEngine()); + assertThat( + options.getExperiments(), + containsInAnyOrder( + "beam_fn_api", + "use_runner_v2", + "use_unified_worker", + "use_portable_job_submission", + "enable_windmill_service", + "enable_streaming_engine")); + } + } + + @Test + public void testSettingConflictingEnableAndDisableExperimentsThrowsException() throws Exception { + for (String experiment : + ImmutableList.of( + "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) { + for (String disabledExperiment : + ImmutableList.of( + "disable_runner_v2", "disable_runner_v2_until_2023", "disable_prime_runner_v2")) { + DataflowPipelineOptions options = buildPipelineOptions(); + ExperimentalOptions.addExperiment(options, experiment); + ExperimentalOptions.addExperiment(options, disabledExperiment); + Pipeline p = Pipeline.create(options); + p.apply(Create.of("A")); + assertThrows("Runner V2 both disabled and enabled", IllegalArgumentException.class, p::run); + } + } + } + private void verifyMapStateUnsupported(PipelineOptions options) throws Exception { Pipeline p = Pipeline.create(options); p.apply(Create.of(KV.of(13, 42)))