* All changes have been completed. Here's a summary of what was done:

  ## Summary

  Changed both scratches and queued messages to use session IDs instead of workspace IDs:

  ### Files Modified

  1. **`frontend/src/components/tasks/TaskFollowUpSection.tsx`**
     - Changed `useScratch(ScratchType.DRAFT_FOLLOW_UP, workspaceId ?? '')` → `useScratch(ScratchType.DRAFT_FOLLOW_UP, sessionId ?? '')`
  2. **`crates/server/src/routes/sessions/queue.rs`**
     - Changed `session.workspace_id` → `session.id` in all three queue operations (queue, cancel, get status)
  3. **`crates/services/src/services/queued_message.rs`**
     - Renamed `task_attempt_id` → `session_id` in the `QueuedMessage` struct
     - Updated all method parameters and comments
  4. **`crates/local-deployment/src/container.rs`**
     - Changed `ctx.workspace.id` → `ctx.session.id` for queue lookups and scratch deletion
     - Updated log messages to reference "session" instead of "workspace"
  5. **`shared/types.ts`** (auto-generated)
     - `QueuedMessage.task_attempt_id` → `QueuedMessage.session_id`

* Cleanup script changes for task attempt 4df90bcd-f261-41ca-bac3-8f7c2fc576c5

* Build passes. The fix has been applied. Found and fixed one missed instance in `sessions/mod.rs` where `Scratch::delete` was using `workspace.id` instead of `session.id`.
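
* For reference, a minimal sketch of the `QueuedMessage` change from item 3. Only the `session_id` field (renamed from `task_attempt_id`) and the `data` payload are confirmed by this summary and the code below; any remaining fields are elided:

  ```rust
  use db::models::scratch::DraftFollowUpData;
  use uuid::Uuid;

  /// Sketch of the renamed struct; fields beyond these two are elided.
  pub struct QueuedMessage {
      /// Renamed from `task_attempt_id`: queue entries are now keyed by session.
      pub session_id: Uuid,
      /// The drafted follow-up that is consumed when the queued message runs.
      pub data: DraftFollowUpData,
  }
  ```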
1373 lines · 51 KiB · Rust

use std::{
    collections::HashMap,
    io,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};

use anyhow::anyhow;
use async_trait::async_trait;
use command_group::AsyncGroupChild;
use db::{
    DBService,
    models::{
        coding_agent_turn::CodingAgentTurn,
        execution_process::{
            ExecutionContext, ExecutionProcess, ExecutionProcessRunReason, ExecutionProcessStatus,
        },
        execution_process_repo_state::ExecutionProcessRepoState,
        project_repo::ProjectRepo,
        repo::Repo,
        scratch::{DraftFollowUpData, Scratch, ScratchType},
        task::{Task, TaskStatus},
        workspace::Workspace,
        workspace_repo::WorkspaceRepo,
    },
};
use deployment::{DeploymentError, RemoteClientNotConfigured};
use executors::{
    actions::{
        Executable, ExecutorAction, ExecutorActionType,
        coding_agent_follow_up::CodingAgentFollowUpRequest,
        coding_agent_initial::CodingAgentInitialRequest,
    },
    approvals::{ExecutorApprovalService, NoopExecutorApprovalService},
    env::ExecutionEnv,
    executors::{BaseCodingAgent, ExecutorExitResult, ExecutorExitSignal, InterruptSender},
    logs::{NormalizedEntryType, utils::patch::extract_normalized_entry_from_patch},
    profile::ExecutorProfileId,
};
use futures::{FutureExt, TryStreamExt, stream::select};
use serde_json::json;
use services::services::{
    analytics::AnalyticsContext,
    approvals::{Approvals, executor_approvals::ExecutorApprovalBridge},
    config::Config,
    container::{ContainerError, ContainerRef, ContainerService},
    diff_stream::{self, DiffStreamHandle},
    git::{Commit, GitCli, GitService},
    image::ImageService,
    notification::NotificationService,
    queued_message::QueuedMessageService,
    share::SharePublisher,
    workspace_manager::{RepoWorkspaceInput, WorkspaceManager},
};
use tokio::{sync::RwLock, task::JoinHandle};
use tokio_util::io::ReaderStream;
use utils::{
    log_msg::LogMsg,
    msg_store::MsgStore,
    text::{git_branch_id, short_uuid, truncate_to_char_boundary},
};
use uuid::Uuid;

use crate::{command, copy};

#[derive(Clone)]
pub struct LocalContainerService {
    db: DBService,
    child_store: Arc<RwLock<HashMap<Uuid, Arc<RwLock<AsyncGroupChild>>>>>,
    interrupt_senders: Arc<RwLock<HashMap<Uuid, InterruptSender>>>,
    msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
    config: Arc<RwLock<Config>>,
    git: GitService,
    image_service: ImageService,
    analytics: Option<AnalyticsContext>,
    approvals: Approvals,
    queued_message_service: QueuedMessageService,
    publisher: Result<SharePublisher, RemoteClientNotConfigured>,
    notification_service: NotificationService,
}

impl LocalContainerService {
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        db: DBService,
        msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
        config: Arc<RwLock<Config>>,
        git: GitService,
        image_service: ImageService,
        analytics: Option<AnalyticsContext>,
        approvals: Approvals,
        queued_message_service: QueuedMessageService,
        publisher: Result<SharePublisher, RemoteClientNotConfigured>,
    ) -> Self {
        let child_store = Arc::new(RwLock::new(HashMap::new()));
        let interrupt_senders = Arc::new(RwLock::new(HashMap::new()));
        let notification_service = NotificationService::new(config.clone());

        let container = LocalContainerService {
            db,
            child_store,
            interrupt_senders,
            msg_stores,
            config,
            git,
            image_service,
            analytics,
            approvals,
            queued_message_service,
            publisher,
            notification_service,
        };

        container.spawn_workspace_cleanup().await;

        container
    }
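
    /// Look up the tracked child process handle for an execution, if any.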
    pub async fn get_child_from_store(&self, id: &Uuid) -> Option<Arc<RwLock<AsyncGroupChild>>> {
        let map = self.child_store.read().await;
        map.get(id).cloned()
    }

    pub async fn add_child_to_store(&self, id: Uuid, exec: AsyncGroupChild) {
        let mut map = self.child_store.write().await;
        map.insert(id, Arc::new(RwLock::new(exec)));
    }

    pub async fn remove_child_from_store(&self, id: &Uuid) {
        let mut map = self.child_store.write().await;
        map.remove(id);
    }

    async fn add_interrupt_sender(&self, id: Uuid, sender: InterruptSender) {
        let mut map = self.interrupt_senders.write().await;
        map.insert(id, sender);
    }

    async fn take_interrupt_sender(&self, id: &Uuid) -> Option<InterruptSender> {
        let mut map = self.interrupt_senders.write().await;
        map.remove(id)
    }
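
    /// Best-effort teardown of a workspace's on-disk state: removes the
    /// workspace directory (or its per-repo worktrees) and clears the
    /// workspace's `container_ref` so it is not picked up again.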
    pub async fn cleanup_workspace(db: &DBService, workspace: &Workspace) {
        let Some(container_ref) = &workspace.container_ref else {
            return;
        };
        let workspace_dir = PathBuf::from(container_ref);

        let repositories = WorkspaceRepo::find_repos_for_workspace(&db.pool, workspace.id)
            .await
            .unwrap_or_default();

        if repositories.is_empty() {
            tracing::warn!(
                "No repositories found for workspace {}, cleaning up workspace directory only",
                workspace.id
            );
            if workspace_dir.exists()
                && let Err(e) = tokio::fs::remove_dir_all(&workspace_dir).await
            {
                tracing::warn!("Failed to remove workspace directory: {}", e);
            }
        } else {
            WorkspaceManager::cleanup_workspace(&workspace_dir, &repositories)
                .await
                .unwrap_or_else(|e| {
                    tracing::warn!(
                        "Failed to clean up workspace for workspace {}: {}",
                        workspace.id,
                        e
                    );
                });
        }

        // Clear container_ref so this workspace won't be picked up again
        let _ = Workspace::clear_container_ref(&db.pool, workspace.id).await;
    }
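
    /// Find all workspaces marked expired for cleanup and tear each one down.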
    pub async fn cleanup_expired_workspaces(db: &DBService) -> Result<(), DeploymentError> {
        let expired_workspaces = Workspace::find_expired_for_cleanup(&db.pool).await?;
        if expired_workspaces.is_empty() {
            tracing::debug!("No expired workspaces found");
            return Ok(());
        }
        tracing::info!(
            "Found {} expired workspaces to clean up",
            expired_workspaces.len()
        );
        for workspace in &expired_workspaces {
            Self::cleanup_workspace(db, workspace).await;
        }
        Ok(())
    }
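
    /// Clean up orphaned workspaces once, then spawn a background task that
    /// runs `cleanup_expired_workspaces` every 30 minutes.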
    pub async fn spawn_workspace_cleanup(&self) {
        let db = self.db.clone();
        let mut cleanup_interval = tokio::time::interval(tokio::time::Duration::from_secs(1800)); // 30 minutes
        WorkspaceManager::cleanup_orphan_workspaces(&self.db.pool).await;
        tokio::spawn(async move {
            loop {
                cleanup_interval.tick().await;
                tracing::info!("Starting periodic workspace cleanup...");
                Self::cleanup_expired_workspaces(&db)
                    .await
                    .unwrap_or_else(|e| {
                        tracing::error!("Failed to clean up expired workspaces: {}", e)
                    });
            }
        });
    }

    /// Record the current HEAD commit for each repository as the "after" state.
    /// Errors are silently ignored since this runs after the main execution completes
    /// and failure should not block process finalization.
    async fn update_after_head_commits(&self, exec_id: Uuid) {
        if let Ok(ctx) = ExecutionProcess::load_context(&self.db.pool, exec_id).await {
            let workspace_root = self.workspace_to_current_dir(&ctx.workspace);
            for repo in &ctx.repos {
                let repo_path = workspace_root.join(&repo.name);
                if let Ok(head) = self.git().get_head_info(&repo_path) {
                    let _ = ExecutionProcessRepoState::update_after_head_commit(
                        &self.db.pool,
                        exec_id,
                        repo.id,
                        &head.oid,
                    )
                    .await;
                }
            }
        }
    }

    /// Get the commit message based on the execution run reason.
    async fn get_commit_message(&self, ctx: &ExecutionContext) -> String {
        match ctx.execution_process.run_reason {
            ExecutionProcessRunReason::CodingAgent => {
                // Try to retrieve the task summary from the coding agent turn,
                // otherwise fall back to the default message.
                match CodingAgentTurn::find_by_execution_process_id(
                    &self.db().pool,
                    ctx.execution_process.id,
                )
                .await
                {
                    Ok(Some(turn)) if turn.summary.is_some() => turn.summary.unwrap(),
                    Ok(_) => {
                        tracing::debug!(
                            "No summary found for execution process {}, using default message",
                            ctx.execution_process.id
                        );
                        format!(
                            "Commit changes from coding agent for workspace {}",
                            ctx.workspace.id
                        )
                    }
                    Err(e) => {
                        tracing::debug!(
                            "Failed to retrieve summary for execution process {}: {}",
                            ctx.execution_process.id,
                            e
                        );
                        format!(
                            "Commit changes from coding agent for workspace {}",
                            ctx.workspace.id
                        )
                    }
                }
            }
            ExecutionProcessRunReason::CleanupScript => {
                format!("Cleanup script changes for workspace {}", ctx.workspace.id)
            }
            _ => format!(
                "Changes from execution process {}",
                ctx.execution_process.id
            ),
        }
    }

    /// Check which repos have uncommitted changes. Fails if any repo is inaccessible.
    fn check_repos_for_changes(
        &self,
        workspace_root: &Path,
        repos: &[Repo],
    ) -> Result<Vec<(Repo, PathBuf)>, ContainerError> {
        let git = GitCli::new();
        let mut repos_with_changes = Vec::new();

        for repo in repos {
            let worktree_path = workspace_root.join(&repo.name);

            match git.has_changes(&worktree_path) {
                Ok(true) => {
                    repos_with_changes.push((repo.clone(), worktree_path));
                }
                Ok(false) => {
                    tracing::debug!("No changes in repo '{}'", repo.name);
                }
                Err(e) => {
                    return Err(ContainerError::Other(anyhow!(
                        "Pre-flight check failed for repo '{}': {}",
                        repo.name,
                        e
                    )));
                }
            }
        }

        Ok(repos_with_changes)
    }

    /// Commit changes to each repo. Logs failures but continues with other repos.
    fn commit_repos(&self, repos_with_changes: Vec<(Repo, PathBuf)>, message: &str) -> bool {
        let mut any_committed = false;

        for (repo, worktree_path) in repos_with_changes {
            tracing::debug!(
                "Committing changes for repo '{}' at {:?}",
                repo.name,
                &worktree_path
            );

            match self.git().commit(&worktree_path, message) {
                Ok(true) => {
                    any_committed = true;
                    tracing::info!("Committed changes in repo '{}'", repo.name);
                }
                Ok(false) => {
                    tracing::warn!("No changes committed in repo '{}' (unexpected)", repo.name);
                }
                Err(e) => {
                    tracing::warn!("Failed to commit in repo '{}': {}", repo.name, e);
                }
            }
        }

        any_committed
    }

    /// Spawn a background task that polls the child process for completion and
    /// cleans up the execution entry when it exits.
    pub fn spawn_exit_monitor(
        &self,
        exec_id: &Uuid,
        exit_signal: Option<ExecutorExitSignal>,
    ) -> JoinHandle<()> {
        let exec_id = *exec_id;
        let child_store = self.child_store.clone();
        let msg_stores = self.msg_stores.clone();
        let db = self.db.clone();
        let config = self.config.clone();
        let container = self.clone();
        let analytics = self.analytics.clone();
        let publisher = self.publisher.clone();

        let mut process_exit_rx = self.spawn_os_exit_watcher(exec_id);

        tokio::spawn(async move {
            let mut exit_signal_future = exit_signal
                .map(|rx| rx.boxed()) // wait for result
                .unwrap_or_else(|| std::future::pending().boxed()); // no signal, stall forever

            let status_result: std::io::Result<std::process::ExitStatus>;

            // Wait for process to exit, or exit signal from executor
            tokio::select! {
                // Exit signal with result.
                // Some coding agent processes do not automatically exit after processing the user request; instead the executor
                // signals when processing has finished to gracefully kill the process.
                exit_result = &mut exit_signal_future => {
                    // Executor signaled completion: kill group and use the provided result
                    if let Some(child_lock) = child_store.read().await.get(&exec_id).cloned() {
                        let mut child = child_lock.write().await;
                        if let Err(err) = command::kill_process_group(&mut child).await {
                            tracing::error!("Failed to kill process group after exit signal: {} {}", exec_id, err);
                        }
                    }

                    // Map the exit result to appropriate exit status
                    status_result = match exit_result {
                        Ok(ExecutorExitResult::Success) => Ok(success_exit_status()),
                        Ok(ExecutorExitResult::Failure) => Ok(failure_exit_status()),
                        Err(_) => Ok(success_exit_status()), // Channel closed, assume success
                    };
                }
                // Process exit
                exit_status_result = &mut process_exit_rx => {
                    status_result = exit_status_result.unwrap_or_else(|e| Err(std::io::Error::other(e)));
                }
            }

            let (exit_code, status) = match status_result {
                Ok(exit_status) => {
                    let code = exit_status.code().unwrap_or(-1) as i64;
                    let status = if exit_status.success() {
                        ExecutionProcessStatus::Completed
                    } else {
                        ExecutionProcessStatus::Failed
                    };
                    (Some(code), status)
                }
                Err(_) => (None, ExecutionProcessStatus::Failed),
            };

            if !ExecutionProcess::was_stopped(&db.pool, exec_id).await
                && let Err(e) =
                    ExecutionProcess::update_completion(&db.pool, exec_id, status, exit_code).await
            {
                tracing::error!("Failed to update execution process completion: {}", e);
            }

            if let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await {
                // Update executor session summary if available
                if let Err(e) = container.update_executor_session_summary(&exec_id).await {
                    tracing::warn!("Failed to update executor session summary: {}", e);
                }

                let success = matches!(
                    ctx.execution_process.status,
                    ExecutionProcessStatus::Completed
                ) && exit_code == Some(0);

                let cleanup_done = matches!(
                    ctx.execution_process.run_reason,
                    ExecutionProcessRunReason::CleanupScript
                ) && !matches!(
                    ctx.execution_process.status,
                    ExecutionProcessStatus::Running
                );

                if success || cleanup_done {
                    // Commit changes (if any) and get feedback about whether changes were made
                    let changes_committed = match container.try_commit_changes(&ctx).await {
                        Ok(committed) => committed,
                        Err(e) => {
                            tracing::error!("Failed to commit changes after execution: {}", e);
                            // Treat commit failures as if changes were made to be safe
                            true
                        }
                    };

                    let should_start_next = if matches!(
                        ctx.execution_process.run_reason,
                        ExecutionProcessRunReason::CodingAgent
                    ) {
                        changes_committed
                    } else {
                        true
                    };

                    if should_start_next {
                        // If the process exited successfully, start the next action
                        if let Err(e) = container.try_start_next_action(&ctx).await {
                            tracing::error!("Failed to start next action after completion: {}", e);
                        }
                    } else {
                        tracing::info!(
                            "Skipping cleanup script for workspace {} - no changes made by coding agent",
                            ctx.workspace.id
                        );

                        // Manually finalize task since we're bypassing normal execution flow
                        container.finalize_task(publisher.as_ref().ok(), &ctx).await;
                    }
                }

                if container.should_finalize(&ctx) {
                    // Only execute queued messages if the execution succeeded
                    // If it failed or was killed, just clear the queue and finalize
                    let should_execute_queued = !matches!(
                        ctx.execution_process.status,
                        ExecutionProcessStatus::Failed | ExecutionProcessStatus::Killed
                    );

                    if let Some(queued_msg) =
                        container.queued_message_service.take_queued(ctx.session.id)
                    {
                        if should_execute_queued {
                            tracing::info!(
                                "Found queued message for session {}, starting follow-up execution",
                                ctx.session.id
                            );

                            // Delete the scratch since we're consuming the queued message
                            if let Err(e) = Scratch::delete(
                                &db.pool,
                                ctx.session.id,
                                &ScratchType::DraftFollowUp,
                            )
                            .await
                            {
                                tracing::warn!(
                                    "Failed to delete scratch after consuming queued message: {}",
                                    e
                                );
                            }

                            // Execute the queued follow-up
                            if let Err(e) = container
                                .start_queued_follow_up(&ctx, &queued_msg.data)
                                .await
                            {
                                tracing::error!("Failed to start queued follow-up: {}", e);
                                // Fall back to finalization if follow-up fails
                                container.finalize_task(publisher.as_ref().ok(), &ctx).await;
                            }
                        } else {
                            // Execution failed or was killed - discard the queued message and finalize
                            tracing::info!(
                                "Discarding queued message for session {} due to execution status {:?}",
                                ctx.session.id,
                                ctx.execution_process.status
                            );
                            container.finalize_task(publisher.as_ref().ok(), &ctx).await;
                        }
                    } else {
                        container.finalize_task(publisher.as_ref().ok(), &ctx).await;
                    }
                }

                // Fire analytics event when CodingAgent execution has finished
                if config.read().await.analytics_enabled
                    && matches!(
                        &ctx.execution_process.run_reason,
                        ExecutionProcessRunReason::CodingAgent
                    )
                    && let Some(analytics) = &analytics
                {
                    analytics.analytics_service.track_event(&analytics.user_id, "task_attempt_finished", Some(json!({
                        "task_id": ctx.task.id.to_string(),
                        "project_id": ctx.task.project_id.to_string(),
                        "workspace_id": ctx.workspace.id.to_string(),
                        "session_id": ctx.session.id.to_string(),
                        "execution_success": matches!(ctx.execution_process.status, ExecutionProcessStatus::Completed),
                        "exit_code": ctx.execution_process.exit_code,
                    })));
                }
            }

            // Now that commit/next-action/finalization steps for this process are complete,
            // capture the HEAD OID as the definitive "after" state (best-effort).
            container.update_after_head_commits(exec_id).await;

            // Cleanup msg store
            if let Some(msg_arc) = msg_stores.write().await.remove(&exec_id) {
                msg_arc.push_finished();
                tokio::time::sleep(Duration::from_millis(50)).await; // Wait for the finish message to propagate
                match Arc::try_unwrap(msg_arc) {
                    Ok(inner) => drop(inner),
                    Err(arc) => tracing::error!(
                        "There are still {} strong Arcs to MsgStore for {}",
                        Arc::strong_count(&arc),
                        exec_id
                    ),
                }
            }

            // Cleanup child handle
            child_store.write().await.remove(&exec_id);
        })
    }
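
    /// Poll the child process every 250ms and resolve the returned oneshot
    /// receiver with its exit status (or an error if the handle is missing).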
    pub fn spawn_os_exit_watcher(
        &self,
        exec_id: Uuid,
    ) -> tokio::sync::oneshot::Receiver<std::io::Result<std::process::ExitStatus>> {
        let (tx, rx) = tokio::sync::oneshot::channel::<std::io::Result<std::process::ExitStatus>>();
        let child_store = self.child_store.clone();
        tokio::spawn(async move {
            loop {
                let child_lock = {
                    let map = child_store.read().await;
                    map.get(&exec_id).cloned()
                };
                if let Some(child_lock) = child_lock {
                    let mut child_handler = child_lock.write().await;
                    match child_handler.try_wait() {
                        Ok(Some(status)) => {
                            let _ = tx.send(Ok(status));
                            break;
                        }
                        Ok(None) => {}
                        Err(e) => {
                            let _ = tx.send(Err(e));
                            break;
                        }
                    }
                } else {
                    let _ = tx.send(Err(io::Error::other(format!(
                        "Child handle missing for {exec_id}"
                    ))));
                    break;
                }
                tokio::time::sleep(Duration::from_millis(250)).await;
            }
        });
        rx
    }
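
    /// Build a stable workspace directory name from a short form of the
    /// workspace UUID and a branch-safe slug of the task title.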
    pub fn dir_name_from_workspace(workspace_id: &Uuid, task_title: &str) -> String {
        let task_title_id = git_branch_id(task_title);
        format!("{}-{}", short_uuid(workspace_id), task_title_id)
    }
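
    /// Wire the child's stdout/stderr into a new `MsgStore` registered under
    /// the execution id.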
    async fn track_child_msgs_in_store(&self, id: Uuid, child: &mut AsyncGroupChild) {
        let store = Arc::new(MsgStore::new());

        let out = child.inner().stdout.take().expect("no stdout");
        let err = child.inner().stderr.take().expect("no stderr");

        // Map stdout bytes -> LogMsg::Stdout
        let out = ReaderStream::new(out)
            .map_ok(|chunk| LogMsg::Stdout(String::from_utf8_lossy(&chunk).into_owned()));

        // Map stderr bytes -> LogMsg::Stderr
        let err = ReaderStream::new(err)
            .map_ok(|chunk| LogMsg::Stderr(String::from_utf8_lossy(&chunk).into_owned()));

        // If you have a JSON Patch source, map it to LogMsg::JsonPatch too, then select all three.

        // Merge and forward into the store
        let merged = select(out, err); // Stream<Item = Result<LogMsg, io::Error>>
        store.clone().spawn_forwarder(merged);

        let mut map = self.msg_stores().write().await;
        map.insert(id, store);
    }

    /// Create a live diff log stream for ongoing attempts for WebSocket
    /// Returns a stream that owns the filesystem watcher - when dropped, watcher is cleaned up
    async fn create_live_diff_stream(
        &self,
        worktree_path: &Path,
        base_commit: &Commit,
        stats_only: bool,
        path_prefix: Option<String>,
    ) -> Result<DiffStreamHandle, ContainerError> {
        diff_stream::create(
            self.git().clone(),
            worktree_path.to_path_buf(),
            base_commit.clone(),
            stats_only,
            path_prefix,
        )
        .await
        .map_err(|e| ContainerError::Other(anyhow!("{e}")))
    }

    /// Extract the last assistant message from the MsgStore history
    fn extract_last_assistant_message(&self, exec_id: &Uuid) -> Option<String> {
        // Get the MsgStore for this execution
        let msg_stores = self.msg_stores.try_read().ok()?;
        let msg_store = msg_stores.get(exec_id)?;

        // Get the history and scan in reverse for the last assistant message
        let history = msg_store.get_history();

        for msg in history.iter().rev() {
            if let LogMsg::JsonPatch(patch) = msg {
                // Try to extract a NormalizedEntry from the patch
                if let Some((_, entry)) = extract_normalized_entry_from_patch(patch)
                    && matches!(entry.entry_type, NormalizedEntryType::AssistantMessage)
                {
                    let content = entry.content.trim();
                    if !content.is_empty() {
                        const MAX_SUMMARY_LENGTH: usize = 4096;
                        if content.len() > MAX_SUMMARY_LENGTH {
                            let truncated = truncate_to_char_boundary(content, MAX_SUMMARY_LENGTH);
                            return Some(format!("{truncated}..."));
                        }
                        return Some(content.to_string());
                    }
                }
            }
        }

        None
    }

    /// Update the coding agent turn summary with the final assistant message
    async fn update_executor_session_summary(&self, exec_id: &Uuid) -> Result<(), anyhow::Error> {
        // Check if there's a coding agent turn for this execution process
        let turn = CodingAgentTurn::find_by_execution_process_id(&self.db.pool, *exec_id).await?;

        if let Some(turn) = turn {
            // Only update if summary is not already set
            if turn.summary.is_none() {
                if let Some(summary) = self.extract_last_assistant_message(exec_id) {
                    CodingAgentTurn::update_summary(&self.db.pool, *exec_id, &summary).await?;
                } else {
                    tracing::debug!("No assistant message found for execution {}", exec_id);
                }
            }
        }

        Ok(())
    }

    /// Copy project files and images to the workspace.
    /// Skips files/images that already exist (fast no-op if all exist).
    async fn copy_files_and_images(
        &self,
        workspace_dir: &Path,
        workspace: &Workspace,
    ) -> Result<(), ContainerError> {
        let repos = WorkspaceRepo::find_repos_with_copy_files(&self.db.pool, workspace.id).await?;

        for repo in &repos {
            if let Some(copy_files) = &repo.copy_files
                && !copy_files.trim().is_empty()
            {
                let worktree_path = workspace_dir.join(&repo.name);
                self.copy_project_files(&repo.path, &worktree_path, copy_files)
                    .await
                    .unwrap_or_else(|e| {
                        tracing::warn!(
                            "Failed to copy project files for repo '{}': {}",
                            repo.name,
                            e
                        );
                    });
            }
        }

        if let Err(e) = self
            .image_service
            .copy_images_by_task_to_worktree(workspace_dir, workspace.task_id)
            .await
        {
            tracing::warn!("Failed to copy task images to workspace: {}", e);
        }

        Ok(())
    }

    /// Create workspace-level CLAUDE.md and AGENTS.md files that import from each repo.
    /// Uses the @import syntax to reference each repo's config files.
    /// Skips creating files if they already exist or if no repos have the source file.
    async fn create_workspace_config_files(
        workspace_dir: &Path,
        repos: &[Repo],
    ) -> Result<(), ContainerError> {
        const CONFIG_FILES: [&str; 2] = ["CLAUDE.md", "AGENTS.md"];

        for config_file in CONFIG_FILES {
            let workspace_config_path = workspace_dir.join(config_file);

            if workspace_config_path.exists() {
                tracing::debug!(
                    "Workspace config file {} already exists, skipping",
                    config_file
                );
                continue;
            }

            let mut import_lines = Vec::new();
            for repo in repos {
                let repo_config_path = workspace_dir.join(&repo.name).join(config_file);
                if repo_config_path.exists() {
                    import_lines.push(format!("@{}/{}", repo.name, config_file));
                }
            }

            if import_lines.is_empty() {
                tracing::debug!(
                    "No repos have {}, skipping workspace config creation",
                    config_file
                );
                continue;
            }

            let content = import_lines.join("\n") + "\n";
            if let Err(e) = tokio::fs::write(&workspace_config_path, &content).await {
                tracing::warn!(
                    "Failed to create workspace config file {}: {}",
                    config_file,
                    e
                );
                continue;
            }

            tracing::info!(
                "Created workspace {} with {} import(s)",
                config_file,
                import_lines.len()
            );
        }

        Ok(())
    }

    /// Start a follow-up execution from a queued message
    async fn start_queued_follow_up(
        &self,
        ctx: &ExecutionContext,
        queued_data: &DraftFollowUpData,
    ) -> Result<ExecutionProcess, ContainerError> {
        // Get executor profile from the latest CodingAgent process in this session
        let initial_executor_profile_id =
            ExecutionProcess::latest_executor_profile_for_session(&self.db.pool, ctx.session.id)
                .await
                .map_err(|e| {
                    ContainerError::Other(anyhow!("Failed to get executor profile: {e}"))
                })?;

        let executor_profile_id = ExecutorProfileId {
            executor: initial_executor_profile_id.executor,
            variant: queued_data.variant.clone(),
        };

        // Get latest agent session ID for session continuity (from coding agent turns)
        let latest_agent_session_id = ExecutionProcess::find_latest_coding_agent_turn_session_id(
            &self.db.pool,
            ctx.session.id,
        )
        .await?;

        let project_repos =
            ProjectRepo::find_by_project_id_with_names(&self.db.pool, ctx.project.id).await?;
        let cleanup_action = self.cleanup_actions_for_repos(&project_repos);

        let working_dir = ctx
            .workspace
            .agent_working_dir
            .as_ref()
            .filter(|dir| !dir.is_empty())
            .cloned();

        let action_type = if let Some(agent_session_id) = latest_agent_session_id {
            ExecutorActionType::CodingAgentFollowUpRequest(CodingAgentFollowUpRequest {
                prompt: queued_data.message.clone(),
                session_id: agent_session_id,
                executor_profile_id: executor_profile_id.clone(),
                working_dir: working_dir.clone(),
            })
        } else {
            ExecutorActionType::CodingAgentInitialRequest(CodingAgentInitialRequest {
                prompt: queued_data.message.clone(),
                executor_profile_id: executor_profile_id.clone(),
                working_dir,
            })
        };

        let action = ExecutorAction::new(action_type, cleanup_action.map(Box::new));

        self.start_execution(
            &ctx.workspace,
            &ctx.session,
            &action,
            &ExecutionProcessRunReason::CodingAgent,
        )
        .await
    }
}
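
/// Synthesize a platform-appropriate non-zero `ExitStatus` (exit code 1).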
fn failure_exit_status() -> std::process::ExitStatus {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        ExitStatusExt::from_raw(256) // Exit code 1 (shifted by 8 bits)
    }
    #[cfg(windows)]
    {
        use std::os::windows::process::ExitStatusExt;
        ExitStatusExt::from_raw(1)
    }
}

#[async_trait]
impl ContainerService for LocalContainerService {
    fn msg_stores(&self) -> &Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>> {
        &self.msg_stores
    }

    fn db(&self) -> &DBService {
        &self.db
    }

    fn git(&self) -> &GitService {
        &self.git
    }

    fn share_publisher(&self) -> Option<&SharePublisher> {
        self.publisher.as_ref().ok()
    }

    fn notification_service(&self) -> &NotificationService {
        &self.notification_service
    }

    async fn git_branch_prefix(&self) -> String {
        self.config.read().await.git_branch_prefix.clone()
    }

    fn workspace_to_current_dir(&self, workspace: &Workspace) -> PathBuf {
        PathBuf::from(workspace.container_ref.clone().unwrap_or_default())
    }
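
    /// Create the on-disk workspace (worktrees for every configured repo),
    /// copy project files/images into it, and persist its `container_ref`.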
    async fn create(&self, workspace: &Workspace) -> Result<ContainerRef, ContainerError> {
        let task = workspace
            .parent_task(&self.db.pool)
            .await?
            .ok_or(sqlx::Error::RowNotFound)?;

        let workspace_dir_name =
            LocalContainerService::dir_name_from_workspace(&workspace.id, &task.title);
        let workspace_dir = WorkspaceManager::get_workspace_base_dir().join(&workspace_dir_name);

        let workspace_repos =
            WorkspaceRepo::find_by_workspace_id(&self.db.pool, workspace.id).await?;
        if workspace_repos.is_empty() {
            return Err(ContainerError::Other(anyhow!(
                "Workspace has no repositories configured"
            )));
        }

        let repositories =
            WorkspaceRepo::find_repos_for_workspace(&self.db.pool, workspace.id).await?;

        let target_branches: HashMap<_, _> = workspace_repos
            .iter()
            .map(|wr| (wr.repo_id, wr.target_branch.clone()))
            .collect();

        let workspace_inputs: Vec<RepoWorkspaceInput> = repositories
            .iter()
            .map(|repo| {
                let target_branch = target_branches.get(&repo.id).cloned().unwrap_or_default();
                RepoWorkspaceInput::new(repo.clone(), target_branch)
            })
            .collect();

        let created_workspace = WorkspaceManager::create_workspace(
            &workspace_dir,
            &workspace_inputs,
            &workspace.branch,
        )
        .await?;

        // Copy project files and images to workspace
        self.copy_files_and_images(&created_workspace.workspace_dir, workspace)
            .await?;

        Self::create_workspace_config_files(&created_workspace.workspace_dir, &repositories)
            .await?;

        Workspace::update_container_ref(
            &self.db.pool,
            workspace.id,
            &created_workspace.workspace_dir.to_string_lossy(),
        )
        .await?;

        Ok(created_workspace
            .workspace_dir
            .to_string_lossy()
            .to_string())
    }
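
    /// Stop any running executions, then tear down the workspace directory.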
    async fn delete(&self, workspace: &Workspace) -> Result<(), ContainerError> {
        self.try_stop(workspace, true).await;
        Self::cleanup_workspace(&self.db, workspace).await;
        Ok(())
    }
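
    /// Recreate the workspace on disk if it is missing, copying project
    /// files/images and persisting `container_ref` when it was unset.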
    async fn ensure_container_exists(
        &self,
        workspace: &Workspace,
    ) -> Result<ContainerRef, ContainerError> {
        let repositories =
            WorkspaceRepo::find_repos_for_workspace(&self.db.pool, workspace.id).await?;

        if repositories.is_empty() {
            return Err(ContainerError::Other(anyhow!(
                "Workspace has no repositories configured"
            )));
        }

        let workspace_dir = if let Some(container_ref) = &workspace.container_ref {
            PathBuf::from(container_ref)
        } else {
            let task = workspace
                .parent_task(&self.db.pool)
                .await?
                .ok_or(sqlx::Error::RowNotFound)?;
            let workspace_dir_name =
                LocalContainerService::dir_name_from_workspace(&workspace.id, &task.title);
            WorkspaceManager::get_workspace_base_dir().join(&workspace_dir_name)
        };

        WorkspaceManager::ensure_workspace_exists(&workspace_dir, &repositories, &workspace.branch)
            .await?;

        if workspace.container_ref.is_none() {
            Workspace::update_container_ref(
                &self.db.pool,
                workspace.id,
                &workspace_dir.to_string_lossy(),
            )
            .await?;
        }

        // Copy project files and images (fast no-op if already exist)
        self.copy_files_and_images(&workspace_dir, workspace)
            .await?;

        Self::create_workspace_config_files(&workspace_dir, &repositories).await?;

        Ok(workspace_dir.to_string_lossy().to_string())
    }
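
    /// Returns true when the workspace has no container, its directory is
    /// missing, or every repo worktree in it is clean.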
    async fn is_container_clean(&self, workspace: &Workspace) -> Result<bool, ContainerError> {
        let Some(container_ref) = &workspace.container_ref else {
            return Ok(true);
        };

        let workspace_dir = PathBuf::from(container_ref);
        if !workspace_dir.exists() {
            return Ok(true);
        }

        let repositories =
            WorkspaceRepo::find_repos_for_workspace(&self.db.pool, workspace.id).await?;

        for repo in &repositories {
            let worktree_path = workspace_dir.join(&repo.name);
            if worktree_path.exists() && !self.git().is_worktree_clean(&worktree_path)? {
                return Ok(false);
            }
        }

        Ok(true)
    }
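
    /// Spawn the executor action inside the workspace directory, register the
    /// child process and its log streams, and start the exit monitor.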
    async fn start_execution_inner(
        &self,
        workspace: &Workspace,
        execution_process: &ExecutionProcess,
        executor_action: &ExecutorAction,
    ) -> Result<(), ContainerError> {
        // Get the worktree path
        let container_ref = workspace
            .container_ref
            .as_ref()
            .ok_or(ContainerError::Other(anyhow!(
                "Container ref not found for workspace"
            )))?;
        let current_dir = PathBuf::from(container_ref);

        let approvals_service: Arc<dyn ExecutorApprovalService> =
            match executor_action.base_executor() {
                Some(
                    BaseCodingAgent::Codex
                    | BaseCodingAgent::ClaudeCode
                    | BaseCodingAgent::Gemini
                    | BaseCodingAgent::QwenCode
                    | BaseCodingAgent::Opencode,
                ) => ExecutorApprovalBridge::new(
                    self.approvals.clone(),
                    self.db.clone(),
                    self.notification_service.clone(),
                    execution_process.id,
                ),
                _ => Arc::new(NoopExecutorApprovalService {}),
            };

        // Build ExecutionEnv with VK_* variables
        let mut env = ExecutionEnv::new();

        // Load task and project context for environment variables
        let task = workspace
            .parent_task(&self.db.pool)
            .await?
            .ok_or(ContainerError::Other(anyhow!(
                "Task not found for workspace"
            )))?;
        let project = task
            .parent_project(&self.db.pool)
            .await?
            .ok_or(ContainerError::Other(anyhow!("Project not found for task")))?;

        env.insert("VK_PROJECT_NAME", &project.name);
        env.insert("VK_PROJECT_ID", project.id.to_string());
        env.insert("VK_TASK_ID", task.id.to_string());
        env.insert("VK_WORKSPACE_ID", workspace.id.to_string());
        env.insert("VK_WORKSPACE_BRANCH", &workspace.branch);

        // Create the child and stream, add to execution tracker with timeout
        let mut spawned = tokio::time::timeout(
            Duration::from_secs(30),
            executor_action.spawn(&current_dir, approvals_service, &env),
        )
        .await
        .map_err(|_| {
            ContainerError::Other(anyhow!(
                "Timeout: process took more than 30 seconds to start"
            ))
        })??;

        self.track_child_msgs_in_store(execution_process.id, &mut spawned.child)
            .await;

        self.add_child_to_store(execution_process.id, spawned.child)
            .await;

        // Store interrupt sender for graceful shutdown
        if let Some(interrupt_sender) = spawned.interrupt_sender {
            self.add_interrupt_sender(execution_process.id, interrupt_sender)
                .await;
        }

        // Spawn unified exit monitor: watches OS exit and optional executor signal
        let _hn = self.spawn_exit_monitor(&execution_process.id, spawned.exit_signal);

        Ok(())
    }
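
    /// Stop a running execution: record its completion status, attempt a
    /// graceful interrupt, then force-kill the process group and finalize
    /// bookkeeping (msg store, task status, after-head commits).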
    async fn stop_execution(
        &self,
        execution_process: &ExecutionProcess,
        status: ExecutionProcessStatus,
    ) -> Result<(), ContainerError> {
        let child = self
            .get_child_from_store(&execution_process.id)
            .await
            .ok_or_else(|| {
                ContainerError::Other(anyhow!("Child process not found for execution"))
            })?;
        let exit_code = if status == ExecutionProcessStatus::Completed {
            Some(0)
        } else {
            None
        };

        ExecutionProcess::update_completion(&self.db.pool, execution_process.id, status, exit_code)
            .await?;

        // Try graceful interrupt first, then force kill
        if let Some(interrupt_sender) = self.take_interrupt_sender(&execution_process.id).await {
            // Send interrupt signal (ignore error if receiver dropped)
            let _ = interrupt_sender.send(());

            // Wait for graceful exit with timeout
            let graceful_exit = {
                let mut child_guard = child.write().await;
                tokio::time::timeout(Duration::from_secs(5), child_guard.wait()).await
            };

            match graceful_exit {
                Ok(Ok(_)) => {
                    tracing::debug!(
                        "Process {} exited gracefully after interrupt",
                        execution_process.id
                    );
                }
                Ok(Err(e)) => {
                    tracing::info!("Error waiting for process {}: {}", execution_process.id, e);
                }
                Err(_) => {
                    tracing::debug!(
                        "Graceful shutdown timed out for process {}, force killing",
                        execution_process.id
                    );
                }
            }
        }

        // Kill the child process and remove from the store
        {
            let mut child_guard = child.write().await;
            if let Err(e) = command::kill_process_group(&mut child_guard).await {
                tracing::error!(
                    "Failed to stop execution process {}: {}",
                    execution_process.id,
                    e
                );
                return Err(e);
            }
        }
        self.remove_child_from_store(&execution_process.id).await;

        // Mark the process finished in the MsgStore
        if let Some(msg) = self.msg_stores.write().await.remove(&execution_process.id) {
            msg.push_finished();
        }

        // Update task status to InReview when execution is stopped
        if let Ok(ctx) = ExecutionProcess::load_context(&self.db.pool, execution_process.id).await
            && !matches!(
                ctx.execution_process.run_reason,
                ExecutionProcessRunReason::DevServer
            )
        {
            match Task::update_status(&self.db.pool, ctx.task.id, TaskStatus::InReview).await {
                Ok(_) => {
                    if let Some(publisher) = self.share_publisher()
                        && let Err(err) = publisher.update_shared_task_by_id(ctx.task.id).await
                    {
                        tracing::warn!(
                            ?err,
                            "Failed to propagate shared task update for {}",
                            ctx.task.id
                        );
                    }
                }
                Err(e) => {
                    tracing::error!("Failed to update task status to InReview: {e}");
                }
            }
        }

        tracing::debug!(
            "Execution process {} stopped successfully",
            execution_process.id
        );

        // Record after-head commit OID (best-effort)
        self.update_after_head_commits(execution_process.id).await;

        Ok(())
    }
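
    /// Stream live diffs for every repo in the workspace against its target
    /// branch, merged into a single stream.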
    async fn stream_diff(
        &self,
        workspace: &Workspace,
        stats_only: bool,
    ) -> Result<futures::stream::BoxStream<'static, Result<LogMsg, std::io::Error>>, ContainerError>
    {
        let workspace_repos =
            WorkspaceRepo::find_by_workspace_id(&self.db.pool, workspace.id).await?;
        let target_branches: HashMap<_, _> = workspace_repos
            .iter()
            .map(|wr| (wr.repo_id, wr.target_branch.clone()))
            .collect();

        let repositories =
            WorkspaceRepo::find_repos_for_workspace(&self.db.pool, workspace.id).await?;

        let mut streams = Vec::new();

        let container_ref = self.ensure_container_exists(workspace).await?;
        let workspace_root = PathBuf::from(container_ref);

        for repo in repositories {
            let worktree_path = workspace_root.join(&repo.name);
            let branch = &workspace.branch;

            let Some(target_branch) = target_branches.get(&repo.id) else {
                tracing::warn!(
                    "Skipping diff stream for repo {}: no target branch configured",
                    repo.name
                );
                continue;
            };

            let base_commit = match self
                .git()
                .get_base_commit(&repo.path, branch, target_branch)
            {
                Ok(c) => c,
                Err(e) => {
                    tracing::warn!(
                        "Skipping diff stream for repo {}: failed to get base commit: {}",
                        repo.name,
                        e
                    );
                    continue;
                }
            };

            let stream = self
                .create_live_diff_stream(
                    &worktree_path,
                    &base_commit,
                    stats_only,
                    Some(repo.name.clone()),
                )
                .await?;

            streams.push(Box::pin(stream));
        }

        if streams.is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        // Merge all streams into one
        Ok(Box::pin(futures::stream::select_all(streams)))
    }
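
    /// Commit pending changes for coding-agent and cleanup-script runs.
    /// Returns whether any repo actually had changes committed.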
    async fn try_commit_changes(&self, ctx: &ExecutionContext) -> Result<bool, ContainerError> {
        if !matches!(
            ctx.execution_process.run_reason,
            ExecutionProcessRunReason::CodingAgent | ExecutionProcessRunReason::CleanupScript,
        ) {
            return Ok(false);
        }

        let message = self.get_commit_message(ctx).await;

        let container_ref = ctx
            .workspace
            .container_ref
            .as_ref()
            .ok_or_else(|| ContainerError::Other(anyhow!("Container reference not found")))?;
        let workspace_root = PathBuf::from(container_ref);

        let repos_with_changes = self.check_repos_for_changes(&workspace_root, &ctx.repos)?;
        if repos_with_changes.is_empty() {
            tracing::debug!("No changes to commit in any repository");
            return Ok(false);
        }

        Ok(self.commit_repos(repos_with_changes, &message))
    }

    /// Copy files from the original project directory to the worktree.
    /// Skips files that already exist at target with same size.
    async fn copy_project_files(
        &self,
        source_dir: &Path,
        target_dir: &Path,
        copy_files: &str,
    ) -> Result<(), ContainerError> {
        let source_dir = source_dir.to_path_buf();
        let target_dir = target_dir.to_path_buf();
        let copy_files = copy_files.to_string();

        tokio::time::timeout(
            std::time::Duration::from_secs(30),
            tokio::task::spawn_blocking(move || {
                copy::copy_project_files_impl(&source_dir, &target_dir, &copy_files)
            }),
        )
        .await
        .map_err(|_| ContainerError::Other(anyhow!("Copy project files timed out after 30s")))?
        .map_err(|e| ContainerError::Other(anyhow!("Copy files task failed: {e}")))?
    }
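
    /// Stop every execution process currently marked as running.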
    async fn kill_all_running_processes(&self) -> Result<(), ContainerError> {
        tracing::info!("Killing all running processes");
        let running_processes = ExecutionProcess::find_running(&self.db.pool).await?;

        for process in running_processes {
            if let Err(error) = self
                .stop_execution(&process, ExecutionProcessStatus::Killed)
                .await
            {
                tracing::error!(
                    "Failed to cleanly kill running execution process {:?}: {:?}",
                    process,
                    error
                );
            }
        }

        Ok(())
    }
}
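
/// Synthesize a platform-appropriate successful `ExitStatus` (exit code 0).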
fn success_exit_status() -> std::process::ExitStatus {
    #[cfg(unix)]
    {
        use std::os::unix::process::ExitStatusExt;
        ExitStatusExt::from_raw(0)
    }
    #[cfg(windows)]
    {
        use std::os::windows::process::ExitStatusExt;
        ExitStatusExt::from_raw(0)
    }
}