From 921c3b6b181b6175041c6927d38c7ce2c0735121 Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 1 Aug 2024 08:15:58 -0700 Subject: [PATCH] Add `TrackedMemoryPool` with better error messages on exhaustion (#11665) * feat(11523): TrackConsumersPool impl which includes errors messages with top K of consumers * test(11523): unit tests for TrackConsumersPool * test(11523): integration test for tracked consumers oom message * chore(11523): use nonzero usize * chore(11523): document the what the memory insufficient_capacity_err is actually returning * chore(11523): improve test failure coverage for TrackConsumersPool * fix(11523): handle additive tracking of same hashed consumer, across different reservations * refactor(11523): update error message to delineate the multiple consumer with the same name, but different hash * test(11523): demonstrate the underlying pool behavior on deregister * chore: make explicit what the insufficient_capacity_err() logs * fix(11523): remove to_root() for the error, since the immediate inner child should be returning an OOM * chore(11523): add result to logging of failed CI tests * fix(11523): splice error message to get consumers prior to error message * Revert "fix(11523): splice error message to get consumers prior to error message" This reverts commit 09b20d289f53d3b61b976313f8731e8a6711f370. * fix(11523): fix without splicing error messages, and instead handle the proper error bubbling (msg wrapping) * chore: update docs to explain purpose of TrackConsumersPool Co-authored-by: Andrew Lamb * refactor(11523): enable TrackConsumersPool to be used in runtime metrics --------- Co-authored-by: Andrew Lamb --- datafusion/core/tests/memory_limit/mod.rs | 55 ++- datafusion/execution/src/memory_pool/mod.rs | 2 +- datafusion/execution/src/memory_pool/pool.rs | 377 ++++++++++++++++++- 3 files changed, 431 insertions(+), 3 deletions(-) diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index a2bdbe64aa43..5c712af80192 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -26,10 +26,14 @@ use datafusion::assert_batches_eq; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::streaming::PartitionStream; +use datafusion_execution::memory_pool::{ + GreedyMemoryPool, MemoryPool, TrackConsumersPool, +}; use datafusion_expr::{Expr, TableType}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; use futures::StreamExt; use std::any::Any; +use std::num::NonZeroUsize; use std::sync::{Arc, OnceLock}; use tokio::fs::File; @@ -371,6 +375,39 @@ async fn oom_parquet_sink() { .await } +#[tokio::test] +async fn oom_with_tracked_consumer_pool() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.into_path().join("test.parquet"); + let _ = File::create(path.clone()).await.unwrap(); + + TestCase::new() + .with_config( + SessionConfig::new() + ) + .with_query(format!( + " + COPY (select * from t) + TO '{}' + STORED AS PARQUET OPTIONS (compression 'uncompressed'); + ", + path.to_string_lossy() + )) + .with_expected_errors(vec![ + "Failed to allocate additional", + "for ParquetSink(ArrowColumnWriter)", + "Resources exhausted with top memory consumers (across reservations) are: ParquetSink(ArrowColumnWriter)" + ]) + .with_memory_pool(Arc::new( + TrackConsumersPool::new( + GreedyMemoryPool::new(200_000), + NonZeroUsize::new(1).unwrap() + ) + )) + .run() + .await +} + /// Run the query with the specified memory limit, /// and verifies the expected errors are returned #[derive(Clone, Debug)] @@ -378,6 +415,7 @@ struct TestCase { query: Option, expected_errors: Vec, memory_limit: usize, + memory_pool: Option>, config: SessionConfig, scenario: Scenario, /// How should the disk manager (that allows spilling) be @@ -396,6 +434,7 @@ impl TestCase { expected_errors: vec![], memory_limit: 0, config: SessionConfig::new(), + memory_pool: None, scenario: Scenario::AccessLog, disk_manager_config: DiskManagerConfig::Disabled, expected_plan: vec![], @@ -425,6 +464,15 @@ impl TestCase { self } + /// Set the memory pool to be used + /// + /// This will override the memory_limit requested, + /// as the memory pool includes the limit. + fn with_memory_pool(mut self, memory_pool: Arc) -> Self { + self.memory_pool = Some(memory_pool); + self + } + /// Specify the configuration to use pub fn with_config(mut self, config: SessionConfig) -> Self { self.config = config; @@ -465,6 +513,7 @@ impl TestCase { query, expected_errors, memory_limit, + memory_pool, config, scenario, disk_manager_config, @@ -474,11 +523,15 @@ impl TestCase { let table = scenario.table(); - let rt_config = RuntimeConfig::new() + let mut rt_config = RuntimeConfig::new() // disk manager setting controls the spilling .with_disk_manager(disk_manager_config) .with_memory_limit(memory_limit, MEMORY_FRACTION); + if let Some(pool) = memory_pool { + rt_config = rt_config.with_memory_pool(pool); + }; + let runtime = RuntimeEnv::new(rt_config).unwrap(); // Configure execution diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs index 3df212d466c9..dcd59acbd49e 100644 --- a/datafusion/execution/src/memory_pool/mod.rs +++ b/datafusion/execution/src/memory_pool/mod.rs @@ -117,7 +117,7 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug { /// For help with allocation accounting, see the [proxy] module. /// /// [proxy]: crate::memory_pool::proxy -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq, Hash, Clone)] pub struct MemoryConsumer { name: String, can_spill: bool, diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index fd7724f3076c..9cb6f207e59c 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -17,9 +17,13 @@ use crate::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation}; use datafusion_common::{resources_datafusion_err, DataFusionError, Result}; +use hashbrown::HashMap; use log::debug; use parking_lot::Mutex; -use std::sync::atomic::{AtomicUsize, Ordering}; +use std::{ + num::NonZeroUsize, + sync::atomic::{AtomicU64, AtomicUsize, Ordering}, +}; /// A [`MemoryPool`] that enforces no limit #[derive(Debug, Default)] @@ -231,6 +235,11 @@ impl MemoryPool for FairSpillPool { } } +/// Constructs a resources error based upon the individual [`MemoryReservation`]. +/// +/// The error references the `bytes already allocated` for the reservation, +/// and not the total within the collective [`MemoryPool`], +/// nor the total across multiple reservations with the same [`MemoryConsumer`]. #[inline(always)] fn insufficient_capacity_err( reservation: &MemoryReservation, @@ -240,6 +249,152 @@ fn insufficient_capacity_err( resources_datafusion_err!("Failed to allocate additional {} bytes for {} with {} bytes already allocated - maximum available is {}", additional, reservation.registration.consumer.name, reservation.size, available) } +/// A [`MemoryPool`] that tracks the consumers that have +/// reserved memory within the inner memory pool. +/// +/// By tracking memory reservations more carefully this pool +/// can provide better error messages on the largest memory users +/// +/// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`]. +/// The same consumer can have multiple reservations. +#[derive(Debug)] +pub struct TrackConsumersPool { + inner: I, + top: NonZeroUsize, + tracked_consumers: Mutex>, +} + +impl TrackConsumersPool { + /// Creates a new [`TrackConsumersPool`]. + /// + /// The `top` determines how many Top K [`MemoryConsumer`]s to include + /// in the reported [`DataFusionError::ResourcesExhausted`]. + pub fn new(inner: I, top: NonZeroUsize) -> Self { + Self { + inner, + top, + tracked_consumers: Default::default(), + } + } + + /// Determine if there are multiple [`MemoryConsumer`]s registered + /// which have the same name. + /// + /// This is very tied to the implementation of the memory consumer. + fn has_multiple_consumers(&self, name: &String) -> bool { + let consumer = MemoryConsumer::new(name); + let consumer_with_spill = consumer.clone().with_can_spill(true); + let guard = self.tracked_consumers.lock(); + guard.contains_key(&consumer) && guard.contains_key(&consumer_with_spill) + } + + /// The top consumers in a report string. + pub fn report_top(&self, top: usize) -> String { + let mut consumers = self + .tracked_consumers + .lock() + .iter() + .map(|(consumer, reserved)| { + ( + (consumer.name().to_owned(), consumer.can_spill()), + reserved.load(Ordering::Acquire), + ) + }) + .collect::>(); + consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering + + consumers[0..std::cmp::min(top, consumers.len())] + .iter() + .map(|((name, can_spill), size)| { + if self.has_multiple_consumers(name) { + format!("{name}(can_spill={}) consumed {:?} bytes", can_spill, size) + } else { + format!("{name} consumed {:?} bytes", size) + } + }) + .collect::>() + .join(", ") + } +} + +impl MemoryPool for TrackConsumersPool { + fn register(&self, consumer: &MemoryConsumer) { + self.inner.register(consumer); + + let mut guard = self.tracked_consumers.lock(); + if let Some(already_reserved) = guard.insert(consumer.clone(), Default::default()) + { + guard.entry_ref(consumer).and_modify(|bytes| { + bytes.fetch_add( + already_reserved.load(Ordering::Acquire), + Ordering::AcqRel, + ); + }); + } + } + + fn unregister(&self, consumer: &MemoryConsumer) { + self.inner.unregister(consumer); + self.tracked_consumers.lock().remove(consumer); + } + + fn grow(&self, reservation: &MemoryReservation, additional: usize) { + self.inner.grow(reservation, additional); + self.tracked_consumers + .lock() + .entry_ref(reservation.consumer()) + .and_modify(|bytes| { + bytes.fetch_add(additional as u64, Ordering::AcqRel); + }); + } + + fn shrink(&self, reservation: &MemoryReservation, shrink: usize) { + self.inner.shrink(reservation, shrink); + self.tracked_consumers + .lock() + .entry_ref(reservation.consumer()) + .and_modify(|bytes| { + bytes.fetch_sub(shrink as u64, Ordering::AcqRel); + }); + } + + fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> { + self.inner + .try_grow(reservation, additional) + .map_err(|e| match e { + DataFusionError::ResourcesExhausted(e) => { + // wrap OOM message in top consumers + DataFusionError::ResourcesExhausted( + provide_top_memory_consumers_to_error_msg( + e.to_owned(), + self.report_top(self.top.into()), + ), + ) + } + _ => e, + })?; + + self.tracked_consumers + .lock() + .entry_ref(reservation.consumer()) + .and_modify(|bytes| { + bytes.fetch_add(additional as u64, Ordering::AcqRel); + }); + Ok(()) + } + + fn reserved(&self) -> usize { + self.inner.reserved() + } +} + +fn provide_top_memory_consumers_to_error_msg( + error_msg: String, + top_consumers: String, +) -> String { + format!("Resources exhausted with top memory consumers (across reservations) are: {}. Error: {}", top_consumers, error_msg) +} + #[cfg(test)] mod tests { use super::*; @@ -311,4 +466,224 @@ mod tests { let err = r4.try_grow(30).unwrap_err().strip_backtrace(); assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated - maximum available is 20"); } + + #[test] + fn test_tracked_consumers_pool() { + let pool: Arc = Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(100), + NonZeroUsize::new(3).unwrap(), + )); + + // Test: use all the different interfaces to change reservation size + + // set r1=50, using grow and shrink + let mut r1 = MemoryConsumer::new("r1").register(&pool); + r1.grow(70); + r1.shrink(20); + + // set r2=15 using try_grow + let mut r2 = MemoryConsumer::new("r2").register(&pool); + r2.try_grow(15) + .expect("should succeed in memory allotment for r2"); + + // set r3=20 using try_resize + let mut r3 = MemoryConsumer::new("r3").register(&pool); + r3.try_resize(25) + .expect("should succeed in memory allotment for r3"); + r3.try_resize(20) + .expect("should succeed in memory allotment for r3"); + + // set r4=10 + // this should not be reported in top 3 + let mut r4 = MemoryConsumer::new("r4").register(&pool); + r4.grow(10); + + // Test: reports if new reservation causes error + // using the previously set sizes for other consumers + let mut r5 = MemoryConsumer::new("r5").register(&pool); + let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated - maximum available is 5"; + let res = r5.try_grow(150); + assert!( + matches!( + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) + ), + "should provide list of top memory consumers, instead found {:?}", + res + ); + } + + #[test] + fn test_tracked_consumers_pool_register() { + let pool: Arc = Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(100), + NonZeroUsize::new(3).unwrap(), + )); + + let same_name = "foo"; + + // Test: see error message when no consumers recorded yet + let mut r0 = MemoryConsumer::new(same_name).register(&pool); + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 100"; + let res = r0.try_grow(150); + assert!( + matches!( + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) + ), + "should provide proper error when no reservations have been made yet, instead found {:?}", res + ); + + // API: multiple registrations using the same hashed consumer, + // will be recognized as the same in the TrackConsumersPool. + + // Test: will be the same per Top Consumers reported. + r0.grow(10); // make r0=10, pool available=90 + let new_consumer_same_name = MemoryConsumer::new(same_name); + let mut r1 = new_consumer_same_name.clone().register(&pool); + // TODO: the insufficient_capacity_err() message is per reservation, not per consumer. + // a followup PR will clarify this message "0 bytes already allocated for this reservation" + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 90"; + let res = r1.try_grow(150); + assert!( + matches!( + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) + ), + "should provide proper error with same hashed consumer (a single foo=10 bytes, available=90), instead found {:?}", res + ); + + // Test: will accumulate size changes per consumer, not per reservation + r1.grow(20); + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated - maximum available is 70"; + let res = r1.try_grow(150); + assert!( + matches!( + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) + ), + "should provide proper error with same hashed consumer (a single foo=30 bytes, available=70), instead found {:?}", res + ); + + // Test: different hashed consumer, (even with the same name), + // will be recognized as different in the TrackConsumersPool + let consumer_with_same_name_but_different_hash = + MemoryConsumer::new(same_name).with_can_spill(true); + let mut r2 = consumer_with_same_name_but_different_hash.register(&pool); + let expected = "Resources exhausted with top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated - maximum available is 70"; + let res = r2.try_grow(150); + assert!( + matches!( + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) + ), + "should provide proper error with different hashed consumer (foo(can_spill=false)=30 bytes and foo(can_spill=true)=0 bytes, available=70), instead found {:?}", res + ); + } + + #[test] + fn test_tracked_consumers_pool_deregister() { + fn test_per_pool_type(pool: Arc) { + // Baseline: see the 2 memory consumers + let mut r0 = MemoryConsumer::new("r0").register(&pool); + r0.grow(10); + let r1_consumer = MemoryConsumer::new("r1"); + let mut r1 = r1_consumer.clone().register(&pool); + r1.grow(20); + let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70"; + let res = r0.try_grow(150); + assert!( + matches!( + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected) + ), + "should provide proper error with both consumers, instead found {:?}", + res + ); + + // Test: unregister one + // only the remaining one should be listed + pool.unregister(&r1_consumer); + let expected_consumers = "Resources exhausted with top memory consumers (across reservations) are: r0 consumed 10 bytes"; + let res = r0.try_grow(150); + assert!( + matches!( + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected_consumers) + ), + "should provide proper error with only 1 consumer left registered, instead found {:?}", res + ); + + // Test: actual message we see is the `available is 70`. When it should be `available is 90`. + // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister(). + let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70"; + let res = r0.try_grow(150); + assert!( + matches!( + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected_70_available) + ), + "should find that the inner pool will still count all bytes for the deregistered consumer until the reservation is dropped, instead found {:?}", res + ); + + // Test: the registration needs to free itself (or be dropped), + // for the proper error message + r1.free(); + let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90"; + let res = r0.try_grow(150); + assert!( + matches!( + &res, + Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected_90_available) + ), + "should correctly account the total bytes after reservation is free, instead found {:?}", res + ); + } + + let tracked_spill_pool: Arc = Arc::new(TrackConsumersPool::new( + FairSpillPool::new(100), + NonZeroUsize::new(3).unwrap(), + )); + test_per_pool_type(tracked_spill_pool); + + let tracked_greedy_pool: Arc = Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(100), + NonZeroUsize::new(3).unwrap(), + )); + test_per_pool_type(tracked_greedy_pool); + } + + #[test] + fn test_tracked_consumers_pool_use_beyond_errors() { + let upcasted: Arc = + Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(100), + NonZeroUsize::new(3).unwrap(), + )); + let pool: Arc = Arc::clone(&upcasted) + .downcast::>() + .unwrap(); + // set r1=20 + let mut r1 = MemoryConsumer::new("r1").register(&pool); + r1.grow(20); + // set r2=15 + let mut r2 = MemoryConsumer::new("r2").register(&pool); + r2.grow(15); + // set r3=45 + let mut r3 = MemoryConsumer::new("r3").register(&pool); + r3.grow(45); + + let downcasted = upcasted + .downcast::>() + .unwrap(); + + // Test: can get runtime metrics, even without an error thrown + let expected = "r3 consumed 45 bytes, r1 consumed 20 bytes"; + let res = downcasted.report_top(2); + assert_eq!( + res, expected, + "should provide list of top memory consumers, instead found {:?}", + res + ); + } }