refactor: consolidate Executor and FollowUpExecutor structs (#260)

* consolidate Executor and FollowUpExecutor structs

* fix opencode
This commit is contained in:
Gabriel Gordon-Hall
2025-07-19 15:00:02 +01:00
committed by GitHub
parent 4ecce29287
commit 975fc901e1
9 changed files with 429 additions and 579 deletions

View File

@@ -106,6 +106,19 @@ impl SpawnContext {
self.additional_context = Some(context.into());
self
}
/// Create SpawnContext from Command, then use builder methods for additional context
pub fn from_command(
command: &tokio::process::Command,
executor_type: impl Into<String>,
) -> Self {
Self::from(command).with_executor_type(executor_type)
}
/// Finalize the context and create an ExecutorError
pub fn spawn_error(self, error: std::io::Error) -> ExecutorError {
ExecutorError::spawn_failed(error, self)
}
}
/// Extract SpawnContext from a tokio::process::Command
@@ -147,6 +160,8 @@ pub enum ExecutorError {
DatabaseError(sqlx::Error),
ContextCollectionFailed(String),
GitError(String),
InvalidSessionId(String),
FollowUpNotSupported,
}
impl std::fmt::Display for ExecutorError {
@@ -185,6 +200,10 @@ impl std::fmt::Display for ExecutorError {
write!(f, "Context collection failed: {}", msg)
}
ExecutorError::GitError(msg) => write!(f, "Git operation error: {}", msg),
ExecutorError::InvalidSessionId(msg) => write!(f, "Invalid session_id: {}", msg),
ExecutorError::FollowUpNotSupported => {
write!(f, "This executor does not support follow-up sessions")
}
}
}
}
@@ -235,23 +254,7 @@ impl ExecutorError {
}
}
/// Helper to create SpawnContext from Command with builder pattern
impl SpawnContext {
/// Create SpawnContext from Command, then use builder methods for additional context
pub fn from_command(
command: &tokio::process::Command,
executor_type: impl Into<String>,
) -> Self {
Self::from(command).with_executor_type(executor_type)
}
/// Finalize the context and create an ExecutorError
pub fn spawn_error(self, error: std::io::Error) -> ExecutorError {
ExecutorError::spawn_failed(error, self)
}
}
/// Trait for defining CLI commands that can be executed for task attempts
/// Trait for coding agents that can execute tasks, normalize logs, and support follow-up sessions
#[async_trait]
pub trait Executor: Send + Sync {
/// Spawn the command for a given task attempt
@@ -262,6 +265,22 @@ pub trait Executor: Send + Sync {
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError>;
/// Spawn a follow-up session for executors that support it
///
/// This method is used to continue an existing session with a new prompt.
/// Not all executors support follow-up sessions, so the default implementation
/// returns an error.
async fn spawn_followup(
&self,
_pool: &sqlx::SqlitePool,
_task_id: Uuid,
_session_id: &str,
_prompt: &str,
_worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError> {
Err(ExecutorError::FollowUpNotSupported)
}
/// Normalize executor logs into a standard format
fn normalize_logs(
&self,
@@ -278,22 +297,14 @@ pub trait Executor: Send + Sync {
})
}
// Note: Fast-path streaming is now handled by the Gemini WAL system.
// The Gemini executor uses its own push_patch() method to emit patches,
// which are automatically served via SSE endpoints with resumable streaming.
/// Execute the command and stream output to database in real-time
async fn execute_streaming(
#[allow(clippy::result_large_err)]
fn setup_streaming(
&self,
child: &mut command_group::AsyncGroupChild,
pool: &sqlx::SqlitePool,
task_id: Uuid,
attempt_id: Uuid,
execution_process_id: Uuid,
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError> {
let mut child = self.spawn(pool, task_id, worktree_path).await?;
// Take stdout and stderr pipes for streaming
) -> Result<(), ExecutorError> {
let stdout = child
.inner()
.stdout
@@ -305,7 +316,6 @@ pub trait Executor: Send + Sync {
.take()
.expect("Failed to take stderr from child process");
// Start streaming tasks
let pool_clone1 = pool.clone();
let pool_clone2 = pool.clone();
@@ -324,6 +334,39 @@ pub trait Executor: Send + Sync {
false,
));
Ok(())
}
/// Execute the command and stream output to database in real-time
async fn execute_streaming(
&self,
pool: &sqlx::SqlitePool,
task_id: Uuid,
attempt_id: Uuid,
execution_process_id: Uuid,
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError> {
let mut child = self.spawn(pool, task_id, worktree_path).await?;
Self::setup_streaming(self, &mut child, pool, attempt_id, execution_process_id)?;
Ok(child)
}
/// Execute a follow-up command and stream output to database in real-time
#[allow(clippy::too_many_arguments)]
async fn execute_followup_streaming(
&self,
pool: &sqlx::SqlitePool,
task_id: Uuid,
attempt_id: Uuid,
execution_process_id: Uuid,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError> {
let mut child = self
.spawn_followup(pool, task_id, session_id, prompt, worktree_path)
.await?;
Self::setup_streaming(self, &mut child, pool, attempt_id, execution_process_id)?;
Ok(child)
}
}
@@ -333,14 +376,19 @@ pub trait Executor: Send + Sync {
pub enum ExecutorType {
SetupScript(String),
DevServer(String),
CodingAgent(ExecutorConfig),
FollowUpCodingAgent {
CodingAgent {
config: ExecutorConfig,
session_id: Option<String>,
prompt: String,
follow_up: Option<FollowUpInfo>,
},
}
/// Information needed to continue a previous session
#[derive(Debug, Clone)]
pub struct FollowUpInfo {
pub session_id: String,
pub prompt: String,
}
/// Configuration for different executor types
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[serde(tag = "type", rename_all = "kebab-case")]
@@ -360,9 +408,6 @@ pub enum ExecutorConfig {
CharmOpencode,
#[serde(alias = "opencode")]
SstOpencode,
// Future executors can be added here
// Shell { command: String },
// Docker { image: String, command: String },
}
// Constants for frontend
@@ -558,7 +603,6 @@ async fn stream_stdout_to_db(
session_id_parsed = true;
}
}
accumulated_output.push_str(&line);
update_counter += 1;

View File

@@ -17,12 +17,6 @@ use crate::{
/// An executor that uses Amp to process tasks
pub struct AmpExecutor;
/// An executor that continues an Amp thread
pub struct AmpFollowupExecutor {
pub thread_id: String,
pub prompt: String,
}
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)]
#[serde(tag = "type")]
pub enum AmpJson {
@@ -259,6 +253,67 @@ Task title: {}"#,
Ok(child)
}
async fn spawn_followup(
&self,
_pool: &sqlx::SqlitePool,
_task_id: Uuid,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
use std::process::Stdio;
use tokio::{io::AsyncWriteExt, process::Command};
// Use shell command for cross-platform compatibility
let (shell_cmd, shell_arg) = get_shell_command();
let amp_command = format!(
"npx @sourcegraph/amp@0.0.1752148945-gd8844f threads continue {} --format=jsonl",
session_id
);
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(worktree_path)
.arg(shell_arg)
.arg(&amp_command);
let mut child = command.group_spawn().map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "Amp")
.with_context(format!(
"Amp CLI followup execution for thread {}",
session_id
))
.spawn_error(e)
})?;
// Feed the prompt in, then close the pipe so amp sees EOF
if let Some(mut stdin) = child.inner().stdin.take() {
stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "Amp")
.with_context(format!(
"Failed to write prompt to Amp CLI stdin for thread {}",
session_id
))
.spawn_error(e)
})?;
stdin.shutdown().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "Amp")
.with_context(format!(
"Failed to close Amp CLI stdin for thread {}",
session_id
))
.spawn_error(e)
})?;
}
Ok(child)
}
fn normalize_logs(
&self,
logs: &str,
@@ -590,66 +645,6 @@ impl AmpExecutor {
}
}
#[async_trait]
impl Executor for AmpFollowupExecutor {
async fn spawn(
&self,
_pool: &sqlx::SqlitePool,
_task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
use std::process::Stdio;
use tokio::{io::AsyncWriteExt, process::Command};
// Use shell command for cross-platform compatibility
let (shell_cmd, shell_arg) = get_shell_command();
let amp_command = format!(
"npx @sourcegraph/amp@0.0.1752148945-gd8844f threads continue {} --format=jsonl",
self.thread_id
);
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdin(Stdio::piped()) // <-- open a pipe
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(worktree_path)
.arg(shell_arg)
.arg(&amp_command);
let mut child = command
.group_spawn() // Create new process group so we can kill entire tree
.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "Amp")
.with_context(format!(
"Amp CLI followup execution for thread {}",
self.thread_id
))
.spawn_error(e)
})?;
// feed the prompt in, then close the pipe so `amp` sees EOF
if let Some(mut stdin) = child.inner().stdin.take() {
stdin.write_all(self.prompt.as_bytes()).await.unwrap();
stdin.shutdown().await.unwrap(); // or `drop(stdin);`
}
Ok(child)
}
fn normalize_logs(
&self,
logs: &str,
worktree_path: &str,
) -> Result<NormalizedConversation, String> {
// Reuse the same logic as the main AmpExecutor
let main_executor = AmpExecutor;
main_executor.normalize_logs(logs, worktree_path)
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -4,7 +4,7 @@ use uuid::Uuid;
use crate::{
executor::{Executor, ExecutorError, NormalizedConversation},
executors::{ClaudeExecutor, ClaudeFollowupExecutor},
executors::ClaudeExecutor,
};
/// An executor that uses Claude Code Router (CCR) to process tasks
@@ -37,6 +37,19 @@ impl Executor for CCRExecutor {
self.0.spawn(pool, task_id, worktree_path).await
}
async fn spawn_followup(
&self,
pool: &sqlx::SqlitePool,
task_id: Uuid,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
self.0
.spawn_followup(pool, task_id, session_id, prompt, worktree_path)
.await
}
fn normalize_logs(
&self,
logs: &str,
@@ -76,40 +89,3 @@ fn filter_ccr_service_messages(logs: &str) -> String {
.collect::<Vec<&str>>()
.join("\n")
}
/// Claude Code Router followup executor - forwards to ClaudeFollowupExecutor with Claude Code Router command
pub struct CCRFollowupExecutor(ClaudeFollowupExecutor);
impl CCRFollowupExecutor {
pub fn new(session_id: String, prompt: String) -> Self {
Self(ClaudeFollowupExecutor::with_command(
session_id,
prompt,
"claude-code-router".to_string(),
"npx -y @musistudio/claude-code-router code -p --dangerously-skip-permissions --verbose --output-format=stream-json".to_string(),
))
}
}
#[async_trait]
impl Executor for CCRFollowupExecutor {
async fn spawn(
&self,
pool: &sqlx::SqlitePool,
task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
self.0.spawn(pool, task_id, worktree_path).await
}
fn normalize_logs(
&self,
logs: &str,
worktree_path: &str,
) -> Result<NormalizedConversation, String> {
let filtered_logs = filter_ccr_service_messages(logs);
let mut result = self.0.normalize_logs(&filtered_logs, worktree_path)?;
result.executor_type = "claude-code-router".to_string();
Ok(result)
}
}

View File

@@ -11,12 +11,6 @@ use crate::{
/// An executor that uses OpenCode to process tasks
pub struct CharmOpencodeExecutor;
/// An executor that continues an OpenCode thread
pub struct CharmOpencodeFollowupExecutor {
pub session_id: String,
pub prompt: String,
}
#[async_trait]
impl Executor for CharmOpencodeExecutor {
async fn spawn(
@@ -78,25 +72,25 @@ Task title: {}"#,
Ok(child)
}
}
#[async_trait]
impl Executor for CharmOpencodeFollowupExecutor {
async fn spawn(
async fn spawn_followup(
&self,
_pool: &sqlx::SqlitePool,
_task_id: Uuid,
_session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
use std::process::Stdio;
use tokio::process::Command;
// Use shell command for cross-platform compatibility
// CharmOpencode doesn't support session-based followup, so we ignore session_id
// and just run with the new prompt
let (shell_cmd, shell_arg) = get_shell_command();
let opencode_command = format!(
"opencode -p \"{}\" --output-format=json",
self.prompt.replace('"', "\\\"")
prompt.replace('"', "\\\"")
);
let mut command = Command::new(shell_cmd);
@@ -108,16 +102,11 @@ impl Executor for CharmOpencodeFollowupExecutor {
.arg(shell_arg)
.arg(&opencode_command);
let child = command
.group_spawn() // Create new process group so we can kill entire tree
.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "CharmOpenCode")
.with_context(format!(
"CharmOpenCode CLI followup execution for session {}",
self.session_id
))
.spawn_error(e)
})?;
let child = command.group_spawn().map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "CharmOpenCode")
.with_context("CharmOpenCode CLI followup execution")
.spawn_error(e)
})?;
Ok(child)
}

