Skip to content

Commit

Permalink
feat: track member consumers for FairSpillPool
Browse files Browse the repository at this point in the history
  • Loading branch information
asimsedhain committed Jan 29, 2024
1 parent ed04f35 commit 76ce658
Showing 1 changed file with 130 additions and 34 deletions.
164 changes: 130 additions & 34 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,14 @@ pub struct FairSpillPool {
}

#[derive(Debug)]
struct FairSpillPoolState {
/// The number of consumers that can spill
num_spill: usize,

/// The total amount of memory reserved that can be spilled
spillable: usize,
struct FairSpillPoolMember {
used: usize,
can_spill: bool,
}

/// The total amount of memory reserved by consumers that cannot spill
unspillable: usize,
#[derive(Debug)]
struct FairSpillPoolState {
pool_members: HashMap<String, FairSpillPoolMember>,
}

impl FairSpillPool {
Expand All @@ -207,87 +206,184 @@ impl FairSpillPool {
Self {
pool_size,
state: Mutex::new(FairSpillPoolState {
num_spill: 0,
spillable: 0,
unspillable: 0,
pool_members: HashMap::new(),
}),
}
}
}

impl MemoryPool for FairSpillPool {
fn register(&self, consumer: &MemoryConsumer) {
if consumer.can_spill {
self.state.lock().num_spill += 1;
}
let mut state = self.state.lock();
state.pool_members.insert(
consumer.name().into(),
FairSpillPoolMember {
used: 0,
can_spill: consumer.can_spill,
},
);
}

fn unregister(&self, consumer: &MemoryConsumer) {
if consumer.can_spill {
let mut state = self.state.lock();
state.num_spill = state.num_spill.checked_sub(1).unwrap();
}
let mut state = self.state.lock();
state.pool_members.remove(consumer.name());
}

fn grow(&self, reservation: &MemoryReservation, additional: usize) {
let mut state = self.state.lock();
match reservation.registration.consumer.can_spill {
true => state.spillable += additional,
false => state.unspillable += additional,
}
let member_state = state
.pool_members
.entry(reservation.consumer().name().into())
.or_insert(FairSpillPoolMember {
used: 0,
can_spill: reservation.registration.consumer.can_spill,
});

member_state.used = member_state.used.saturating_add(additional);
}

fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
let mut state = self.state.lock();
match reservation.registration.consumer.can_spill {
true => state.spillable -= shrink,
false => state.unspillable -= shrink,
}
let member_state = state
.pool_members
.entry(reservation.consumer().name().into())
.or_insert(FairSpillPoolMember {
used: 0,
can_spill: reservation.registration.consumer.can_spill,
});

member_state.used = member_state.used.saturating_sub(shrink);
}

fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
let mut state = self.state.lock();

match reservation.registration.consumer.can_spill {
true => {
let unspillable: usize = state
.pool_members
.values()
.filter_map(|e| (!e.can_spill).then_some(e.used))
.sum();

// The total amount of memory available to spilling consumers
let spill_available = self.pool_size.saturating_sub(state.unspillable);
let spill_available = self.pool_size.saturating_sub(unspillable);

let num_spill =
state.pool_members.values().filter(|e| e.can_spill).count();

// No spiller may use more than their fraction of the memory available
let available = spill_available
.checked_div(state.num_spill)
.checked_div(num_spill)
.unwrap_or(spill_available);

if reservation.size + additional > available {
// dropping the mutex so that the display trait method does not deadlock
drop(state);

debug!("Pool Exhausted while trying to allocate {additional} bytes for {}:\n{self}", reservation.registration.consumer.name());
return Err(insufficient_capacity_err(
reservation,
additional,
available,
));
}
state.spillable += additional;

let entry = state
.pool_members
.entry(reservation.consumer().name().into())
.or_insert(FairSpillPoolMember {
used: 0,
can_spill: reservation.registration.consumer.can_spill,
});

entry.used = entry.used.saturating_add(additional);
}
false => {
let available = self
.pool_size
.saturating_sub(state.unspillable + state.spillable);
let total_used: usize = state.pool_members.values().map(|e| e.used).sum();
let available = self.pool_size.saturating_sub(total_used);

if available < additional {
drop(state);

debug!("Pool Exhausted while trying to allocate {additional} bytes for {}:\n{self}", reservation.registration.consumer.name());
return Err(insufficient_capacity_err(
reservation,
additional,
available,
));
}
state.unspillable += additional;

let entry = state
.pool_members
.entry(reservation.consumer().name().into())
.or_insert(FairSpillPoolMember {
used: 0,
can_spill: reservation.registration.consumer.can_spill,
});

entry.used = entry.used.saturating_add(additional);
}
}
Ok(())
}

fn reserved(&self) -> usize {
let state = self.state.lock();
state.spillable + state.unspillable
state
.pool_members
.values()
.map(|FairSpillPoolMember { used, .. }| used)
.sum()
}
}

impl Display for FairSpillPool {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state = self.state.lock();
let mut num_spill = 0;
let mut unspillable_memory = 0;
let mut spillable_memory = 0;

for member in state.pool_members.values() {
match member.can_spill {
true => {
num_spill += 1;
spillable_memory += member.used
}
false => unspillable_memory += member.used,
}
}
let free = self
.pool_size
.saturating_sub(unspillable_memory + spillable_memory);

let mut allocations = state.pool_members.iter().collect::<Vec<_>>();
allocations.sort_by(|(_, a), (_, b)| a.used.cmp(&b.used).reverse());

let allocation_report = allocations.iter().fold(
"".to_string(),
|acc, (member, FairSpillPoolMember { can_spill, used })| {
let can_spill = if *can_spill {
"spillable"
} else {
"unspillable"
};
format!("{acc}\t{used}: [{can_spill}] {member}\n")
},
);

write!(
f,
"FairSpillPool {} allocations, {} spillable used, {} total spillable, {} unspillable used, {} free, {} capacity\n{}",
state.pool_members.len(),
spillable_memory,
num_spill,
unspillable_memory,
free,
self.pool_size,
allocation_report
)
}
}

Expand Down

0 comments on commit 76ce658

Please sign in to comment.