Skip to content

Commit

Permalink
Merge pull request #2 from lukecwik/flink_wordcap
Browse files Browse the repository at this point in the history
Add pipeline options to the Python ULR prepare job request.
  • Loading branch information
axelmagn authored Mar 12, 2018
2 parents 7460342 + e9ffc37 commit 1d8a48d
Showing 1 changed file with 6 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from apache_beam.portability.api import beam_job_api_pb2_grpc
from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners import runner
from apache_beam.runners.job import utils as job_utils
from apache_beam.runners.portability import fn_api_runner

TERMINAL_STATES = [
Expand Down Expand Up @@ -159,11 +160,15 @@ def run_pipeline(self, pipeline):
pcoll.coder_id = proto_context.coders.get_id(coder)
proto_context.coders.populate_map(proto_pipeline.components.coders)

options = {k: v for k, v in pipeline._options.get_all_options().iteritems()
if v is not None}

job_service = self._get_job_service()
prepare_response = job_service.Prepare(
beam_job_api_pb2.PrepareJobRequest(
job_name='job',
pipeline=proto_pipeline))
pipeline=proto_pipeline,
pipeline_options=job_utils.dict_to_struct(options)))
run_response = job_service.Run(beam_job_api_pb2.RunJobRequest(
preparation_id=prepare_response.preparation_id))
return PipelineResult(job_service, run_response.job_id)
Expand Down

0 comments on commit 1d8a48d

Please sign in to comment.