Skip to content

Commit

Permalink
WIP HACK hardcode python environment
Browse files Browse the repository at this point in the history
  • Loading branch information
axelmagn committed Mar 9, 2018
1 parent da0e7b4 commit da298f5
Showing 1 changed file with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;

import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.fnexecution.artifact.ArtifactSource;
Expand Down Expand Up @@ -39,14 +41,30 @@ public JobInvocation invoke(JobPreparation preparation, @Nullable String artifac
String invocationId =
String.format("%s_%d", preparation.id(), ThreadLocalRandom.current().nextInt());
// TODO: handle empty struct intelligently

LOG.trace("Parsing pipeline options");
// PipelineOptions options = PipelineOptionsTranslation.fromProto(preparation.options());
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(FlinkRunner.class);

LOG.trace("Translating pipeline from proto");
Pipeline pipeline = PipelineTranslation.fromProto(preparation.pipeline());
// TODO(axelmagn): remove this hack once python pipeline is working
RunnerApi.Pipeline origPipeline = preparation.pipeline();
RunnerApi.Environment hackEnv =
RunnerApi.Environment
.newBuilder()
.setUrl("gcr.io/google.com/hadoop-cloud-dev/beam/python")
.build();
RunnerApi.Components hackComponents =
RunnerApi.Components
.newBuilder(origPipeline.getComponents())
.putEnvironments("", hackEnv)
.build();
RunnerApi.Pipeline hackPipeline =
RunnerApi.Pipeline
.newBuilder(origPipeline)
.setComponents(hackComponents)
.build();
Pipeline pipeline = PipelineTranslation.fromProto(hackPipeline);

LOG.trace("Creating flink runner");
FlinkRunner runner = FlinkRunner.fromOptions(options);
Expand Down

0 comments on commit da298f5

Please sign in to comment.