Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: populate location info if we already called GetWriteStream #1802

Merged
merged 3 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,15 @@ public Descriptor getDescriptor() {
return this.descriptor;
}

/**
* Gets the location of the destination
*
* @return Descriptor
*/
public String getLocation() {
return this.streamWriter.getLocation();
}

/**
* Returns the wait of a request in Client side before sending to the Server. Request could wait
* in Client because it reached the client side inflight request limit (adjustable when
Expand Down Expand Up @@ -407,6 +416,7 @@ private Builder(String streamOrTableName, TableSchema tableSchema, BigQueryWrite
TableSchema writeStreamTableSchema = writeStream.getTableSchema();

this.tableSchema = writeStreamTableSchema;
this.location = writeStream.getLocation();
} else {
this.tableSchema = tableSchema;
}
Expand Down Expand Up @@ -526,6 +536,10 @@ public Builder setEnableConnectionPool(boolean enableConnectionPool) {
* @return Builder
*/
public Builder setLocation(String location) {
if (this.location != null && !this.location.equals(location)) {
throw new IllegalArgumentException(
"Specified location " + location + " does not match the system value " + this.location);
}
this.location = location;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public class StreamWriter implements AutoCloseable {
/** Every writer has a fixed proto schema. */
private final ProtoSchema writerSchema;

/*
* Location of the destination.
*/
private final String location;

/*
* A String that uniquely identifies this writer.
*/
Expand Down Expand Up @@ -162,6 +167,7 @@ private StreamWriter(Builder builder) throws IOException {
BigQueryWriteClient client;
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
this.location = builder.location;
boolean ownsBigQueryWriteClient;
if (builder.client == null) {
BigQueryWriteSettings stubSettings =
Expand Down Expand Up @@ -193,7 +199,7 @@ private StreamWriter(Builder builder) throws IOException {
client,
ownsBigQueryWriteClient));
} else {
if (builder.location == "") {
if (builder.location == null || builder.location.isEmpty()) {
throw new IllegalArgumentException("Location must be specified for multiplexing client!");
}
// Assume the connection in the same pool share the same client and trace id.
Expand Down Expand Up @@ -318,6 +324,11 @@ public ProtoSchema getProtoSchema() {
return writerSchema;
}

/** @return the location of the destination. */
public String getLocation() {
return location;
}

/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
Expand Down Expand Up @@ -379,7 +390,7 @@ public static final class Builder {

private TableSchema updatedTableSchema = null;

private String location;
private String location = null;

private boolean enableConnectionPool = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,15 +393,48 @@ public void testAppendOutOfRangeException() throws Exception {
public void testCreateDefaultStream() throws Exception {
TableSchema tableSchema =
TableSchema.newBuilder().addFields(0, TEST_INT).addFields(1, TEST_STRING).build();
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setLocation("aa")
.setTableSchema(tableSchema)
.build());
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(TEST_TABLE, tableSchema)
JsonStreamWriter.newBuilder(TEST_TABLE, client)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.build()) {
assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamName());
assertEquals("aa", writer.getLocation());
}
}

@Test
public void testCreateDefaultStreamWrongLocation() throws Exception {
TableSchema tableSchema =
TableSchema.newBuilder().addFields(0, TEST_INT).addFields(1, TEST_STRING).build();
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setLocation("aa")
.setTableSchema(tableSchema)
.build());
IllegalArgumentException ex =
assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
JsonStreamWriter.newBuilder(TEST_TABLE, client)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.setLocation("bb")
.build();
}
});
assertEquals("Specified location bb does not match the system value aa", ex.getMessage());
}

@Test
public void testSimpleSchemaUpdate() throws Exception {
try (JsonStreamWriter writer =
Expand Down