From fdc60de47f7df5c37116864086b38c9b14cf47ea Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 29 Jul 2024 13:37:36 -0700 Subject: [PATCH] fix(11523): fix without splicing error messages, and instead handle the proper error bubbling (msg wrapping) --- datafusion/execution/src/memory_pool/pool.rs | 59 +++++++++++--------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 375d5cbe2d841..f8c4772843b4b 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -300,23 +300,17 @@ impl TrackConsumersPool { .collect::>(); consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering - format!( - "The top memory consumers (across reservations) are: {}", - consumers[0..std::cmp::min(self.top.into(), consumers.len())] - .iter() - .map(|((name, can_spill), size)| { - if self.has_multiple_consumers(name) { - format!( - "{name}(can_spill={}) consumed {:?} bytes", - can_spill, size - ) - } else { - format!("{name} consumed {:?} bytes", size) - } - }) - .collect::>() - .join(", ") - ) + consumers[0..std::cmp::min(self.top.into(), consumers.len())] + .iter() + .map(|((name, can_spill), size)| { + if self.has_multiple_consumers(name) { + format!("{name}(can_spill={}) consumed {:?} bytes", can_spill, size) + } else { + format!("{name} consumed {:?} bytes", size) + } + }) + .collect::>() + .join(", ") } } @@ -366,8 +360,12 @@ impl MemoryPool for TrackConsumersPool { .try_grow(reservation, additional) .map_err(|e| match e { DataFusionError::ResourcesExhausted(e) => { + // wrap OOM message in top consumers DataFusionError::ResourcesExhausted( - e.to_owned() + ". " + &self.report_top(), + provide_top_memory_consumers_to_error_msg( + e.to_owned(), + self.report_top(), + ), ) } _ => e, @@ -387,6 +385,13 @@ impl MemoryPool for TrackConsumersPool { } } +fn provide_top_memory_consumers_to_error_msg( + error_msg: String, + top_consumers: String, +) -> String { + format!("Resources exhausted with top memory consumers (across reservations) are: {}. Error: {}", top_consumers, error_msg) +} + #[cfg(test)] mod tests { use super::*; @@ -493,7 +498,7 @@ mod tests { // Test: reports if new reservation causes error // using the previously set sizes for other consumers let mut r5 = MemoryConsumer::new("r5").register(&pool); - let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes"; + let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5"; let res = r5.try_grow(150); assert!( matches!( @@ -516,7 +521,7 @@ mod tests { // Test: see error message when no consumers recorded yet let mut r0 = MemoryConsumer::new(same_name).register(&pool); - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100. The top memory consumers (across reservations) are: foo consumed 0 bytes"; + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100"; let res = r0.try_grow(150); assert!( matches!( @@ -535,7 +540,7 @@ mod tests { let mut r1 = new_consumer_same_name.clone().register(&pool); // TODO: the insufficient_capacity_err() message is per reservation, not per consumer. // a followup PR will clarify this message "0 bytes already allocated for this reservation" - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90. The top memory consumers (across reservations) are: foo consumed 10 bytes"; + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90"; let res = r1.try_grow(150); assert!( matches!( @@ -547,7 +552,7 @@ mod tests { // Test: will accumulate size changes per consumer, not per reservation r1.grow(20); - let expected = "Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo consumed 30 bytes"; + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70"; let res = r1.try_grow(150); assert!( matches!( @@ -562,7 +567,7 @@ mod tests { let consumer_with_same_name_but_different_hash = MemoryConsumer::new(same_name).with_can_spill(true); let mut r2 = consumer_with_same_name_but_different_hash.register(&pool); - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes"; + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70"; let res = r2.try_grow(150); assert!( matches!( @@ -582,7 +587,7 @@ mod tests { let r1_consumer = MemoryConsumer::new("r1"); let mut r1 = r1_consumer.clone().register(&pool); r1.grow(20); - let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes"; + let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70"; let res = r0.try_grow(150); assert!( matches!( @@ -596,7 +601,7 @@ mod tests { // Test: unregister one // only the remaining one should be listed pool.unregister(&r1_consumer); - let expected_consumers = "The top memory consumers (across reservations) are: r0 consumed 10 bytes"; + let expected_consumers = "Resources exhausted with top memory consumers (across reservations) are: r0 consumed 10 bytes"; let res = r0.try_grow(150); assert!( matches!( @@ -608,7 +613,7 @@ mod tests { // Test: actual message we see is the `available is 70`. When it should be `available is 90`. // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister(). - let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70."; + let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70"; let res = r0.try_grow(150); assert!( matches!( @@ -621,7 +626,7 @@ mod tests { // Test: the registration needs to free itself (or be dropped), // for the proper error message r1.free(); - let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90."; + let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90"; let res = r0.try_grow(150); assert!( matches!(