diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 7263f4707b4a..375d5cbe2d84 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -246,7 +246,7 @@ fn insufficient_capacity_err( additional: usize, available: usize, ) -> DataFusionError { - resources_datafusion_err!("Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {} for the total pool", additional, reservation.registration.consumer.name, reservation.size, available) + resources_datafusion_err!("Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {}", additional, reservation.registration.consumer.name, reservation.size, available) } /// A [`MemoryPool`] that tracks the consumers that have @@ -367,10 +367,7 @@ impl MemoryPool for TrackConsumersPool { .map_err(|e| match e { DataFusionError::ResourcesExhausted(e) => { DataFusionError::ResourcesExhausted( - parse_error_message_and_insert_top_consumers( - e.to_owned(), - self.report_top(), - ), + e.to_owned() + ". " + &self.report_top(), ) } _ => e, @@ -390,19 +387,6 @@ impl MemoryPool for TrackConsumersPool { } } -/// This is very tied to the implementation of [`insufficient_capacity_err`]. -fn parse_error_message_and_insert_top_consumers( - mut error_msg: String, - top_consumers: String, -) -> String { - let end_of_oom_error = error_msg - .find("for the total pool") - .expect("should have OOM error") - + "for the total pool".len(); - error_msg.insert_str(end_of_oom_error, &format!(". {}", top_consumers)); - error_msg -} - #[cfg(test)] mod tests { use super::*; @@ -426,10 +410,10 @@ mod tests { assert_eq!(pool.reserved(), 4000); let err = r2.try_grow(1).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0 for the total pool"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0"); let err = r2.try_grow(1).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0 for the total pool"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated - maximum available is 0"); r1.shrink(1990); r2.shrink(2000); @@ -454,12 +438,12 @@ mod tests { .register(&pool); let err = r3.try_grow(70).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40 for the total pool"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40"); //Shrinking r2 to zero doesn't allow a3 to allocate more than 45 r2.free(); let err = r3.try_grow(70).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40 for the total pool"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated - maximum available is 40"); // But dropping r2 does drop(r2); @@ -472,7 +456,7 @@ mod tests { let mut r4 = MemoryConsumer::new("s4").register(&pool); let err = r4.try_grow(30).unwrap_err().strip_backtrace(); - assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20 for the total pool"); + assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20"); } #[test] @@ -509,7 +493,7 @@ mod tests { // Test: reports if new reservation causes error // using the previously set sizes for other consumers let mut r5 = MemoryConsumer::new("r5").register(&pool); - let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5 for the total pool. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes"; + let expected = "Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5. The top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes"; let res = r5.try_grow(150); assert!( matches!( @@ -532,7 +516,7 @@ mod tests { // Test: see error message when no consumers recorded yet let mut r0 = MemoryConsumer::new(same_name).register(&pool); - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100 for the total pool. The top memory consumers (across reservations) are: foo consumed 0 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100. The top memory consumers (across reservations) are: foo consumed 0 bytes"; let res = r0.try_grow(150); assert!( matches!( @@ -551,7 +535,7 @@ mod tests { let mut r1 = new_consumer_same_name.clone().register(&pool); // TODO: the insufficient_capacity_err() message is per reservation, not per consumer. // a followup PR will clarify this message "0 bytes already allocated for this reservation" - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90 for the total pool. The top memory consumers (across reservations) are: foo consumed 10 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90. The top memory consumers (across reservations) are: foo consumed 10 bytes"; let res = r1.try_grow(150); assert!( matches!( @@ -563,7 +547,7 @@ mod tests { // Test: will accumulate size changes per consumer, not per reservation r1.grow(20); - let expected = "Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70 for the total pool. The top memory consumers (across reservations) are: foo consumed 30 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo consumed 30 bytes"; let res = r1.try_grow(150); assert!( matches!( @@ -578,7 +562,7 @@ mod tests { let consumer_with_same_name_but_different_hash = MemoryConsumer::new(same_name).with_can_spill(true); let mut r2 = consumer_with_same_name_but_different_hash.register(&pool); - let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70 for the total pool. The top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes"; + let expected = "Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes"; let res = r2.try_grow(150); assert!( matches!( @@ -598,7 +582,7 @@ mod tests { let r1_consumer = MemoryConsumer::new("r1"); let mut r1 = r1_consumer.clone().register(&pool); r1.grow(20); - let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70 for the total pool. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes"; + let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes"; let res = r0.try_grow(150); assert!( matches!( @@ -624,7 +608,7 @@ mod tests { // Test: actual message we see is the `available is 70`. When it should be `available is 90`. // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister(). - let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70 for the total pool"; + let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70."; let res = r0.try_grow(150); assert!( matches!( @@ -637,7 +621,7 @@ mod tests { // Test: the registration needs to free itself (or be dropped), // for the proper error message r1.free(); - let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90 for the total pool"; + let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90."; let res = r0.try_grow(150); assert!( matches!(