-
Notifications
You must be signed in to change notification settings - Fork 85
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add timeout to inflight queue waiting #1957
Merged
Merged
Changes from all commits
Commits
Show all changes
73 commits
Select commit
Hold shift + click to select a range
5a63d95
feat: Split writer into connection worker and wrapper, this is a
GaoleMeng 5a13302
feat: add connection worker pool skeleton, used for multiplexing client
GaoleMeng 0297204
Merge branch 'main' into main
GaoleMeng 8a81ad3
feat: add Load api for connection worker for multiplexing client
GaoleMeng 68fd040
Merge remote-tracking branch 'upstream/main'
GaoleMeng 3106dae
Merge remote-tracking branch 'upstream/main'
GaoleMeng 5bf04e5
Merge branch 'googleapis:main' into main
GaoleMeng 2fc7551
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng 7a6d919
feat: add multiplexing support to connection worker. We will treat every
GaoleMeng 3ba7659
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] f379a78
Updates from OwlBot post-processor
gcf-owl-bot[bot] 9307776
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng de73013
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] 19005a1
feat: port the multiplexing client core algorithm and basic tests
GaoleMeng c5d14ba
Merge branch 'googleapis:main' into main
GaoleMeng 644360a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] 3099d82
Merge branch 'googleapis:main' into main
GaoleMeng e707dd6
Merge branch 'googleapis:main' into main
GaoleMeng 9e7a8fa
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng 31f1755
Merge branch 'googleapis:main' into main
GaoleMeng 44c36fc
feat: wire multiplexing connection pool to stream writer
GaoleMeng 87a4036
feat: some fixes for multiplexing client
GaoleMeng c92ea1b
Merge remote-tracking branch 'upstream/main'
GaoleMeng 019520c
Merge branch 'googleapis:main' into main
GaoleMeng 47893df
feat: fix some todos, and reject the mixed behavior of passed in clie…
GaoleMeng 8bd4e6a
Merge remote-tracking branch 'upstream/main'
GaoleMeng 83409b0
Merge remote-tracking branch 'upstream/main'
GaoleMeng f7dd72d
Merge branch 'googleapis:main' into main
GaoleMeng a48399f
Merge branch 'googleapis:main' into main
GaoleMeng 6789bc9
feat: fix the bug that we may peek into the write_stream field but it's
GaoleMeng 46b4e6c
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] dfd4dd9
Merge branch 'googleapis:main' into main
GaoleMeng d68ae70
feat: fix the bug that we may peek into the write_stream field but it's
GaoleMeng 2983fe9
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng d406256
Merge branch 'googleapis:main' into main
GaoleMeng 22e9e07
feat: add getInflightWaitSeconds implementation
GaoleMeng fdb4e1c
Merge branch 'googleapis:main' into main
GaoleMeng 0469474
Merge branch 'googleapis:main' into main
GaoleMeng d1b7740
feat: Add schema comparision in connection loop to ensure schema upda…
GaoleMeng e4cd529
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] 74ff1c4
Merge branch 'googleapis:main' into main
GaoleMeng 762f49e
feat: add schema update support to multiplexing
GaoleMeng de456c2
Merge branch 'googleapis:main' into main
GaoleMeng c2f6edc
Merge branch 'googleapis:main' into main
GaoleMeng 2487227
fix: fix windows build bug: windows Instant resolution is different with
GaoleMeng 084d6d1
fix: fix another failing tests for windows build
GaoleMeng 89c9701
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng 8441518
fix: fix another test failure for Windows build
GaoleMeng d249add
Merge branch 'googleapis:main' into main
GaoleMeng 83aa7ff
feat: Change new thread for each retry to be a thread pool to avoid
GaoleMeng 92a9c36
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] a713a52
Merge branch 'googleapis:main' into main
GaoleMeng a042d5c
fix: add back the background executor provider that's accidentally
GaoleMeng 53f4ec8
feat: throw error when use connection pool for explicit stream
GaoleMeng c494d8b
Merge branch 'googleapis:main' into main
GaoleMeng 14b0c12
fix: Add precision truncation to the passed in value from JSON float and
GaoleMeng 0da0e4b
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] 33d23ac
Merge branch 'googleapis:main' into main
GaoleMeng d2ee46e
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] be6646e
modify the bom version
GaoleMeng 62d8c41
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] adf5f3f
fix deadlockissue in ConnectionWorkerPool
GaoleMeng c1970ff
Merge branch 'googleapis:main' into main
GaoleMeng 3488df8
fix: fix deadlock issue during close + append for multiplexing
GaoleMeng 6a512e8
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] 05edc2f
Merge branch 'googleapis:main' into main
GaoleMeng 7d3da74
Merge branch 'googleapis:main' into main
GaoleMeng ecf6807
Merge branch 'googleapis:main' into main
GaoleMeng 057dab9
Merge branch 'googleapis:main' into main
GaoleMeng 5db46a2
fix: fix one potential root cause of deadlock issue for non-multiplexing
GaoleMeng 32e9d33
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] 183941a
Merge branch 'googleapis:main' into main
GaoleMeng f93f89e
Add timeout to inflight queue waiting, and also add some extra log
GaoleMeng File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -61,6 +61,9 @@ | |
public class ConnectionWorker implements AutoCloseable { | ||
private static final Logger log = Logger.getLogger(StreamWriter.class.getName()); | ||
|
||
// Maximum wait time on inflight quota before error out. | ||
private static long INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = 300000; | ||
|
||
private Lock lock; | ||
private Condition hasMessageInWaitingQueue; | ||
private Condition inflightReduced; | ||
|
@@ -322,7 +325,14 @@ private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message) | |
this.inflightBytes += requestWrapper.messageSize; | ||
waitingRequestQueue.addLast(requestWrapper); | ||
hasMessageInWaitingQueue.signal(); | ||
maybeWaitForInflightQuota(); | ||
try { | ||
maybeWaitForInflightQuota(); | ||
} catch (StatusRuntimeException ex) { | ||
--this.inflightRequests; | ||
waitingRequestQueue.pollLast(); | ||
this.inflightBytes -= requestWrapper.messageSize; | ||
throw ex; | ||
} | ||
return requestWrapper.appendResult; | ||
} finally { | ||
this.lock.unlock(); | ||
|
@@ -347,6 +357,15 @@ private void maybeWaitForInflightQuota() { | |
.withCause(e) | ||
.withDescription("Interrupted while waiting for quota.")); | ||
} | ||
long current_wait_time = System.currentTimeMillis() - start_time; | ||
if (current_wait_time > INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI) { | ||
throw new StatusRuntimeException( | ||
Status.fromCode(Code.CANCELLED) | ||
.withDescription( | ||
String.format( | ||
"Interrupted while waiting for quota due to long waiting time %sms", | ||
current_wait_time))); | ||
} | ||
} | ||
inflightWaitSec.set((System.currentTimeMillis() - start_time) / 1000); | ||
} | ||
|
@@ -373,7 +392,6 @@ public void close() { | |
log.fine("Waiting for append thread to finish. Stream: " + streamName); | ||
try { | ||
appendThread.join(); | ||
log.info("User close complete. Stream: " + streamName); | ||
} catch (InterruptedException e) { | ||
// Unexpected. Just swallow the exception with logging. | ||
log.warning( | ||
|
@@ -387,6 +405,7 @@ public void close() { | |
} | ||
|
||
try { | ||
log.fine("Begin shutting down user callback thread pool for stream " + streamName); | ||
threadPool.shutdown(); | ||
threadPool.awaitTermination(3, TimeUnit.MINUTES); | ||
} catch (InterruptedException e) { | ||
|
@@ -396,7 +415,10 @@ public void close() { | |
+ streamName | ||
+ " is interrupted with exception: " | ||
+ e.toString()); | ||
throw new IllegalStateException( | ||
"Thread pool shutdown is interrupted for stream " + streamName); | ||
} | ||
log.info("User close finishes for stream " + streamName); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can remove the line on 395? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
|
||
/* | ||
|
@@ -858,6 +880,11 @@ public static void setOverwhelmedCountsThreshold(double newThreshold) { | |
} | ||
} | ||
|
||
@VisibleForTesting | ||
static void setMaxInflightQueueWaitTime(long waitTime) { | ||
INFLIGHT_QUOTA_MAX_WAIT_TIME_MILLI = waitTime; | ||
} | ||
|
||
@AutoValue | ||
abstract static class TableSchemaAndTimestamp { | ||
// Shows the timestamp updated schema is reported from response | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log when we actually timed out here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done