[#24515] Ensure that portable Java pipelines on Dataflow are not able to opt out of runner v2. #24805

Merged (1 commit) on Dec 29, 2022
CHANGES.md (5 additions, 4 deletions)
@@ -68,10 +68,11 @@

## Breaking Changes

-* Go pipelines, Python streaming pipelines, and portable Python batch pipelines on Dataflow are required to
-  use Runner V2. The `disable_runner_v2`, `disable_runner_v2_until_2023`, `disable_prime_runner_v2`
-  experiments will raise an error during pipeline construction. You can no longer specify the Dataflow worker
-  jar override. Note that non-portable Python batch jobs are not impacted. ([#24515](https://github.com/apache/beam/issues/24515)).
+* Portable Java pipelines, Go pipelines, Python streaming pipelines, and portable Python batch
+  pipelines on Dataflow are required to use Runner V2. The `disable_runner_v2`,
+  `disable_runner_v2_until_2023`, `disable_prime_runner_v2` experiments will raise an error during
+  pipeline construction. You can no longer specify the Dataflow worker jar override. Note that
+  non-portable Java jobs and non-portable Python batch jobs are not impacted. ([#24515](https://github.com/apache/beam/issues/24515)).

## Deprecations

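For context (not part of the diff): the CHANGES.md entry above means that a user pipeline which both opts into the portable/Runner V2 path and passes one of the `disable_*` experiments now fails fast at submission instead of being silently honored. Below is a minimal, illustrative sketch of that behavior; the class name is hypothetical and the usual `--runner=DataflowRunner`, `--project`, `--region`, and `--gcpTempLocation` flags are assumed to be supplied on the command line.

```java
// Illustrative only; assumes Dataflow runner and GCP options are provided via args.
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class RunnerV2OptOutSketch {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);

    // Any of these experiments selects the unified worker (Runner V2):
    // beam_fn_api, use_runner_v2, use_unified_worker, use_portable_job_submission.
    ExperimentalOptions.addExperiment(options, "use_portable_job_submission");

    // Also asking to disable Runner V2 is now a contradiction rather than an opt-out.
    ExperimentalOptions.addExperiment(options, "disable_runner_v2");

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of("A"));

    // With this PR, run() throws IllegalArgumentException
    // ("Runner V2 both disabled and enabled: ...") instead of submitting the job.
    p.run();
  }
}
```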
DataflowRunner.java

@@ -1096,6 +1096,12 @@ public DataflowPipelineJob run(Pipeline pipeline) {
      }
    }
    if (useUnifiedWorker(options)) {
      if (hasExperiment(options, "disable_runner_v2")
          || hasExperiment(options, "disable_runner_v2_until_2023")
          || hasExperiment(options, "disable_prime_runner_v2")) {
        throw new IllegalArgumentException(
            "Runner V2 both disabled and enabled: at least one of ['beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set.");
      }
      List<String> experiments =
          new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true
      if (!experiments.contains("use_runner_v2")) {
@@ -1116,6 +1122,18 @@ public DataflowPipelineJob run(Pipeline pipeline) {
    logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
    if (shouldActAsStreaming(pipeline)) {
      options.setStreaming(true);

      if (useUnifiedWorker(options)) {
        options.setEnableStreamingEngine(true);
        List<String> experiments =
            new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true
        if (!experiments.contains("enable_streaming_engine")) {
          experiments.add("enable_streaming_engine");
        }
        if (!experiments.contains("enable_windmill_service")) {
          experiments.add("enable_windmill_service");
        }
      }
    }

    if (!ExperimentalOptions.hasExperiment(options, "disable_projection_pushdown")) {
@@ -2412,7 +2430,8 @@ static String getDefaultContainerVersion(DataflowPipelineOptions options) {
  static boolean useUnifiedWorker(DataflowPipelineOptions options) {
    return hasExperiment(options, "beam_fn_api")
        || hasExperiment(options, "use_runner_v2")
-       || hasExperiment(options, "use_unified_worker");
+       || hasExperiment(options, "use_unified_worker")
+       || hasExperiment(options, "use_portable_job_submission");
  }

  static boolean useStreamingEngine(DataflowPipelineOptions options) {
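For context (not part of the diff): taken together, the three hunks above implement a small decision flow — any of four "enable" experiments selects the unified worker, conflicting `disable_*` experiments are rejected, and streaming unified-worker jobs get the Streaming Engine experiments appended automatically. The self-contained sketch below restates that flow for readability; the class and method names are hypothetical and it is not DataflowRunner code.

```java
// Hypothetical restatement of the experiment handling shown in the diff above.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnifiedWorkerDecisionSketch {
  private static final List<String> ENABLE_FLAGS =
      Arrays.asList(
          "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission");
  private static final List<String> DISABLE_FLAGS =
      Arrays.asList("disable_runner_v2", "disable_runner_v2_until_2023", "disable_prime_runner_v2");

  /** Mirrors useUnifiedWorker: any enable flag selects the unified worker. */
  static boolean useUnifiedWorker(List<String> experiments) {
    return ENABLE_FLAGS.stream().anyMatch(experiments::contains);
  }

  /** Mirrors the run() changes: reject conflicting flags, add Streaming Engine flags when streaming. */
  static List<String> resolveExperiments(List<String> experiments, boolean streaming) {
    List<String> result = new ArrayList<>(experiments);
    if (useUnifiedWorker(result)) {
      if (DISABLE_FLAGS.stream().anyMatch(result::contains)) {
        throw new IllegalArgumentException("Runner V2 both disabled and enabled");
      }
      if (streaming) {
        if (!result.contains("enable_streaming_engine")) {
          result.add("enable_streaming_engine");
        }
        if (!result.contains("enable_windmill_service")) {
          result.add("enable_windmill_service");
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Streaming job submitted portably: Streaming Engine experiments are appended automatically.
    System.out.println(resolveExperiments(Arrays.asList("use_portable_job_submission"), true));
    // Conflicting flags now fail fast instead of being silently ignored.
    try {
      resolveExperiments(Arrays.asList("use_runner_v2", "disable_runner_v2"), false);
    } catch (IllegalArgumentException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}
```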
DataflowRunnerTest.java

@@ -1745,6 +1745,63 @@ public void testSdkHarnessConfigurationPrime() throws IOException {
    this.verifySdkHarnessConfiguration(options);
  }

  @Test
  public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception {
    for (String experiment :
        ImmutableList.of(
            "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) {
      DataflowPipelineOptions options = buildPipelineOptions();
      ExperimentalOptions.addExperiment(options, experiment);
      Pipeline p = Pipeline.create(options);
      p.apply(Create.of("A"));
      p.run();
      assertFalse(options.isEnableStreamingEngine());
      assertThat(
          options.getExperiments(),
          containsInAnyOrder(
              "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission"));
    }

    for (String experiment :
        ImmutableList.of(
            "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) {
      DataflowPipelineOptions options = buildPipelineOptions();
      options.setStreaming(true);
      ExperimentalOptions.addExperiment(options, experiment);
      Pipeline p = Pipeline.create(options);
      p.apply(Create.of("A"));
      p.run();
      assertTrue(options.isEnableStreamingEngine());
      assertThat(
          options.getExperiments(),
          containsInAnyOrder(
              "beam_fn_api",
              "use_runner_v2",
              "use_unified_worker",
              "use_portable_job_submission",
              "enable_windmill_service",
              "enable_streaming_engine"));
    }
  }

  @Test
  public void testSettingConflictingEnableAndDisableExperimentsThrowsException() throws Exception {
    for (String experiment :
        ImmutableList.of(
            "beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) {
      for (String disabledExperiment :
          ImmutableList.of(
              "disable_runner_v2", "disable_runner_v2_until_2023", "disable_prime_runner_v2")) {
        DataflowPipelineOptions options = buildPipelineOptions();
        ExperimentalOptions.addExperiment(options, experiment);
        ExperimentalOptions.addExperiment(options, disabledExperiment);
        Pipeline p = Pipeline.create(options);
        p.apply(Create.of("A"));
        assertThrows("Runner V2 both disabled and enabled", IllegalArgumentException.class, p::run);
      }
    }
  }

  private void verifyMapStateUnsupported(PipelineOptions options) throws Exception {
    Pipeline p = Pipeline.create(options);
    p.apply(Create.of(KV.of(13, 42)))