refactor container and deployments. scope logic appropriately and remove unused code. (#1351)
This commit is contained in:
committed by
GitHub
parent
036dd802d8
commit
bd93f14090
@@ -1,4 +1,4 @@
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::Error as AnyhowError;
|
||||
use async_trait::async_trait;
|
||||
@@ -6,10 +6,8 @@ use axum::response::sse::Event;
|
||||
use db::{
|
||||
DBService,
|
||||
models::{
|
||||
execution_process::{ExecutionProcess, ExecutionProcessRunReason, ExecutionProcessStatus},
|
||||
project::{CreateProject, Project},
|
||||
task::{Task, TaskStatus},
|
||||
task_attempt::{TaskAttempt, TaskAttemptError},
|
||||
task_attempt::TaskAttemptError,
|
||||
},
|
||||
};
|
||||
use executors::executors::ExecutorError;
|
||||
@@ -36,7 +34,7 @@ use services::services::{
|
||||
use sqlx::{Error as SqlxError, types::Uuid};
|
||||
use thiserror::Error;
|
||||
use tokio::sync::{Mutex, RwLock};
|
||||
use utils::{msg_store::MsgStore, sentry as sentry_utils};
|
||||
use utils::sentry as sentry_utils;
|
||||
|
||||
#[derive(Debug, Clone, Copy, Error)]
|
||||
#[error("Remote client not configured")]
|
||||
@@ -82,8 +80,6 @@ pub trait Deployment: Clone + Send + Sync + 'static {
|
||||
|
||||
fn user_id(&self) -> &str;
|
||||
|
||||
fn shared_types() -> Vec<String>;
|
||||
|
||||
fn config(&self) -> &Arc<RwLock<Config>>;
|
||||
|
||||
fn db(&self) -> &DBService;
|
||||
@@ -98,8 +94,6 @@ pub trait Deployment: Clone + Send + Sync + 'static {
|
||||
|
||||
fn filesystem(&self) -> &FilesystemService;
|
||||
|
||||
fn msg_stores(&self) -> &Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>;
|
||||
|
||||
fn events(&self) -> &EventService;
|
||||
|
||||
fn file_search_cache(&self) -> &Arc<FileSearchCache>;
|
||||
@@ -163,130 +157,6 @@ pub trait Deployment: Clone + Send + Sync + 'static {
|
||||
}
|
||||
}
|
||||
|
||||
/// Cleanup executions marked as running in the db, call at startup
|
||||
async fn cleanup_orphan_executions(&self) -> Result<(), DeploymentError> {
|
||||
let running_processes = ExecutionProcess::find_running(&self.db().pool).await?;
|
||||
for process in running_processes {
|
||||
tracing::info!(
|
||||
"Found orphaned execution process {} for task attempt {}",
|
||||
process.id,
|
||||
process.task_attempt_id
|
||||
);
|
||||
// Update the execution process status first
|
||||
if let Err(e) = ExecutionProcess::update_completion(
|
||||
&self.db().pool,
|
||||
process.id,
|
||||
ExecutionProcessStatus::Failed,
|
||||
None, // No exit code for orphaned processes
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
"Failed to update orphaned execution process {} status: {}",
|
||||
process.id,
|
||||
e
|
||||
);
|
||||
continue;
|
||||
}
|
||||
// Capture after-head commit OID (best-effort)
|
||||
if let Ok(Some(task_attempt)) =
|
||||
TaskAttempt::find_by_id(&self.db().pool, process.task_attempt_id).await
|
||||
&& let Some(container_ref) = task_attempt.container_ref
|
||||
{
|
||||
let wt = std::path::PathBuf::from(container_ref);
|
||||
if let Ok(head) = self.git().get_head_info(&wt) {
|
||||
let _ = ExecutionProcess::update_after_head_commit(
|
||||
&self.db().pool,
|
||||
process.id,
|
||||
&head.oid,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
// Process marked as failed
|
||||
tracing::info!("Marked orphaned execution process {} as failed", process.id);
|
||||
// Update task status to InReview for coding agent and setup script failures
|
||||
if matches!(
|
||||
process.run_reason,
|
||||
ExecutionProcessRunReason::CodingAgent
|
||||
| ExecutionProcessRunReason::SetupScript
|
||||
| ExecutionProcessRunReason::CleanupScript
|
||||
) && let Ok(Some(task_attempt)) =
|
||||
TaskAttempt::find_by_id(&self.db().pool, process.task_attempt_id).await
|
||||
&& let Ok(Some(task)) = task_attempt.parent_task(&self.db().pool).await
|
||||
{
|
||||
match Task::update_status(&self.db().pool, task.id, TaskStatus::InReview).await {
|
||||
Ok(_) => {
|
||||
if let Ok(publisher) = self.share_publisher()
|
||||
&& let Err(err) = publisher.update_shared_task_by_id(task.id).await
|
||||
{
|
||||
tracing::warn!(
|
||||
?err,
|
||||
"Failed to propagate shared task update for {}",
|
||||
task.id
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"Failed to update task status to InReview for orphaned attempt: {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Backfill before_head_commit for legacy execution processes.
|
||||
/// Rules:
|
||||
/// - If a process has after_head_commit and missing before_head_commit,
|
||||
/// then set before_head_commit to the previous process's after_head_commit.
|
||||
/// - If there is no previous process, set before_head_commit to the base branch commit.
|
||||
async fn backfill_before_head_commits(&self) -> Result<(), DeploymentError> {
|
||||
let pool = &self.db().pool;
|
||||
let rows = ExecutionProcess::list_missing_before_context(pool).await?;
|
||||
for row in rows {
|
||||
// Skip if no after commit at all (shouldn't happen due to WHERE)
|
||||
// Prefer previous process after-commit if present
|
||||
let mut before = row.prev_after_head_commit.clone();
|
||||
|
||||
// Fallback to base branch commit OID
|
||||
if before.is_none() {
|
||||
let repo_path =
|
||||
std::path::Path::new(row.git_repo_path.as_deref().unwrap_or_default());
|
||||
match self
|
||||
.git()
|
||||
.get_branch_oid(repo_path, row.target_branch.as_str())
|
||||
{
|
||||
Ok(oid) => before = Some(oid),
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Backfill: Failed to resolve base branch OID for attempt {} (branch {}): {}",
|
||||
row.task_attempt_id,
|
||||
row.target_branch,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(before_oid) = before
|
||||
&& let Err(e) =
|
||||
ExecutionProcess::update_before_head_commit(pool, row.id, &before_oid).await
|
||||
{
|
||||
tracing::warn!(
|
||||
"Backfill: Failed to update before_head_commit for process {}: {}",
|
||||
row.id,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Trigger background auto-setup of default projects for new users
|
||||
async fn trigger_auto_project_setup(&self) {
|
||||
// soft timeout to give the filesystem search a chance to complete
|
||||
|
||||
@@ -47,16 +47,15 @@ use services::services::{
|
||||
diff_stream::{self, DiffStreamHandle},
|
||||
git::{Commit, DiffTarget, GitService},
|
||||
image::ImageService,
|
||||
notification::NotificationService,
|
||||
share::SharePublisher,
|
||||
worktree_manager::WorktreeManager,
|
||||
worktree_manager::{WorktreeCleanup, WorktreeManager},
|
||||
};
|
||||
use tokio::{sync::RwLock, task::JoinHandle};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use utils::{
|
||||
log_msg::LogMsg,
|
||||
msg_store::MsgStore,
|
||||
text::{git_branch_id, short_uuid},
|
||||
text::{git_branch_id, short_uuid, truncate_to_char_boundary},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -77,7 +76,7 @@ pub struct LocalContainerService {
|
||||
|
||||
impl LocalContainerService {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new(
|
||||
pub async fn new(
|
||||
db: DBService,
|
||||
msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
|
||||
config: Arc<RwLock<Config>>,
|
||||
@@ -89,7 +88,7 @@ impl LocalContainerService {
|
||||
) -> Self {
|
||||
let child_store = Arc::new(RwLock::new(HashMap::new()));
|
||||
|
||||
LocalContainerService {
|
||||
let container = LocalContainerService {
|
||||
db,
|
||||
child_store,
|
||||
msg_stores,
|
||||
@@ -99,7 +98,11 @@ impl LocalContainerService {
|
||||
analytics,
|
||||
approvals,
|
||||
publisher,
|
||||
}
|
||||
};
|
||||
|
||||
container.spawn_worktree_cleanup().await;
|
||||
|
||||
container
|
||||
}
|
||||
|
||||
pub async fn get_child_from_store(&self, id: &Uuid) -> Option<Arc<RwLock<AsyncGroupChild>>> {
|
||||
@@ -117,48 +120,6 @@ impl LocalContainerService {
|
||||
map.remove(id);
|
||||
}
|
||||
|
||||
/// A context is finalized when
|
||||
/// - The next action is None (no follow-up actions)
|
||||
/// - The run reason is not DevServer
|
||||
fn should_finalize(ctx: &ExecutionContext) -> bool {
|
||||
ctx.execution_process
|
||||
.executor_action()
|
||||
.unwrap()
|
||||
.next_action
|
||||
.is_none()
|
||||
&& (!matches!(
|
||||
ctx.execution_process.run_reason,
|
||||
ExecutionProcessRunReason::DevServer
|
||||
))
|
||||
}
|
||||
|
||||
/// Finalize task execution by updating status to InReview and sending notifications
|
||||
async fn finalize_task(
|
||||
db: &DBService,
|
||||
config: &Arc<RwLock<Config>>,
|
||||
share: &Result<SharePublisher, RemoteClientNotConfigured>,
|
||||
ctx: &ExecutionContext,
|
||||
) {
|
||||
match Task::update_status(&db.pool, ctx.task.id, TaskStatus::InReview).await {
|
||||
Ok(_) => {
|
||||
if let Ok(publisher) = share
|
||||
&& let Err(err) = publisher.update_shared_task_by_id(ctx.task.id).await
|
||||
{
|
||||
tracing::warn!(
|
||||
?err,
|
||||
"Failed to propagate shared task update for {}",
|
||||
ctx.task.id
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to update task status to InReview: {e}");
|
||||
}
|
||||
}
|
||||
let notify_cfg = config.read().await.notifications.clone();
|
||||
NotificationService::notify_execution_halted(notify_cfg, ctx).await;
|
||||
}
|
||||
|
||||
/// Defensively check for externally deleted worktrees and mark them as deleted in the database
|
||||
async fn check_externally_deleted_worktrees(db: &DBService) -> Result<(), DeploymentError> {
|
||||
let active_attempts = TaskAttempt::find_by_worktree_deleted(&db.pool).await?;
|
||||
@@ -236,7 +197,9 @@ impl LocalContainerService {
|
||||
{
|
||||
// This is an orphaned worktree - delete it
|
||||
tracing::info!("Found orphaned worktree: {}", worktree_path_str);
|
||||
if let Err(e) = WorktreeManager::cleanup_worktree(&path, None).await {
|
||||
if let Err(e) =
|
||||
WorktreeManager::cleanup_worktree(&WorktreeCleanup::new(path, None)).await
|
||||
{
|
||||
tracing::error!(
|
||||
"Failed to remove orphaned worktree {}: {}",
|
||||
worktree_path_str,
|
||||
@@ -258,7 +221,11 @@ impl LocalContainerService {
|
||||
worktree_path: PathBuf,
|
||||
git_repo_path: PathBuf,
|
||||
) -> Result<(), DeploymentError> {
|
||||
WorktreeManager::cleanup_worktree(&worktree_path, Some(&git_repo_path)).await?;
|
||||
WorktreeManager::cleanup_worktree(&WorktreeCleanup::new(
|
||||
worktree_path,
|
||||
Some(git_repo_path),
|
||||
))
|
||||
.await?;
|
||||
// Mark worktree as deleted in database after successful cleanup
|
||||
TaskAttempt::mark_worktree_deleted(&db.pool, attempt_id).await?;
|
||||
tracing::info!("Successfully marked worktree as deleted for attempt {attempt_id}",);
|
||||
@@ -429,12 +396,16 @@ impl LocalContainerService {
|
||||
);
|
||||
|
||||
// Manually finalize task since we're bypassing normal execution flow
|
||||
Self::finalize_task(&db, &config, &publisher, &ctx).await;
|
||||
container
|
||||
.finalize_task(&config, publisher.as_ref().ok(), &ctx)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
if Self::should_finalize(&ctx) {
|
||||
Self::finalize_task(&db, &config, &publisher, &ctx).await;
|
||||
if container.should_finalize(&ctx) {
|
||||
container
|
||||
.finalize_task(&config, publisher.as_ref().ok(), &ctx)
|
||||
.await;
|
||||
// After finalization, check if a queued follow-up exists and start it
|
||||
if let Err(e) = container.try_consume_queued_followup(&ctx).await {
|
||||
tracing::error!(
|
||||
@@ -562,23 +533,6 @@ impl LocalContainerService {
|
||||
map.insert(id, store);
|
||||
}
|
||||
|
||||
/// Get the worktree path for a task attempt
|
||||
#[allow(dead_code)]
|
||||
async fn get_worktree_path(
|
||||
&self,
|
||||
task_attempt: &TaskAttempt,
|
||||
) -> Result<PathBuf, ContainerError> {
|
||||
let container_ref = self.ensure_container_exists(task_attempt).await?;
|
||||
let worktree_dir = PathBuf::from(&container_ref);
|
||||
|
||||
if !worktree_dir.exists() {
|
||||
return Err(ContainerError::Other(anyhow!(
|
||||
"Worktree directory not found"
|
||||
)));
|
||||
}
|
||||
|
||||
Ok(worktree_dir)
|
||||
}
|
||||
/// Get the project repository path for a task attempt
|
||||
async fn get_project_repo_path(
|
||||
&self,
|
||||
@@ -951,10 +905,10 @@ impl ContainerService for LocalContainerService {
|
||||
None
|
||||
}
|
||||
};
|
||||
WorktreeManager::cleanup_worktree(
|
||||
&PathBuf::from(task_attempt.container_ref.clone().unwrap_or_default()),
|
||||
git_repo_path.as_deref(),
|
||||
)
|
||||
WorktreeManager::cleanup_worktree(&WorktreeCleanup::new(
|
||||
PathBuf::from(task_attempt.container_ref.clone().unwrap_or_default()),
|
||||
git_repo_path,
|
||||
))
|
||||
.await
|
||||
.unwrap_or_else(|e| {
|
||||
tracing::warn!(
|
||||
@@ -1322,23 +1276,6 @@ impl ContainerService for LocalContainerService {
|
||||
}
|
||||
}
|
||||
|
||||
fn truncate_to_char_boundary(content: &str, max_len: usize) -> &str {
|
||||
if content.len() <= max_len {
|
||||
return content;
|
||||
}
|
||||
|
||||
let cutoff = content
|
||||
.char_indices()
|
||||
.map(|(idx, _)| idx)
|
||||
.chain(std::iter::once(content.len()))
|
||||
.take_while(|&idx| idx <= max_len)
|
||||
.last()
|
||||
.unwrap_or(0);
|
||||
|
||||
debug_assert!(content.is_char_boundary(cutoff));
|
||||
&content[..cutoff]
|
||||
}
|
||||
|
||||
fn success_exit_status() -> std::process::ExitStatus {
|
||||
#[cfg(unix)]
|
||||
{
|
||||
@@ -1351,22 +1288,3 @@ fn success_exit_status() -> std::process::ExitStatus {
|
||||
ExitStatusExt::from_raw(0)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_truncate_to_char_boundary() {
|
||||
use super::truncate_to_char_boundary;
|
||||
|
||||
let input = "a".repeat(10);
|
||||
assert_eq!(truncate_to_char_boundary(&input, 7), "a".repeat(7));
|
||||
|
||||
let input = "hello world";
|
||||
assert_eq!(truncate_to_char_boundary(input, input.len()), input);
|
||||
|
||||
let input = "🔥🔥🔥"; // each fire emoji is 4 bytes
|
||||
assert_eq!(truncate_to_char_boundary(input, 5), "🔥");
|
||||
assert_eq!(truncate_to_char_boundary(input, 3), "");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,7 +38,6 @@ pub struct LocalDeployment {
|
||||
user_id: String,
|
||||
db: DBService,
|
||||
analytics: Option<AnalyticsService>,
|
||||
msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
|
||||
container: LocalContainerService,
|
||||
git: GitService,
|
||||
image: ImageService,
|
||||
@@ -183,8 +182,8 @@ impl Deployment for LocalDeployment {
|
||||
analytics_ctx,
|
||||
approvals.clone(),
|
||||
share_publisher.clone(),
|
||||
);
|
||||
container.spawn_worktree_cleanup().await;
|
||||
)
|
||||
.await;
|
||||
|
||||
let events = EventService::new(db.clone(), events_msg_store, events_entry_count);
|
||||
|
||||
@@ -196,7 +195,6 @@ impl Deployment for LocalDeployment {
|
||||
user_id,
|
||||
db,
|
||||
analytics,
|
||||
msg_stores,
|
||||
container,
|
||||
git,
|
||||
image,
|
||||
@@ -224,10 +222,6 @@ impl Deployment for LocalDeployment {
|
||||
&self.user_id
|
||||
}
|
||||
|
||||
fn shared_types() -> Vec<String> {
|
||||
vec![]
|
||||
}
|
||||
|
||||
fn config(&self) -> &Arc<RwLock<Config>> {
|
||||
&self.config
|
||||
}
|
||||
@@ -256,10 +250,6 @@ impl Deployment for LocalDeployment {
|
||||
&self.filesystem
|
||||
}
|
||||
|
||||
fn msg_stores(&self) -> &Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>> {
|
||||
&self.msg_stores
|
||||
}
|
||||
|
||||
fn events(&self) -> &EventService {
|
||||
&self.events
|
||||
}
|
||||
|
||||
@@ -47,8 +47,16 @@ async fn main() -> Result<(), VibeKanbanError> {
|
||||
|
||||
let deployment = DeploymentImpl::new().await?;
|
||||
deployment.update_sentry_scope().await?;
|
||||
deployment.cleanup_orphan_executions().await?;
|
||||
deployment.backfill_before_head_commits().await?;
|
||||
deployment
|
||||
.container()
|
||||
.cleanup_orphan_executions()
|
||||
.await
|
||||
.map_err(DeploymentError::from)?;
|
||||
deployment
|
||||
.container()
|
||||
.backfill_before_head_commits()
|
||||
.await
|
||||
.map_err(DeploymentError::from)?;
|
||||
deployment.spawn_pr_monitor_service().await;
|
||||
deployment
|
||||
.track_if_analytics_allowed("session_start", serde_json::json!({}))
|
||||
|
||||
@@ -22,8 +22,9 @@ use executors::profile::ExecutorProfileId;
|
||||
use futures_util::{SinkExt, StreamExt, TryStreamExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use services::services::{
|
||||
container::{ContainerService, WorktreeCleanupData, cleanup_worktrees_direct},
|
||||
container::ContainerService,
|
||||
share::ShareError,
|
||||
worktree_manager::{WorktreeCleanup, WorktreeManager},
|
||||
};
|
||||
use sqlx::Error as SqlxError;
|
||||
use ts_rs::TS;
|
||||
@@ -285,14 +286,13 @@ pub async fn delete_task(
|
||||
.await?
|
||||
.ok_or_else(|| ApiError::Database(SqlxError::RowNotFound))?;
|
||||
|
||||
let cleanup_data: Vec<WorktreeCleanupData> = attempts
|
||||
let cleanup_args: Vec<WorktreeCleanup> = attempts
|
||||
.iter()
|
||||
.filter_map(|attempt| {
|
||||
attempt
|
||||
.container_ref
|
||||
.as_ref()
|
||||
.map(|worktree_path| WorktreeCleanupData {
|
||||
attempt_id: attempt.id,
|
||||
.map(|worktree_path| WorktreeCleanup {
|
||||
worktree_path: PathBuf::from(worktree_path),
|
||||
git_repo_path: Some(project.git_repo_path.clone()),
|
||||
})
|
||||
@@ -355,10 +355,10 @@ pub async fn delete_task(
|
||||
tracing::info!(
|
||||
"Starting background cleanup for task {} ({} worktrees)",
|
||||
task_id,
|
||||
cleanup_data.len()
|
||||
cleanup_args.len()
|
||||
);
|
||||
|
||||
if let Err(e) = cleanup_worktrees_direct(&cleanup_data).await {
|
||||
if let Err(e) = WorktreeManager::batch_cleanup_worktrees(&cleanup_args).await {
|
||||
tracing::error!(
|
||||
"Background worktree cleanup failed for task {}: {}",
|
||||
task_id,
|
||||
|
||||
@@ -22,13 +22,12 @@ use db::{
|
||||
use executors::{
|
||||
actions::{
|
||||
ExecutorAction, ExecutorActionType,
|
||||
coding_agent_follow_up::CodingAgentFollowUpRequest,
|
||||
coding_agent_initial::CodingAgentInitialRequest,
|
||||
script::{ScriptContext, ScriptRequest, ScriptRequestLanguage},
|
||||
},
|
||||
executors::{ExecutorError, StandardCodingAgentExecutor},
|
||||
logs::{NormalizedEntry, NormalizedEntryError, NormalizedEntryType, utils::ConversationPatch},
|
||||
profile::{ExecutorConfigs, ExecutorProfileId, to_default_variant},
|
||||
profile::{ExecutorConfigs, ExecutorProfileId},
|
||||
};
|
||||
use futures::{StreamExt, future};
|
||||
use sqlx::Error as SqlxError;
|
||||
@@ -42,47 +41,15 @@ use utils::{
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::services::{
|
||||
config::Config,
|
||||
git::{GitService, GitServiceError},
|
||||
image::ImageService,
|
||||
notification::NotificationService,
|
||||
share::SharePublisher,
|
||||
worktree_manager::{WorktreeError, WorktreeManager},
|
||||
worktree_manager::WorktreeError,
|
||||
};
|
||||
pub type ContainerRef = String;
|
||||
|
||||
/// Data needed for background worktree cleanup (doesn't require DB access)
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WorktreeCleanupData {
|
||||
pub attempt_id: Uuid,
|
||||
pub worktree_path: PathBuf,
|
||||
pub git_repo_path: Option<PathBuf>,
|
||||
}
|
||||
|
||||
/// Cleanup worktrees without requiring database access
|
||||
pub async fn cleanup_worktrees_direct(data: &[WorktreeCleanupData]) -> Result<(), ContainerError> {
|
||||
for cleanup_data in data {
|
||||
tracing::debug!(
|
||||
"Cleaning up worktree for attempt {}: {:?}",
|
||||
cleanup_data.attempt_id,
|
||||
cleanup_data.worktree_path
|
||||
);
|
||||
|
||||
if let Err(e) = WorktreeManager::cleanup_worktree(
|
||||
&cleanup_data.worktree_path,
|
||||
cleanup_data.git_repo_path.as_deref(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
"Failed to cleanup worktree for task attempt {}: {}",
|
||||
cleanup_data.attempt_id,
|
||||
e
|
||||
);
|
||||
// Continue with other cleanups even if one fails
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ContainerError {
|
||||
#[error(transparent)]
|
||||
@@ -154,6 +121,172 @@ pub trait ContainerService {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// A context is finalized when
|
||||
/// - The next action is None (no follow-up actions)
|
||||
/// - The run reason is not DevServer
|
||||
fn should_finalize(&self, ctx: &ExecutionContext) -> bool {
|
||||
ctx.execution_process
|
||||
.executor_action()
|
||||
.unwrap()
|
||||
.next_action
|
||||
.is_none()
|
||||
&& (!matches!(
|
||||
ctx.execution_process.run_reason,
|
||||
ExecutionProcessRunReason::DevServer
|
||||
))
|
||||
}
|
||||
|
||||
/// Finalize task execution by updating status to InReview and sending notifications
|
||||
async fn finalize_task(
|
||||
&self,
|
||||
config: &Arc<RwLock<Config>>,
|
||||
share_publisher: Option<&SharePublisher>,
|
||||
ctx: &ExecutionContext,
|
||||
) {
|
||||
match Task::update_status(&self.db().pool, ctx.task.id, TaskStatus::InReview).await {
|
||||
Ok(_) => {
|
||||
if let Some(publisher) = share_publisher
|
||||
&& let Err(err) = publisher.update_shared_task_by_id(ctx.task.id).await
|
||||
{
|
||||
tracing::warn!(
|
||||
?err,
|
||||
"Failed to propagate shared task update for {}",
|
||||
ctx.task.id
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to update task status to InReview: {e}");
|
||||
}
|
||||
}
|
||||
let notify_cfg = config.read().await.notifications.clone();
|
||||
NotificationService::notify_execution_halted(notify_cfg, ctx).await;
|
||||
}
|
||||
|
||||
/// Cleanup executions marked as running in the db, call at startup
|
||||
async fn cleanup_orphan_executions(&self) -> Result<(), ContainerError> {
|
||||
let running_processes = ExecutionProcess::find_running(&self.db().pool).await?;
|
||||
for process in running_processes {
|
||||
tracing::info!(
|
||||
"Found orphaned execution process {} for task attempt {}",
|
||||
process.id,
|
||||
process.task_attempt_id
|
||||
);
|
||||
// Update the execution process status first
|
||||
if let Err(e) = ExecutionProcess::update_completion(
|
||||
&self.db().pool,
|
||||
process.id,
|
||||
ExecutionProcessStatus::Failed,
|
||||
None, // No exit code for orphaned processes
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(
|
||||
"Failed to update orphaned execution process {} status: {}",
|
||||
process.id,
|
||||
e
|
||||
);
|
||||
continue;
|
||||
}
|
||||
// Capture after-head commit OID (best-effort)
|
||||
if let Ok(Some(task_attempt)) =
|
||||
TaskAttempt::find_by_id(&self.db().pool, process.task_attempt_id).await
|
||||
&& let Some(container_ref) = task_attempt.container_ref
|
||||
{
|
||||
let wt = std::path::PathBuf::from(container_ref);
|
||||
if let Ok(head) = self.git().get_head_info(&wt) {
|
||||
let _ = ExecutionProcess::update_after_head_commit(
|
||||
&self.db().pool,
|
||||
process.id,
|
||||
&head.oid,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
// Process marked as failed
|
||||
tracing::info!("Marked orphaned execution process {} as failed", process.id);
|
||||
// Update task status to InReview for coding agent and setup script failures
|
||||
if matches!(
|
||||
process.run_reason,
|
||||
ExecutionProcessRunReason::CodingAgent
|
||||
| ExecutionProcessRunReason::SetupScript
|
||||
| ExecutionProcessRunReason::CleanupScript
|
||||
) && let Ok(Some(task_attempt)) =
|
||||
TaskAttempt::find_by_id(&self.db().pool, process.task_attempt_id).await
|
||||
&& let Ok(Some(task)) = task_attempt.parent_task(&self.db().pool).await
|
||||
{
|
||||
match Task::update_status(&self.db().pool, task.id, TaskStatus::InReview).await {
|
||||
Ok(_) => {
|
||||
if let Some(publisher) = self.share_publisher()
|
||||
&& let Err(err) = publisher.update_shared_task_by_id(task.id).await
|
||||
{
|
||||
tracing::warn!(
|
||||
?err,
|
||||
"Failed to propagate shared task update for {}",
|
||||
task.id
|
||||
);
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"Failed to update task status to InReview for orphaned attempt: {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Backfill before_head_commit for legacy execution processes.
|
||||
/// Rules:
|
||||
/// - If a process has after_head_commit and missing before_head_commit,
|
||||
/// then set before_head_commit to the previous process's after_head_commit.
|
||||
/// - If there is no previous process, set before_head_commit to the base branch commit.
|
||||
async fn backfill_before_head_commits(&self) -> Result<(), ContainerError> {
|
||||
let pool = &self.db().pool;
|
||||
let rows = ExecutionProcess::list_missing_before_context(pool).await?;
|
||||
for row in rows {
|
||||
// Skip if no after commit at all (shouldn't happen due to WHERE)
|
||||
// Prefer previous process after-commit if present
|
||||
let mut before = row.prev_after_head_commit.clone();
|
||||
|
||||
// Fallback to base branch commit OID
|
||||
if before.is_none() {
|
||||
let repo_path =
|
||||
std::path::Path::new(row.git_repo_path.as_deref().unwrap_or_default());
|
||||
match self
|
||||
.git()
|
||||
.get_branch_oid(repo_path, row.target_branch.as_str())
|
||||
{
|
||||
Ok(oid) => before = Some(oid),
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Backfill: Failed to resolve base branch OID for attempt {} (branch {}): {}",
|
||||
row.task_attempt_id,
|
||||
row.target_branch,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(before_oid) = before
|
||||
&& let Err(e) =
|
||||
ExecutionProcess::update_before_head_commit(pool, row.id, &before_oid).await
|
||||
{
|
||||
tracing::warn!(
|
||||
"Backfill: Failed to update before_head_commit for process {}: {}",
|
||||
row.id,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cleanup_action(&self, cleanup_script: Option<String>) -> Option<Box<ExecutorAction>> {
|
||||
cleanup_script.map(|script| {
|
||||
Box::new(ExecutorAction::new(
|
||||
@@ -772,63 +905,4 @@ pub trait ContainerService {
|
||||
tracing::debug!("Started next action: {:?}", next_action);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn exit_plan_mode_tool(&self, ctx: ExecutionContext) -> Result<(), ContainerError> {
|
||||
let execution_id = ctx.execution_process.id;
|
||||
|
||||
if let Err(err) = self
|
||||
.stop_execution(&ctx.execution_process, ExecutionProcessStatus::Completed)
|
||||
.await
|
||||
{
|
||||
tracing::error!("Failed to stop execution process {}: {}", execution_id, err);
|
||||
return Err(err);
|
||||
}
|
||||
|
||||
let action = ctx.execution_process.executor_action()?;
|
||||
let executor_profile_id = match action.typ() {
|
||||
ExecutorActionType::CodingAgentInitialRequest(req) => req.executor_profile_id.clone(),
|
||||
ExecutorActionType::CodingAgentFollowUpRequest(req) => req.executor_profile_id.clone(),
|
||||
_ => {
|
||||
return Err(ContainerError::Other(anyhow::anyhow!(
|
||||
"exit plan mode tool called on non-coding agent action"
|
||||
)));
|
||||
}
|
||||
};
|
||||
let cleanup_chain = action.next_action().cloned();
|
||||
|
||||
let session_id =
|
||||
ExecutorSession::find_by_execution_process_id(&self.db().pool, execution_id)
|
||||
.await?
|
||||
.and_then(|s| s.session_id);
|
||||
|
||||
if session_id.is_none() {
|
||||
tracing::warn!(
|
||||
"No executor session found for execution process {}",
|
||||
execution_id
|
||||
);
|
||||
return Err(ContainerError::Other(anyhow::anyhow!(
|
||||
"No executor session found"
|
||||
)));
|
||||
}
|
||||
|
||||
let default_profile = to_default_variant(&executor_profile_id);
|
||||
let follow_up = CodingAgentFollowUpRequest {
|
||||
prompt: String::from("The plan has been approved, please execute it."),
|
||||
session_id: session_id.unwrap(),
|
||||
executor_profile_id: default_profile,
|
||||
};
|
||||
let action = ExecutorAction::new(
|
||||
ExecutorActionType::CodingAgentFollowUpRequest(follow_up),
|
||||
cleanup_chain.map(Box::new),
|
||||
);
|
||||
|
||||
let _ = self
|
||||
.start_execution(
|
||||
&ctx.task_attempt,
|
||||
&action,
|
||||
&ExecutionProcessRunReason::CodingAgent,
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,21 @@ lazy_static::lazy_static! {
|
||||
Arc::new(Mutex::new(HashMap::new()));
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct WorktreeCleanup {
|
||||
pub worktree_path: PathBuf,
|
||||
pub git_repo_path: Option<PathBuf>,
|
||||
}
|
||||
|
||||
impl WorktreeCleanup {
|
||||
pub fn new(worktree_path: PathBuf, git_repo_path: Option<PathBuf>) -> Self {
|
||||
Self {
|
||||
worktree_path,
|
||||
git_repo_path,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum WorktreeError {
|
||||
#[error(transparent)]
|
||||
@@ -377,13 +392,22 @@ impl WorktreeManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Clean up multiple worktrees
|
||||
pub async fn batch_cleanup_worktrees(data: &[WorktreeCleanup]) -> Result<(), WorktreeError> {
|
||||
for cleanup_data in data {
|
||||
tracing::debug!("Cleaning up worktree: {:?}", cleanup_data.worktree_path);
|
||||
|
||||
if let Err(e) = Self::cleanup_worktree(cleanup_data).await {
|
||||
tracing::error!("Failed to cleanup worktree: {}", e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Clean up a worktree path and its git metadata (non-blocking)
|
||||
/// If git_repo_path is None, attempts to infer it from the worktree itself
|
||||
pub async fn cleanup_worktree(
|
||||
worktree_path: &Path,
|
||||
git_repo_path: Option<&Path>,
|
||||
) -> Result<(), WorktreeError> {
|
||||
let path_str = worktree_path.to_string_lossy().to_string();
|
||||
pub async fn cleanup_worktree(worktree: &WorktreeCleanup) -> Result<(), WorktreeError> {
|
||||
let path_str = worktree.worktree_path.to_string_lossy().to_string();
|
||||
|
||||
// Get the same lock to ensure we don't interfere with creation
|
||||
let lock = {
|
||||
@@ -396,18 +420,18 @@ impl WorktreeManager {
|
||||
|
||||
let _guard = lock.lock().await;
|
||||
|
||||
if let Some(worktree_name) = worktree_path.file_name().and_then(|n| n.to_str()) {
|
||||
if let Some(worktree_name) = worktree.worktree_path.file_name().and_then(|n| n.to_str()) {
|
||||
// Try to determine the git repo path if not provided
|
||||
let resolved_repo_path = if let Some(repo_path) = git_repo_path {
|
||||
let resolved_repo_path = if let Some(repo_path) = &worktree.git_repo_path {
|
||||
Some(repo_path.to_path_buf())
|
||||
} else {
|
||||
Self::infer_git_repo_path(worktree_path).await
|
||||
Self::infer_git_repo_path(&worktree.worktree_path).await
|
||||
};
|
||||
|
||||
if let Some(repo_path) = resolved_repo_path {
|
||||
Self::comprehensive_worktree_cleanup_async(
|
||||
&repo_path,
|
||||
worktree_path,
|
||||
&worktree.worktree_path,
|
||||
worktree_name,
|
||||
)
|
||||
.await?;
|
||||
@@ -417,7 +441,7 @@ impl WorktreeManager {
|
||||
"Cannot determine git repo path for worktree {}, performing simple cleanup",
|
||||
path_str
|
||||
);
|
||||
Self::simple_worktree_cleanup(worktree_path).await?;
|
||||
Self::simple_worktree_cleanup(&worktree.worktree_path).await?;
|
||||
}
|
||||
} else {
|
||||
return Err(WorktreeError::InvalidPath(
|
||||
|
||||
@@ -22,3 +22,39 @@ pub fn short_uuid(u: &Uuid) -> String {
|
||||
let full = u.simple().to_string();
|
||||
full.chars().take(4).collect() // grab the first 4 chars
|
||||
}
|
||||
|
||||
pub fn truncate_to_char_boundary(content: &str, max_len: usize) -> &str {
|
||||
if content.len() <= max_len {
|
||||
return content;
|
||||
}
|
||||
|
||||
let cutoff = content
|
||||
.char_indices()
|
||||
.map(|(idx, _)| idx)
|
||||
.chain(std::iter::once(content.len()))
|
||||
.take_while(|&idx| idx <= max_len)
|
||||
.last()
|
||||
.unwrap_or(0);
|
||||
|
||||
debug_assert!(content.is_char_boundary(cutoff));
|
||||
&content[..cutoff]
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_truncate_to_char_boundary() {
|
||||
use super::truncate_to_char_boundary;
|
||||
|
||||
let input = "a".repeat(10);
|
||||
assert_eq!(truncate_to_char_boundary(&input, 7), "a".repeat(7));
|
||||
|
||||
let input = "hello world";
|
||||
assert_eq!(truncate_to_char_boundary(input, input.len()), input);
|
||||
|
||||
let input = "🔥🔥🔥"; // each fire emoji is 4 bytes
|
||||
assert_eq!(truncate_to_char_boundary(input, 5), "🔥");
|
||||
assert_eq!(truncate_to_char_boundary(input, 3), "");
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user