diff --git a/backend/src/executor.rs b/backend/src/executor.rs index 48aebc4f..e0641164 100644 --- a/backend/src/executor.rs +++ b/backend/src/executor.rs @@ -106,6 +106,19 @@ impl SpawnContext { self.additional_context = Some(context.into()); self } + + /// Create SpawnContext from Command, then use builder methods for additional context + pub fn from_command( + command: &tokio::process::Command, + executor_type: impl Into, + ) -> Self { + Self::from(command).with_executor_type(executor_type) + } + + /// Finalize the context and create an ExecutorError + pub fn spawn_error(self, error: std::io::Error) -> ExecutorError { + ExecutorError::spawn_failed(error, self) + } } /// Extract SpawnContext from a tokio::process::Command @@ -147,6 +160,8 @@ pub enum ExecutorError { DatabaseError(sqlx::Error), ContextCollectionFailed(String), GitError(String), + InvalidSessionId(String), + FollowUpNotSupported, } impl std::fmt::Display for ExecutorError { @@ -185,6 +200,10 @@ impl std::fmt::Display for ExecutorError { write!(f, "Context collection failed: {}", msg) } ExecutorError::GitError(msg) => write!(f, "Git operation error: {}", msg), + ExecutorError::InvalidSessionId(msg) => write!(f, "Invalid session_id: {}", msg), + ExecutorError::FollowUpNotSupported => { + write!(f, "This executor does not support follow-up sessions") + } } } } @@ -235,23 +254,7 @@ impl ExecutorError { } } -/// Helper to create SpawnContext from Command with builder pattern -impl SpawnContext { - /// Create SpawnContext from Command, then use builder methods for additional context - pub fn from_command( - command: &tokio::process::Command, - executor_type: impl Into, - ) -> Self { - Self::from(command).with_executor_type(executor_type) - } - - /// Finalize the context and create an ExecutorError - pub fn spawn_error(self, error: std::io::Error) -> ExecutorError { - ExecutorError::spawn_failed(error, self) - } -} - -/// Trait for defining CLI commands that can be executed for task attempts +/// Trait for coding agents that can execute tasks, normalize logs, and support follow-up sessions #[async_trait] pub trait Executor: Send + Sync { /// Spawn the command for a given task attempt @@ -262,6 +265,22 @@ pub trait Executor: Send + Sync { worktree_path: &str, ) -> Result; + /// Spawn a follow-up session for executors that support it + /// + /// This method is used to continue an existing session with a new prompt. + /// Not all executors support follow-up sessions, so the default implementation + /// returns an error. + async fn spawn_followup( + &self, + _pool: &sqlx::SqlitePool, + _task_id: Uuid, + _session_id: &str, + _prompt: &str, + _worktree_path: &str, + ) -> Result { + Err(ExecutorError::FollowUpNotSupported) + } + /// Normalize executor logs into a standard format fn normalize_logs( &self, @@ -278,22 +297,14 @@ pub trait Executor: Send + Sync { }) } - // Note: Fast-path streaming is now handled by the Gemini WAL system. - // The Gemini executor uses its own push_patch() method to emit patches, - // which are automatically served via SSE endpoints with resumable streaming. - - /// Execute the command and stream output to database in real-time - async fn execute_streaming( + #[allow(clippy::result_large_err)] + fn setup_streaming( &self, + child: &mut command_group::AsyncGroupChild, pool: &sqlx::SqlitePool, - task_id: Uuid, attempt_id: Uuid, execution_process_id: Uuid, - worktree_path: &str, - ) -> Result { - let mut child = self.spawn(pool, task_id, worktree_path).await?; - - // Take stdout and stderr pipes for streaming + ) -> Result<(), ExecutorError> { let stdout = child .inner() .stdout @@ -305,7 +316,6 @@ pub trait Executor: Send + Sync { .take() .expect("Failed to take stderr from child process"); - // Start streaming tasks let pool_clone1 = pool.clone(); let pool_clone2 = pool.clone(); @@ -324,6 +334,39 @@ pub trait Executor: Send + Sync { false, )); + Ok(()) + } + + /// Execute the command and stream output to database in real-time + async fn execute_streaming( + &self, + pool: &sqlx::SqlitePool, + task_id: Uuid, + attempt_id: Uuid, + execution_process_id: Uuid, + worktree_path: &str, + ) -> Result { + let mut child = self.spawn(pool, task_id, worktree_path).await?; + Self::setup_streaming(self, &mut child, pool, attempt_id, execution_process_id)?; + Ok(child) + } + + /// Execute a follow-up command and stream output to database in real-time + #[allow(clippy::too_many_arguments)] + async fn execute_followup_streaming( + &self, + pool: &sqlx::SqlitePool, + task_id: Uuid, + attempt_id: Uuid, + execution_process_id: Uuid, + session_id: &str, + prompt: &str, + worktree_path: &str, + ) -> Result { + let mut child = self + .spawn_followup(pool, task_id, session_id, prompt, worktree_path) + .await?; + Self::setup_streaming(self, &mut child, pool, attempt_id, execution_process_id)?; Ok(child) } } @@ -333,14 +376,19 @@ pub trait Executor: Send + Sync { pub enum ExecutorType { SetupScript(String), DevServer(String), - CodingAgent(ExecutorConfig), - FollowUpCodingAgent { + CodingAgent { config: ExecutorConfig, - session_id: Option, - prompt: String, + follow_up: Option, }, } +/// Information needed to continue a previous session +#[derive(Debug, Clone)] +pub struct FollowUpInfo { + pub session_id: String, + pub prompt: String, +} + /// Configuration for different executor types #[derive(Debug, Clone, Serialize, Deserialize, TS)] #[serde(tag = "type", rename_all = "kebab-case")] @@ -360,9 +408,6 @@ pub enum ExecutorConfig { CharmOpencode, #[serde(alias = "opencode")] SstOpencode, - // Future executors can be added here - // Shell { command: String }, - // Docker { image: String, command: String }, } // Constants for frontend @@ -558,7 +603,6 @@ async fn stream_stdout_to_db( session_id_parsed = true; } } - accumulated_output.push_str(&line); update_counter += 1; diff --git a/backend/src/executors/amp.rs b/backend/src/executors/amp.rs index b3b5e6b8..96377f34 100644 --- a/backend/src/executors/amp.rs +++ b/backend/src/executors/amp.rs @@ -17,12 +17,6 @@ use crate::{ /// An executor that uses Amp to process tasks pub struct AmpExecutor; -/// An executor that continues an Amp thread -pub struct AmpFollowupExecutor { - pub thread_id: String, - pub prompt: String, -} - #[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] #[serde(tag = "type")] pub enum AmpJson { @@ -259,6 +253,67 @@ Task title: {}"#, Ok(child) } + async fn spawn_followup( + &self, + _pool: &sqlx::SqlitePool, + _task_id: Uuid, + session_id: &str, + prompt: &str, + worktree_path: &str, + ) -> Result { + use std::process::Stdio; + + use tokio::{io::AsyncWriteExt, process::Command}; + + // Use shell command for cross-platform compatibility + let (shell_cmd, shell_arg) = get_shell_command(); + let amp_command = format!( + "npx @sourcegraph/amp@0.0.1752148945-gd8844f threads continue {} --format=jsonl", + session_id + ); + + let mut command = Command::new(shell_cmd); + command + .kill_on_drop(true) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(worktree_path) + .arg(shell_arg) + .arg(&_command); + + let mut child = command.group_spawn().map_err(|e| { + crate::executor::SpawnContext::from_command(&command, "Amp") + .with_context(format!( + "Amp CLI followup execution for thread {}", + session_id + )) + .spawn_error(e) + })?; + + // Feed the prompt in, then close the pipe so amp sees EOF + if let Some(mut stdin) = child.inner().stdin.take() { + stdin.write_all(prompt.as_bytes()).await.map_err(|e| { + crate::executor::SpawnContext::from_command(&command, "Amp") + .with_context(format!( + "Failed to write prompt to Amp CLI stdin for thread {}", + session_id + )) + .spawn_error(e) + })?; + stdin.shutdown().await.map_err(|e| { + crate::executor::SpawnContext::from_command(&command, "Amp") + .with_context(format!( + "Failed to close Amp CLI stdin for thread {}", + session_id + )) + .spawn_error(e) + })?; + } + + Ok(child) + } + fn normalize_logs( &self, logs: &str, @@ -590,66 +645,6 @@ impl AmpExecutor { } } -#[async_trait] -impl Executor for AmpFollowupExecutor { - async fn spawn( - &self, - _pool: &sqlx::SqlitePool, - _task_id: Uuid, - worktree_path: &str, - ) -> Result { - use std::process::Stdio; - - use tokio::{io::AsyncWriteExt, process::Command}; - - // Use shell command for cross-platform compatibility - let (shell_cmd, shell_arg) = get_shell_command(); - let amp_command = format!( - "npx @sourcegraph/amp@0.0.1752148945-gd8844f threads continue {} --format=jsonl", - self.thread_id - ); - - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdin(Stdio::piped()) // <-- open a pipe - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(worktree_path) - .arg(shell_arg) - .arg(&_command); - - let mut child = command - .group_spawn() // Create new process group so we can kill entire tree - .map_err(|e| { - crate::executor::SpawnContext::from_command(&command, "Amp") - .with_context(format!( - "Amp CLI followup execution for thread {}", - self.thread_id - )) - .spawn_error(e) - })?; - - // feed the prompt in, then close the pipe so `amp` sees EOF - if let Some(mut stdin) = child.inner().stdin.take() { - stdin.write_all(self.prompt.as_bytes()).await.unwrap(); - stdin.shutdown().await.unwrap(); // or `drop(stdin);` - } - - Ok(child) - } - - fn normalize_logs( - &self, - logs: &str, - worktree_path: &str, - ) -> Result { - // Reuse the same logic as the main AmpExecutor - let main_executor = AmpExecutor; - main_executor.normalize_logs(logs, worktree_path) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/backend/src/executors/ccr.rs b/backend/src/executors/ccr.rs index f126cac6..74b52c93 100644 --- a/backend/src/executors/ccr.rs +++ b/backend/src/executors/ccr.rs @@ -4,7 +4,7 @@ use uuid::Uuid; use crate::{ executor::{Executor, ExecutorError, NormalizedConversation}, - executors::{ClaudeExecutor, ClaudeFollowupExecutor}, + executors::ClaudeExecutor, }; /// An executor that uses Claude Code Router (CCR) to process tasks @@ -37,6 +37,19 @@ impl Executor for CCRExecutor { self.0.spawn(pool, task_id, worktree_path).await } + async fn spawn_followup( + &self, + pool: &sqlx::SqlitePool, + task_id: Uuid, + session_id: &str, + prompt: &str, + worktree_path: &str, + ) -> Result { + self.0 + .spawn_followup(pool, task_id, session_id, prompt, worktree_path) + .await + } + fn normalize_logs( &self, logs: &str, @@ -76,40 +89,3 @@ fn filter_ccr_service_messages(logs: &str) -> String { .collect::>() .join("\n") } - -/// Claude Code Router followup executor - forwards to ClaudeFollowupExecutor with Claude Code Router command -pub struct CCRFollowupExecutor(ClaudeFollowupExecutor); - -impl CCRFollowupExecutor { - pub fn new(session_id: String, prompt: String) -> Self { - Self(ClaudeFollowupExecutor::with_command( - session_id, - prompt, - "claude-code-router".to_string(), - "npx -y @musistudio/claude-code-router code -p --dangerously-skip-permissions --verbose --output-format=stream-json".to_string(), - )) - } -} - -#[async_trait] -impl Executor for CCRFollowupExecutor { - async fn spawn( - &self, - pool: &sqlx::SqlitePool, - task_id: Uuid, - worktree_path: &str, - ) -> Result { - self.0.spawn(pool, task_id, worktree_path).await - } - - fn normalize_logs( - &self, - logs: &str, - worktree_path: &str, - ) -> Result { - let filtered_logs = filter_ccr_service_messages(logs); - let mut result = self.0.normalize_logs(&filtered_logs, worktree_path)?; - result.executor_type = "claude-code-router".to_string(); - Ok(result) - } -} diff --git a/backend/src/executors/charm_opencode.rs b/backend/src/executors/charm_opencode.rs index 21a22d3f..21b40e35 100644 --- a/backend/src/executors/charm_opencode.rs +++ b/backend/src/executors/charm_opencode.rs @@ -11,12 +11,6 @@ use crate::{ /// An executor that uses OpenCode to process tasks pub struct CharmOpencodeExecutor; -/// An executor that continues an OpenCode thread -pub struct CharmOpencodeFollowupExecutor { - pub session_id: String, - pub prompt: String, -} - #[async_trait] impl Executor for CharmOpencodeExecutor { async fn spawn( @@ -78,25 +72,25 @@ Task title: {}"#, Ok(child) } -} -#[async_trait] -impl Executor for CharmOpencodeFollowupExecutor { - async fn spawn( + async fn spawn_followup( &self, _pool: &sqlx::SqlitePool, _task_id: Uuid, + _session_id: &str, + prompt: &str, worktree_path: &str, ) -> Result { use std::process::Stdio; use tokio::process::Command; - // Use shell command for cross-platform compatibility + // CharmOpencode doesn't support session-based followup, so we ignore session_id + // and just run with the new prompt let (shell_cmd, shell_arg) = get_shell_command(); let opencode_command = format!( "opencode -p \"{}\" --output-format=json", - self.prompt.replace('"', "\\\"") + prompt.replace('"', "\\\"") ); let mut command = Command::new(shell_cmd); @@ -108,16 +102,11 @@ impl Executor for CharmOpencodeFollowupExecutor { .arg(shell_arg) .arg(&opencode_command); - let child = command - .group_spawn() // Create new process group so we can kill entire tree - .map_err(|e| { - crate::executor::SpawnContext::from_command(&command, "CharmOpenCode") - .with_context(format!( - "CharmOpenCode CLI followup execution for session {}", - self.session_id - )) - .spawn_error(e) - })?; + let child = command.group_spawn().map_err(|e| { + crate::executor::SpawnContext::from_command(&command, "CharmOpenCode") + .with_context("CharmOpenCode CLI followup execution") + .spawn_error(e) + })?; Ok(child) } diff --git a/backend/src/executors/claude.rs b/backend/src/executors/claude.rs index dc149c07..77253f2d 100644 --- a/backend/src/executors/claude.rs +++ b/backend/src/executors/claude.rs @@ -78,57 +78,6 @@ impl ClaudeExecutor { } } -/// An executor that resumes a Claude session -pub struct ClaudeFollowupExecutor { - pub session_id: String, - pub prompt: String, - executor_type: String, - command_base: String, -} - -impl ClaudeFollowupExecutor { - /// Create a new ClaudeFollowupExecutor with default settings - pub fn new(session_id: String, prompt: String) -> Self { - Self { - session_id, - prompt, - executor_type: "Claude".to_string(), - command_base: "npx -y @anthropic-ai/claude-code@latest -p --dangerously-skip-permissions --verbose --output-format=stream-json".to_string(), - } - } - - pub fn new_plan_mode(session_id: String, prompt: String) -> Self { - let command = format!( - "npx -y @anthropic-ai/claude-code@latest -p --permission-mode=plan --verbose --output-format=stream-json --resume={}", - session_id - ); - - let script = create_watchkill_script(&command); - - Self { - session_id, - prompt, - executor_type: "ClaudePlan".to_string(), - command_base: script, - } - } - - /// Create a new ClaudeFollowupExecutor with custom settings - pub fn with_command( - session_id: String, - prompt: String, - executor_type: String, - command_base: String, - ) -> Self { - Self { - session_id, - prompt, - executor_type, - command_base, - } - } -} - #[async_trait] impl Executor for ClaudeExecutor { async fn spawn( @@ -214,6 +163,80 @@ Task title: {}"#, Ok(child) } + async fn spawn_followup( + &self, + _pool: &sqlx::SqlitePool, + _task_id: Uuid, + session_id: &str, + prompt: &str, + worktree_path: &str, + ) -> Result { + // Use shell command for cross-platform compatibility + let (shell_cmd, shell_arg) = get_shell_command(); + + // Determine the command based on whether this is plan mode or not + let claude_command = if self.executor_type == "ClaudePlan" { + let command = format!( + "npx -y @anthropic-ai/claude-code@latest -p --permission-mode=plan --verbose --output-format=stream-json --resume={}", + session_id + ); + create_watchkill_script(&command) + } else { + format!("{} --resume={}", self.command, session_id) + }; + + let mut command = Command::new(shell_cmd); + command + .kill_on_drop(true) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .current_dir(worktree_path) + .arg(shell_arg) + .arg(&claude_command) + .env("NODE_NO_WARNINGS", "1"); + + let mut child = command.group_spawn().map_err(|e| { + crate::executor::SpawnContext::from_command(&command, &self.executor_type) + .with_context(format!( + "{} CLI followup execution for session {}", + self.executor_type, session_id + )) + .spawn_error(e) + })?; + + // Write prompt to stdin safely + if let Some(mut stdin) = child.inner().stdin.take() { + use tokio::io::AsyncWriteExt; + tracing::debug!( + "Writing prompt to {} stdin for session {}: {:?}", + self.executor_type, + session_id, + prompt + ); + stdin.write_all(prompt.as_bytes()).await.map_err(|e| { + let context = + crate::executor::SpawnContext::from_command(&command, &self.executor_type) + .with_context(format!( + "Failed to write prompt to {} CLI stdin for session {}", + self.executor_type, session_id + )); + ExecutorError::spawn_failed(e, context) + })?; + stdin.shutdown().await.map_err(|e| { + let context = + crate::executor::SpawnContext::from_command(&command, &self.executor_type) + .with_context(format!( + "Failed to close {} CLI stdin for session {}", + self.executor_type, session_id + )); + ExecutorError::spawn_failed(e, context) + })?; + } + + Ok(child) + } + fn normalize_logs( &self, logs: &str, @@ -667,83 +690,6 @@ impl ClaudeExecutor { } } -#[async_trait] -impl Executor for ClaudeFollowupExecutor { - async fn spawn( - &self, - _pool: &sqlx::SqlitePool, - _task_id: Uuid, - worktree_path: &str, - ) -> Result { - // Use shell command for cross-platform compatibility - let (shell_cmd, shell_arg) = get_shell_command(); - // Pass prompt via stdin instead of command line to avoid shell escaping issues - let claude_command = format!("{} --resume={}", self.command_base, self.session_id); - - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .current_dir(worktree_path) - .arg(shell_arg) - .arg(&claude_command); - - let mut child = command - .group_spawn() // Create new process group so we can kill entire tree - .map_err(|e| { - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_context(format!( - "{} CLI followup execution for session {}", - self.executor_type, self.session_id - )) - .spawn_error(e) - })?; - - // Write prompt to stdin safely - if let Some(mut stdin) = child.inner().stdin.take() { - use tokio::io::AsyncWriteExt; - tracing::debug!( - "Writing prompt to {} stdin for session {}: {:?}", - self.executor_type, - self.session_id, - self.prompt - ); - stdin.write_all(self.prompt.as_bytes()).await.map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_context(format!( - "Failed to write prompt to {} CLI stdin for session {}", - self.executor_type, self.session_id - )); - ExecutorError::spawn_failed(e, context) - })?; - stdin.shutdown().await.map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_context(format!( - "Failed to close {} CLI stdin for session {}", - self.executor_type, self.session_id - )); - ExecutorError::spawn_failed(e, context) - })?; - } - - Ok(child) - } - - fn normalize_logs( - &self, - logs: &str, - worktree_path: &str, - ) -> Result { - // Reuse the same logic as the main ClaudeExecutor - let main_executor = ClaudeExecutor::new(); - main_executor.normalize_logs(logs, worktree_path) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/backend/src/executors/gemini.rs b/backend/src/executors/gemini.rs index dff708af..43ab80ec 100644 --- a/backend/src/executors/gemini.rs +++ b/backend/src/executors/gemini.rs @@ -30,12 +30,6 @@ use crate::{ /// An executor that uses Gemini CLI to process tasks pub struct GeminiExecutor; -/// An executor that continues a Gemini task with context from previous execution -pub struct GeminiFollowupExecutor { - pub attempt_id: Uuid, - pub prompt: String, -} - #[async_trait] impl Executor for GeminiExecutor { async fn spawn( @@ -134,6 +128,59 @@ Task title: {}"#, Ok(child) } + async fn spawn_followup( + &self, + pool: &sqlx::SqlitePool, + task_id: Uuid, + session_id: &str, + prompt: &str, + worktree_path: &str, + ) -> Result { + // For Gemini, session_id is the attempt_id + let attempt_id = Uuid::parse_str(session_id) + .map_err(|_| ExecutorError::InvalidSessionId(session_id.to_string()))?; + + let task = self.load_task(pool, task_id).await?; + let resume_context = self.collect_resume_context(pool, &task, attempt_id).await?; + let comprehensive_prompt = self.build_comprehensive_prompt(&task, &resume_context, prompt); + self.spawn_process(worktree_path, &comprehensive_prompt, attempt_id) + .await + } + + async fn execute_followup_streaming( + &self, + pool: &sqlx::SqlitePool, + task_id: Uuid, + attempt_id: Uuid, + execution_process_id: Uuid, + session_id: &str, + prompt: &str, + worktree_path: &str, + ) -> Result { + tracing::info!( + "Starting Gemini follow-up execution for attempt {} (session {})", + attempt_id, + session_id + ); + + // For Gemini, session_id is the attempt_id - update it in the database + Self::update_session_id(pool, execution_process_id, session_id).await; + + let mut child = self + .spawn_followup(pool, task_id, session_id, prompt, worktree_path) + .await?; + + tracing::info!( + "Gemini follow-up process spawned successfully for attempt {}, PID: {:?}", + attempt_id, + child.inner().id() + ); + + Self::setup_streaming(pool, &mut child, attempt_id, execution_process_id); + + Ok(child) + } + fn normalize_logs( &self, logs: &str, @@ -396,9 +443,7 @@ impl GeminiExecutor { ); } } -} -impl GeminiFollowupExecutor { async fn load_task( &self, pool: &sqlx::SqlitePool, @@ -413,10 +458,11 @@ impl GeminiFollowupExecutor { &self, pool: &sqlx::SqlitePool, task: &Task, + attempt_id: Uuid, ) -> Result { crate::models::task_attempt::TaskAttempt::get_attempt_resume_context( pool, - self.attempt_id, + attempt_id, task.id, task.project_id, ) @@ -428,31 +474,25 @@ impl GeminiFollowupExecutor { &self, task: &Task, resume_context: &crate::models::task_attempt::AttemptResumeContext, + prompt: &str, ) -> String { format!( r#"RESUME CONTEXT FOR CONTINUING TASK - === TASK INFORMATION === Project ID: {} Task ID: {} Task Title: {} Task Description: {} - === EXECUTION HISTORY === The following is the execution history from this task attempt: - {} - === CURRENT CHANGES === The following git diff shows changes made from the base branch to the current state: - ```diff {} ``` - === CURRENT REQUEST === {} - === INSTRUCTIONS === You are continuing work on the above task. The execution history shows what has been done previously, and the git diff shows the current state of all changes. Please continue from where the previous execution left off, taking into account all the context provided above. "#, @@ -472,7 +512,7 @@ You are continuing work on the above task. The execution history shows what has } else { &resume_context.cumulative_diffs }, - self.prompt + prompt ) } @@ -480,10 +520,11 @@ You are continuing work on the above task. The execution history shows what has &self, worktree_path: &str, comprehensive_prompt: &str, + attempt_id: Uuid, ) -> Result { tracing::info!( "Spawning Gemini followup execution for attempt {} with resume context ({} chars)", - self.attempt_id, + attempt_id, comprehensive_prompt.len() ); @@ -493,12 +534,12 @@ You are continuing work on the above task. The execution history shows what has crate::executor::SpawnContext::from_command(&command, "Gemini") .with_context(format!( "Gemini CLI followup execution with context for attempt {}", - self.attempt_id + attempt_id )) .spawn_error(e) })?; - self.send_prompt_to_stdin(&mut child, &command, comprehensive_prompt) + self.send_prompt_to_stdin(&mut child, &command, comprehensive_prompt, attempt_id) .await?; Ok(child) } @@ -508,11 +549,12 @@ You are continuing work on the above task. The execution history shows what has child: &mut AsyncGroupChild, command: &Command, comprehensive_prompt: &str, + attempt_id: Uuid, ) -> Result<(), ExecutorError> { if let Some(mut stdin) = child.inner().stdin.take() { tracing::debug!( "Sending resume context to Gemini for attempt {}: {} characters", - self.attempt_id, + attempt_id, comprehensive_prompt.len() ); @@ -523,7 +565,7 @@ You are continuing work on the above task. The execution history shows what has let context = crate::executor::SpawnContext::from_command(command, "Gemini") .with_context(format!( "Failed to write resume prompt to Gemini CLI stdin for attempt {}", - self.attempt_id + attempt_id )); ExecutorError::spawn_failed(e, context) })?; @@ -532,79 +574,20 @@ You are continuing work on the above task. The execution history shows what has let context = crate::executor::SpawnContext::from_command(command, "Gemini") .with_context(format!( "Failed to close Gemini CLI stdin for attempt {}", - self.attempt_id + attempt_id )); ExecutorError::spawn_failed(e, context) })?; tracing::info!( "Successfully sent resume context to Gemini for attempt {}", - self.attempt_id + attempt_id ); } Ok(()) } -} -#[async_trait] -impl Executor for GeminiFollowupExecutor { - async fn spawn( - &self, - pool: &sqlx::SqlitePool, - task_id: Uuid, - worktree_path: &str, - ) -> Result { - let task = self.load_task(pool, task_id).await?; - let resume_context = self.collect_resume_context(pool, &task).await?; - let comprehensive_prompt = self.build_comprehensive_prompt(&task, &resume_context); - self.spawn_process(worktree_path, &comprehensive_prompt) - .await - } - - async fn execute_streaming( - &self, - pool: &sqlx::SqlitePool, - task_id: Uuid, - attempt_id: Uuid, - execution_process_id: Uuid, - worktree_path: &str, - ) -> Result { - tracing::info!( - "Starting Gemini followup execution for task {} attempt {} with resume context", - task_id, - attempt_id - ); - - // Update ExecutorSession with the session_id immediately - GeminiExecutor::update_session_id(pool, execution_process_id, &self.attempt_id.to_string()) - .await; - - let mut child = self.spawn(pool, task_id, worktree_path).await?; - - tracing::info!( - "Gemini followup process spawned successfully for attempt {}, PID: {:?}", - attempt_id, - child.inner().id() - ); - - GeminiExecutor::setup_streaming(pool, &mut child, attempt_id, execution_process_id); - - Ok(child) - } - - fn normalize_logs( - &self, - logs: &str, - worktree_path: &str, - ) -> Result { - // Reuse the same logic as the main GeminiExecutor - let main_executor = GeminiExecutor; - main_executor.normalize_logs(logs, worktree_path) - } -} - -impl GeminiExecutor { /// Format Gemini CLI output by inserting line breaks where periods are directly /// followed by capital letters (common Gemini CLI formatting issue). /// Handles both intra-chunk and cross-chunk period-to-capital transitions. diff --git a/backend/src/executors/mod.rs b/backend/src/executors/mod.rs index ea1651e5..60b49844 100644 --- a/backend/src/executors/mod.rs +++ b/backend/src/executors/mod.rs @@ -8,12 +8,12 @@ pub mod gemini; pub mod setup_script; pub mod sst_opencode; -pub use amp::{AmpExecutor, AmpFollowupExecutor}; -pub use ccr::{CCRExecutor, CCRFollowupExecutor}; -pub use charm_opencode::{CharmOpencodeExecutor, CharmOpencodeFollowupExecutor}; -pub use claude::{ClaudeExecutor, ClaudeFollowupExecutor}; +pub use amp::AmpExecutor; +pub use ccr::CCRExecutor; +pub use charm_opencode::CharmOpencodeExecutor; +pub use claude::ClaudeExecutor; pub use dev_server::DevServerExecutor; pub use echo::EchoExecutor; -pub use gemini::{GeminiExecutor, GeminiFollowupExecutor}; +pub use gemini::GeminiExecutor; pub use setup_script::SetupScriptExecutor; -pub use sst_opencode::{SstOpencodeExecutor, SstOpencodeFollowupExecutor}; +pub use sst_opencode::SstOpencodeExecutor; diff --git a/backend/src/executors/sst_opencode.rs b/backend/src/executors/sst_opencode.rs index f9af0d7a..f660d9de 100644 --- a/backend/src/executors/sst_opencode.rs +++ b/backend/src/executors/sst_opencode.rs @@ -236,24 +236,6 @@ impl SstOpencodeExecutor { } /// An executor that resumes an SST Opencode session -pub struct SstOpencodeFollowupExecutor { - pub session_id: String, - pub prompt: String, - executor_type: String, - command_base: String, -} - -impl SstOpencodeFollowupExecutor { - /// Create a new SstOpencodeFollowupExecutor with default settings - pub fn new(session_id: String, prompt: String) -> Self { - Self { - session_id, - prompt, - executor_type: "SST Opencode".to_string(), - command_base: "npx -y opencode-ai@latest run --print-logs".to_string(), - } - } -} #[async_trait] impl Executor for SstOpencodeExecutor { @@ -398,84 +380,21 @@ Task title: {}"#, summary: None, }) } -} -#[async_trait] -impl Executor for SstOpencodeFollowupExecutor { - async fn spawn( - &self, - _pool: &sqlx::SqlitePool, - _task_id: Uuid, - worktree_path: &str, - ) -> Result { - // Use shell command for cross-platform compatibility - let (shell_cmd, shell_arg) = get_shell_command(); - let opencode_command = format!("{} --session {}", self.command_base, self.session_id); - - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::null()) // Ignore stdout for OpenCode - .stderr(std::process::Stdio::piped()) - .current_dir(worktree_path) - .arg(shell_arg) - .arg(&opencode_command) - .env("NODE_NO_WARNINGS", "1"); - - let mut child = command - .group_spawn() // Create new process group so we can kill entire tree - .map_err(|e| { - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_context(format!( - "{} CLI followup execution for session {}", - self.executor_type, self.session_id - )) - .spawn_error(e) - })?; - - // Write prompt to stdin safely - if let Some(mut stdin) = child.inner().stdin.take() { - use tokio::io::AsyncWriteExt; - tracing::debug!( - "Writing prompt to {} stdin for session {}: {:?}", - self.executor_type, - self.session_id, - self.prompt - ); - stdin.write_all(self.prompt.as_bytes()).await.map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_context(format!( - "Failed to write prompt to {} CLI stdin for session {}", - self.executor_type, self.session_id - )); - ExecutorError::spawn_failed(e, context) - })?; - stdin.shutdown().await.map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_context(format!( - "Failed to close {} CLI stdin for session {}", - self.executor_type, self.session_id - )); - ExecutorError::spawn_failed(e, context) - })?; - } - - Ok(child) - } - - /// Execute with OpenCode filtering for stderr - async fn execute_streaming( + /// Execute follow-up with OpenCode filtering for stderr + async fn execute_followup_streaming( &self, pool: &sqlx::SqlitePool, task_id: Uuid, attempt_id: Uuid, execution_process_id: Uuid, + session_id: &str, + prompt: &str, worktree_path: &str, ) -> Result { - let mut child = self.spawn(pool, task_id, worktree_path).await?; + let mut child = self + .spawn_followup(pool, task_id, session_id, prompt, worktree_path) + .await?; // Take stderr pipe for OpenCode filtering let stderr = child @@ -498,14 +417,71 @@ impl Executor for SstOpencodeFollowupExecutor { Ok(child) } - fn normalize_logs( + async fn spawn_followup( &self, - logs: &str, + _pool: &sqlx::SqlitePool, + _task_id: Uuid, + session_id: &str, + prompt: &str, worktree_path: &str, - ) -> Result { - // Reuse the same logic as the main SstOpencodeExecutor - let main_executor = SstOpencodeExecutor::new(); - main_executor.normalize_logs(logs, worktree_path) + ) -> Result { + use std::process::Stdio; + + use tokio::io::AsyncWriteExt; + + // Use shell command for cross-platform compatibility + let (shell_cmd, shell_arg) = get_shell_command(); + let opencode_command = format!("{} --session {}", self.command, session_id); + + let mut command = Command::new(shell_cmd); + command + .kill_on_drop(true) + .stdin(Stdio::piped()) + .stdout(Stdio::null()) // Ignore stdout for OpenCode + .stderr(Stdio::piped()) + .current_dir(worktree_path) + .arg(shell_arg) + .arg(&opencode_command) + .env("NODE_NO_WARNINGS", "1"); + + let mut child = command.group_spawn().map_err(|e| { + crate::executor::SpawnContext::from_command(&command, &self.executor_type) + .with_context(format!( + "{} CLI followup execution for session {}", + self.executor_type, session_id + )) + .spawn_error(e) + })?; + + // Write prompt to stdin safely + if let Some(mut stdin) = child.inner().stdin.take() { + tracing::debug!( + "Writing prompt to {} stdin for session {}: {:?}", + self.executor_type, + session_id, + prompt + ); + stdin.write_all(prompt.as_bytes()).await.map_err(|e| { + let context = + crate::executor::SpawnContext::from_command(&command, &self.executor_type) + .with_context(format!( + "Failed to write prompt to {} CLI stdin for session {}", + self.executor_type, session_id + )); + ExecutorError::spawn_failed(e, context) + })?; + stdin.shutdown().await.map_err(|e| { + let context = + crate::executor::SpawnContext::from_command(&command, &self.executor_type) + .with_context(format!( + "Failed to close {} CLI stdin for session {}", + self.executor_type, session_id + )); + ExecutorError::spawn_failed(e, context) + })?; + } + + Ok(child) } } diff --git a/backend/src/services/process_service.rs b/backend/src/services/process_service.rs index f17fb116..9d84bee0 100644 --- a/backend/src/services/process_service.rs +++ b/backend/src/services/process_service.rs @@ -208,7 +208,10 @@ impl ProcessService { app_state, attempt_id, task_id, - crate::executor::ExecutorType::CodingAgent(executor_config), + crate::executor::ExecutorType::CodingAgent { + config: executor_config, + follow_up: None, + }, "Starting executor".to_string(), ExecutionProcessType::CodingAgent, &task_attempt.worktree_path, @@ -434,10 +437,12 @@ impl ProcessService { "SESSION_FOLLOWUP: Attempting follow-up execution with session ID: {} (attempt: {}, worktree: {})", session_id, attempt_id, worktree_path ); - crate::executor::ExecutorType::FollowUpCodingAgent { + crate::executor::ExecutorType::CodingAgent { config: executor_config.clone(), - session_id: executor_session.session_id.clone(), - prompt: prompt.to_string(), + follow_up: Some(crate::executor::FollowUpInfo { + session_id: session_id.clone(), + prompt: prompt.to_string(), + }), } } else { // No session ID available, start new session @@ -445,7 +450,10 @@ impl ProcessService { "SESSION_FOLLOWUP: No session ID available for follow-up execution on attempt {}, starting new session (worktree: {})", attempt_id, worktree_path ); - crate::executor::ExecutorType::CodingAgent(executor_config.clone()) + crate::executor::ExecutorType::CodingAgent { + config: executor_config.clone(), + follow_up: None, + } }; // Try to start the follow-up execution @@ -472,7 +480,10 @@ impl ProcessService { ); // Create a new session instead of trying to resume - let new_session_executor = crate::executor::ExecutorType::CodingAgent(executor_config); + let new_session_executor = crate::executor::ExecutorType::CodingAgent { + config: executor_config, + follow_up: None, + }; Self::start_process_execution( pool, @@ -522,9 +533,10 @@ impl ProcessService { if matches!(process_type, ExecutionProcessType::CodingAgent) { // Extract follow-up prompt if this is a follow-up execution let followup_prompt = match &executor_type { - crate::executor::ExecutorType::FollowUpCodingAgent { prompt, .. } => { - Some(prompt.clone()) - } + crate::executor::ExecutorType::CodingAgent { + follow_up: Some(ref info), + .. + } => Some(info.prompt.clone()), _ => None, }; Self::create_executor_session_record( @@ -649,14 +661,14 @@ impl ProcessService { Some(serde_json::to_string(&[shell_arg, "dev_server"]).unwrap()), None, // Dev servers don't have an executor type ), - crate::executor::ExecutorType::CodingAgent(config) => { - ("executor".to_string(), None, Some(format!("{}", config))) + crate::executor::ExecutorType::CodingAgent { config, follow_up } => { + let command = if follow_up.is_some() { + "followup_executor".to_string() + } else { + "executor".to_string() + }; + (command, None, Some(format!("{}", config))) } - crate::executor::ExecutorType::FollowUpCodingAgent { config, .. } => ( - "followup_executor".to_string(), - None, - Some(format!("{}", config)), - ), }; let create_process = CreateExecutionProcess { @@ -732,97 +744,26 @@ impl ProcessService { .execute_streaming(pool, task_id, attempt_id, process_id, worktree_path) .await } - crate::executor::ExecutorType::CodingAgent(config) => { + crate::executor::ExecutorType::CodingAgent { config, follow_up } => { let executor = config.create_executor(); - executor - .execute_streaming(pool, task_id, attempt_id, process_id, worktree_path) - .await - } - crate::executor::ExecutorType::FollowUpCodingAgent { - config, - session_id, - prompt, - } => { - use crate::executors::{ - AmpFollowupExecutor, CCRFollowupExecutor, CharmOpencodeFollowupExecutor, - ClaudeFollowupExecutor, GeminiFollowupExecutor, SstOpencodeFollowupExecutor, - }; - let executor: Box = match config { - crate::executor::ExecutorConfig::Claude => { - if let Some(sid) = session_id { - Box::new(ClaudeFollowupExecutor::new(sid.clone(), prompt.clone())) - } else { - return Err(TaskAttemptError::TaskNotFound); // No session ID for followup - } - } - crate::executor::ExecutorConfig::ClaudePlan => { - if let Some(sid) = session_id { - Box::new(ClaudeFollowupExecutor::new_plan_mode( - sid.clone(), - prompt.clone(), - )) - } else { - return Err(TaskAttemptError::TaskNotFound); // No session ID for followup - } - } - crate::executor::ExecutorConfig::Amp => { - if let Some(tid) = session_id { - Box::new(AmpFollowupExecutor { - thread_id: tid.clone(), - prompt: prompt.clone(), - }) - } else { - return Err(TaskAttemptError::TaskNotFound); // No thread ID for followup - } - } - crate::executor::ExecutorConfig::Gemini => { - // For Gemini, we don't use real session IDs, we pass the context directly - Box::new(GeminiFollowupExecutor { + if let Some(ref follow_up_info) = follow_up { + executor + .execute_followup_streaming( + pool, + task_id, attempt_id, - prompt: prompt.clone(), - }) - } - crate::executor::ExecutorConfig::Echo => { - // Echo doesn't support followup, use regular echo - config.create_executor() - } - crate::executor::ExecutorConfig::CharmOpencode => { - if let Some(sid) = session_id { - Box::new(CharmOpencodeFollowupExecutor { - session_id: sid.clone(), - prompt: prompt.clone(), - }) - } else { - return Err(TaskAttemptError::TaskNotFound); // No session ID for followup - } - } - crate::executor::ExecutorConfig::ClaudeCodeRouter => { - if let Some(sid) = session_id { - Box::new(CCRFollowupExecutor::new(sid.clone(), prompt.clone())) - } else { - return Err(TaskAttemptError::TaskNotFound); // No session ID for followup - } - } - crate::executor::ExecutorConfig::SstOpencode => { - if let Some(sid) = session_id { - Box::new(SstOpencodeFollowupExecutor::new( - sid.clone(), - prompt.clone(), - )) - } else { - return Err(TaskAttemptError::TaskNotFound); // No session ID for followup - } - } - crate::executor::ExecutorConfig::SetupScript { .. } => { - // Setup scripts don't support followup, use regular setup script - config.create_executor() - } - }; - - executor - .execute_streaming(pool, task_id, attempt_id, process_id, worktree_path) - .await + process_id, + &follow_up_info.session_id, + &follow_up_info.prompt, + worktree_path, + ) + .await + } else { + executor + .execute_streaming(pool, task_id, attempt_id, process_id, worktree_path) + .await + } } };