Skip to content

Commit

Permalink
fix(11523): fix without splicing error messages, and instead handle t…
Browse files Browse the repository at this point in the history
…he proper error bubbling (msg wrapping)
  • Loading branch information
wiedld committed Jul 29, 2024
1 parent f405795 commit fdc60de
Showing 1 changed file with 32 additions and 27 deletions.
59 changes: 32 additions & 27 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,23 +300,17 @@ impl<I: MemoryPool> TrackConsumersPool<I> {
.collect::<Vec<_>>();
consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering

format!(
"The top memory consumers (across reservations) are: {}",
consumers[0..std::cmp::min(self.top.into(), consumers.len())]
.iter()
.map(|((name, can_spill), size)| {
if self.has_multiple_consumers(name) {
format!(
"{name}(can_spill={}) consumed {:?} bytes",
can_spill, size
)
} else {
format!("{name} consumed {:?} bytes", size)
}
})
.collect::<Vec<_>>()
.join(", ")
)
consumers[0..std::cmp::min(self.top.into(), consumers.len())]
.iter()
.map(|((name, can_spill), size)| {
if self.has_multiple_consumers(name) {
format!("{name}(can_spill={}) consumed {:?} bytes", can_spill, size)
} else {
format!("{name} consumed {:?} bytes", size)
}
})
.collect::<Vec<_>>()
.join(", ")
}
}

Expand Down Expand Up @@ -366,8 +360,12 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
.try_grow(reservation, additional)
.map_err(|e| match e {
DataFusionError::ResourcesExhausted(e) => {
// wrap OOM message in top consumers
DataFusionError::ResourcesExhausted(
e.to_owned() + ". " + &self.report_top(),
provide_top_memory_consumers_to_error_msg(
e.to_owned(),
self.report_top(),
),
)
}
_ => e,
Expand All @@ -387,6 +385,13 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
}
}

/// Wraps an out-of-memory error message with a report of the top memory consumers.
///
/// The consumer report comes first, followed by the original error text
/// after an `Error:` label, so the underlying message is carried through
/// unmodified at the end of the returned string.
///
/// * `error_msg` - the original error message being wrapped.
/// * `top_consumers` - the pre-formatted list of top consumers.
fn provide_top_memory_consumers_to_error_msg(
    error_msg: String,
    top_consumers: String,
) -> String {
    let mut wrapped = String::from(
        "Resources exhausted with top memory consumers (across reservations) are: ",
    );
    wrapped.push_str(&top_consumers);
    wrapped.push_str(". Error: ");
    wrapped.push_str(&error_msg);
    wrapped
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -493,7 +498,7 @@ mod tests {
// Test: reports if new reservation causes error
// using the previously set sizes for other consumers
let mut r5 = MemoryConsumer::new("r5").register(&pool);
let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes";
let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5";
let res = r5.try_grow(150);
assert!(
matches!(
Expand All @@ -516,7 +521,7 @@ mod tests {

// Test: see error message when no consumers recorded yet
let mut r0 = MemoryConsumer::new(same_name).register(&pool);
let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100. The top memory consumers (across reservations) are: foo consumed 0 bytes";
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100";
let res = r0.try_grow(150);
assert!(
matches!(
Expand All @@ -535,7 +540,7 @@ mod tests {
let mut r1 = new_consumer_same_name.clone().register(&pool);
// TODO: the insufficient_capacity_err() message is per reservation, not per consumer.
// a followup PR will clarify this message "0 bytes already allocated for this reservation"
let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90. The top memory consumers (across reservations) are: foo consumed 10 bytes";
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90";
let res = r1.try_grow(150);
assert!(
matches!(
Expand All @@ -547,7 +552,7 @@ mod tests {

// Test: will accumulate size changes per consumer, not per reservation
r1.grow(20);
let expected = "Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo consumed 30 bytes";
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70";
let res = r1.try_grow(150);
assert!(
matches!(
Expand All @@ -562,7 +567,7 @@ mod tests {
let consumer_with_same_name_but_different_hash =
MemoryConsumer::new(same_name).with_can_spill(true);
let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes";
let expected = "Resources exhausted with top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70";
let res = r2.try_grow(150);
assert!(
matches!(
Expand All @@ -582,7 +587,7 @@ mod tests {
let r1_consumer = MemoryConsumer::new("r1");
let mut r1 = r1_consumer.clone().register(&pool);
r1.grow(20);
let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes";
let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70";
let res = r0.try_grow(150);
assert!(
matches!(
Expand All @@ -596,7 +601,7 @@ mod tests {
// Test: unregister one
// only the remaining one should be listed
pool.unregister(&r1_consumer);
let expected_consumers = "The top memory consumers (across reservations) are: r0 consumed 10 bytes";
let expected_consumers = "Resources exhausted with top memory consumers (across reservations) are: r0 consumed 10 bytes";
let res = r0.try_grow(150);
assert!(
matches!(
Expand All @@ -608,7 +613,7 @@ mod tests {

// Test: the actual message we see is `available is 70`, when it should be `available is 90`.
// This is because pool.shrink() does not automatically occur within inner_pool.deregister().
let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70.";
let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70";
let res = r0.try_grow(150);
assert!(
matches!(
Expand All @@ -621,7 +626,7 @@ mod tests {
// Test: the registration needs to free itself (or be dropped),
// for the proper error message
r1.free();
let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90.";
let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90";
let res = r0.try_grow(150);
assert!(
matches!(
Expand Down

0 comments on commit fdc60de

Please sign in to comment.