Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Require --sdk_location for Dataflow pipelines running with dev SDKs. #28670

Merged
merged 2 commits into from
Sep 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,16 @@ def _check_and_add_missing_options(options):
elif debug_options.lookup_experiment('enable_prime'):
dataflow_service_options.append('enable_prime')

sdk_location = options.view_as(SetupOptions).sdk_location
if 'dev' in beam.version.__version__ and sdk_location == 'default':
raise ValueError(
"When launching Dataflow Jobs with an unreleased SDK, "
"please provide an SDK distribution in the --sdk_location option "
"to use consistent SDK version at "
"pipeline submission and runtime. To ignore this error and use the "
"SDK installed in Dataflow dev containers, use "
"--sdk_location=container.")

# Streaming only supports using runner v2 (aka unified worker).
# Runner v2 only supports using streaming engine (aka windmill service)
if options.view_as(StandardOptions).streaming:
Expand Down
17 changes: 12 additions & 5 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ def expand(self, pcoll):
self.assertIn(packed_step_name, transform_names)

def test_batch_is_runner_v2(self):
options = PipelineOptions()
options = PipelineOptions(['--sdk_location=container'])
_check_and_add_missing_options(options)
for expected in ['beam_fn_api',
'use_unified_worker',
Expand All @@ -512,7 +512,7 @@ def test_batch_is_runner_v2(self):
expected)

def test_streaming_is_runner_v2(self):
options = PipelineOptions(['--streaming'])
options = PipelineOptions(['--sdk_location=container', '--streaming'])
_check_and_add_missing_options(options)
for expected in ['beam_fn_api',
'use_unified_worker',
Expand All @@ -525,7 +525,11 @@ def test_streaming_is_runner_v2(self):
expected)

def test_dataflow_service_options_enable_prime_sets_runner_v2(self):
options = PipelineOptions(['--dataflow_service_options=enable_prime'])
options = PipelineOptions([
'--sdk_location=container',
'--streaming',
'--dataflow_service_options=enable_prime'
])
_check_and_add_missing_options(options)
for expected in ['beam_fn_api',
'use_unified_worker',
Expand All @@ -535,8 +539,11 @@ def test_dataflow_service_options_enable_prime_sets_runner_v2(self):
options.view_as(DebugOptions).lookup_experiment(expected, False),
expected)

options = PipelineOptions(
['--streaming', '--dataflow_service_options=enable_prime'])
options = PipelineOptions([
'--sdk_location=container',
'--streaming',
'--dataflow_service_options=enable_prime'
])
_check_and_add_missing_options(options)
for expected in ['beam_fn_api',
'use_unified_worker',
Expand Down
8 changes: 3 additions & 5 deletions sdks/python/apache_beam/runners/portability/stager.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,11 +594,9 @@ def _create_extra_packages(extra_packages, temp_dir):
'".tar", ".tar.gz", ".whl" or ".zip" instead of %s' % package)
if os.path.basename(package).endswith('.whl'):
_LOGGER.warning(
'The .whl package "%s" is provided in --extra_package. '
'This functionality is not officially supported. Since wheel '
'packages are binary distributions, this package must be '
'binary-compatible with the worker environment (e.g. Python 2.7 '
'running on an x64 Linux host).' % package)
'The .whl package "%s" provided in --extra_package '
'must be binary-compatible with the worker runtime environment.' %
package)

if not os.path.isfile(package):
if Stager._is_remote_path(package):
Expand Down