From 0ce09944e5d10d1cee82afa996cace6fb7278a5d Mon Sep 17 00:00:00 2001 From: Louis Knight-Webb Date: Mon, 16 Jun 2025 18:20:17 -0400 Subject: [PATCH] Refactor --- backend/src/execution_monitor.rs | 189 +++++++++++++++++++++++++++++++ backend/src/main.rs | 185 +----------------------------- 2 files changed, 191 insertions(+), 183 deletions(-) create mode 100644 backend/src/execution_monitor.rs diff --git a/backend/src/execution_monitor.rs b/backend/src/execution_monitor.rs new file mode 100644 index 00000000..d27fc09a --- /dev/null +++ b/backend/src/execution_monitor.rs @@ -0,0 +1,189 @@ +use chrono::{DateTime, Utc}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::process::Command; +use uuid::Uuid; + +use crate::models::{ + task_attempt_activity::{CreateTaskAttemptActivity, TaskAttemptActivity}, + task_attempt::TaskAttemptStatus +}; + +#[derive(Debug)] +pub struct RunningExecution { + pub task_attempt_id: Uuid, + pub child: tokio::process::Child, + pub started_at: DateTime, +} + +#[derive(Debug, Clone)] +pub struct AppState { + pub running_executions: Arc>>, + pub db_pool: sqlx::PgPool, +} + +pub async fn execution_monitor(app_state: AppState) { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); + + loop { + interval.tick().await; + + // Check for orphaned task attempts with latest activity status = InProgress but no running execution + let inprogress_attempt_ids = match TaskAttemptActivity::find_attempts_with_latest_inprogress_status(&app_state.db_pool).await { + Ok(attempts) => attempts, + Err(e) => { + tracing::error!("Failed to query inprogress attempts: {}", e); + continue; + } + }; + + for attempt_id in inprogress_attempt_ids { + // Check if this attempt has a running execution + let has_running_execution = { + let executions = app_state.running_executions.lock().await; + executions.values().any(|exec| exec.task_attempt_id == attempt_id) + }; + + if !has_running_execution { + // This is an orphaned task attempt - mark it as paused + let activity_id = Uuid::new_v4(); + let create_activity = CreateTaskAttemptActivity { + task_attempt_id: attempt_id, + status: Some(TaskAttemptStatus::Paused), + note: Some("Execution lost (server restart or crash)".to_string()), + }; + + if let Err(e) = TaskAttemptActivity::create( + &app_state.db_pool, + &create_activity, + activity_id, + TaskAttemptStatus::Paused, + ).await { + tracing::error!("Failed to create paused activity for orphaned attempt: {}", e); + } else { + tracing::info!("Marked orphaned task attempt {} as paused", attempt_id); + } + } + } + + // Check for task attempts with latest activity status = Init + let init_attempt_ids = match TaskAttemptActivity::find_attempts_with_latest_init_status(&app_state.db_pool).await { + Ok(attempts) => attempts, + Err(e) => { + tracing::error!("Failed to query init attempts: {}", e); + continue; + } + }; + + for attempt_id in init_attempt_ids { + + // Check if we already have a running execution for this attempt + { + let executions = app_state.running_executions.lock().await; + if executions.values().any(|exec| exec.task_attempt_id == attempt_id) { + continue; + } + } + + // Spawn the process + let child = match Command::new("echo") + .arg("hello world") + .spawn() { + Ok(child) => child, + Err(e) => { + tracing::error!("Failed to spawn echo command: {}", e); + continue; + } + }; + + // Add to running executions + let execution_id = Uuid::new_v4(); + { + let mut executions = app_state.running_executions.lock().await; + executions.insert(execution_id, RunningExecution { + task_attempt_id: attempt_id, + child, + started_at: Utc::now(), + }); + } + + // Update task attempt activity to InProgress + let activity_id = Uuid::new_v4(); + let create_activity = CreateTaskAttemptActivity { + task_attempt_id: attempt_id, + status: Some(TaskAttemptStatus::InProgress), + note: Some("Started execution".to_string()), + }; + + if let Err(e) = TaskAttemptActivity::create( + &app_state.db_pool, + &create_activity, + activity_id, + TaskAttemptStatus::InProgress, + ).await { + tracing::error!("Failed to create in-progress activity: {}", e); + } + + tracing::info!("Started execution {} for task attempt {}", execution_id, attempt_id); + } + + // Check for completed processes + let mut completed_executions = Vec::new(); + { + let mut executions = app_state.running_executions.lock().await; + for (execution_id, running_exec) in executions.iter_mut() { + match running_exec.child.try_wait() { + Ok(Some(status)) => { + let success = status.success(); + let exit_code = status.code(); + completed_executions.push((*execution_id, running_exec.task_attempt_id, success, exit_code)); + } + Ok(None) => { + // Still running + } + Err(e) => { + tracing::error!("Error checking process status: {}", e); + completed_executions.push((*execution_id, running_exec.task_attempt_id, false, None)); + } + } + } + + // Remove completed executions from the map + for (execution_id, _, _, _) in &completed_executions { + executions.remove(execution_id); + } + } + + // Handle completed executions + for (execution_id, task_attempt_id, success, exit_code) in completed_executions { + let status_text = if success { "completed successfully" } else { "failed" }; + let exit_text = if let Some(code) = exit_code { + format!(" with exit code {}", code) + } else { + String::new() + }; + + tracing::info!("Execution {} {}{}", execution_id, status_text, exit_text); + + // Create task attempt activity with Paused status + let activity_id = Uuid::new_v4(); + let create_activity = CreateTaskAttemptActivity { + task_attempt_id, + status: Some(TaskAttemptStatus::Paused), + note: Some(format!("Execution completed{}", exit_text)), + }; + + if let Err(e) = TaskAttemptActivity::create( + &app_state.db_pool, + &create_activity, + activity_id, + TaskAttemptStatus::Paused, + ).await { + tracing::error!("Failed to create paused activity: {}", e); + } else { + tracing::info!("Task attempt {} set to paused after execution completion", task_attempt_id); + } + } + } +} diff --git a/backend/src/main.rs b/backend/src/main.rs index 797493c9..a9370e26 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -9,29 +9,17 @@ use sqlx::postgres::PgPoolOptions; use std::{collections::HashMap, env, sync::Arc}; use tokio::sync::Mutex; use tower_http::cors::CorsLayer; -use uuid::Uuid; mod auth; +mod execution_monitor; mod models; mod routes; use auth::{auth_middleware, hash_password}; +use execution_monitor::{execution_monitor, AppState}; use models::{ApiResponse, user::User}; use routes::{health, projects, tasks, users, filesystem}; -#[derive(Debug)] -pub struct RunningExecution { - pub task_attempt_id: Uuid, - pub child: tokio::process::Child, - pub started_at: chrono::DateTime, -} - -#[derive(Debug, Clone)] -pub struct AppState { - pub running_executions: Arc>>, - pub db_pool: sqlx::PgPool, -} - async fn echo_handler( Json(payload): Json, ) -> ResponseJson> { @@ -124,172 +112,3 @@ async fn create_admin_account(pool: &sqlx::PgPool) -> anyhow::Result<()> { Ok(()) } - -async fn execution_monitor(app_state: AppState) { - use models::{task_attempt_activity::{CreateTaskAttemptActivity, TaskAttemptActivity}, task_attempt::TaskAttemptStatus}; - use chrono::Utc; - use tokio::process::Command; - - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); - - loop { - interval.tick().await; - - // Check for orphaned task attempts with latest activity status = InProgress but no running execution - let inprogress_attempt_ids = match TaskAttemptActivity::find_attempts_with_latest_inprogress_status(&app_state.db_pool).await { - Ok(attempts) => attempts, - Err(e) => { - tracing::error!("Failed to query inprogress attempts: {}", e); - continue; - } - }; - - for attempt_id in inprogress_attempt_ids { - // Check if this attempt has a running execution - let has_running_execution = { - let executions = app_state.running_executions.lock().await; - executions.values().any(|exec| exec.task_attempt_id == attempt_id) - }; - - if !has_running_execution { - // This is an orphaned task attempt - mark it as paused - let activity_id = Uuid::new_v4(); - let create_activity = CreateTaskAttemptActivity { - task_attempt_id: attempt_id, - status: Some(TaskAttemptStatus::Paused), - note: Some("Execution lost (server restart or crash)".to_string()), - }; - - if let Err(e) = TaskAttemptActivity::create( - &app_state.db_pool, - &create_activity, - activity_id, - TaskAttemptStatus::Paused, - ).await { - tracing::error!("Failed to create paused activity for orphaned attempt: {}", e); - } else { - tracing::info!("Marked orphaned task attempt {} as paused", attempt_id); - } - } - } - - // Check for task attempts with latest activity status = Init - let init_attempt_ids = match TaskAttemptActivity::find_attempts_with_latest_init_status(&app_state.db_pool).await { - Ok(attempts) => attempts, - Err(e) => { - tracing::error!("Failed to query init attempts: {}", e); - continue; - } - }; - - for attempt_id in init_attempt_ids { - - // Check if we already have a running execution for this attempt - { - let executions = app_state.running_executions.lock().await; - if executions.values().any(|exec| exec.task_attempt_id == attempt_id) { - continue; - } - } - - // Spawn the process - let child = match Command::new("echo") - .arg("hello world") - .spawn() { - Ok(child) => child, - Err(e) => { - tracing::error!("Failed to spawn echo command: {}", e); - continue; - } - }; - - // Add to running executions - let execution_id = Uuid::new_v4(); - { - let mut executions = app_state.running_executions.lock().await; - executions.insert(execution_id, RunningExecution { - task_attempt_id: attempt_id, - child, - started_at: Utc::now(), - }); - } - - // Update task attempt activity to InProgress - let activity_id = Uuid::new_v4(); - let create_activity = CreateTaskAttemptActivity { - task_attempt_id: attempt_id, - status: Some(TaskAttemptStatus::InProgress), - note: Some("Started execution".to_string()), - }; - - if let Err(e) = TaskAttemptActivity::create( - &app_state.db_pool, - &create_activity, - activity_id, - TaskAttemptStatus::InProgress, - ).await { - tracing::error!("Failed to create in-progress activity: {}", e); - } - - tracing::info!("Started execution {} for task attempt {}", execution_id, attempt_id); - } - - // Check for completed processes - let mut completed_executions = Vec::new(); - { - let mut executions = app_state.running_executions.lock().await; - for (execution_id, running_exec) in executions.iter_mut() { - match running_exec.child.try_wait() { - Ok(Some(status)) => { - let success = status.success(); - let exit_code = status.code(); - completed_executions.push((*execution_id, running_exec.task_attempt_id, success, exit_code)); - } - Ok(None) => { - // Still running - } - Err(e) => { - tracing::error!("Error checking process status: {}", e); - completed_executions.push((*execution_id, running_exec.task_attempt_id, false, None)); - } - } - } - - // Remove completed executions from the map - for (execution_id, _, _, _) in &completed_executions { - executions.remove(execution_id); - } - } - - // Handle completed executions - for (execution_id, task_attempt_id, success, exit_code) in completed_executions { - let status_text = if success { "completed successfully" } else { "failed" }; - let exit_text = if let Some(code) = exit_code { - format!(" with exit code {}", code) - } else { - String::new() - }; - - tracing::info!("Execution {} {}{}", execution_id, status_text, exit_text); - - // Create task attempt activity with Paused status - let activity_id = Uuid::new_v4(); - let create_activity = CreateTaskAttemptActivity { - task_attempt_id, - status: Some(TaskAttemptStatus::Paused), - note: Some(format!("Execution completed{}", exit_text)), - }; - - if let Err(e) = TaskAttemptActivity::create( - &app_state.db_pool, - &create_activity, - activity_id, - TaskAttemptStatus::Paused, - ).await { - tracing::error!("Failed to create paused activity: {}", e); - } else { - tracing::info!("Task attempt {} set to paused after execution completion", task_attempt_id); - } - } - } -}