Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove unrecoverable connection from connection pool during multiplexing #1967

Merged
merged 79 commits into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
79 commits
Select commit Hold shift + click to select a range
5a63d95
feat: Split writer into connection worker and wrapper, this is a
GaoleMeng Sep 9, 2022
5a13302
feat: add connection worker pool skeleton, used for multiplexing client
GaoleMeng Sep 13, 2022
0297204
Merge branch 'main' into main
GaoleMeng Sep 14, 2022
8a81ad3
feat: add Load api for connection worker for multiplexing client
GaoleMeng Sep 14, 2022
68fd040
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 14, 2022
3106dae
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 15, 2022
5bf04e5
Merge branch 'googleapis:main' into main
GaoleMeng Sep 15, 2022
2fc7551
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 15, 2022
7a6d919
feat: add multiplexing support to connection worker. We will treat every
GaoleMeng Sep 15, 2022
3ba7659
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
f379a78
Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
9307776
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 16, 2022
de73013
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
19005a1
feat: port the multiplexing client core algorithm and basic tests
GaoleMeng Sep 19, 2022
c5d14ba
Merge branch 'googleapis:main' into main
GaoleMeng Sep 19, 2022
644360a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 20, 2022
3099d82
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
e707dd6
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
9e7a8fa
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 20, 2022
31f1755
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
44c36fc
feat: wire multiplexing connection pool to stream writer
GaoleMeng Sep 20, 2022
87a4036
feat: some fixes for multiplexing client
GaoleMeng Sep 23, 2022
c92ea1b
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 23, 2022
019520c
Merge branch 'googleapis:main' into main
GaoleMeng Sep 26, 2022
47893df
feat: fix some todos, and reject the mixed behavior of passed in clie…
GaoleMeng Sep 27, 2022
8bd4e6a
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
83409b0
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
f7dd72d
Merge branch 'googleapis:main' into main
GaoleMeng Sep 27, 2022
a48399f
Merge branch 'googleapis:main' into main
GaoleMeng Sep 29, 2022
6789bc9
feat: fix the bug that we may peek into the write_stream field but it's
GaoleMeng Sep 29, 2022
46b4e6c
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 29, 2022
dfd4dd9
Merge branch 'googleapis:main' into main
GaoleMeng Sep 29, 2022
d68ae70
feat: fix the bug that we may peek into the write_stream field but it's
GaoleMeng Sep 29, 2022
2983fe9
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 29, 2022
d406256
Merge branch 'googleapis:main' into main
GaoleMeng Oct 13, 2022
22e9e07
feat: add getInflightWaitSeconds implementation
GaoleMeng Oct 13, 2022
fdb4e1c
Merge branch 'googleapis:main' into main
GaoleMeng Oct 21, 2022
0469474
Merge branch 'googleapis:main' into main
GaoleMeng Nov 2, 2022
d1b7740
feat: Add schema comparision in connection loop to ensure schema upda…
GaoleMeng Nov 3, 2022
e4cd529
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Nov 4, 2022
74ff1c4
Merge branch 'googleapis:main' into main
GaoleMeng Nov 4, 2022
762f49e
feat: add schema update support to multiplexing
GaoleMeng Nov 5, 2022
de456c2
Merge branch 'googleapis:main' into main
GaoleMeng Nov 11, 2022
c2f6edc
Merge branch 'googleapis:main' into main
GaoleMeng Nov 15, 2022
2487227
fix: fix windows build bug: windows Instant resolution is different with
GaoleMeng Nov 15, 2022
084d6d1
fix: fix another failing tests for windows build
GaoleMeng Nov 16, 2022
89c9701
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Nov 16, 2022
8441518
fix: fix another test failure for Windows build
GaoleMeng Nov 16, 2022
d249add
Merge branch 'googleapis:main' into main
GaoleMeng Nov 30, 2022
83aa7ff
feat: Change new thread for each retry to be a thread pool to avoid
GaoleMeng Nov 30, 2022
92a9c36
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Nov 30, 2022
a713a52
Merge branch 'googleapis:main' into main
GaoleMeng Nov 30, 2022
a042d5c
fix: add back the background executor provider that's accidentally
GaoleMeng Nov 30, 2022
53f4ec8
feat: throw error when use connection pool for explicit stream
GaoleMeng Dec 2, 2022
c494d8b
Merge branch 'googleapis:main' into main
GaoleMeng Dec 20, 2022
14b0c12
fix: Add precision truncation to the passed in value from JSON float and
GaoleMeng Jan 17, 2023
0da0e4b
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jan 17, 2023
33d23ac
Merge branch 'googleapis:main' into main
GaoleMeng Jan 17, 2023
d2ee46e
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jan 17, 2023
be6646e
modify the bom version
GaoleMeng Jan 17, 2023
62d8c41
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jan 17, 2023
adf5f3f
fix deadlockissue in ConnectionWorkerPool
GaoleMeng Jan 18, 2023
c1970ff
Merge branch 'googleapis:main' into main
GaoleMeng Jan 18, 2023
3488df8
fix: fix deadlock issue during close + append for multiplexing
GaoleMeng Jan 19, 2023
6a512e8
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jan 20, 2023
05edc2f
Merge branch 'googleapis:main' into main
GaoleMeng Jan 20, 2023
7d3da74
Merge branch 'googleapis:main' into main
GaoleMeng Jan 20, 2023
ecf6807
Merge branch 'googleapis:main' into main
GaoleMeng Jan 20, 2023
057dab9
Merge branch 'googleapis:main' into main
GaoleMeng Jan 23, 2023
5db46a2
fix: fix one potential root cause of deadlock issue for non-multiplexing
GaoleMeng Jan 23, 2023
32e9d33
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jan 25, 2023
183941a
Merge branch 'googleapis:main' into main
GaoleMeng Jan 25, 2023
f93f89e
Add timeout to inflight queue waiting, and also add some extra log
GaoleMeng Jan 25, 2023
e2ec7e7
Merge branch 'googleapis:main' into main
GaoleMeng Jan 27, 2023
1be6ab4
feat: allow java client lib handle switch table schema for the same s…
GaoleMeng Jan 27, 2023
544f063
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jan 31, 2023
3cf5482
Merge branch 'googleapis:main' into main
GaoleMeng Jan 31, 2023
d9da007
Merge branch 'googleapis:main' into main
GaoleMeng Jan 31, 2023
4626857
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Jan 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,11 @@ public String getWriterId() {
return writerId;
}

