diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 2217d4e80dfa..9eaa375592ba 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -189,15 +189,14 @@ pub struct FairSpillPool { } #[derive(Debug)] -struct FairSpillPoolState { - /// The number of consumers that can spill - num_spill: usize, - - /// The total amount of memory reserved that can be spilled - spillable: usize, +struct FairSpillPoolMember { + used: usize, + can_spill: bool, +} - /// The total amount of memory reserved by consumers that cannot spill - unspillable: usize, +#[derive(Debug)] +struct FairSpillPoolState { + pool_members: HashMap, } impl FairSpillPool { @@ -207,9 +206,7 @@ impl FairSpillPool { Self { pool_size, state: Mutex::new(FairSpillPoolState { - num_spill: 0, - spillable: 0, - unspillable: 0, + pool_members: HashMap::new(), }), } } @@ -217,32 +214,45 @@ impl FairSpillPool { impl MemoryPool for FairSpillPool { fn register(&self, consumer: &MemoryConsumer) { - if consumer.can_spill { - self.state.lock().num_spill += 1; - } + let mut state = self.state.lock(); + state.pool_members.insert( + consumer.name().into(), + FairSpillPoolMember { + used: 0, + can_spill: consumer.can_spill, + }, + ); } fn unregister(&self, consumer: &MemoryConsumer) { - if consumer.can_spill { - let mut state = self.state.lock(); - state.num_spill = state.num_spill.checked_sub(1).unwrap(); - } + let mut state = self.state.lock(); + state.pool_members.remove(consumer.name()); } fn grow(&self, reservation: &MemoryReservation, additional: usize) { let mut state = self.state.lock(); - match reservation.registration.consumer.can_spill { - true => state.spillable += additional, - false => state.unspillable += additional, - } + let member_state = state + .pool_members + .entry(reservation.consumer().name().into()) + .or_insert(FairSpillPoolMember { + used: 0, + can_spill: reservation.registration.consumer.can_spill, + }); + + member_state.used = member_state.used.saturating_add(additional); } fn shrink(&self, reservation: &MemoryReservation, shrink: usize) { let mut state = self.state.lock(); - match reservation.registration.consumer.can_spill { - true => state.spillable -= shrink, - false => state.unspillable -= shrink, - } + let member_state = state + .pool_members + .entry(reservation.consumer().name().into()) + .or_insert(FairSpillPoolMember { + used: 0, + can_spill: reservation.registration.consumer.can_spill, + }); + + member_state.used = member_state.used.saturating_sub(shrink); } fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> { @@ -250,36 +260,69 @@ impl MemoryPool for FairSpillPool { match reservation.registration.consumer.can_spill { true => { + let unspillable: usize = state + .pool_members + .values() + .filter_map(|e| (!e.can_spill).then_some(e.used)) + .sum(); + // The total amount of memory available to spilling consumers - let spill_available = self.pool_size.saturating_sub(state.unspillable); + let spill_available = self.pool_size.saturating_sub(unspillable); + + let num_spill = + state.pool_members.values().filter(|e| e.can_spill).count(); // No spiller may use more than their fraction of the memory available let available = spill_available - .checked_div(state.num_spill) + .checked_div(num_spill) .unwrap_or(spill_available); if reservation.size + additional > available { + // dropping the mutex so that the display trait method does not deadlock + drop(state); + + debug!("Pool Exhausted while trying to allocate {additional} bytes for {}:\n{self}", reservation.registration.consumer.name()); return Err(insufficient_capacity_err( reservation, additional, available, )); } - state.spillable += additional; + + let entry = state + .pool_members + .entry(reservation.consumer().name().into()) + .or_insert(FairSpillPoolMember { + used: 0, + can_spill: reservation.registration.consumer.can_spill, + }); + + entry.used = entry.used.saturating_add(additional); } false => { - let available = self - .pool_size - .saturating_sub(state.unspillable + state.spillable); + let total_used: usize = state.pool_members.values().map(|e| e.used).sum(); + let available = self.pool_size.saturating_sub(total_used); if available < additional { + drop(state); + + debug!("Pool Exhausted while trying to allocate {additional} bytes for {}:\n{self}", reservation.registration.consumer.name()); return Err(insufficient_capacity_err( reservation, additional, available, )); } - state.unspillable += additional; + + let entry = state + .pool_members + .entry(reservation.consumer().name().into()) + .or_insert(FairSpillPoolMember { + used: 0, + can_spill: reservation.registration.consumer.can_spill, + }); + + entry.used = entry.used.saturating_add(additional); } } Ok(()) @@ -287,7 +330,60 @@ impl MemoryPool for FairSpillPool { fn reserved(&self) -> usize { let state = self.state.lock(); - state.spillable + state.unspillable + state + .pool_members + .values() + .map(|FairSpillPoolMember { used, .. }| used) + .sum() + } +} + +impl Display for FairSpillPool { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let state = self.state.lock(); + let mut num_spill = 0; + let mut unspillable_memory = 0; + let mut spillable_memory = 0; + + for member in state.pool_members.values() { + match member.can_spill { + true => { + num_spill += 1; + spillable_memory += member.used + } + false => unspillable_memory += member.used, + } + } + let free = self + .pool_size + .saturating_sub(unspillable_memory + spillable_memory); + + let mut allocations = state.pool_members.iter().collect::>(); + allocations.sort_by(|(_, a), (_, b)| a.used.cmp(&b.used).reverse()); + + let allocation_report = allocations.iter().fold( + "".to_string(), + |acc, (member, FairSpillPoolMember { can_spill, used })| { + let can_spill = if *can_spill { + "spillable" + } else { + "unspillable" + }; + format!("{acc}\t{used}: [{can_spill}] {member}\n") + }, + ); + + write!( + f, + "FairSpillPool {} allocations, {} spillable used, {} total spillable, {} unspillable used, {} free, {} capacity\n{}", + state.pool_members.len(), + spillable_memory, + num_spill, + unspillable_memory, + free, + self.pool_size, + allocation_report + ) } }