Skip to content

Commit

Permalink
Add pipeline options to the Python ULR prepare job request.
Browse files Browse the repository at this point in the history
  • Loading branch information
lukecwik committed Mar 12, 2018
1 parent da298f5 commit e9ffc37
Showing 1 changed file with 6 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from apache_beam.portability.api import beam_job_api_pb2_grpc
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners import runner
from apache_beam.runners.job import utils as job_utils
from apache_beam.runners.portability import fn_api_runner

TERMINAL_STATES = [
Expand Down Expand Up @@ -159,11 +160,15 @@ def run_pipeline(self, pipeline):
pcoll.coder_id = proto_context.coders.get_id(coder)
proto_context.coders.populate_map(proto_pipeline.components.coders)

options = {k: v for k, v in pipeline._options.get_all_options().iteritems()
if v is not None}

job_service = self._get_job_service()
prepare_response = job_service.Prepare(
beam_job_api_pb2.PrepareJobRequest(
job_name='job',
pipeline=proto_pipeline))
pipeline=proto_pipeline,
pipeline_options=job_utils.dict_to_struct(options)))
run_response = job_service.Run(beam_job_api_pb2.RunJobRequest(
preparation_id=prepare_response.preparation_id))
return PipelineResult(job_service, run_response.job_id)
Expand Down

0 comments on commit e9ffc37

Please sign in to comment.