boolean isConnectionInUnrecoverableState() {
// If final status is set, there's no
return connectionFinalStatus != null;
yirutang marked this conversation as resolved.
Show resolved Hide resolved
}

/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,17 @@ ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows,
streamWriter,
(key, existingStream) -> {
// Stick to the existing stream if it's not overwhelmed.
if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) {
if (existingStream != null
&& !existingStream.getLoad().isOverwhelmed()
&& !existingStream.isConnectionInUnrecoverableState()) {
return existingStream;
}
if (existingStream != null && existingStream.isConnectionInUnrecoverableState()) {
existingStream = null;
}
// Before search for the next connection to attach, clear the finalized connections
// first so that they will not be selected.
clearFinalizedConnectionWorker();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a global remove, is it OK just to have a local remove?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is we want to prevent the search algorithm to hit the connection with unrecoverable state?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The global remove is super quick as it won't need close

// Try to create or find another existing stream to reuse.
ConnectionWorker createdOrExistingConnection = null;
try {
Expand Down Expand Up @@ -299,7 +307,6 @@ private ConnectionWorker createOrReuseConnectionWorker(
}
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
} else {

// Stick to the original connection if all the connections are overwhelmed.
if (existingConnectionWorker != null) {
return existingConnectionWorker;
Expand All @@ -310,6 +317,18 @@ private ConnectionWorker createOrReuseConnectionWorker(
}
}

private void clearFinalizedConnectionWorker() {
Set<ConnectionWorker> connectionWorkerSet = new HashSet<>();
for (ConnectionWorker existingWorker : connectionWorkerPool) {
if (existingWorker.isConnectionInUnrecoverableState()) {
connectionWorkerSet.add(existingWorker);
}
}
for (ConnectionWorker workerToRemove : connectionWorkerSet) {
connectionWorkerPool.remove(workerToRemove);
}
}

/** Select out the best connection worker among the given connection workers. */
static ConnectionWorker pickBestLoadConnection(
Comparator<Load> comparator, List<ConnectionWorker> connectionWorkerList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -450,6 +451,15 @@ static void cleanUp() {
connectionPoolMap.clear();
}

@VisibleForTesting
ConnectionWorkerPool getTestOnlyConnectionWorkerPool() {
ConnectionWorkerPool connectionWorkerPool = null;
for (Entry<ConnectionPoolKey, ConnectionWorkerPool> entry : connectionPoolMap.entrySet()) {
connectionWorkerPool = entry.getValue();
}
return connectionWorkerPool;
}

/** A builder of {@link StreamWriter}s. */
public static final class Builder {
private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class StreamWriterTest {
private static final Logger log = Logger.getLogger(StreamWriterTest.class.getName());
private static final String TEST_STREAM_1 = "projects/p/datasets/d1/tables/t1/streams/_default";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
private static final String TEST_STREAM_3 = "projects/p/datasets/d3/tables/t3/streams/_default";
private static final String TEST_STREAM_SHORTEN = "projects/p/datasets/d2/tables/t2/_default";
private static final String EXPLICIT_STEAM = "projects/p/datasets/d1/tables/t1/streams/s1";
private static final String TEST_TRACE_ID = "DATAFLOW:job_id";
Expand Down Expand Up @@ -1090,6 +1091,115 @@ public void testExtractDatasetName() throws Exception {
Assert.assertTrue(ex.getMessage().contains("The passed in stream name does not match"));
}

@Test
public void testRetryInUnrecoverableStatus_MultiplexingCase() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(4).build());
ConnectionWorkerPool.enableTestingLogic();

// Setup: create three stream writers, two of them are writing to the same stream.
// Those four stream writers should be assigned to the same connection.
// 1. Submit three requests at first to trigger connection retry limitation.
// 2. At this point the connection should be entering a unrecoverable state.
// 3. Further submit requests to those stream writers would trigger connection reassignment.
StreamWriter writer1 = getMultiplexingStreamWriter(TEST_STREAM_1);
StreamWriter writer2 = getMultiplexingStreamWriter(TEST_STREAM_2);
StreamWriter writer3 = getMultiplexingStreamWriter(TEST_STREAM_3);
StreamWriter writer4 = getMultiplexingStreamWriter(TEST_STREAM_3);

testBigQueryWrite.setCloseForeverAfter(2);
testBigQueryWrite.setTimesToClose(1);
testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addResponse(createAppendResponse(1));

// Connection will be failed after triggering the third append.
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer1, new String[] {"A"}, 0);
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer2, new String[] {"B"}, 1);
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer3, new String[] {"C"}, 2);
TimeUnit.SECONDS.sleep(1);
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue());
assertThrows(
ExecutionException.class,
() -> {
assertEquals(2, appendFuture3.get().getAppendResult().getOffset().getValue());
});
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getTotalConnectionCount(), 1);
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getCreateConnectionCount(), 1);

