diff --git a/conmon-rs/server/src/child_reaper.rs b/conmon-rs/server/src/child_reaper.rs index 52255749d3..42f2c6815f 100644 --- a/conmon-rs/server/src/child_reaper.rs +++ b/conmon-rs/server/src/child_reaper.rs @@ -2,6 +2,7 @@ use crate::{ child::Child, container_io::{ContainerIO, ContainerIOType, SharedContainerIO}, + inactivity::{Activity, Inactivity}, oom_watcher::OOMWatcher, }; use anyhow::{bail, Context, Result}; @@ -37,16 +38,26 @@ use tokio::{ use tokio_util::sync::CancellationToken; use tracing::{debug, debug_span, error, warn, Instrument}; -#[derive(Debug, Default, Getters)] +#[derive(Debug, Getters)] pub struct ChildReaper { #[getset(get)] grandchildren: Arc>>, + + #[getset(get)] + inactivity: Inactivity, } /// first usable file descriptor after stdin, stdout and stderr const FIRST_FD_AFTER_STDIO: RawFd = 3; impl ChildReaper { + pub fn new(inactivity: Inactivity) -> ChildReaper { + Self { + grandchildren: Arc::default(), + inactivity, + } + } + pub fn get(&self, id: &str) -> Result { let locked_grandchildren = &self.grandchildren().clone(); let lock = lock!(locked_grandchildren); @@ -155,7 +166,7 @@ impl ChildReaper { ) -> Result> { let locked_grandchildren = &self.grandchildren().clone(); let mut map = lock!(locked_grandchildren); - let mut reapable_grandchild = ReapableChild::from_child(&child); + let mut reapable_grandchild = ReapableChild::from_child(&child, self.inactivity.activity()); let (exit_tx, exit_rx) = reapable_grandchild.watch()?; @@ -252,6 +263,8 @@ pub struct ReapableChild { #[getset(get = "pub")] cleanup_cmd: Vec, + + activity: Activity, } #[derive(Clone, CopyGetters, Debug, Getters, Setters)] @@ -267,7 +280,7 @@ pub struct ExitChannelData { } impl ReapableChild { - pub fn from_child(child: &Child) -> Self { + pub fn from_child(child: &Child, activity: Activity) -> Self { Self { exit_paths: child.exit_paths().clone(), oom_exit_paths: child.oom_exit_paths().clone(), @@ -277,6 +290,7 @@ impl ReapableChild { token: child.token().clone(), 
task: None, cleanup_cmd: child.cleanup_cmd().to_vec(), + activity, } } @@ -307,6 +321,8 @@ impl ReapableChild { let stop_token = self.token().clone(); let cleanup_cmd_raw = self.cleanup_cmd().clone(); + let activity = self.activity.clone(); + let task = task::spawn( async move { debug!("Running task"); @@ -360,7 +376,7 @@ impl ReapableChild { } if !cleanup_cmd_raw.is_empty() { - Self::spawn_cleanup_process(&cleanup_cmd_raw).await; + Self::spawn_cleanup_process(&cleanup_cmd_raw, activity.clone()).await; } debug!("Sending exit struct to channel: {:?}", exit_channel_data); @@ -368,6 +384,8 @@ impl ReapableChild { debug!("Unable to send exit status"); } debug!("Task done"); + + activity.stop(); } .instrument(debug_span!("watch", pid)), ); @@ -382,7 +400,7 @@ impl ReapableChild { Ok((exit_tx, exit_rx)) } - async fn spawn_cleanup_process(raw_cmd: &[String]) { + async fn spawn_cleanup_process(raw_cmd: &[String], activity: Activity) { let mut cleanup_cmd = Command::new(&raw_cmd[0]); cleanup_cmd.args(&raw_cmd[1..]); @@ -399,6 +417,7 @@ impl ReapableChild { e ), } + activity.stop(); }); } diff --git a/conmon-rs/server/src/config.rs b/conmon-rs/server/src/config.rs index 5165ebf0cb..c2cc123e66 100644 --- a/conmon-rs/server/src/config.rs +++ b/conmon-rs/server/src/config.rs @@ -12,7 +12,7 @@ macro_rules! prefix { }; } -#[derive(CopyGetters, Debug, Deserialize, Eq, Getters, Parser, PartialEq, Serialize, Setters)] +#[derive(CopyGetters, Debug, Deserialize, Getters, Parser, PartialEq, Serialize, Setters)] #[serde(rename_all = "kebab-case")] #[command( after_help("More info at: https://github.com/containers/conmon-rs"), @@ -125,6 +125,16 @@ pub struct Config { )] /// OpenTelemetry GRPC endpoint to be used for tracing. tracing_endpoint: String, + + #[get_copy = "pub"] + #[arg( + default_value_t, + env(concat!(prefix!(), "SHUTDOWN_DELAY")), + long("shutdown-delay"), + value_name("DELAY"), + )] + /// Automatically stop conmon-rs after DELAY seconds of inactivity. 
(0 = disabled) + shutdown_delay: f64, } #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, Subcommand)] diff --git a/conmon-rs/server/src/fd_socket.rs b/conmon-rs/server/src/fd_socket.rs index 2d8b38db37..777d166207 100644 --- a/conmon-rs/server/src/fd_socket.rs +++ b/conmon-rs/server/src/fd_socket.rs @@ -36,7 +36,10 @@ //! //! An empty request does **not** receive a response. -use crate::listener::{Listener, SeqpacketListener}; +use crate::{ + inactivity::Inactivity, + listener::{Listener, SeqpacketListener}, +}; use anyhow::Result; use std::{ collections::{hash_map, HashMap}, @@ -46,10 +49,11 @@ use std::{ os::fd::OwnedFd, path::PathBuf, sync::{Arc, Mutex}, + time::Duration, }; use tokio::{runtime::Handle, sync::Mutex as AsyncMutex, task}; use tokio_seqpacket::{ancillary::OwnedAncillaryMessage, UnixSeqpacket}; -use tracing::{debug_span, Instrument}; +use tracing::{debug, debug_span, error, Instrument}; #[derive(Debug, Default)] pub struct FdSocket { @@ -170,12 +174,34 @@ impl Server { let mut listener = Listener::::default().bind_long_path(&server.path)?; let guard = ListenerGuard(fd_socket); + let inactivity = Inactivity::new(); + let timeout = Duration::from_secs(3); + task::spawn( async move { - while let Ok(conn) = listener.accept().await { + loop { + let conn = tokio::select! 
{ + () = inactivity.wait(timeout) => { + debug!("Stop fd socket after inactivity"); + break; + } + conn = listener.accept() => match conn { + Ok(conn) => conn, + Err(err) => { + error!("Unable to accept on fd socket: {err}"); + break; + } + }, + }; let fd_socket = guard.0.clone(); + let activity = inactivity.activity(); task::spawn( - Self::serve(conn, fd_socket).instrument(debug_span!("fd_socket_serve")), + async move { + let result = Self::serve(conn, fd_socket).await; + activity.stop(); + result + } + .instrument(debug_span!("fd_socket_serve")), ); } drop(guard); diff --git a/conmon-rs/server/src/inactivity.rs b/conmon-rs/server/src/inactivity.rs new file mode 100644 index 0000000000..46eed232d7 --- /dev/null +++ b/conmon-rs/server/src/inactivity.rs @@ -0,0 +1,128 @@ +use std::{ + future, process, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; + +use tokio::sync::{futures::Notified, Notify}; + +/// Track activity and react to inactivity. +/// +/// Can be used to exit accept loops after inactivity. +#[derive(Debug, Clone)] +pub struct Inactivity(Option>); + +impl Inactivity { + /// Create a new inactivity tracker. + pub fn new() -> Self { + Self(Some(Arc::default())) + } + + /// Create a disabled inactivity tracker. + /// + /// The wait function will never return. There is always activity. + pub const fn disabled() -> Self { + Self(None) + } + + /// Start tracking an activity. + pub fn activity(&self) -> Activity { + Activity::new(self.0.as_ref()) + } + + /// Async "block" until there is no activity and then wait for an additional `timeout`. + pub async fn wait(&self, timeout: Duration) { + if let Some(inner) = &self.0 { + loop { + let changed = inner.changed(); + if inner.no_activity() { + let _ = tokio::time::timeout(timeout, changed).await; + if inner.no_activity() { + break; + } + } else { + changed.await + } + } + } else { + future::pending().await + } + } +} + +/// Track an activity. 
Can be cloned to track additional activities. +/// +/// The Activity stops on drop. +#[derive(Debug)] +pub struct Activity(Option>); + +impl Activity { + fn new(inner: Option<&Arc>) -> Self { + Self(inner.map(Inner::increment)) + } + + /// Explicitly stop an activity. Just a wrapper for `drop(activity)`. + pub fn stop(self) { + // nothing to do, we take self by value + } +} + +impl Clone for Activity { + fn clone(&self) -> Self { + Self::new(self.0.as_ref()) + } +} + +impl Drop for Activity { + fn drop(&mut self) { + if let Some(inner) = &self.0 { + inner.decrement(); + } + } +} + +#[derive(Debug, Default)] +struct Inner { + // number of current activities + active: AtomicUsize, + // gets notified whenever the result of `no_activity` changes + notify: Notify, +} + +impl Inner { + /// Abort if more than isize::MAX activities are active. + /// + /// This prevents integer overflow of the `active` counter in case + /// someone is `mem::forget`ing activities. + /// + /// The same logic is applied internally by the `Arc` implementation. 
+ const MAX_ACTIVE: usize = isize::MAX as usize; + + fn increment(self: &Arc) -> Arc { + match self.active.fetch_add(1, Ordering::Relaxed) { + 0 => self.notify.notify_waiters(), + 1..=Self::MAX_ACTIVE => {} + _ => process::abort(), + } + self.clone() + } + + fn decrement(&self) { + match self.active.fetch_sub(1, Ordering::Relaxed) { + 1 => self.notify.notify_waiters(), + 2..=Self::MAX_ACTIVE => {} + _ => process::abort(), + } + } + + fn no_activity(&self) -> bool { + self.active.load(Ordering::Relaxed) == 0 + } + + fn changed(&self) -> Notified { + self.notify.notified() + } +} diff --git a/conmon-rs/server/src/lib.rs b/conmon-rs/server/src/lib.rs index 72f3237cd2..a6423a0824 100644 --- a/conmon-rs/server/src/lib.rs +++ b/conmon-rs/server/src/lib.rs @@ -16,6 +16,7 @@ mod container_io; mod container_log; mod cri_logger; mod fd_socket; +mod inactivity; mod init; mod journal; mod listener; diff --git a/conmon-rs/server/src/server.rs b/conmon-rs/server/src/server.rs index 77e00d325b..063209551b 100644 --- a/conmon-rs/server/src/server.rs +++ b/conmon-rs/server/src/server.rs @@ -5,6 +5,7 @@ use crate::{ config::{Commands, Config, LogDriver, Verbosity}, container_io::{ContainerIO, ContainerIOType}, fd_socket::FdSocket, + inactivity::Inactivity, init::{DefaultInit, Init}, journal::Journal, listener::{DefaultListener, Listener}, @@ -16,7 +17,7 @@ use anyhow::{format_err, Context, Result}; use capnp::text_list::Reader; use capnp_rpc::{rpc_twoparty_capnp::Side, twoparty, RpcSystem}; use conmon_common::conmon_capnp::conmon::{self, CgroupManager}; -use futures::{AsyncReadExt, FutureExt}; +use futures::AsyncReadExt; use getset::Getters; use nix::{ errno, @@ -25,7 +26,7 @@ use nix::{ unistd::{fork, ForkResult}, }; use opentelemetry::trace::FutureExt as OpenTelemetryFutureExt; -use std::{fs::File, io::Write, path::Path, process, str::FromStr, sync::Arc}; +use std::{fs::File, io::Write, path::Path, process, str::FromStr, sync::Arc, time::Duration}; use tokio::{ fs, 
runtime::{Builder, Handle}, @@ -53,15 +54,26 @@ pub struct Server { /// Fd socket instance. #[getset(get = "pub(crate)")] fd_socket: Arc, + + // Shutdown controller. + #[getset(get = "pub(crate)")] + inactivity: Inactivity, } impl Server { /// Create a new `Server` instance. pub fn new() -> Result { + let config = Config::default(); + let inactivity = if config.shutdown_delay() == 0.0 { + Inactivity::disabled() + } else { + Inactivity::new() + }; let server = Self { - config: Default::default(), - reaper: Default::default(), + config, + reaper: ChildReaper::new(inactivity.clone()).into(), fd_socket: Default::default(), + inactivity, }; if let Some(v) = server.config().version() { @@ -274,6 +286,8 @@ impl Server { } async fn start_backend(self, mut shutdown_rx: oneshot::Receiver<()>) -> Result<()> { + let inactivity = self.inactivity().clone(); + let timeout = Duration::from_secs_f64(self.config().shutdown_delay()); let listener = Listener::::default().bind_long_path(self.config().socket())?; let client: conmon::Client = capnp_rpc::new_client(self); @@ -284,6 +298,10 @@ impl Server { debug!("Received shutdown message"); return Ok(()) } + () = inactivity.wait(timeout) => { + debug!("Automatic shutdown after inactivity"); + return Ok(()) + } stream = listener.accept() => { stream?.0 }, @@ -296,7 +314,11 @@ impl Server { Default::default(), )); let rpc_system = RpcSystem::new(network, Some(client.clone().client)); - task::spawn_local(Box::pin(rpc_system.map(|_| ()))); + let activity = inactivity.activity(); + task::spawn_local(async move { + let _ = rpc_system.await; + activity.stop(); + }); } } } diff --git a/pkg/client/client.go b/pkg/client/client.go index e6aa985ef5..4c1f4c79d8 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -95,6 +95,11 @@ type ConmonServerConfig struct { // Tracing can be used to enable OpenTelemetry tracing. 
Tracing *Tracing + + // ShutdownDelay can be used to automatically stop the conmonrs server after DELAY seconds of inactivity. + // + // 0 = disabled (run until manually stopped by client.Shutdown) + ShutdownDelay time.Duration } // Tracing is the structure for managing server-side OpenTelemetry tracing. @@ -339,6 +344,10 @@ func (c *ConmonClient) toArgs(config *ConmonServerConfig) (entrypoint string, ar } } + if config.ShutdownDelay > 0 { + args = append(args, "--shutdown-delay", fmt.Sprint(config.ShutdownDelay.Seconds())) + } + return entrypoint, args, nil }