Skip to content

Commit

Permalink
Patch fix sched (#419)
Browse files Browse the repository at this point in the history
1.解决waitqueue sleep的时候,由于preempt count不为0,导致sched失败,从而导致该waitqueue下一次wakeup时,会把pcb多次加入调度队列的bug
2.修复socket inode 的read和write方法里面没有使用no_preempt的问题
3. 修复cpu0的内核栈由于脏数据导致new_idle的时候set pcb报错的问题

---------

Co-authored-by: longjin <[email protected]>
  • Loading branch information
GnoCiYeH and fslongjin authored Nov 4, 2023
1 parent 8058ccb commit 2f6f547
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 28 deletions.
1 change: 1 addition & 0 deletions kernel/src/driver/net/e1000e/e1000e_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl phy::TxToken for E1000ETxToken {
let result = f(buffer.as_mut_slice());
let mut device = self.driver.inner.lock();
device.e1000e_transmit(buffer);
buffer.free_buffer();
return result;
}
}
Expand Down
1 change: 0 additions & 1 deletion kernel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,4 @@ pub fn panic(info: &PanicInfo) -> ! {

println!("Current PCB:\n\t{:?}", *(ProcessManager::current_pcb()));
ProcessManager::exit(usize::MAX);
unreachable!();
}
52 changes: 44 additions & 8 deletions kernel/src/libs/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct RwLock<T> {
pub struct RwLockReadGuard<'a, T: 'a> {
data: *const T,
lock: &'a AtomicU32,
irq_guard: Option<IrqFlagsGuard>,
}

/// @brief UPGRADED是介于READER和WRITER之间的一种锁,它可以升级为WRITER,
Expand All @@ -53,6 +54,7 @@ pub struct RwLockReadGuard<'a, T: 'a> {
pub struct RwLockUpgradableGuard<'a, T: 'a> {
data: *const T,
inner: &'a RwLock<T>,
irq_guard: Option<IrqFlagsGuard>,
}

/// @brief WRITER守卫的数据结构
Expand Down Expand Up @@ -144,6 +146,7 @@ impl<T> RwLock<T> {
return Some(RwLockReadGuard {
data: unsafe { &*self.data.get() },
lock: &self.lock,
irq_guard: None,
});
}
}
Expand All @@ -160,6 +163,19 @@ impl<T> RwLock<T> {
} //忙等待
}

pub fn read_irqsave(&self) -> RwLockReadGuard<T> {
loop {
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
match self.try_read() {
Some(mut guard) => {
guard.irq_guard = Some(irq_guard);
return guard;
}
None => spin_loop(),
}
}
}

#[allow(dead_code)]
#[inline]
/// @brief 获取读者+UPGRADER的数量, 不能保证能否获得同步值
Expand Down Expand Up @@ -256,6 +272,7 @@ impl<T> RwLock<T> {
return Some(RwLockUpgradableGuard {
inner: self,
data: unsafe { &mut *self.data.get() },
irq_guard: None,
});
} else {
return None;
Expand All @@ -274,6 +291,21 @@ impl<T> RwLock<T> {
}
}

#[inline]
/// @brief 获得UPGRADER守卫
pub fn upgradeable_read_irqsave(&self) -> RwLockUpgradableGuard<T> {
loop {
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
match self.try_upgradeable_read() {
Some(mut guard) => {
guard.irq_guard = Some(irq_guard);
return guard;
}
None => spin_loop(),
}
}
}

