Activate monitoring on NexmarkSparkRunner and on specific runners
issue #28

Fix compilation issue after rebase + make checkstyle happy again
echauchot authored and iemejia committed Aug 23, 2017
1 parent a095e40 commit a1fe33b
Showing 11 changed files with 31 additions and 58 deletions.
32 changes: 16 additions & 16 deletions integration/java/nexmark/pom.xml
@@ -179,28 +179,28 @@
       <artifactId>beam-runners-flink_2.10</artifactId>
     </dependency>
 
-    <!--<dependency>-->
-      <!--<groupId>org.apache.flink</groupId>-->
-      <!--<artifactId>flink-shaded-hadoop2</artifactId>-->
-      <!--<version>${flink.version}</version>-->
-      <!--<scope>provided</scope>-->
-    <!--</dependency>-->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-shaded-hadoop2</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
 
     <!-- Spark runner -->
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-spark</artifactId>
     </dependency>
-    <!--<dependency>-->
-      <!--<groupId>org.apache.spark</groupId>-->
-      <!--<artifactId>spark-core_2.10</artifactId>-->
-      <!--<version>${spark.version}</version>-->
-    <!--</dependency>-->
-    <!--<dependency>-->
-      <!--<groupId>org.apache.spark</groupId>-->
-      <!--<artifactId>spark-streaming_2.10</artifactId>-->
-      <!--<version>${spark.version}</version>-->
-    <!--</dependency>-->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_2.10</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming_2.10</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
 
     <!-- Apex runner -->
     <dependency>
integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkApexRunner.java
@@ -18,8 +18,6 @@
 package org.apache.beam.integration.nexmark;
 
 import javax.annotation.Nullable;
-import org.apache.beam.runners.apex.ApexRunnerResult;
-import org.apache.beam.sdk.PipelineResult;
 
 /**
  * Run a query using the Apex runner.
integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkDirectRunner.java
@@ -17,11 +17,6 @@
  */
 package org.apache.beam.integration.nexmark;
 
-import javax.annotation.Nullable;
-import org.apache.beam.runners.dataflow.DataflowPipelineJob;
-import org.apache.beam.runners.direct.DirectRunner;
-import org.apache.beam.sdk.PipelineResult;
-
 /**
  * Run a single query using the Direct Runner.
  */
integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkFlinkRunner.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.integration.nexmark;
 
-import javax.annotation.Nullable;
-import org.apache.beam.runners.flink.FlinkRunnerResult;
-import org.apache.beam.sdk.PipelineResult;
-
 /**
  * Run a query using the Flink runner.
  */
@@ -42,7 +38,7 @@ protected int maxNumWorkers() {
 
   @Override
   protected boolean canMonitor() {
-    return false;
+    return true;
   }
 
   @Override
@@ -56,12 +52,6 @@ protected void waitForPublisherPreload() {
     throw new UnsupportedOperationException();
   }
 
-  @Override
-  @Nullable
-  protected NexmarkPerf monitor(NexmarkQuery query) {
-    return null;
-  }
-
   public NexmarkFlinkRunner(NexmarkFlinkDriver.NexmarkFlinkOptions options) {
     super(options);
   }
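
With canMonitor() now returning true and the null-returning monitor() override deleted, the Flink runner inherits the base class's real monitoring path instead of short-circuiting it. A minimal sketch of how a driver can gate metric polling on such a flag (the run method and startPipeline helper below are illustrative, not the actual NexmarkRunner code):

    // Illustrative only: gate metric polling on canMonitor().
    @Nullable
    public NexmarkPerf run(NexmarkQuery query) {
      PipelineResult result = startPipeline(query); // hypothetical helper
      if (!canMonitor()) {
        // The runner cannot report metrics yet: wait for completion, no perf.
        result.waitUntilFinish();
        return null;
      }
      // Poll runner-agnostic metrics until a steady state is captured.
      return monitor(query);
    }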
integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleDriver.java
@@ -17,10 +17,8 @@
  */
 package org.apache.beam.integration.nexmark;
 
-import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
-import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 
 /**
integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkGoogleRunner.java
@@ -66,7 +66,7 @@ protected boolean canMonitor() {
 
   @Override
   protected String getJobId(PipelineResult job) {
-    return ((DataflowPipelineJob)job).getJobId();
+    return ((DataflowPipelineJob) job).getJobId();
   }
 
   @Override
integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java
@@ -53,8 +53,6 @@
 import org.apache.beam.sdk.values.TupleTagList;
 import org.joda.time.Duration;
 
-import static org.apache.beam.sdk.metrics.MetricUpdates.MetricUpdate;
-
 /**
  * Run a single Nexmark query using a given configuration.
 */
@@ -203,7 +201,8 @@ protected long getTimestamp(
    * Find a 'steady state' events/sec from {@code snapshots} and
    * store it in {@code perf} if found.
    */
-  protected void captureSteadyState(NexmarkPerf perf, List<NexmarkPerf.ProgressSnapshot> snapshots) {
+  protected void captureSteadyState(NexmarkPerf perf,
+                                    List<NexmarkPerf.ProgressSnapshot> snapshots) {
     if (!options.isStreaming()) {
       return;
     }
@@ -365,7 +364,9 @@ private NexmarkPerf currentPerf(
     return perf;
   }
 
-  String getJobId(PipelineResult job){return "";}
+  String getJobId(PipelineResult job) {
+    return "";
+  }
 
   // TODO specific to dataflow, see if we can find an equivalent
   /*
@@ -926,8 +927,8 @@ private void sinkResultsToBigQuery(
             new TableFieldSchema().setName("index").setType("INTEGER"),
             new TableFieldSchema().setName("value").setType("STRING")))));
     NexmarkUtils.console("Writing results to BigQuery table %s", tableSpec);
-    BigQueryIO.Write.Bound io =
-        BigQueryIO.Write.to(tableSpec)
+    BigQueryIO.Write io =
+        BigQueryIO.write().to(tableSpec)
             .withSchema(tableSchema)
             .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
             .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
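
The dropped MetricUpdates.MetricUpdate static import fits the same theme: performance numbers are now meant to come through Beam's runner-agnostic metrics API, which is what lets the Spark and Flink runners answer canMonitor() with true. A hedged sketch of reading a counter from a PipelineResult (the "nexmark"/"numResults" counter name is made up for illustration):

    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.metrics.MetricNameFilter;
    import org.apache.beam.sdk.metrics.MetricQueryResults;
    import org.apache.beam.sdk.metrics.MetricResult;
    import org.apache.beam.sdk.metrics.MetricsFilter;

    // Query one named counter from a running or finished pipeline.
    static void printCounter(PipelineResult result) {
      MetricQueryResults metrics = result.metrics().queryMetrics(
          MetricsFilter.builder()
              .addNameFilter(MetricNameFilter.named("nexmark", "numResults"))
              .build());
      for (MetricResult<Long> counter : metrics.counters()) {
        System.out.printf("%s = %d%n", counter.name(), counter.attempted());
      }
    }

The BigQueryIO hunk above is unrelated to monitoring: it is the post-rebase builder API, where BigQueryIO.write().to(...) replaces the removed BigQueryIO.Write.Bound type.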
integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkDriver.java
@@ -18,7 +18,6 @@
 package org.apache.beam.integration.nexmark;
 
 import org.apache.beam.runners.spark.SparkPipelineOptions;
-import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 
 /**
@@ -39,7 +38,8 @@ public static void main(String[] args) {
         PipelineOptionsFactory.fromArgs(args)
             .withValidation()
             .as(NexmarkSparkOptions.class);
-    options.setRunner(SparkRunner.class);
+//    options.setRunner(org.apache.beam.runners.spark.SparkRunner.class);
+    options.setRunner(org.apache.beam.runners.spark.SparkRunnerDebugger.class);
     NexmarkSparkRunner runner = new NexmarkSparkRunner(options);
     new NexmarkSparkDriver().runAll(options, runner);
   }
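
Note the runner left active here: SparkRunnerDebugger is the Spark runner variant that translates the pipeline into its Spark equivalent without executing it, which is handy for checking the translation while wiring up monitoring; the commented-out line restores the real SparkRunner. A hedged usage sketch, assuming the public Beam Spark runner API of that era:

    import org.apache.beam.runners.spark.SparkRunnerDebugger;
    import org.apache.beam.sdk.Pipeline;

    // Translate the pipeline without running it and print the plan.
    options.setRunner(SparkRunnerDebugger.class);
    Pipeline p = Pipeline.create(options);
    // ... apply the Nexmark query transforms ...
    SparkRunnerDebugger.DebugSparkPipelineResult result =
        (SparkRunnerDebugger.DebugSparkPipelineResult) p.run();
    System.out.println(result.getDebugString());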
integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkSparkRunner.java
@@ -17,10 +17,6 @@
  */
 package org.apache.beam.integration.nexmark;
 
-import javax.annotation.Nullable;
-import org.apache.beam.runners.spark.SparkPipelineResult;
-import org.apache.beam.sdk.PipelineResult;
-
 /**
  * Run a query using the Spark runner.
 */
@@ -42,7 +38,7 @@ protected int maxNumWorkers() {
 
   @Override
   protected boolean canMonitor() {
-    return false;
+    return true;
   }
 
   @Override
@@ -56,11 +52,6 @@ protected void waitForPublisherPreload() {
     throw new UnsupportedOperationException();
  }
 
-  @Override
-  @Nullable
-  protected NexmarkPerf monitor(NexmarkQuery query) {
-    return null;
-  }
 
   public NexmarkSparkRunner(NexmarkSparkDriver.NexmarkSparkOptions options) {
     super(options);
integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java
@@ -37,7 +37,6 @@
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.io.Read;
-import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -325,8 +324,8 @@ public static void console(String format, Object... args) {
    * Setup pipeline with codes and some other options.
    */
   public static void setupPipeline(CoderStrategy coderStrategy, Pipeline p) {
-    PipelineRunner<?> runner = p.getRunner();
     //TODO Ismael check
+    // PipelineRunner<?> runner = p.getRunner();
     // if (runner instanceof DirectRunner) {
    //   // Disable randomization of output since we want to check batch and streaming match the
    //   // model both locally and on the cloud.
integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/Query5.java
@@ -67,7 +67,8 @@ private PCollection<AuctionCount> applyTyped(PCollection<Event> events) {
         // Count the number of bids per auction id.
         .apply(Count.<Long>perElement())
 
-        // We'll want to keep all auctions with the maximal number of bids.
+        //TODO replace by simple key
+        // We'll want to keep all auctions with the maximal number of bids.
         // Start by lifting each into a singleton list.
         .apply(name + ".ToSingletons",
             ParDo.of(new DoFn<KV<Long, Long>, KV<List<Long>, Long>>() {
