diff --git a/crates/deployment/src/lib.rs b/crates/deployment/src/lib.rs
index d5bbe00c..bf305862 100644
--- a/crates/deployment/src/lib.rs
+++ b/crates/deployment/src/lib.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, sync::Arc};
+use std::sync::Arc;
 
 use anyhow::Error as AnyhowError;
 use async_trait::async_trait;
@@ -6,10 +6,8 @@ use axum::response::sse::Event;
 use db::{
     DBService,
     models::{
-        execution_process::{ExecutionProcess, ExecutionProcessRunReason, ExecutionProcessStatus},
         project::{CreateProject, Project},
-        task::{Task, TaskStatus},
-        task_attempt::{TaskAttempt, TaskAttemptError},
+        task_attempt::TaskAttemptError,
     },
 };
 use executors::executors::ExecutorError;
@@ -36,7 +34,7 @@ use services::services::{
 use sqlx::{Error as SqlxError, types::Uuid};
 use thiserror::Error;
 use tokio::sync::{Mutex, RwLock};
-use utils::{msg_store::MsgStore, sentry as sentry_utils};
+use utils::sentry as sentry_utils;
 
 #[derive(Debug, Clone, Copy, Error)]
 #[error("Remote client not configured")]
@@ -82,8 +80,6 @@ pub trait Deployment: Clone + Send + Sync + 'static {
 
     fn user_id(&self) -> &str;
 
-    fn shared_types() -> Vec<String>;
-
     fn config(&self) -> &Arc<RwLock<Config>>;
 
     fn db(&self) -> &DBService;
@@ -98,8 +94,6 @@ pub trait Deployment: Clone + Send + Sync + 'static {
 
     fn filesystem(&self) -> &FilesystemService;
 
-    fn msg_stores(&self) -> &Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>;
-
     fn events(&self) -> &EventService;
 
     fn file_search_cache(&self) -> &Arc<FileSearchCache>;
@@ -163,130 +157,6 @@ pub trait Deployment: Clone + Send + Sync + 'static {
         }
     }
 
-    /// Cleanup executions marked as running in the db, call at startup
-    async fn cleanup_orphan_executions(&self) -> Result<(), DeploymentError> {
-        let running_processes = ExecutionProcess::find_running(&self.db().pool).await?;
-        for process in running_processes {
-            tracing::info!(
-                "Found orphaned execution process {} for task attempt {}",
-                process.id,
-                process.task_attempt_id
-            );
-            // Update the execution process status first
-            if let Err(e) = ExecutionProcess::update_completion(
-                &self.db().pool,
-                process.id,
-                ExecutionProcessStatus::Failed,
-                None, // No exit code for orphaned processes
-            )
-            .await
-            {
-                tracing::error!(
-                    "Failed to update orphaned execution process {} status: {}",
-                    process.id,
-                    e
-                );
-                continue;
-            }
-            // Capture after-head commit OID (best-effort)
-            if let Ok(Some(task_attempt)) =
-                TaskAttempt::find_by_id(&self.db().pool, process.task_attempt_id).await
-                && let Some(container_ref) = task_attempt.container_ref
-            {
-                let wt = std::path::PathBuf::from(container_ref);
-                if let Ok(head) = self.git().get_head_info(&wt) {
-                    let _ = ExecutionProcess::update_after_head_commit(
-                        &self.db().pool,
-                        process.id,
-                        &head.oid,
-                    )
-                    .await;
-                }
-            }
-            // Process marked as failed
-            tracing::info!("Marked orphaned execution process {} as failed", process.id);
-            // Update task status to InReview for coding agent and setup script failures
-            if matches!(
-                process.run_reason,
-                ExecutionProcessRunReason::CodingAgent
-                    | ExecutionProcessRunReason::SetupScript
-                    | ExecutionProcessRunReason::CleanupScript
-            ) && let Ok(Some(task_attempt)) =
-                TaskAttempt::find_by_id(&self.db().pool, process.task_attempt_id).await
-                && let Ok(Some(task)) = task_attempt.parent_task(&self.db().pool).await
-            {
-                match Task::update_status(&self.db().pool, task.id, TaskStatus::InReview).await {
-                    Ok(_) => {
-                        if let Ok(publisher) = self.share_publisher()
-                            && let Err(err) = publisher.update_shared_task_by_id(task.id).await
-                        {
-                            tracing::warn!(
-                                ?err,
-                                "Failed to propagate shared task update for {}",
-                                task.id
-                            );
-                        }
-                    }
-                    Err(e) => {
-                        tracing::error!(
-                            "Failed to update task status to InReview for orphaned attempt: {}",
-                            e
-                        );
-                    }
-                }
-            }
-        }
-        Ok(())
-    }
-
-    /// Backfill before_head_commit for legacy execution processes.
-    /// Rules:
-    /// - If a process has after_head_commit and missing before_head_commit,
-    ///   then set before_head_commit to the previous process's after_head_commit.
-    /// - If there is no previous process, set before_head_commit to the base branch commit.
-    async fn backfill_before_head_commits(&self) -> Result<(), DeploymentError> {
-        let pool = &self.db().pool;
-        let rows = ExecutionProcess::list_missing_before_context(pool).await?;
-        for row in rows {
-            // Skip if no after commit at all (shouldn't happen due to WHERE)
-            // Prefer previous process after-commit if present
-            let mut before = row.prev_after_head_commit.clone();
-
-            // Fallback to base branch commit OID
-            if before.is_none() {
-                let repo_path =
-                    std::path::Path::new(row.git_repo_path.as_deref().unwrap_or_default());
-                match self
-                    .git()
-                    .get_branch_oid(repo_path, row.target_branch.as_str())
-                {
-                    Ok(oid) => before = Some(oid),
-                    Err(e) => {
-                        tracing::warn!(
-                            "Backfill: Failed to resolve base branch OID for attempt {} (branch {}): {}",
-                            row.task_attempt_id,
-                            row.target_branch,
-                            e
-                        );
-                    }
-                }
-            }
-
-            if let Some(before_oid) = before
-                && let Err(e) =
-                    ExecutionProcess::update_before_head_commit(pool, row.id, &before_oid).await
-            {
-                tracing::warn!(
-                    "Backfill: Failed to update before_head_commit for process {}: {}",
-                    row.id,
-                    e
-                );
-            }
-        }
-
-        Ok(())
-    }
-
     /// Trigger background auto-setup of default projects for new users
     async fn trigger_auto_project_setup(&self) {
         // soft timeout to give the filesystem search a chance to complete
diff --git a/crates/local-deployment/src/container.rs b/crates/local-deployment/src/container.rs
index c65415a2..d51011b0 100644
--- a/crates/local-deployment/src/container.rs
+++ b/crates/local-deployment/src/container.rs
@@ -47,16 +47,15 @@ use services::services::{
     diff_stream::{self, DiffStreamHandle},
     git::{Commit, DiffTarget, GitService},
     image::ImageService,
-    notification::NotificationService,
     share::SharePublisher,
-    worktree_manager::WorktreeManager,
+    worktree_manager::{WorktreeCleanup, WorktreeManager},
 };
 use tokio::{sync::RwLock, task::JoinHandle};
 use tokio_util::io::ReaderStream;
 use utils::{
     log_msg::LogMsg,
     msg_store::MsgStore,
-    text::{git_branch_id, short_uuid},
+    text::{git_branch_id, short_uuid, truncate_to_char_boundary},
 };
 use uuid::Uuid;
 
@@ -77,7 +76,7 @@ pub struct LocalContainerService {
 
 impl LocalContainerService {
     #[allow(clippy::too_many_arguments)]
-    pub fn new(
+    pub async fn new(
         db: DBService,
         msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
         config: Arc<RwLock<Config>>,
@@ -89,7 +88,7 @@ impl LocalContainerService {
     ) -> Self {
         let child_store = Arc::new(RwLock::new(HashMap::new()));
 
-        LocalContainerService {
+        let container = LocalContainerService {
             db,
             child_store,
             msg_stores,
@@ -99,7 +98,11 @@ impl LocalContainerService {
             analytics,
             approvals,
             publisher,
-        }
+        };
+
+        container.spawn_worktree_cleanup().await;
+
+        container
     }
 
     pub async fn get_child_from_store(&self, id: &Uuid) -> Option<Arc<RwLock<AsyncGroupChild>>> {
@@ -117,48 +120,6 @@ impl LocalContainerService {
         map.remove(id);
     }
 
-    /// A context is finalized when
-    /// - The next action is None (no follow-up actions)
-    /// - The run reason is not DevServer
-    fn should_finalize(ctx: &ExecutionContext) -> bool {
-        ctx.execution_process
-            .executor_action()
-            .unwrap()
-            .next_action
-            .is_none()
-            && (!matches!(
-                ctx.execution_process.run_reason,
-                ExecutionProcessRunReason::DevServer
-            ))
-    }
-
-    /// Finalize task execution by updating status to InReview and sending notifications
-    async fn finalize_task(
-        db: &DBService,
-        config: &Arc<RwLock<Config>>,
-        share: &Result<SharePublisher, RemoteClientNotConfigured>,
-        ctx: &ExecutionContext,
-    ) {
-        match Task::update_status(&db.pool, ctx.task.id, TaskStatus::InReview).await {
-            Ok(_) => {
-                if let Ok(publisher) = share
-                    && let Err(err) = publisher.update_shared_task_by_id(ctx.task.id).await
-                {
-                    tracing::warn!(
-                        ?err,
-                        "Failed to propagate shared task update for {}",
-                        ctx.task.id
-                    );
-                }
-            }
-            Err(e) => {
-                tracing::error!("Failed to update task status to InReview: {e}");
-            }
-        }
-        let notify_cfg = config.read().await.notifications.clone();
-        NotificationService::notify_execution_halted(notify_cfg, ctx).await;
-    }
-
     /// Defensively check for externally deleted worktrees and mark them as deleted in the database
     async fn check_externally_deleted_worktrees(db: &DBService) -> Result<(), DeploymentError> {
         let active_attempts = TaskAttempt::find_by_worktree_deleted(&db.pool).await?;
@@ -236,7 +197,9 @@ impl LocalContainerService {
         {
             // This is an orphaned worktree - delete it
             tracing::info!("Found orphaned worktree: {}", worktree_path_str);
-            if let Err(e) = WorktreeManager::cleanup_worktree(&path, None).await {
+            if let Err(e) =
+                WorktreeManager::cleanup_worktree(&WorktreeCleanup::new(path, None)).await
+            {
                 tracing::error!(
                     "Failed to remove orphaned worktree {}: {}",
                     worktree_path_str,
@@ -258,7 +221,11 @@ impl LocalContainerService {
         worktree_path: PathBuf,
         git_repo_path: PathBuf,
     ) -> Result<(), DeploymentError> {
-        WorktreeManager::cleanup_worktree(&worktree_path, Some(&git_repo_path)).await?;
+        WorktreeManager::cleanup_worktree(&WorktreeCleanup::new(
+            worktree_path,
+            Some(git_repo_path),
+        ))
+        .await?;
         // Mark worktree as deleted in database after successful cleanup
         TaskAttempt::mark_worktree_deleted(&db.pool, attempt_id).await?;
         tracing::info!("Successfully marked worktree as deleted for attempt {attempt_id}",);
@@ -429,12 +396,16 @@ impl LocalContainerService {
                 );
 
                 // Manually finalize task since we're bypassing normal execution flow
-                Self::finalize_task(&db, &config, &publisher, &ctx).await;
+                container
+                    .finalize_task(&config, publisher.as_ref().ok(), &ctx)
+                    .await;
             }
         }
 
-        if Self::should_finalize(&ctx) {
-            Self::finalize_task(&db, &config, &publisher, &ctx).await;
+        if container.should_finalize(&ctx) {
+            container
+                .finalize_task(&config, publisher.as_ref().ok(), &ctx)
+                .await;
             // After finalization, check if a queued follow-up exists and start it
             if let Err(e) = container.try_consume_queued_followup(&ctx).await {
                 tracing::error!(
@@ -562,23 +533,6 @@ impl LocalContainerService {
         map.insert(id, store);
     }
 
-    /// Get the worktree path for a task attempt
-    #[allow(dead_code)]
-    async fn get_worktree_path(
-        &self,
-        task_attempt: &TaskAttempt,
-    ) -> Result<PathBuf, ContainerError> {
-        let container_ref = self.ensure_container_exists(task_attempt).await?;
-        let worktree_dir = PathBuf::from(&container_ref);
-
-        if !worktree_dir.exists() {
-            return Err(ContainerError::Other(anyhow!(
-                "Worktree directory not found"
-            )));
-        }
-
-        Ok(worktree_dir)
-    }
     /// Get the project repository path for a task attempt
     async fn get_project_repo_path(
         &self,
@@ -951,10 +905,10 @@ impl ContainerService for LocalContainerService {
                 None
             }
         };
-        WorktreeManager::cleanup_worktree(
-            &PathBuf::from(task_attempt.container_ref.clone().unwrap_or_default()),
-            git_repo_path.as_deref(),
-        )
+        WorktreeManager::cleanup_worktree(&WorktreeCleanup::new(
+            PathBuf::from(task_attempt.container_ref.clone().unwrap_or_default()),
+            git_repo_path,
+        ))
         .await
         .unwrap_or_else(|e| {
             tracing::warn!(
@@ -1322,23 +1276,6 @@ impl ContainerService for LocalContainerService {
     }
 }
 
-fn truncate_to_char_boundary(content: &str, max_len: usize) -> &str {
-    if content.len() <= max_len {
-        return content;
-    }
-
-    let cutoff = content
-        .char_indices()
-        .map(|(idx, _)| idx)
-        .chain(std::iter::once(content.len()))
-        .take_while(|&idx| idx <= max_len)
-        .last()
-        .unwrap_or(0);
-
-    debug_assert!(content.is_char_boundary(cutoff));
-    &content[..cutoff]
-}
-
 fn success_exit_status() -> std::process::ExitStatus {
     #[cfg(unix)]
     {
@@ -1351,22 +1288,3 @@ fn success_exit_status() -> std::process::ExitStatus {
         ExitStatusExt::from_raw(0)
     }
 }
-
-#[cfg(test)]
-mod tests {
-
-    #[test]
-    fn test_truncate_to_char_boundary() {
-        use super::truncate_to_char_boundary;
-
-        let input = "a".repeat(10);
-        assert_eq!(truncate_to_char_boundary(&input, 7), "a".repeat(7));
-
-        let input = "hello world";
-        assert_eq!(truncate_to_char_boundary(input, input.len()), input);
-
-        let input = "🔥🔥🔥"; // each fire emoji is 4 bytes
-        assert_eq!(truncate_to_char_boundary(input, 5), "🔥");
-        assert_eq!(truncate_to_char_boundary(input, 3), "");
-    }
-}
diff --git a/crates/local-deployment/src/lib.rs b/crates/local-deployment/src/lib.rs
index 65fe10b8..8c3bc44f 100644
--- a/crates/local-deployment/src/lib.rs
+++ b/crates/local-deployment/src/lib.rs
@@ -38,7 +38,6 @@ pub struct LocalDeployment {
     user_id: String,
     db: DBService,
     analytics: Option<AnalyticsService>,
-    msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
     container: LocalContainerService,
     git: GitService,
     image: ImageService,
@@ -183,8 +182,8 @@ impl Deployment for LocalDeployment {
             analytics_ctx,
             approvals.clone(),
             share_publisher.clone(),
-        );
-        container.spawn_worktree_cleanup().await;
+        )
+        .await;
 
         let events = EventService::new(db.clone(), events_msg_store, events_entry_count);
 
@@ -196,7 +195,6 @@ impl Deployment for LocalDeployment {
             user_id,
             db,
             analytics,
-            msg_stores,
             container,
             git,
             image,
@@ -224,10 +222,6 @@ impl Deployment for LocalDeployment {
         &self.user_id
     }
 
-    fn shared_types() -> Vec<String> {
-        vec![]
-    }
-
     fn config(&self) -> &Arc<RwLock<Config>> {
         &self.config
     }
@@ -256,10 +250,6 @@ impl Deployment for LocalDeployment {
         &self.filesystem
     }
 
-    fn msg_stores(&self) -> &Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>> {
-        &self.msg_stores
-    }
-
     fn events(&self) -> &EventService {
         &self.events
     }
diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs
index 0e9db6e0..c608a3a9 100644
--- a/crates/server/src/main.rs
+++ b/crates/server/src/main.rs
@@ -47,8 +47,16 @@ async fn main() -> Result<(), VibeKanbanError> {
     let deployment = DeploymentImpl::new().await?;
 
     deployment.update_sentry_scope().await?;
-    deployment.cleanup_orphan_executions().await?;
-    deployment.backfill_before_head_commits().await?;
+    deployment
+        .container()
+        .cleanup_orphan_executions()
+        .await
+        .map_err(DeploymentError::from)?;
+    deployment
+        .container()
+        .backfill_before_head_commits()
+        .await
+        .map_err(DeploymentError::from)?;
     deployment.spawn_pr_monitor_service().await;
     deployment
         .track_if_analytics_allowed("session_start", serde_json::json!({}))
diff --git a/crates/server/src/routes/tasks.rs b/crates/server/src/routes/tasks.rs
index 655b20da..96322181 100644
--- a/crates/server/src/routes/tasks.rs
+++ b/crates/server/src/routes/tasks.rs
@@ -22,8 +22,9 @@ use executors::profile::ExecutorProfileId;
 use futures_util::{SinkExt, StreamExt, TryStreamExt};
 use serde::{Deserialize, Serialize};
 use services::services::{
-    container::{ContainerService, WorktreeCleanupData, cleanup_worktrees_direct},
+    container::ContainerService,
     share::ShareError,
+    worktree_manager::{WorktreeCleanup, WorktreeManager},
 };
 use sqlx::Error as SqlxError;
 use ts_rs::TS;
@@ -285,14 +286,13 @@ pub async fn delete_task(
         .await?
         .ok_or_else(|| ApiError::Database(SqlxError::RowNotFound))?;
 
-    let cleanup_data: Vec<WorktreeCleanupData> = attempts
+    let cleanup_args: Vec<WorktreeCleanup> = attempts
         .iter()
         .filter_map(|attempt| {
             attempt
                 .container_ref
                 .as_ref()
-                .map(|worktree_path| WorktreeCleanupData {
-                    attempt_id: attempt.id,
+                .map(|worktree_path| WorktreeCleanup {
                     worktree_path: PathBuf::from(worktree_path),
                     git_repo_path: Some(project.git_repo_path.clone()),
                 })
@@ -355,10 +355,10 @@ pub async fn delete_task(
     tracing::info!(
         "Starting background cleanup for task {} ({} worktrees)",
         task_id,
-        cleanup_data.len()
+        cleanup_args.len()
     );
 
-    if let Err(e) = cleanup_worktrees_direct(&cleanup_data).await {
+    if let Err(e) = WorktreeManager::batch_cleanup_worktrees(&cleanup_args).await {
         tracing::error!(
             "Background worktree cleanup failed for task {}: {}",
             task_id,
diff --git a/crates/services/src/services/container.rs b/crates/services/src/services/container.rs
index 2dcd9196..d3f217b6 100644
--- a/crates/services/src/services/container.rs
+++ b/crates/services/src/services/container.rs
@@ -22,13 +22,12 @@ use db::{
 use executors::{
     actions::{
         ExecutorAction, ExecutorActionType,
-        coding_agent_follow_up::CodingAgentFollowUpRequest,
         coding_agent_initial::CodingAgentInitialRequest,
         script::{ScriptContext, ScriptRequest, ScriptRequestLanguage},
     },
     executors::{ExecutorError, StandardCodingAgentExecutor},
     logs::{NormalizedEntry, NormalizedEntryError, NormalizedEntryType, utils::ConversationPatch},
-    profile::{ExecutorConfigs, ExecutorProfileId, to_default_variant},
+    profile::{ExecutorConfigs, ExecutorProfileId},
 };
 use futures::{StreamExt, future};
 use sqlx::Error as SqlxError;
@@ -42,47 +41,15 @@ use utils::{
 use uuid::Uuid;
 
 use crate::services::{
+    config::Config,
     git::{GitService, GitServiceError},
     image::ImageService,
+    notification::NotificationService,
     share::SharePublisher,
-    worktree_manager::{WorktreeError, WorktreeManager},
+    worktree_manager::WorktreeError,
 };
 
 pub type ContainerRef = String;
 
-/// Data needed for background worktree cleanup (doesn't require DB access)
-#[derive(Debug, Clone)]
-pub struct WorktreeCleanupData {
-    pub attempt_id: Uuid,
-    pub worktree_path: PathBuf,
-    pub git_repo_path: Option<PathBuf>,
-}
-
-/// Cleanup worktrees without requiring database access
-pub async fn cleanup_worktrees_direct(data: &[WorktreeCleanupData]) -> Result<(), ContainerError> {
-    for cleanup_data in data {
-        tracing::debug!(
-            "Cleaning up worktree for attempt {}: {:?}",
-            cleanup_data.attempt_id,
-            cleanup_data.worktree_path
-        );
-
-        if let Err(e) = WorktreeManager::cleanup_worktree(
-            &cleanup_data.worktree_path,
-            cleanup_data.git_repo_path.as_deref(),
-        )
-        .await
-        {
-            tracing::error!(
-                "Failed to cleanup worktree for task attempt {}: {}",
-                cleanup_data.attempt_id,
-                e
-            );
-            // Continue with other cleanups even if one fails
-        }
-    }
-    Ok(())
-}
-
 #[derive(Debug, Error)]
 pub enum ContainerError {
     #[error(transparent)]
@@ -154,6 +121,172 @@ pub trait ContainerService {
         Ok(())
     }
 
+    /// A context is finalized when
+    /// - The next action is None (no follow-up actions)
+    /// - The run reason is not DevServer
+    fn should_finalize(&self, ctx: &ExecutionContext) -> bool {
+        ctx.execution_process
+            .executor_action()
+            .unwrap()
+            .next_action
+            .is_none()
+            && (!matches!(
+                ctx.execution_process.run_reason,
+                ExecutionProcessRunReason::DevServer
+            ))
+    }
+
+    /// Finalize task execution by updating status to InReview and sending notifications
+    async fn finalize_task(
+        &self,
+        config: &Arc<RwLock<Config>>,
+        share_publisher: Option<&SharePublisher>,
+        ctx: &ExecutionContext,
+    ) {
+        match Task::update_status(&self.db().pool, ctx.task.id, TaskStatus::InReview).await {
+            Ok(_) => {
+                if let Some(publisher) = share_publisher
+                    && let Err(err) = publisher.update_shared_task_by_id(ctx.task.id).await
+                {
+                    tracing::warn!(
+                        ?err,
+                        "Failed to propagate shared task update for {}",
+                        ctx.task.id
+                    );
+                }
+            }
+            Err(e) => {
+                tracing::error!("Failed to update task status to InReview: {e}");
+            }
+        }
+        let notify_cfg = config.read().await.notifications.clone();
+        NotificationService::notify_execution_halted(notify_cfg, ctx).await;
+    }
+
+    /// Clean up executions still marked as running in the DB; call at startup.
+    async fn cleanup_orphan_executions(&self) -> Result<(), ContainerError> {
+        let running_processes = ExecutionProcess::find_running(&self.db().pool).await?;
+        for process in running_processes {
+            tracing::info!(
+                "Found orphaned execution process {} for task attempt {}",
+                process.id,
+                process.task_attempt_id
+            );
+            // Update the execution process status first
+            if let Err(e) = ExecutionProcess::update_completion(
+                &self.db().pool,
+                process.id,
+                ExecutionProcessStatus::Failed,
+                None, // No exit code for orphaned processes
+            )
+            .await
+            {
+                tracing::error!(
+                    "Failed to update orphaned execution process {} status: {}",
+                    process.id,
+                    e
+                );
+                continue;
+            }
+            // Capture after-head commit OID (best-effort)
+            if let Ok(Some(task_attempt)) =
+                TaskAttempt::find_by_id(&self.db().pool, process.task_attempt_id).await
+                && let Some(container_ref) = task_attempt.container_ref
+            {
+                let wt = std::path::PathBuf::from(container_ref);
+                if let Ok(head) = self.git().get_head_info(&wt) {
+                    let _ = ExecutionProcess::update_after_head_commit(
+                        &self.db().pool,
+                        process.id,
+                        &head.oid,
+                    )
+                    .await;
+                }
+            }
+            // Process marked as failed
+            tracing::info!("Marked orphaned execution process {} as failed", process.id);
+            // Update task status to InReview for coding agent and setup script failures
+            if matches!(
+                process.run_reason,
+                ExecutionProcessRunReason::CodingAgent
+                    | ExecutionProcessRunReason::SetupScript
+                    | ExecutionProcessRunReason::CleanupScript
+            ) && let Ok(Some(task_attempt)) =
+                TaskAttempt::find_by_id(&self.db().pool, process.task_attempt_id).await
+                && let Ok(Some(task)) = task_attempt.parent_task(&self.db().pool).await
+            {
+                match Task::update_status(&self.db().pool, task.id, TaskStatus::InReview).await {
+                    Ok(_) => {
+                        if let Some(publisher) = self.share_publisher()
+                            && let Err(err) = publisher.update_shared_task_by_id(task.id).await
+                        {
+                            tracing::warn!(
+                                ?err,
+                                "Failed to propagate shared task update for {}",
+                                task.id
+                            );
+                        }
+                    }
+                    Err(e) => {
+                        tracing::error!(
+                            "Failed to update task status to InReview for orphaned attempt: {}",
+                            e
+                        );
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+
+    /// Backfill before_head_commit for legacy execution processes.
+    /// Rules:
+    /// - If a process has after_head_commit and missing before_head_commit,
+    ///   then set before_head_commit to the previous process's after_head_commit.
+    /// - If there is no previous process, set before_head_commit to the base branch commit.
+    async fn backfill_before_head_commits(&self) -> Result<(), ContainerError> {
+        let pool = &self.db().pool;
+        let rows = ExecutionProcess::list_missing_before_context(pool).await?;
+        for row in rows {
+            // Skip if no after commit at all (shouldn't happen due to WHERE)
+            // Prefer previous process after-commit if present
+            let mut before = row.prev_after_head_commit.clone();
+
+            // Fallback to base branch commit OID
+            if before.is_none() {
+                let repo_path =
+                    std::path::Path::new(row.git_repo_path.as_deref().unwrap_or_default());
+                match self
+                    .git()
+                    .get_branch_oid(repo_path, row.target_branch.as_str())
+                {
+                    Ok(oid) => before = Some(oid),
+                    Err(e) => {
+                        tracing::warn!(
+                            "Backfill: Failed to resolve base branch OID for attempt {} (branch {}): {}",
+                            row.task_attempt_id,
+                            row.target_branch,
+                            e
+                        );
+                    }
+                }
+            }
+
+            if let Some(before_oid) = before
+                && let Err(e) =
+                    ExecutionProcess::update_before_head_commit(pool, row.id, &before_oid).await
+            {
+                tracing::warn!(
+                    "Backfill: Failed to update before_head_commit for process {}: {}",
+                    row.id,
+                    e
+                );
+            }
+        }
+
+        Ok(())
+    }
+
     fn cleanup_action(&self, cleanup_script: Option<String>) -> Option<Box<ExecutorAction>> {
         cleanup_script.map(|script| {
             Box::new(ExecutorAction::new(
@@ -772,63 +905,4 @@ pub trait ContainerService {
         tracing::debug!("Started next action: {:?}", next_action);
         Ok(())
     }
-
-    async fn exit_plan_mode_tool(&self, ctx: ExecutionContext) -> Result<(), ContainerError> {
-        let execution_id = ctx.execution_process.id;
-
-        if let Err(err) = self
-            .stop_execution(&ctx.execution_process, ExecutionProcessStatus::Completed)
-            .await
-        {
-            tracing::error!("Failed to stop execution process {}: {}", execution_id, err);
-            return Err(err);
-        }
-
-        let action = ctx.execution_process.executor_action()?;
-        let executor_profile_id = match action.typ() {
-            ExecutorActionType::CodingAgentInitialRequest(req) => req.executor_profile_id.clone(),
-            ExecutorActionType::CodingAgentFollowUpRequest(req) => req.executor_profile_id.clone(),
-            _ => {
-                return Err(ContainerError::Other(anyhow::anyhow!(
-                    "exit plan mode tool called on non-coding agent action"
-                )));
-            }
-        };
-        let cleanup_chain = action.next_action().cloned();
-
-        let session_id =
-            ExecutorSession::find_by_execution_process_id(&self.db().pool, execution_id)
-                .await?
-                .and_then(|s| s.session_id);
-
-        if session_id.is_none() {
-            tracing::warn!(
-                "No executor session found for execution process {}",
-                execution_id
-            );
-            return Err(ContainerError::Other(anyhow::anyhow!(
-                "No executor session found"
-            )));
-        }
-
-        let default_profile = to_default_variant(&executor_profile_id);
-        let follow_up = CodingAgentFollowUpRequest {
-            prompt: String::from("The plan has been approved, please execute it."),
-            session_id: session_id.unwrap(),
-            executor_profile_id: default_profile,
-        };
-        let action = ExecutorAction::new(
-            ExecutorActionType::CodingAgentFollowUpRequest(follow_up),
-            cleanup_chain.map(Box::new),
-        );
-
-        let _ = self
-            .start_execution(
-                &ctx.task_attempt,
-                &action,
-                &ExecutionProcessRunReason::CodingAgent,
-            )
-            .await?;
-        Ok(())
-    }
 }
diff --git a/crates/services/src/services/worktree_manager.rs b/crates/services/src/services/worktree_manager.rs
index e52fc7da..10bef1cc 100644
--- a/crates/services/src/services/worktree_manager.rs
+++ b/crates/services/src/services/worktree_manager.rs
@@ -17,6 +17,21 @@ lazy_static::lazy_static! {
         Arc::new(Mutex::new(HashMap::new()));
 }
 
+#[derive(Debug, Clone)]
+pub struct WorktreeCleanup {
+    pub worktree_path: PathBuf,
+    pub git_repo_path: Option<PathBuf>,
+}
+
+impl WorktreeCleanup {
+    pub fn new(worktree_path: PathBuf, git_repo_path: Option<PathBuf>) -> Self {
+        Self {
+            worktree_path,
+            git_repo_path,
+        }
+    }
+}
+
 #[derive(Debug, Error)]
 pub enum WorktreeError {
     #[error(transparent)]
@@ -377,13 +392,22 @@ impl WorktreeManager {
         Ok(())
     }
 
+    /// Clean up multiple worktrees
+    pub async fn batch_cleanup_worktrees(data: &[WorktreeCleanup]) -> Result<(), WorktreeError> {
+        for cleanup_data in data {
+            tracing::debug!("Cleaning up worktree: {:?}", cleanup_data.worktree_path);
+
+            if let Err(e) = Self::cleanup_worktree(cleanup_data).await {
+                tracing::error!("Failed to cleanup worktree: {}", e);
+            }
+        }
+        Ok(())
+    }
+
     /// Clean up a worktree path and its git metadata (non-blocking)
     /// If git_repo_path is None, attempts to infer it from the worktree itself
-    pub async fn cleanup_worktree(
-        worktree_path: &Path,
-        git_repo_path: Option<&Path>,
-    ) -> Result<(), WorktreeError> {
-        let path_str = worktree_path.to_string_lossy().to_string();
+    pub async fn cleanup_worktree(worktree: &WorktreeCleanup) -> Result<(), WorktreeError> {
+        let path_str = worktree.worktree_path.to_string_lossy().to_string();
 
         // Get the same lock to ensure we don't interfere with creation
         let lock = {
@@ -396,18 +420,18 @@ impl WorktreeManager {
 
         let _guard = lock.lock().await;
 
-        if let Some(worktree_name) = worktree_path.file_name().and_then(|n| n.to_str()) {
+        if let Some(worktree_name) = worktree.worktree_path.file_name().and_then(|n| n.to_str()) {
             // Try to determine the git repo path if not provided
-            let resolved_repo_path = if let Some(repo_path) = git_repo_path {
+            let resolved_repo_path = if let Some(repo_path) = &worktree.git_repo_path {
                 Some(repo_path.to_path_buf())
             } else {
-                Self::infer_git_repo_path(worktree_path).await
+                Self::infer_git_repo_path(&worktree.worktree_path).await
             };
 
             if let Some(repo_path) = resolved_repo_path {
                 Self::comprehensive_worktree_cleanup_async(
                     &repo_path,
-                    worktree_path,
+                    &worktree.worktree_path,
                     worktree_name,
                 )
                 .await?;
@@ -417,7 +441,7 @@ impl WorktreeManager {
                     "Cannot determine git repo path for worktree {}, performing simple cleanup",
                     path_str
                 );
-                Self::simple_worktree_cleanup(worktree_path).await?;
+                Self::simple_worktree_cleanup(&worktree.worktree_path).await?;
             }
         } else {
             return Err(WorktreeError::InvalidPath(
diff --git a/crates/utils/src/text.rs b/crates/utils/src/text.rs
index 9904d7b1..4fafe3ca 100644
--- a/crates/utils/src/text.rs
+++ b/crates/utils/src/text.rs
@@ -22,3 +22,39 @@ pub fn short_uuid(u: &Uuid) -> String {
     let full = u.simple().to_string();
     full.chars().take(4).collect() // grab the first 4 chars
 }
+
+pub fn truncate_to_char_boundary(content: &str, max_len: usize) -> &str {
+    if content.len() <= max_len {
+        return content;
+    }
+
+    let cutoff = content
+        .char_indices()
+        .map(|(idx, _)| idx)
+        .chain(std::iter::once(content.len()))
+        .take_while(|&idx| idx <= max_len)
+        .last()
+        .unwrap_or(0);
+
+    debug_assert!(content.is_char_boundary(cutoff));
+    &content[..cutoff]
+}
+
+#[cfg(test)]
+mod tests {
+
+    #[test]
+    fn test_truncate_to_char_boundary() {
+        use super::truncate_to_char_boundary;
+
+        let input = "a".repeat(10);
+        assert_eq!(truncate_to_char_boundary(&input, 7), "a".repeat(7));
+
+        let input = "hello world";
+        assert_eq!(truncate_to_char_boundary(input, input.len()), input);
+
+        let input = "🔥🔥🔥"; // each fire emoji is 4 bytes
+        assert_eq!(truncate_to_char_boundary(input, 5), "🔥");
+        assert_eq!(truncate_to_char_boundary(input, 3), "");
+    }
+}
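
A minimal sketch of the cleanup call shape after this change, mirroring the `delete_task` route above; the `worktree_paths` and `repo_path` parameters are hypothetical stand-ins for values loaded from the database:

```rust
use std::path::PathBuf;

use services::services::worktree_manager::{WorktreeCleanup, WorktreeError, WorktreeManager};

// Hypothetical helper: build one WorktreeCleanup per attempt that still has a
// checked-out worktree, then hand the whole batch to the manager. Per-item
// failures are logged inside batch_cleanup_worktrees and do not abort the
// remaining cleanups.
async fn cleanup_attempt_worktrees(
    worktree_paths: Vec<PathBuf>, // hypothetical: the attempts' container_refs
    repo_path: PathBuf,           // hypothetical: the project's git_repo_path
) -> Result<(), WorktreeError> {
    let cleanup_args: Vec<WorktreeCleanup> = worktree_paths
        .into_iter()
        .map(|path| WorktreeCleanup::new(path, Some(repo_path.clone())))
        .collect();

    WorktreeManager::batch_cleanup_worktrees(&cleanup_args).await
}
```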