View File

@@ -78,57 +78,6 @@ impl ClaudeExecutor {
}
}
/// An executor that resumes a Claude session
pub struct ClaudeFollowupExecutor {
pub session_id: String,
pub prompt: String,
executor_type: String,
command_base: String,
}
impl ClaudeFollowupExecutor {
/// Create a new ClaudeFollowupExecutor with default settings
pub fn new(session_id: String, prompt: String) -> Self {
Self {
session_id,
prompt,
executor_type: "Claude".to_string(),
command_base: "npx -y @anthropic-ai/claude-code@latest -p --dangerously-skip-permissions --verbose --output-format=stream-json".to_string(),
}
}
pub fn new_plan_mode(session_id: String, prompt: String) -> Self {
let command = format!(
"npx -y @anthropic-ai/claude-code@latest -p --permission-mode=plan --verbose --output-format=stream-json --resume={}",
session_id
);
let script = create_watchkill_script(&command);
Self {
session_id,
prompt,
executor_type: "ClaudePlan".to_string(),
command_base: script,
}
}
/// Create a new ClaudeFollowupExecutor with custom settings
pub fn with_command(
session_id: String,
prompt: String,
executor_type: String,
command_base: String,
) -> Self {
Self {
session_id,
prompt,
executor_type,
command_base,
}
}
}
#[async_trait]
impl Executor for ClaudeExecutor {
async fn spawn(
@@ -214,6 +163,80 @@ Task title: {}"#,
Ok(child)
}
async fn spawn_followup(
&self,
_pool: &sqlx::SqlitePool,
_task_id: Uuid,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
// Use shell command for cross-platform compatibility
let (shell_cmd, shell_arg) = get_shell_command();
// Determine the command based on whether this is plan mode or not
let claude_command = if self.executor_type == "ClaudePlan" {
let command = format!(
"npx -y @anthropic-ai/claude-code@latest -p --permission-mode=plan --verbose --output-format=stream-json --resume={}",
session_id
);
create_watchkill_script(&command)
} else {
format!("{} --resume={}", self.command, session_id)
};
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.current_dir(worktree_path)
.arg(shell_arg)
.arg(&claude_command)
.env("NODE_NO_WARNINGS", "1");
let mut child = command.group_spawn().map_err(|e| {
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"{} CLI followup execution for session {}",
self.executor_type, session_id
))
.spawn_error(e)
})?;
// Write prompt to stdin safely
if let Some(mut stdin) = child.inner().stdin.take() {
use tokio::io::AsyncWriteExt;
tracing::debug!(
"Writing prompt to {} stdin for session {}: {:?}",
self.executor_type,
session_id,
prompt
);
stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"Failed to write prompt to {} CLI stdin for session {}",
self.executor_type, session_id
));
ExecutorError::spawn_failed(e, context)
})?;
stdin.shutdown().await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"Failed to close {} CLI stdin for session {}",
self.executor_type, session_id
));
ExecutorError::spawn_failed(e, context)
})?;
}
Ok(child)
}
fn normalize_logs(
&self,
logs: &str,
@@ -667,83 +690,6 @@ impl ClaudeExecutor {
}
}
#[async_trait]
impl Executor for ClaudeFollowupExecutor {
async fn spawn(
&self,
_pool: &sqlx::SqlitePool,
_task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
// Use shell command for cross-platform compatibility
let (shell_cmd, shell_arg) = get_shell_command();
// Pass prompt via stdin instead of command line to avoid shell escaping issues
let claude_command = format!("{} --resume={}", self.command_base, self.session_id);
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.current_dir(worktree_path)
.arg(shell_arg)
.arg(&claude_command);
let mut child = command
.group_spawn() // Create new process group so we can kill entire tree
.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"{} CLI followup execution for session {}",
self.executor_type, self.session_id
))
.spawn_error(e)
})?;
// Write prompt to stdin safely
if let Some(mut stdin) = child.inner().stdin.take() {
use tokio::io::AsyncWriteExt;
tracing::debug!(
"Writing prompt to {} stdin for session {}: {:?}",
self.executor_type,
self.session_id,
self.prompt
);
stdin.write_all(self.prompt.as_bytes()).await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"Failed to write prompt to {} CLI stdin for session {}",
self.executor_type, self.session_id
));
ExecutorError::spawn_failed(e, context)
})?;
stdin.shutdown().await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"Failed to close {} CLI stdin for session {}",
self.executor_type, self.session_id
));
ExecutorError::spawn_failed(e, context)
})?;
}
Ok(child)
}
fn normalize_logs(
&self,
logs: &str,
worktree_path: &str,
) -> Result<NormalizedConversation, String> {
// Reuse the same logic as the main ClaudeExecutor
let main_executor = ClaudeExecutor::new();
main_executor.normalize_logs(logs, worktree_path)
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -30,12 +30,6 @@ use crate::{
/// An executor that uses Gemini CLI to process tasks
pub struct GeminiExecutor;
/// An executor that continues a Gemini task with context from previous execution
pub struct GeminiFollowupExecutor {
pub attempt_id: Uuid,
pub prompt: String,
}
#[async_trait]
impl Executor for GeminiExecutor {
async fn spawn(
@@ -134,6 +128,59 @@ Task title: {}"#,
Ok(child)
}
async fn spawn_followup(
&self,
pool: &sqlx::SqlitePool,
task_id: Uuid,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
// For Gemini, session_id is the attempt_id
let attempt_id = Uuid::parse_str(session_id)
.map_err(|_| ExecutorError::InvalidSessionId(session_id.to_string()))?;
let task = self.load_task(pool, task_id).await?;
let resume_context = self.collect_resume_context(pool, &task, attempt_id).await?;
let comprehensive_prompt = self.build_comprehensive_prompt(&task, &resume_context, prompt);
self.spawn_process(worktree_path, &comprehensive_prompt, attempt_id)
.await
}
async fn execute_followup_streaming(
&self,
pool: &sqlx::SqlitePool,
task_id: Uuid,
attempt_id: Uuid,
execution_process_id: Uuid,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
tracing::info!(
"Starting Gemini follow-up execution for attempt {} (session {})",
attempt_id,
session_id
);
// For Gemini, session_id is the attempt_id - update it in the database
Self::update_session_id(pool, execution_process_id, session_id).await;
let mut child = self
.spawn_followup(pool, task_id, session_id, prompt, worktree_path)
.await?;
tracing::info!(
"Gemini follow-up process spawned successfully for attempt {}, PID: {:?}",
attempt_id,
child.inner().id()
);
Self::setup_streaming(pool, &mut child, attempt_id, execution_process_id);
Ok(child)
}
fn normalize_logs(
&self,
logs: &str,
@@ -396,9 +443,7 @@ impl GeminiExecutor {
);
}
}
}
impl GeminiFollowupExecutor {
async fn load_task(
&self,
pool: &sqlx::SqlitePool,
@@ -413,10 +458,11 @@ impl GeminiFollowupExecutor {
&self,
pool: &sqlx::SqlitePool,
task: &Task,
attempt_id: Uuid,
) -> Result<crate::models::task_attempt::AttemptResumeContext, ExecutorError> {
crate::models::task_attempt::TaskAttempt::get_attempt_resume_context(
pool,
self.attempt_id,
attempt_id,
task.id,
task.project_id,
)
@@ -428,31 +474,25 @@ impl GeminiFollowupExecutor {
&self,
task: &Task,
resume_context: &crate::models::task_attempt::AttemptResumeContext,
prompt: &str,
) -> String {
format!(
r#"RESUME CONTEXT FOR CONTINUING TASK
=== TASK INFORMATION ===
Project ID: {}
Task ID: {}
Task Title: {}
Task Description: {}
=== EXECUTION HISTORY ===
The following is the execution history from this task attempt:
{}
=== CURRENT CHANGES ===
The following git diff shows changes made from the base branch to the current state:
```diff
{}
```
=== CURRENT REQUEST ===
{}
=== INSTRUCTIONS ===
You are continuing work on the above task. The execution history shows what has been done previously, and the git diff shows the current state of all changes. Please continue from where the previous execution left off, taking into account all the context provided above.
"#,
@@ -472,7 +512,7 @@ You are continuing work on the above task. The execution history shows what has
} else {
&resume_context.cumulative_diffs
},
self.prompt
prompt
)
}
@@ -480,10 +520,11 @@ You are continuing work on the above task. The execution history shows what has
&self,
worktree_path: &str,
comprehensive_prompt: &str,
attempt_id: Uuid,
) -> Result<AsyncGroupChild, ExecutorError> {
tracing::info!(
"Spawning Gemini followup execution for attempt {} with resume context ({} chars)",
self.attempt_id,
attempt_id,
comprehensive_prompt.len()
);
@@ -493,12 +534,12 @@ You are continuing work on the above task. The execution history shows what has
crate::executor::SpawnContext::from_command(&command, "Gemini")
.with_context(format!(
"Gemini CLI followup execution with context for attempt {}",
self.attempt_id
attempt_id
))
.spawn_error(e)
})?;
self.send_prompt_to_stdin(&mut child, &command, comprehensive_prompt)
self.send_prompt_to_stdin(&mut child, &command, comprehensive_prompt, attempt_id)
.await?;
Ok(child)
}
@@ -508,11 +549,12 @@ You are continuing work on the above task. The execution history shows what has
child: &mut AsyncGroupChild,
command: &Command,
comprehensive_prompt: &str,
attempt_id: Uuid,
) -> Result<(), ExecutorError> {
if let Some(mut stdin) = child.inner().stdin.take() {
tracing::debug!(
"Sending resume context to Gemini for attempt {}: {} characters",
self.attempt_id,
attempt_id,
comprehensive_prompt.len()
);
@@ -523,7 +565,7 @@ You are continuing work on the above task. The execution history shows what has
let context = crate::executor::SpawnContext::from_command(command, "Gemini")
.with_context(format!(
"Failed to write resume prompt to Gemini CLI stdin for attempt {}",
self.attempt_id
attempt_id
));
ExecutorError::spawn_failed(e, context)
})?;
@@ -532,79 +574,20 @@ You are continuing work on the above task. The execution history shows what has
let context = crate::executor::SpawnContext::from_command(command, "Gemini")
.with_context(format!(
"Failed to close Gemini CLI stdin for attempt {}",
self.attempt_id
attempt_id
));
ExecutorError::spawn_failed(e, context)
})?;
tracing::info!(
"Successfully sent resume context to Gemini for attempt {}",
self.attempt_id
attempt_id
);
}
Ok(())
}
}
#[async_trait]
impl Executor for GeminiFollowupExecutor {
async fn spawn(
&self,
pool: &sqlx::SqlitePool,
task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
let task = self.load_task(pool, task_id).await?;
let resume_context = self.collect_resume_context(pool, &task).await?;
let comprehensive_prompt = self.build_comprehensive_prompt(&task, &resume_context);
self.spawn_process(worktree_path, &comprehensive_prompt)
.await
}
async fn execute_streaming(
&self,
pool: &sqlx::SqlitePool,
task_id: Uuid,
attempt_id: Uuid,
execution_process_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
tracing::info!(
"Starting Gemini followup execution for task {} attempt {} with resume context",
task_id,
attempt_id
);
// Update ExecutorSession with the session_id immediately
GeminiExecutor::update_session_id(pool, execution_process_id, &self.attempt_id.to_string())
.await;
let mut child = self.spawn(pool, task_id, worktree_path).await?;
tracing::info!(
"Gemini followup process spawned successfully for attempt {}, PID: {:?}",
attempt_id,
child.inner().id()
);
GeminiExecutor::setup_streaming(pool, &mut child, attempt_id, execution_process_id);
Ok(child)
}
fn normalize_logs(
&self,
logs: &str,
worktree_path: &str,
) -> Result<NormalizedConversation, String> {
// Reuse the same logic as the main GeminiExecutor
let main_executor = GeminiExecutor;
main_executor.normalize_logs(logs, worktree_path)
}
}
impl GeminiExecutor {
/// Format Gemini CLI output by inserting line breaks where periods are directly
/// followed by capital letters (common Gemini CLI formatting issue).
/// Handles both intra-chunk and cross-chunk period-to-capital transitions.

View File

@@ -8,12 +8,12 @@ pub mod gemini;
pub mod setup_script;
pub mod sst_opencode;
pub use amp::{AmpExecutor, AmpFollowupExecutor};
pub use ccr::{CCRExecutor, CCRFollowupExecutor};
pub use charm_opencode::{CharmOpencodeExecutor, CharmOpencodeFollowupExecutor};
pub use claude::{ClaudeExecutor, ClaudeFollowupExecutor};
pub use amp::AmpExecutor;
pub use ccr::CCRExecutor;
pub use charm_opencode::CharmOpencodeExecutor;
pub use claude::ClaudeExecutor;
pub use dev_server::DevServerExecutor;
pub use echo::EchoExecutor;
pub use gemini::{GeminiExecutor, GeminiFollowupExecutor};
pub use gemini::GeminiExecutor;
pub use setup_script::SetupScriptExecutor;
pub use sst_opencode::{SstOpencodeExecutor, SstOpencodeFollowupExecutor};
pub use sst_opencode::SstOpencodeExecutor;

View File

@@ -236,24 +236,6 @@ impl SstOpencodeExecutor {
}
/// An executor that resumes an SST Opencode session
pub struct SstOpencodeFollowupExecutor {
pub session_id: String,
pub prompt: String,
executor_type: String,
command_base: String,
}
impl SstOpencodeFollowupExecutor {
/// Create a new SstOpencodeFollowupExecutor with default settings
pub fn new(session_id: String, prompt: String) -> Self {
Self {
session_id,
prompt,
executor_type: "SST Opencode".to_string(),
command_base: "npx -y opencode-ai@latest run --print-logs".to_string(),
}
}
}
#[async_trait]
impl Executor for SstOpencodeExecutor {
@@ -398,84 +380,21 @@ Task title: {}"#,
summary: None,
})
}
}
#[async_trait]
impl Executor for SstOpencodeFollowupExecutor {
async fn spawn(
&self,
_pool: &sqlx::SqlitePool,
_task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
// Use shell command for cross-platform compatibility
let (shell_cmd, shell_arg) = get_shell_command();
let opencode_command = format!("{} --session {}", self.command_base, self.session_id);
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::null()) // Ignore stdout for OpenCode
.stderr(std::process::Stdio::piped())
.current_dir(worktree_path)
.arg(shell_arg)
.arg(&opencode_command)
.env("NODE_NO_WARNINGS", "1");
let mut child = command
.group_spawn() // Create new process group so we can kill entire tree
.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"{} CLI followup execution for session {}",
self.executor_type, self.session_id
))
.spawn_error(e)
})?;
// Write prompt to stdin safely
if let Some(mut stdin) = child.inner().stdin.take() {
use tokio::io::AsyncWriteExt;
tracing::debug!(
"Writing prompt to {} stdin for session {}: {:?}",
self.executor_type,
self.session_id,
self.prompt
);
stdin.write_all(self.prompt.as_bytes()).await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"Failed to write prompt to {} CLI stdin for session {}",
self.executor_type, self.session_id
));
ExecutorError::spawn_failed(e, context)
})?;
stdin.shutdown().await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"Failed to close {} CLI stdin for session {}",
self.executor_type, self.session_id
));
ExecutorError::spawn_failed(e, context)
})?;
}
Ok(child)
}
/// Execute with OpenCode filtering for stderr
async fn execute_streaming(
/// Execute follow-up with OpenCode filtering for stderr
async fn execute_followup_streaming(
&self,
pool: &sqlx::SqlitePool,
task_id: Uuid,
attempt_id: Uuid,
execution_process_id: Uuid,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError> {
let mut child = self.spawn(pool, task_id, worktree_path).await?;
let mut child = self
.spawn_followup(pool, task_id, session_id, prompt, worktree_path)
.await?;
// Take stderr pipe for OpenCode filtering
let stderr = child
@@ -498,14 +417,71 @@ impl Executor for SstOpencodeFollowupExecutor {
Ok(child)
}
fn normalize_logs(
async fn spawn_followup(
&self,
logs: &str,
_pool: &sqlx::SqlitePool,
_task_id: Uuid,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<NormalizedConversation, String> {
// Reuse the same logic as the main SstOpencodeExecutor
let main_executor = SstOpencodeExecutor::new();
main_executor.normalize_logs(logs, worktree_path)
) -> Result<AsyncGroupChild, ExecutorError> {
use std::process::Stdio;
use tokio::io::AsyncWriteExt;
// Use shell command for cross-platform compatibility
let (shell_cmd, shell_arg) = get_shell_command();
let opencode_command = format!("{} --session {}", self.command, session_id);
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::null()) // Ignore stdout for OpenCode
.stderr(Stdio::piped())
.current_dir(worktree_path)
.arg(shell_arg)
.arg(&opencode_command)
.env("NODE_NO_WARNINGS", "1");
let mut child = command.group_spawn().map_err(|e| {
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"{} CLI followup execution for session {}",
self.executor_type, session_id
))
.spawn_error(e)
})?;
// Write prompt to stdin safely
if let Some(mut stdin) = child.inner().stdin.take() {
tracing::debug!(
"Writing prompt to {} stdin for session {}: {:?}",
self.executor_type,
session_id,
prompt
);
stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"Failed to write prompt to {} CLI stdin for session {}",
self.executor_type, session_id
));
ExecutorError::spawn_failed(e, context)
})?;
stdin.shutdown().await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"Failed to close {} CLI stdin for session {}",
self.executor_type, session_id
));
ExecutorError::spawn_failed(e, context)
})?;
}
Ok(child)
}
}

