Skip to content

Commit

Permalink
fix crossbeam-deque dealing with dangling Boxes
Browse files Browse the repository at this point in the history
  • Loading branch information
RalfJung committed Jun 26, 2022
1 parent a6166ee commit 8d7db8c
Showing 1 changed file with 20 additions and 23 deletions.
43 changes: 20 additions & 23 deletions crossbeam-deque/src/deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ impl<T> Buffer<T> {
/// Returns a pointer to the task at the specified `index`.
unsafe fn at(&self, index: isize) -> *mut T {
// `self.cap` is always a power of two.
// We do all the loads at `MaybeUninit` because we might realize, after loading, that we
// don't actually have the right to access this memory.
self.ptr.offset(index & (self.cap - 1) as isize)
}

Expand All @@ -62,18 +64,18 @@ impl<T> Buffer<T> {
/// technically speaking a data race and therefore UB. We should use an atomic store here, but
/// that would be more expensive and difficult to implement generically for all types `T`.
/// Hence, as a hack, we use a volatile write instead.
unsafe fn write(&self, index: isize, task: T) {
ptr::write_volatile(self.at(index), task)
unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
ptr::write_volatile(self.at(index) as *mut MaybeUninit<T>, task)
}

/// Reads a task from the specified `index`.
///
/// This method might be concurrently called with another `write` at the same index, which is
/// technically speaking a data race and therefore UB. We should use an atomic load here, but
/// that would be more expensive and difficult to implement generically for all types `T`.
/// Hence, as a hack, we use a volatile write instead.
unsafe fn read(&self, index: isize) -> T {
ptr::read_volatile(self.at(index))
/// Hence, as a hack, we use a volatile load instead.
unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
ptr::read_volatile(self.at(index) as *mut MaybeUninit<T>)
}
}

Expand Down Expand Up @@ -406,7 +408,7 @@ impl<T> Worker<T> {

// Write `task` into the slot.
unsafe {
buffer.write(b, task);
buffer.write(b, MaybeUninit::new(task));
}

atomic::fence(Ordering::Release);
Expand Down Expand Up @@ -461,7 +463,7 @@ impl<T> Worker<T> {
unsafe {
// Read the popped task.
let buffer = self.buffer.get();
let task = buffer.read(f);
let task = buffer.read(f).assume_init();

// Shrink the buffer if `len - 1` is less than one fourth of the capacity.
if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
Expand Down Expand Up @@ -509,8 +511,8 @@ impl<T> Worker<T> {
)
.is_err()
{
// Failed. We didn't pop anything.
mem::forget(task.take());
// Failed. We didn't pop anything. Reset to `None`.
task.take();
}

// Restore the back index to the original task.
Expand All @@ -524,7 +526,7 @@ impl<T> Worker<T> {
}
}

task
task.map(|t| unsafe { t.assume_init() })
}
}
}
Expand Down Expand Up @@ -661,12 +663,11 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it.
mem::forget(task);
return Steal::Retry;
}

// Return the stolen task.
Steal::Success(task)
Steal::Success(unsafe { task.assume_init() })
}

/// Steals a batch of tasks and pushes them into another worker.
Expand Down Expand Up @@ -821,7 +822,6 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it and break from the loop.
mem::forget(task);
batch_size = i;
break;
}
Expand Down Expand Up @@ -975,7 +975,6 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it.
mem::forget(task);
return Steal::Retry;
}

Expand All @@ -992,7 +991,6 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it.
mem::forget(task);
return Steal::Retry;
}

Expand Down Expand Up @@ -1037,7 +1035,6 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it and break from the loop.
mem::forget(tmp);
batch_size = i;
break;
}
Expand Down Expand Up @@ -1077,7 +1074,7 @@ impl<T> Stealer<T> {
dest.inner.back.store(dest_b, Ordering::Release);

// Return with success.
Steal::Success(task)
Steal::Success(unsafe { task.assume_init() })
}
}

Expand Down Expand Up @@ -1535,7 +1532,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.get().read();

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add(i as isize), task);
Expand All @@ -1547,7 +1544,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.get().read();

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
Expand Down Expand Up @@ -1689,7 +1686,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.get().read();

match dest.flavor {
Flavor::Fifo => {
Expand All @@ -1698,7 +1695,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i + 1);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.get().read();

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add(i as isize), task);
Expand All @@ -1711,7 +1708,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i + 1);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.get().read();

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
Expand Down Expand Up @@ -1744,7 +1741,7 @@ impl<T> Injector<T> {
}
}

Steal::Success(task)
Steal::Success(task.assume_init())
}
}

Expand Down

0 comments on commit 8d7db8c

Please sign in to comment.