// Insert another request to the writer attached to closed connection would create another
// connection.

testBigQueryWrite.setCloseForeverAfter(0);
testBigQueryWrite.addResponse(createAppendResponse(4));
testBigQueryWrite.addResponse(createAppendResponse(5));
testBigQueryWrite.addResponse(createAppendResponse(6));
ApiFuture<AppendRowsResponse> appendFuture4 = sendTestMessage(writer4, new String[] {"A"}, 2);
ApiFuture<AppendRowsResponse> appendFuture5 = sendTestMessage(writer1, new String[] {"A"}, 3);
ApiFuture<AppendRowsResponse> appendFuture6 = sendTestMessage(writer2, new String[] {"B"}, 4);
assertEquals(4, appendFuture4.get().getAppendResult().getOffset().getValue());
assertEquals(5, appendFuture5.get().getAppendResult().getOffset().getValue());
assertEquals(6, appendFuture6.get().getAppendResult().getOffset().getValue());
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getTotalConnectionCount(), 1);
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getCreateConnectionCount(), 2);

writer1.close();
writer2.close();
writer3.close();
writer4.close();
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getTotalConnectionCount(), 0);
}

@Test
public void testCloseWhileInUnrecoverableState() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMinConnectionsPerRegion(1).setMaxConnectionsPerRegion(4).build());
ConnectionWorkerPool.enableTestingLogic();

// Setup: create three stream writers
// 1. Submit three requests at first to trigger connection retry limitation.
// 2. Submit request to writer3 to trigger reassignment
// 3. Close the previous two writers would be succesful
StreamWriter writer1 = getMultiplexingStreamWriter(TEST_STREAM_1);
StreamWriter writer2 = getMultiplexingStreamWriter(TEST_STREAM_2);
StreamWriter writer3 = getMultiplexingStreamWriter(TEST_STREAM_3);

testBigQueryWrite.setCloseForeverAfter(2);
testBigQueryWrite.setTimesToClose(1);
testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addResponse(createAppendResponse(1));

// Connection will be failed after triggering the third append.
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer1, new String[] {"A"}, 0);
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer2, new String[] {"B"}, 1);
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer3, new String[] {"C"}, 2);
TimeUnit.SECONDS.sleep(1);
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
assertEquals(1, appendFuture2.get().getAppendResult().getOffset().getValue());
assertThrows(
ExecutionException.class,
() -> {
assertEquals(2, appendFuture3.get().getAppendResult().getOffset().getValue());
});
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getTotalConnectionCount(), 1);
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getCreateConnectionCount(), 1);

writer1.close();
writer2.close();
// We will still be left with one request
assertEquals(writer1.getTestOnlyConnectionWorkerPool().getCreateConnectionCount(), 1);
}

public StreamWriter getMultiplexingStreamWriter(String streamName) throws IOException {
return StreamWriter.newBuilder(streamName, client)
.setWriterSchema(createProtoSchema())
.setEnableConnectionPool(true)
.setMaxInflightRequests(10)
.setLocation("US")
.setMaxRetryDuration(java.time.Duration.ofMillis(100))
.build();
}

// Timeout to ensure close() doesn't wait for done callback timeout.
@Test(timeout = 10000)
public void testCloseDisconnectedStream() throws Exception {
Expand Down