#[allow(dead_code)]
#[inline]
//extremely unsafe behavior
Expand Down Expand Up @@ -332,7 +364,7 @@ impl<'rwlock, T> RwLockUpgradableGuard<'rwlock, T> {
#[allow(dead_code)]
#[inline]
/// @brief 尝试将UPGRADER守卫升级为WRITER守卫
pub fn try_upgrade(self) -> Result<RwLockWriteGuard<'rwlock, T>, Self> {
pub fn try_upgrade(mut self) -> Result<RwLockWriteGuard<'rwlock, T>, Self> {
let res = self.inner.lock.compare_exchange(
UPGRADED,
WRITER,
Expand All @@ -343,13 +375,13 @@ impl<'rwlock, T> RwLockUpgradableGuard<'rwlock, T> {

if res.is_ok() {
let inner = self.inner;

let irq_guard = self.irq_guard.take();
mem::forget(self);

Ok(RwLockWriteGuard {
data: unsafe { &mut *inner.data.get() },
inner,
irq_guard: None,
irq_guard,
})
} else {
Err(self)
Expand All @@ -373,19 +405,20 @@ impl<'rwlock, T> RwLockUpgradableGuard<'rwlock, T> {
#[allow(dead_code)]
#[inline]
/// @brief UPGRADER降级为READER
pub fn downgrade(self) -> RwLockReadGuard<'rwlock, T> {
pub fn downgrade(mut self) -> RwLockReadGuard<'rwlock, T> {
while self.inner.current_reader().is_err() {
spin_loop();
}

let inner: &RwLock<T> = self.inner;

let irq_guard = self.irq_guard.take();
// 自动移去UPGRADED比特位
mem::drop(self);

RwLockReadGuard {
data: unsafe { &*inner.data.get() },
lock: &inner.lock,
irq_guard,
}
}

Expand Down Expand Up @@ -426,26 +459,27 @@ impl<'rwlock, T> RwLockWriteGuard<'rwlock, T> {
#[allow(dead_code)]
#[inline]
/// @brief 将WRITER降级为READER
pub fn downgrade(self) -> RwLockReadGuard<'rwlock, T> {
pub fn downgrade(mut self) -> RwLockReadGuard<'rwlock, T> {
while self.inner.current_reader().is_err() {
spin_loop();
}
//本质上来说绝对保证没有任何读者

let inner = self.inner;

let irq_guard = self.irq_guard.take();
mem::drop(self);

return RwLockReadGuard {
data: unsafe { &*inner.data.get() },
lock: &inner.lock,
irq_guard,
};
}

#[allow(dead_code)]
#[inline]
/// @brief 将WRITER降级为UPGRADER
pub fn downgrade_to_upgradeable(self) -> RwLockUpgradableGuard<'rwlock, T> {
pub fn downgrade_to_upgradeable(mut self) -> RwLockUpgradableGuard<'rwlock, T> {
debug_assert_eq!(
self.inner.lock.load(Ordering::Acquire) & (WRITER | UPGRADED),
WRITER
Expand All @@ -455,11 +489,13 @@ impl<'rwlock, T> RwLockWriteGuard<'rwlock, T> {

let inner = self.inner;

let irq_guard = self.irq_guard.take();
mem::forget(self);

return RwLockUpgradableGuard {
inner,
data: unsafe { &*inner.data.get() },
irq_guard,
};
}
}
Expand Down
23 changes: 23 additions & 0 deletions kernel/src/libs/wait_queue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#![allow(dead_code)]
use core::intrinsics::unlikely;

use alloc::{collections::LinkedList, sync::Arc, vec::Vec};

use crate::{
Expand Down Expand Up @@ -26,8 +28,20 @@ pub struct WaitQueue(SpinLock<InnerWaitQueue>);
impl WaitQueue {
pub const INIT: WaitQueue = WaitQueue(SpinLock::new(InnerWaitQueue::INIT));

fn before_sleep_check(&self, max_preempt: usize) {
let pcb = ProcessManager::current_pcb();
if unlikely(pcb.preempt_count() > max_preempt) {
kwarn!(
"Process {:?}: Try to sleep when preempt count is {}",
pcb.pid(),
pcb.preempt_count()
);
}
}

/// @brief 让当前进程在等待队列上进行等待,并且,允许被信号打断
pub fn sleep(&self) {
self.before_sleep_check(0);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock_irqsave();
ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
panic!("sleep error: {:?}", e);
Expand All @@ -42,6 +56,7 @@ impl WaitQueue {
where
F: FnOnce(),
{
self.before_sleep_check(0);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
Expand All @@ -50,6 +65,7 @@ impl WaitQueue {
drop(irq_guard);
guard.wait_list.push_back(ProcessManager::current_pcb());
f();

drop(guard);
sched();
}
Expand All @@ -69,6 +85,7 @@ impl WaitQueue {
/// 由于sleep_without_schedule不会调用调度函数,因此,如果开发者忘记在执行本函数之后,手动调用调度函数,
/// 由于时钟中断到来或者‘其他cpu kick了当前cpu’,可能会导致一些未定义的行为。
pub unsafe fn sleep_without_schedule(&self) {
self.before_sleep_check(0);
// 安全检查:确保当前处于中断禁止状态
assert!(CurrentIrqArch::is_irq_enabled() == false);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
Expand All @@ -80,6 +97,7 @@ impl WaitQueue {
}

pub unsafe fn sleep_without_schedule_uninterruptible(&self) {
self.before_sleep_check(0);
// 安全检查:确保当前处于中断禁止状态
assert!(CurrentIrqArch::is_irq_enabled() == false);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
Expand All @@ -91,6 +109,7 @@ impl WaitQueue {
}
/// @brief 让当前进程在等待队列上进行等待,并且,不允许被信号打断
pub fn sleep_uninterruptible(&self) {
self.before_sleep_check(0);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(false).unwrap_or_else(|e| {
Expand All @@ -105,6 +124,7 @@ impl WaitQueue {
/// @brief 让当前进程在等待队列上进行等待,并且,允许被信号打断。
/// 在当前进程的pcb加入队列后,解锁指定的自旋锁。
pub fn sleep_unlock_spinlock<T>(&self, to_unlock: SpinLockGuard<T>) {
self.before_sleep_check(1);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
Expand All @@ -120,6 +140,7 @@ impl WaitQueue {
/// @brief 让当前进程在等待队列上进行等待,并且,允许被信号打断。
/// 在当前进程的pcb加入队列后,解锁指定的Mutex。
pub fn sleep_unlock_mutex<T>(&self, to_unlock: MutexGuard<T>) {
self.before_sleep_check(1);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(true).unwrap_or_else(|e| {
Expand All @@ -135,6 +156,7 @@ impl WaitQueue {
/// @brief 让当前进程在等待队列上进行等待,并且,不允许被信号打断。
/// 在当前进程的pcb加入队列后,解锁指定的自旋锁。
pub fn sleep_uninterruptible_unlock_spinlock<T>(&self, to_unlock: SpinLockGuard<T>) {
self.before_sleep_check(1);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(false).unwrap_or_else(|e| {
Expand All @@ -150,6 +172,7 @@ impl WaitQueue {
/// @brief 让当前进程在等待队列上进行等待,并且,不允许被信号打断。
/// 在当前进程的pcb加入队列后,解锁指定的Mutex。
pub fn sleep_uninterruptible_unlock_mutex<T>(&self, to_unlock: MutexGuard<T>) {
self.before_sleep_check(1);
let mut guard: SpinLockGuard<InnerWaitQueue> = self.0.lock();
let irq_guard = unsafe { CurrentIrqArch::save_and_disable_irq() };
ProcessManager::mark_sleep(false).unwrap_or_else(|e| {
Expand Down
8 changes: 6 additions & 2 deletions kernel/src/net/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1144,6 +1144,10 @@ impl SocketInode {
pub fn inner(&self) -> SpinLockGuard<Box<dyn Socket>> {
return self.0.lock();
}

pub unsafe fn inner_no_preempt(&self) -> SpinLockGuard<Box<dyn Socket>> {
return self.0.lock_no_preempt();
}
}

impl IndexNode for SocketInode {
Expand Down Expand Up @@ -1173,7 +1177,7 @@ impl IndexNode for SocketInode {
buf: &mut [u8],
_data: &mut crate::filesystem::vfs::FilePrivateData,
) -> Result<usize, SystemError> {
return self.0.lock().read(&mut buf[0..len]).0;
return self.0.lock_no_preempt().read(&mut buf[0..len]).0;
}

fn write_at(
Expand All @@ -1183,7 +1187,7 @@ impl IndexNode for SocketInode {
buf: &[u8],
_data: &mut crate::filesystem::vfs::FilePrivateData,
) -> Result<usize, SystemError> {
return self.0.lock().write(&buf[0..len], None);
return self.0.lock_no_preempt().write(&buf[0..len], None);
}

fn poll(&self) -> Result<crate::filesystem::vfs::PollStatus, SystemError> {
Expand Down
16 changes: 8 additions & 8 deletions kernel/src/net/syscall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl Syscall {
let socket: Arc<SocketInode> = ProcessManager::current_pcb()
.get_socket(fd as i32)
.ok_or(SystemError::EBADF)?;
let mut socket = socket.inner();
let mut socket = unsafe { socket.inner_no_preempt() };
// kdebug!("connect to {:?}...", endpoint);
socket.connect(endpoint)?;
return Ok(0);
Expand All @@ -191,7 +191,7 @@ impl Syscall {
let socket: Arc<SocketInode> = ProcessManager::current_pcb()
.get_socket(fd as i32)
.ok_or(SystemError::EBADF)?;
let mut socket = socket.inner();
let mut socket = unsafe { socket.inner_no_preempt() };
socket.bind(endpoint)?;
return Ok(0);
}
Expand Down Expand Up @@ -221,7 +221,7 @@ impl Syscall {
let socket: Arc<SocketInode> = ProcessManager::current_pcb()
.get_socket(fd as i32)
.ok_or(SystemError::EBADF)?;
let socket = socket.inner();
let socket = unsafe { socket.inner_no_preempt() };
return socket.write(buf, endpoint);
}

Expand All @@ -244,7 +244,7 @@ impl Syscall {
let socket: Arc<SocketInode> = ProcessManager::current_pcb()
.get_socket(fd as i32)
.ok_or(SystemError::EBADF)?;
let socket = socket.inner();
let socket = unsafe { socket.inner_no_preempt() };

let (n, endpoint) = socket.read(buf);
drop(socket);
Expand Down Expand Up @@ -275,7 +275,7 @@ impl Syscall {
let socket: Arc<SocketInode> = ProcessManager::current_pcb()
.get_socket(fd as i32)
.ok_or(SystemError::EBADF)?;
let socket = socket.inner();
let socket = unsafe { socket.inner_no_preempt() };

let mut buf = iovs.new_buf(true);
// 从socket中读取数据
Expand Down Expand Up @@ -304,7 +304,7 @@ impl Syscall {
let socket: Arc<SocketInode> = ProcessManager::current_pcb()
.get_socket(fd as i32)
.ok_or(SystemError::EBADF)?;
let mut socket = socket.inner();
let mut socket = unsafe { socket.inner_no_preempt() };
socket.listen(backlog)?;
return Ok(0);
}
Expand All @@ -319,7 +319,7 @@ impl Syscall {
let socket: Arc<SocketInode> = ProcessManager::current_pcb()
.get_socket(fd as i32)
.ok_or(SystemError::EBADF)?;
let socket = socket.inner();
let socket = unsafe { socket.inner_no_preempt() };
socket.shutdown(ShutdownType::try_from(how as i32)?)?;
return Ok(0);
}
Expand All @@ -336,7 +336,7 @@ impl Syscall {
.get_socket(fd as i32)
.ok_or(SystemError::EBADF)?;
// kdebug!("accept: socket={:?}", socket);
let mut socket = socket.inner();
let mut socket = unsafe { socket.inner_no_preempt() };
// 从socket中接收连接
let (new_socket, remote_endpoint) = socket.accept()?;
drop(socket);
Expand Down
6 changes: 4 additions & 2 deletions kernel/src/process/idle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ impl ProcessManager {
let stack_ptr =
VirtAddr::new(Self::stack_ptr().data() & (!(KernelStack::ALIGN - 1)));
// 初始化bsp的idle进程
unsafe { KernelStack::from_existed(stack_ptr) }
.expect("Failed to create kernel stack struct for BSP.")
let mut ks = unsafe { KernelStack::from_existed(stack_ptr) }
.expect("Failed to create kernel stack struct for BSP.");
unsafe { ks.clear_pcb(true) };
ks
} else {
KernelStack::new().unwrap_or_else(|e| {
panic!("Failed to create kernel stack struct for AP {}: {:?}", i, e)
Expand Down
Loading

0 comments on commit 2f6f547

Please sign in to comment.