use std::{
    collections::HashMap,
    io,
    path::{Path, PathBuf},
    sync::{Arc, atomic::AtomicUsize},
    time::Duration,
};

use anyhow::anyhow;
use async_trait::async_trait;
use command_group::AsyncGroupChild;
use db::{
    DBService,
    models::{
        execution_process::{
            ExecutionContext, ExecutionProcess, ExecutionProcessRunReason, ExecutionProcessStatus,
        },
        executor_session::ExecutorSession,
        merge::Merge,
        project::Project,
        scratch::{DraftFollowUpData, Scratch, ScratchType},
        task::{Task, TaskStatus},
        task_attempt::TaskAttempt,
    },
};
use deployment::{DeploymentError, RemoteClientNotConfigured};
use executors::{
    actions::{
        Executable, ExecutorAction, ExecutorActionType,
        coding_agent_follow_up::CodingAgentFollowUpRequest,
        coding_agent_initial::CodingAgentInitialRequest,
    },
    approvals::{ExecutorApprovalService, NoopExecutorApprovalService},
    env::ExecutionEnv,
    executors::{BaseCodingAgent, ExecutorExitResult, ExecutorExitSignal, InterruptSender},
    logs::{
        NormalizedEntryType,
        utils::{
            ConversationPatch,
            patch::{escape_json_pointer_segment, extract_normalized_entry_from_patch},
        },
    },
    profile::ExecutorProfileId,
};
use futures::{FutureExt, StreamExt, TryStreamExt, stream::select};
use serde_json::json;
use services::services::{
    analytics::AnalyticsContext,
    approvals::{Approvals, executor_approvals::ExecutorApprovalBridge},
    config::Config,
    container::{ContainerError, ContainerRef, ContainerService},
    diff_stream::{self, DiffStreamHandle},
    git::{Commit, DiffTarget, GitService},
    image::ImageService,
    notification::NotificationService,
    queued_message::QueuedMessageService,
    share::SharePublisher,
    worktree_manager::{WorktreeCleanup, WorktreeManager},
};
use tokio::{sync::RwLock, task::JoinHandle};
use tokio_util::io::ReaderStream;
use utils::{
    log_msg::LogMsg,
    msg_store::MsgStore,
    text::{git_branch_id, short_uuid, truncate_to_char_boundary},
};
use uuid::Uuid;

use crate::{command, copy};

#[derive(Clone)]
pub struct LocalContainerService {
    db: DBService,
    child_store: Arc<RwLock<HashMap<Uuid, Arc<RwLock<AsyncGroupChild>>>>>,
    interrupt_senders: Arc<RwLock<HashMap<Uuid, InterruptSender>>>,
    msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
    config: Arc<RwLock<Config>>,
    git: GitService,
    image_service: ImageService,
    analytics: Option<AnalyticsContext>,
    approvals: Approvals,
    queued_message_service: QueuedMessageService,
    publisher: Result<SharePublisher, RemoteClientNotConfigured>,
    notification_service: NotificationService,
}

impl LocalContainerService {
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        db: DBService,
        msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
        config: Arc<RwLock<Config>>,
        git: GitService,
        image_service: ImageService,
        analytics: Option<AnalyticsContext>,
        approvals: Approvals,
        queued_message_service: QueuedMessageService,
        publisher: Result<SharePublisher, RemoteClientNotConfigured>,
    ) -> Self {
        let child_store = Arc::new(RwLock::new(HashMap::new()));
        let interrupt_senders = Arc::new(RwLock::new(HashMap::new()));
        let notification_service = NotificationService::new(config.clone());
        let container = LocalContainerService {
            db,
            child_store,
            interrupt_senders,
            msg_stores,
            config,
            git,
            image_service,
            analytics,
            approvals,
            queued_message_service,
            publisher,
            notification_service,
        };
        container.spawn_worktree_cleanup().await;
        container
    }

    pub async fn get_child_from_store(&self, id: &Uuid) -> Option<Arc<RwLock<AsyncGroupChild>>> {
        let map = self.child_store.read().await;
        map.get(id).cloned()
    }

    pub async fn add_child_to_store(&self, id: Uuid, exec: AsyncGroupChild) {
        let mut map = self.child_store.write().await;
        map.insert(id, Arc::new(RwLock::new(exec)));
    }

    pub async fn remove_child_from_store(&self, id: &Uuid) {
        let mut map = self.child_store.write().await;
        map.remove(id);
    }

    async fn add_interrupt_sender(&self, id: Uuid, sender: InterruptSender) {
        let mut map = self.interrupt_senders.write().await;
        map.insert(id, sender);
    }
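    /// Remove and return the interrupt sender registered for `id`, if any.
    /// Used by `stop_execution` to attempt a graceful shutdown before the
    /// process group is force-killed.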
    async fn take_interrupt_sender(&self, id: &Uuid) -> Option<InterruptSender> {
        let mut map = self.interrupt_senders.write().await;
        map.remove(id)
    }

    /// Defensively check for externally deleted worktrees and mark them as deleted in the database
    async fn check_externally_deleted_worktrees(db: &DBService) -> Result<(), DeploymentError> {
        let active_attempts = TaskAttempt::find_by_worktree_deleted(&db.pool).await?;
        tracing::debug!(
            "Checking {} active worktrees for external deletion...",
            active_attempts.len()
        );
        for (attempt_id, worktree_path) in active_attempts {
            // Check if worktree directory exists
            if !std::path::Path::new(&worktree_path).exists() {
                // Worktree was deleted externally, mark as deleted in database
                if let Err(e) = TaskAttempt::mark_worktree_deleted(&db.pool, attempt_id).await {
                    tracing::error!(
                        "Failed to mark externally deleted worktree as deleted for attempt {}: {}",
                        attempt_id,
                        e
                    );
                } else {
                    tracing::info!(
                        "Marked externally deleted worktree as deleted for attempt {} (path: {})",
                        attempt_id,
                        worktree_path
                    );
                }
            }
        }
        Ok(())
    }

    /// Find and delete orphaned worktrees that don't correspond to any task attempts
    async fn cleanup_orphaned_worktrees(&self) {
        // Check if orphan cleanup is disabled via environment variable
        if std::env::var("DISABLE_WORKTREE_ORPHAN_CLEANUP").is_ok() {
            tracing::debug!(
                "Orphan worktree cleanup is disabled via DISABLE_WORKTREE_ORPHAN_CLEANUP environment variable"
            );
            return;
        }

        let worktree_base_dir = WorktreeManager::get_worktree_base_dir();
        if !worktree_base_dir.exists() {
            tracing::debug!(
                "Worktree base directory {} does not exist, skipping orphan cleanup",
                worktree_base_dir.display()
            );
            return;
        }

        let entries = match std::fs::read_dir(&worktree_base_dir) {
            Ok(entries) => entries,
            Err(e) => {
                tracing::error!(
                    "Failed to read worktree base directory {}: {}",
                    worktree_base_dir.display(),
                    e
                );
                return;
            }
        };

        for entry in entries {
            let entry = match entry {
                Ok(entry) => entry,
                Err(e) => {
                    tracing::warn!("Failed to read directory entry: {}", e);
                    continue;
                }
            };
            let path = entry.path();

            // Only process directories
            if !path.is_dir() {
                continue;
            }

            let worktree_path_str = path.to_string_lossy().to_string();
            if let Ok(false) =
                TaskAttempt::container_ref_exists(&self.db().pool, &worktree_path_str).await
            {
                // This is an orphaned worktree - delete it
                tracing::info!("Found orphaned worktree: {}", worktree_path_str);
                if let Err(e) =
                    WorktreeManager::cleanup_worktree(&WorktreeCleanup::new(path, None)).await
                {
                    tracing::error!(
                        "Failed to remove orphaned worktree {}: {}",
                        worktree_path_str,
                        e
                    );
                } else {
                    tracing::info!(
                        "Successfully removed orphaned worktree: {}",
                        worktree_path_str
                    );
                }
            }
        }
    }

    pub async fn cleanup_expired_attempt(
        db: &DBService,
        attempt_id: Uuid,
        worktree_path: PathBuf,
        git_repo_path: PathBuf,
    ) -> Result<(), DeploymentError> {
        WorktreeManager::cleanup_worktree(&WorktreeCleanup::new(
            worktree_path,
            Some(git_repo_path),
        ))
        .await?;
        // Mark worktree as deleted in database after successful cleanup
        TaskAttempt::mark_worktree_deleted(&db.pool, attempt_id).await?;
        tracing::info!("Successfully marked worktree as deleted for attempt {attempt_id}");
        Ok(())
    }
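    /// Remove every worktree whose attempt has expired and record the deletion.
    ///
    /// Sketch of how the periodic cleanup task drives this (using whatever
    /// `DBService` handle the caller already holds):
    ///
    /// ```ignore
    /// LocalContainerService::cleanup_expired_attempts(&db).await?;
    /// ```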
    pub async fn cleanup_expired_attempts(db: &DBService) -> Result<(), DeploymentError> {
        let expired_attempts = TaskAttempt::find_expired_for_cleanup(&db.pool).await?;

        if expired_attempts.is_empty() {
            tracing::debug!("No expired worktrees found");
            return Ok(());
        }

        tracing::info!(
            "Found {} expired worktrees to clean up",
            expired_attempts.len()
        );

        for (attempt_id, worktree_path, git_repo_path) in expired_attempts {
            Self::cleanup_expired_attempt(
                db,
                attempt_id,
                PathBuf::from(worktree_path),
                PathBuf::from(git_repo_path),
            )
            .await
            .unwrap_or_else(|e| {
                tracing::error!("Failed to clean up expired attempt {attempt_id}: {e}");
            });
        }

        Ok(())
    }

    pub async fn spawn_worktree_cleanup(&self) {
        let db = self.db.clone();
        let mut cleanup_interval = tokio::time::interval(tokio::time::Duration::from_secs(1800)); // 30 minutes

        self.cleanup_orphaned_worktrees().await;

        tokio::spawn(async move {
            loop {
                cleanup_interval.tick().await;
                tracing::info!("Starting periodic worktree cleanup...");
                Self::check_externally_deleted_worktrees(&db)
                    .await
                    .unwrap_or_else(|e| {
                        tracing::error!("Failed to check externally deleted worktrees: {}", e);
                    });
                Self::cleanup_expired_attempts(&db)
                    .await
                    .unwrap_or_else(|e| {
                        tracing::error!("Failed to clean up expired worktree attempts: {}", e)
                    });
            }
        });
    }

    /// Spawn a background task that polls the child process for completion and
    /// cleans up the execution entry when it exits.
    pub fn spawn_exit_monitor(
        &self,
        exec_id: &Uuid,
        exit_signal: Option<ExecutorExitSignal>,
    ) -> JoinHandle<()> {
        let exec_id = *exec_id;
        let child_store = self.child_store.clone();
        let msg_stores = self.msg_stores.clone();
        let db = self.db.clone();
        let config = self.config.clone();
        let container = self.clone();
        let analytics = self.analytics.clone();
        let publisher = self.publisher.clone();
        let mut process_exit_rx = self.spawn_os_exit_watcher(exec_id);

        tokio::spawn(async move {
            let mut exit_signal_future = exit_signal
                .map(|rx| rx.boxed()) // wait for result
                .unwrap_or_else(|| std::future::pending().boxed()); // no signal, stall forever

            let status_result: std::io::Result<std::process::ExitStatus>;

            // Wait for process to exit, or exit signal from executor
            tokio::select! {
                // Exit signal with result.
                // Some coding agent processes do not automatically exit after processing the user request; instead the executor
                // signals when processing has finished to gracefully kill the process.
                exit_result = &mut exit_signal_future => {
                    // Executor signaled completion: kill group and use the provided result
                    if let Some(child_lock) = child_store.read().await.get(&exec_id).cloned() {
                        let mut child = child_lock.write().await;
                        if let Err(err) = command::kill_process_group(&mut child).await {
                            tracing::error!("Failed to kill process group after exit signal: {} {}", exec_id, err);
                        }
                    }
                    // Map the exit result to appropriate exit status
                    status_result = match exit_result {
                        Ok(ExecutorExitResult::Success) => Ok(success_exit_status()),
                        Ok(ExecutorExitResult::Failure) => Ok(failure_exit_status()),
                        Err(_) => Ok(success_exit_status()), // Channel closed, assume success
                    };
                }
                // Process exit
                exit_status_result = &mut process_exit_rx => {
                    status_result = exit_status_result.unwrap_or_else(|e| Err(std::io::Error::other(e)));
                }
            }

            let (exit_code, status) = match status_result {
                Ok(exit_status) => {
                    let code = exit_status.code().unwrap_or(-1) as i64;
                    let status = if exit_status.success() {
                        ExecutionProcessStatus::Completed
                    } else {
                        ExecutionProcessStatus::Failed
                    };
                    (Some(code), status)
                }
                Err(_) => (None, ExecutionProcessStatus::Failed),
            };

            if !ExecutionProcess::was_stopped(&db.pool, exec_id).await
                && let Err(e) =
                    ExecutionProcess::update_completion(&db.pool, exec_id, status, exit_code).await
            {
                tracing::error!("Failed to update execution process completion: {}", e);
            }

            if let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await {
                // Update executor session summary if available
                if let Err(e) = container.update_executor_session_summary(&exec_id).await {
                    tracing::warn!("Failed to update executor session summary: {}", e);
                }

                let success = matches!(
                    ctx.execution_process.status,
                    ExecutionProcessStatus::Completed
                ) && exit_code == Some(0);
                let cleanup_done = matches!(
                    ctx.execution_process.run_reason,
                    ExecutionProcessRunReason::CleanupScript
                ) && !matches!(
                    ctx.execution_process.status,
                    ExecutionProcessStatus::Running
                );

                if success || cleanup_done {
                    // Commit changes (if any) and get feedback about whether changes were made
                    let changes_committed = match container.try_commit_changes(&ctx).await {
                        Ok(committed) => committed,
                        Err(e) => {
                            tracing::error!("Failed to commit changes after execution: {}", e);
                            // Treat commit failures as if changes were made to be safe
                            true
                        }
                    };

                    let should_start_next = if matches!(
                        ctx.execution_process.run_reason,
                        ExecutionProcessRunReason::CodingAgent
                    ) {
                        changes_committed
                    } else {
                        true
                    };

                    if should_start_next {
                        // If the process exited successfully, start the next action
                        if let Err(e) = container.try_start_next_action(&ctx).await {
                            tracing::error!("Failed to start next action after completion: {}", e);
                        }
                    } else {
                        tracing::info!(
                            "Skipping cleanup script for task attempt {} - no changes made by coding agent",
                            ctx.task_attempt.id
                        );
                        // Manually finalize task since we're bypassing normal execution flow
                        container.finalize_task(publisher.as_ref().ok(), &ctx).await;
                    }
                }

                if container.should_finalize(&ctx) {
                    // Only execute queued messages if the execution succeeded
                    // If it failed or was killed, just clear the queue and finalize
                    let should_execute_queued = !matches!(
                        ctx.execution_process.status,
                        ExecutionProcessStatus::Failed | ExecutionProcessStatus::Killed
                    );

                    if let Some(queued_msg) = container
                        .queued_message_service
                        .take_queued(ctx.task_attempt.id)
                    {
                        if should_execute_queued {
                            tracing::info!(
                                "Found queued message for attempt {}, starting follow-up execution",
                                ctx.task_attempt.id
                            );
                            // Delete the scratch since we're consuming the queued message
                            if let Err(e) = Scratch::delete(
                                &db.pool,
                                ctx.task_attempt.id,
                                &ScratchType::DraftFollowUp,
                            )
                            .await
                            {
                                tracing::warn!(
                                    "Failed to delete scratch after consuming queued message: {}",
                                    e
                                );
                            }

                            // Execute the queued follow-up
                            if let Err(e) = container
                                .start_queued_follow_up(&ctx, &queued_msg.data)
                                .await
                            {
                                tracing::error!("Failed to start queued follow-up: {}", e);
                                // Fall back to finalization if follow-up fails
                                container.finalize_task(publisher.as_ref().ok(), &ctx).await;
                            }
                        } else {
                            // Execution failed or was killed - discard the queued message and finalize
                            tracing::info!(
                                "Discarding queued message for attempt {} due to execution status {:?}",
                                ctx.task_attempt.id,
                                ctx.execution_process.status
                            );
                            container.finalize_task(publisher.as_ref().ok(), &ctx).await;
                        }
                    } else {
                        container.finalize_task(publisher.as_ref().ok(), &ctx).await;
                    }
                }

                // Fire analytics event when CodingAgent execution has finished
                if config.read().await.analytics_enabled
                    && matches!(
                        &ctx.execution_process.run_reason,
                        ExecutionProcessRunReason::CodingAgent
                    )
                    && let Some(analytics) = &analytics
                {
                    analytics.analytics_service.track_event(
                        &analytics.user_id,
                        "task_attempt_finished",
                        Some(json!({
                            "task_id": ctx.task.id.to_string(),
                            "project_id": ctx.task.project_id.to_string(),
                            "attempt_id": ctx.task_attempt.id.to_string(),
                            "execution_success": matches!(ctx.execution_process.status, ExecutionProcessStatus::Completed),
                            "exit_code": ctx.execution_process.exit_code,
                        })),
                    );
                }
            }

            // Now that commit/next-action/finalization steps for this process are complete,
            // capture the HEAD OID as the definitive "after" state (best-effort).
            if let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await {
                let worktree_dir = container.task_attempt_to_current_dir(&ctx.task_attempt);
                if let Ok(head) = container.git().get_head_info(&worktree_dir)
                    && let Err(e) =
                        ExecutionProcess::update_after_head_commit(&db.pool, exec_id, &head.oid)
                            .await
                {
                    tracing::warn!("Failed to update after_head_commit for {}: {}", exec_id, e);
                }
            }

            // Cleanup msg store
            if let Some(msg_arc) = msg_stores.write().await.remove(&exec_id) {
                msg_arc.push_finished();
                tokio::time::sleep(Duration::from_millis(50)).await; // Wait for the finish message to propagate
                match Arc::try_unwrap(msg_arc) {
                    Ok(inner) => drop(inner),
                    Err(arc) => tracing::error!(
                        "There are still {} strong Arcs to MsgStore for {}",
                        Arc::strong_count(&arc),
                        exec_id
                    ),
                }
            }

            // Cleanup child handle
            child_store.write().await.remove(&exec_id);
        })
    }

    pub fn spawn_os_exit_watcher(
        &self,
        exec_id: Uuid,
    ) -> tokio::sync::oneshot::Receiver<std::io::Result<std::process::ExitStatus>> {
        let (tx, rx) =
            tokio::sync::oneshot::channel::<std::io::Result<std::process::ExitStatus>>();
        let child_store = self.child_store.clone();
        tokio::spawn(async move {
            loop {
                let child_lock = {
                    let map = child_store.read().await;
                    map.get(&exec_id).cloned()
                };
                if let Some(child_lock) = child_lock {
                    let mut child_handler = child_lock.write().await;
                    match child_handler.try_wait() {
                        Ok(Some(status)) => {
                            let _ = tx.send(Ok(status));
                            break;
                        }
                        Ok(None) => {}
                        Err(e) => {
                            let _ = tx.send(Err(e));
                            break;
                        }
                    }
                } else {
                    let _ = tx.send(Err(io::Error::other(format!(
                        "Child handle missing for {exec_id}"
                    ))));
                    break;
                }
                tokio::time::sleep(Duration::from_millis(250)).await;
            }
        });
        rx
    }

    pub fn dir_name_from_task_attempt(attempt_id: &Uuid, task_title: &str) -> String {
        let task_title_id = git_branch_id(task_title);
        format!("{}-{}", short_uuid(attempt_id), task_title_id)
    }
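    /// Create a `MsgStore` for this execution and forward the child's stdout and
    /// stderr into it as `LogMsg::Stdout` / `LogMsg::Stderr` entries.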
    async fn track_child_msgs_in_store(&self, id: Uuid, child: &mut AsyncGroupChild) {
        let store = Arc::new(MsgStore::new());

        let out = child.inner().stdout.take().expect("no stdout");
        let err = child.inner().stderr.take().expect("no stderr");

        // Map stdout bytes -> LogMsg::Stdout
        let out = ReaderStream::new(out)
            .map_ok(|chunk| LogMsg::Stdout(String::from_utf8_lossy(&chunk).into_owned()));
        // Map stderr bytes -> LogMsg::Stderr
        let err = ReaderStream::new(err)
            .map_ok(|chunk| LogMsg::Stderr(String::from_utf8_lossy(&chunk).into_owned()));
        // If you have a JSON Patch source, map it to LogMsg::JsonPatch too, then select all three.

        // Merge and forward into the store
        let merged = select(out, err); // Stream<Item = Result<LogMsg, io::Error>>
        store.clone().spawn_forwarder(merged);

        let mut map = self.msg_stores().write().await;
        map.insert(id, store);
    }

    /// Get the project repository path for a task attempt
    async fn get_project_repo_path(
        &self,
        task_attempt: &TaskAttempt,
    ) -> Result<PathBuf, ContainerError> {
        let project_repo_path = task_attempt
            .parent_task(&self.db().pool)
            .await?
            .ok_or(ContainerError::Other(anyhow!("Parent task not found")))?
            .parent_project(&self.db().pool)
            .await?
            .ok_or(ContainerError::Other(anyhow!("Parent project not found")))?
            .git_repo_path;
        Ok(project_repo_path)
    }

    /// Create a diff log stream for merged attempts (never changes) for WebSocket
    fn create_merged_diff_stream(
        &self,
        project_repo_path: &Path,
        merge_commit_id: &str,
        stats_only: bool,
    ) -> Result<DiffStreamHandle, ContainerError> {
        let diffs = self.git().get_diffs(
            DiffTarget::Commit {
                repo_path: project_repo_path,
                commit_sha: merge_commit_id,
            },
            None,
        )?;

        let cum = Arc::new(AtomicUsize::new(0));
        let diffs: Vec<_> = diffs
            .into_iter()
            .map(|mut d| {
                diff_stream::apply_stream_omit_policy(&mut d, &cum, stats_only);
                d
            })
            .collect();

        let stream = futures::stream::iter(diffs.into_iter().map(|diff| {
            let entry_index = GitService::diff_path(&diff);
            let patch =
                ConversationPatch::add_diff(escape_json_pointer_segment(&entry_index), diff);
            Ok::<_, std::io::Error>(LogMsg::JsonPatch(patch))
        }))
        .chain(futures::stream::once(async {
            Ok::<_, std::io::Error>(LogMsg::Finished)
        }))
        .boxed();

        Ok(diff_stream::DiffStreamHandle::new(stream, None))
    }

    /// Create a live diff log stream for ongoing attempts for WebSocket
    /// Returns a stream that owns the filesystem watcher - when dropped, watcher is cleaned up
    async fn create_live_diff_stream(
        &self,
        worktree_path: &Path,
        base_commit: &Commit,
        stats_only: bool,
    ) -> Result<DiffStreamHandle, ContainerError> {
        diff_stream::create(
            self.git().clone(),
            worktree_path.to_path_buf(),
            base_commit.clone(),
            stats_only,
        )
        .await
        .map_err(|e| ContainerError::Other(anyhow!("{e}")))
    }

    /// Extract the last assistant message from the MsgStore history
    fn extract_last_assistant_message(&self, exec_id: &Uuid) -> Option<String> {
        // Get the MsgStore for this execution
        let msg_stores = self.msg_stores.try_read().ok()?;
        let msg_store = msg_stores.get(exec_id)?;

        // Get the history and scan in reverse for the last assistant message
        let history = msg_store.get_history();
        for msg in history.iter().rev() {
            if let LogMsg::JsonPatch(patch) = msg {
                // Try to extract a NormalizedEntry from the patch
                if let Some((_, entry)) = extract_normalized_entry_from_patch(patch)
                    && matches!(entry.entry_type, NormalizedEntryType::AssistantMessage)
                {
                    let content = entry.content.trim();
                    if !content.is_empty() {
                        const MAX_SUMMARY_LENGTH: usize = 4096;
                        if content.len() > MAX_SUMMARY_LENGTH {
                            let truncated = truncate_to_char_boundary(content, MAX_SUMMARY_LENGTH);
                            return Some(format!("{truncated}..."));
                        }
                        return Some(content.to_string());
                    }
                }
            }
        }
        None
    }
    /// Update the executor session summary with the final assistant message
    async fn update_executor_session_summary(&self, exec_id: &Uuid) -> Result<(), anyhow::Error> {
        // Check if there's an executor session for this execution process
        let session =
            ExecutorSession::find_by_execution_process_id(&self.db.pool, *exec_id).await?;

        if let Some(session) = session {
            // Only update if summary is not already set
            if session.summary.is_none() {
                if let Some(summary) = self.extract_last_assistant_message(exec_id) {
                    ExecutorSession::update_summary(&self.db.pool, *exec_id, &summary).await?;
                } else {
                    tracing::debug!("No assistant message found for execution {}", exec_id);
                }
            }
        }
        Ok(())
    }

    /// Start a follow-up execution from a queued message
    async fn start_queued_follow_up(
        &self,
        ctx: &ExecutionContext,
        queued_data: &DraftFollowUpData,
    ) -> Result<ExecutionProcess, ContainerError> {
        // Get executor profile from the latest CodingAgent process
        let initial_executor_profile_id = ExecutionProcess::latest_executor_profile_for_attempt(
            &self.db.pool,
            ctx.task_attempt.id,
        )
        .await
        .map_err(|e| ContainerError::Other(anyhow!("Failed to get executor profile: {e}")))?;

        let executor_profile_id = ExecutorProfileId {
            executor: initial_executor_profile_id.executor,
            variant: queued_data.variant.clone(),
        };

        // Get latest session ID for session continuity
        let latest_session_id = ExecutionProcess::find_latest_session_id_by_task_attempt(
            &self.db.pool,
            ctx.task_attempt.id,
        )
        .await?;

        // Get project for cleanup script
        let project = Project::find_by_id(&self.db.pool, ctx.task.project_id)
            .await?
            .ok_or_else(|| ContainerError::Other(anyhow!("Project not found")))?;
        let cleanup_action = self.cleanup_action(project.cleanup_script);

        let action_type = if let Some(session_id) = latest_session_id {
            ExecutorActionType::CodingAgentFollowUpRequest(CodingAgentFollowUpRequest {
                prompt: queued_data.message.clone(),
                session_id,
                executor_profile_id: executor_profile_id.clone(),
            })
        } else {
            ExecutorActionType::CodingAgentInitialRequest(CodingAgentInitialRequest {
                prompt: queued_data.message.clone(),
                executor_profile_id: executor_profile_id.clone(),
            })
        };

        let action = ExecutorAction::new(action_type, cleanup_action);

        self.start_execution(
            &ctx.task_attempt,
            &action,
            &ExecutionProcessRunReason::CodingAgent,
        )
        .await
    }
}

fn failure_exit_status() -> std::process::ExitStatus {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        ExitStatusExt::from_raw(256) // Exit code 1 (shifted by 8 bits)
    }
    #[cfg(windows)]
    {
        use std::os::windows::process::ExitStatusExt;
        ExitStatusExt::from_raw(1)
    }
}

#[async_trait]
impl ContainerService for LocalContainerService {
    fn msg_stores(&self) -> &Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>> {
        &self.msg_stores
    }

    fn db(&self) -> &DBService {
        &self.db
    }

    fn git(&self) -> &GitService {
        &self.git
    }

    fn share_publisher(&self) -> Option<&SharePublisher> {
        self.publisher.as_ref().ok()
    }

    fn notification_service(&self) -> &NotificationService {
        &self.notification_service
    }

    async fn git_branch_prefix(&self) -> String {
        self.config.read().await.git_branch_prefix.clone()
    }

    fn task_attempt_to_current_dir(&self, task_attempt: &TaskAttempt) -> PathBuf {
        PathBuf::from(task_attempt.container_ref.clone().unwrap_or_default())
    }
    /// Create a container
    async fn create(&self, task_attempt: &TaskAttempt) -> Result<ContainerRef, ContainerError> {
        let task = task_attempt
            .parent_task(&self.db.pool)
            .await?
            .ok_or(sqlx::Error::RowNotFound)?;
        let worktree_dir_name =
            LocalContainerService::dir_name_from_task_attempt(&task_attempt.id, &task.title);
        let worktree_path = WorktreeManager::get_worktree_base_dir().join(&worktree_dir_name);
        let project = task
            .parent_project(&self.db.pool)
            .await?
            .ok_or(sqlx::Error::RowNotFound)?;

        WorktreeManager::create_worktree(
            &project.git_repo_path,
            &task_attempt.branch,
            &worktree_path,
            &task_attempt.target_branch,
            true, // create new branch
        )
        .await?;

        // Copy files specified in the project's copy_files field
        if let Some(copy_files) = &project.copy_files
            && !copy_files.trim().is_empty()
        {
            self.copy_project_files(&project.git_repo_path, &worktree_path, copy_files)
                .await
                .unwrap_or_else(|e| {
                    tracing::warn!("Failed to copy project files: {}", e);
                });
        }

        // Copy task images from cache to worktree
        if let Err(e) = self
            .image_service
            .copy_images_by_task_to_worktree(&worktree_path, task.id)
            .await
        {
            tracing::warn!("Failed to copy task images to worktree: {}", e);
        }

        // Update both container_ref and branch in the database
        TaskAttempt::update_container_ref(
            &self.db.pool,
            task_attempt.id,
            &worktree_path.to_string_lossy(),
        )
        .await?;

        Ok(worktree_path.to_string_lossy().to_string())
    }

    async fn delete_inner(&self, task_attempt: &TaskAttempt) -> Result<(), ContainerError> {
        // cleanup the container, here that means deleting the worktree
        let task = task_attempt
            .parent_task(&self.db.pool)
            .await?
            .ok_or(sqlx::Error::RowNotFound)?;
        let git_repo_path = match Project::find_by_id(&self.db.pool, task.project_id).await {
            Ok(Some(project)) => Some(project.git_repo_path.clone()),
            Ok(None) => None,
            Err(e) => {
                tracing::error!("Failed to fetch project {}: {}", task.project_id, e);
                None
            }
        };
        WorktreeManager::cleanup_worktree(&WorktreeCleanup::new(
            PathBuf::from(task_attempt.container_ref.clone().unwrap_or_default()),
            git_repo_path,
        ))
        .await
        .unwrap_or_else(|e| {
            tracing::warn!(
                "Failed to clean up worktree for task attempt {}: {}",
                task_attempt.id,
                e
            );
        });
        Ok(())
    }
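    /// Ensure the attempt's worktree still exists on disk, recreating it from the
    /// recorded branch if it has gone missing, and return the container ref.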
    async fn ensure_container_exists(
        &self,
        task_attempt: &TaskAttempt,
    ) -> Result<ContainerRef, ContainerError> {
        // Get required context
        let task = task_attempt
            .parent_task(&self.db.pool)
            .await?
            .ok_or(sqlx::Error::RowNotFound)?;
        let project = task
            .parent_project(&self.db.pool)
            .await?
            .ok_or(sqlx::Error::RowNotFound)?;
        let container_ref = task_attempt.container_ref.as_ref().ok_or_else(|| {
            ContainerError::Other(anyhow!("Container ref not found for task attempt"))
        })?;
        let worktree_path = PathBuf::from(container_ref);

        WorktreeManager::ensure_worktree_exists(
            &project.git_repo_path,
            &task_attempt.branch,
            &worktree_path,
        )
        .await?;

        Ok(container_ref.to_string())
    }

    async fn is_container_clean(&self, task_attempt: &TaskAttempt) -> Result<bool, ContainerError> {
        if let Some(container_ref) = &task_attempt.container_ref {
            // If container_ref is set, check if the worktree exists
            let path = PathBuf::from(container_ref);
            if path.exists() {
                self.git().is_worktree_clean(&path).map_err(|e| e.into())
            } else {
                return Ok(true); // No worktree means it's clean
            }
        } else {
            return Ok(true); // No container_ref means no worktree, so it's clean
        }
    }
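    /// Spawn the executor action inside the attempt's worktree: wire up approvals,
    /// export the VK_* environment variables, and register the child process,
    /// interrupt sender, and exit monitor.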
    async fn start_execution_inner(
        &self,
        task_attempt: &TaskAttempt,
        execution_process: &ExecutionProcess,
        executor_action: &ExecutorAction,
    ) -> Result<(), ContainerError> {
        // Get the worktree path
        let container_ref = task_attempt
            .container_ref
            .as_ref()
            .ok_or(ContainerError::Other(anyhow!(
                "Container ref not found for task attempt"
            )))?;
        let current_dir = PathBuf::from(container_ref);

        let approvals_service: Arc<dyn ExecutorApprovalService> =
            match executor_action.base_executor() {
                Some(BaseCodingAgent::Codex) | Some(BaseCodingAgent::ClaudeCode) => {
                    ExecutorApprovalBridge::new(
                        self.approvals.clone(),
                        self.db.clone(),
                        self.notification_service.clone(),
                        execution_process.id,
                    )
                }
                _ => Arc::new(NoopExecutorApprovalService {}),
            };

        // Build ExecutionEnv with VK_* variables
        let mut env = ExecutionEnv::new();

        // Load task and project context for environment variables
        let task = task_attempt
            .parent_task(&self.db.pool)
            .await?
            .ok_or(ContainerError::Other(anyhow!(
                "Task not found for task attempt"
            )))?;
        let project = task
            .parent_project(&self.db.pool)
            .await?
            .ok_or(ContainerError::Other(anyhow!("Project not found for task")))?;

        env.insert("VK_PROJECT_NAME", &project.name);
        env.insert("VK_PROJECT_ID", project.id.to_string());
        env.insert("VK_TASK_ID", task.id.to_string());
        env.insert("VK_ATTEMPT_ID", task_attempt.id.to_string());
        env.insert("VK_ATTEMPT_BRANCH", &task_attempt.branch);

        // Create the child and stream, add to execution tracker with timeout
        let mut spawned = tokio::time::timeout(
            Duration::from_secs(30),
            executor_action.spawn(&current_dir, approvals_service, &env),
        )
        .await
        .map_err(|_| {
            ContainerError::Other(anyhow!(
                "Timeout: process took more than 30 seconds to start"
            ))
        })??;

        self.track_child_msgs_in_store(execution_process.id, &mut spawned.child)
            .await;
        self.add_child_to_store(execution_process.id, spawned.child)
            .await;

        // Store interrupt sender for graceful shutdown
        if let Some(interrupt_sender) = spawned.interrupt_sender {
            self.add_interrupt_sender(execution_process.id, interrupt_sender)
                .await;
        }

        // Spawn unified exit monitor: watches OS exit and optional executor signal
        let _hn = self.spawn_exit_monitor(&execution_process.id, spawned.exit_signal);
        Ok(())
    }

    async fn stop_execution(
        &self,
        execution_process: &ExecutionProcess,
        status: ExecutionProcessStatus,
    ) -> Result<(), ContainerError> {
        let child = self
            .get_child_from_store(&execution_process.id)
            .await
            .ok_or_else(|| {
                ContainerError::Other(anyhow!("Child process not found for execution"))
            })?;

        let exit_code = if status == ExecutionProcessStatus::Completed {
            Some(0)
        } else {
            None
        };
        ExecutionProcess::update_completion(&self.db.pool, execution_process.id, status, exit_code)
            .await?;

        // Try graceful interrupt first, then force kill
        if let Some(interrupt_sender) = self.take_interrupt_sender(&execution_process.id).await {
            // Send interrupt signal (ignore error if receiver dropped)
            let _ = interrupt_sender.send(());

            // Wait for graceful exit with timeout
            let graceful_exit = {
                let mut child_guard = child.write().await;
                tokio::time::timeout(Duration::from_secs(5), child_guard.wait()).await
            };

            match graceful_exit {
                Ok(Ok(_)) => {
                    tracing::debug!(
                        "Process {} exited gracefully after interrupt",
                        execution_process.id
                    );
                }
                Ok(Err(e)) => {
                    tracing::info!("Error waiting for process {}: {}", execution_process.id, e);
                }
                Err(_) => {
                    tracing::debug!(
                        "Graceful shutdown timed out for process {}, force killing",
                        execution_process.id
                    );
                }
            }
        }

        // Kill the child process and remove from the store
        {
            let mut child_guard = child.write().await;
            if let Err(e) = command::kill_process_group(&mut child_guard).await {
                tracing::error!(
                    "Failed to stop execution process {}: {}",
                    execution_process.id,
                    e
                );
                return Err(e);
            }
        }
        self.remove_child_from_store(&execution_process.id).await;

        // Mark the process finished in the MsgStore
        if let Some(msg) = self.msg_stores.write().await.remove(&execution_process.id) {
            msg.push_finished();
        }

        // Update task status to InReview when execution is stopped
        if let Ok(ctx) = ExecutionProcess::load_context(&self.db.pool, execution_process.id).await
            && !matches!(
                ctx.execution_process.run_reason,
                ExecutionProcessRunReason::DevServer
            )
        {
            match Task::update_status(&self.db.pool, ctx.task.id, TaskStatus::InReview).await {
                Ok(_) => {
                    if let Some(publisher) = self.share_publisher()
                        && let Err(err) = publisher.update_shared_task_by_id(ctx.task.id).await
                    {
                        tracing::warn!(
                            ?err,
                            "Failed to propagate shared task update for {}",
                            ctx.task.id
                        );
                    }
                }
                Err(e) => {
                    tracing::error!("Failed to update task status to InReview: {e}");
                }
            }
        }
        tracing::debug!(
            "Execution process {} stopped successfully",
            execution_process.id
        );

        // Record after-head commit OID (best-effort)
        if let Ok(ctx) = ExecutionProcess::load_context(&self.db.pool, execution_process.id).await
        {
            let worktree = self.task_attempt_to_current_dir(&ctx.task_attempt);
            if let Ok(head) = self.git().get_head_info(&worktree) {
                let _ = ExecutionProcess::update_after_head_commit(
                    &self.db.pool,
                    execution_process.id,
                    &head.oid,
                )
                .await;
            }
        }

        Ok(())
    }

    async fn stream_diff(
        &self,
        task_attempt: &TaskAttempt,
        stats_only: bool,
    ) -> Result<futures::stream::BoxStream<'static, Result<LogMsg, io::Error>>, ContainerError>
    {
        let project_repo_path = self.get_project_repo_path(task_attempt).await?;
        let latest_merge =
            Merge::find_latest_by_task_attempt_id(&self.db.pool, task_attempt.id).await?;

        let is_ahead = if let Ok((ahead, _)) = self.git().get_branch_status(
            &project_repo_path,
            &task_attempt.branch,
            &task_attempt.target_branch,
        ) {
            ahead > 0
        } else {
            false
        };

        if let Some(merge) = &latest_merge
            && let Some(commit) = merge.merge_commit()
            && self.is_container_clean(task_attempt).await?
            && !is_ahead
        {
            let wrapper =
                self.create_merged_diff_stream(&project_repo_path, &commit, stats_only)?;
            return Ok(Box::pin(wrapper));
        }

        let container_ref = self.ensure_container_exists(task_attempt).await?;
        let worktree_path = PathBuf::from(container_ref);
        let base_commit = self.git().get_base_commit(
            &project_repo_path,
            &task_attempt.branch,
            &task_attempt.target_branch,
        )?;

        let wrapper = self
            .create_live_diff_stream(&worktree_path, &base_commit, stats_only)
            .await?;
        Ok(Box::pin(wrapper))
    }

    async fn try_commit_changes(&self, ctx: &ExecutionContext) -> Result<bool, ContainerError> {
        if !matches!(
            ctx.execution_process.run_reason,
            ExecutionProcessRunReason::CodingAgent | ExecutionProcessRunReason::CleanupScript,
        ) {
            return Ok(false);
        }

        let message = match ctx.execution_process.run_reason {
            ExecutionProcessRunReason::CodingAgent => {
                // Try to retrieve the task summary from the executor session
                // otherwise fallback to default message
                match ExecutorSession::find_by_execution_process_id(
                    &self.db().pool,
                    ctx.execution_process.id,
                )
                .await
                {
                    Ok(Some(session)) if session.summary.is_some() => session.summary.unwrap(),
                    Ok(_) => {
                        tracing::debug!(
                            "No summary found for execution process {}, using default message",
                            ctx.execution_process.id
                        );
                        format!(
                            "Commit changes from coding agent for task attempt {}",
                            ctx.task_attempt.id
                        )
                    }
                    Err(e) => {
                        tracing::debug!(
                            "Failed to retrieve summary for execution process {}: {}",
                            ctx.execution_process.id,
                            e
                        );
                        format!(
                            "Commit changes from coding agent for task attempt {}",
                            ctx.task_attempt.id
                        )
                    }
                }
            }
            ExecutionProcessRunReason::CleanupScript => {
                format!(
                    "Cleanup script changes for task attempt {}",
                    ctx.task_attempt.id
                )
            }
            _ => Err(ContainerError::Other(anyhow::anyhow!(
                "Invalid run reason for commit"
            )))?,
        };

        let container_ref = ctx.task_attempt.container_ref.as_ref().ok_or_else(|| {
            ContainerError::Other(anyhow::anyhow!("Container reference not found"))
        })?;
        tracing::debug!(
            "Committing changes for task attempt {} at path {:?}: '{}'",
            ctx.task_attempt.id,
            &container_ref,
            message
        );
        let changes_committed = self.git().commit(Path::new(container_ref), &message)?;
        Ok(changes_committed)
    }
    /// Copy files from the original project directory to the worktree
    async fn copy_project_files(
        &self,
        source_dir: &Path,
        target_dir: &Path,
        copy_files: &str,
    ) -> Result<(), ContainerError> {
        let source_dir = source_dir.to_path_buf();
        let target_dir = target_dir.to_path_buf();
        let copy_files = copy_files.to_string();
        tokio::time::timeout(
            std::time::Duration::from_secs(30),
            tokio::task::spawn_blocking(move || {
                copy::copy_project_files_impl(&source_dir, &target_dir, &copy_files)
            }),
        )
        .await
        .map_err(|_| ContainerError::Other(anyhow!("Copy project files timed out after 30s")))?
        .map_err(|e| ContainerError::Other(anyhow!("Copy files task failed: {e}")))?
    }

    async fn kill_all_running_processes(&self) -> Result<(), ContainerError> {
        tracing::info!("Killing all running processes");
        let running_processes = ExecutionProcess::find_running(&self.db.pool).await?;
        for process in running_processes {
            if let Err(error) = self
                .stop_execution(&process, ExecutionProcessStatus::Killed)
                .await
            {
                tracing::error!(
                    "Failed to cleanly kill running execution process {:?}: {:?}",
                    process,
                    error
                );
            }
        }
        Ok(())
    }
}

fn success_exit_status() -> std::process::ExitStatus {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        ExitStatusExt::from_raw(0)
    }
    #[cfg(windows)]
    {
        use std::os::windows::process::ExitStatusExt;
        ExitStatusExt::from_raw(0)
    }
}
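
// A minimal sanity-check sketch for the two pure helpers in this module. It assumes
// the usual `cfg(test)` build and relies only on names already imported above; the
// expected values follow from how `from_raw` encodes exit statuses on each platform
// and from the `format!` used in `dir_name_from_task_attempt`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn synthetic_exit_statuses_map_to_expected_codes() {
        // success_exit_status() should read as a successful exit with code 0.
        assert!(success_exit_status().success());
        assert_eq!(success_exit_status().code(), Some(0));
        // failure_exit_status() should read as a failed exit with code 1.
        assert!(!failure_exit_status().success());
        assert_eq!(failure_exit_status().code(), Some(1));
    }

    #[test]
    fn worktree_dir_name_starts_with_short_attempt_id() {
        // A fixed placeholder id; the exact value does not matter for the format check.
        let attempt_id = Uuid::nil();
        let name = LocalContainerService::dir_name_from_task_attempt(&attempt_id, "Fix login bug");
        // The directory name is "<short-uuid>-<branch-id-of-title>" by construction.
        assert!(name.starts_with(&short_uuid(&attempt_id)));
        assert!(name.contains('-'));
    }
}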