Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for gzip compression #36

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,17 @@ WITH (
...
'delivery-guarantee' = 'EXACTLY_ONCE',
...
```
```

## Compression
With https://github.com/googleapis/java-bigquerystorage/pull/2197 it's possible to enable compression over gRPC
In connector it could be done as
```
...
WITH (
...
'compression' = 'GZIP',
...
```
By default, there is no compression, so far only `GZIP` compression is available.
Please note that while it will reduce the network usage, it will increase CPU usage, thus please verify the trade off per use case.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<assertj.core.version>3.24.2</assertj.core.version>
<error_prone_core.version>2.20.0</error_prone_core.version>
<flink.version>1.16.2</flink.version>
<google-cloud-libraries.version>26.19.0</google-cloud-libraries.version>
<google-cloud-libraries.version>26.21.0</google-cloud-libraries.version>
<java.version>11</java.version>
<junit.version>5.10.0</junit.version>
<googleJavaFormat.version>1.14.0</googleJavaFormat.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,10 @@ public class BigQueryConfigOptions {
.enumType(DeliveryGuarantee.class)
.defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
.withDescription("Determines delivery guarantee");

public static final ConfigOption<Compression> COMPRESSION =
ConfigOptions.key("compression")
.enumType(Compression.class)
.defaultValue(Compression.NO_COMPRESSION)
.withDescription("Sets the compression type for data transferring to BigQuery streams");
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@ public class BigQueryConnectionOptions implements Serializable {
private final boolean createIfNotExists;
private final DeliveryGuarantee deliveryGuarantee;

private final Compression compression;

public BigQueryConnectionOptions(
String project,
String dataset,
String table,
boolean createIfNotExists,
DeliveryGuarantee deliveryGuarantee,
Compression compression,
Credentials credentials) {
this.project = project;
this.dataset = dataset;
this.table = table;
this.createIfNotExists = createIfNotExists;
this.deliveryGuarantee = deliveryGuarantee;
this.credentials = credentials;
this.compression = compression;
}

public TableName getTableName() {
Expand All @@ -46,4 +50,8 @@ public boolean isCreateIfNotExists() {
public DeliveryGuarantee getDeliveryGuarantee() {
return deliveryGuarantee;
}

public Compression getCompression() {
return compression;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ protected JsonStreamWriter getStreamWriter(
.build();
WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

return JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
JsonStreamWriter.Builder builder =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client);
if (options.getCompression() != Compression.NO_COMPRESSION) {
builder = builder.setCompressorName(options.getCompression().getValue());
}
return builder
.setExecutorProvider(FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
.setChannelProvider(
BigQueryWriteSettings.defaultGrpcTransportProviderBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ protected JsonStreamWriter getStreamWriter(
// Use the JSON stream writer to send records in JSON format.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
return JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client)
.build();
JsonStreamWriter.Builder builder =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client);
if (options.getCompression() == Compression.NO_COMPRESSION) {
return builder.build();
}
return builder.setCompressorName(options.getCompression().getValue()).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.aiven.flink.connectors.bigquery.sink;

import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.COMPRESSION;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.CREATE_TABLE_IF_NOT_PRESENT;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.DATASET;
import static io.aiven.flink.connectors.bigquery.sink.BigQueryConfigOptions.DELIVERY_GUARANTEE;
Expand Down Expand Up @@ -27,7 +28,8 @@ public class BigQueryTableSinkFactory implements DynamicTableSinkFactory {
DATASET,
TABLE,
CREATE_TABLE_IF_NOT_PRESENT,
DELIVERY_GUARANTEE);
DELIVERY_GUARANTEE,
COMPRESSION);

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
Expand All @@ -47,6 +49,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
config.get(TABLE),
config.get(CREATE_TABLE_IF_NOT_PRESENT),
config.get(DELIVERY_GUARANTEE),
config.get(COMPRESSION),
credentials);
return new BigQuerySink(
context.getCatalogTable(), context.getCatalogTable().getResolvedSchema(), options);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.aiven.flink.connectors.bigquery.sink;

public enum Compression {
GZIP("gzip"),
NO_COMPRESSION("no compression");

private final String value;

Compression(String value) {
this.value = value;
}

public String getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,20 @@ public class BigQuerySinkTest {
@ParameterizedTest
@MethodSource("datatypeProvider")
void tableCreationTest(String tableName, String[] fieldNames, DataType[] fieldTypes) {
for (DeliveryGuarantee dg : DeliveryGuarantee.values()) {
BigQueryConnectionOptions options =
new BigQueryConnectionOptions(
BIG_QUERY_PROJECT_ID,
DATASET_NAME,
tableName + "-" + dg.name(),
true,
dg,
CREDENTIALS);
var table = BigQuerySink.ensureTableExists(fieldNames, fieldTypes, options);
table.delete();
for (Compression compression : Compression.values()) {
for (DeliveryGuarantee dg : DeliveryGuarantee.values()) {
BigQueryConnectionOptions options =
new BigQueryConnectionOptions(
BIG_QUERY_PROJECT_ID,
DATASET_NAME,
tableName + "-" + dg.name(),
true,
dg,
compression,
CREDENTIALS);
var table = BigQuerySink.ensureTableExists(fieldNames, fieldTypes, options);
table.delete();
}
}
}

Expand Down