diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py index 737fef63b193..ceabf53cddf2 100644 --- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py +++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py @@ -38,6 +38,7 @@ from apache_beam.portability.api import beam_job_api_pb2_grpc from apache_beam.portability.api import endpoints_pb2 from apache_beam.runners import runner +from apache_beam.runners.job import utils as job_utils from apache_beam.runners.portability import fn_api_runner TERMINAL_STATES = [ @@ -159,11 +160,15 @@ def run_pipeline(self, pipeline): pcoll.coder_id = proto_context.coders.get_id(coder) proto_context.coders.populate_map(proto_pipeline.components.coders) + options = {k: v for k, v in pipeline._options.get_all_options().iteritems() + if v is not None} + job_service = self._get_job_service() prepare_response = job_service.Prepare( beam_job_api_pb2.PrepareJobRequest( job_name='job', - pipeline=proto_pipeline)) + pipeline=proto_pipeline, + pipeline_options=job_utils.dict_to_struct(options))) run_response = job_service.Run(beam_job_api_pb2.RunJobRequest( preparation_id=prepare_response.preparation_id)) return PipelineResult(job_service, run_response.job_id)