Skip to content

Commit

Permalink
fix crossbeam-deque dealing with dangling Boxes
Browse files Browse the repository at this point in the history
  • Loading branch information
RalfJung committed Jun 26, 2022
1 parent a6166ee commit 42daeaf
Showing 1 changed file with 18 additions and 23 deletions.
41 changes: 18 additions & 23 deletions crossbeam-deque/src/deque.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ impl<T> Buffer<T> {
}

/// Returns a pointer to the task at the specified `index`.
unsafe fn at(&self, index: isize) -> *mut T {
unsafe fn at(&self, index: isize) -> *mut MaybeUninit<T> {
// `self.cap` is always a power of two.
self.ptr.offset(index & (self.cap - 1) as isize)
self.ptr.offset(index & (self.cap - 1) as isize).cast::<MaybeUninit<T>>()
}

/// Writes `task` into the specified `index`.
Expand All @@ -62,7 +62,7 @@ impl<T> Buffer<T> {
/// technically speaking a data race and therefore UB. We should use an atomic store here, but
/// that would be more expensive and difficult to implement generically for all types `T`.
/// Hence, as a hack, we use a volatile write instead.
unsafe fn write(&self, index: isize, task: T) {
unsafe fn write(&self, index: isize, task: MaybeUninit<T>) {
ptr::write_volatile(self.at(index), task)
}

Expand All @@ -71,8 +71,8 @@ impl<T> Buffer<T> {
/// This method might be concurrently called with another `write` at the same index, which is
/// technically speaking a data race and therefore UB. We should use an atomic load here, but
/// that would be more expensive and difficult to implement generically for all types `T`.
/// Hence, as a hack, we use a volatile write instead.
unsafe fn read(&self, index: isize) -> T {
/// Hence, as a hack, we use a volatile load instead.
unsafe fn read(&self, index: isize) -> MaybeUninit<T> {
ptr::read_volatile(self.at(index))
}
}
Expand Down Expand Up @@ -406,7 +406,7 @@ impl<T> Worker<T> {

// Write `task` into the slot.
unsafe {
buffer.write(b, task);
buffer.write(b, MaybeUninit::new(task));
}

atomic::fence(Ordering::Release);
Expand Down Expand Up @@ -461,7 +461,7 @@ impl<T> Worker<T> {
unsafe {
// Read the popped task.
let buffer = self.buffer.get();
let task = buffer.read(f);
let task = buffer.read(f).assume_init();

// Shrink the buffer if `len - 1` is less than one fourth of the capacity.
if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 {
Expand Down Expand Up @@ -509,8 +509,8 @@ impl<T> Worker<T> {
)
.is_err()
{
// Failed. We didn't pop anything.
mem::forget(task.take());
// Failed. We didn't pop anything. Reset to `None`.
task.take();
}

// Restore the back index to the original task.
Expand All @@ -524,7 +524,7 @@ impl<T> Worker<T> {
}
}

task
task.map(|t| unsafe { t.assume_init() })
}
}
}
Expand Down Expand Up @@ -661,12 +661,11 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it.
mem::forget(task);
return Steal::Retry;
}

// Return the stolen task.
Steal::Success(task)
Steal::Success(unsafe { task.assume_init() })
}

/// Steals a batch of tasks and pushes them into another worker.
Expand Down Expand Up @@ -821,7 +820,6 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it and break from the loop.
mem::forget(task);
batch_size = i;
break;
}
Expand Down Expand Up @@ -975,7 +973,6 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it.
mem::forget(task);
return Steal::Retry;
}

Expand All @@ -992,7 +989,6 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it.
mem::forget(task);
return Steal::Retry;
}

Expand Down Expand Up @@ -1037,7 +1033,6 @@ impl<T> Stealer<T> {
.is_err()
{
// We didn't steal this task, forget it and break from the loop.
mem::forget(tmp);
batch_size = i;
break;
}
Expand Down Expand Up @@ -1077,7 +1072,7 @@ impl<T> Stealer<T> {
dest.inner.back.store(dest_b, Ordering::Release);

// Return with success.
Steal::Success(task)
Steal::Success(unsafe { task.assume_init() })
}
}

Expand Down Expand Up @@ -1535,7 +1530,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.get().read();

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add(i as isize), task);
Expand All @@ -1547,7 +1542,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.get().read();

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
Expand Down Expand Up @@ -1689,7 +1684,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.get().read();

match dest.flavor {
Flavor::Fifo => {
Expand All @@ -1698,7 +1693,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i + 1);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.get().read();

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add(i as isize), task);
Expand All @@ -1711,7 +1706,7 @@ impl<T> Injector<T> {
// Read the task.
let slot = (*block).slots.get_unchecked(offset + i + 1);
slot.wait_write();
let task = slot.task.get().read().assume_init();
let task = slot.task.get().read();

// Write it into the destination queue.
dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task);
Expand Down Expand Up @@ -1744,7 +1739,7 @@ impl<T> Injector<T> {
}
}

Steal::Success(task)
Steal::Success(task.assume_init())
}
}

Expand Down

0 comments on commit 42daeaf

Please sign in to comment.