diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 950bff768ef4..7ad6ab04be68 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -97,10 +97,6 @@ def __init__(self, cache=None): def is_fnapi_compatible(self): return False - def apply(self, transform, input, options): - _check_and_add_missing_options(options) - return super().apply(transform, input, options) - @staticmethod def poll_for_job_completion( runner, result, duration, state_update_callback=None): @@ -496,10 +492,6 @@ def _get_coder(typehint, window_coder): coders.registry.get_coder(typehint), window_coder=window_coder) return coders.registry.get_coder(typehint) - # TODO(srohde): Remove this after internal usages have been removed. - def apply_GroupByKey(self, transform, pcoll, options): - return transform.expand(pcoll) - def _verify_gbk_coders(self, transform, pcoll): # Infer coder of parent. # @@ -589,12 +581,14 @@ def _check_and_add_missing_options(options): sdk_location = options.view_as(SetupOptions).sdk_location if 'dev' in beam.version.__version__ and sdk_location == 'default': raise ValueError( - "When launching Dataflow Jobs with an unreleased SDK, " + "You are submitting a pipeline with Apache Beam Python SDK " + f"{beam.version.__version__}. " + "When launching Dataflow jobs with an unreleased (dev) SDK, " "please provide an SDK distribution in the --sdk_location option " - "to use consistent SDK version at " - "pipeline submission and runtime. To ignore this error and use the " - "SDK installed in Dataflow dev containers, use " - "--sdk_location=container.") + "to use a consistent SDK version at " + "pipeline submission and runtime. To ignore this error and use " + "an SDK preinstalled in the default Dataflow dev runtime environment " + "or in a custom container image, use --sdk_location=container.") # Streaming only supports using runner v2 (aka unified worker). # Runner v2 only supports using streaming engine (aka windmill service)