[BEAM-11648] Resubmit Storage API sink with broken test removed. #14309

Merged · 2 commits · Mar 23, 2021
@@ -424,6 +424,7 @@ class BeamModulePlugin implements Plugin<Project> {
def classgraph_version = "4.8.65"
def errorprone_version = "2.3.4"
def google_clients_version = "1.31.0"
def google_auth_version = "0.19.0"
def google_cloud_bigdataoss_version = "2.1.6"
def google_cloud_pubsublite_version = "0.7.0"
def google_code_gson_version = "2.8.6"
@@ -518,7 +519,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_auth_library_credentials : "com.google.auth:google-auth-library-credentials", // google_cloud_platform_libraries_bom sets version
google_auth_library_oauth2_http : "com.google.auth:google-auth-library-oauth2-http", // google_cloud_platform_libraries_bom sets version
google_cloud_bigquery : "com.google.cloud:google-cloud-bigquery", // google_cloud_platform_libraries_bom sets version
google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage:1.8.5",
google_cloud_bigquery_storage : "com.google.cloud:google-cloud-bigquerystorage:1.12.0",
google_cloud_bigtable_client_core : "com.google.cloud.bigtable:bigtable-client-core:1.16.0",
google_cloud_bigtable_emulator : "com.google.cloud:google-cloud-bigtable-emulator:0.125.2",
google_cloud_core : "com.google.cloud:google-cloud-core", // google_cloud_platform_libraries_bom sets version
@@ -602,8 +603,8 @@ class BeamModulePlugin implements Plugin<Project> {
protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version",
protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version",
proto_google_cloud_bigquery_storage_v1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_bigquerybeta2_storage_v1 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_bigtable_admin_v2 : "com.google.api.grpc:proto-google-cloud-bigtable-admin-v2", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_bigquery_storage_v1beta2 : "com.google.api.grpc:proto-google-cloud-bigquerystorage-v1beta2", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_bigtable_v2 : "com.google.api.grpc:proto-google-cloud-bigtable-v2", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_datacatalog_v1beta1 : "com.google.api.grpc:proto-google-cloud-datacatalog-v1beta1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1", // google_cloud_platform_libraries_bom sets version
@@ -72,4 +72,7 @@
<Match>
<Package name="org.apache.beam.sdk.io.clickhouse.impl.parser"/>
</Match>
<Match>
<Package name="org.apache.beam.sdk.io.gcp.bigquery.protos"/>
</Match>
</FindBugsFilter>
@@ -46,6 +46,8 @@ public class GcpCredentialFactory implements CredentialFactory {
"https://www.googleapis.com/auth/devstorage.full_control",
"https://www.googleapis.com/auth/userinfo.email",
"https://www.googleapis.com/auth/datastore",
"https://www.googleapis.com/auth/bigquery",
"https://www.googleapis.com/auth/bigquery.insertdata",
"https://www.googleapis.com/auth/pubsub");

private static final GcpCredentialFactory INSTANCE = new GcpCredentialFactory();
3 changes: 1 addition & 2 deletions sdks/java/io/google-cloud-platform/build.gradle
@@ -105,8 +105,7 @@ dependencies {
compile library.java.netty_tcnative_boringssl_static
permitUnusedDeclared library.java.netty_tcnative_boringssl_static // BEAM-11761
compile library.java.proto_google_cloud_bigquery_storage_v1
compile library.java.proto_google_cloud_bigquerybeta2_storage_v1
permitUnusedDeclared library.java.proto_google_cloud_bigquerybeta2_storage_v1 // BEAM-11761
compile library.java.proto_google_cloud_bigquery_storage_v1beta2
compile library.java.proto_google_cloud_bigtable_admin_v2
compile library.java.proto_google_cloud_bigtable_v2
compile library.java.proto_google_cloud_datastore_v1
@@ -356,12 +356,13 @@ private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> inpu
tempTables
// Now that the load job has happened, we want the rename to happen immediately.
.apply(
"Window Into Global Windows",
Window.<KV<TableDestination, String>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
.apply(WithKeys.of((Void) null))
.apply("Add Void Key", WithKeys.of((Void) null))
.setCoder(KvCoder.of(VoidCoder.of(), tempTables.getCoder()))
.apply(GroupByKey.create())
.apply(Values.create())
.apply("GroupByKey", GroupByKey.create())
.apply("Extract Values", Values.create())
.apply(
"WriteRenameTriggered",
ParDo.of(
@@ -464,7 +465,7 @@ public void process(ProcessContext c) {
// Generate the temporary-file prefix.
private PCollectionView<String> createTempFilePrefixView(
Pipeline p, final PCollectionView<String> jobIdView) {
return p.apply(Create.of(""))
return p.apply("Create dummy value", Create.of(""))
.apply(
"GetTempFilePrefix",
ParDo.of(
@@ -1664,6 +1664,7 @@ public static <T> Write<T> write() {
.setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
.setSchemaUpdateOptions(Collections.emptySet())
.setNumFileShards(0)
.setNumStorageWriteApiStreams(0)
.setMethod(Write.Method.DEFAULT)
.setExtendedErrorInfo(false)
.setSkipInvalidRows(false)
@@ -1724,7 +1725,9 @@ public enum Method {
* href="https://cloud.google.com/bigquery/streaming-data-into-bigquery">Streaming Data into
* BigQuery</a>.
*/
STREAMING_INSERTS
STREAMING_INSERTS,
/** Use the new, experimental Storage Write API. */
STORAGE_WRITE_API
}

abstract @Nullable ValueProvider<String> getJsonTableRef();
@@ -1771,6 +1774,8 @@ public enum Method {

abstract int getNumFileShards();

abstract int getNumStorageWriteApiStreams();

abstract int getMaxFilesPerPartition();

abstract long getMaxBytesPerPartition();
@@ -1853,6 +1858,8 @@ abstract Builder<T> setAvroSchemaFactory(

abstract Builder<T> setNumFileShards(int numFileShards);

abstract Builder<T> setNumStorageWriteApiStreams(int numStorageApiStreams);

abstract Builder<T> setMaxFilesPerPartition(int maxFilesPerPartition);

abstract Builder<T> setMaxBytesPerPartition(long maxBytesPerPartition);
@@ -2285,6 +2292,19 @@ public Write<T> withNumFileShards(int numFileShards) {
return toBuilder().setNumFileShards(numFileShards).build();
}

/**
* Control how many parallel streams are used when using Storage API writes. Applicable only
* when also setting {@link #withTriggeringFrequency}. To let the runner determine the sharding
* at runtime, set {@link #withAutoSharding()} instead.
*/
public Write<T> withNumStorageWriteApiStreams(int numStorageWriteApiStreams) {
checkArgument(
numStorageWriteApiStreams > 0,
"numStorageWriteApiStreams must be > 0, but was: %s",
numStorageWriteApiStreams);
return toBuilder().setNumStorageWriteApiStreams(numStorageWriteApiStreams).build();
}
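For context, a rough usage sketch of the new knob (not part of this diff; the table spec, schema, and numbers are placeholders, and org.joda.time.Duration is assumed to be imported). On an unbounded input the stream count only takes effect together with a triggering frequency, matching the checkArgument above:

// Hypothetical helper: write unbounded TableRows through the new Storage Write API sink.
static WriteResult writeViaStorageApi(PCollection<TableRow> rows, TableSchema schema) {
  return rows.apply(
      "WriteViaStorageApi",
      BigQueryIO.writeTableRows()
          .to("my-project:my_dataset.my_table")                   // placeholder table spec
          .withSchema(schema)
          .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
          .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
          .withTriggeringFrequency(Duration.standardSeconds(30))  // required for unbounded input
          .withNumStorageWriteApiStreams(4));                     // must be > 0
}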

/**
* Provides a custom location on GCS for storing temporary files to be loaded via BigQuery batch
* load jobs. See "Usage with templates" in {@link BigQueryIO} documentation for discussion.
@@ -2455,13 +2475,33 @@ private Method resolveMethod(PCollection<T> input) {
if (getMethod() != Method.DEFAULT) {
return getMethod();
}
if (input.getPipeline().getOptions().as(BigQueryOptions.class).getUseStorageWriteApi()) {
return Method.STORAGE_WRITE_API;
}
// By default, when writing an Unbounded PCollection, we use StreamingInserts and
// BigQuery's streaming import API.
return (input.isBounded() == IsBounded.UNBOUNDED)
? Method.STREAMING_INSERTS
: Method.FILE_LOADS;
}

private Duration getStorageApiTriggeringFrequency(BigQueryOptions options) {
if (getTriggeringFrequency() != null) {
return getTriggeringFrequency();
}
if (options.getStorageWriteApiTriggeringFrequencySec() != null) {
return Duration.standardSeconds(options.getStorageWriteApiTriggeringFrequencySec());
}
return null;
}

private int getStorageApiNumStreams(BigQueryOptions options) {
if (getNumStorageWriteApiStreams() != 0) {
return getNumStorageWriteApiStreams();
}
return options.getNumStorageWriteApiStreams();
}

@Override
public WriteResult expand(PCollection<T> input) {
// We must have a destination to write to!
@@ -2492,16 +2532,22 @@ public WriteResult expand(PCollection<T> input) {
"No more than one of jsonSchema, schemaFromView, or dynamicDestinations may be set");

Method method = resolveMethod(input);
if (input.isBounded() == IsBounded.UNBOUNDED && method == Method.FILE_LOADS) {
if (input.isBounded() == IsBounded.UNBOUNDED
&& (method == Method.FILE_LOADS || method == Method.STORAGE_WRITE_API)) {
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
Duration triggeringFrequency =
(method == Method.STORAGE_WRITE_API)
? getStorageApiTriggeringFrequency(bqOptions)
: getTriggeringFrequency();
checkArgument(
getTriggeringFrequency() != null,
"When writing an unbounded PCollection via FILE_LOADS, "
triggeringFrequency != null,
"When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, "
+ "triggering frequency must be specified");
} else {
checkArgument(
getTriggeringFrequency() == null && getNumFileShards() == 0,
"Triggering frequency or number of file shards can be specified only when writing "
+ "an unbounded PCollection via FILE_LOADS, but: the collection was %s "
+ "an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, but: the collection was %s "
+ "and the method was %s",
input.isBounded(),
method);
@@ -2519,6 +2565,9 @@ public WriteResult expand(PCollection<T> input) {
if (input.isBounded() == IsBounded.BOUNDED) {
checkArgument(!getAutoSharding(), "Auto-sharding is only applicable to unbounded input.");
}
if (method == Method.STORAGE_WRITE_API) {
checkArgument(!getAutoSharding(), "Auto sharding not yet available for Storage API writes");
}

if (getJsonTimePartitioning() != null) {
checkArgument(
@@ -2613,7 +2662,7 @@ private <DestinationT> WriteResult expandTyped(
"CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
}

Coder<DestinationT> destinationCoder = null;
Coder<DestinationT> destinationCoder;
try {
destinationCoder =
dynamicDestinations.getDestinationCoderWithDefault(
@@ -2664,27 +2713,33 @@ private <DestinationT> WriteResult expandTyped(
rowWriterFactory =
RowWriterFactory.tableRows(formatFunction, formatRecordOnFailureFunction);
}

PCollection<KV<DestinationT, T>> rowsWithDestination =
input
.apply(
"PrepareWrite",
new PrepareWrite<>(dynamicDestinations, SerializableFunctions.identity()))
.setCoder(KvCoder.of(destinationCoder, input.getCoder()));

return continueExpandTyped(
rowsWithDestination,
input.getCoder(),
getUseBeamSchema() ? input.getSchema() : null,
getUseBeamSchema() ? input.getToRowFunction() : null,
destinationCoder,
dynamicDestinations,
rowWriterFactory,
method);
}

private <DestinationT, ElementT> WriteResult continueExpandTyped(
PCollection<KV<DestinationT, ElementT>> input,
Coder<ElementT> elementCoder,
private <DestinationT> WriteResult continueExpandTyped(
PCollection<KV<DestinationT, T>> input,
Coder<T> elementCoder,
@Nullable Schema elementSchema,
@Nullable SerializableFunction<T, Row> elementToRowFunction,
Coder<DestinationT> destinationCoder,
DynamicDestinations<T, DestinationT> dynamicDestinations,
RowWriterFactory<ElementT, DestinationT> rowWriterFactory,
RowWriterFactory<T, DestinationT> rowWriterFactory,
Method method) {
if (method == Method.STREAMING_INSERTS) {
checkArgument(
@@ -2700,10 +2755,9 @@ private <DestinationT, ElementT> WriteResult continueExpandTyped(
getSchemaUpdateOptions() == null || getSchemaUpdateOptions().isEmpty(),
"SchemaUpdateOptions are not supported when method == STREAMING_INSERTS");

RowWriterFactory.TableRowWriterFactory<ElementT, DestinationT> tableRowWriterFactory =
(RowWriterFactory.TableRowWriterFactory<ElementT, DestinationT>) rowWriterFactory;

StreamingInserts<DestinationT, ElementT> streamingInserts =
RowWriterFactory.TableRowWriterFactory<T, DestinationT> tableRowWriterFactory =
(RowWriterFactory.TableRowWriterFactory<T, DestinationT>) rowWriterFactory;
StreamingInserts<DestinationT, T> streamingInserts =
new StreamingInserts<>(
getCreateDisposition(),
dynamicDestinations,
@@ -2719,7 +2773,7 @@ private <DestinationT, ElementT> WriteResult continueExpandTyped(
.withAutoSharding(getAutoSharding())
.withKmsKey(getKmsKey());
return input.apply(streamingInserts);
} else {
} else if (method == Method.FILE_LOADS) {
checkArgument(
getFailedInsertRetryPolicy() == null,
"Record-insert retry policies are not supported when using BigQuery load jobs.");
@@ -2730,7 +2784,7 @@ private <DestinationT, ElementT> WriteResult continueExpandTyped(
"useAvroLogicalTypes can only be set with Avro output.");
}

BatchLoads<DestinationT, ElementT> batchLoads =
BatchLoads<DestinationT, T> batchLoads =
new BatchLoads<>(
getWriteDisposition(),
getCreateDisposition(),
@@ -2770,6 +2824,38 @@ private <DestinationT, ElementT> WriteResult continueExpandTyped(
batchLoads.setNumFileShards(getNumFileShards());
}
return input.apply(batchLoads);
} else if (method == Method.STORAGE_WRITE_API) {
StorageApiDynamicDestinations<T, DestinationT> storageApiDynamicDestinations;
if (getUseBeamSchema()) {
// This ensures that the Beam rows are directly translated into protos for Storage API
// writes, with no need to round trip through JSON TableRow objects.
storageApiDynamicDestinations =
new StorageApiDynamicDestinationsBeamRow<T, DestinationT>(
dynamicDestinations, elementSchema, elementToRowFunction);
} else {
RowWriterFactory.TableRowWriterFactory<T, DestinationT> tableRowWriterFactory =
(RowWriterFactory.TableRowWriterFactory<T, DestinationT>) rowWriterFactory;
// Fallback behavior: convert to JSON TableRows and convert those into Beam TableRows.
storageApiDynamicDestinations =
new StorageApiDynamicDestinationsTableRow<>(
dynamicDestinations, tableRowWriterFactory.getToRowFn());
}

BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
StorageApiLoads<DestinationT, T> storageApiLoads =
new StorageApiLoads<DestinationT, T>(
destinationCoder,
elementCoder,
storageApiDynamicDestinations,
getCreateDisposition(),
getKmsKey(),
getStorageApiTriggeringFrequency(bqOptions),
getBigQueryServices(),
getStorageApiNumStreams(bqOptions));
return input.apply("StorageApiLoads", storageApiLoads);
} else {
throw new RuntimeException("Unexpected write method " + method);
}
}
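As a sketch of the schema-aware branch above (hypothetical; assumes an unbounded, schema-aware PCollection<Row> named rows and a placeholder table spec), useBeamSchema() is what selects the direct Row-to-proto StorageApiDynamicDestinationsBeamRow path instead of the TableRow fallback:

rows.apply(
    "WriteRowsViaStorageApi",
    BigQueryIO.<Row>write()
        .to("my-project:my_dataset.my_table")   // placeholder
        .useBeamSchema()                        // schema and to-Row function come from the PCollection
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withTriggeringFrequency(Duration.standardSeconds(60)));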

@@ -78,4 +78,23 @@ public interface BigQueryOptions
Integer getBqStreamingApiLoggingFrequencySec();

void setBqStreamingApiLoggingFrequencySec(Integer value);

@Description("If set, then BigQueryIO.Write will default to using the Storage Write API.")
@Default.Boolean(false)
Boolean getUseStorageWriteApi();

void setUseStorageWriteApi(Boolean value);

@Description(
"If set, then BigQueryIO.Write will default to using this number of Storage Write API streams.")
@Default.Integer(0)
Integer getNumStorageWriteApiStreams();

void setNumStorageWriteApiStreams(Integer value);

@Description(
"If set, then BigQueryIO.Write will default to triggering the Storage Write API writes this often.")
Integer getStorageWriteApiTriggeringFrequencySec();

void setStorageWriteApiTriggeringFrequencySec(Integer value);
}
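A short configuration sketch, assuming Beam's usual PipelineOptions naming conventions (so the corresponding flags would be --useStorageWriteApi, --numStorageWriteApiStreams, and --storageWriteApiTriggeringFrequencySec); the values are placeholders:

BigQueryOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
options.setUseStorageWriteApi(true);                  // DEFAULT-method writes now resolve to STORAGE_WRITE_API
options.setNumStorageWriteApiStreams(4);              // fallback when the transform sets no stream count
options.setStorageWriteApiTriggeringFrequencySec(30); // fallback triggering frequency, in seconds
Pipeline p = Pipeline.create(options);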