Activity logs

This commit is contained in:
Louis Knight-Webb
2025-06-16 18:13:13 -04:00
parent db86ee5905
commit 977e503226
2 changed files with 82 additions and 5 deletions

View File

@@ -135,6 +135,44 @@ async fn execution_monitor(app_state: AppState) {
loop {
interval.tick().await;
// Check for orphaned task attempts with latest activity status = InProgress but no running execution
let inprogress_attempt_ids = match TaskAttemptActivity::find_attempts_with_latest_inprogress_status(&app_state.db_pool).await {
Ok(attempts) => attempts,
Err(e) => {
tracing::error!("Failed to query inprogress attempts: {}", e);
continue;
}
};
for attempt_id in inprogress_attempt_ids {
// Check if this attempt has a running execution
let has_running_execution = {
let executions = app_state.running_executions.lock().await;
executions.values().any(|exec| exec.task_attempt_id == attempt_id)
};
if !has_running_execution {
// This is an orphaned task attempt - mark it as paused
let activity_id = Uuid::new_v4();
let create_activity = CreateTaskAttemptActivity {
task_attempt_id: attempt_id,
status: Some(TaskAttemptStatus::Paused),
note: Some("Execution lost (server restart or crash)".to_string()),
};
if let Err(e) = TaskAttemptActivity::create(
&app_state.db_pool,
&create_activity,
activity_id,
TaskAttemptStatus::Paused,
).await {
tracing::error!("Failed to create paused activity for orphaned attempt: {}", e);
} else {
tracing::info!("Marked orphaned task attempt {} as paused", attempt_id);
}
}
}
// Check for task attempts with latest activity status = Init
let init_attempt_ids = match TaskAttemptActivity::find_attempts_with_latest_init_status(&app_state.db_pool).await {
Ok(attempts) => attempts,
@@ -205,26 +243,26 @@ async fn execution_monitor(app_state: AppState) {
Ok(Some(status)) => {
let success = status.success();
let exit_code = status.code();
completed_executions.push((*execution_id, success, exit_code));
completed_executions.push((*execution_id, running_exec.task_attempt_id, success, exit_code));
}
Ok(None) => {
// Still running
}
Err(e) => {
tracing::error!("Error checking process status: {}", e);
completed_executions.push((*execution_id, false, None));
completed_executions.push((*execution_id, running_exec.task_attempt_id, false, None));
}
}
}
// Remove completed executions from the map
for (execution_id, _, _) in &completed_executions {
for (execution_id, _, _, _) in &completed_executions {
executions.remove(execution_id);
}
}
// Log completed executions
for (execution_id, success, exit_code) in completed_executions {
// Handle completed executions
for (execution_id, task_attempt_id, success, exit_code) in completed_executions {
let status_text = if success { "completed successfully" } else { "failed" };
let exit_text = if let Some(code) = exit_code {
format!(" with exit code {}", code)
@@ -233,6 +271,25 @@ async fn execution_monitor(app_state: AppState) {
};
tracing::info!("Execution {} {}{}", execution_id, status_text, exit_text);
// Create task attempt activity with Paused status
let activity_id = Uuid::new_v4();
let create_activity = CreateTaskAttemptActivity {
task_attempt_id,
status: Some(TaskAttemptStatus::Paused),
note: Some(format!("Execution completed{}", exit_text)),
};
if let Err(e) = TaskAttemptActivity::create(
&app_state.db_pool,
&create_activity,
activity_id,
TaskAttemptStatus::Paused,
).await {
tracing::error!("Failed to create paused activity: {}", e);
} else {
tracing::info!("Task attempt {} set to paused after execution completion", task_attempt_id);
}
}
}
}

View File

@@ -86,4 +86,24 @@ impl TaskAttemptActivity {
Ok(records.into_iter().map(|r| r.id).collect())
}
pub async fn find_attempts_with_latest_inprogress_status(pool: &PgPool) -> Result<Vec<uuid::Uuid>, sqlx::Error> {
let records = sqlx::query!(
r#"SELECT DISTINCT ta.id
FROM task_attempts ta
INNER JOIN (
SELECT task_attempt_id, MAX(created_at) as latest_created_at
FROM task_attempt_activities
GROUP BY task_attempt_id
) latest_activity ON ta.id = latest_activity.task_attempt_id
INNER JOIN task_attempt_activities taa ON ta.id = taa.task_attempt_id
AND taa.created_at = latest_activity.latest_created_at
WHERE taa.status = $1"#,
TaskAttemptStatus::InProgress as TaskAttemptStatus
)
.fetch_all(pool)
.await?;
Ok(records.into_iter().map(|r| r.id).collect())
}
}