Skip to content

Commit

Permalink
Add automatic shutdown after inactivity
Browse files Browse the repository at this point in the history
Signed-off-by: Martin Michaelis <[email protected]>
  • Loading branch information
mgjm committed Aug 15, 2023
1 parent 7d7e415 commit f024597
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 15 deletions.
29 changes: 24 additions & 5 deletions conmon-rs/server/src/child_reaper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use crate::{
child::Child,
container_io::{ContainerIO, ContainerIOType, SharedContainerIO},
inactivity::{Activity, Inactivity},
oom_watcher::OOMWatcher,
};
use anyhow::{bail, Context, Result};
Expand Down Expand Up @@ -37,16 +38,26 @@ use tokio::{
use tokio_util::sync::CancellationToken;
use tracing::{debug, debug_span, error, warn, Instrument};

#[derive(Debug, Default, Getters)]
#[derive(Debug, Getters)]
pub struct ChildReaper {
#[getset(get)]
grandchildren: Arc<Mutex<MultiMap<String, ReapableChild>>>,

#[getset(get)]
inactivity: Inactivity,
}

/// first usable file descriptor after stdin, stdout and stderr
const FIRST_FD_AFTER_STDIO: RawFd = 3;

impl ChildReaper {
pub fn new(inactivity: Inactivity) -> ChildReaper {
Self {
grandchildren: Arc::default(),
inactivity,
}
}

pub fn get(&self, id: &str) -> Result<ReapableChild> {
let locked_grandchildren = &self.grandchildren().clone();
let lock = lock!(locked_grandchildren);
Expand Down Expand Up @@ -155,7 +166,7 @@ impl ChildReaper {
) -> Result<Receiver<ExitChannelData>> {
let locked_grandchildren = &self.grandchildren().clone();
let mut map = lock!(locked_grandchildren);
let mut reapable_grandchild = ReapableChild::from_child(&child);
let mut reapable_grandchild = ReapableChild::from_child(&child, self.inactivity.activity());

let (exit_tx, exit_rx) = reapable_grandchild.watch()?;

Expand Down Expand Up @@ -252,6 +263,8 @@ pub struct ReapableChild {

#[getset(get = "pub")]
cleanup_cmd: Vec<String>,

activity: Activity,
}

#[derive(Clone, CopyGetters, Debug, Getters, Setters)]
Expand All @@ -267,7 +280,7 @@ pub struct ExitChannelData {
}

impl ReapableChild {
pub fn from_child(child: &Child) -> Self {
pub fn from_child(child: &Child, activity: Activity) -> Self {
Self {
exit_paths: child.exit_paths().clone(),
oom_exit_paths: child.oom_exit_paths().clone(),
Expand All @@ -277,6 +290,7 @@ impl ReapableChild {
token: child.token().clone(),
task: None,
cleanup_cmd: child.cleanup_cmd().to_vec(),
activity,
}
}

Expand Down Expand Up @@ -307,6 +321,8 @@ impl ReapableChild {
let stop_token = self.token().clone();
let cleanup_cmd_raw = self.cleanup_cmd().clone();

let activity = self.activity.clone();

let task = task::spawn(
async move {
debug!("Running task");
Expand Down Expand Up @@ -360,14 +376,16 @@ impl ReapableChild {
}

if !cleanup_cmd_raw.is_empty() {
Self::spawn_cleanup_process(&cleanup_cmd_raw).await;
Self::spawn_cleanup_process(&cleanup_cmd_raw, activity.clone()).await;
}

debug!("Sending exit struct to channel: {:?}", exit_channel_data);
if exit_tx_clone.send(exit_channel_data).is_err() {
debug!("Unable to send exit status");
}
debug!("Task done");

activity.stop();
}
.instrument(debug_span!("watch", pid)),
);
Expand All @@ -382,7 +400,7 @@ impl ReapableChild {
Ok((exit_tx, exit_rx))
}

async fn spawn_cleanup_process(raw_cmd: &[String]) {
async fn spawn_cleanup_process(raw_cmd: &[String], activity: Activity) {
let mut cleanup_cmd = Command::new(&raw_cmd[0]);

cleanup_cmd.args(&raw_cmd[1..]);
Expand All @@ -399,6 +417,7 @@ impl ReapableChild {
e
),
}
activity.stop();
});
}

Expand Down
12 changes: 11 additions & 1 deletion conmon-rs/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ macro_rules! prefix {
};
}

#[derive(CopyGetters, Debug, Deserialize, Eq, Getters, Parser, PartialEq, Serialize, Setters)]
#[derive(CopyGetters, Debug, Deserialize, Getters, Parser, PartialEq, Serialize, Setters)]
#[serde(rename_all = "kebab-case")]
#[command(
after_help("More info at: https://github.com/containers/conmon-rs"),
Expand Down Expand Up @@ -125,6 +125,16 @@ pub struct Config {
)]
/// OpenTelemetry GRPC endpoint to be used for tracing.
tracing_endpoint: String,

#[get_copy = "pub"]
#[arg(
default_value_t,
env(concat!(prefix!(), "SHUTDOWN_DELAY")),
long("shutdown-delay"),
value_name("DELAY"),
)]
/// Automatically stop conmon-rs after DELAY seconds of inactivity. (0 = disabled)
shutdown_delay: f64,
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize, Subcommand)]
Expand Down
34 changes: 30 additions & 4 deletions conmon-rs/server/src/fd_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@
//!
//! An empty request does **not** receive a response.
use crate::listener::{Listener, SeqpacketListener};
use crate::{
inactivity::Inactivity,
listener::{Listener, SeqpacketListener},
};
use anyhow::Result;
use std::{
collections::{hash_map, HashMap},
Expand All @@ -46,10 +49,11 @@ use std::{
os::fd::OwnedFd,
path::PathBuf,
sync::{Arc, Mutex},
time::Duration,
};
use tokio::{runtime::Handle, sync::Mutex as AsyncMutex, task};
use tokio_seqpacket::{ancillary::OwnedAncillaryMessage, UnixSeqpacket};
use tracing::{debug_span, Instrument};
use tracing::{debug, debug_span, error, Instrument};

#[derive(Debug, Default)]
pub struct FdSocket {
Expand Down Expand Up @@ -170,12 +174,34 @@ impl Server {
let mut listener = Listener::<SeqpacketListener>::default().bind_long_path(&server.path)?;
let guard = ListenerGuard(fd_socket);

let inactivity = Inactivity::new();
let timeout = Duration::from_secs(3);

task::spawn(
async move {
while let Ok(conn) = listener.accept().await {
loop {
let conn = tokio::select! {
() = inactivity.wait(timeout) => {
debug!("Stop fd socket after inactivity");
break;
}
conn = listener.accept() => match conn {
Ok(conn) => conn,
Err(err) => {
error!("Unable to accept on fd socket: {err}");
break;
}
},
};
let fd_socket = guard.0.clone();
let activity = inactivity.activity();
task::spawn(
Self::serve(conn, fd_socket).instrument(debug_span!("fd_socket_serve")),
async move {
let result = Self::serve(conn, fd_socket).await;
activity.stop();
result
}
.instrument(debug_span!("fd_socket_serve")),
);
}
drop(guard);
Expand Down
128 changes: 128 additions & 0 deletions conmon-rs/server/src/inactivity.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use std::{
future, process,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};

use tokio::sync::{futures::Notified, Notify};

/// Track activity and reacto to inactivity.
///
/// Can be used to exit accept loops after inactivity.
#[derive(Debug, Clone)]
pub struct Inactivity(Option<Arc<Inner>>);

impl Inactivity {
/// Create a new inactivity tracker.
pub fn new() -> Self {
Self(Some(Arc::default()))
}

/// Create a disabled inactivity tracker.
///
/// The wait function will never return. There is always activity.
pub const fn disabled() -> Self {
Self(None)
}

/// Start tracking an activity.
pub fn activity(&self) -> Activity {
Activity::new(self.0.as_ref())
}

/// Async "block" until there is no activity and then wait for an additional`timeout`.
pub async fn wait(&self, timeout: Duration) {
if let Some(inner) = &self.0 {
loop {
let changed = inner.changed();
if inner.no_activity() {
let _ = tokio::time::timeout(timeout, changed).await;
if inner.no_activity() {
break;
}
} else {
changed.await
}
}
} else {
future::pending().await
}
}
}

/// Track an activity. Can be cloned to track additional activities.
///
/// The Activity stops on drop.
#[derive(Debug)]
pub struct Activity(Option<Arc<Inner>>);

impl Activity {
fn new(inner: Option<&Arc<Inner>>) -> Self {
Self(inner.map(Inner::increment))
}

/// Explicitly stop an activity. Just a wrapper for `drop(activity)`.
pub fn stop(self) {
// nothing to to, we take self by value
}
}

impl Clone for Activity {
fn clone(&self) -> Self {
Self::new(self.0.as_ref())
}
}

impl Drop for Activity {
fn drop(&mut self) {
if let Some(inner) = &self.0 {
inner.decrement();
}
}
}

#[derive(Debug, Default)]
struct Inner {
// number of current activites
active: AtomicUsize,
// gets notofied whenever the result of `no_activity` changes
notify: Notify,
}

impl Inner {
/// Abort if more then isize::MAX activities are active.
///
/// This prevents integer overflow of the `active` counter in case
/// someone is `mem::forget`ing activities.
///
/// The same logic is applied internaly by the `Arc` implementation.
const MAX_ACTIVE: usize = isize::MAX as usize;

fn increment(self: &Arc<Self>) -> Arc<Self> {
match self.active.fetch_add(1, Ordering::Relaxed) {
0 => self.notify.notify_waiters(),
1..=Self::MAX_ACTIVE => {}
_ => process::abort(),
}
self.clone()
}

fn decrement(&self) {
match self.active.fetch_sub(1, Ordering::Relaxed) {
1 => self.notify.notify_waiters(),
2..=Self::MAX_ACTIVE => {}
_ => process::abort(),
}
}

fn no_activity(&self) -> bool {
self.active.load(Ordering::Relaxed) == 0
}

fn changed(&self) -> Notified {
self.notify.notified()
}
}
1 change: 1 addition & 0 deletions conmon-rs/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod container_io;
mod container_log;
mod cri_logger;
mod fd_socket;
mod inactivity;
mod init;
mod journal;
mod listener;
Expand Down
Loading

0 comments on commit f024597

Please sign in to comment.