From 3cf4f75a4d410e11f5a321b1e81c43cacac6be68 Mon Sep 17 00:00:00 2001
From: Benjamin Boudreau
Date: Sun, 1 Dec 2024 20:53:38 -0500
Subject: [PATCH] improved tracing and cleanup

---
 server/src/scheduler.rs                   |   6 +-
 server/task_scheduler/src/task_creator.rs | 320 +++++++++++-----------
 2 files changed, 157 insertions(+), 169 deletions(-)

diff --git a/server/src/scheduler.rs b/server/src/scheduler.rs
index 4173d5dcd..52c7533b5 100644
--- a/server/src/scheduler.rs
+++ b/server/src/scheduler.rs
@@ -594,7 +594,7 @@ mod tests {
         let state = IndexifyState::new(temp_dir.path().join("state")).await?;
         let scheduler = Scheduler::new(
             state.clone(),
-            Arc::new(scheduler_stats::Metrics::new(state.clone())),
+            Arc::new(scheduler_stats::Metrics::new(state.metrics.clone())),
         );
 
         let graph = {
@@ -867,7 +867,7 @@ mod tests {
         let state = IndexifyState::new(temp_dir.path().join("state")).await?;
         let scheduler = Scheduler::new(
             state.clone(),
-            Arc::new(scheduler_stats::Metrics::new(state.clone())),
+            Arc::new(scheduler_stats::Metrics::new(state.metrics.clone())),
         );
 
         let graph = {
@@ -1191,7 +1191,7 @@ mod tests {
         let state = IndexifyState::new(temp_dir.path().join("state")).await?;
         let scheduler = Scheduler::new(
             state.clone(),
-            Arc::new(scheduler_stats::Metrics::new(state.clone())),
+            Arc::new(scheduler_stats::Metrics::new(state.metrics.clone())),
        );
 
         let graph = {
diff --git a/server/task_scheduler/src/task_creator.rs b/server/task_scheduler/src/task_creator.rs
index 732b937f9..9ffab6258 100644
--- a/server/task_scheduler/src/task_creator.rs
+++ b/server/task_scheduler/src/task_creator.rs
@@ -1,30 +1,20 @@
 use std::sync::Arc;
 
 use anyhow::{anyhow, Result};
-use data_model::{
-    ComputeGraph,
-    GraphInvocationCtx,
-    InvokeComputeGraphEvent,
-    Node,
-    OutputPayload,
-    Task,
-    TaskOutcome,
-};
-use state_store::{state_machine::IndexifyObjectsColumns, IndexifyState};
-use tracing::{error, info, instrument, trace, Level};
+use data_model::{ComputeGraph, InvokeComputeGraphEvent, Node, OutputPayload, Task, TaskOutcome};
+use state_store::IndexifyState;
+use tracing::{error, info, instrument, trace};
 
 use crate::TaskCreationResult;
 
-#[instrument(ret(level = Level::DEBUG), skip(indexify_state, event), fields(namespace = event.namespace, compute_graph = event.compute_graph, invocation_id = event.invocation_id))]
+#[instrument(
+    skip(indexify_state, event),
+    fields(namespace = event.namespace, compute_graph = event.compute_graph, invocation_id = event.invocation_id)
+)]
 pub async fn handle_invoke_compute_graph(
     indexify_state: Arc<IndexifyState>,
     event: InvokeComputeGraphEvent,
 ) -> Result<TaskCreationResult> {
-    trace!(
-        "Handling invoke compute graph event: namespace = {:?}, compute_graph = {:?}, invocation_id = {:?}",
-        event.namespace, event.compute_graph, event.invocation_id
-    );
-
     let compute_graph = indexify_state
         .reader()
         .get_compute_graph(&event.namespace, &event.compute_graph)?;
@@ -54,8 +44,8 @@ pub async fn handle_invoke_compute_graph(
         compute_graph.version,
     )?;
     trace!(
-        "Created task for compute graph: namespace = {:?}, compute_graph = {:?}, invocation_id = {:?}, task_id = {:?}",
-        event.namespace, event.compute_graph, event.invocation_id, task.id
+        task_key = task.key(),
+        "Creating a standard task to start compute graph"
     );
     Ok(TaskCreationResult {
         namespace: event.namespace.clone(),
@@ -68,35 +58,18 @@ pub async fn handle_invoke_compute_graph(
     })
 }
 
-#[instrument(skip(indexify_state, task, compute_graph), fields(namespace = task.namespace, compute_graph = task.compute_graph_name, invocation_id = task.invocation_id, compute_fn_name = task.compute_fn_name, task_id = task.id.to_string()))]
+#[instrument(
+    skip(indexify_state, task, compute_graph),
+    fields(
+        namespace = task.namespace, compute_graph = task.compute_graph_name, invocation_id = task.invocation_id,
+        finished_task_compute_fn_name = task.compute_fn_name, finished_task_key = task.key()
+    )
+)]
 pub async fn handle_task_finished(
     indexify_state: Arc<IndexifyState>,
     task: Task,
     compute_graph: ComputeGraph,
 ) -> Result<TaskCreationResult> {
-    trace!(
-        "Handling task finished: namespace = {:?}, compute_graph = {:?}, invocation_id = {:?}, task_id = {:?}",
-        task.namespace, task.compute_graph_name, task.invocation_id, task.id
-    );
-
-    let txn = indexify_state.db.transaction();
-
-    let key = GraphInvocationCtx::key_from(
-        task.namespace.as_str(),
-        task.compute_graph_name.as_str(),
-        task.invocation_id.as_str(),
-    );
-    let _value = txn.get_for_update_cf(
-        &IndexifyObjectsColumns::GraphInvocationCtx.cf_db(&indexify_state.db),
-        &key,
-        true,
-    )?;
-
-    let _value = txn.get_for_update_cf(
-        &IndexifyObjectsColumns::Tasks.cf_db(&indexify_state.db),
-        &task.key(),
-        true,
-    )?;
     let invocation_ctx = indexify_state
         .reader()
         .invocation_ctx(
@@ -123,7 +96,7 @@ pub async fn handle_task_finished(
             task.invocation_id
         ))?;
 
-    trace!("GraphInvocationCtx: {:?}", invocation_ctx);
+    trace!("invocation context: {:?}", invocation_ctx);
 
     if task.outcome == TaskOutcome::Failure {
         let mut invocation_finished = false;
@@ -131,8 +104,8 @@ pub async fn handle_task_finished(
             invocation_finished = true;
         }
         info!(
-            "Task failed, graph invocation: {:?} {}",
-            task.compute_graph_name, invocation_finished
+            "Task failed, graph invocation finished: {}",
+            invocation_finished
         );
         return Ok(TaskCreationResult::no_tasks(
            &task.namespace,
@@ -140,59 +113,70 @@ pub async fn handle_task_finished(
             &task.invocation_id,
         ));
     }
-    let mut new_tasks = vec![];
     let outputs = indexify_state
         .reader()
         .get_task_outputs(&task.namespace, &task.id.to_string())?;
-    let mut router_edges = vec![];
-    for output in &outputs {
-        if let OutputPayload::Router(router_output) = &output.payload {
-            for edge in &router_output.edges {
-                router_edges.push(edge);
+    let mut new_tasks = vec![];
+
+    // Check if the task has a router output and create new tasks for the router
+    // edges.
+    {
+        let mut router_edges = vec![];
+        for output in &outputs {
+            if let OutputPayload::Router(router_output) = &output.payload {
+                for edge in &router_output.edges {
+                    router_edges.push(edge);
+                }
             }
         }
-    }
-    if !router_edges.is_empty() {
-        for edge in router_edges {
-            let compute_fn = compute_graph
-                .nodes
-                .get(edge)
-                .ok_or(anyhow!("compute node not found: {:?}", edge))?;
-            let new_task = compute_fn.create_task(
-                &task.namespace,
-                &task.compute_graph_name,
-                &task.invocation_id,
-                &task.input_node_output_key,
-                None,
-                invocation_ctx.graph_version,
-            )?;
-            new_tasks.push(new_task);
+        if !router_edges.is_empty() {
+            for edge in router_edges {
+                let compute_fn = compute_graph
+                    .nodes
+                    .get(edge)
+                    .ok_or(anyhow!("compute node not found: {:?}", edge))?;
+                let new_task = compute_fn.create_task(
+                    &task.namespace,
+                    &task.compute_graph_name,
+                    &task.invocation_id,
+                    &task.input_node_output_key,
+                    None,
+                    invocation_ctx.graph_version,
+                )?;
+                new_tasks.push(new_task);
+            }
+            trace!(
+                task_keys = ?new_tasks.iter().map(|t| t.key()).collect::<Vec<_>>(),
+                "Creating a router edge task",
+            );
+            return Ok(TaskCreationResult {
+                namespace: task.namespace.clone(),
+                compute_graph: task.compute_graph_name.clone(),
+                invocation_id: task.invocation_id.clone(),
+                tasks: new_tasks,
+                new_reduction_tasks: vec![],
+                processed_reduction_tasks: vec![],
+                invocation_finished: false,
+            });
         }
-        trace!(
-            "Created new tasks for router edges: namespace = {:?}, compute_graph = {:?}, invocation_id = {:?}, task_ids = {:?}",
-            task.namespace, task.compute_graph_name, task.invocation_id, new_tasks.iter().map(|t| t.id.clone()).collect::<Vec<_>>()
-        );
-        return Ok(TaskCreationResult {
-            namespace: task.namespace.clone(),
-            compute_graph: task.compute_graph_name.clone(),
-            invocation_id: task.invocation_id.clone(),
-            tasks: new_tasks,
-            new_reduction_tasks: vec![],
-            processed_reduction_tasks: vec![],
-            invocation_finished: false,
-        });
     }
-    // Check for pending reduction tasks to create
+    // When a reducer task finishes, check for more queued reduction tasks to create
+    // to ensure sequential execution.
     if let Some(compute_node) = compute_graph.nodes.get(&task.compute_fn_name) {
         if let Node::Compute(compute_fn) = compute_node {
             if compute_fn.reducer {
+                // Do nothing if there is a pending reducer task for this compute node.
+                //
+                // This protects against the case where a reducer task finished before the next
+                // output and another one was created without queuing.
                if let Some(task_analytics) =
                    invocation_ctx.get_task_analytics(&task.compute_fn_name)
                {
                    if task_analytics.pending_tasks > 0 {
                        trace!(
-                            "Waiting for pending reducer tasks to finish before queuing new ones"
+                            compute_fn_name = compute_fn.name,
+                            "Waiting for pending reducer tasks to finish before dequeuing the next one"
                        );
                        return Ok(TaskCreationResult {
                            namespace: task.namespace.clone(),
@@ -226,8 +210,9 @@ pub async fn handle_task_finished(
                    invocation_ctx.graph_version,
                )?;
                trace!(
-                    "Created new task for reduction task: namespace = {:?}, compute_graph = {:?}, invocation_id = {:?}, task_id = {:?}, compute_fn_name = {:?}",
-                    task.namespace, task.compute_graph_name, task.invocation_id, new_task.id, new_task.compute_fn_name
+                    task_keys = ?new_tasks.iter().map(|t| t.key()).collect::<Vec<_>>(),
+                    compute_fn_name = new_task.compute_fn_name,
+                    "Creating a reduction task from queue",
                );
                return Ok(TaskCreationResult {
                    namespace: task.namespace.clone(),
@@ -240,12 +225,12 @@ pub async fn handle_task_finished(
                    });
                }
                trace!(
-                    "No reduction tasks to create: namespace = {:?}, compute_graph = {:?}, invocation_id = {:?}, compute_fn_name = {:?}",
-                    task.namespace, task.compute_graph_name, task.invocation_id, task.compute_fn_name
-                );
+                    compute_fn_name = compute_fn.name,
+                    "No queued reduction tasks to create",
+                );
 
-                // Prevent early finalization of the invocation if a reduction task finished
-                // before other parent output have been generated.
+                // Prevent proceeding to edges too early if there are parent tasks that are
+                // still pending.
                if compute_graph
                    .get_compute_parent_nodes(compute_node.name())
                    .iter()
@@ -260,7 +245,8 @@ pub async fn handle_task_finished(
                    })
                {
                    trace!(
-                        "Waiting for parent tasks to finish before start child tasks of reducer"
+                        compute_fn_name = compute_fn.name,
+                        "Waiting for parent tasks to finish before starting a new reducer task"
                    );
                    return Ok(TaskCreationResult {
                        namespace: task.namespace.clone(),
@@ -278,16 +264,16 @@ pub async fn handle_task_finished(
 
     // Find the edges of the function
     let edges = compute_graph.edges.get(&task.compute_fn_name);
+
+    // If there are no edges, check if the invocation should be finished.
     if edges.is_none() {
         let invocation_finished = if invocation_ctx.outstanding_tasks == 0 {
+            info!("invocation finished");
             true
         } else {
+            info!("invocation finishing, waiting for outstanding tasks to finish");
             false
         };
-        info!(
-            "compute graph {} invocation finished: {:?}",
-            task.compute_graph_name, invocation_finished
-        );
         return Ok(TaskCreationResult {
             namespace: task.namespace.clone(),
             compute_graph: task.compute_graph_name.clone(),
@@ -298,36 +284,39 @@ pub async fn handle_task_finished(
             invocation_finished,
         });
     }
+
+    // Create new tasks for the edges of the node of the current task.
     let mut new_reduction_tasks = vec![];
     let edges = edges.unwrap();
     for edge in edges {
-        for output in &outputs {
-            let compute_node = compute_graph
-                .nodes
-                .get(edge)
-                .ok_or(anyhow!("compute node not found: {:?}", edge))?;
+        let compute_node = compute_graph
+            .nodes
+            .get(edge)
+            .ok_or(anyhow!("compute node not found: {:?}", edge))?;
 
-            let task_analytics_edge = invocation_ctx.get_task_analytics(&edge);
-            trace!(
-                compute_fn_name = compute_node.name(),
-                "task_analytics_edge: {:?}",
-                task_analytics_edge,
-            );
-            let (outstanding_tasks_for_node, successfull_tasks_for_node) = match task_analytics_edge
-            {
-                Some(task_analytics) => (
-                    task_analytics.pending_tasks,
-                    task_analytics.successful_tasks,
-                ),
-                None => {
-                    error!("task analytics not found for edge : {:?}", edge);
-                    (0, 0)
-                }
-            };
-            // hypothesis: if a previous reducer task finished previously, we need to create
-            // a new reducer task here BUT if we create a new reduction task and
-            // not a task, it will not get scheduled.
+        let task_analytics_edge = invocation_ctx.get_task_analytics(&edge);
+        trace!(
+            compute_fn_name = compute_node.name(),
+            "task_analytics_edge: {:?}",
+            task_analytics_edge,
+        );
+        let (outstanding_tasks_for_node, successfull_tasks_for_node) = match task_analytics_edge {
+            Some(task_analytics) => (
+                task_analytics.pending_tasks,
+                task_analytics.successful_tasks,
+            ),
+            None => {
+                error!("task analytics not found for edge : {:?}", edge);
+                (0, 0)
+            }
+        };
+
+        for output in &outputs {
             if compute_node.reducer() {
+                // In order to ensure sequential execution of reducer tasks, we queue a
+                // reduction task for this output if there are still outstanding
+                // tasks for the node or if we are going to create a new task for the node.
                 if new_tasks.len() > 0 || outstanding_tasks_for_node > 0 {
                     let new_task = compute_node.reducer_task(
                         &task.namespace,
@@ -337,12 +326,17 @@ pub async fn handle_task_finished(
                         &output.key(&task.invocation_id),
                     );
                     trace!(
-                        "Created new reduction task: namespace = {:?}, compute_graph = {:?}, invocation_id = {:?}, compute_fn_name = {:?}, new_tasks_len = {:?}, outstanding_tasks_for_node = {:?}, output_len = {:?}",
-                        new_task.namespace, new_task.compute_graph_name, new_task.invocation_id, new_task.compute_fn_name, new_tasks.len(), outstanding_tasks_for_node, outputs.len()
+                        compute_fn_name = compute_node.name(),
+                        "Creating a queued reduction task",
                     );
                     new_reduction_tasks.push(new_task);
                     continue;
                 }
+
+                // If a previous reducer task has already finished, we need to create
+                // a new reducer task here without queuing it.
+                //
+                // To do so, we need to find the previous reducer task to reuse its output.
                if successfull_tasks_for_node > 0 {
                    let (prev_reducer_tasks, _) = indexify_state.reader().get_task_by_fn(
                        &task.namespace,
                        &task.compute_graph_name,
                        &task.invocation_id,
                        &task.compute_fn_name,
                        Some(1),
                    )?;
-                    if !prev_reducer_tasks.is_empty() {
-                        let prev_reducer_task = prev_reducer_tasks.first().unwrap();
-                        let prev_reducer_outputs = indexify_state.reader().get_task_outputs(
-                            &prev_reducer_task.namespace,
-                            &prev_reducer_task.id.to_string(),
-                        )?;
-
-                        if prev_reducer_outputs.is_empty() {
-                            error!(
-                                "No outputs found for previous reducer task: {:?}",
-                                prev_reducer_task.id
-                            );
-                            return Err(anyhow!(
-                                "No outputs found for previous reducer task, should never happen: {:?}",
-                                prev_reducer_task.key(),
-                            ));
-                        }
-                        let prev_reducer_output = prev_reducer_outputs.first().unwrap();
-                        // A reducer task has already finished, so we need to create a new task
+                    if prev_reducer_tasks.is_empty() {
+                        return Err(anyhow!(
+                            "Previous reducer task not found, should never happen: {:?}",
+                            compute_node.name()
+                        ));
+                    }
 
-                        // we cannot start a normal task
-                        // TODO: Handle the case where a failure happened in a reducer task
+                    let prev_reducer_task = prev_reducer_tasks.first().unwrap();
+                    let prev_reducer_outputs = indexify_state.reader().get_task_outputs(
+                        &prev_reducer_task.namespace,
+                        &prev_reducer_task.id.to_string(),
+                    )?;
 
-                        // Create a new task for the queued reduction_task
-                        let output = outputs.first().unwrap();
-                        let new_task = compute_node.create_task(
-                            &task.namespace,
-                            &task.compute_graph_name,
-                            &task.invocation_id,
-                            &output.key(&task.invocation_id),
-                            Some(prev_reducer_output.id.clone()),
-                            invocation_ctx.graph_version,
-                        )?;
-                        trace!(
-                            "Created new task for reduction task (resume reduce): namespace = {:?}, compute_graph = {:?}, invocation_id = {:?}, task_id = {:?}, compute_fn_name = {:?}",
-                            task.namespace, task.compute_graph_name, task.invocation_id, new_task.id, new_task.compute_fn_name
-                        );
-                        new_tasks.push(new_task);
-                    } else {
-                        // TODO: consider returning an error.
-                        error!(
-                            "Previous reducer task not found for compute node: {:?}",
-                            compute_node.name()
-                        );
+                    if prev_reducer_outputs.is_empty() {
+                        return Err(anyhow!(
+                            "No outputs found for previous reducer task, should never happen: {:?}",
+                            prev_reducer_task.key(),
+                        ));
                    }
+                    let prev_reducer_output = prev_reducer_outputs.first().unwrap();
+
+                    let output = outputs.first().unwrap();
+                    let new_task = compute_node.create_task(
+                        &task.namespace,
+                        &task.compute_graph_name,
+                        &task.invocation_id,
+                        &output.key(&task.invocation_id),
+                        Some(prev_reducer_output.id.clone()),
+                        invocation_ctx.graph_version,
+                    )?;
+                    trace!(
+                        task_key = new_task.key(),
+                        compute_fn_name = new_task.compute_fn_name,
+                        "Creating a reduction task",
+                    );
+                    new_tasks.push(new_task);
+                    continue;
                }
            }
+
            let new_task = compute_node.create_task(
                &task.namespace,
                &task.compute_graph_name,
                &task.invocation_id,
                &output.key(&task.invocation_id),
                None,
                invocation_ctx.graph_version,
            )?;
            trace!(
-                "Created new task for output: namespace = {:?}, compute_graph = {:?}, invocation_id = {:?}, compute_fn_name = {:?}, new_tasks_len = {:?}, outstanding_tasks_for_node = {:?}, output_len = {:?}",
-                new_task.namespace, new_task.compute_graph_name, new_task.invocation_id, new_task.compute_fn_name, new_tasks.len(), outstanding_tasks_for_node, outputs.len()
+                task_key = new_task.key(),
+                compute_fn_name = new_task.compute_fn_name,
+                "Creating a standard task",
            );
            new_tasks.push(new_task);
        }
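Note: the change above replaces long formatted trace strings with span fields on #[instrument] and structured key-value fields on individual events. The snippet below is a minimal, self-contained sketch of that same tracing pattern; the Event type, the handle function, and the field values are illustrative stand-ins rather than code from this repository, and it assumes the tracing and tracing-subscriber crates are available.

// Illustrative sketch only; not part of the patch.
use tracing::{info, instrument, trace};

struct Event {
    namespace: String,
    invocation_id: String,
}

// Identifiers are recorded once as span fields, so every event emitted inside
// the function carries them without repeating them in each message string.
#[instrument(skip(event), fields(namespace = %event.namespace, invocation_id = %event.invocation_id))]
fn handle(event: Event) {
    // Structured fields instead of values interpolated into the message.
    trace!(task_key = "example-task-key", "Creating a standard task");
    info!("invocation finished");
}

fn main() {
    // A basic subscriber so the trace-level events above are actually printed.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();
    handle(Event {
        namespace: "default".into(),
        invocation_id: "inv-1".into(),
    });
}

Recording identifiers as span fields keeps every log line inside the handler correlated, which is the motivation for dropping the "Handling ..." trace calls that previously repeated the same values.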