-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix pane info in BigQuery load job id #28272
Changes from all commits
6c6ddb2
6aadd9f
2a131a8
d1dd0d3
43f014c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -208,6 +208,20 @@ private PendingJobData startWriteRename( | |
BigQueryResourceNaming.createJobIdWithDestination( | ||
c.sideInput(jobIdToken), finalTableDestination, -1, c.pane().getIndex()); | ||
|
||
if (isFirstPane) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is another code path relying on pane info. Checked that upstream transform has a GBK and no ReShuffle in between: https://github.com/apache/beam/blob/0ed4c78a799cf5a6cc6a0b40b23ca498096769c5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L404C5-L404C5 so this one should be fine. Nevertheless it's good to add logging to monitor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unrelated to this PR but we're actually seeing a bug due to relying on the pane index in this code path: #28309 |
||
LOG.info( | ||
"Setup write disposition {}, create disposition {} for first pane BigQuery job {}", | ||
writeDisposition, | ||
createDisposition, | ||
jobIdPrefix); | ||
} else { | ||
LOG.debug( | ||
"Setup write disposition {}, create disposition {} for BigQuery job {}", | ||
writeDisposition, | ||
createDisposition, | ||
jobIdPrefix); | ||
} | ||
|
||
BigQueryHelpers.PendingJob retryJob = | ||
startCopy( | ||
bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,7 +36,6 @@ | |
import java.util.stream.Collectors; | ||
import org.apache.beam.sdk.coders.AtomicCoder; | ||
import org.apache.beam.sdk.coders.BooleanCoder; | ||
import org.apache.beam.sdk.coders.CoderException; | ||
import org.apache.beam.sdk.coders.KvCoder; | ||
import org.apache.beam.sdk.coders.StringUtf8Coder; | ||
import org.apache.beam.sdk.coders.VoidCoder; | ||
|
@@ -75,10 +74,8 @@ | |
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; | ||
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; | ||
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; | ||
import org.checkerframework.checker.initialization.qual.Initialized; | ||
import org.checkerframework.checker.nullness.qual.NonNull; | ||
import org.checkerframework.checker.nullness.qual.Nullable; | ||
import org.checkerframework.checker.nullness.qual.UnknownKeyFor; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
|
@@ -101,25 +98,21 @@ class WriteTables<DestinationT extends @NonNull Object> | |
@AutoValue | ||
abstract static class Result { | ||
abstract String getTableName(); | ||
|
||
// Downstream operations may rely on pane info which will get lost after a ReShuffle | ||
abstract Boolean isFirstPane(); | ||
} | ||
|
||
static class ResultCoder extends AtomicCoder<WriteTables.Result> { | ||
static final ResultCoder INSTANCE = new ResultCoder(); | ||
|
||
@Override | ||
public void encode(Result value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) | ||
throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull | ||
@Initialized IOException { | ||
public void encode(Result value, OutputStream outStream) throws IOException { | ||
StringUtf8Coder.of().encode(value.getTableName(), outStream); | ||
BooleanCoder.of().encode(value.isFirstPane(), outStream); | ||
} | ||
|
||
@Override | ||
public Result decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) | ||
throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull | ||
@Initialized IOException { | ||
public Result decode(InputStream inStream) throws IOException { | ||
return new AutoValue_WriteTables_Result( | ||
StringUtf8Coder.of().decode(inStream), BooleanCoder.of().decode(inStream)); | ||
} | ||
|
@@ -156,27 +149,36 @@ private class WriteTablesDoFn | |
private class PendingJobData { | ||
final BoundedWindow window; | ||
final BigQueryHelpers.PendingJob retryJob; | ||
final List<String> partitionFiles; | ||
final WritePartition.Result partitionResult; | ||
final TableDestination tableDestination; | ||
final TableReference tableReference; | ||
final DestinationT destinationT; | ||
final boolean isFirstPane; | ||
|
||
public PendingJobData( | ||
BoundedWindow window, | ||
BigQueryHelpers.PendingJob retryJob, | ||
List<String> partitionFiles, | ||
WritePartition.Result partitionResult, | ||
TableDestination tableDestination, | ||
TableReference tableReference, | ||
DestinationT destinationT, | ||
boolean isFirstPane) { | ||
DestinationT destinationT) { | ||
this.window = window; | ||
this.retryJob = retryJob; | ||
this.partitionFiles = partitionFiles; | ||
this.partitionResult = partitionResult; | ||
this.tableDestination = tableDestination; | ||
this.tableReference = tableReference; | ||
this.destinationT = destinationT; | ||
this.isFirstPane = isFirstPane; | ||
} | ||
|
||
public List<String> paritionFiles() { | ||
return partitionResult.getFilenames(); | ||
} | ||
|
||
public boolean isFirstPane() { | ||
return partitionResult.isFirstPane(); | ||
} | ||
|
||
public long paneIndex() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am going to preserve the change to PendingJobData, as the pane index info would be helpful to resolve possible racing condition of CREATE_TRUNCATE pending jobs There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean for debugging purposes? Will note that |
||
return partitionResult.getPaneIndex(); | ||
} | ||
} | ||
// All pending load jobs. | ||
|
@@ -251,7 +253,10 @@ public void processElement( | |
List<String> partitionFiles = Lists.newArrayList(element.getValue().getFilenames()); | ||
String jobIdPrefix = | ||
BigQueryResourceNaming.createJobIdWithDestination( | ||
c.sideInput(loadJobIdPrefixView), tableDestination, partition, c.pane().getIndex()); | ||
c.sideInput(loadJobIdPrefixView), | ||
tableDestination, | ||
partition, | ||
element.getValue().getPaneIndex()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My concern is that if we have this:
Can you tell me which case we are in?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well it's a little bit more subtle. It is the same key, but became ShardedKey after After GBK, key is DestinationT |
||
|
||
if (tempTable) { | ||
if (tempDataset != null) { | ||
|
@@ -291,13 +296,7 @@ public void processElement( | |
|
||
pendingJobs.add( | ||
new PendingJobData( | ||
window, | ||
retryJob, | ||
partitionFiles, | ||
tableDestination, | ||
tableReference, | ||
destination, | ||
element.getValue().isFirstPane())); | ||
window, retryJob, element.getValue(), tableDestination, tableReference, destination)); | ||
} | ||
|
||
@Teardown | ||
|
@@ -361,13 +360,13 @@ public void finishBundle(FinishBundleContext c) throws Exception { | |
Result result = | ||
new AutoValue_WriteTables_Result( | ||
BigQueryHelpers.toJsonString(pendingJob.tableReference), | ||
pendingJob.isFirstPane); | ||
pendingJob.isFirstPane()); | ||
c.output( | ||
mainOutputTag, | ||
KV.of(pendingJob.destinationT, result), | ||
pendingJob.window.maxTimestamp(), | ||
pendingJob.window); | ||
for (String file : pendingJob.partitionFiles) { | ||
for (String file : pendingJob.paritionFiles()) { | ||
c.output( | ||
temporaryFilesTag, | ||
file, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
mark with
@Internal
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P.S. also add to
catchUpToNow()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added