View File

@@ -208,7 +208,10 @@ impl ProcessService {
app_state,
attempt_id,
task_id,
crate::executor::ExecutorType::CodingAgent(executor_config),
crate::executor::ExecutorType::CodingAgent {
config: executor_config,
follow_up: None,
},
"Starting executor".to_string(),
ExecutionProcessType::CodingAgent,
&task_attempt.worktree_path,
@@ -434,10 +437,12 @@ impl ProcessService {
"SESSION_FOLLOWUP: Attempting follow-up execution with session ID: {} (attempt: {}, worktree: {})",
session_id, attempt_id, worktree_path
);
crate::executor::ExecutorType::FollowUpCodingAgent {
crate::executor::ExecutorType::CodingAgent {
config: executor_config.clone(),
session_id: executor_session.session_id.clone(),
prompt: prompt.to_string(),
follow_up: Some(crate::executor::FollowUpInfo {
session_id: session_id.clone(),
prompt: prompt.to_string(),
}),
}
} else {
// No session ID available, start new session
@@ -445,7 +450,10 @@ impl ProcessService {
"SESSION_FOLLOWUP: No session ID available for follow-up execution on attempt {}, starting new session (worktree: {})",
attempt_id, worktree_path
);
crate::executor::ExecutorType::CodingAgent(executor_config.clone())
crate::executor::ExecutorType::CodingAgent {
config: executor_config.clone(),
follow_up: None,
}
};
// Try to start the follow-up execution
@@ -472,7 +480,10 @@ impl ProcessService {
);
// Create a new session instead of trying to resume
let new_session_executor = crate::executor::ExecutorType::CodingAgent(executor_config);
let new_session_executor = crate::executor::ExecutorType::CodingAgent {
config: executor_config,
follow_up: None,
};
Self::start_process_execution(
pool,
@@ -522,9 +533,10 @@ impl ProcessService {
if matches!(process_type, ExecutionProcessType::CodingAgent) {
// Extract follow-up prompt if this is a follow-up execution
let followup_prompt = match &executor_type {
crate::executor::ExecutorType::FollowUpCodingAgent { prompt, .. } => {
Some(prompt.clone())
}
crate::executor::ExecutorType::CodingAgent {
follow_up: Some(ref info),
..
} => Some(info.prompt.clone()),
_ => None,
};
Self::create_executor_session_record(
@@ -649,14 +661,14 @@ impl ProcessService {
Some(serde_json::to_string(&[shell_arg, "dev_server"]).unwrap()),
None, // Dev servers don't have an executor type
),
crate::executor::ExecutorType::CodingAgent(config) => {
("executor".to_string(), None, Some(format!("{}", config)))
crate::executor::ExecutorType::CodingAgent { config, follow_up } => {
let command = if follow_up.is_some() {
"followup_executor".to_string()
} else {
"executor".to_string()
};
(command, None, Some(format!("{}", config)))
}
crate::executor::ExecutorType::FollowUpCodingAgent { config, .. } => (
"followup_executor".to_string(),
None,
Some(format!("{}", config)),
),
};
let create_process = CreateExecutionProcess {
@@ -732,97 +744,26 @@ impl ProcessService {
.execute_streaming(pool, task_id, attempt_id, process_id, worktree_path)
.await
}
crate::executor::ExecutorType::CodingAgent(config) => {
crate::executor::ExecutorType::CodingAgent { config, follow_up } => {
let executor = config.create_executor();
executor
.execute_streaming(pool, task_id, attempt_id, process_id, worktree_path)
.await
}
crate::executor::ExecutorType::FollowUpCodingAgent {
config,
session_id,
prompt,
} => {
use crate::executors::{
AmpFollowupExecutor, CCRFollowupExecutor, CharmOpencodeFollowupExecutor,
ClaudeFollowupExecutor, GeminiFollowupExecutor, SstOpencodeFollowupExecutor,
};
let executor: Box<dyn crate::executor::Executor> = match config {
crate::executor::ExecutorConfig::Claude => {
if let Some(sid) = session_id {
Box::new(ClaudeFollowupExecutor::new(sid.clone(), prompt.clone()))
} else {
return Err(TaskAttemptError::TaskNotFound); // No session ID for followup
}
}
crate::executor::ExecutorConfig::ClaudePlan => {
if let Some(sid) = session_id {
Box::new(ClaudeFollowupExecutor::new_plan_mode(
sid.clone(),
prompt.clone(),
))
} else {
return Err(TaskAttemptError::TaskNotFound); // No session ID for followup
}
}
crate::executor::ExecutorConfig::Amp => {
if let Some(tid) = session_id {
Box::new(AmpFollowupExecutor {
thread_id: tid.clone(),
prompt: prompt.clone(),
})
} else {
return Err(TaskAttemptError::TaskNotFound); // No thread ID for followup
}
}
crate::executor::ExecutorConfig::Gemini => {
// For Gemini, we don't use real session IDs, we pass the context directly
Box::new(GeminiFollowupExecutor {
if let Some(ref follow_up_info) = follow_up {
executor
.execute_followup_streaming(
pool,
task_id,
attempt_id,
prompt: prompt.clone(),
})
}
crate::executor::ExecutorConfig::Echo => {
// Echo doesn't support followup, use regular echo
config.create_executor()
}
crate::executor::ExecutorConfig::CharmOpencode => {
if let Some(sid) = session_id {
Box::new(CharmOpencodeFollowupExecutor {
session_id: sid.clone(),
prompt: prompt.clone(),
})
} else {
return Err(TaskAttemptError::TaskNotFound); // No session ID for followup
}
}
crate::executor::ExecutorConfig::ClaudeCodeRouter => {
if let Some(sid) = session_id {
Box::new(CCRFollowupExecutor::new(sid.clone(), prompt.clone()))
} else {
return Err(TaskAttemptError::TaskNotFound); // No session ID for followup
}
}
crate::executor::ExecutorConfig::SstOpencode => {
if let Some(sid) = session_id {
Box::new(SstOpencodeFollowupExecutor::new(
sid.clone(),
prompt.clone(),
))
} else {
return Err(TaskAttemptError::TaskNotFound); // No session ID for followup
}
}
crate::executor::ExecutorConfig::SetupScript { .. } => {
// Setup scripts don't support followup, use regular setup script
config.create_executor()
}
};
executor
.execute_streaming(pool, task_id, attempt_id, process_id, worktree_path)
.await
process_id,
&follow_up_info.session_id,
&follow_up_info.prompt,
worktree_path,
)
.await
} else {
executor
.execute_streaming(pool, task_id, attempt_id, process_id, worktree_path)
.await
}
}
};