This commit is contained in:
Louis Knight-Webb
2025-06-20 21:46:28 +01:00
parent 37099032ee
commit 03133c9787
4 changed files with 156 additions and 95 deletions

View File

@@ -12,26 +12,157 @@ use crate::models::{
task_attempt_activity::{CreateTaskAttemptActivity, TaskAttemptActivity},
};
// #[derive(Debug)]
// pub enum ExecutionType {
// SetupScript,
// CodingAgent,
// DevServer,
// }
#[derive(Debug)]
pub enum ExecutionType {
SetupScript,
CodingAgent,
DevServer,
}
#[derive(Debug)]
pub struct RunningExecution {
pub task_attempt_id: Uuid,
// pub execution_type: ExecutionType,
pub execution_type: ExecutionType,
pub child: tokio::process::Child,
pub started_at: DateTime<Utc>,
}
#[derive(Debug, Clone)]
pub struct AppState {
pub running_executions: Arc<Mutex<HashMap<Uuid, RunningExecution>>>,
running_executions: Arc<Mutex<HashMap<Uuid, RunningExecution>>>,
pub db_pool: sqlx::SqlitePool,
pub config: Arc<tokio::sync::RwLock<crate::models::config::Config>>,
config: Arc<tokio::sync::RwLock<crate::models::config::Config>>,
}
impl AppState {
pub fn new(
db_pool: sqlx::SqlitePool,
config: Arc<tokio::sync::RwLock<crate::models::config::Config>>,
) -> Self {
Self {
running_executions: Arc::new(Mutex::new(HashMap::new())),
db_pool,
config,
}
}
// Running executions getters
pub async fn has_running_execution(&self, attempt_id: Uuid) -> bool {
let executions = self.running_executions.lock().await;
executions
.values()
.any(|exec| exec.task_attempt_id == attempt_id)
}
pub async fn get_running_execution_ids(&self) -> Vec<Uuid> {
let executions = self.running_executions.lock().await;
executions.keys().copied().collect()
}
pub async fn get_running_executions_for_monitor(&self) -> Vec<(Uuid, Uuid, bool, Option<i32>)> {
let mut executions = self.running_executions.lock().await;
let mut completed_executions = Vec::new();
for (execution_id, running_exec) in executions.iter_mut() {
match running_exec.child.try_wait() {
Ok(Some(status)) => {
let success = status.success();
let exit_code = status.code();
completed_executions.push((
*execution_id,
running_exec.task_attempt_id,
success,
exit_code,
));
}
Ok(None) => {
// Still running
}
Err(e) => {
tracing::error!("Error checking process status: {}", e);
completed_executions.push((
*execution_id,
running_exec.task_attempt_id,
false,
None,
));
}
}
}
// Remove completed executions from the map
for (execution_id, _, _, _) in &completed_executions {
executions.remove(execution_id);
}
completed_executions
}
// Running executions setters
pub async fn add_running_execution(&self, execution_id: Uuid, execution: RunningExecution) {
let mut executions = self.running_executions.lock().await;
executions.insert(execution_id, execution);
}
pub async fn remove_running_execution(&self, execution_id: Uuid) -> Option<RunningExecution> {
let mut executions = self.running_executions.lock().await;
executions.remove(&execution_id)
}
pub async fn stop_running_execution(
&self,
attempt_id: Uuid,
) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
let mut executions = self.running_executions.lock().await;
let mut execution_id_to_remove = None;
let mut stopped = false;
// Find the execution for this attempt
for (exec_id, execution) in executions.iter_mut() {
if execution.task_attempt_id == attempt_id {
// Kill the process
match execution.child.kill().await {
Ok(_) => {
stopped = true;
execution_id_to_remove = Some(*exec_id);
tracing::info!("Stopped execution for task attempt {}", attempt_id);
break;
}
Err(e) => {
tracing::error!("Failed to kill process for attempt {}: {}", attempt_id, e);
return Err(Box::new(e));
}
}
}
}
// Remove the stopped execution from the map
if let Some(exec_id) = execution_id_to_remove {
executions.remove(&exec_id);
}
Ok(stopped)
}
// Config getters
pub async fn get_sound_alerts_enabled(&self) -> bool {
let config = self.config.read().await;
config.sound_alerts
}
pub async fn get_config(&self) -> crate::models::config::Config {
let config = self.config.read().await;
config.clone()
}
// Config setters
pub async fn update_config<F>(&self, update_fn: F)
where
F: FnOnce(&mut crate::models::config::Config),
{
let mut config = self.config.write().await;
update_fn(&mut *config);
}
}
/// Commit any unstaged changes in the worktree after execution completion
@@ -149,14 +280,7 @@ pub async fn execution_monitor(app_state: AppState) {
for attempt_id in executor_running_attempt_ids {
// Check if this attempt has a running execution
let has_running_execution = {
let executions = app_state.running_executions.lock().await;
executions
.values()
.any(|exec| exec.task_attempt_id == attempt_id)
};
if !has_running_execution {
if !app_state.has_running_execution(attempt_id).await {
// This is an orphaned task attempt - mark it as paused
let activity_id = Uuid::new_v4();
let create_activity = CreateTaskAttemptActivity {
@@ -207,41 +331,7 @@ pub async fn execution_monitor(app_state: AppState) {
// Note: Execution starting logic moved to create_task_attempt endpoint
// Check for completed processes
let mut completed_executions = Vec::new();
{
let mut executions = app_state.running_executions.lock().await;
for (execution_id, running_exec) in executions.iter_mut() {
match running_exec.child.try_wait() {
Ok(Some(status)) => {
let success = status.success();
let exit_code = status.code();
completed_executions.push((
*execution_id,
running_exec.task_attempt_id,
success,
exit_code,
));
}
Ok(None) => {
// Still running
}
Err(e) => {
tracing::error!("Error checking process status: {}", e);
completed_executions.push((
*execution_id,
running_exec.task_attempt_id,
false,
None,
));
}
}
}
// Remove completed executions from the map
for (execution_id, _, _, _) in &completed_executions {
executions.remove(execution_id);
}
}
let completed_executions = app_state.get_running_executions_for_monitor().await;
// Handle completed executions
for (execution_id, task_attempt_id, success, exit_code) in completed_executions {
@@ -259,11 +349,7 @@ pub async fn execution_monitor(app_state: AppState) {
tracing::info!("Execution {} {}{}", execution_id, status_text, exit_text);
// Play sound notification if enabled
let sound_enabled = {
let config = app_state.config.read().await;
config.sound_alerts
};
if sound_enabled {
if app_state.get_sound_alerts_enabled().await {
play_sound_notification().await;
}

View File

@@ -107,11 +107,7 @@ async fn main() -> anyhow::Result<()> {
let config_arc = Arc::new(RwLock::new(config));
// Create app state
let app_state = AppState {
running_executions: Arc::new(Mutex::new(HashMap::new())),
db_pool: pool.clone(),
config: config_arc.clone(),
};
let app_state = AppState::new(pool.clone(), config_arc.clone());
// Start background task to check for init status and spawn processes
let state_clone = app_state.clone();

View File

@@ -10,6 +10,7 @@ use uuid::Uuid;
use super::project::Project;
use super::task::Task;
use crate::execution_monitor::ExecutionType;
use crate::executor::ExecutorConfig;
#[derive(Debug)]
@@ -587,17 +588,17 @@ impl TaskAttempt {
// Add to running executions
let execution_id = Uuid::new_v4();
{
let mut executions = app_state.running_executions.lock().await;
executions.insert(
app_state
.add_running_execution(
execution_id,
crate::execution_monitor::RunningExecution {
task_attempt_id: attempt_id,
execution_type: ExecutionType::CodingAgent,
child,
started_at: chrono::Utc::now(),
},
);
}
)
.await;
tracing::info!(
"Started execution {} for task attempt {}",

View File

@@ -421,35 +421,13 @@ pub async fn stop_task_attempt(
}
// Find and stop the running execution
let mut stopped = false;
{
let mut executions = app_state.running_executions.lock().await;
let mut execution_id_to_remove = None;
// Find the execution for this attempt
for (exec_id, execution) in executions.iter_mut() {
if execution.task_attempt_id == attempt_id {
// Kill the process
match execution.child.kill().await {
Ok(_) => {
stopped = true;
execution_id_to_remove = Some(*exec_id);
tracing::info!("Stopped execution for task attempt {}", attempt_id);
break;
}
Err(e) => {
tracing::error!("Failed to kill process for attempt {}: {}", attempt_id, e);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
}
}
let stopped = match app_state.stop_running_execution(attempt_id).await {
Ok(stopped) => stopped,
Err(e) => {
tracing::error!("Failed to stop execution for attempt {}: {}", attempt_id, e);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
// Remove the stopped execution from the map
if let Some(exec_id) = execution_id_to_remove {
executions.remove(&exec_id);
}
}
};
if !stopped {
return Ok(ResponseJson(ApiResponse {