Skip to content

Commit

Permalink
Add: status check for naslinterpreter#Scanner
Browse files Browse the repository at this point in the history
Adds status check for naslinterpreter#Scanner and changes u32 within
models::Status to u64 to get rid of down castings.
  • Loading branch information
nichtsfrei committed Jul 26, 2024
1 parent 7f77703 commit 3afd788
Show file tree
Hide file tree
Showing 14 changed files with 190 additions and 84 deletions.
12 changes: 6 additions & 6 deletions rust/models/src/host_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ use std::collections::HashMap;
)]
pub struct HostInfo {
/// Number of all hosts, that are contained in a target
pub all: u32,
pub all: u64,
/// Number of hosts, that are excluded from the target
pub excluded: u32,
pub excluded: u64,
/// Number of hosts, that are not reachable (alive-test failed)
pub dead: u32,
pub dead: u64,
/// Number of hosts, that are reachable (alive-test succeeded)
pub alive: u32,
pub alive: u64,
/// Number of hosts, that are currently queued for scanning
pub queued: u32,
pub queued: u64,
/// Number of hosts, that are already finished scanning
pub finished: u32,
pub finished: u64,
#[cfg_attr(
feature = "serde_support",
serde(skip_serializing_if = "Option::is_none")
Expand Down
4 changes: 4 additions & 0 deletions rust/models/src/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ pub enum Error {
InsufficientResources(Vec<ObservableResources>),
#[error("Poisoned")]
Poisoned,
#[error("Scan not found: {0}")]
ScanNotFound(String),
#[error("Unable to schedule scan {0}: {1}")]
SchedulingError(String, String),
}

fn display_resources(v: &[ObservableResources]) -> String {
Expand Down
4 changes: 2 additions & 2 deletions rust/models/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ use super::host_info::HostInfo;
)]
pub struct Status {
/// Timestamp for the start of a scan
pub start_time: Option<u32>,
pub start_time: Option<u64>,
/// Timestamp for the end of a scan
pub end_time: Option<u32>,
pub end_time: Option<u64>,
/// The phase, a scan is currently in
pub status: Phase,
/// Information about the hosts of a running scan
Expand Down
4 changes: 2 additions & 2 deletions rust/nasl-builtin-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,10 @@ impl NaslHttp {

uri = format!("{}{}", uri, item);

// TODO: we will run into an issue when this is called within an async thread.
// TODO: we will run into an issue when this is called within an async thread.
// making it effectively worthless when OpenVASD is run in openvasd mode.
// The error is:
// Cannot start a runtime from within a runtime. This happens because a function (like `block_on`)
// Cannot start a runtime from within a runtime. This happens because a function (like `block_on`)
// attempted to block the current thread while the thread is being used to drive asynchronous tasks.
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
Expand Down
2 changes: 1 addition & 1 deletion rust/nasl-interpreter/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ pub use interpreter::Interpreter;
pub use scan_interpreter::*;

pub use scanner::DefaultScannerStack;
pub use scanner::WithStorageScannerStack;
pub use scanner::Scanner;
pub use scanner::WithStorageScannerStack;

// we expose the other libraries to allow users to use them without having to import them
pub use nasl_builtin_std::{nasl_std_functions, ContextFactory, RegisterBuilder};
Expand Down
1 change: 1 addition & 0 deletions rust/nasl-interpreter/src/scan_interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,7 @@ if (description)
{require_udp_ports}
exit(0);
}}
log_message(data: "Ja, junge dat is Kaffee, echt jetzt, und Kaffee ist nun mal lecker.");
exit({rc});
"#
);
Expand Down
168 changes: 136 additions & 32 deletions rust/nasl-interpreter/src/scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{
atomic::{AtomicBool, Ordering},
Arc, Mutex, RwLock,
},
time::SystemTime,
};

use async_trait::async_trait;
Expand All @@ -17,40 +18,81 @@ use nasl_syntax::{FSPluginLoader, Loader};
use storage::{DefaultDispatcher, Storage};
use tokio::task::JoinHandle;

use crate::{scheduling::WaveExecutionPlan, SyncScanInterpreter};
use crate::{
scheduling::{ExecutionPlaner, WaveExecutionPlan},
ExecuteError, SyncScanInterpreter,
};

