
Add TrackedMemoryPool with better error messages on exhaustion #11665

Merged: 17 commits into apache:main from 11523/biggest-memory-consumers on Aug 1, 2024

Conversation

@wiedld (Contributor) commented on Jul 26, 2024

Which issue does this PR close?

Part of #11523

Rationale for this change

The current OOM error message reports the next incremental memory request, not the biggest consumers of memory. As a result, we have spent time chasing OOMs in the wrong place.

What changes are included in this PR?

Includes a new MemoryPool implementation, TrackConsumersPool, which wraps an inner MemoryPool and, on OOM, returns a list of the top memory consumers.

The new TrackConsumersPool is opt-in, so individual users can decide whether to use this capability (along with any associated overhead).
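
As a rough sketch of how this might be wired up (module paths and the constructor shape here are assumptions based on this PR's tests, not a definitive API reference), the tracking pool wraps an inner pool plus a top-K count:

use std::num::NonZeroUsize;
use std::sync::Arc;

use datafusion_execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryPool, TrackConsumersPool,
};

fn main() {
    // Track the top 3 consumers on top of a 100-byte greedy pool.
    let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
        GreedyMemoryPool::new(100),
        NonZeroUsize::new(3).unwrap(),
    ));

    let mut r1 = MemoryConsumer::new("r1").register(&pool);
    r1.grow(50);

    let mut r2 = MemoryConsumer::new("r2").register(&pool);
    // This request exceeds the remaining 50 bytes, so the returned error
    // should also list the top consumers (r1: 50 bytes, r2: 0 bytes) in
    // addition to the usual "Failed to allocate" message.
    let err = r2.try_grow(150).unwrap_err();
    println!("{err}");
}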

Are these changes tested?

Yes.

Are there any user-facing changes?

Yes: the new TrackConsumersPool.

@github-actions bot added the core (Core DataFusion crate) label on Jul 26, 2024
// Test: reports if new reservation causes error
// using the previously set sizes for other consumers
let mut r5 = MemoryConsumer::new("r5").register(&pool);
let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes";
@wiedld (Contributor, Author) commented on Jul 26, 2024

With the proposed change in a follow-up PR, the final error message would read:

Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes

Resources exhausted with top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool

Comment on lines +238 to 244
/// Constructs a resources error based upon the individual [`MemoryReservation`].
///
/// The error references the `bytes already allocated` for the reservation,
/// and not the total within the collective [`MemoryPool`],
/// nor the total across multiple reservations with the same [`MemoryConsumer`].
#[inline(always)]
fn insufficient_capacity_err(
@wiedld (Contributor, Author) commented:

In a follow-up PR, I would like to iterate on this error message.

Take the original/current:
Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {}

And change into something like:
Failed to allocate additional {} bytes for {} with {} bytes already allocated for this reservation - {} bytes remain available for the total pool

@wiedld (Contributor, Author) commented on Jul 26, 2024

The suggestion above exists because this error message is inherently about only the reservation itself. Each reservation can request bytes from the pool, but the pool only increments/decrements totals for the pool as a whole. It does NOT track each reservation, nor whether multiple reservations belong to the same consumer.

As a result, the simplest solution is to: (a) update this error message, and (b) continue to rely upon each reservation to resize itself to zero upon drop (as it already does).

@alamb (Contributor) commented:

I like the idea of updating the error message in a follow-on PR.


// Test: see error message when no consumers recorded yet
let mut r0 = MemoryConsumer::new(same_name).register(&pool);
let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100. The top memory consumers (across reservations) are: foo consumed 0 bytes";
@wiedld (Contributor, Author) commented on Jul 26, 2024

With the proposed change in a follow-up PR, the final error message would read:

Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool. The top memory consumers (across reservations) are: foo consumed 0 bytes

Resources exhausted with top memory consumers (across reservations) are: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool

@wiedld changed the title from "Provide actionable error messaging due to resource exhuastion." to "Provide actionable error messaging due to resource exhaustion." on Jul 26, 2024
@wiedld force-pushed the 11523/biggest-memory-consumers branch from 1795da6 to 1b1223f on July 26, 2024 at 19:39
Comment on lines +526 to +527
// API: multiple registrations using the same hashed consumer,
// will be recognized as the same in the TrackConsumersPool.
@wiedld (Contributor, Author) commented on Jul 26, 2024

As noted in the struct docs, the TrackConsumersPool is all about the MemoryConsumer. Therefore the top K consumers are returned, even if a given consumer has multiple reservations. This is intended to make the error messages (and subsequent debugging) more useful by correctly reflecting the aggregated top K.
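
A short sketch of the behavior described above (constructor shape and names assumed from this PR's tests, not verified against the final API): two reservations registered under the same consumer name are summed together in the top-consumers report.

use std::num::NonZeroUsize;
use std::sync::Arc;

use datafusion_execution::memory_pool::{
    GreedyMemoryPool, MemoryConsumer, MemoryPool, TrackConsumersPool,
};

fn same_consumer_multiple_reservations() {
    let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
        GreedyMemoryPool::new(100),
        NonZeroUsize::new(3).unwrap(),
    ));

    // Two reservations registered under the same consumer name ("foo").
    let mut a = MemoryConsumer::new("foo").register(&pool);
    let mut b = MemoryConsumer::new("foo").register(&pool);
    a.grow(10);
    b.grow(20);

    // An OOM from either reservation should report "foo consumed 30 bytes":
    // the total across both reservations, not 10 or 20 individually.
    let _err = b.try_grow(150).unwrap_err();
}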

}

#[test]
fn test_tracked_consumers_pool_deregister() {
@wiedld (Contributor, Author) commented on Jul 26, 2024

This test is about an existing behavior. The current MemoryPool implementations register/deregister at the MemoryConsumer level, whereas bytes are incremented/decremented at the MemoryReservation level.

This means that a consumer can be deregistered even while its reservation still holds memory.

The new TrackConsumersPool has nothing to do with this existing behavior. However, this test is added in order to demonstrate how the error messages will read in that case.

@wiedld (Contributor, Author) commented on Jul 29, 2024

@alamb -- I cannot reproduce these test failures locally, even with the same flags & tokio multithreaded tests enabled. Can you help?

@alamb (Contributor) commented on Jul 29, 2024

@alamb -- I cannot reproduce these test failures locally, even with the same flags & tokio multithreaded tests enabled. Can you help?

I also could not reproduce it with

cargo test --lib  -p datafusion-execution

One way to debug such failures is to add additional debug logging to the tests -- right now the test failure simply says "failed" with a static message, but it doesn't say what the actual result was.

For example, you could potentially update your asserts like this:

diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs
index 9ce14e41a..0df795a2b 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -494,12 +494,13 @@ mod tests {
         // using the previously set sizes for other consumers
         let mut r5 = MemoryConsumer::new("r5").register(&pool);
         let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes";
+        let actual_result = r5.try_grow(150);
         assert!(
             matches!(
-                r5.try_grow(150),
+                actual_result,
                 Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected)
             ),
-            "should provide list of top memory consumers"
+            "should provide list of top memory consumers, got {actual_result:?}",
         );
     }

And the message from your assertion failure should be more helpful for debugging

@alamb (Contributor) commented on Jul 29, 2024

given the tests pass on amd64 maybe the difference is related to newlines or something in the message?

@wiedld (Contributor, Author) commented on Jul 29, 2024

given the tests pass on amd64 maybe the difference is related to newlines or something in the message?

We have a backtrace inserted into the error message. Specifically, the expected is:
Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes

When running locally, we get that response.
Yet in CI, we get the backtrace:

 should provide proper error with both consumers, instead found Err(ResourcesExhausted(
"Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70
\n\nbacktrace:    0: std::backtrace_rs::backtrace::libunwind::trace\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/../../backtrace/src/backtrace/libunwind.rs:116:5\n   1: std::backtrace_rs::backtrace::trace_unsynchronized\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5\n   2: std::backtrace::Backtrace::create\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/backtrace.rs:331:13\n   3: get_back_trace\n             at /Users/runner/work/datafusion/datafusion/datafusion/common/src/error.rs:391:30\n   4: insufficient_capacity_err\n             at ./src/memory_pool/pool.rs:249:5\n   5: try_grow\n             at ./src/memory_pool/pool.rs:220:32\n   6: try_grow<datafusion_execution::memory_pool::pool::FairSpillPool>\n             at ./src/memory_pool/pool.rs:365:9\n   7: try_grow\n             at ./src/memory_pool/mod.rs:269:9\n   8: test_per_pool_type\n             at ./src/memory_pool/pool.rs:586:23\n   9: test_tracked_consumers_pool_deregister\n             at ./src/memory_pool/pool.rs:639:9\n  10: {closure#0}\n             at ./src/memory_pool/pool.rs:577:48\n  11: call_once<datafusion_execution::memory_pool::pool::tests::test_tracked_consumers_pool_deregister::{closure_env#0}, ()>\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/core/src/ops/function.rs:250:5\n  12: core::ops::function::FnOnce::call_once\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/core/src/ops/function.rs:250:5\n  13: test::__rust_begin_short_backtrace\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/test/src/lib.rs:625:18\n  14: test::run_test_in_process::{{closure}}\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/test/src/lib.rs:648:60\n  15: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/core/src/panic/unwind_safe.rs:272:9\n  16: std::panicking::try::do_call\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/panicking.rs:559:40\n  17: std::panicking::try\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/panicking.rs:523:19\n  18: std::panic::catch_unwind\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/panic.rs:149:14\n  19: test::run_test_in_process\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/test/src/lib.rs:648:27\n  20: test::run_test::{{closure}}\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/test/src/lib.rs:569:43\n  21: test::run_test::{{closure}}\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/test/src/lib.rs:599:41\n  22: std::sys_common::backtrace::__rust_begin_short_backtrace\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/sys_common/backtrace.rs:155:18\n  23: std::thread::Builder::spawn_unchecked_::{{closure}}::{{closure}}\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/thread/mod.rs:542:17\n  24: <core::panic::unwind_safe::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/core/src/panic/unwind_safe.rs:272:9\n  25: std::panicking::try::do_call\n             at 
/rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/panicking.rs:559:40\n  26: std::panicking::try\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/panicking.rs:523:19\n  27: std::panic::catch_unwind\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/panic.rs:149:14\n  28: std::thread::Builder::spawn_unchecked_::{{closure}}\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/thread/mod.rs:541:30\n  29: core::ops::function::FnOnce::call_once{{vtable.shim}}\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/core/src/ops/function.rs:250:5\n  30: <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/alloc/src/boxed.rs:2063:9\n  31: <alloc::boxed::Box<F,A> as core::ops::function::FnOnce<Args>>::call_once\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/alloc/src/boxed.rs:2063:9\n  32: std::sys::pal::unix::thread::Thread::new::thread_start\n             at /rustc/051478957371ee0084a7c0913941d2a8c4757bb9/library/std/src/sys/pal/unix/thread.rs:108:17\n  33: __pthread_joiner_wake\n
. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes"))

This means I need to construct the concatenated error message differently. Fixing...
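
For illustration only (this is a sketch of the concatenation issue, not the PR's actual code): when backtraces are enabled, the inner error's Display output ends with the captured backtrace, so appending the top-consumers report after the inner message buries the report behind the backtrace. Prepending the report -- as in the second message format shown earlier -- keeps it up front where tests can match it:

fn wrap_oom_error(top_consumers: &str, inner: &str) -> String {
    // Put the report first, and the original error (plus any backtrace it
    // carries) last.
    format!(
        "Resources exhausted with top memory consumers (across reservations) are: \
         {top_consumers}. Error: {inner}"
    )
}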

@wiedld force-pushed the 11523/biggest-memory-consumers branch from bf07b81 to 09b20d2 on July 29, 2024 at 20:20
@wiedld force-pushed the 11523/biggest-memory-consumers branch from fdc60de to f75764e on July 29, 2024 at 20:45
@wiedld (Contributor, Author) commented on Jul 29, 2024

CI was failing due to the concatenation of error messages. I fixed it two ways -- in order to demonstrate why the second way is better.

@wiedld marked this pull request as ready for review on July 29, 2024 at 21:33
@alamb (Contributor) left a review comment:

Thank you @wiedld -- I think this is looking good.

I had some comments, but overall I think this could be merged as-is with the follow-ons you identified (improving the messages).

I also think that once we have this in, we should consider using this pool as the default (rather than GreedyPool) given its better user experience, but I think it would be good to make that proposal as a follow-on PR as well.

}

/// The top consumers in a report string.
fn report_top(&self) -> String {
@alamb (Contributor) commented:

Since top is only used for this report, maybe it should be a parameter to report_top rather than a parameter on the pool 🤔

It seems like someone could want both the "top 10" consumers and "all consumers" from the same tracked pool, but with this implementation they could only have one or the other.

Is this the message you are proposing to fix in a follow-on?

@wiedld (Contributor, Author) commented:

The use of TrackConsumersPool for error reporting (when passed as Arc<dyn MemoryPool>) is constrained by the trait definition. However, we could use the downcast struct itself for runtime metrics, as shown in this added commit. Is this what you were thinking?
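
A minimal sketch of this idea (assuming the report_top method shown in the snippet above is reachable by callers; its visibility and exact signature are not confirmed here): keep a handle to the concrete TrackConsumersPool for runtime metrics, while handing out the Arc<dyn MemoryPool> that the trait-based APIs require.

use std::num::NonZeroUsize;
use std::sync::Arc;

use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryPool, TrackConsumersPool};

fn build_pools() {
    let tracked = Arc::new(TrackConsumersPool::new(
        GreedyMemoryPool::new(1024),
        NonZeroUsize::new(10).unwrap(),
    ));

    // Hand the trait object to registration / try_grow call sites...
    let pool: Arc<dyn MemoryPool> = Arc::clone(&tracked);
    assert_eq!(pool.reserved(), 0);

    // ...while the concrete handle can still be queried directly for runtime
    // metrics, e.g. the top-consumers report (hypothetical call; it depends
    // on the method being public):
    // let report = tracked.report_top();
}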


// Test: will accumulate size changes per consumer, not per reservation
r1.grow(20);
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70";
@alamb (Contributor) commented:

It is somewhat confusing that, in the same message, foo is reported with two different allocations:

  1. foo consumed 30 bytes
  2. for foo with 20 bytes already allocated

Is it possible to rationalize the errors somehow to make this less confusing?

@wiedld (Contributor, Author) commented on Jul 30, 2024

I agree this is confusing. The for foo with 20 bytes already allocated part is the error from the MemoryReservation, which holds 20 bytes.

The MemoryConsumer, on the other hand, happens to have 2 reservations -- with a total of 30 bytes allotted across reservations.

I'm hoping the follow-up PR (with the message change for the reservation-specific error) will make this clearer. Does the message below read better?

Resources exhausted with top memory consumers (across reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated for this reservation - 70 bytes remain available for the total pool

@wiedld (Contributor, Author) commented:

Please feel free to suggest better wording for the above. 🙏🏼

@alamb changed the title from "Provide actionable error messaging due to resource exhaustion." to "Add TrackedMemoryPool that has better error messages on exhaustion" on Jul 30, 2024
@alamb (Contributor) left a review comment:

Thank you @wiedld -- I think this looks good to me

I will note the following as follow-on tasks:

  1. Improving the default error message
  2. Using the TrackConsumersPool as the default memory pool

impl<I: MemoryPool> TrackConsumersPool<I> {
/// Creates a new [`TrackConsumersPool`].
///
/// The `top` determines how many Top K [`MemoryConsumer`]s to include
@alamb (Contributor) commented:

👍

@alamb changed the title from "Add TrackedMemoryPool that has better error messages on exhaustion" to "Add TrackedMemoryPool with better error messages on exhaustion" on Jul 31, 2024
@alamb merged commit 921c3b6 into apache:main on Aug 1, 2024
25 checks passed
Labels: core (Core DataFusion crate)
Participants: 2