Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add cache for location in stream writer, and trigger that when location is not presented #1804

Merged
merged 28 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
5a63d95
feat: Split writer into connection worker and wrapper, this is a
GaoleMeng Sep 9, 2022
5a13302
feat: add connection worker pool skeleton, used for multiplexing client
GaoleMeng Sep 13, 2022
0297204
Merge branch 'main' into main
GaoleMeng Sep 14, 2022
8a81ad3
feat: add Load api for connection worker for multiplexing client
GaoleMeng Sep 14, 2022
68fd040
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 14, 2022
3106dae
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 15, 2022
5bf04e5
Merge branch 'googleapis:main' into main
GaoleMeng Sep 15, 2022
2fc7551
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 15, 2022
7a6d919
feat: add multiplexing support to connection worker. We will treat every
GaoleMeng Sep 15, 2022
3ba7659
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
f379a78
Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
9307776
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 16, 2022
de73013
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
19005a1
feat: port the multiplexing client core algorithm and basic tests
GaoleMeng Sep 19, 2022
c5d14ba
Merge branch 'googleapis:main' into main
GaoleMeng Sep 19, 2022
644360a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 20, 2022
3099d82
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
e707dd6
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
9e7a8fa
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 20, 2022
31f1755
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
44c36fc
feat: wire multiplexing connection pool to stream writer
GaoleMeng Sep 20, 2022
87a4036
feat: some fixes for multiplexing client
GaoleMeng Sep 23, 2022
c92ea1b
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 23, 2022
019520c
Merge branch 'googleapis:main' into main
GaoleMeng Sep 26, 2022
47893df
feat: fix some todos, and reject the mixed behavior of passed in clie…
GaoleMeng Sep 27, 2022
8bd4e6a
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
83409b0
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
f7dd72d
Merge branch 'googleapis:main' into main
GaoleMeng Sep 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
Expand All @@ -43,6 +46,12 @@
public class StreamWriter implements AutoCloseable {
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());

private static String datasetsMatching = "projects/[^/]+/datasets/[^/]+/";
private static Pattern streamPattern = Pattern.compile(datasetsMatching);

// Cache of location info for a given dataset.
private static Map<String, String> projectAndDatasetToLocation = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a size limit of this cache? And also probably an expiration time. I am not sure how often is this but the user can delete a dataset and recreate one in different region?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just add a size limit for now and add expiration later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't add a size limit because it's plain string, seems not possible to exceed limit. We can cache millions of entries without problem I think and it's kinda hard to imagine we need to cache that many..

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure fine, let's first go with this. I am a bit worry about dataset recreate case but that should be a corner case. Let's get going.


/*
* The identifier of stream to write to.
*/
Expand Down Expand Up @@ -167,12 +176,11 @@ public static SingleConnectionOrConnectionPool ofConnectionPool(
}

private StreamWriter(Builder builder) throws IOException {
BigQueryWriteClient client;
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
this.location = builder.location;
boolean ownsBigQueryWriteClient = builder.client == null;
if (!builder.enableConnectionPool) {
this.location = builder.location;
this.singleConnectionOrConnectionPool =
SingleConnectionOrConnectionPool.ofSingleConnection(
new ConnectionWorker(
Expand All @@ -185,31 +193,79 @@ private StreamWriter(Builder builder) throws IOException {
getBigQueryWriteClient(builder),
ownsBigQueryWriteClient));
} else {
if (builder.location == null || builder.location.isEmpty()) {
throw new IllegalArgumentException("Location must be specified for multiplexing client!");
BigQueryWriteClient client = getBigQueryWriteClient(builder);
String location = builder.location;
if (location == null || location.isEmpty()) {
// Location is not passed in, try to fetch from RPC
String datasetAndProjectName = extractDatasetAndProjectName(builder.streamName);
location =
projectAndDatasetToLocation.computeIfAbsent(
datasetAndProjectName,
(key) -> {
GetWriteStreamRequest writeStreamRequest =
GetWriteStreamRequest.newBuilder()
.setName(this.getStreamName())
.setView(WriteStreamView.BASIC)
.build();

WriteStream writeStream = client.getWriteStream(writeStreamRequest);
TableSchema writeStreamTableSchema = writeStream.getTableSchema();
String fetchedLocation = writeStream.getLocation();
log.info(
String.format(
"Fethed location %s for stream name %s", fetchedLocation, streamName));
return fetchedLocation;
});
if (location.isEmpty()) {
throw new IllegalStateException(
String.format(
"The location is empty for both user passed in value and looked up value for "
+ "stream: %s",
streamName));
}
}
this.location = location;
// Assume the connection in the same pool share the same client and trace id.
// The first StreamWriter for a new stub will create the pool for the other
// streams in the same region, meaning the per StreamWriter settings are no
// longer working unless all streams share the same set of settings
this.singleConnectionOrConnectionPool =
SingleConnectionOrConnectionPool.ofConnectionPool(
connectionPoolMap.computeIfAbsent(
ConnectionPoolKey.create(builder.location),
ConnectionPoolKey.create(location),
(key) -> {
try {
return new ConnectionWorkerPool(
builder.maxInflightRequest,
builder.maxInflightBytes,
builder.limitExceededBehavior,
builder.traceId,
getBigQueryWriteClient(builder),
ownsBigQueryWriteClient);
} catch (IOException e) {
throw new RuntimeException(e);
}
return new ConnectionWorkerPool(
builder.maxInflightRequest,
builder.maxInflightBytes,
builder.limitExceededBehavior,
builder.traceId,
client,
ownsBigQueryWriteClient);
}));
validateFetchedConnectonPool(builder);
// Shut down the passed in client. Internally we will create another client inside connection
// pool for every new connection worker.
if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient()
&& ownsBigQueryWriteClient) {
client.shutdown();
try {
client.awaitTermination(150, TimeUnit.SECONDS);
} catch (InterruptedException unused) {
// Ignore interruption as this client is not used.
}
client.close();
}
}
}

@VisibleForTesting
static String extractDatasetAndProjectName(String streamName) {
Matcher streamMatcher = streamPattern.matcher(streamName);
if (streamMatcher.find()) {
return streamMatcher.group();
} else {
throw new IllegalStateException(
String.format("The passed in stream name does not match standard format %s", streamName));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ public void testAppendOutOfRangeException() throws Exception {
}

@Test
public void testCreateDefaultStream() throws Exception {
public void testCreateDefaultStream_withNoSchemaPassedIn() throws Exception {
TableSchema tableSchema =
TableSchema.newBuilder().addFields(0, TEST_INT).addFields(1, TEST_STRING).build();
testBigQueryWrite.addResponse(
Expand All @@ -411,6 +411,28 @@ public void testCreateDefaultStream() throws Exception {
}
}

@Test
public void testCreateDefaultStream_withNoClientPassedIn() throws Exception {
TableSchema tableSchema =
TableSchema.newBuilder().addFields(0, TEST_INT).addFields(1, TEST_STRING).build();
testBigQueryWrite.addResponse(
WriteStream.newBuilder()
.setName(TEST_STREAM)
.setLocation("aa")
.setTableSchema(tableSchema)
.build());
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(TEST_TABLE, tableSchema)
.setChannelProvider(channelProvider)
.setCredentialsProvider(NoCredentialsProvider.create())
.setExecutorProvider(InstantiatingExecutorProvider.newBuilder().build())
.setEnableConnectionPool(true)
.build()) {
assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamName());
assertEquals("aa", writer.getLocation());
}
}

@Test
public void testCreateDefaultStreamWrongLocation() throws Exception {
TableSchema tableSchema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,22 +725,20 @@ public void testInitialization_operationKind() throws Exception {
}

@Test
public void createStreamWithDifferentWhetherOwnsClient() throws Exception {
StreamWriter streamWriter1 = getMultiplexingTestStreamWriter();
public void testExtractDatasetName() throws Exception {
Assert.assertEquals(
StreamWriter.extractDatasetAndProjectName(
"projects/project1/datasets/dataset2/tables/something"),
"projects/project1/datasets/dataset2/");

assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter.newBuilder(TEST_STREAM)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setLocation("US")
.setEnableConnectionPool(true)
.build();
}
});
IllegalStateException ex =
assertThrows(
IllegalStateException.class,
() -> {
StreamWriter.extractDatasetAndProjectName(
"wrong/projects/project1/wrong/datasets/dataset2/tables/something");
});
Assert.assertTrue(ex.getMessage().contains("The passed in stream name does not match"));
}

// Timeout to ensure close() doesn't wait for done callback timeout.
Expand Down