diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java index 6570b7fe81b2..1550a25b7c8f 100644 --- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java +++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/SqlTransformRunner.java @@ -39,12 +39,16 @@ import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTable; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.parquet.ParquetIO; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher; import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.Row; @@ -81,6 +85,9 @@ public class SqlTransformRunner { private static final Logger LOG = LoggerFactory.getLogger(SqlTransformRunner.class); + static final String METRICS_NAMESPACE = "tpcds"; + static final String OUTPUT_COUNTER = "output_rows"; + /** This class is used to extract all SQL query identifiers. */ static class SqlIdentifierVisitor extends SqlBasicVisitor { private final Set identifiers = new HashSet<>(); @@ -305,6 +312,7 @@ public static void runUsingSqlTransform(String[] args) throws Exception { tables .apply(SqlTransform.query(queryString)) .apply(MapElements.into(TypeDescriptors.strings()).via(Row::toString)) + .apply(ParDo.of(new CounterDoFn())) .apply( TextIO.write() .to( @@ -396,4 +404,14 @@ private static InfluxDBSettings getInfluxSettings(final TpcdsOptions options) { .withRetentionPolicy(options.getInfluxRetentionPolicy()) .get(); } + + private static class CounterDoFn extends DoFn { + private final Counter counter = Metrics.counter(METRICS_NAMESPACE, OUTPUT_COUNTER); + + @ProcessElement + public void processElement(ProcessContext context) { + counter.inc(); + context.output(context.element()); + } + } } diff --git a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java index b6235db1c123..700de369b609 100644 --- a/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java +++ b/sdks/java/testing/tpcds/src/main/java/org/apache/beam/sdk/tpcds/TpcdsRun.java @@ -17,10 +17,17 @@ */ package org.apache.beam.sdk.tpcds; +import static org.apache.beam.sdk.tpcds.SqlTransformRunner.METRICS_NAMESPACE; +import static org.apache.beam.sdk.tpcds.SqlTransformRunner.OUTPUT_COUNTER; + import java.util.concurrent.Callable; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricsFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +55,35 @@ public TpcdsRunResult call() { // Make sure to set the job status to be successful only when pipelineResult's final state is // DONE. boolean isSuccessful = state == State.DONE; + + // Check a number of output records - it MUST be greater than 0. + if (isSuccessful) { + long outputRecords = 0; + MetricQueryResults metrics = + pipelineResult + .metrics() + .queryMetrics( + MetricsFilter.builder() + .addNameFilter(MetricNameFilter.named(METRICS_NAMESPACE, OUTPUT_COUNTER)) + .build()); + if (metrics.getCounters().iterator().hasNext()) { + // Despite it's iterable, it should contain only one entry + MetricResult metricResult = metrics.getCounters().iterator().next(); + if (metricResult.getAttempted() != null && metricResult.getAttempted() > 0) { + outputRecords = metricResult.getAttempted(); + } + } + + // It's expected a "greater than zero" number of output records for successful jobs. + if (outputRecords <= 0) { + LOG.warn( + "Output records counter for job \"{}\" is {}", + pipeline.getOptions().getJobName(), + outputRecords); + isSuccessful = false; + } + } + tpcdsRunResult = new TpcdsRunResult( isSuccessful, startTimeStamp, endTimeStamp, pipeline.getOptions(), pipelineResult);