From 42daeaf6f60262c6d1656240a38beed854aef055 Mon Sep 17 00:00:00 2001 From: Ralf Jung Date: Sun, 26 Jun 2022 18:25:05 -0400 Subject: [PATCH] fix crossbeam-deque dealing with dangling Boxes --- crossbeam-deque/src/deque.rs | 41 ++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/crossbeam-deque/src/deque.rs b/crossbeam-deque/src/deque.rs index 802a2fef5..f852215d6 100644 --- a/crossbeam-deque/src/deque.rs +++ b/crossbeam-deque/src/deque.rs @@ -51,9 +51,9 @@ impl Buffer { } /// Returns a pointer to the task at the specified `index`. - unsafe fn at(&self, index: isize) -> *mut T { + unsafe fn at(&self, index: isize) -> *mut MaybeUninit { // `self.cap` is always a power of two. - self.ptr.offset(index & (self.cap - 1) as isize) + self.ptr.offset(index & (self.cap - 1) as isize).cast::>() } /// Writes `task` into the specified `index`. @@ -62,7 +62,7 @@ impl Buffer { /// technically speaking a data race and therefore UB. We should use an atomic store here, but /// that would be more expensive and difficult to implement generically for all types `T`. /// Hence, as a hack, we use a volatile write instead. - unsafe fn write(&self, index: isize, task: T) { + unsafe fn write(&self, index: isize, task: MaybeUninit) { ptr::write_volatile(self.at(index), task) } @@ -71,8 +71,8 @@ impl Buffer { /// This method might be concurrently called with another `write` at the same index, which is /// technically speaking a data race and therefore UB. We should use an atomic load here, but /// that would be more expensive and difficult to implement generically for all types `T`. - /// Hence, as a hack, we use a volatile write instead. - unsafe fn read(&self, index: isize) -> T { + /// Hence, as a hack, we use a volatile load instead. + unsafe fn read(&self, index: isize) -> MaybeUninit { ptr::read_volatile(self.at(index)) } } @@ -406,7 +406,7 @@ impl Worker { // Write `task` into the slot. unsafe { - buffer.write(b, task); + buffer.write(b, MaybeUninit::new(task)); } atomic::fence(Ordering::Release); @@ -461,7 +461,7 @@ impl Worker { unsafe { // Read the popped task. let buffer = self.buffer.get(); - let task = buffer.read(f); + let task = buffer.read(f).assume_init(); // Shrink the buffer if `len - 1` is less than one fourth of the capacity. if buffer.cap > MIN_CAP && len <= buffer.cap as isize / 4 { @@ -509,8 +509,8 @@ impl Worker { ) .is_err() { - // Failed. We didn't pop anything. - mem::forget(task.take()); + // Failed. We didn't pop anything. Reset to `None`. + task.take(); } // Restore the back index to the original task. @@ -524,7 +524,7 @@ impl Worker { } } - task + task.map(|t| unsafe { t.assume_init() }) } } } @@ -661,12 +661,11 @@ impl Stealer { .is_err() { // We didn't steal this task, forget it. - mem::forget(task); return Steal::Retry; } // Return the stolen task. - Steal::Success(task) + Steal::Success(unsafe { task.assume_init() }) } /// Steals a batch of tasks and pushes them into another worker. @@ -821,7 +820,6 @@ impl Stealer { .is_err() { // We didn't steal this task, forget it and break from the loop. - mem::forget(task); batch_size = i; break; } @@ -975,7 +973,6 @@ impl Stealer { .is_err() { // We didn't steal this task, forget it. - mem::forget(task); return Steal::Retry; } @@ -992,7 +989,6 @@ impl Stealer { .is_err() { // We didn't steal this task, forget it. - mem::forget(task); return Steal::Retry; } @@ -1037,7 +1033,6 @@ impl Stealer { .is_err() { // We didn't steal this task, forget it and break from the loop. - mem::forget(tmp); batch_size = i; break; } @@ -1077,7 +1072,7 @@ impl Stealer { dest.inner.back.store(dest_b, Ordering::Release); // Return with success. - Steal::Success(task) + Steal::Success(unsafe { task.assume_init() }) } } @@ -1535,7 +1530,7 @@ impl Injector { // Read the task. let slot = (*block).slots.get_unchecked(offset + i); slot.wait_write(); - let task = slot.task.get().read().assume_init(); + let task = slot.task.get().read(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add(i as isize), task); @@ -1547,7 +1542,7 @@ impl Injector { // Read the task. let slot = (*block).slots.get_unchecked(offset + i); slot.wait_write(); - let task = slot.task.get().read().assume_init(); + let task = slot.task.get().read(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); @@ -1689,7 +1684,7 @@ impl Injector { // Read the task. let slot = (*block).slots.get_unchecked(offset); slot.wait_write(); - let task = slot.task.get().read().assume_init(); + let task = slot.task.get().read(); match dest.flavor { Flavor::Fifo => { @@ -1698,7 +1693,7 @@ impl Injector { // Read the task. let slot = (*block).slots.get_unchecked(offset + i + 1); slot.wait_write(); - let task = slot.task.get().read().assume_init(); + let task = slot.task.get().read(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add(i as isize), task); @@ -1711,7 +1706,7 @@ impl Injector { // Read the task. let slot = (*block).slots.get_unchecked(offset + i + 1); slot.wait_write(); - let task = slot.task.get().read().assume_init(); + let task = slot.task.get().read(); // Write it into the destination queue. dest_buffer.write(dest_b.wrapping_add((batch_size - 1 - i) as isize), task); @@ -1744,7 +1739,7 @@ impl Injector { } } - Steal::Success(task) + Steal::Success(task.assume_init()) } }