Skip to content

Commit

Permalink
Merge pull request #29330: TPC-DS query processing should fail if out…
Browse files Browse the repository at this point in the history
…put is empty
  • Loading branch information
aromanenko-dev authored Nov 14, 2023
2 parents 7fabc12 + 570f41f commit 38db0e5
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@
import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTable;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.testutils.publishing.InfluxDBPublisher;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
Expand Down Expand Up @@ -81,6 +85,9 @@ public class SqlTransformRunner {

private static final Logger LOG = LoggerFactory.getLogger(SqlTransformRunner.class);

/** Namespace under which the output-row counter is registered and later queried. */
static final String METRICS_NAMESPACE = "tpcds";
/** Name of the counter that is incremented once per element passing through CounterDoFn. */
static final String OUTPUT_COUNTER = "output_rows";

/** This class is used to extract all SQL query identifiers. */
static class SqlIdentifierVisitor extends SqlBasicVisitor<Void> {
private final Set<String> identifiers = new HashSet<>();
Expand Down Expand Up @@ -305,6 +312,7 @@ public static void runUsingSqlTransform(String[] args) throws Exception {
tables
.apply(SqlTransform.query(queryString))
.apply(MapElements.into(TypeDescriptors.strings()).via(Row::toString))
.apply(ParDo.of(new CounterDoFn()))
.apply(
TextIO.write()
.to(
Expand Down Expand Up @@ -396,4 +404,14 @@ private static InfluxDBSettings getInfluxSettings(final TpcdsOptions options) {
.withRetentionPolicy(options.getInfluxRetentionPolicy())
.get();
}

/**
 * Pass-through {@link DoFn} that bumps the {@code OUTPUT_COUNTER} metric (registered under
 * {@code METRICS_NAMESPACE}) once for every element it sees and then re-emits that element
 * unchanged.
 */
private static class CounterDoFn extends DoFn<String, String> {
  private final Counter outputRows = Metrics.counter(METRICS_NAMESPACE, OUTPUT_COUNTER);

  /** Counts the current element, then forwards it downstream as-is. */
  @ProcessElement
  public void processElement(ProcessContext ctx) {
    outputRows.inc();
    final String row = ctx.element();
    ctx.output(row);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,17 @@
*/
package org.apache.beam.sdk.tpcds;

import static org.apache.beam.sdk.tpcds.SqlTransformRunner.METRICS_NAMESPACE;
import static org.apache.beam.sdk.tpcds.SqlTransformRunner.OUTPUT_COUNTER;

import java.util.concurrent.Callable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -48,6 +55,35 @@ public TpcdsRunResult call() {
// Make sure to set the job status to be successful only when pipelineResult's final state is
// DONE.
boolean isSuccessful = state == State.DONE;

// Check the number of output records - it MUST be greater than 0.
if (isSuccessful) {
long outputRecords = 0;
MetricQueryResults metrics =
pipelineResult
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(MetricNameFilter.named(METRICS_NAMESPACE, OUTPUT_COUNTER))
.build());
if (metrics.getCounters().iterator().hasNext()) {
// Although the result is an Iterable, it should contain only one entry
MetricResult<Long> metricResult = metrics.getCounters().iterator().next();
if (metricResult.getAttempted() != null && metricResult.getAttempted() > 0) {
outputRecords = metricResult.getAttempted();
}
}

// A successful job is expected to produce more than zero output records.
if (outputRecords <= 0) {
LOG.warn(
"Output records counter for job \"{}\" is {}",
pipeline.getOptions().getJobName(),
outputRecords);
isSuccessful = false;
}
}

tpcdsRunResult =
new TpcdsRunResult(
isSuccessful, startTimeStamp, endTimeStamp, pipeline.getOptions(), pipelineResult);
Expand Down

0 comments on commit 38db0e5

Please sign in to comment.