Fix pane info in BigQuery load job id #28272

Closed · wants to merge 5 commits
runners/google-cloud-dataflow-java/build.gradle (0 additions, 3 deletions)
@@ -612,9 +612,6 @@ task googleCloudPlatformRunnerV2IntegrationTest(type: Test) {
  exclude '**/FhirIOLROIT.class'
  exclude '**/FhirIOSearchIT.class'
  exclude '**/FhirIOPatientEverythingIT.class'
- // failing due to pane index not incrementing after Reshuffle:
- // https://github.com/apache/beam/issues/28219
- exclude '**/FileLoadsStreamingIT.class'

  maxParallelForks 4
  classpath = configurations.googleCloudPlatformIntegrationTest
@@ -1755,7 +1755,7 @@ void maybeRecordPCollectionWithAutoSharding(PCollection<?> pcol) {
options.isEnableStreamingEngine(),
"Runner determined sharding not available in Dataflow for GroupIntoBatches for"
+ " non-Streaming-Engine jobs. In order to use runner determined sharding, please use"
+ " --streaming --enable_streaming_engine");
+ " --streaming --experiments=enable_streaming_engine");
pCollectionsPreservedKeys.add(pcol);
pcollectionsRequiringAutoSharding.add(pcol);
}
@@ -17,11 +17,15 @@
*/
package org.apache.beam.sdk.transforms;

+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;

@@ -34,28 +38,58 @@
*/
public class PeriodicImpulse extends PTransform<PBegin, PCollection<Instant>> {

- Instant startTimestamp = Instant.now();
- Instant stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
- Duration fireInterval = Duration.standardMinutes(1);
+ Instant startTimestamp;
+ Instant stopTimestamp;
+ @Nullable Duration stopDuration;
+ Duration fireInterval;
  boolean applyWindowing = false;
  boolean catchUpToNow = true;

- private PeriodicImpulse() {}
+ private PeriodicImpulse() {
+   this.startTimestamp = Instant.now();
+   this.stopTimestamp = BoundedWindow.TIMESTAMP_MAX_VALUE;
+   this.fireInterval = Duration.standardMinutes(1);
+ }

public static PeriodicImpulse create() {
return new PeriodicImpulse();
}

/**
* Assign a timestamp when the pipeline starts to produce data.
*
* <p>Cannot be used along with {@link #stopAfter}.
*/
public PeriodicImpulse startAt(Instant startTime) {
checkArgument(stopDuration == null, "startAt and stopAfter cannot be set at the same time");
this.startTimestamp = startTime;
return this;
}

/**
* Assign a timestamp when the pipeline stops producing data.
*
* <p>Cannot be used along with {@link #stopAfter}.
*/
public PeriodicImpulse stopAt(Instant stopTime) {
checkArgument(stopDuration == null, "stopAt and stopAfter cannot be set at the same time");
this.stopTimestamp = stopTime;
return this;
}

/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>Assign a time interval at which the pipeliene produces data. This is different from setting
* {@link #startAt} and {@link #stopAt}, as the first timestamp is determined at run time
* (pipeline starts processing).
*/
@Internal
public PeriodicImpulse stopAfter(Duration duration) {
Contributor: mark with @Internal?

Contributor: P.S. also add to catchUpToNow()

Author: added

this.stopDuration = duration;
return this;
}
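
For orientation, a minimal usage sketch contrasting the two configurations (not part of this PR; `p` is an assumed Pipeline, and the timestamps and intervals are made up):

    // Fixed range: absolute start/stop timestamps known when the pipeline is built.
    PCollection<Instant> fixed =
        p.apply(
            "FixedRange",
            PeriodicImpulse.create()
                .startAt(Instant.parse("2023-09-01T00:00:00Z"))
                .stopAt(Instant.parse("2023-09-01T01:00:00Z"))
                .withInterval(Duration.standardSeconds(10)));

    // Runtime-relative range: the first timestamp is taken when the pipeline
    // starts processing; impulses then fire every 10 seconds for one hour.
    PCollection<Instant> relative =
        p.apply(
            "RelativeRange",
            PeriodicImpulse.create()
                .stopAfter(Duration.standardHours(1))
                .withInterval(Duration.standardSeconds(10)));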

public PeriodicImpulse withInterval(Duration interval) {
this.fireInterval = interval;
return this;
@@ -67,31 +101,65 @@ public PeriodicImpulse applyWindowing() {
}

/**
-  * The default behavior is that PeriodicImpulse emits all instants until Instant.now(), then
+  * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
+  *
+  * <p>The default behavior is that PeriodicImpulse emits all instants until Instant.now(), then
* starts firing at the specified interval. If this is set to false, the PeriodicImpulse will
* perform the interval wait before firing each instant.
*/
@Internal
public PeriodicImpulse catchUpToNow(boolean catchUpToNow) {
this.catchUpToNow = catchUpToNow;
return this;
}

@Override
public PCollection<Instant> expand(PBegin input) {
PCollection<Instant> result =
input
.apply(
Create.<PeriodicSequence.SequenceDefinition>of(
new PeriodicSequence.SequenceDefinition(
startTimestamp, stopTimestamp, fireInterval, catchUpToNow)))
.apply(PeriodicSequence.create());
PCollection<PeriodicSequence.SequenceDefinition> seqDef;
if (stopDuration != null) {
// nonnull guaranteed
Duration d = stopDuration;
seqDef =
input
.apply(Impulse.create())
.apply(ParDo.of(new RuntimeSequenceFn(d, fireInterval, catchUpToNow)));
} else {
seqDef =
input.apply(
Create.of(
new PeriodicSequence.SequenceDefinition(
startTimestamp, stopTimestamp, fireInterval, catchUpToNow)));
}
PCollection<Instant> result = seqDef.apply(PeriodicSequence.create());

if (this.applyWindowing) {
result =
-       result.apply(
-           Window.<Instant>into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
+       result.apply(Window.into(FixedWindows.of(Duration.millis(fireInterval.getMillis()))));
}

return result;
}

/**
* A DoFn that generates a SequenceDefinition at run time. This enables setting the first
* element's timestamp when the pipeline starts processing data.
*/
private static class RuntimeSequenceFn extends DoFn<byte[], PeriodicSequence.SequenceDefinition> {
Duration stopDuration;
Duration fireInterval;
boolean catchUpToNow;

RuntimeSequenceFn(Duration stopDuration, Duration fireInterval, boolean catchUpToNow) {
this.stopDuration = stopDuration;
this.fireInterval = fireInterval;
this.catchUpToNow = catchUpToNow;
}

@ProcessElement
public void process(ProcessContext c) {
Instant now = Instant.now();
c.output(
new PeriodicSequence.SequenceDefinition(
now, now.plus(stopDuration), fireInterval, catchUpToNow));
}
}
}
@@ -22,6 +22,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.util.Objects;
+import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
@@ -67,6 +68,8 @@ public SequenceDefinition(Instant first, Instant last, Duration duration) {
this.catchUpToNow = true;
}

/** <b><i>catchUpToNow is experimental; no backwards-compatibility guarantees.</i></b> */
@Internal
public SequenceDefinition(
Instant first, Instant last, Duration duration, boolean catchUpToNow) {
this.first = first;
@@ -28,6 +28,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
@@ -48,25 +49,31 @@ class WritePartition<DestinationT>
@AutoValue
abstract static class Result {
public abstract List<String> getFilenames();

    // Downstream operations may rely on pane info, which will get lost after a Reshuffle
    abstract Boolean isFirstPane();

+   abstract Long getPaneIndex();
}
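
The comment above is the crux of this PR. A hypothetical illustration (not from the PR; element and key types assumed) of why the pane index must ride along inside the element:

    // After a Reshuffle, c.pane() describes the post-shuffle firing, not the
    // trigger firing that produced the element, so read the stored value instead.
    PCollection<KV<ShardedKey<String>, Result>> shuffled =
        partitions.apply(Reshuffle.of());
    shuffled.apply(
        ParDo.of(
            new DoFn<KV<ShardedKey<String>, Result>, Void>() {
              @ProcessElement
              public void process(ProcessContext c) {
                long unreliable = c.pane().getIndex(); // may have been reset by the shuffle
                long stable = c.element().getValue().getPaneIndex(); // captured upstream
              }
            }));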

static class ResultCoder extends AtomicCoder<Result> {
private static final Coder<List<String>> FILENAMES_CODER = ListCoder.of(StringUtf8Coder.of());
private static final Coder<Boolean> FIRST_PANE_CODER = BooleanCoder.of();
private static final Coder<Long> PANE_INDEX_CODER = VarLongCoder.of();
static final ResultCoder INSTANCE = new ResultCoder();

@Override
public void encode(Result value, OutputStream outStream) throws IOException {
FILENAMES_CODER.encode(value.getFilenames(), outStream);
FIRST_PANE_CODER.encode(value.isFirstPane(), outStream);
+     PANE_INDEX_CODER.encode(value.getPaneIndex(), outStream);
}

@Override
public Result decode(InputStream inStream) throws IOException {
      return new AutoValue_WritePartition_Result(
-         FILENAMES_CODER.decode(inStream), FIRST_PANE_CODER.decode(inStream));
+         FILENAMES_CODER.decode(inStream),
+         FIRST_PANE_CODER.decode(inStream),
+         PANE_INDEX_CODER.decode(inStream));
}
}
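
A test-style round trip for the extended coder (a hypothetical snippet assuming same-package access to the generated AutoValue constructor; CoderUtils is Beam's coder test helper):

    // The pane index should survive an encode/decode cycle.
    WritePartition.Result original =
        new AutoValue_WritePartition_Result(
            ImmutableList.of("gs://bucket/temp/file1"), /* isFirstPane= */ false, /* paneIndex= */ 3L);
    byte[] bytes = CoderUtils.encodeToByteArray(WritePartition.ResultCoder.INSTANCE, original);
    WritePartition.Result copy =
        CoderUtils.decodeFromByteArray(WritePartition.ResultCoder.INSTANCE, bytes);
    assert copy.getPaneIndex() == 3L;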

@@ -234,7 +241,7 @@ public void processElement(ProcessContext c) throws Exception {
KV.of(
ShardedKey.of(destination, i + 1),
new AutoValue_WritePartition_Result(
-                 partitionData.getFilenames(), c.pane().isFirst())));
+                 partitionData.getFilenames(), c.pane().isFirst(), c.pane().getIndex())));
}
}
}
@@ -208,6 +208,20 @@ private PendingJobData startWriteRename(
BigQueryResourceNaming.createJobIdWithDestination(
c.sideInput(jobIdToken), finalTableDestination, -1, c.pane().getIndex());

if (isFirstPane) {
Author: this is another code path relying on pane info. I checked that the upstream transform has a GBK and no Reshuffle in between: https://github.com/apache/beam/blob/0ed4c78a799cf5a6cc6a0b40b23ca498096769c5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L404C5-L404C5 — so this one should be fine. Nevertheless, it's good to add logging to monitor it.

Contributor: Unrelated to this PR, but we're actually seeing a bug due to relying on the pane index in this code path: #28309

LOG.info(
"Setup write disposition {}, create disposition {} for first pane BigQuery job {}",
writeDisposition,
createDisposition,
jobIdPrefix);
} else {
LOG.debug(
"Setup write disposition {}, create disposition {} for BigQuery job {}",
writeDisposition,
createDisposition,
jobIdPrefix);
}

BigQueryHelpers.PendingJob retryJob =
startCopy(
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)),
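For readers unfamiliar with the naming scheme: the load/copy job id must be deterministic so that retries deduplicate while distinct panes do not collide. A simplified sketch of the idea (hypothetical helper, not Beam's actual BigQueryResourceNaming code):

    // Same (token, destination, partition, pane) -> same id, so a retried
    // work item re-attaches to the already-submitted BigQuery job.
    // A later pane -> different id, so its job is not mistaken for a duplicate.
    static String loadJobId(String token, String destination, int partition, long paneIndex) {
      return String.format("beam_bq_job_%s_%s_%d_pane%d", token, destination, partition, paneIndex);
    }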
@@ -36,7 +36,6 @@
import java.util.stream.Collectors;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.BooleanCoder;
-import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
@@ -75,10 +74,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
-import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
-import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -101,25 +98,21 @@ class WriteTables<DestinationT extends @NonNull Object>
@AutoValue
abstract static class Result {
abstract String getTableName();

    // Downstream operations may rely on pane info, which will get lost after a Reshuffle
abstract Boolean isFirstPane();
}

static class ResultCoder extends AtomicCoder<WriteTables.Result> {
static final ResultCoder INSTANCE = new ResultCoder();

@Override
-    public void encode(Result value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream)
-        throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
-            @Initialized IOException {
+    public void encode(Result value, OutputStream outStream) throws IOException {
StringUtf8Coder.of().encode(value.getTableName(), outStream);
BooleanCoder.of().encode(value.isFirstPane(), outStream);
}

@Override
-    public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
-        throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull
-            @Initialized IOException {
+    public Result decode(InputStream inStream) throws IOException {
return new AutoValue_WriteTables_Result(
StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream));
}
@@ -156,27 +149,36 @@ private class WriteTablesDoFn
private class PendingJobData {
final BoundedWindow window;
final BigQueryHelpers.PendingJob retryJob;
final List<String> partitionFiles;
final WritePartition.Result partitionResult;
final TableDestination tableDestination;
final TableReference tableReference;
final DestinationT destinationT;
-     final boolean isFirstPane;

public PendingJobData(
BoundedWindow window,
BigQueryHelpers.PendingJob retryJob,
List<String> partitionFiles,
WritePartition.Result partitionResult,
TableDestination tableDestination,
TableReference tableReference,
-         DestinationT destinationT,
-         boolean isFirstPane) {
+         DestinationT destinationT) {
this.window = window;
this.retryJob = retryJob;
-       this.partitionFiles = partitionFiles;
+       this.partitionResult = partitionResult;
this.tableDestination = tableDestination;
this.tableReference = tableReference;
this.destinationT = destinationT;
-       this.isFirstPane = isFirstPane;
}

public List<String> paritionFiles() {
return partitionResult.getFilenames();
}

public boolean isFirstPane() {
return partitionResult.isFirstPane();
}

public long paneIndex() {
Author: I am going to preserve the change to PendingJobData, as the pane index info would be helpful to resolve a possible race condition between CREATE_TRUNCATE pending jobs.

Contributor: Do you mean for debugging purposes? Will note that paneIndex() is still not being called anywhere.

return partitionResult.getPaneIndex();
}
}
// All pending load jobs.
@@ -251,7 +253,10 @@ public void processElement(
List<String> partitionFiles = Lists.newArrayList(element.getValue().getFilenames());
String jobIdPrefix =
BigQueryResourceNaming.createJobIdWithDestination(
-             c.sideInput(loadJobIdPrefixView), tableDestination, partition, c.pane().getIndex());
+             c.sideInput(loadJobIdPrefixView),
+             tableDestination,
+             partition,
+             element.getValue().getPaneIndex());
Member: My concern is that if we have this:

  GBK -> processing -> reshuffle -> this DoFn

can you tell me which case we are in?

  1. The processing left the key the same, so the reshuffle is just for checkpointing.
  2. The processing changed the key, so that the current element key + pane index is no longer unique.

Author: Well, it's a little more subtle. It is the same key, but it became a ShardedKey after processing:

  After the GBK, the key is DestinationT.
  After processing, the key is ShardedKey<DestinationT> (the shard is added in WritePartition).

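A small sketch of the keying described in that reply (hypothetical variable names):

    // After the GBK the key is DestinationT; WritePartition re-keys each
    // partition with an explicit shard, so (key, paneIndex) stays unique
    // across the downstream Reshuffle.
    ShardedKey<DestinationT> key = ShardedKey.of(destination, shardNumber);
    c.output(KV.of(key, partitionResult));
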

if (tempTable) {
if (tempDataset != null) {
@@ -291,13 +296,7 @@ public void processElement(

pendingJobs.add(
new PendingJobData(
-           window,
-           retryJob,
-           partitionFiles,
-           tableDestination,
-           tableReference,
-           destination,
-           element.getValue().isFirstPane()));
+           window, retryJob, element.getValue(), tableDestination, tableReference, destination));
}

@Teardown
@@ -361,13 +360,13 @@ public void finishBundle(FinishBundleContext c) throws Exception {
Result result =
new AutoValue_WriteTables_Result(
BigQueryHelpers.toJsonString(pendingJob.tableReference),
-                 pendingJob.isFirstPane);
+                 pendingJob.isFirstPane());
c.output(
mainOutputTag,
KV.of(pendingJob.destinationT, result),
pendingJob.window.maxTimestamp(),
pendingJob.window);
-       for (String file : pendingJob.partitionFiles) {
+       for (String file : pendingJob.partitionFiles()) {
c.output(
temporaryFilesTag,
file,