struct RunningScan<S: ScannerStack> {
scan: Scan,
// Remove mutex to allow parallel usage of those
storage: Arc<Mutex<S::Storage>>,
loader: Arc<Mutex<S::Loader>>,
function_executor: Arc<Mutex<S::Executor>>,
keep_running: Arc<AtomicBool>,
status: Arc<RwLock<models::Status>>,
}

fn current_time_in_seconds(name: &'static str) -> u64 {
match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
Ok(x) => x.as_secs(),
Err(e) => {
tracing::warn!(error=?e, name, "unable to get system time, setting defaulting to 0");
0
}
}
}

impl<S: ScannerStack> RunningScan<S> {
fn set_status_to_running(&self) -> () {
let mut status = self.status.write().unwrap();
status.status = models::Phase::Running;
status.start_time = current_time_in_seconds("start_time").into();
}

async fn run(self) -> Result<(), Error> {
let storage: &S::Storage = &self.storage.lock().unwrap();
let loader: &S::Loader = &self.loader.lock().unwrap();
let function_executor: &S::Executor = &self.function_executor.lock().unwrap();
let interpreter = SyncScanInterpreter::new(storage, loader, function_executor);
let _span = tracing::error_span!("running", scan = self.scan.scan_id).entered();
tracing::debug!(scan_id = self.scan.scan_id);
interpreter
self.set_status_to_running();
let mut end_phase = models::Phase::Succeeded;

let results = interpreter
// Todo: Make this generic over the scheduler
.run::<WaveExecutionPlan>(&self.scan)
.map(|it| {

// TODO: check for error and abort, we need to keep track of the state
for _ in it {


for it in it {
match it {
Ok(x) => {
tracing::debug!(result=?x, "script finished");

if x.has_failed() {
end_phase = models::Phase::Failed;
}
}
Err(x) => {
tracing::warn!(error=?x, "unrecoverable error, aborting whole run");
end_phase = models::Phase::Failed;
}
}
if !self.keep_running.load(Ordering::SeqCst) {
end_phase = models::Phase::Stopped;
break;
}
}
})
.map_err(|_|
// Todo: proper error handling
Error::Poisoned)

.map_err(|e| todo!());

let mut status = self.status.write().unwrap();
status.status = end_phase;
status.end_time = current_time_in_seconds("end_time").into();
results
// Todo: proper error handling
}
}
struct RunningScanHandle {
Expand All @@ -72,6 +114,9 @@ impl RunningScanHandle {
N: NaslFunctionExecuter + Send + 'static,
{
let keep_running: Arc<AtomicBool> = Arc::new(true.into());
let status = Arc::new(RwLock::new(models::Status {
..Default::default()
}));
Self {
handle: tokio::spawn(
RunningScan::<(S, L, N)> {
Expand All @@ -80,11 +125,13 @@ impl RunningScanHandle {
loader,
function_executor,
keep_running: keep_running.clone(),
status: status.clone(),
}
// TODO run per target
.run(),
),
keep_running,
status: Default::default(),
status,
}
}
}
Expand Down Expand Up @@ -112,11 +159,11 @@ pub type DefaultScannerStack = (DefaultDispatcher, FSPluginLoader, NaslFunctionR

/// The with storage scanner strack consisting of a statically living sendable Storage
/// implementation,`FSPPluginLoader` nasl `NaslFunctionRegister`.
pub type WithStorageScannerStack<S> = (S, FSPluginLoader, NaslFunctionRegister) ;
pub type WithStorageScannerStack<S> = (S, FSPluginLoader, NaslFunctionRegister);

/// Allows starting, stopping and managing the results of new scans.
pub struct Scanner<S: ScannerStack> {
running: Arc<Mutex<HashMap<String, RunningScanHandle>>>,
running: Arc<RwLock<HashMap<String, RunningScanHandle>>>,
storage: Arc<Mutex<S::Storage>>,
loader: Arc<Mutex<S::Loader>>,
function_executor: Arc<Mutex<S::Executor>>,
Expand All @@ -130,7 +177,7 @@ where
{
fn new(storage: St, loader: L, executor: F) -> Self {
Self {
running: Arc::new(Mutex::new(HashMap::default())),
running: Arc::new(RwLock::new(HashMap::default())),
storage: Arc::new(Mutex::new(storage)),
loader: Arc::new(Mutex::new(loader)),
function_executor: Arc::new(Mutex::new(executor)),
Expand All @@ -147,12 +194,12 @@ impl Scanner<DefaultScannerStack> {
let executor = crate::nasl_std_functions();
Self::new(storage, loader, executor)
}


}

impl<S> Scanner<WithStorageScannerStack<S>> where S: storage::Storage + Send + 'static {

impl<S> Scanner<WithStorageScannerStack<S>>
where
S: storage::Storage + Send + 'static,
{
/// Creates a new scanner with a Storage and the rest based on the DefaultScannerStack.
///
/// Requires the root path for the loader and the storage implementation.
Expand All @@ -161,7 +208,6 @@ impl<S> Scanner<WithStorageScannerStack<S>> where S: storage::Storage + Send + '
let executor = crate::nasl_std_functions();
Self::new(storage, loader, executor)
}

}

#[async_trait]
Expand All @@ -172,7 +218,7 @@ impl<S: ScannerStack> ScanStarter for Scanner<S> {
let function_executor = self.function_executor.clone();
let id = scan.scan_id.clone();
let handle = RunningScanHandle::start(scan, storage, loader, function_executor);
self.running.lock().unwrap().insert(id, handle);
self.running.write().unwrap().insert(id, handle);
Ok(())
}

Expand All @@ -191,7 +237,7 @@ impl<S: ScannerStack> ScanStopper for Scanner<S> {
let id = id.as_ref();
let handle = self
.running
.lock()
.write()
.unwrap()
.remove(id)
// TODO: Do this properly
Expand All @@ -216,37 +262,95 @@ impl<S: ScannerStack> ScanDeleter for Scanner<S> {

#[async_trait]
impl<S: ScannerStack> ScanResultFetcher for Scanner<S> {
async fn fetch_results<I>(&self, _id: I) -> Result<ScanResults, Error>
async fn fetch_results<I>(&self, id: I) -> Result<ScanResults, Error>
where
I: AsRef<str> + Send + 'static,
{
todo!()
let running = self.running.read().unwrap();
match running.get(id.as_ref()) {
Some(r) => {
let status = r.status.read().unwrap().clone();
Ok(ScanResults {
id: id.as_ref().to_string(),
status,
// The results are directly stored by the storage implementation:
// inmemory.rs
// file.rs
results: vec![],
})
}
None => Err(Error::ScanNotFound(id.as_ref().to_string())),
}
}
}

#[cfg(test)]
mod tests {
use models::{scanner::ScanStarter, Scan};
use std::time::Duration;

use models::{
scanner::{ScanResultFetcher, ScanResults, ScanStarter},
Scan,
};
use nasl_builtin_utils::NaslFunctionRegister;
use tracing_test::traced_test;

use crate::{scanner::Scanner, tests::setup};

fn make_scanner_and_scan() -> (
Scanner<(
storage::DefaultDispatcher,
fn(&str) -> String,
NaslFunctionRegister,
)>,
Scan,
) {
type TestStack = (
storage::DefaultDispatcher,
fn(&str) -> String,
NaslFunctionRegister,
);

fn make_scanner_and_scan() -> (Scanner<TestStack>, Scan) {
let ((storage, loader, executor), scan) = setup();
(Scanner::new(storage, loader, executor), scan)
}

/// Blocks until given id is in given phase or panics after 1 second
async fn wait_for_status(
scanner: Scanner<TestStack>,
id: &str,
phase: models::Phase,
) -> ScanResults {
let start = super::current_time_in_seconds("test");
assert!(start > 0);
loop {
let current = super::current_time_in_seconds("loop test");
assert!(current > 0);
assert!(current - start < 1, "time for finishing scan is up.");

// we need the sloep to not instantly read lock running and preventing write access
tokio::time::sleep(Duration::from_nanos(100)).await;
let scan_results = scanner
.fetch_results(id.to_string())
.await
.expect("no error when fetching results");
tracing::debug!(status=%scan_results.status.status);
if scan_results.status.status == phase {
return scan_results;
}

}
}

#[tokio::test]
async fn start_scan() {
#[traced_test]
async fn start_scan_success() {
let (scanner, scan) = make_scanner_and_scan();
let id = scan.scan_id.clone();
let res = scanner.start_scan(scan).await;
assert!(res.is_ok());
let scan_results = wait_for_status(scanner, &id, models::Phase::Succeeded).await;

assert!(
scan_results.status.start_time.is_some(),
"expect start time to be set when scan starts"
);
assert!(
scan_results.status.end_time.is_some(),
"expect end time to be set when scan finished"
);
}
}
Loading

0 comments on commit 3afd788

Please sign in to comment.