diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 3d612bd6ec0f..950bff768ef4 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -586,6 +586,16 @@ def _check_and_add_missing_options(options): elif debug_options.lookup_experiment('enable_prime'): dataflow_service_options.append('enable_prime') + sdk_location = options.view_as(SetupOptions).sdk_location + if 'dev' in beam.version.__version__ and sdk_location == 'default': + raise ValueError( + "When launching Dataflow Jobs with an unreleased SDK, " + "please provide an SDK distribution in the --sdk_location option " + "to use consistent SDK version at " + "pipeline submission and runtime. To ignore this error and use the " + "SDK installed in Dataflow dev containers, use " + "--sdk_location=container.") + # Streaming only supports using runner v2 (aka unified worker). # Runner v2 only supports using streaming engine (aka windmill service) if options.view_as(StandardOptions).streaming: diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index b58531acc6a9..6258aa8f213b 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -501,7 +501,7 @@ def expand(self, pcoll): self.assertIn(packed_step_name, transform_names) def test_batch_is_runner_v2(self): - options = PipelineOptions() + options = PipelineOptions(['--sdk_location=container']) _check_and_add_missing_options(options) for expected in ['beam_fn_api', 'use_unified_worker', @@ -512,7 +512,7 @@ def test_batch_is_runner_v2(self): expected) def test_streaming_is_runner_v2(self): - options = PipelineOptions(['--streaming']) + options = PipelineOptions(['--sdk_location=container', '--streaming']) _check_and_add_missing_options(options) for expected in ['beam_fn_api', 'use_unified_worker', @@ -525,7 +525,11 @@ def test_streaming_is_runner_v2(self): expected) def test_dataflow_service_options_enable_prime_sets_runner_v2(self): - options = PipelineOptions(['--dataflow_service_options=enable_prime']) + options = PipelineOptions([ + '--sdk_location=container', + '--streaming', + '--dataflow_service_options=enable_prime' + ]) _check_and_add_missing_options(options) for expected in ['beam_fn_api', 'use_unified_worker', @@ -535,8 +539,11 @@ def test_dataflow_service_options_enable_prime_sets_runner_v2(self): options.view_as(DebugOptions).lookup_experiment(expected, False), expected) - options = PipelineOptions( - ['--streaming', '--dataflow_service_options=enable_prime']) + options = PipelineOptions([ + '--sdk_location=container', + '--streaming', + '--dataflow_service_options=enable_prime' + ]) _check_and_add_missing_options(options) for expected in ['beam_fn_api', 'use_unified_worker', diff --git a/sdks/python/apache_beam/runners/portability/stager.py b/sdks/python/apache_beam/runners/portability/stager.py index 1f093b1d7bc3..ace573de0a62 100644 --- a/sdks/python/apache_beam/runners/portability/stager.py +++ b/sdks/python/apache_beam/runners/portability/stager.py @@ -594,11 +594,9 @@ def _create_extra_packages(extra_packages, temp_dir): '".tar", ".tar.gz", ".whl" or ".zip" instead of %s' % package) if os.path.basename(package).endswith('.whl'): _LOGGER.warning( - 'The .whl package "%s" is provided in --extra_package. ' - 'This functionality is not officially supported. Since wheel ' - 'packages are binary distributions, this package must be ' - 'binary-compatible with the worker environment (e.g. Python 2.7 ' - 'running on an x64 Linux host).' % package) + 'The .whl package "%s" provided in --extra_package ' + 'must be binary-compatible with the worker runtime environment.' % + package) if not os.path.isfile(package): if Stager._is_remote_path(package):