From 1c23d4fd11b8db3a8140fe38a9706d38a546079a Mon Sep 17 00:00:00 2001 From: Solomon Date: Thu, 25 Sep 2025 19:36:08 +0100 Subject: [PATCH] use Gemini-CLI --experimental-acp (#784) --- .gitignore | 1 + crates/executors/Cargo.toml | 3 +- .../src/actions/coding_agent_follow_up.rs | 5 +- .../src/actions/coding_agent_initial.rs | 5 +- crates/executors/src/actions/mod.rs | 7 +- crates/executors/src/actions/script.rs | 11 +- crates/executors/src/executors/acp/client.rs | 145 ++++ crates/executors/src/executors/acp/harness.rs | 377 ++++++++++ crates/executors/src/executors/acp/mod.rs | 44 ++ .../src/executors/acp/normalize_logs.rs | 673 ++++++++++++++++++ crates/executors/src/executors/acp/session.rs | 180 +++++ crates/executors/src/executors/amp.rs | 16 +- crates/executors/src/executors/claude.rs | 16 +- crates/executors/src/executors/codex.rs | 17 +- crates/executors/src/executors/cursor.rs | 16 +- crates/executors/src/executors/gemini.rs | 365 +--------- crates/executors/src/executors/mod.rs | 33 +- crates/executors/src/executors/opencode.rs | 16 +- crates/executors/src/executors/qwen.rs | 109 +-- crates/executors/src/stdout_dup.rs | 19 + crates/local-deployment/src/container.rs | 371 +++++----- 21 files changed, 1750 insertions(+), 679 deletions(-) create mode 100644 crates/executors/src/executors/acp/client.rs create mode 100644 crates/executors/src/executors/acp/harness.rs create mode 100644 crates/executors/src/executors/acp/mod.rs create mode 100644 crates/executors/src/executors/acp/normalize_logs.rs create mode 100644 crates/executors/src/executors/acp/session.rs diff --git a/.gitignore b/.gitignore index 79301f9e..8eff8477 100644 --- a/.gitignore +++ b/.gitignore @@ -67,6 +67,7 @@ coverage/ frontend/dist crates/executors/bindings crates/utils/bindings +crates/services/bindings build-npm-package-codesign.sh diff --git a/crates/executors/Cargo.toml b/crates/executors/Cargo.toml index e6d0eb8c..57341744 100644 --- a/crates/executors/Cargo.toml +++ b/crates/executors/Cargo.toml @@ -6,7 +6,7 @@ edition = "2024" [dependencies] workspace_utils = { path = "../utils", package = "utils" } tokio = { workspace = true } -tokio-util = { version = "0.7", features = ["io"] } +tokio-util = { version = "0.7", features = ["io", "compat"] } bytes = "1.0" serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } @@ -42,3 +42,4 @@ convert_case = "0.6" sqlx = "0.8.6" axum = { workspace = true } shlex = "1.3.0" +agent-client-protocol = "0.4" diff --git a/crates/executors/src/actions/coding_agent_follow_up.rs b/crates/executors/src/actions/coding_agent_follow_up.rs index 3211d9e8..2cba1053 100644 --- a/crates/executors/src/actions/coding_agent_follow_up.rs +++ b/crates/executors/src/actions/coding_agent_follow_up.rs @@ -1,13 +1,12 @@ use std::path::Path; use async_trait::async_trait; -use command_group::AsyncGroupChild; use serde::{Deserialize, Serialize}; use ts_rs::TS; use crate::{ actions::Executable, - executors::{ExecutorError, StandardCodingAgentExecutor}, + executors::{ExecutorError, SpawnedChild, StandardCodingAgentExecutor}, profile::{ExecutorConfigs, ExecutorProfileId}, }; @@ -30,7 +29,7 @@ impl CodingAgentFollowUpRequest { #[async_trait] impl Executable for CodingAgentFollowUpRequest { - async fn spawn(&self, current_dir: &Path) -> Result { + async fn spawn(&self, current_dir: &Path) -> Result { let executor_profile_id = self.get_executor_profile_id(); let agent = ExecutorConfigs::get_cached() .get_coding_agent(&executor_profile_id) diff --git a/crates/executors/src/actions/coding_agent_initial.rs b/crates/executors/src/actions/coding_agent_initial.rs index 6fafa888..48dde87a 100644 --- a/crates/executors/src/actions/coding_agent_initial.rs +++ b/crates/executors/src/actions/coding_agent_initial.rs @@ -1,13 +1,12 @@ use std::path::Path; use async_trait::async_trait; -use command_group::AsyncGroupChild; use serde::{Deserialize, Serialize}; use ts_rs::TS; use crate::{ actions::Executable, - executors::{ExecutorError, StandardCodingAgentExecutor}, + executors::{ExecutorError, SpawnedChild, StandardCodingAgentExecutor}, profile::{ExecutorConfigs, ExecutorProfileId}, }; @@ -22,7 +21,7 @@ pub struct CodingAgentInitialRequest { #[async_trait] impl Executable for CodingAgentInitialRequest { - async fn spawn(&self, current_dir: &Path) -> Result { + async fn spawn(&self, current_dir: &Path) -> Result { let executor_profile_id = self.executor_profile_id.clone(); let agent = ExecutorConfigs::get_cached() .get_coding_agent(&executor_profile_id) diff --git a/crates/executors/src/actions/mod.rs b/crates/executors/src/actions/mod.rs index 7b51f625..9a5a23aa 100644 --- a/crates/executors/src/actions/mod.rs +++ b/crates/executors/src/actions/mod.rs @@ -1,7 +1,6 @@ use std::path::Path; use async_trait::async_trait; -use command_group::AsyncGroupChild; use enum_dispatch::enum_dispatch; use serde::{Deserialize, Serialize}; use ts_rs::TS; @@ -11,7 +10,7 @@ use crate::{ coding_agent_follow_up::CodingAgentFollowUpRequest, coding_agent_initial::CodingAgentInitialRequest, script::ScriptRequest, }, - executors::ExecutorError, + executors::{ExecutorError, SpawnedChild}, }; pub mod coding_agent_follow_up; pub mod coding_agent_initial; @@ -49,12 +48,12 @@ impl ExecutorAction { #[async_trait] #[enum_dispatch(ExecutorActionType)] pub trait Executable { - async fn spawn(&self, current_dir: &Path) -> Result; + async fn spawn(&self, current_dir: &Path) -> Result; } #[async_trait] impl Executable for ExecutorAction { - async fn spawn(&self, current_dir: &Path) -> Result { + async fn spawn(&self, current_dir: &Path) -> Result { self.typ.spawn(current_dir).await } } diff --git a/crates/executors/src/actions/script.rs b/crates/executors/src/actions/script.rs index 16271c36..fddc71da 100644 --- a/crates/executors/src/actions/script.rs +++ b/crates/executors/src/actions/script.rs @@ -1,13 +1,16 @@ use std::path::Path; use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; +use command_group::AsyncCommandGroup; use serde::{Deserialize, Serialize}; use tokio::process::Command; use ts_rs::TS; use workspace_utils::shell::get_shell_command; -use crate::{actions::Executable, executors::ExecutorError}; +use crate::{ + actions::Executable, + executors::{ExecutorError, SpawnedChild}, +}; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS)] pub enum ScriptRequestLanguage { @@ -30,7 +33,7 @@ pub struct ScriptRequest { #[async_trait] impl Executable for ScriptRequest { - async fn spawn(&self, current_dir: &Path) -> Result { + async fn spawn(&self, current_dir: &Path) -> Result { let (shell_cmd, shell_arg) = get_shell_command(); let mut command = Command::new(shell_cmd); command @@ -43,6 +46,6 @@ impl Executable for ScriptRequest { let child = command.group_spawn()?; - Ok(child) + Ok(child.into()) } } diff --git a/crates/executors/src/executors/acp/client.rs b/crates/executors/src/executors/acp/client.rs new file mode 100644 index 00000000..57dc4e82 --- /dev/null +++ b/crates/executors/src/executors/acp/client.rs @@ -0,0 +1,145 @@ +use agent_client_protocol as acp; +use async_trait::async_trait; +use tokio::sync::mpsc; +use tracing::{debug, warn}; + +use crate::executors::acp::AcpEvent; + +/// ACP client that handles agent-client protocol communication +pub struct AcpClient { + event_tx: mpsc::UnboundedSender, +} + +impl AcpClient { + /// Create a new ACP client + pub fn new(event_tx: mpsc::UnboundedSender) -> Self { + Self { event_tx } + } + + pub fn record_user_prompt_event(&self, prompt: &str) { + self.send_event(AcpEvent::User(prompt.to_string())); + } + + /// Send an event to the event channel + fn send_event(&self, event: AcpEvent) { + if let Err(e) = self.event_tx.send(event) { + warn!("Failed to send ACP event: {}", e); + } + } +} + +#[async_trait(?Send)] +impl acp::Client for AcpClient { + async fn request_permission( + &self, + args: acp::RequestPermissionRequest, + ) -> Result { + // Forward the request as an event + self.send_event(AcpEvent::RequestPermission(args.clone())); + + // Auto-approve with best available option + let chosen_option = args + .options + .iter() + .find(|o| matches!(o.kind, acp::PermissionOptionKind::AllowAlways)) + .or_else(|| { + args.options + .iter() + .find(|o| matches!(o.kind, acp::PermissionOptionKind::AllowOnce)) + }) + .or_else(|| args.options.first()); + + let outcome = if let Some(opt) = chosen_option { + debug!("Auto-approving permission with option: {}", opt.id); + acp::RequestPermissionOutcome::Selected { + option_id: opt.id.clone(), + } + } else { + warn!("No permission options available, cancelling"); + acp::RequestPermissionOutcome::Cancelled + }; + + Ok(acp::RequestPermissionResponse { + outcome, + meta: None, + }) + } + + async fn session_notification(&self, args: acp::SessionNotification) -> Result<(), acp::Error> { + // Convert to typed events + let event = match args.update { + acp::SessionUpdate::AgentMessageChunk { content } => Some(AcpEvent::Message(content)), + acp::SessionUpdate::AgentThoughtChunk { content } => Some(AcpEvent::Thought(content)), + acp::SessionUpdate::ToolCall(tc) => Some(AcpEvent::ToolCall(tc)), + acp::SessionUpdate::ToolCallUpdate(update) => Some(AcpEvent::ToolUpdate(update)), + acp::SessionUpdate::Plan(plan) => Some(AcpEvent::Plan(plan)), + _ => Some(AcpEvent::Other(args)), + }; + + if let Some(event) = event { + self.send_event(event); + } + + Ok(()) + } + + // File system operations - not implemented as we don't expose FS + async fn write_text_file( + &self, + _args: acp::WriteTextFileRequest, + ) -> Result { + Err(acp::Error::method_not_found()) + } + + async fn read_text_file( + &self, + _args: acp::ReadTextFileRequest, + ) -> Result { + Err(acp::Error::method_not_found()) + } + + // Terminal operations - not implemented + async fn create_terminal( + &self, + _args: acp::CreateTerminalRequest, + ) -> Result { + Err(acp::Error::method_not_found()) + } + + async fn terminal_output( + &self, + _args: acp::TerminalOutputRequest, + ) -> Result { + Err(acp::Error::method_not_found()) + } + + async fn release_terminal( + &self, + _args: acp::ReleaseTerminalRequest, + ) -> Result { + Err(acp::Error::method_not_found()) + } + + async fn wait_for_terminal_exit( + &self, + _args: acp::WaitForTerminalExitRequest, + ) -> Result { + Err(acp::Error::method_not_found()) + } + + async fn kill_terminal_command( + &self, + _args: acp::KillTerminalCommandRequest, + ) -> Result { + Err(acp::Error::method_not_found()) + } + + // Extension methods + async fn ext_method(&self, _args: acp::ExtRequest) -> Result { + Err(acp::Error::method_not_found()) + } + + async fn ext_notification(&self, _args: acp::ExtNotification) -> Result<(), acp::Error> { + Ok(()) + } +} diff --git a/crates/executors/src/executors/acp/harness.rs b/crates/executors/src/executors/acp/harness.rs new file mode 100644 index 00000000..2a168746 --- /dev/null +++ b/crates/executors/src/executors/acp/harness.rs @@ -0,0 +1,377 @@ +use std::{ + path::{Path, PathBuf}, + process::Stdio, + sync::Arc, +}; + +use agent_client_protocol as proto; +use agent_client_protocol::Agent as _; +use command_group::{AsyncCommandGroup, AsyncGroupChild}; +use futures::StreamExt; +use tokio::{io::AsyncWriteExt, process::Command, sync::mpsc}; +use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt}; +use tracing::error; +use workspace_utils::shell::get_shell_command; + +use super::{AcpClient, SessionManager}; +use crate::executors::{ExecutorError, SpawnedChild, acp::AcpEvent}; + +/// Reusable harness for ACP-based conns (Gemini, Qwen, etc.) +pub struct AcpAgentHarness { + session_namespace: String, +} + +impl Default for AcpAgentHarness { + fn default() -> Self { + // Keep existing behavior for Gemini + Self::new() + } +} + +impl AcpAgentHarness { + /// Create a harness with the default Gemini namespace + pub fn new() -> Self { + Self { + session_namespace: "gemini_sessions".to_string(), + } + } + + /// Create a harness with a custom session namespace (e.g. for Qwen) + pub fn with_session_namespace(namespace: impl Into) -> Self { + Self { + session_namespace: namespace.into(), + } + } + + pub async fn spawn_with_command( + &self, + current_dir: &Path, + prompt: String, + full_command: String, + ) -> Result { + let (shell_cmd, shell_arg) = get_shell_command(); + let mut command = Command::new(shell_cmd); + command + .kill_on_drop(true) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(current_dir) + .arg(shell_arg) + .arg(full_command) + .env("NODE_NO_WARNINGS", "1"); + + let mut child = command.group_spawn()?; + + let (exit_tx, exit_rx) = tokio::sync::oneshot::channel::<()>(); + Self::bootstrap_acp_connection( + &mut child, + current_dir.to_path_buf(), + None, + prompt, + Some(exit_tx), + self.session_namespace.clone(), + ) + .await?; + + Ok(SpawnedChild { + child, + exit_signal: Some(exit_rx), + }) + } + + pub async fn spawn_follow_up_with_command( + &self, + current_dir: &Path, + prompt: String, + session_id: &str, + full_command: String, + ) -> Result { + let (shell_cmd, shell_arg) = get_shell_command(); + let mut command = Command::new(shell_cmd); + command + .kill_on_drop(true) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(current_dir) + .arg(shell_arg) + .arg(full_command) + .env("NODE_NO_WARNINGS", "1"); + + let mut child = command.group_spawn()?; + + let (exit_tx, exit_rx) = tokio::sync::oneshot::channel::<()>(); + Self::bootstrap_acp_connection( + &mut child, + current_dir.to_path_buf(), + Some(session_id.to_string()), + prompt, + Some(exit_tx), + self.session_namespace.clone(), + ) + .await?; + + Ok(SpawnedChild { + child, + exit_signal: Some(exit_rx), + }) + } + + async fn bootstrap_acp_connection( + child: &mut AsyncGroupChild, + cwd: PathBuf, + existing_session: Option, + prompt: String, + exit_signal: Option>, + session_namespace: String, + ) -> Result<(), ExecutorError> { + // Take child's stdio for ACP wiring + let orig_stdout = child.inner().stdout.take().ok_or_else(|| { + ExecutorError::Io(std::io::Error::new( + std::io::ErrorKind::NotFound, + "Child process has no stdout", + )) + })?; + let orig_stdin = child.inner().stdin.take().ok_or_else(|| { + ExecutorError::Io(std::io::Error::new( + std::io::ErrorKind::NotFound, + "Child process has no stdin", + )) + })?; + + // Create a fresh stdout pipe for logs + let writer = crate::stdout_dup::create_stdout_pipe_writer(child)?; + let shared_writer = Arc::new(tokio::sync::Mutex::new(writer)); + let (log_tx, mut log_rx) = mpsc::unbounded_channel::(); + + // Spawn log -> stdout writer task + tokio::spawn(async move { + while let Some(line) = log_rx.recv().await { + let mut data = line.into_bytes(); + data.push(b'\n'); + let mut w = shared_writer.lock().await; + let _ = w.write_all(&data).await; + } + }); + + // ACP client STDIO + let (mut to_acp_writer, acp_incoming_reader) = tokio::io::duplex(64 * 1024); + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + + // Process stdout -> ACP + tokio::spawn(async move { + let mut stdout_stream = tokio_util::io::ReaderStream::new(orig_stdout); + while let Some(res) = stdout_stream.next().await { + if *shutdown_rx.borrow() { + break; + } + match res { + Ok(data) => { + let _ = to_acp_writer.write_all(&data).await; + } + Err(_) => break, + } + } + }); + + // ACP crate expects futures::AsyncRead + AsyncWrite, use tokio compat to adapt tokio::io::AsyncRead + Write + let outgoing = orig_stdin.compat_write(); + let incoming = acp_incoming_reader.compat(); + + let mut exit_signal_tx = exit_signal; + + // Run ACP client in a LocalSet + tokio::task::spawn_blocking(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("build runtime"); + + rt.block_on(async move { + let local = tokio::task::LocalSet::new(); + local + .run_until(async move { + // Create event and raw channels + // Typed events available for future use; raw lines forwarded and persisted + let (event_tx, mut event_rx) = + mpsc::unbounded_channel::(); + + // Create session manager + let session_manager = match SessionManager::new(session_namespace) { + Ok(sm) => sm, + Err(e) => { + error!("Failed to create session manager: {}", e); + return; + } + }; + let session_manager = std::sync::Arc::new(session_manager); + + // Create ACP client + let client = AcpClient::new(event_tx.clone()); + + client.record_user_prompt_event(&prompt); + + // Set up connection + let (conn, io_fut) = + proto::ClientSideConnection::new(client, outgoing, incoming, |fut| { + tokio::task::spawn_local(fut); + }); + + // Drive I/O + let io_handle = tokio::task::spawn_local(async move { + let _ = io_fut.await; + }); + + // Initialize + let _ = conn + .initialize(proto::InitializeRequest { + protocol_version: proto::V1, + client_capabilities: proto::ClientCapabilities { + fs: proto::FileSystemCapability { + read_text_file: false, + write_text_file: false, + meta: None, + }, + terminal: false, + meta: None, + }, + meta: None, + }) + .await; + + // Handle session creation/forking + let (acp_session_id, display_session_id, prompt_to_send) = + if let Some(existing) = existing_session { + // Fork existing session + let new_ui_id = uuid::Uuid::new_v4().to_string(); + let _ = session_manager.fork_session(&existing, &new_ui_id); + + let history = session_manager.read_session_raw(&new_ui_id).ok(); + let meta = + history.map(|h| serde_json::json!({ "history_jsonl": h })); + + match conn + .new_session(proto::NewSessionRequest { + mcp_servers: vec![], + cwd: cwd.clone(), + meta, + }) + .await + { + Ok(resp) => { + let resume_prompt = session_manager + .generate_resume_prompt(&new_ui_id, &prompt) + .unwrap_or_else(|_| prompt.clone()); + (resp.session_id.0.to_string(), new_ui_id, resume_prompt) + } + Err(e) => { + error!("Failed to create session: {}", e); + return; + } + } + } else { + // New session + match conn + .new_session(proto::NewSessionRequest { + mcp_servers: vec![], + cwd: cwd.clone(), + meta: None, + }) + .await + { + Ok(resp) => { + let sid = resp.session_id.0.to_string(); + (sid.clone(), sid, prompt) + } + Err(e) => { + error!("Failed to create session: {}", e); + return; + } + } + }; + + // Emit session ID + let _ = log_tx + .send(AcpEvent::SessionStart(display_session_id.clone()).to_string()); + + // Start raw event forwarder and persistence + let app_tx_clone = log_tx.clone(); + let sess_id_for_writer = display_session_id.clone(); + let sm_for_writer = session_manager.clone(); + tokio::spawn(async move { + while let Some(event) = event_rx.recv().await { + // Forward to stdout + let _ = app_tx_clone.send(event.to_string()); + // Persist to session file + let _ = sm_for_writer + .append_raw_line(&sess_id_for_writer, &event.to_string()); + } + }); + + // Save prompt to session + let _ = session_manager.append_raw_line( + &display_session_id, + &serde_json::to_string(&serde_json::json!({ "user": prompt_to_send })) + .unwrap_or_default(), + ); + + // Build prompt request + let req = proto::PromptRequest { + session_id: proto::SessionId(acp_session_id.clone().into()), + prompt: vec![proto::ContentBlock::Text(proto::TextContent { + annotations: None, + text: prompt_to_send, + meta: None, + })], + meta: None, + }; + + // Send the prompt and await completion to obtain stop_reason + match conn.prompt(req).await { + Ok(resp) => { + // Emit done with stop_reason + let stop_reason = + serde_json::to_string(&resp.stop_reason).unwrap_or_default(); + let _ = log_tx.send(AcpEvent::Done(stop_reason).to_string()); + } + Err(e) => { + tracing::debug!("error {} {e} {:?}", e.code, e.data); + if e.code == agent_client_protocol::ErrorCode::INTERNAL_ERROR.code + && e.data + .as_ref() + .is_some_and(|d| d == "server shut down unexpectedly") + { + tracing::debug!("ACP server killed"); + } else { + let _ = + log_tx.send(AcpEvent::Error(format!("{e}")).to_string()); + } + } + } + // Notify container of completion + if let Some(tx) = exit_signal_tx.take() { + let _ = tx.send(()); + } + + // Cancel session work + let _ = conn + .cancel(proto::CancelNotification { + session_id: proto::SessionId(acp_session_id.into()), + meta: None, + }) + .await; + + // Cleanup + drop(conn); + let _ = shutdown_tx.send(true); + let _ = io_handle.await; + drop(log_tx); + }) + .await; + }); + }); + + Ok(()) + } +} diff --git a/crates/executors/src/executors/acp/mod.rs b/crates/executors/src/executors/acp/mod.rs new file mode 100644 index 00000000..a1ccb4b6 --- /dev/null +++ b/crates/executors/src/executors/acp/mod.rs @@ -0,0 +1,44 @@ +pub mod client; +pub mod harness; +pub mod normalize_logs; +pub mod session; + +use std::{fmt::Display, str::FromStr}; + +pub use client::AcpClient; +pub use harness::AcpAgentHarness; +pub use normalize_logs::*; +use serde::{Deserialize, Serialize}; +pub use session::SessionManager; + +/// Parsed event types for internal processing +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum AcpEvent { + User(String), + SessionStart(String), + Message(agent_client_protocol::ContentBlock), + Thought(agent_client_protocol::ContentBlock), + ToolCall(agent_client_protocol::ToolCall), + ToolUpdate(agent_client_protocol::ToolCallUpdate), + Plan(agent_client_protocol::Plan), + AvailableCommands(Vec), + CurrentMode(agent_client_protocol::SessionModeId), + RequestPermission(agent_client_protocol::RequestPermissionRequest), + Error(String), + Done(String), + Other(agent_client_protocol::SessionNotification), +} + +impl Display for AcpEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", serde_json::to_string(self).unwrap_or_default()) + } +} + +impl FromStr for AcpEvent { + type Err = serde_json::Error; + + fn from_str(s: &str) -> Result { + serde_json::from_str(s) + } +} diff --git a/crates/executors/src/executors/acp/normalize_logs.rs b/crates/executors/src/executors/acp/normalize_logs.rs new file mode 100644 index 00000000..9992fc03 --- /dev/null +++ b/crates/executors/src/executors/acp/normalize_logs.rs @@ -0,0 +1,673 @@ +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::Arc, +}; + +use agent_client_protocol::{self as acp, SessionNotification}; +use futures::StreamExt; +use lazy_static::lazy_static; +use regex::Regex; +use serde::Deserialize; +use tracing::debug; +use workspace_utils::msg_store::MsgStore; + +pub use super::AcpAgentHarness; +use super::AcpEvent; +use crate::logs::{ + ActionType, FileChange, NormalizedEntry, NormalizedEntryType, ToolResult, ToolResultValueType, + ToolStatus as LogToolStatus, + stderr_processor::normalize_stderr_logs, + utils::{ConversationPatch, EntryIndexProvider}, +}; + +pub fn normalize_logs(msg_store: Arc, worktree_path: &Path) { + // stderr normalization + let entry_index = EntryIndexProvider::start_from(&msg_store); + normalize_stderr_logs(msg_store.clone(), entry_index.clone()); + + // stdout normalization (main loop) + let worktree_path = worktree_path.to_path_buf(); + // Type aliases to simplify complex state types and appease clippy + tokio::spawn(async move { + type ToolStates = std::collections::HashMap; + + let mut stored_session_id = false; + let mut streaming: StreamingState = StreamingState::default(); + let mut tool_states: ToolStates = HashMap::new(); + + let mut stdout_lines = msg_store.stdout_lines_stream(); + while let Some(Ok(line)) = stdout_lines.next().await { + if let Some(parsed) = AcpEventParser::parse_line(&line) { + debug!("Parsed ACP line: {:?}", parsed); + match parsed { + AcpEvent::SessionStart(id) => { + if !stored_session_id { + msg_store.push_session_id(id); + stored_session_id = true; + } + } + AcpEvent::Error(msg) => { + let idx = entry_index.next(); + let entry = NormalizedEntry { + timestamp: None, + entry_type: NormalizedEntryType::ErrorMessage, + content: msg, + metadata: None, + }; + msg_store.push_patch(ConversationPatch::add_normalized_entry(idx, entry)); + } + AcpEvent::Done(_) => { + streaming.assistant_text = None; + streaming.thinking_text = None; + } + AcpEvent::Message(content) => { + streaming.thinking_text = None; + if let agent_client_protocol::ContentBlock::Text(text) = content { + let is_new = streaming.assistant_text.is_none(); + if is_new { + let idx = entry_index.next(); + streaming.assistant_text = Some(StreamingText { + index: idx, + content: String::new(), + }); + } + if let Some(ref mut s) = streaming.assistant_text { + s.content.push_str(&text.text); + let entry = NormalizedEntry { + timestamp: None, + entry_type: NormalizedEntryType::AssistantMessage, + content: s.content.clone(), + metadata: None, + }; + let patch = if is_new { + ConversationPatch::add_normalized_entry(s.index, entry) + } else { + ConversationPatch::replace(s.index, entry) + }; + msg_store.push_patch(patch); + } + } + } + AcpEvent::Thought(content) => { + streaming.assistant_text = None; + if let agent_client_protocol::ContentBlock::Text(text) = content { + let is_new = streaming.thinking_text.is_none(); + if is_new { + let idx = entry_index.next(); + streaming.thinking_text = Some(StreamingText { + index: idx, + content: String::new(), + }); + } + if let Some(ref mut s) = streaming.thinking_text { + s.content.push_str(&text.text); + let entry = NormalizedEntry { + timestamp: None, + entry_type: NormalizedEntryType::Thinking, + content: s.content.clone(), + metadata: None, + }; + let patch = if is_new { + ConversationPatch::add_normalized_entry(s.index, entry) + } else { + ConversationPatch::replace(s.index, entry) + }; + msg_store.push_patch(patch); + } + } + } + AcpEvent::Plan(plan) => { + streaming.assistant_text = None; + streaming.thinking_text = None; + let mut body = String::from("Plan:\n"); + for (i, e) in plan.entries.iter().enumerate() { + body.push_str(&format!("{}. {}\n", i + 1, e.content)); + } + let idx = entry_index.next(); + let entry = NormalizedEntry { + timestamp: None, + entry_type: NormalizedEntryType::SystemMessage, + content: body, + metadata: None, + }; + msg_store.push_patch(ConversationPatch::add_normalized_entry(idx, entry)); + } + AcpEvent::AvailableCommands(cmds) => { + let mut body = String::from("Available commands:\n"); + for c in &cmds { + body.push_str(&format!("- {}\n", c.name)); + } + let idx = entry_index.next(); + let entry = NormalizedEntry { + timestamp: None, + entry_type: NormalizedEntryType::SystemMessage, + content: body, + metadata: None, + }; + msg_store.push_patch(ConversationPatch::add_normalized_entry(idx, entry)); + } + AcpEvent::CurrentMode(mode_id) => { + let idx = entry_index.next(); + let entry = NormalizedEntry { + timestamp: None, + entry_type: NormalizedEntryType::SystemMessage, + content: format!("Current mode: {}", mode_id.0), + metadata: None, + }; + msg_store.push_patch(ConversationPatch::add_normalized_entry(idx, entry)); + } + AcpEvent::RequestPermission(perm) => { + if let Ok(tc) = agent_client_protocol::ToolCall::try_from(perm.tool_call) { + handle_tool_call( + &tc, + &worktree_path, + &mut streaming, + &mut tool_states, + &entry_index, + &msg_store, + ); + } + } + AcpEvent::ToolCall(tc) => handle_tool_call( + &tc, + &worktree_path, + &mut streaming, + &mut tool_states, + &entry_index, + &msg_store, + ), + AcpEvent::ToolUpdate(update) => { + let mut update = update; + if update.fields.title.is_none() { + update.fields.title = tool_states + .get(&update.id.0.to_string()) + .map(|s| s.title.clone()) + .or_else(|| Some("".to_string())); + } + debug!("Got tool call update: {:?}", update); + if let Ok(tc) = agent_client_protocol::ToolCall::try_from(update.clone()) { + handle_tool_call( + &tc, + &worktree_path, + &mut streaming, + &mut tool_states, + &entry_index, + &msg_store, + ); + } else { + debug!("Failed to convert tool call update to ToolCall"); + } + } + AcpEvent::User(_) | AcpEvent::Other(_) => (), + } + } + } + + fn handle_tool_call( + tc: &agent_client_protocol::ToolCall, + worktree_path: &Path, + streaming: &mut StreamingState, + tool_states: &mut ToolStates, + entry_index: &EntryIndexProvider, + msg_store: &Arc, + ) { + streaming.assistant_text = None; + streaming.thinking_text = None; + let id = tc.id.0.to_string(); + let is_new = !tool_states.contains_key(&id); + let tool_data = tool_states.entry(id).or_default(); + tool_data.extend(tc, worktree_path); + if is_new { + tool_data.index = entry_index.next(); + } + let action = map_to_action_type(tool_data); + let entry = NormalizedEntry { + timestamp: None, + entry_type: NormalizedEntryType::ToolUse { + tool_name: tool_data.title.clone(), + action_type: action, + status: convert_tool_status(&tool_data.status), + }, + content: get_tool_content(tool_data), + metadata: None, + }; + let patch = if is_new { + ConversationPatch::add_normalized_entry(tool_data.index, entry) + } else { + ConversationPatch::replace(tool_data.index, entry) + }; + msg_store.push_patch(patch); + } + + fn map_to_action_type(tc: &PartialToolCallData) -> ActionType { + match tc.kind { + agent_client_protocol::ToolKind::Read => { + // Special-case: read_many_files style titles parsed via helper + if tc.id.0.starts_with("read_many_files") { + let result = collect_text_content(&tc.content).map(|text| ToolResult { + r#type: ToolResultValueType::Markdown, + value: serde_json::Value::String(text), + }); + return ActionType::Tool { + tool_name: "read_many_files".to_string(), + arguments: Some(serde_json::Value::String(tc.title.clone())), + result, + }; + } + ActionType::FileRead { + path: tc + .path + .clone() + .unwrap_or_default() + .to_string_lossy() + .to_string(), + } + } + agent_client_protocol::ToolKind::Edit => { + let changes = extract_file_changes(tc); + ActionType::FileEdit { + path: tc + .path + .clone() + .unwrap_or_default() + .to_string_lossy() + .to_string(), + changes, + } + } + agent_client_protocol::ToolKind::Execute => { + let command = AcpEventParser::parse_execute_command(&tc.title); + // Prefer structured raw_output, else fallback to aggregated text content + let completed = + matches!(tc.status, agent_client_protocol::ToolCallStatus::Completed); + tracing::debug!( + "Mapping execute tool call, completed: {}, command: {}", + completed, + command + ); + let mut result = if let Some(out_val) = tc.raw_output.as_ref() { + serde_json::from_value::(out_val.clone()) + .ok() + .map(|out| { + let mut exit_status = out + .exit_code + .map(|code| crate::logs::CommandExitStatus::ExitCode { code }); + let output = out.stdout.or(out.stderr).unwrap_or_default(); + if exit_status.is_none() && completed { + exit_status = Some(crate::logs::CommandExitStatus::Success { + success: true, + }); + } + crate::logs::CommandRunResult { + exit_status, + output: Some(output), + } + }) + } else { + None + }; + if result.is_none() && completed { + result = Some(crate::logs::CommandRunResult { + exit_status: Some(crate::logs::CommandExitStatus::Success { + success: true, + }), + output: None, + }); + } + ActionType::CommandRun { command, result } + } + agent_client_protocol::ToolKind::Delete => ActionType::FileEdit { + path: tc + .path + .clone() + .unwrap_or_default() + .to_string_lossy() + .to_string(), + changes: vec![FileChange::Delete], + }, + agent_client_protocol::ToolKind::Search => { + let query = tc + .raw_input + .as_ref() + .and_then(|v| serde_json::from_value::(v.clone()).ok()) + .map(|a| a.query) + .unwrap_or_else(|| tc.title.clone()); + ActionType::Search { query } + } + agent_client_protocol::ToolKind::Fetch => { + let mut url = tc + .raw_input + .as_ref() + .and_then(|v| serde_json::from_value::(v.clone()).ok()) + .map(|a| a.url) + .unwrap_or_default(); + if url.is_empty() { + // Fallback: try to extract first URL from the title + if let Some(extracted) = extract_url_from_text(&tc.title) { + url = extracted; + } + } + ActionType::WebFetch { url } + } + agent_client_protocol::ToolKind::Think => { + let tool_name = extract_tool_name_from_id(tc.id.0.as_ref()) + .unwrap_or_else(|| tc.title.clone()); + // For think/save_memory, surface both title and aggregated text content as arguments + let text = collect_text_content(&tc.content); + let arguments = Some(match &text { + Some(t) => serde_json::json!({ "title": tc.title, "content": t }), + None => serde_json::json!({ "title": tc.title }), + }); + let result = if let Some(output) = &tc.raw_output { + Some(ToolResult { + r#type: ToolResultValueType::Json, + value: output.clone(), + }) + } else { + collect_text_content(&tc.content).map(|text| ToolResult { + r#type: ToolResultValueType::Markdown, + value: serde_json::Value::String(text), + }) + }; + ActionType::Tool { + tool_name, + arguments, + result, + } + } + agent_client_protocol::ToolKind::SwitchMode => ActionType::Other { + description: "switch_mode".to_string(), + }, + agent_client_protocol::ToolKind::Other | agent_client_protocol::ToolKind::Move => { + // Derive a friendlier tool name from the id if it looks like name- + let tool_name = extract_tool_name_from_id(tc.id.0.as_ref()) + .unwrap_or_else(|| tc.title.clone()); + + // Some tools embed JSON args into the title instead of raw_input + let arguments = if let Some(raw) = &tc.raw_input { + Some(raw.clone()) + } else if tc.title.trim_start().starts_with('{') { + // Title contains JSON arguments for the tool + serde_json::from_str::(&tc.title).ok() + } else { + None + }; + // Extract result: prefer raw_output (structured), else text content as Markdown + let result = if let Some(output) = &tc.raw_output { + Some(ToolResult { + r#type: ToolResultValueType::Json, + value: output.clone(), + }) + } else { + collect_text_content(&tc.content).map(|text| ToolResult { + r#type: ToolResultValueType::Markdown, + value: serde_json::Value::String(text), + }) + }; + ActionType::Tool { + tool_name, + arguments, + result, + } + } + } + } + + fn extract_file_changes(tc: &PartialToolCallData) -> Vec { + let mut changes = Vec::new(); + for c in &tc.content { + if let agent_client_protocol::ToolCallContent::Diff { diff } = c { + let path = diff.path.to_string_lossy().to_string(); + let rel = if !path.is_empty() { + path + } else { + tc.path + .clone() + .unwrap_or_default() + .to_string_lossy() + .to_string() + }; + let old_text = diff.old_text.as_deref().unwrap_or(""); + if old_text.is_empty() { + changes.push(FileChange::Write { + content: diff.new_text.clone(), + }); + } else { + let unified = workspace_utils::diff::create_unified_diff( + &rel, + old_text, + &diff.new_text, + ); + changes.push(FileChange::Edit { + unified_diff: unified, + has_line_numbers: false, + }); + } + } + } + changes + } + + fn get_tool_content(tc: &PartialToolCallData) -> String { + match tc.kind { + agent_client_protocol::ToolKind::Execute => { + AcpEventParser::parse_execute_command(&tc.title) + } + agent_client_protocol::ToolKind::Think => "Saving memory".to_string(), + agent_client_protocol::ToolKind::Other => { + let tool_name = extract_tool_name_from_id(tc.id.0.as_ref()) + .unwrap_or_else(|| "tool".to_string()); + if tc.title.is_empty() { + tool_name + } else { + format!("{}: {}", tool_name, tc.title) + } + } + agent_client_protocol::ToolKind::Read => { + if tc.id.0.starts_with("read_many_files") { + "Read files".to_string() + } else { + tc.title.clone() + } + } + _ => tc.title.clone(), + } + } + + fn extract_tool_name_from_id(id: &str) -> Option { + if let Some(idx) = id.rfind('-') { + let (head, tail) = id.split_at(idx); + if tail + .trim_start_matches('-') + .chars() + .all(|c| c.is_ascii_digit()) + { + return Some(head.to_string()); + } + } + None + } + + fn extract_url_from_text(text: &str) -> Option { + // Simple URL extractor + lazy_static! { + static ref URL_RE: Regex = + Regex::new(r#"https?://[^\s"')]+"#).expect("valid regex"); + } + URL_RE.find(text).map(|m| m.as_str().to_string()) + } + + fn collect_text_content( + content: &[agent_client_protocol::ToolCallContent], + ) -> Option { + let mut out = String::new(); + for c in content { + if let agent_client_protocol::ToolCallContent::Content { content } = c + && let agent_client_protocol::ContentBlock::Text(t) = content + { + out.push_str(&t.text); + if !out.ends_with('\n') { + out.push('\n'); + } + } + } + if out.is_empty() { None } else { Some(out) } + } + + fn convert_tool_status(status: &agent_client_protocol::ToolCallStatus) -> LogToolStatus { + match status { + agent_client_protocol::ToolCallStatus::Pending + | agent_client_protocol::ToolCallStatus::InProgress => LogToolStatus::Created, + agent_client_protocol::ToolCallStatus::Completed => LogToolStatus::Success, + agent_client_protocol::ToolCallStatus::Failed => LogToolStatus::Failed, + } + } + }); +} + +struct PartialToolCallData { + index: usize, + id: agent_client_protocol::ToolCallId, + kind: agent_client_protocol::ToolKind, + title: String, + status: agent_client_protocol::ToolCallStatus, + path: Option, + content: Vec, + raw_input: Option, + raw_output: Option, +} + +impl PartialToolCallData { + fn extend(&mut self, tc: &agent_client_protocol::ToolCall, worktree_path: &Path) { + self.id = tc.id.clone(); + if tc.kind != Default::default() { + self.kind = tc.kind; + } + if !tc.title.is_empty() { + self.title = tc.title.clone(); + } + if tc.status != Default::default() { + self.status = tc.status; + } + if !tc.locations.is_empty() { + self.path = tc.locations.first().map(|l| { + PathBuf::from(workspace_utils::path::make_path_relative( + &l.path.to_string_lossy(), + &worktree_path.to_string_lossy(), + )) + }); + } + if !tc.content.is_empty() { + self.content = tc.content.clone(); + } + if tc.raw_input.is_some() { + self.raw_input = tc.raw_input.clone(); + } + if tc.raw_output.is_some() { + self.raw_output = tc.raw_output.clone(); + } + } +} + +impl Default for PartialToolCallData { + fn default() -> Self { + Self { + id: agent_client_protocol::ToolCallId(Default::default()), + index: 0, + kind: agent_client_protocol::ToolKind::default(), + title: String::new(), + status: Default::default(), + path: None, + content: Vec::new(), + raw_input: None, + raw_output: None, + } + } +} + +struct AcpEventParser; + +impl AcpEventParser { + /// Parse a line that may contain an ACP event + pub fn parse_line(line: &str) -> Option { + let trimmed = line.trim(); + + if let Ok(acp_event) = serde_json::from_str::(trimmed) { + return Some(acp_event); + } + + debug!("Failed to parse ACP raw log {trimmed}"); + + None + } + + /// Parse command from tool title (for execute tools) + pub fn parse_execute_command(title: &str) -> String { + title.split(" (").next().unwrap_or(title).trim().to_string() + } +} + +/// Result of parsing a line +#[derive(Debug, Clone)] +#[allow(clippy::large_enum_variant)] +pub enum ParsedLine { + SessionId(String), + Event(AcpEvent), + Error(String), + Done, +} + +impl TryFrom for AcpEvent { + type Error = (); + + fn try_from(notification: SessionNotification) -> Result { + let event = match notification.update { + acp::SessionUpdate::AgentMessageChunk { content } => AcpEvent::Message(content), + acp::SessionUpdate::AgentThoughtChunk { content } => AcpEvent::Thought(content), + acp::SessionUpdate::ToolCall(tc) => AcpEvent::ToolCall(tc), + acp::SessionUpdate::ToolCallUpdate(update) => AcpEvent::ToolUpdate(update), + acp::SessionUpdate::Plan(plan) => AcpEvent::Plan(plan), + acp::SessionUpdate::AvailableCommandsUpdate { available_commands } => { + AcpEvent::AvailableCommands(available_commands) + } + acp::SessionUpdate::CurrentModeUpdate { current_mode_id } => { + AcpEvent::CurrentMode(current_mode_id) + } + _ => return Err(()), + }; + Ok(event) + } +} + +#[derive(Debug, Clone, Deserialize)] +struct SearchArgs { + query: String, +} + +#[derive(Debug, Clone, Deserialize)] +struct FetchArgs { + url: String, +} + +#[derive(Debug, Clone, Deserialize)] +struct ShellOutput { + #[serde(default)] + exit_code: Option, + #[serde(default)] + stdout: Option, + #[serde(default)] + stderr: Option, +} + +#[derive(Debug, Clone, Default)] +struct StreamingState { + assistant_text: Option, + thinking_text: Option, +} + +#[derive(Debug, Clone)] +struct StreamingText { + index: usize, + content: String, +} diff --git a/crates/executors/src/executors/acp/session.rs b/crates/executors/src/executors/acp/session.rs new file mode 100644 index 00000000..6fc15acc --- /dev/null +++ b/crates/executors/src/executors/acp/session.rs @@ -0,0 +1,180 @@ +use std::{ + fs::{self, OpenOptions}, + io::{self, Result, Write}, + path::PathBuf, + str::FromStr, +}; + +use serde::{Deserialize, Serialize}; + +use crate::executors::acp::AcpEvent; + +/// Manages session persistence and state for ACP interactions +pub struct SessionManager { + base_dir: PathBuf, +} + +impl SessionManager { + /// Create a new session manager with the given namespace + pub fn new(namespace: impl Into) -> Result { + let namespace = namespace.into(); + let mut vk_dir = dirs::home_dir() + .ok_or_else(|| io::Error::other("Could not determine home directory"))? + .join(".vibe-kanban"); + + if cfg!(debug_assertions) { + vk_dir = vk_dir.join("dev"); + } + + let base_dir = vk_dir.join(&namespace); + + fs::create_dir_all(&base_dir)?; + + Ok(Self { base_dir }) + } + + /// Get the file path for a session + fn session_file_path(&self, session_id: &str) -> PathBuf { + self.base_dir.join(format!("{session_id}.jsonl")) + } + + /// Append a raw JSON line to the session log + /// + /// We normalize ACP payloads by: + /// - Removing top-level `sessionId` + /// - Unwrapping the `update` envelope (store its object directly) + /// - Dropping top-level `options` (permission menu). Note: `options` is + /// mutually exclusive with `update`, so when `update` is present we do not + /// perform any `options` stripping. + pub fn append_raw_line(&self, session_id: &str, raw_json: &str) -> Result<()> { + let Some(normalized) = Self::normalize_session_event(raw_json) else { + return Ok(()); + }; + + let path = self.session_file_path(session_id); + let mut file = OpenOptions::new().create(true).append(true).open(path)?; + + writeln!(file, "{normalized}")?; + Ok(()) + } + + /// Attempt to normalize a raw ACP JSON event into a cleaner shape. + /// Rules: + /// - Remove top-level `sessionId` always. + /// - If `update` is present with an object that has `sessionUpdate`, emit + /// a single-key object where key = camelCase(sessionUpdate) and value = + /// the `update` object minus `sessionUpdate`. + /// - If `update` is absent, remove only top-level `options`. + /// + /// Returns None if the input is not a JSON object. + fn normalize_session_event(raw_json: &str) -> Option { + let mut event = AcpEvent::from_str(raw_json).ok()?; + + match event { + AcpEvent::SessionStart(..) + | AcpEvent::Error(..) + | AcpEvent::Done(..) + | AcpEvent::Other(..) => return None, + + AcpEvent::User(..) + | AcpEvent::Message(..) + | AcpEvent::Thought(..) + | AcpEvent::ToolCall(..) + | AcpEvent::ToolUpdate(..) + | AcpEvent::Plan(..) + | AcpEvent::AvailableCommands(..) + | AcpEvent::CurrentMode(..) => {} + + AcpEvent::RequestPermission(req) => event = AcpEvent::ToolUpdate(req.tool_call), + } + + match event { + AcpEvent::User(prompt) => { + return serde_json::to_string(&serde_json::json!({"user": prompt})).ok(); + } + AcpEvent::Message(ref content) | AcpEvent::Thought(ref content) => { + if let agent_client_protocol::ContentBlock::Text(text) = content { + // Special simplification for pure text messages + let key = if let AcpEvent::Message(_) = event { + "assistant" + } else { + "thinking" + }; + return serde_json::to_string(&serde_json::json!({ key: text.text })).ok(); + } + } + _ => {} + } + + serde_json::to_string(&event).ok() + } + + /// Read the raw JSONL content of a session + pub fn read_session_raw(&self, session_id: &str) -> Result { + let path = self.session_file_path(session_id); + if !path.exists() { + return Ok(String::new()); + } + + fs::read_to_string(path) + } + + /// Fork a session to create a new one with the same history + pub fn fork_session(&self, old_id: &str, new_id: &str) -> Result<()> { + let old_path = self.session_file_path(old_id); + let new_path = self.session_file_path(new_id); + + if old_path.exists() { + fs::copy(&old_path, &new_path)?; + } else { + // Create empty new file if old doesn't exist + OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&new_path)?; + } + + Ok(()) + } + + /// Delete a session + pub fn delete_session(&self, session_id: &str) -> Result<()> { + let path = self.session_file_path(session_id); + if path.exists() { + fs::remove_file(path)?; + } + Ok(()) + } + + /// Generate a resume prompt from session history + pub fn generate_resume_prompt(&self, session_id: &str, current_prompt: &str) -> Result { + let session_context = self.read_session_raw(session_id)?; + + Ok(format!( + concat!( + "RESUME CONTEXT FOR CONTINUING TASK\n\n", + "=== EXECUTION HISTORY ===\n", + "The following is the conversation history from this session:\n", + "{}\n\n", + "=== CURRENT REQUEST ===\n", + "{}\n\n", + "=== INSTRUCTIONS ===\n", + "You are continuing work on the above task. The execution history shows ", + "the previous conversation in this session. Please continue from where ", + "the previous execution left off, taking into account all the context provided above." + ), + session_context, current_prompt + )) + } +} + +/// Session metadata stored separately from events +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SessionMetadata { + pub session_id: String, + pub created_at: chrono::DateTime, + pub updated_at: chrono::DateTime, + pub parent_session: Option, + pub tags: Vec, +} diff --git a/crates/executors/src/executors/amp.rs b/crates/executors/src/executors/amp.rs index 23c746e3..c7dd542c 100644 --- a/crates/executors/src/executors/amp.rs +++ b/crates/executors/src/executors/amp.rs @@ -1,7 +1,7 @@ use std::{path::Path, process::Stdio, sync::Arc}; use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; +use command_group::AsyncCommandGroup; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use tokio::{io::AsyncWriteExt, process::Command}; @@ -11,7 +11,7 @@ use workspace_utils::{msg_store::MsgStore, shell::get_shell_command}; use crate::{ command::{CmdOverrides, CommandBuilder, apply_overrides}, executors::{ - AppendPrompt, ExecutorError, StandardCodingAgentExecutor, + AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor, claude::{ClaudeLogProcessor, HistoryStrategy}, }, logs::{stderr_processor::normalize_stderr_logs, utils::EntryIndexProvider}, @@ -44,11 +44,7 @@ impl Amp { #[async_trait] impl StandardCodingAgentExecutor for Amp { - async fn spawn( - &self, - current_dir: &Path, - prompt: &str, - ) -> Result { + async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { let (shell_cmd, shell_arg) = get_shell_command(); let amp_command = self.build_command_builder().build_initial(); @@ -72,7 +68,7 @@ impl StandardCodingAgentExecutor for Amp { stdin.shutdown().await?; } - Ok(child) + Ok(child.into()) } async fn spawn_follow_up( @@ -80,7 +76,7 @@ impl StandardCodingAgentExecutor for Amp { current_dir: &Path, prompt: &str, session_id: &str, - ) -> Result { + ) -> Result { // Use shell command for cross-platform compatibility let (shell_cmd, shell_arg) = get_shell_command(); @@ -142,7 +138,7 @@ impl StandardCodingAgentExecutor for Amp { stdin.shutdown().await?; } - Ok(child) + Ok(child.into()) } fn normalize_logs(&self, msg_store: Arc, current_dir: &Path) { diff --git a/crates/executors/src/executors/claude.rs b/crates/executors/src/executors/claude.rs index e008419e..95a9c209 100644 --- a/crates/executors/src/executors/claude.rs +++ b/crates/executors/src/executors/claude.rs @@ -3,7 +3,7 @@ use std::os::unix::fs::PermissionsExt; use std::{path::Path, process::Stdio, sync::Arc}; use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; +use command_group::AsyncCommandGroup; use futures::StreamExt; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -21,7 +21,7 @@ use workspace_utils::{ use crate::{ command::{CmdOverrides, CommandBuilder, apply_overrides}, - executors::{AppendPrompt, ExecutorError, StandardCodingAgentExecutor}, + executors::{AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor}, logs::{ ActionType, FileChange, NormalizedEntry, NormalizedEntryType, TodoItem, ToolStatus, stderr_processor::normalize_stderr_logs, @@ -121,11 +121,7 @@ impl ClaudeCode { #[async_trait] impl StandardCodingAgentExecutor for ClaudeCode { - async fn spawn( - &self, - current_dir: &Path, - prompt: &str, - ) -> Result { + async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { let (shell_cmd, shell_arg) = get_shell_command(); let command_builder = self.build_command_builder().await; let mut base_command = command_builder.build_initial(); @@ -158,7 +154,7 @@ impl StandardCodingAgentExecutor for ClaudeCode { stdin.shutdown().await?; } - Ok(child) + Ok(child.into()) } async fn spawn_follow_up( @@ -166,7 +162,7 @@ impl StandardCodingAgentExecutor for ClaudeCode { current_dir: &Path, prompt: &str, session_id: &str, - ) -> Result { + ) -> Result { let (shell_cmd, shell_arg) = get_shell_command(); let command_builder = self.build_command_builder().await; // Build follow-up command with --resume {session_id} @@ -201,7 +197,7 @@ impl StandardCodingAgentExecutor for ClaudeCode { stdin.shutdown().await?; } - Ok(child) + Ok(child.into()) } fn normalize_logs(&self, msg_store: Arc, current_dir: &Path) { diff --git a/crates/executors/src/executors/codex.rs b/crates/executors/src/executors/codex.rs index a5578428..9273a66e 100644 --- a/crates/executors/src/executors/codex.rs +++ b/crates/executors/src/executors/codex.rs @@ -7,7 +7,7 @@ use std::{ }; use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; +use command_group::AsyncCommandGroup; use futures::StreamExt; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -24,7 +24,8 @@ use workspace_utils::{ use crate::{ command::{CmdOverrides, CommandBuilder, apply_overrides}, executors::{ - AppendPrompt, ExecutorError, StandardCodingAgentExecutor, codex::session::SessionHandler, + AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor, + codex::session::SessionHandler, }, logs::{ ActionType, FileChange, NormalizedEntry, NormalizedEntryType, ToolStatus, @@ -126,11 +127,7 @@ impl Codex { #[async_trait] impl StandardCodingAgentExecutor for Codex { - async fn spawn( - &self, - current_dir: &Path, - prompt: &str, - ) -> Result { + async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { let (shell_cmd, shell_arg) = get_shell_command(); let codex_command = self.build_command_builder().build_initial(); @@ -156,7 +153,7 @@ impl StandardCodingAgentExecutor for Codex { stdin.shutdown().await?; } - Ok(child) + Ok(child.into()) } async fn spawn_follow_up( @@ -164,7 +161,7 @@ impl StandardCodingAgentExecutor for Codex { current_dir: &Path, prompt: &str, session_id: &str, - ) -> Result { + ) -> Result { // Fork rollout: copy and assign a new session id so each execution has a unique session let (_rollout_file_path, new_session_id) = SessionHandler::fork_rollout_file(session_id) .map_err(|e| ExecutorError::SpawnError(std::io::Error::other(e)))?; @@ -196,7 +193,7 @@ impl StandardCodingAgentExecutor for Codex { stdin.shutdown().await?; } - Ok(child) + Ok(child.into()) } fn normalize_logs(&self, msg_store: Arc, current_dir: &Path) { diff --git a/crates/executors/src/executors/cursor.rs b/crates/executors/src/executors/cursor.rs index 38a3726d..910e9b2c 100644 --- a/crates/executors/src/executors/cursor.rs +++ b/crates/executors/src/executors/cursor.rs @@ -2,7 +2,7 @@ use core::str; use std::{path::Path, process::Stdio, sync::Arc, time::Duration}; use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; +use command_group::AsyncCommandGroup; use futures::StreamExt; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -20,7 +20,7 @@ use workspace_utils::{ use crate::{ command::{CmdOverrides, CommandBuilder, apply_overrides}, - executors::{AppendPrompt, ExecutorError, StandardCodingAgentExecutor}, + executors::{AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor}, logs::{ ActionType, FileChange, NormalizedEntry, NormalizedEntryType, TodoItem, ToolStatus, plain_text_processor::PlainTextLogProcessor, @@ -61,11 +61,7 @@ impl Cursor { #[async_trait] impl StandardCodingAgentExecutor for Cursor { - async fn spawn( - &self, - current_dir: &Path, - prompt: &str, - ) -> Result { + async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { let (shell_cmd, shell_arg) = get_shell_command(); let agent_cmd = self.build_command_builder().build_initial(); @@ -88,7 +84,7 @@ impl StandardCodingAgentExecutor for Cursor { stdin.shutdown().await?; } - Ok(child) + Ok(child.into()) } async fn spawn_follow_up( @@ -96,7 +92,7 @@ impl StandardCodingAgentExecutor for Cursor { current_dir: &Path, prompt: &str, session_id: &str, - ) -> Result { + ) -> Result { let (shell_cmd, shell_arg) = get_shell_command(); let agent_cmd = self .build_command_builder() @@ -121,7 +117,7 @@ impl StandardCodingAgentExecutor for Cursor { stdin.shutdown().await?; } - Ok(child) + Ok(child.into()) } fn normalize_logs(&self, msg_store: Arc, worktree_path: &Path) { diff --git a/crates/executors/src/executors/gemini.rs b/crates/executors/src/executors/gemini.rs index dbb34472..c076fc25 100644 --- a/crates/executors/src/executors/gemini.rs +++ b/crates/executors/src/executors/gemini.rs @@ -1,30 +1,15 @@ -use std::{ - path::{Path, PathBuf}, - process::Stdio, - sync::Arc, -}; +use std::{path::Path, sync::Arc}; use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; -use futures::{StreamExt, stream::BoxStream}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use tokio::{ - fs::{self, OpenOptions}, - io::AsyncWriteExt, - process::Command, -}; use ts_rs::TS; -use workspace_utils::{msg_store::MsgStore, shell::get_shell_command}; +use workspace_utils::msg_store::MsgStore; +pub use super::acp::AcpAgentHarness; use crate::{ command::{CmdOverrides, CommandBuilder, apply_overrides}, - executors::{AppendPrompt, ExecutorError, StandardCodingAgentExecutor}, - logs::{ - NormalizedEntry, NormalizedEntryType, plain_text_processor::PlainTextLogProcessor, - stderr_processor::normalize_stderr_logs, utils::EntryIndexProvider, - }, - stdout_dup, + executors::{AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor}, }; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema)] @@ -69,352 +54,42 @@ impl Gemini { builder = builder.extend_params(["--yolo"]); } + builder = builder.extend_params(["--experimental-acp"]); + apply_overrides(builder, &self.cmd) } } #[async_trait] impl StandardCodingAgentExecutor for Gemini { - async fn spawn( - &self, - current_dir: &Path, - prompt: &str, - ) -> Result { - let (shell_cmd, shell_arg) = get_shell_command(); - let gemini_command = self.build_command_builder().build_initial(); - + async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { + let harness = AcpAgentHarness::new(); let combined_prompt = self.append_prompt.combine_prompt(prompt); - - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(current_dir) - .arg(shell_arg) - .arg(gemini_command) - .env("NODE_NO_WARNINGS", "1"); - - let mut child = command.group_spawn()?; - - // Write prompt to stdin - if let Some(mut stdin) = child.inner().stdin.take() { - stdin.write_all(combined_prompt.as_bytes()).await?; - stdin.shutdown().await?; - } - - // Duplicate stdout for session logging - let duplicate_stdout = stdout_dup::duplicate_stdout(&mut child)?; - tokio::spawn(Self::record_session( - duplicate_stdout, - current_dir.to_path_buf(), - prompt.to_string(), - false, - )); - - Ok(child) + let gemini_command = self.build_command_builder().build_initial(); + harness + .spawn_with_command(current_dir, combined_prompt, gemini_command) + .await } async fn spawn_follow_up( &self, current_dir: &Path, prompt: &str, - _session_id: &str, - ) -> Result { - // Build comprehensive prompt with session context - let followup_prompt = self.build_followup_prompt(current_dir, prompt).await?; - - let (shell_cmd, shell_arg) = get_shell_command(); + session_id: &str, + ) -> Result { + let harness = AcpAgentHarness::new(); + let combined_prompt = self.append_prompt.combine_prompt(prompt); let gemini_command = self.build_command_builder().build_follow_up(&[]); - - let mut command = Command::new(shell_cmd); - - command - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(current_dir) - .arg(shell_arg) - .arg(gemini_command) - .env("NODE_NO_WARNINGS", "1"); - - let mut child = command.group_spawn()?; - - // Write comprehensive prompt to stdin - if let Some(mut stdin) = child.inner().stdin.take() { - stdin.write_all(followup_prompt.as_bytes()).await?; - stdin.shutdown().await?; - } - - // Duplicate stdout for session logging (resume existing session) - let duplicate_stdout = stdout_dup::duplicate_stdout(&mut child)?; - tokio::spawn(Self::record_session( - duplicate_stdout, - current_dir.to_path_buf(), - prompt.to_string(), - true, - )); - - Ok(child) + harness + .spawn_follow_up_with_command(current_dir, combined_prompt, session_id, gemini_command) + .await } - /// Parses both stderr and stdout logs for Gemini executor using PlainTextLogProcessor. - /// - /// - Stderr: uses the standard stderr log processor, which formats stderr output as ErrorMessage entries. - /// - Stdout: applies custom `format_chunk` to insert line breaks on period-to-capital transitions, - /// then create assitant messages from the output. - /// - /// Each entry is converted into an `AssistantMessage` or `ErrorMessage` and emitted as patches. - /// - /// # Example - /// - /// ```rust,ignore - /// gemini.normalize_logs(msg_store.clone(), &worktree_path); - /// ``` - /// - /// Subsequent queries to `msg_store` will receive JSON patches representing parsed log entries. - /// Sets up log normalization for the Gemini executor: - /// - stderr via [`normalize_stderr_logs`] - /// - stdout via [`PlainTextLogProcessor`] with Gemini-specific formatting and default heuristics fn normalize_logs(&self, msg_store: Arc, worktree_path: &Path) { - let entry_index_counter = EntryIndexProvider::start_from(&msg_store); - normalize_stderr_logs(msg_store.clone(), entry_index_counter.clone()); - - // Send session ID to msg_store to enable follow-ups - msg_store.push_session_id( - worktree_path - .file_name() - .unwrap_or_default() - .to_string_lossy() - .to_string(), - ); - - // Normalize Agent logs - tokio::spawn(async move { - let mut stdout = msg_store.stdout_chunked_stream(); - - // Create a processor with Gemini-specific formatting - let mut processor = Self::create_gemini_style_processor(entry_index_counter); - - while let Some(Ok(chunk)) = stdout.next().await { - for patch in processor.process(chunk) { - msg_store.push_patch(patch); - } - } - }); + super::acp::normalize_logs(msg_store, worktree_path); } - // MCP configuration methods fn default_mcp_config_path(&self) -> Option { dirs::home_dir().map(|home| home.join(".gemini").join("settings.json")) } } - -impl Gemini { - /// Creates a PlainTextLogProcessor that applies Gemini's sentence-break heuristics. - /// - /// This processor formats chunks by inserting line breaks at period-to-capital transitions - /// and filters out Gemini CLI noise messages. - pub(crate) fn create_gemini_style_processor( - index_provider: EntryIndexProvider, - ) -> PlainTextLogProcessor { - PlainTextLogProcessor::builder() - .normalized_entry_producer(Box::new(|content: String| NormalizedEntry { - timestamp: None, - entry_type: NormalizedEntryType::AssistantMessage, - content, - metadata: None, - })) - .format_chunk(Box::new(|partial, chunk| { - Self::format_stdout_chunk(&chunk, partial.unwrap_or("")) - })) - .transform_lines(Box::new(|lines: &mut Vec| { - lines.retain(|l| l != "Data collection is disabled.\n"); - })) - .index_provider(index_provider) - .build() - } - - /// Make Gemini output more readable by inserting line breaks where periods are directly - /// followed by capital letters (common Gemini CLI formatting issue). - /// Handles both intra-chunk and cross-chunk period-to-capital transitions. - fn format_stdout_chunk(content: &str, accumulated_message: &str) -> String { - let mut result = String::with_capacity(content.len() + 100); - let chars: Vec = content.chars().collect(); - - // Check for cross-chunk boundary: previous chunk ended with period, current starts with capital - if !accumulated_message.is_empty() && !content.is_empty() { - let ends_with_period = accumulated_message.ends_with('.'); - let starts_with_capital = chars - .first() - .map(|&c| c.is_uppercase() && c.is_alphabetic()) - .unwrap_or(false); - - if ends_with_period && starts_with_capital { - result.push('\n'); - } - } - - // Handle intra-chunk period-to-capital transitions - for i in 0..chars.len() { - result.push(chars[i]); - - // Check if current char is '.' and next char is uppercase letter (no space between) - if chars[i] == '.' && i + 1 < chars.len() { - let next_char = chars[i + 1]; - if next_char.is_uppercase() && next_char.is_alphabetic() { - result.push('\n'); - } - } - } - - result - } - - async fn record_session( - mut stdout_stream: BoxStream<'static, std::io::Result>, - current_dir: PathBuf, - prompt: String, - resume_session: bool, - ) { - let file_path = Self::get_session_file_path(¤t_dir).await; - - // Ensure the directory exists - if let Some(parent) = file_path.parent() { - let _ = fs::create_dir_all(parent).await; - } - - // If not resuming session, delete the file first - if !resume_session { - let _ = fs::remove_file(&file_path).await; - } - - // Always append from here on - let mut file = match OpenOptions::new() - .create(true) - .append(true) - .open(&file_path) - .await - { - Ok(file) => file, - Err(_) => { - tracing::error!("Failed to open session file: {:?}", file_path); - return; - } - }; - - // Write user message as normalized entry - let mut user_message_json = serde_json::to_string(&NormalizedEntry { - timestamp: None, - entry_type: NormalizedEntryType::UserMessage, - content: prompt, - metadata: None, - }) - .unwrap_or_default(); - user_message_json.push('\n'); - let _ = file.write_all(user_message_json.as_bytes()).await; - - // Read stdout incrementally and append assistant message - let mut stdout_content = String::new(); - - // Read stdout until the process finishes - while let Some(Ok(chunk)) = stdout_stream.next().await { - stdout_content.push_str(&chunk); - } - - let mut assistant_message_json = serde_json::to_string(&NormalizedEntry { - timestamp: None, - entry_type: NormalizedEntryType::AssistantMessage, - content: stdout_content, - metadata: None, - }) - .unwrap_or_default(); - assistant_message_json.push('\n'); - let _ = file.write_all(assistant_message_json.as_bytes()).await; - } - - /// Build comprehensive prompt with session context for follow-up execution - async fn build_followup_prompt( - &self, - current_dir: &Path, - prompt: &str, - ) -> Result { - let session_file_path = Self::get_session_file_path(current_dir).await; - - // Read existing session context - let session_context = fs::read_to_string(&session_file_path).await.map_err(|e| { - ExecutorError::FollowUpNotSupported(format!( - "No existing Gemini session found for this worktree. Session file not found at {session_file_path:?}: {e}" - )) - })?; - - Ok(format!( - r#"RESUME CONTEXT FOR CONTINUING TASK - -=== EXECUTION HISTORY === -The following is the conversation history from this session: -{session_context} - -=== CURRENT REQUEST === -{prompt} - -=== INSTRUCTIONS === -You are continuing work on the above task. The execution history shows the previous conversation in this session. Please continue from where the previous execution left off, taking into account all the context provided above.{} -"#, - self.append_prompt.get().unwrap_or_default(), - )) - } - - fn get_sessions_base_dir() -> PathBuf { - // Determine base directory under user's home - let home = dirs::home_dir().unwrap_or_else(std::env::temp_dir); - if cfg!(debug_assertions) { - home.join(".vibe-kanban") - .join("dev") - .join("gemini_sessions") - } else { - home.join(".vibe-kanban").join("gemini_sessions") - } - } - - fn get_legacy_sessions_base_dir() -> PathBuf { - // Previous location was under the temp-based vibe-kanban dir - workspace_utils::path::get_vibe_kanban_temp_dir().join("gemini_sessions") - } - - async fn get_session_file_path(current_dir: &Path) -> PathBuf { - let file_name = current_dir.file_name().unwrap_or_default(); - let new_base = Self::get_sessions_base_dir(); - let new_path = new_base.join(file_name); - - // Ensure base directory exists - if let Some(parent) = new_path.parent() { - let _ = fs::create_dir_all(parent).await; - } - - // If the new file doesn't exist yet, try to migrate from legacy location - let new_exists = fs::metadata(&new_path).await.is_ok(); - if !new_exists { - let legacy_path = Self::get_legacy_sessions_base_dir().join(file_name); - if fs::metadata(&legacy_path).await.is_ok() { - if let Err(e) = fs::rename(&legacy_path, &new_path).await { - tracing::warn!( - "Failed to migrate Gemini session from {:?} to {:?}: {}", - legacy_path, - new_path, - e - ); - } else { - tracing::info!( - "Migrated Gemini session file from legacy temp directory to persistent directory: {:?}", - new_path - ); - } - } - } - - new_path - } -} diff --git a/crates/executors/src/executors/mod.rs b/crates/executors/src/executors/mod.rs index 34e0c502..3e99db46 100644 --- a/crates/executors/src/executors/mod.rs +++ b/crates/executors/src/executors/mod.rs @@ -20,6 +20,7 @@ use crate::{ mcp_config::McpConfig, }; +pub mod acp; pub mod amp; pub mod claude; pub mod codex; @@ -125,7 +126,9 @@ impl CodingAgent { Self::ClaudeCode(_) => vec![BaseAgentCapability::SessionFork], Self::Amp(_) => vec![BaseAgentCapability::SessionFork], Self::Codex(_) => vec![BaseAgentCapability::SessionFork], - Self::Gemini(_) | Self::Opencode(_) | Self::Cursor(_) | Self::QwenCode(_) => vec![], + Self::Gemini(_) => vec![BaseAgentCapability::SessionFork], + Self::QwenCode(_) => vec![BaseAgentCapability::SessionFork], + Self::Opencode(_) | Self::Cursor(_) => vec![], } } } @@ -133,17 +136,13 @@ impl CodingAgent { #[async_trait] #[enum_dispatch(CodingAgent)] pub trait StandardCodingAgentExecutor { - async fn spawn( - &self, - current_dir: &Path, - prompt: &str, - ) -> Result; + async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result; async fn spawn_follow_up( &self, current_dir: &Path, prompt: &str, session_id: &str, - ) -> Result; + ) -> Result; fn normalize_logs(&self, _raw_logs_event_store: Arc, _worktree_path: &Path); // MCP configuration methods @@ -156,6 +155,26 @@ pub trait StandardCodingAgentExecutor { } } +/// Optional exit notification from an executor. +/// When this receiver resolves, the container should gracefully stop the process +/// and mark it as successful (exit code 0). +pub type ExecutorExitSignal = tokio::sync::oneshot::Receiver<()>; + +#[derive(Debug)] +pub struct SpawnedChild { + pub child: AsyncGroupChild, + pub exit_signal: Option, +} + +impl From for SpawnedChild { + fn from(child: AsyncGroupChild) -> Self { + Self { + child, + exit_signal: None, + } + } +} + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema)] #[serde(transparent)] #[schemars( diff --git a/crates/executors/src/executors/opencode.rs b/crates/executors/src/executors/opencode.rs index f3ea2f92..2af02dcf 100644 --- a/crates/executors/src/executors/opencode.rs +++ b/crates/executors/src/executors/opencode.rs @@ -7,7 +7,7 @@ use std::{ }; use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; +use command_group::AsyncCommandGroup; use fork_stream::StreamExt as _; use futures::{StreamExt, future::ready, stream::BoxStream}; use lazy_static::lazy_static; @@ -21,7 +21,7 @@ use workspace_utils::{msg_store::MsgStore, path::make_path_relative, shell::get_ use crate::{ command::{CmdOverrides, CommandBuilder, apply_overrides}, executors::{ - AppendPrompt, ExecutorError, StandardCodingAgentExecutor, + AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor, opencode::share_bridge::Bridge as ShareBridge, }, logs::{ @@ -128,11 +128,7 @@ impl Opencode { #[async_trait] impl StandardCodingAgentExecutor for Opencode { - async fn spawn( - &self, - current_dir: &Path, - prompt: &str, - ) -> Result { + async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { // Start a dedicated local share bridge bound to this opencode process let bridge = ShareBridge::start().await.map_err(ExecutorError::Io)?; let (shell_cmd, shell_arg) = get_shell_command(); @@ -189,7 +185,7 @@ impl StandardCodingAgentExecutor for Opencode { tracing::debug!("Opencode process stdout closed"); bridge_for_shutdown.shutdown().await; }); - Ok(child) + Ok(child.into()) } async fn spawn_follow_up( @@ -197,7 +193,7 @@ impl StandardCodingAgentExecutor for Opencode { current_dir: &Path, prompt: &str, session_id: &str, - ) -> Result { + ) -> Result { // Start a dedicated local share bridge bound to this opencode process let bridge = ShareBridge::start().await.map_err(ExecutorError::Io)?; let (shell_cmd, shell_arg) = get_shell_command(); @@ -253,7 +249,7 @@ impl StandardCodingAgentExecutor for Opencode { while let Some(_chunk) = dup_stream.next().await {} bridge_for_shutdown.shutdown().await; }); - Ok(child) + Ok(child.into()) } /// Normalize logs for OpenCode executor diff --git a/crates/executors/src/executors/qwen.rs b/crates/executors/src/executors/qwen.rs index f6fb6098..53bc7563 100644 --- a/crates/executors/src/executors/qwen.rs +++ b/crates/executors/src/executors/qwen.rs @@ -1,17 +1,17 @@ -use std::{path::Path, process::Stdio, sync::Arc}; +use std::{path::Path, sync::Arc}; use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use tokio::{io::AsyncWriteExt, process::Command}; use ts_rs::TS; -use workspace_utils::{msg_store::MsgStore, shell::get_shell_command}; +use workspace_utils::msg_store::MsgStore; use crate::{ command::{CmdOverrides, CommandBuilder, apply_overrides}, - executors::{AppendPrompt, ExecutorError, StandardCodingAgentExecutor, gemini::Gemini}, - logs::{stderr_processor::normalize_stderr_logs, utils::EntryIndexProvider}, + executors::{ + AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor, + gemini::AcpAgentHarness, + }, }; #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema)] @@ -31,42 +31,20 @@ impl QwenCode { if self.yolo.unwrap_or(false) { builder = builder.extend_params(["--yolo"]); } - + builder = builder.extend_params(["--experimental-acp"]); apply_overrides(builder, &self.cmd) } } #[async_trait] impl StandardCodingAgentExecutor for QwenCode { - async fn spawn( - &self, - current_dir: &Path, - prompt: &str, - ) -> Result { - let (shell_cmd, shell_arg) = get_shell_command(); + async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { let qwen_command = self.build_command_builder().build_initial(); - let combined_prompt = self.append_prompt.combine_prompt(prompt); - - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(current_dir) - .arg(shell_arg) - .arg(&qwen_command); - - let mut child = command.group_spawn()?; - - // Feed the prompt in, then close the pipe - if let Some(mut stdin) = child.inner().stdin.take() { - stdin.write_all(combined_prompt.as_bytes()).await?; - stdin.shutdown().await?; - } - - Ok(child) + let harness = AcpAgentHarness::with_session_namespace("qwen_sessions"); + harness + .spawn_with_command(current_dir, combined_prompt, qwen_command) + .await } async fn spawn_follow_up( @@ -74,64 +52,17 @@ impl StandardCodingAgentExecutor for QwenCode { current_dir: &Path, prompt: &str, session_id: &str, - ) -> Result { - let (shell_cmd, shell_arg) = get_shell_command(); - let qwen_command = self - .build_command_builder() - .build_follow_up(&["--resume".to_string(), session_id.to_string()]); - + ) -> Result { + let qwen_command = self.build_command_builder().build_follow_up(&[]); let combined_prompt = self.append_prompt.combine_prompt(prompt); - - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(current_dir) - .arg(shell_arg) - .arg(&qwen_command); - - let mut child = command.group_spawn()?; - - // Feed the followup prompt in, then close the pipe - if let Some(mut stdin) = child.inner().stdin.take() { - stdin.write_all(combined_prompt.as_bytes()).await?; - stdin.shutdown().await?; - } - - Ok(child) + let harness = AcpAgentHarness::with_session_namespace("qwen_sessions"); + harness + .spawn_follow_up_with_command(current_dir, combined_prompt, session_id, qwen_command) + .await } - fn normalize_logs(&self, msg_store: Arc, current_dir: &Path) { - // QwenCode has similar output format to Gemini CLI - // Use Gemini's proven sentence-break formatting instead of simple replace - let entry_index_counter = EntryIndexProvider::start_from(&msg_store); - normalize_stderr_logs(msg_store.clone(), entry_index_counter.clone()); - - // Send session ID to msg_store to enable follow-ups - msg_store.push_session_id( - current_dir - .file_name() - .unwrap_or_default() - .to_string_lossy() - .to_string(), - ); - - // Use Gemini's log processor for consistent formatting - tokio::spawn(async move { - use futures::StreamExt; - let mut stdout = msg_store.stdout_chunked_stream(); - - // Use Gemini's proven sentence-break heuristics - let mut processor = Gemini::create_gemini_style_processor(entry_index_counter); - - while let Some(Ok(chunk)) = stdout.next().await { - for patch in processor.process(chunk) { - msg_store.push_patch(patch); - } - } - }); + fn normalize_logs(&self, msg_store: Arc, worktree_path: &Path) { + crate::executors::acp::normalize_logs(msg_store, worktree_path); } // MCP configuration methods diff --git a/crates/executors/src/stdout_dup.rs b/crates/executors/src/stdout_dup.rs index 0f07efb0..2f6f8a4f 100644 --- a/crates/executors/src/stdout_dup.rs +++ b/crates/executors/src/stdout_dup.rs @@ -159,6 +159,25 @@ pub fn tee_stdout_with_appender( )) } +/// Create a fresh stdout pipe for the child process and return an async writer +/// that writes directly to the child's new stdout. +/// +/// This helper does not read or duplicate any existing stdout; it simply +/// replaces the child's stdout with a new pipe reader and returns the +/// corresponding async writer for the caller to write into. +pub fn create_stdout_pipe_writer<'b>( + child: &mut AsyncGroupChild, +) -> Result { + // Create replacement pipe and set as new child stdout + let (pipe_reader, pipe_writer) = os_pipe::pipe().map_err(|e| { + ExecutorError::Io(std::io::Error::other(format!("Failed to create pipe: {e}"))) + })?; + child.inner().stdout = Some(wrap_fd_as_child_stdout(pipe_reader)?); + + // Return async writer to the caller + wrap_fd_as_tokio_writer(pipe_writer) +} + // ========================================= // OS file descriptor helper functions // ========================================= diff --git a/crates/local-deployment/src/container.rs b/crates/local-deployment/src/container.rs index 1fd14148..0e39aae2 100644 --- a/crates/local-deployment/src/container.rs +++ b/crates/local-deployment/src/container.rs @@ -1,6 +1,7 @@ use std::{ collections::{HashMap, HashSet}, io, + os::unix::process::ExitStatusExt, path::{Path, PathBuf}, sync::{ Arc, @@ -40,7 +41,7 @@ use executors::{ }, }, }; -use futures::{StreamExt, TryStreamExt, stream::select}; +use futures::{FutureExt, StreamExt, TryStreamExt, stream::select}; use notify_debouncer_full::DebouncedEvent; use serde_json::json; use services::services::{ @@ -346,7 +347,11 @@ impl LocalContainerService { /// Spawn a background task that polls the child process for completion and /// cleans up the execution entry when it exits. - pub fn spawn_exit_monitor(&self, exec_id: &Uuid) -> JoinHandle<()> { + pub fn spawn_exit_monitor( + &self, + exec_id: &Uuid, + exit_signal: Option>, + ) -> JoinHandle<()> { let exec_id = *exec_id; let child_store = self.child_store.clone(); let msg_stores = self.msg_stores.clone(); @@ -355,180 +360,199 @@ impl LocalContainerService { let container = self.clone(); let analytics = self.analytics.clone(); + let mut process_exit_rx = self.spawn_os_exit_watcher(exec_id); + tokio::spawn(async move { - loop { - let status_opt = { - let child_lock = { - let map = child_store.read().await; - map.get(&exec_id) - .cloned() - .unwrap_or_else(|| panic!("Child handle missing for {exec_id}")) + let mut exit_signal_future = exit_signal + .map(|rx| rx.map(|_| ()).boxed()) // wait for signal + .unwrap_or_else(|| std::future::pending::<()>().boxed()); // no signal, stall forever + + let status_result: std::io::Result; + + // Wait for process to exit, or exit signal from executor + tokio::select! { + // Exit signal. + // Some coding agent processes do not automatically exit after processing the user request; instead the executor + // signals when processing has finished to gracefully kill the process. + _ = &mut exit_signal_future => { + // Executor signaled completion: kill group and remember to force Completed(0) + if let Some(child_lock) = child_store.read().await.get(&exec_id).cloned() { + let mut child = child_lock.write().await ; + if let Err(err) = command::kill_process_group(&mut child).await { + tracing::error!("Failed to kill process group after exit signal: {} {}", exec_id, err); + } + } + status_result = Ok(std::process::ExitStatus::from_raw(0)); + } + // Process exit + exit_status_result = &mut process_exit_rx => { + status_result = exit_status_result.unwrap_or_else(|e| Err(std::io::Error::other(e))); + } + } + + let (exit_code, status) = match status_result { + Ok(exit_status) => { + let code = exit_status.code().unwrap_or(-1) as i64; + let status = if exit_status.success() { + ExecutionProcessStatus::Completed + } else { + ExecutionProcessStatus::Failed }; + (Some(code), status) + } + Err(_) => (None, ExecutionProcessStatus::Failed), + }; - let mut child_handler = child_lock.write().await; - match child_handler.try_wait() { - Ok(Some(status)) => Some(Ok(status)), - Ok(None) => None, - Err(e) => Some(Err(e)), - } - }; + if !ExecutionProcess::was_killed(&db.pool, exec_id).await + && let Err(e) = + ExecutionProcess::update_completion(&db.pool, exec_id, status, exit_code).await + { + tracing::error!("Failed to update execution process completion: {}", e); + } - // Update execution process and cleanup if exit - if let Some(status_result) = status_opt { - // Update execution process record with completion info - let (exit_code, status) = match status_result { - Ok(exit_status) => { - let code = exit_status.code().unwrap_or(-1) as i64; - let status = if exit_status.success() { - ExecutionProcessStatus::Completed - } else { - ExecutionProcessStatus::Failed - }; - (Some(code), status) - } - Err(_) => (None, ExecutionProcessStatus::Failed), - }; - - if !ExecutionProcess::was_killed(&db.pool, exec_id).await - && let Err(e) = ExecutionProcess::update_completion( - &db.pool, - exec_id, - status.clone(), - exit_code, - ) - .await - { - tracing::error!("Failed to update execution process completion: {}", e); - } - - if let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await { - // Update executor session summary if available - if let Err(e) = container.update_executor_session_summary(&exec_id).await { - tracing::warn!("Failed to update executor session summary: {}", e); - } - - // (moved) capture after-head commit occurs later, after commit/next-action handling - - if matches!( - ctx.execution_process.status, - ExecutionProcessStatus::Completed - ) && exit_code == Some(0) - { - // Commit changes (if any) and get feedback about whether changes were made - let changes_committed = match container.try_commit_changes(&ctx).await { - Ok(committed) => committed, - Err(e) => { - tracing::error!( - "Failed to commit changes after execution: {}", - e - ); - // Treat commit failures as if changes were made to be safe - true - } - }; - - // Determine whether to start the next action based on execution context - let should_start_next = if matches!( - ctx.execution_process.run_reason, - ExecutionProcessRunReason::CodingAgent - ) { - // Skip CleanupScript when CodingAgent produced no changes - changes_committed - } else { - // SetupScript always proceeds to CodingAgent - true - }; - - if should_start_next { - // If the process exited successfully, start the next action - if let Err(e) = container.try_start_next_action(&ctx).await { - tracing::error!( - "Failed to start next action after completion: {}", - e - ); - } - } else { - tracing::info!( - "Skipping cleanup script for task attempt {} - no changes made by coding agent", - ctx.task_attempt.id - ); - - // Manually finalize task since we're bypassing normal execution flow - Self::finalize_task(&db, &config, &ctx).await; - } - } - - if Self::should_finalize(&ctx) { - Self::finalize_task(&db, &config, &ctx).await; - // After finalization, check if a queued follow-up exists and start it - if let Err(e) = container.try_consume_queued_followup(&ctx).await { - tracing::error!( - "Failed to start queued follow-up for attempt {}: {}", - ctx.task_attempt.id, - e - ); - } - } - - // Fire event when CodingAgent execution has finished - if config.read().await.analytics_enabled == Some(true) - && matches!( - &ctx.execution_process.run_reason, - ExecutionProcessRunReason::CodingAgent - ) - && let Some(analytics) = &analytics - { - analytics.analytics_service.track_event(&analytics.user_id, "task_attempt_finished", Some(json!({ - "task_id": ctx.task.id.to_string(), - "project_id": ctx.task.project_id.to_string(), - "attempt_id": ctx.task_attempt.id.to_string(), - "execution_success": matches!(ctx.execution_process.status, ExecutionProcessStatus::Completed), - "exit_code": ctx.execution_process.exit_code, - }))); - } - } - - // Now that commit/next-action/finalization steps for this process are complete, - // capture the HEAD OID as the definitive "after" state (best-effort). - if let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await { - let worktree_dir = container.task_attempt_to_current_dir(&ctx.task_attempt); - if let Ok(head) = container.git().get_head_info(&worktree_dir) - && let Err(e) = ExecutionProcess::update_after_head_commit( - &db.pool, exec_id, &head.oid, - ) - .await - { - tracing::warn!( - "Failed to update after_head_commit for {}: {}", - exec_id, - e - ); - } - } - - // Cleanup msg store - if let Some(msg_arc) = msg_stores.write().await.remove(&exec_id) { - msg_arc.push_finished(); - tokio::time::sleep(Duration::from_millis(50)).await; // Wait for the finish message to propogate - match Arc::try_unwrap(msg_arc) { - Ok(inner) => drop(inner), - Err(arc) => tracing::error!( - "There are still {} strong Arcs to MsgStore for {}", - Arc::strong_count(&arc), - exec_id - ), - } - } - - // Cleanup child handle - child_store.write().await.remove(&exec_id); - break; + if let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await { + // Update executor session summary if available + if let Err(e) = container.update_executor_session_summary(&exec_id).await { + tracing::warn!("Failed to update executor session summary: {}", e); } - // still running, sleep and try again + if matches!( + ctx.execution_process.status, + ExecutionProcessStatus::Completed + ) && exit_code == Some(0) + { + // Commit changes (if any) and get feedback about whether changes were made + let changes_committed = match container.try_commit_changes(&ctx).await { + Ok(committed) => committed, + Err(e) => { + tracing::error!("Failed to commit changes after execution: {}", e); + // Treat commit failures as if changes were made to be safe + true + } + }; + + let should_start_next = if matches!( + ctx.execution_process.run_reason, + ExecutionProcessRunReason::CodingAgent + ) { + changes_committed + } else { + true + }; + + if should_start_next { + // If the process exited successfully, start the next action + if let Err(e) = container.try_start_next_action(&ctx).await { + tracing::error!("Failed to start next action after completion: {}", e); + } + } else { + tracing::info!( + "Skipping cleanup script for task attempt {} - no changes made by coding agent", + ctx.task_attempt.id + ); + + // Manually finalize task since we're bypassing normal execution flow + Self::finalize_task(&db, &config, &ctx).await; + } + } + + if Self::should_finalize(&ctx) { + Self::finalize_task(&db, &config, &ctx).await; + // After finalization, check if a queued follow-up exists and start it + if let Err(e) = container.try_consume_queued_followup(&ctx).await { + tracing::error!( + "Failed to start queued follow-up for attempt {}: {}", + ctx.task_attempt.id, + e + ); + } + } + + // Fire analytics event when CodingAgent execution has finished + if config.read().await.analytics_enabled == Some(true) + && matches!( + &ctx.execution_process.run_reason, + ExecutionProcessRunReason::CodingAgent + ) + && let Some(analytics) = &analytics + { + analytics.analytics_service.track_event(&analytics.user_id, "task_attempt_finished", Some(json!({ + "task_id": ctx.task.id.to_string(), + "project_id": ctx.task.project_id.to_string(), + "attempt_id": ctx.task_attempt.id.to_string(), + "execution_success": matches!(ctx.execution_process.status, ExecutionProcessStatus::Completed), + "exit_code": ctx.execution_process.exit_code, + }))); + } + } + + // Now that commit/next-action/finalization steps for this process are complete, + // capture the HEAD OID as the definitive "after" state (best-effort). + if let Ok(ctx) = ExecutionProcess::load_context(&db.pool, exec_id).await { + let worktree_dir = container.task_attempt_to_current_dir(&ctx.task_attempt); + if let Ok(head) = container.git().get_head_info(&worktree_dir) + && let Err(e) = + ExecutionProcess::update_after_head_commit(&db.pool, exec_id, &head.oid) + .await + { + tracing::warn!("Failed to update after_head_commit for {}: {}", exec_id, e); + } + } + + // Cleanup msg store + if let Some(msg_arc) = msg_stores.write().await.remove(&exec_id) { + msg_arc.push_finished(); + tokio::time::sleep(Duration::from_millis(50)).await; // Wait for the finish message to propogate + match Arc::try_unwrap(msg_arc) { + Ok(inner) => drop(inner), + Err(arc) => tracing::error!( + "There are still {} strong Arcs to MsgStore for {}", + Arc::strong_count(&arc), + exec_id + ), + } + } + + // Cleanup child handle + child_store.write().await.remove(&exec_id); + }) + } + + pub fn spawn_os_exit_watcher( + &self, + exec_id: Uuid, + ) -> tokio::sync::oneshot::Receiver> { + let (tx, rx) = tokio::sync::oneshot::channel::>(); + let child_store = self.child_store.clone(); + tokio::spawn(async move { + loop { + let child_lock = { + let map = child_store.read().await; + map.get(&exec_id).cloned() + }; + if let Some(child_lock) = child_lock { + let mut child_handler = child_lock.write().await; + match child_handler.try_wait() { + Ok(Some(status)) => { + let _ = tx.send(Ok(status)); + break; + } + Ok(None) => {} + Err(e) => { + let _ = tx.send(Err(e)); + break; + } + } + } else { + let _ = tx.send(Err(io::Error::other(format!( + "Child handle missing for {exec_id}" + )))); + break; + } tokio::time::sleep(Duration::from_millis(250)).await; } - }) + }); + rx } pub fn dir_name_from_task_attempt(attempt_id: &Uuid, task_title: &str) -> String { @@ -1043,15 +1067,16 @@ impl ContainerService for LocalContainerService { let current_dir = PathBuf::from(container_ref); // Create the child and stream, add to execution tracker - let mut child = executor_action.spawn(¤t_dir).await?; + let mut spawned = executor_action.spawn(¤t_dir).await?; - self.track_child_msgs_in_store(execution_process.id, &mut child) + self.track_child_msgs_in_store(execution_process.id, &mut spawned.child) .await; - self.add_child_to_store(execution_process.id, child).await; + self.add_child_to_store(execution_process.id, spawned.child) + .await; - // Spawn exit monitor - let _hn = self.spawn_exit_monitor(&execution_process.id); + // Spawn unified exit monitor: watches OS exit and optional executor signal + let _hn = self.spawn_exit_monitor(&execution_process.id, spawned.exit_signal); Ok(()) }