From 977e5032264bd3f7cd79e39c49dd91b8cf4d241c Mon Sep 17 00:00:00 2001 From: Louis Knight-Webb Date: Mon, 16 Jun 2025 18:13:13 -0400 Subject: [PATCH] Activity logs --- backend/src/main.rs | 67 +++++++++++++++++++-- backend/src/models/task_attempt_activity.rs | 20 ++++++ 2 files changed, 82 insertions(+), 5 deletions(-) diff --git a/backend/src/main.rs b/backend/src/main.rs index fdb2f787..797493c9 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -135,6 +135,44 @@ async fn execution_monitor(app_state: AppState) { loop { interval.tick().await; + // Check for orphaned task attempts with latest activity status = InProgress but no running execution + let inprogress_attempt_ids = match TaskAttemptActivity::find_attempts_with_latest_inprogress_status(&app_state.db_pool).await { + Ok(attempts) => attempts, + Err(e) => { + tracing::error!("Failed to query inprogress attempts: {}", e); + continue; + } + }; + + for attempt_id in inprogress_attempt_ids { + // Check if this attempt has a running execution + let has_running_execution = { + let executions = app_state.running_executions.lock().await; + executions.values().any(|exec| exec.task_attempt_id == attempt_id) + }; + + if !has_running_execution { + // This is an orphaned task attempt - mark it as paused + let activity_id = Uuid::new_v4(); + let create_activity = CreateTaskAttemptActivity { + task_attempt_id: attempt_id, + status: Some(TaskAttemptStatus::Paused), + note: Some("Execution lost (server restart or crash)".to_string()), + }; + + if let Err(e) = TaskAttemptActivity::create( + &app_state.db_pool, + &create_activity, + activity_id, + TaskAttemptStatus::Paused, + ).await { + tracing::error!("Failed to create paused activity for orphaned attempt: {}", e); + } else { + tracing::info!("Marked orphaned task attempt {} as paused", attempt_id); + } + } + } + // Check for task attempts with latest activity status = Init let init_attempt_ids = match TaskAttemptActivity::find_attempts_with_latest_init_status(&app_state.db_pool).await { Ok(attempts) => attempts, @@ -205,26 +243,26 @@ async fn execution_monitor(app_state: AppState) { Ok(Some(status)) => { let success = status.success(); let exit_code = status.code(); - completed_executions.push((*execution_id, success, exit_code)); + completed_executions.push((*execution_id, running_exec.task_attempt_id, success, exit_code)); } Ok(None) => { // Still running } Err(e) => { tracing::error!("Error checking process status: {}", e); - completed_executions.push((*execution_id, false, None)); + completed_executions.push((*execution_id, running_exec.task_attempt_id, false, None)); } } } // Remove completed executions from the map - for (execution_id, _, _) in &completed_executions { + for (execution_id, _, _, _) in &completed_executions { executions.remove(execution_id); } } - // Log completed executions - for (execution_id, success, exit_code) in completed_executions { + // Handle completed executions + for (execution_id, task_attempt_id, success, exit_code) in completed_executions { let status_text = if success { "completed successfully" } else { "failed" }; let exit_text = if let Some(code) = exit_code { format!(" with exit code {}", code) @@ -233,6 +271,25 @@ async fn execution_monitor(app_state: AppState) { }; tracing::info!("Execution {} {}{}", execution_id, status_text, exit_text); + + // Create task attempt activity with Paused status + let activity_id = Uuid::new_v4(); + let create_activity = CreateTaskAttemptActivity { + task_attempt_id, + status: Some(TaskAttemptStatus::Paused), + note: Some(format!("Execution completed{}", exit_text)), + }; + + if let Err(e) = TaskAttemptActivity::create( + &app_state.db_pool, + &create_activity, + activity_id, + TaskAttemptStatus::Paused, + ).await { + tracing::error!("Failed to create paused activity: {}", e); + } else { + tracing::info!("Task attempt {} set to paused after execution completion", task_attempt_id); + } } } } diff --git a/backend/src/models/task_attempt_activity.rs b/backend/src/models/task_attempt_activity.rs index e02b7f38..fb963687 100644 --- a/backend/src/models/task_attempt_activity.rs +++ b/backend/src/models/task_attempt_activity.rs @@ -86,4 +86,24 @@ impl TaskAttemptActivity { Ok(records.into_iter().map(|r| r.id).collect()) } + + pub async fn find_attempts_with_latest_inprogress_status(pool: &PgPool) -> Result, sqlx::Error> { + let records = sqlx::query!( + r#"SELECT DISTINCT ta.id + FROM task_attempts ta + INNER JOIN ( + SELECT task_attempt_id, MAX(created_at) as latest_created_at + FROM task_attempt_activities + GROUP BY task_attempt_id + ) latest_activity ON ta.id = latest_activity.task_attempt_id + INNER JOIN task_attempt_activities taa ON ta.id = taa.task_attempt_id + AND taa.created_at = latest_activity.latest_created_at + WHERE taa.status = $1"#, + TaskAttemptStatus::InProgress as TaskAttemptStatus + ) + .fetch_all(pool) + .await?; + + Ok(records.into_iter().map(|r| r.id).collect()) + } }