Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk Load CDK: Unit tests for memory manager #45091

Merged

Conversation

johnny-schmidt
Copy link
Contributor

What

  • Unit tests for the memory manager
  • Replace java lock/condition with kotlin unit channel
  • Abstract out AvailableMemoryProvider for testing

@johnny-schmidt johnny-schmidt requested a review from a team as a code owner September 2, 2024 20:42
Copy link

vercel bot commented Sep 2, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

1 Skipped Deployment
Name Status Preview Comments Updated (UTC)
airbyte-docs ⬜️ Ignored (Inspect) Visit Preview Sep 5, 2024 1:27am

@octavia-squidington-iii octavia-squidington-iii added the CDK Connector Development Kit label Sep 2, 2024
}
usedMemoryBytes.addAndGet(memoryBytes)
while (usedMemoryBytes.get() + memoryBytes > availableMemoryBytes) {
releaseUpdates.receive()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if memoryBytes > availableMemoryBytes, we just wait forever, no?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this isn't memory safe. 2 threads could allocate more memory than is available.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a throw and a test for it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, as soon as I typed if, copilot knew what I was doing :)

class MemoryManager {
private val availableMemoryBytes: Long = Runtime.getRuntime().maxMemory()
class MemoryManager(availableMemoryProvider: AvailableMemoryProvider) {
private val availableMemoryBytes: Long = availableMemoryProvider.availableMemoryBytes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

really should be totalMemoryBytes. Once they're reserved, they're not really available anymore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed done

}

@Test
fun testReserveBlocking() = runTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need tests imvolving several threads

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, it took a lot but I did finally flush out the bug you were afraid of. See testReserveBlockingMultithreaded()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

took a lot of efforts, or a lot of jobs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of jobs to get one error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I usually test those by having some king of test-triggered breakpoint in the production code. As much as I dislike having test-only code in production codepath, anything else is always going to be very flaky or involve a lot of jobs/threads/workers...

@johnny-schmidt johnny-schmidt force-pushed the issue-9615/load-cdk-unit-tests-memory-manager branch from 73f4ed4 to a7de516 Compare September 5, 2024 01:23
@johnny-schmidt johnny-schmidt force-pushed the issue-9615/load-cdk-unit-tests-memory-manager branch from a7de516 to b382d98 Compare September 5, 2024 01:27
@@ -17,31 +19,49 @@ import kotlin.concurrent.withLock
* TODO: Some degree of logging/monitoring around how accurate we're actually being?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a TODO: Be more fair. The current implementation will guarantee a first-come first-served order, which means if a function requests a large amount of memory that's not available yet, any subsequent call will block until the large memory request is satisfied, even though there may be enough available memory to serve the smaller requests

val reserved = AtomicBoolean(false)

try {
withTimeout(5000) { memoryManager.reserveBlocking(900) }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we use a lock (or a channel) instead of a timeout here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm testing that it doesn't block, so I'm not sure what else I can do.

}

@Test
fun testReserveBlocking() = runTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

took a lot of efforts, or a lot of jobs?

withContext(Dispatchers.IO) {
memoryManager.reserveBlocking(1000)
Assertions.assertEquals(0, memoryManager.remainingMemoryBytes)
val nIterations = 100000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess a lot of jobs...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, 10,000 wasn't enough to fail consistently. 100,000 and I'd always hit it at least once.

}

mutex.withLock {
while (usedMemoryBytes.get() + memoryBytes > totalMemoryBytes) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I'm pretty sure this is correct.
Just as an interesting case, we could also do without the AtomicLong and a volatile would suffice :)
Thank you for fixing it.

@johnny-schmidt johnny-schmidt merged commit 081a0ca into master Sep 5, 2024
35 checks passed
@johnny-schmidt johnny-schmidt deleted the issue-9615/load-cdk-unit-tests-memory-manager branch September 5, 2024 22:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CDK Connector Development Kit
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants