/// Splices the top-consumers report into an out-of-memory error message.
///
/// The report is inserted directly after the `"for the total pool"` marker that
/// terminates the message built by [`insufficient_capacity_err`], so anything
/// that follows that message (for example a backtrace) stays after the report.
///
/// This is tied to the wording of [`insufficient_capacity_err`]. If the marker
/// is absent (for instance, the error was produced with a different message),
/// the report is appended to the end of the message instead of panicking: this
/// function runs while an error is already being reported, and must not turn a
/// recoverable resources-exhausted error into a crash.
///
/// # Arguments
/// * `error_msg` - the resources-exhausted message to extend.
/// * `top_consumers` - the already formatted top-consumers report.
///
/// # Returns
/// `error_msg` with `". "` followed by `top_consumers` spliced in.
fn parse_error_message_and_insert_top_consumers(
    mut error_msg: String,
    top_consumers: String,
) -> String {
    // Marker that ends the message produced by `insufficient_capacity_err`.
    const OOM_MESSAGE_END: &str = "for the total pool";

    let report = format!(". {}", top_consumers);
    match error_msg.find(OOM_MESSAGE_END) {
        // `find` returns a byte offset on a char boundary, and the marker is
        // ASCII, so the computed insertion index is always a valid boundary.
        Some(start) => error_msg.insert_str(start + OOM_MESSAGE_END.len(), &report),
        // Unknown message shape: keep the report, just put it at the end.
        None => error_msg.push_str(&report),
    }
    error_msg
}
exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20 for the total pool"); } #[test] @@ -493,7 +509,7 @@ mod tests { // Test: reports if new reservation causes error // using the previously set sizes for other consumers let mut r5 = MemoryConsumer::new("r5").register(&pool); - let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes"; + let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5 for the total pool. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes"; let res = r5.try_grow(150); assert!( matches!( @@ -516,7 +532,7 @@ mod tests { // Test: see error message when no consumers recorded yet let mut r0 = MemoryConsumer::new(same_name).register(&pool); - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100. The top memory consumers (across reservations) are: foo consumed 0 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100 for the total pool. The top memory consumers (across reservations) are: foo consumed 0 bytes"; let res = r0.try_grow(150); assert!( matches!( @@ -535,7 +551,7 @@ mod tests { let mut r1 = new_consumer_same_name.clone().register(&pool); // TODO: the insufficient_capacity_err() message is per reservation, not per consumer. 
// a followup PR will clarify this message "0 bytes already allocated for this reservation" - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90. The top memory consumers (across reservations) are: foo consumed 10 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90 for the total pool. The top memory consumers (across reservations) are: foo consumed 10 bytes"; let res = r1.try_grow(150); assert!( matches!( @@ -547,7 +563,7 @@ mod tests { // Test: will accumulate size changes per consumer, not per reservation r1.grow(20); - let expected = "Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo consumed 30 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70 for the total pool. The top memory consumers (across reservations) are: foo consumed 30 bytes"; let res = r1.try_grow(150); assert!( matches!( @@ -562,7 +578,7 @@ mod tests { let consumer_with_same_name_but_different_hash = MemoryConsumer::new(same_name).with_can_spill(true); let mut r2 = consumer_with_same_name_but_different_hash.register(&pool); - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70 for the total pool. 
The top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes"; let res = r2.try_grow(150); assert!( matches!( @@ -582,7 +598,7 @@ mod tests { let r1_consumer = MemoryConsumer::new("r1"); let mut r1 = r1_consumer.clone().register(&pool); r1.grow(20); - let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes"; + let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70 for the total pool. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes"; let res = r0.try_grow(150); assert!( matches!( @@ -608,7 +624,7 @@ mod tests { // Test: actual message we see is the `available is 70`. When it should be `available is 90`. // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister(). - let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70."; + let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70 for the total pool"; let res = r0.try_grow(150); assert!( matches!( @@ -621,7 +637,7 @@ mod tests { // Test: the registration needs to free itself (or be dropped), // for the proper error message r1.free(); - let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90."; + let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90 for the total pool"; let res = r0.try_grow(150); assert!( matches!(