From db86ee59056b39919612072ddb1121fcd57a0bcb Mon Sep 17 00:00:00 2001 From: Louis Knight-Webb Date: Mon, 16 Jun 2025 18:09:50 -0400 Subject: [PATCH] Command execution --- backend/src/main.rs | 188 +++++++++++++++----- backend/src/models/task_attempt_activity.rs | 20 +++ backend/src/models/user.rs | 44 +++++ 3 files changed, 207 insertions(+), 45 deletions(-) diff --git a/backend/src/main.rs b/backend/src/main.rs index 133b528e..fdb2f787 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -6,17 +6,32 @@ use axum::{ Json, Router, }; use sqlx::postgres::PgPoolOptions; -use std::env; +use std::{collections::HashMap, env, sync::Arc}; +use tokio::sync::Mutex; use tower_http::cors::CorsLayer; +use uuid::Uuid; mod auth; mod models; mod routes; use auth::{auth_middleware, hash_password}; -use models::ApiResponse; +use models::{ApiResponse, user::User}; use routes::{health, projects, tasks, users, filesystem}; +#[derive(Debug)] +pub struct RunningExecution { + pub task_attempt_id: Uuid, + pub child: tokio::process::Child, + pub started_at: chrono::DateTime, +} + +#[derive(Debug, Clone)] +pub struct AppState { + pub running_executions: Arc>>, + pub db_pool: sqlx::PgPool, +} + async fn echo_handler( Json(payload): Json, ) -> ResponseJson> { @@ -55,6 +70,18 @@ async fn main() -> anyhow::Result<()> { tracing::warn!("Failed to create admin account: {}", e); } + // Create app state + let app_state = AppState { + running_executions: Arc::new(Mutex::new(HashMap::new())), + db_pool: pool.clone(), + }; + + // Start background task to check for init status and spawn processes + let state_clone = app_state.clone(); + tokio::spawn(async move { + execution_monitor(state_clone).await; + }); + // Public routes (no auth required) let public_routes = Router::new() .route("/", get(|| async { "Bloop API" })) @@ -75,6 +102,7 @@ async fn main() -> anyhow::Result<()> { .merge(public_routes) .merge(protected_routes) .layer(Extension(pool)) + .layer(Extension(app_state)) .layer(CorsLayer::permissive()); let listener = tokio::net::TcpListener::bind("0.0.0.0:3001").await?; @@ -87,54 +115,124 @@ async fn main() -> anyhow::Result<()> { } async fn create_admin_account(pool: &sqlx::PgPool) -> anyhow::Result<()> { - use chrono::Utc; - use uuid::Uuid; - let admin_email = "admin@example.com"; let admin_password = env::var("ADMIN_PASSWORD").unwrap_or_else(|_| "admin123".to_string()); - // Check if admin already exists - let existing_admin = sqlx::query!( - "SELECT id, password_hash FROM users WHERE email = $1", - admin_email - ) - .fetch_optional(pool) - .await?; - let password_hash = hash_password(&admin_password)?; - if let Some(admin) = existing_admin { - // Update existing admin password - let now = Utc::now(); - sqlx::query!( - "UPDATE users SET password_hash = $2, is_admin = $3, updated_at = $4 WHERE id = $1", - admin.id, - password_hash, - true, - now - ) - .execute(pool) - .await?; - - tracing::info!("Updated admin account"); - } else { - // Create new admin account - let id = Uuid::new_v4(); - let now = Utc::now(); - sqlx::query!( - "INSERT INTO users (id, email, password_hash, is_admin, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6)", - id, - admin_email, - password_hash, - true, - now, - now - ) - .execute(pool) - .await?; - - tracing::info!("Created admin account: {}", admin_email); - } + User::create_or_update_admin(pool, admin_email, &password_hash).await?; Ok(()) } + +async fn execution_monitor(app_state: AppState) { + use models::{task_attempt_activity::{CreateTaskAttemptActivity, TaskAttemptActivity}, task_attempt::TaskAttemptStatus}; + use chrono::Utc; + use tokio::process::Command; + + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5)); + + loop { + interval.tick().await; + + // Check for task attempts with latest activity status = Init + let init_attempt_ids = match TaskAttemptActivity::find_attempts_with_latest_init_status(&app_state.db_pool).await { + Ok(attempts) => attempts, + Err(e) => { + tracing::error!("Failed to query init attempts: {}", e); + continue; + } + }; + + for attempt_id in init_attempt_ids { + + // Check if we already have a running execution for this attempt + { + let executions = app_state.running_executions.lock().await; + if executions.values().any(|exec| exec.task_attempt_id == attempt_id) { + continue; + } + } + + // Spawn the process + let child = match Command::new("echo") + .arg("hello world") + .spawn() { + Ok(child) => child, + Err(e) => { + tracing::error!("Failed to spawn echo command: {}", e); + continue; + } + }; + + // Add to running executions + let execution_id = Uuid::new_v4(); + { + let mut executions = app_state.running_executions.lock().await; + executions.insert(execution_id, RunningExecution { + task_attempt_id: attempt_id, + child, + started_at: Utc::now(), + }); + } + + // Update task attempt activity to InProgress + let activity_id = Uuid::new_v4(); + let create_activity = CreateTaskAttemptActivity { + task_attempt_id: attempt_id, + status: Some(TaskAttemptStatus::InProgress), + note: Some("Started execution".to_string()), + }; + + if let Err(e) = TaskAttemptActivity::create( + &app_state.db_pool, + &create_activity, + activity_id, + TaskAttemptStatus::InProgress, + ).await { + tracing::error!("Failed to create in-progress activity: {}", e); + } + + tracing::info!("Started execution {} for task attempt {}", execution_id, attempt_id); + } + + // Check for completed processes + let mut completed_executions = Vec::new(); + { + let mut executions = app_state.running_executions.lock().await; + for (execution_id, running_exec) in executions.iter_mut() { + match running_exec.child.try_wait() { + Ok(Some(status)) => { + let success = status.success(); + let exit_code = status.code(); + completed_executions.push((*execution_id, success, exit_code)); + } + Ok(None) => { + // Still running + } + Err(e) => { + tracing::error!("Error checking process status: {}", e); + completed_executions.push((*execution_id, false, None)); + } + } + } + + // Remove completed executions from the map + for (execution_id, _, _) in &completed_executions { + executions.remove(execution_id); + } + } + + // Log completed executions + for (execution_id, success, exit_code) in completed_executions { + let status_text = if success { "completed successfully" } else { "failed" }; + let exit_text = if let Some(code) = exit_code { + format!(" with exit code {}", code) + } else { + String::new() + }; + + tracing::info!("Execution {} {}{}", execution_id, status_text, exit_text); + } + } +} diff --git a/backend/src/models/task_attempt_activity.rs b/backend/src/models/task_attempt_activity.rs index 9b953ede..e02b7f38 100644 --- a/backend/src/models/task_attempt_activity.rs +++ b/backend/src/models/task_attempt_activity.rs @@ -66,4 +66,24 @@ impl TaskAttemptActivity { .await?; Ok(()) } + + pub async fn find_attempts_with_latest_init_status(pool: &PgPool) -> Result, sqlx::Error> { + let records = sqlx::query!( + r#"SELECT DISTINCT ta.id + FROM task_attempts ta + INNER JOIN ( + SELECT task_attempt_id, MAX(created_at) as latest_created_at + FROM task_attempt_activities + GROUP BY task_attempt_id + ) latest_activity ON ta.id = latest_activity.task_attempt_id + INNER JOIN task_attempt_activities taa ON ta.id = taa.task_attempt_id + AND taa.created_at = latest_activity.latest_created_at + WHERE taa.status = $1"#, + TaskAttemptStatus::Init as TaskAttemptStatus + ) + .fetch_all(pool) + .await?; + + Ok(records.into_iter().map(|r| r.id).collect()) + } } diff --git a/backend/src/models/user.rs b/backend/src/models/user.rs index d6c58a32..1b26c803 100644 --- a/backend/src/models/user.rs +++ b/backend/src/models/user.rs @@ -134,4 +134,48 @@ impl User { .await?; Ok(result.rows_affected()) } + + pub async fn create_or_update_admin(pool: &PgPool, email: &str, password_hash: &str) -> Result<(), sqlx::Error> { + use chrono::Utc; + + // Check if admin already exists + let existing_admin = sqlx::query!( + "SELECT id, password_hash FROM users WHERE email = $1", + email + ) + .fetch_optional(pool) + .await?; + + if let Some(admin) = existing_admin { + // Update existing admin password + let now = Utc::now(); + sqlx::query!( + "UPDATE users SET password_hash = $2, is_admin = $3, updated_at = $4 WHERE id = $1", + admin.id, + password_hash, + true, + now + ) + .execute(pool) + .await?; + + tracing::info!("Updated admin account"); + } else { + // Create new admin account + let id = Uuid::new_v4(); + sqlx::query!( + "INSERT INTO users (id, email, password_hash, is_admin) VALUES ($1, $2, $3, $4)", + id, + email, + password_hash, + true + ) + .execute(pool) + .await?; + + tracing::info!("Created admin account: {}", email); + } + + Ok(()) + } }