Skip to content

Commit

Permalink
fix: fix channel not shut down properly exception.
Browse files Browse the repository at this point in the history
Client being created has to be properly closed, otherwise during garbage
collection an error will be reported showing channel not shutdown
properly
  • Loading branch information
GaoleMeng committed Mar 7, 2024
1 parent 2dd8efc commit 0a56e0e
Showing 1 changed file with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ private static class DataWriter {

private static final int MAX_RECREATE_COUNT = 3;

private BigQueryWriteClient client;

// Track the number of in-flight requests to wait for all responses before shutting down.
private final Phaser inflightRequestCount = new Phaser(1);
private final Object lock = new Object();
Expand All @@ -163,12 +165,16 @@ public void initialize(TableName parentTable)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();

// Initialize client without settings, internally within stream writer a new client will be
// created with full settings.
client = BigQueryWriteClient.create();

// Use the JSON stream writer to send records in JSON format. Specify the table name to write
// to the default stream.
// For more information about JsonStreamWriter, see:
// https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
streamWriter =
JsonStreamWriter.newBuilder(parentTable.toString(), BigQueryWriteClient.create())
JsonStreamWriter.newBuilder(parentTable.toString(), client)
.setExecutorProvider(
FixedExecutorProvider.create(Executors.newScheduledThreadPool(100)))
.setChannelProvider(
Expand All @@ -195,7 +201,7 @@ public void append(AppendContext appendContext)
&& recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
streamWriter =
JsonStreamWriter.newBuilder(
streamWriter.getStreamName(), BigQueryWriteClient.create())
streamWriter.getStreamName(), client)
.build();
this.error = null;
}
Expand All @@ -217,6 +223,7 @@ public void cleanup() {
// Wait for all in-flight requests to complete.
inflightRequestCount.arriveAndAwaitAdvance();

client.close();
// Close the connection to the server.
streamWriter.close();

Expand Down

0 comments on commit 0a56e0e

Please sign in to comment.