From 99f7d9a4bcd452d449e2bf6c3d7e11252a48620e Mon Sep 17 00:00:00 2001 From: Solomon Date: Mon, 3 Nov 2025 15:57:53 +0000 Subject: [PATCH] feat: Enhance executable resolution by refreshing PATH (#1098) * Refresh path on executable lookup * Make resolve_executable_path async * Handle task attempt start failure gracefully * clippy fix * Remove unused to_shell_string * Lint --------- Co-authored-by: Alex Netsch --- Cargo.lock | 20 +- crates/executors/Cargo.toml | 3 + crates/executors/src/command.rs | 78 ++++- crates/executors/src/executors/acp/harness.rs | 25 +- crates/executors/src/executors/amp.rs | 35 ++- crates/executors/src/executors/claude.rs | 24 +- crates/executors/src/executors/codex.rs | 21 +- crates/executors/src/executors/copilot.rs | 26 +- crates/executors/src/executors/cursor.rs | 24 +- crates/executors/src/executors/gemini.rs | 4 +- crates/executors/src/executors/mod.rs | 5 + crates/executors/src/executors/opencode.rs | 22 +- crates/executors/src/executors/qwen.rs | 4 +- crates/server/src/routes/task_attempts.rs | 9 +- crates/server/src/routes/tasks.rs | 10 +- crates/services/src/services/git_cli.rs | 25 +- .../services/src/services/worktree_manager.rs | 45 ++- crates/utils/Cargo.toml | 5 +- crates/utils/src/lib.rs | 1 + crates/utils/src/shell.rs | 272 +++++++++++++++++- crates/utils/src/tokio.rs | 37 +++ 21 files changed, 532 insertions(+), 163 deletions(-) create mode 100644 crates/utils/src/tokio.rs diff --git a/Cargo.lock b/Cargo.lock index 060d6355..b1c61389 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1306,6 +1306,7 @@ dependencies = [ "ts-rs 11.0.1", "utils", "uuid", + "winsplit", "xdg", ] @@ -5246,7 +5247,6 @@ dependencies = [ "futures-util", "git2", "json-patch", - "lazy_static", "libc", "open", "regex", @@ -5265,6 +5265,8 @@ dependencies = [ "ts-rs 11.0.1", "uuid", "which", + "windows-sys 0.61.2", + "winreg", ] [[package]] @@ -5926,12 +5928,28 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.55.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb5a765337c50e9ec252c2069be9bf91c7df47afb103b642ba3a53bf8101be97" +dependencies = [ + "cfg-if", + "windows-sys 0.59.0", +] + [[package]] name = "winsafe" version = "0.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904" +[[package]] +name = "winsplit" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ab703352da6a72f35c39a533526393725640575bb211f61987a2748323ad956" + [[package]] name = "wit-bindgen" version = "0.46.0" diff --git a/crates/executors/Cargo.toml b/crates/executors/Cargo.toml index 915a06fe..0980dbb4 100644 --- a/crates/executors/Cargo.toml +++ b/crates/executors/Cargo.toml @@ -48,3 +48,6 @@ codex-app-server-protocol = { git = "https://github.com/openai/codex.git", packa codex-mcp-types = { git = "https://github.com/openai/codex.git", package = "mcp-types", rev = "488ec061bf4d36916b8f477c700ea4fde4162a7a" } sha2 = "0.10" derivative = "2.2.0" + +[target.'cfg(windows)'.dependencies] +winsplit = "0.1.0" diff --git a/crates/executors/src/command.rs b/crates/executors/src/command.rs index 798e3e6e..8db5f364 100644 --- a/crates/executors/src/command.rs +++ b/crates/executors/src/command.rs @@ -1,6 +1,42 @@ +use std::path::PathBuf; + use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use thiserror::Error; use ts_rs::TS; +use workspace_utils::shell::resolve_executable_path; + +use crate::executors::ExecutorError; + +#[derive(Debug, Error)] +pub enum CommandBuildError { + #[error("base command cannot be parsed: {0}")] + InvalidBase(String), + #[error("base command is empty after parsing")] + EmptyCommand, + #[error("failed to quote command: {0}")] + QuoteError(#[from] shlex::QuoteError), +} + +#[derive(Debug, Clone)] +pub struct CommandParts { + program: String, + args: Vec, +} + +impl CommandParts { + pub fn new(program: String, args: Vec) -> Self { + Self { program, args } + } + + pub async fn into_resolved(self) -> Result<(PathBuf, Vec), ExecutorError> { + let CommandParts { program, args } = self; + let executable = resolve_executable_path(&program) + .await + .ok_or(ExecutorError::ExecutableNotFound { program })?; + Ok((executable, args)) + } +} #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema, Default)] pub struct CmdOverrides { @@ -60,15 +96,26 @@ impl CommandBuilder { } self } - pub fn build_initial(&self) -> String { - let mut parts = vec![self.base.clone()]; - if let Some(ref params) = self.params { - parts.extend(params.clone()); - } - parts.join(" ") + + pub fn build_initial(&self) -> Result { + self.build(&[]) } - pub fn build_follow_up(&self, additional_args: &[String]) -> String { + pub fn build_follow_up( + &self, + additional_args: &[String], + ) -> Result { + self.build(additional_args) + } + + fn build(&self, additional_args: &[String]) -> Result { + let mut parts = split_command_line(&self.simple_join(additional_args))?; + + let program = parts.remove(0); + Ok(CommandParts::new(program, parts)) + } + + fn simple_join(&self, additional_args: &[String]) -> String { let mut parts = vec![self.base.clone()]; if let Some(ref params) = self.params { parts.extend(params.clone()); @@ -78,6 +125,23 @@ impl CommandBuilder { } } +fn split_command_line(input: &str) -> Result, CommandBuildError> { + #[cfg(windows)] + { + let parts = winsplit::split(input); + if parts.is_empty() { + Err(CommandBuildError::EmptyCommand) + } else { + Ok(parts) + } + } + + #[cfg(not(windows))] + { + shlex::split(input).ok_or_else(|| CommandBuildError::InvalidBase(input.to_string())) + } +} + pub fn apply_overrides(builder: CommandBuilder, overrides: &CmdOverrides) -> CommandBuilder { let builder = if let Some(ref base) = overrides.base_command_override { builder.override_base(base.clone()) diff --git a/crates/executors/src/executors/acp/harness.rs b/crates/executors/src/executors/acp/harness.rs index 3acfc27e..2f299b16 100644 --- a/crates/executors/src/executors/acp/harness.rs +++ b/crates/executors/src/executors/acp/harness.rs @@ -14,10 +14,13 @@ use tokio_util::{ io::ReaderStream, }; use tracing::error; -use workspace_utils::{shell::get_shell_command, stream_lines::LinesStreamExt}; +use workspace_utils::stream_lines::LinesStreamExt; use super::{AcpClient, SessionManager}; -use crate::executors::{ExecutorError, SpawnedChild, acp::AcpEvent}; +use crate::{ + command::CommandParts, + executors::{ExecutorError, SpawnedChild, acp::AcpEvent}, +}; /// Reusable harness for ACP-based conns (Gemini, Qwen, etc.) pub struct AcpAgentHarness { @@ -50,18 +53,17 @@ impl AcpAgentHarness { &self, current_dir: &Path, prompt: String, - full_command: String, + command_parts: CommandParts, ) -> Result { - let (shell_cmd, shell_arg) = get_shell_command(); - let mut command = Command::new(shell_cmd); + let (program_path, args) = command_parts.into_resolved().await?; + let mut command = Command::new(program_path); command .kill_on_drop(true) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .current_dir(current_dir) - .arg(shell_arg) - .arg(full_command) + .args(&args) .env("NODE_NO_WARNINGS", "1"); let mut child = command.group_spawn()?; @@ -88,18 +90,17 @@ impl AcpAgentHarness { current_dir: &Path, prompt: String, session_id: &str, - full_command: String, + command_parts: CommandParts, ) -> Result { - let (shell_cmd, shell_arg) = get_shell_command(); - let mut command = Command::new(shell_cmd); + let (program_path, args) = command_parts.into_resolved().await?; + let mut command = Command::new(program_path); command .kill_on_drop(true) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .current_dir(current_dir) - .arg(shell_arg) - .arg(full_command) + .args(&args) .env("NODE_NO_WARNINGS", "1"); let mut child = command.group_spawn()?; diff --git a/crates/executors/src/executors/amp.rs b/crates/executors/src/executors/amp.rs index a3949763..307ee4ec 100644 --- a/crates/executors/src/executors/amp.rs +++ b/crates/executors/src/executors/amp.rs @@ -6,7 +6,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use tokio::{io::AsyncWriteExt, process::Command}; use ts_rs::TS; -use workspace_utils::{msg_store::MsgStore, shell::get_shell_command}; +use workspace_utils::msg_store::MsgStore; use crate::{ command::{CmdOverrides, CommandBuilder, apply_overrides}, @@ -45,20 +45,19 @@ impl Amp { #[async_trait] impl StandardCodingAgentExecutor for Amp { async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { - let (shell_cmd, shell_arg) = get_shell_command(); - let amp_command = self.build_command_builder().build_initial(); + let command_parts = self.build_command_builder().build_initial()?; + let (executable_path, args) = command_parts.into_resolved().await?; let combined_prompt = self.append_prompt.combine_prompt(prompt); - let mut command = Command::new(shell_cmd); + let mut command = Command::new(executable_path); command .kill_on_drop(true) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .current_dir(current_dir) - .arg(shell_arg) - .arg(&_command); + .args(&args); let mut child = command.group_spawn()?; @@ -77,22 +76,20 @@ impl StandardCodingAgentExecutor for Amp { prompt: &str, session_id: &str, ) -> Result { - // Use shell command for cross-platform compatibility - let (shell_cmd, shell_arg) = get_shell_command(); - // 1) Fork the thread synchronously to obtain new thread id - let fork_cmd = self.build_command_builder().build_follow_up(&[ + let builder = self.build_command_builder(); + let fork_line = builder.build_follow_up(&[ "threads".to_string(), "fork".to_string(), session_id.to_string(), - ]); - let fork_output = Command::new(shell_cmd) + ])?; + let (fork_program, fork_args) = fork_line.into_resolved().await?; + let fork_output = Command::new(fork_program) .kill_on_drop(true) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .current_dir(current_dir) - .arg(shell_arg) - .arg(&fork_cmd) + .args(&fork_args) .output() .await?; let stdout_str = String::from_utf8_lossy(&fork_output.stdout); @@ -112,23 +109,23 @@ impl StandardCodingAgentExecutor for Amp { tracing::debug!("AMP threads fork -> new thread id: {}", new_thread_id); // 2) Continue using the new thread id - let continue_cmd = self.build_command_builder().build_follow_up(&[ + let continue_line = builder.build_follow_up(&[ "threads".to_string(), "continue".to_string(), new_thread_id.clone(), - ]); + ])?; + let (continue_program, continue_args) = continue_line.into_resolved().await?; let combined_prompt = self.append_prompt.combine_prompt(prompt); - let mut command = Command::new(shell_cmd); + let mut command = Command::new(continue_program); command .kill_on_drop(true) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .current_dir(current_dir) - .arg(shell_arg) - .arg(&continue_cmd); + .args(&continue_args); let mut child = command.group_spawn()?; diff --git a/crates/executors/src/executors/claude.rs b/crates/executors/src/executors/claude.rs index febb03ff..7ca80a87 100644 --- a/crates/executors/src/executors/claude.rs +++ b/crates/executors/src/executors/claude.rs @@ -18,13 +18,12 @@ use workspace_utils::{ log_msg::LogMsg, msg_store::MsgStore, path::make_path_relative, - shell::get_shell_command, }; use self::{client::ClaudeAgentClient, protocol::ProtocolPeer, types::PermissionMode}; use crate::{ approvals::ExecutorApprovalService, - command::{CmdOverrides, CommandBuilder, apply_overrides}, + command::{CmdOverrides, CommandBuilder, CommandParts, apply_overrides}, executors::{ AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor, codex::client::LogWriter, @@ -156,8 +155,9 @@ impl StandardCodingAgentExecutor for ClaudeCode { async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { let command_builder = self.build_command_builder().await; - let base_command = command_builder.build_initial(); - self.spawn_internal(current_dir, prompt, base_command).await + let command_parts = command_builder.build_initial()?; + self.spawn_internal(current_dir, prompt, command_parts) + .await } async fn spawn_follow_up( @@ -167,12 +167,13 @@ impl StandardCodingAgentExecutor for ClaudeCode { session_id: &str, ) -> Result { let command_builder = self.build_command_builder().await; - let base_command = command_builder.build_follow_up(&[ + let command_parts = command_builder.build_follow_up(&[ "--fork-session".to_string(), "--resume".to_string(), session_id.to_string(), - ]); - self.spawn_internal(current_dir, prompt, base_command).await + ])?; + self.spawn_internal(current_dir, prompt, command_parts) + .await } fn normalize_logs(&self, msg_store: Arc, current_dir: &Path) { @@ -201,20 +202,19 @@ impl ClaudeCode { &self, current_dir: &Path, prompt: &str, - base_command: String, + command_parts: CommandParts, ) -> Result { - let (shell_cmd, shell_arg) = get_shell_command(); + let (program_path, args) = command_parts.into_resolved().await?; let combined_prompt = self.append_prompt.combine_prompt(prompt); - let mut command = Command::new(shell_cmd); + let mut command = Command::new(program_path); command .kill_on_drop(true) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .current_dir(current_dir) - .arg(shell_arg) - .arg(&base_command); + .args(&args); let mut child = command.group_spawn()?; let child_stdout = child.inner().stdout.take().ok_or_else(|| { diff --git a/crates/executors/src/executors/codex.rs b/crates/executors/src/executors/codex.rs index fa780b72..b663cf8f 100644 --- a/crates/executors/src/executors/codex.rs +++ b/crates/executors/src/executors/codex.rs @@ -21,7 +21,7 @@ use serde_json::Value; use strum_macros::AsRefStr; use tokio::process::Command; use ts_rs::TS; -use workspace_utils::{msg_store::MsgStore, shell::get_shell_command}; +use workspace_utils::msg_store::MsgStore; use self::{ client::{AppServerClient, LogWriter}, @@ -31,7 +31,7 @@ use self::{ }; use crate::{ approvals::ExecutorApprovalService, - command::{CmdOverrides, CommandBuilder, apply_overrides}, + command::{CmdOverrides, CommandBuilder, CommandParts, apply_overrides}, executors::{ AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor, codex::{jsonrpc::ExitSignalSender, normalize_logs::Error}, @@ -142,8 +142,8 @@ impl StandardCodingAgentExecutor for Codex { } async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { - let command = self.build_command_builder().build_initial(); - self.spawn(current_dir, prompt, command, None).await + let command_parts = self.build_command_builder().build_initial()?; + self.spawn(current_dir, prompt, command_parts, None).await } async fn spawn_follow_up( @@ -152,8 +152,8 @@ impl StandardCodingAgentExecutor for Codex { prompt: &str, session_id: &str, ) -> Result { - let command = self.build_command_builder().build_follow_up(&[]); - self.spawn(current_dir, prompt, command, Some(session_id)) + let command_parts = self.build_command_builder().build_follow_up(&[])?; + self.spawn(current_dir, prompt, command_parts, Some(session_id)) .await } @@ -247,21 +247,20 @@ impl Codex { &self, current_dir: &Path, prompt: &str, - command: String, + command_parts: CommandParts, resume_session: Option<&str>, ) -> Result { let combined_prompt = self.append_prompt.combine_prompt(prompt); - let (shell_cmd, shell_arg) = get_shell_command(); + let (program_path, args) = command_parts.into_resolved().await?; - let mut process = Command::new(shell_cmd); + let mut process = Command::new(program_path); process .kill_on_drop(true) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .current_dir(current_dir) - .arg(shell_arg) - .arg(&command) + .args(&args) .env("NODE_NO_WARNINGS", "1") .env("NO_COLOR", "1") .env("RUST_LOG", "error"); diff --git a/crates/executors/src/executors/copilot.rs b/crates/executors/src/executors/copilot.rs index c0503847..a1e56353 100644 --- a/crates/executors/src/executors/copilot.rs +++ b/crates/executors/src/executors/copilot.rs @@ -18,9 +18,7 @@ use tokio::{ }; use ts_rs::TS; use uuid::Uuid; -use workspace_utils::{ - msg_store::MsgStore, path::get_vibe_kanban_temp_dir, shell::get_shell_command, -}; +use workspace_utils::{msg_store::MsgStore, path::get_vibe_kanban_temp_dir}; use crate::{ command::{CmdOverrides, CommandBuilder, apply_overrides}, @@ -97,23 +95,22 @@ impl Copilot { #[async_trait] impl StandardCodingAgentExecutor for Copilot { async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { - let (shell_cmd, shell_arg) = get_shell_command(); let log_dir = Self::create_temp_log_dir(current_dir).await?; - let copilot_command = self + let command_parts = self .build_command_builder(&log_dir.to_string_lossy()) - .build_initial(); + .build_initial()?; + let (program_path, args) = command_parts.into_resolved().await?; let combined_prompt = self.append_prompt.combine_prompt(prompt); - let mut command = Command::new(shell_cmd); + let mut command = Command::new(program_path); command .kill_on_drop(true) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .current_dir(current_dir) - .arg(shell_arg) - .arg(copilot_command) + .args(&args) .env("NODE_NO_WARNINGS", "1"); let mut child = command.group_spawn()?; @@ -136,15 +133,15 @@ impl StandardCodingAgentExecutor for Copilot { prompt: &str, session_id: &str, ) -> Result { - let (shell_cmd, shell_arg) = get_shell_command(); let log_dir = Self::create_temp_log_dir(current_dir).await?; - let copilot_command = self + let command_parts = self .build_command_builder(&log_dir.to_string_lossy()) - .build_follow_up(&["--resume".to_string(), session_id.to_string()]); + .build_follow_up(&["--resume".to_string(), session_id.to_string()])?; + let (program_path, args) = command_parts.into_resolved().await?; let combined_prompt = self.append_prompt.combine_prompt(prompt); - let mut command = Command::new(shell_cmd); + let mut command = Command::new(program_path); command .kill_on_drop(true) @@ -152,8 +149,7 @@ impl StandardCodingAgentExecutor for Copilot { .stdout(Stdio::piped()) .stderr(Stdio::piped()) .current_dir(current_dir) - .arg(shell_arg) - .arg(copilot_command) + .args(&args) .env("NODE_NO_WARNINGS", "1"); let mut child = command.group_spawn()?; diff --git a/crates/executors/src/executors/cursor.rs b/crates/executors/src/executors/cursor.rs index 260c65d2..f1cb32fe 100644 --- a/crates/executors/src/executors/cursor.rs +++ b/crates/executors/src/executors/cursor.rs @@ -15,7 +15,7 @@ use workspace_utils::{ }, msg_store::MsgStore, path::make_path_relative, - shell::{get_shell_command, resolve_executable_path}, + shell::resolve_executable_path, }; use crate::{ @@ -67,20 +67,19 @@ impl StandardCodingAgentExecutor for CursorAgent { async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { mcp::ensure_mcp_server_trust(self, current_dir).await; - let (shell_cmd, shell_arg) = get_shell_command(); - let agent_cmd = self.build_command_builder().build_initial(); + let command_parts = self.build_command_builder().build_initial()?; + let (executable_path, args) = command_parts.into_resolved().await?; let combined_prompt = self.append_prompt.combine_prompt(prompt); - let mut command = Command::new(shell_cmd); + let mut command = Command::new(executable_path); command .kill_on_drop(true) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .current_dir(current_dir) - .arg(shell_arg) - .arg(&agent_cmd); + .args(&args); let mut child = command.group_spawn()?; @@ -100,22 +99,21 @@ impl StandardCodingAgentExecutor for CursorAgent { ) -> Result { mcp::ensure_mcp_server_trust(self, current_dir).await; - let (shell_cmd, shell_arg) = get_shell_command(); - let agent_cmd = self + let command_parts = self .build_command_builder() - .build_follow_up(&["--resume".to_string(), session_id.to_string()]); + .build_follow_up(&["--resume".to_string(), session_id.to_string()])?; + let (executable_path, args) = command_parts.into_resolved().await?; let combined_prompt = self.append_prompt.combine_prompt(prompt); - let mut command = Command::new(shell_cmd); + let mut command = Command::new(executable_path); command .kill_on_drop(true) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .current_dir(current_dir) - .arg(shell_arg) - .arg(&agent_cmd); + .args(&args); let mut child = command.group_spawn()?; @@ -445,7 +443,7 @@ impl StandardCodingAgentExecutor for CursorAgent { } async fn check_availability(&self) -> bool { - resolve_executable_path("cursor-agent").is_some() + resolve_executable_path("cursor-agent").await.is_some() } } diff --git a/crates/executors/src/executors/gemini.rs b/crates/executors/src/executors/gemini.rs index 1a749339..9c58f68c 100644 --- a/crates/executors/src/executors/gemini.rs +++ b/crates/executors/src/executors/gemini.rs @@ -65,7 +65,7 @@ impl StandardCodingAgentExecutor for Gemini { async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { let harness = AcpAgentHarness::new(); let combined_prompt = self.append_prompt.combine_prompt(prompt); - let gemini_command = self.build_command_builder().build_initial(); + let gemini_command = self.build_command_builder().build_initial()?; harness .spawn_with_command(current_dir, combined_prompt, gemini_command) .await @@ -79,7 +79,7 @@ impl StandardCodingAgentExecutor for Gemini { ) -> Result { let harness = AcpAgentHarness::new(); let combined_prompt = self.append_prompt.combine_prompt(prompt); - let gemini_command = self.build_command_builder().build_follow_up(&[]); + let gemini_command = self.build_command_builder().build_follow_up(&[])?; harness .spawn_follow_up_with_command(current_dir, combined_prompt, session_id, gemini_command) .await diff --git a/crates/executors/src/executors/mod.rs b/crates/executors/src/executors/mod.rs index ef900fda..684bcb8a 100644 --- a/crates/executors/src/executors/mod.rs +++ b/crates/executors/src/executors/mod.rs @@ -14,6 +14,7 @@ use workspace_utils::msg_store::MsgStore; use crate::{ approvals::ExecutorApprovalService, + command::CommandBuildError, executors::{ amp::Amp, claude::ClaudeCode, codex::Codex, copilot::Copilot, cursor::CursorAgent, gemini::Gemini, opencode::Opencode, qwen::QwenCode, @@ -55,6 +56,10 @@ pub enum ExecutorError { TomlDeserialize(#[from] toml::de::Error), #[error(transparent)] ExecutorApprovalError(#[from] crate::approvals::ExecutorApprovalError), + #[error(transparent)] + CommandBuild(#[from] CommandBuildError), + #[error("Executable `{program}` not found in PATH")] + ExecutableNotFound { program: String }, } #[enum_dispatch] diff --git a/crates/executors/src/executors/opencode.rs b/crates/executors/src/executors/opencode.rs index 0a6ae7eb..ecdee680 100644 --- a/crates/executors/src/executors/opencode.rs +++ b/crates/executors/src/executors/opencode.rs @@ -16,7 +16,7 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use tokio::{io::AsyncWriteExt, process::Command}; use ts_rs::TS; -use workspace_utils::{msg_store::MsgStore, path::make_path_relative, shell::get_shell_command}; +use workspace_utils::{msg_store::MsgStore, path::make_path_relative}; use crate::{ command::{CmdOverrides, CommandBuilder, apply_overrides}, @@ -131,20 +131,19 @@ impl StandardCodingAgentExecutor for Opencode { async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { // Start a dedicated local share bridge bound to this opencode process let bridge = ShareBridge::start().await.map_err(ExecutorError::Io)?; - let (shell_cmd, shell_arg) = get_shell_command(); - let opencode_command = self.build_command_builder().build_initial(); + let command_parts = self.build_command_builder().build_initial()?; + let (program_path, args) = command_parts.into_resolved().await?; let combined_prompt = self.append_prompt.combine_prompt(prompt); - let mut command = Command::new(shell_cmd); + let mut command = Command::new(program_path); command .kill_on_drop(true) .stdin(Stdio::piped()) .stdout(Stdio::piped()) // Keep stdout but we won't use it .stderr(Stdio::piped()) .current_dir(current_dir) - .arg(shell_arg) - .arg(opencode_command) + .args(&args) .env("NODE_NO_WARNINGS", "1") .env("OPENCODE_AUTO_SHARE", "1") .env("OPENCODE_API", bridge.base_url.clone()); @@ -196,22 +195,21 @@ impl StandardCodingAgentExecutor for Opencode { ) -> Result { // Start a dedicated local share bridge bound to this opencode process let bridge = ShareBridge::start().await.map_err(ExecutorError::Io)?; - let (shell_cmd, shell_arg) = get_shell_command(); - let opencode_command = self + let command_parts = self .build_command_builder() - .build_follow_up(&["--session".to_string(), session_id.to_string()]); + .build_follow_up(&["--session".to_string(), session_id.to_string()])?; + let (program_path, args) = command_parts.into_resolved().await?; let combined_prompt = self.append_prompt.combine_prompt(prompt); - let mut command = Command::new(shell_cmd); + let mut command = Command::new(program_path); command .kill_on_drop(true) .stdin(Stdio::piped()) .stdout(Stdio::piped()) // Keep stdout but we won't use it .stderr(Stdio::piped()) .current_dir(current_dir) - .arg(shell_arg) - .arg(&opencode_command) + .args(&args) .env("NODE_NO_WARNINGS", "1") .env("OPENCODE_AUTO_SHARE", "1") .env("OPENCODE_API", bridge.base_url.clone()); diff --git a/crates/executors/src/executors/qwen.rs b/crates/executors/src/executors/qwen.rs index 0fa59004..b4566f54 100644 --- a/crates/executors/src/executors/qwen.rs +++ b/crates/executors/src/executors/qwen.rs @@ -39,7 +39,7 @@ impl QwenCode { #[async_trait] impl StandardCodingAgentExecutor for QwenCode { async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { - let qwen_command = self.build_command_builder().build_initial(); + let qwen_command = self.build_command_builder().build_initial()?; let combined_prompt = self.append_prompt.combine_prompt(prompt); let harness = AcpAgentHarness::with_session_namespace("qwen_sessions"); harness @@ -53,7 +53,7 @@ impl StandardCodingAgentExecutor for QwenCode { prompt: &str, session_id: &str, ) -> Result { - let qwen_command = self.build_command_builder().build_follow_up(&[]); + let qwen_command = self.build_command_builder().build_follow_up(&[])?; let combined_prompt = self.append_prompt.combine_prompt(prompt); let harness = AcpAgentHarness::with_session_namespace("qwen_sessions"); harness diff --git a/crates/server/src/routes/task_attempts.rs b/crates/server/src/routes/task_attempts.rs index 29261ca7..1a0f51b2 100644 --- a/crates/server/src/routes/task_attempts.rs +++ b/crates/server/src/routes/task_attempts.rs @@ -169,10 +169,13 @@ pub async fn create_task_attempt( ) .await?; - let execution_process = deployment + if let Err(err) = deployment .container() .start_attempt(&task_attempt, executor_profile_id.clone()) - .await?; + .await + { + tracing::error!("Failed to start task attempt: {}", err); + } deployment .track_if_analytics_allowed( @@ -186,7 +189,7 @@ pub async fn create_task_attempt( ) .await; - tracing::info!("Started execution process {}", execution_process.id); + tracing::info!("Created attempt for task {}", task.id); Ok(ResponseJson(ApiResponse::success(task_attempt))) } diff --git a/crates/server/src/routes/tasks.rs b/crates/server/src/routes/tasks.rs index 7dfe29eb..3b02e902 100644 --- a/crates/server/src/routes/tasks.rs +++ b/crates/server/src/routes/tasks.rs @@ -180,10 +180,12 @@ pub async fn create_task_and_start( task.id, ) .await?; - let execution_process = deployment + let is_attempt_running = deployment .container() .start_attempt(&task_attempt, payload.executor_profile_id.clone()) - .await?; + .await + .inspect_err(|err| tracing::error!("Failed to start task attempt: {}", err)) + .is_ok(); deployment .track_if_analytics_allowed( "task_attempt_started", @@ -200,10 +202,10 @@ pub async fn create_task_and_start( .await? .ok_or(ApiError::Database(SqlxError::RowNotFound))?; - tracing::info!("Started execution process {}", execution_process.id); + tracing::info!("Started attempt for task {}", task.id); Ok(ResponseJson(ApiResponse::success(TaskWithAttemptStatus { task, - has_in_progress_attempt: true, + has_in_progress_attempt: is_attempt_running, has_merged_attempt: false, last_attempt_failed: false, executor: task_attempt.executor, diff --git a/crates/services/src/services/git_cli.rs b/crates/services/src/services/git_cli.rs index b1a67a17..0c9bf9db 100644 --- a/crates/services/src/services/git_cli.rs +++ b/crates/services/src/services/git_cli.rs @@ -23,7 +23,7 @@ use std::{ use base64::{Engine, engine::general_purpose::STANDARD as BASE64_STANDARD}; use thiserror::Error; -use utils::shell::resolve_executable_path; +use utils::shell::resolve_executable_path_blocking; // TODO: make GitCli async use crate::services::git::Commit; @@ -497,14 +497,15 @@ impl GitCli { /// Return true if there are staged changes (index differs from HEAD) pub fn has_staged_changes(&self, repo_path: &Path) -> Result { // `git diff --cached --quiet` returns exit code 1 if there are differences - let out = Command::new(resolve_executable_path("git").ok_or(GitCliError::NotAvailable)?) - .arg("-C") - .arg(repo_path) - .arg("diff") - .arg("--cached") - .arg("--quiet") - .output() - .map_err(|e| GitCliError::CommandFailed(e.to_string()))?; + let out = + Command::new(resolve_executable_path_blocking("git").ok_or(GitCliError::NotAvailable)?) + .arg("-C") + .arg(repo_path) + .arg("diff") + .arg("--cached") + .arg("--quiet") + .output() + .map_err(|e| GitCliError::CommandFailed(e.to_string()))?; match out.status.code() { Some(0) => Ok(false), Some(1) => Ok(true), @@ -625,7 +626,7 @@ impl GitCli { /// Ensure `git` is available on PATH fn ensure_available(&self) -> Result<(), GitCliError> { - let git = resolve_executable_path("git").ok_or(GitCliError::NotAvailable)?; + let git = resolve_executable_path_blocking("git").ok_or(GitCliError::NotAvailable)?; let out = Command::new(&git) .arg("--version") .output() @@ -656,7 +657,7 @@ impl GitCli { S: AsRef, { self.ensure_available()?; - let git = resolve_executable_path("git").ok_or(GitCliError::NotAvailable)?; + let git = resolve_executable_path_blocking("git").ok_or(GitCliError::NotAvailable)?; let mut cmd = Command::new(&git); cmd.arg("-C").arg(repo_path); for a in args { @@ -684,7 +685,7 @@ impl GitCli { S: AsRef, { self.ensure_available()?; - let git = resolve_executable_path("git").ok_or(GitCliError::NotAvailable)?; + let git = resolve_executable_path_blocking("git").ok_or(GitCliError::NotAvailable)?; let mut cmd = Command::new(&git); cmd.arg("-C").arg(repo_path); for (k, v) in envs { diff --git a/crates/services/src/services/worktree_manager.rs b/crates/services/src/services/worktree_manager.rs index 18651562..f26dbae5 100644 --- a/crates/services/src/services/worktree_manager.rs +++ b/crates/services/src/services/worktree_manager.rs @@ -7,7 +7,7 @@ use std::{ use git2::{Error as GitError, Repository}; use thiserror::Error; use tracing::{debug, info}; -use utils::shell::get_shell_command; +use utils::shell::resolve_executable_path; use super::{ git::{GitService, GitServiceError}, @@ -429,35 +429,30 @@ impl WorktreeManager { // Try using git rev-parse --git-common-dir from within the worktree let worktree_path_owned = worktree_path.to_path_buf(); - tokio::task::spawn_blocking(move || { - let (shell_cmd, shell_arg) = get_shell_command(); - let git_command = "git rev-parse --git-common-dir"; + let git_path = resolve_executable_path("git").await?; - let output = std::process::Command::new(shell_cmd) - .args([shell_arg, git_command]) - .current_dir(&worktree_path_owned) - .output() - .ok()?; + let output = tokio::process::Command::new(git_path) + .args(["rev-parse", "--git-common-dir"]) + .current_dir(&worktree_path_owned) + .output() + .await + .ok()?; - if output.status.success() { - let git_common_dir = String::from_utf8(output.stdout).ok()?.trim().to_string(); + if output.status.success() { + let git_common_dir = String::from_utf8(output.stdout).ok()?.trim().to_string(); - // git-common-dir gives us the path to the .git directory - // We need the working directory (parent of .git) - let git_dir_path = Path::new(&git_common_dir); - if git_dir_path.file_name() == Some(std::ffi::OsStr::new(".git")) { - git_dir_path.parent()?.to_str().map(PathBuf::from) - } else { - // In case of bare repo or unusual setup, use the git-common-dir as is - Some(PathBuf::from(git_common_dir)) - } + // git-common-dir gives us the path to the .git directory + // We need the working directory (parent of .git) + let git_dir_path = Path::new(&git_common_dir); + if git_dir_path.file_name() == Some(std::ffi::OsStr::new(".git")) { + git_dir_path.parent()?.to_str().map(PathBuf::from) } else { - None + // In case of bare repo or unusual setup, use the git-common-dir as is + Some(PathBuf::from(git_common_dir)) } - }) - .await - .ok() - .flatten() + } else { + None + } } /// Simple worktree cleanup when we can't determine the main repo diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index a8329d69..70f42008 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -21,7 +21,6 @@ open = "5.3.2" regex = "1.11.1" sentry = { version = "0.41.0", features = ["anyhow", "backtrace", "panic", "debug-images"] } sentry-tracing = { version = "0.41.0", features = ["backtrace"] } -lazy_static = "1.4" futures-util = "0.3" json-patch = "2.0" base64 = "0.22" @@ -33,3 +32,7 @@ shellexpand = "3.1.1" which = "8.0.0" similar = "2" git2 = "0.18" + +[target.'cfg(windows)'.dependencies] +winreg = "0.55" +windows-sys = { version = "0.61", features = ["Win32_System_Environment"] } diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 7c449604..ab94f6b9 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -17,6 +17,7 @@ pub mod shell; pub mod stream_ext; pub mod stream_lines; pub mod text; +pub mod tokio; pub mod version; /// Cache for WSL2 detection result diff --git a/crates/utils/src/shell.rs b/crates/utils/src/shell.rs index c844a960..cc9b8cc0 100644 --- a/crates/utils/src/shell.rs +++ b/crates/utils/src/shell.rs @@ -1,30 +1,278 @@ //! Cross-platform shell command utilities +use std::{ + collections::HashSet, + env::{join_paths, split_paths}, + ffi::{OsStr, OsString}, + path::{Path, PathBuf}, + time::Duration, +}; + +use crate::tokio::block_on; + /// Returns the appropriate shell command and argument for the current platform. /// /// Returns (shell_program, shell_arg) where: /// - Windows: ("cmd", "/C") /// - Unix-like: ("sh", "-c") or ("bash", "-c") if available -pub fn get_shell_command() -> (&'static str, &'static str) { +pub fn get_shell_command() -> (String, &'static str) { if cfg!(windows) { - ("cmd", "/C") + ("cmd".into(), "/C") } else { + // Prefer SHELL env var if set and valid + if let Ok(shell) = std::env::var("SHELL") { + let path = Path::new(&shell); + if path.is_absolute() && path.is_file() { + return (shell, "-c"); + } + } // Prefer zsh or bash if available, fallback to sh if std::path::Path::new("/bin/zsh").exists() { - ("zsh", "-c") + ("zsh".into(), "-c") } else if std::path::Path::new("/bin/bash").exists() { - ("bash", "-c") + ("bash".into(), "-c") } else { - ("sh", "-c") + ("sh".into(), "-c") } } } -/// Resolves the full path of an executable using the system's PATH environment variable. -/// Note: On Windows, resolving the executable path can be necessary before passing -/// it to `std::process::Command::new`, as the latter has been deficient in finding executables. -pub fn resolve_executable_path(executable: &str) -> Option { - which::which(executable) - .ok() - .map(|p| p.to_string_lossy().to_string()) +/// Resolve an executable by name, falling back to a refreshed PATH if needed. +/// +/// The search order is: +/// 1. Explicit paths (absolute or containing a separator). +/// 2. The current process PATH via `which`. +/// 3. A platform-specific refresh of PATH (login shell on Unix, PowerShell on Windows), +/// after which we re-run the `which` lookup and update the process PATH for future calls. +pub async fn resolve_executable_path(executable: &str) -> Option { + if executable.trim().is_empty() { + return None; + } + + let path = Path::new(executable); + if path.is_absolute() && path.is_file() { + return Some(path.to_path_buf()); + } + + if let Some(found) = which(executable).await { + return Some(found); + } + + if refresh_path().await + && let Some(found) = which(executable).await + { + return Some(found); + } + + None +} + +pub fn resolve_executable_path_blocking(executable: &str) -> Option { + block_on(resolve_executable_path(executable)) +} + +/// Merge two PATH strings into a single, de-duplicated PATH. +/// +/// - Keeps the order of entries from `primary`. +/// - Appends only *unseen* entries from `secondary`. +/// - Ignores empty components. +/// - Returns a platform-correct PATH string (using the OS separator). +pub fn merge_paths(primary: impl AsRef, secondary: impl AsRef) -> OsString { + let mut seen = HashSet::::new(); + let mut merged = Vec::::new(); + + for p in split_paths(primary.as_ref()).chain(split_paths(secondary.as_ref())) { + if !p.as_os_str().is_empty() && seen.insert(p.clone()) { + merged.push(p); + } + } + + join_paths(merged).unwrap_or_default() +} + +async fn refresh_path() -> bool { + let Some(refreshed) = get_fresh_path().await else { + return false; + }; + let existing = std::env::var_os("PATH").unwrap_or_default(); + let refreshed_os = OsString::from(&refreshed); + let merged = merge_paths(&existing, refreshed_os); + if merged == existing { + return false; + } + tracing::debug!(?existing, ?refreshed, ?merged, "Refreshed PATH"); + unsafe { + std::env::set_var("PATH", &merged); + } + true +} + +async fn which(executable: &str) -> Option { + let executable = executable.to_string(); + tokio::task::spawn_blocking(move || which::which(executable)) + .await + .ok() + .and_then(|result| result.ok()) +} + +#[cfg(not(windows))] +async fn get_fresh_path() -> Option { + use tokio::process::Command; + + async fn run(shell: &Path, login: bool) -> Option { + let mut cmd = Command::new(shell); + if login { + cmd.arg("-l"); + } + cmd.arg("-c") + .arg("printf '%s' \"$PATH\"") + .env("TERM", "dumb") + .kill_on_drop(true); + + const PATH_REFRESH_COMMAND_TIMEOUT: Duration = Duration::from_secs(5); + + let child = cmd.spawn().ok()?; + let output = match tokio::time::timeout( + PATH_REFRESH_COMMAND_TIMEOUT, + child.wait_with_output(), + ) + .await + { + Ok(Ok(output)) => output, + Ok(Err(err)) => { + tracing::debug!( + shell = %shell.display(), + ?err, + "Failed to retrieve PATH from login shell" + ); + return None; + } + Err(_) => { + tracing::warn!( + shell = %shell.display(), + timeout_secs = PATH_REFRESH_COMMAND_TIMEOUT.as_secs(), + "Timed out retrieving PATH from login shell" + ); + return None; + } + }; + + if !output.status.success() { + return None; + } + let path = String::from_utf8(output.stdout).ok()?.trim().to_string(); + if path.is_empty() { None } else { Some(path) } + } + + let mut paths = Vec::new(); + + let shells = vec![ + (PathBuf::from("/bin/zsh"), true), + (PathBuf::from("/bin/bash"), true), + (PathBuf::from("/bin/sh"), false), + ]; + + let mut current_shell_name = None; + if let Ok(shell) = std::env::var("SHELL") { + let path = Path::new(&shell); + if path.is_absolute() && path.is_file() { + current_shell_name = path.file_name().and_then(OsStr::to_str).map(String::from); + if let Some(path) = run(path, true).await { + paths.push(path); + } + } + } + + for (shell_path, login) in shells { + if !shell_path.exists() { + continue; + } + let shell_name = shell_path + .file_name() + .and_then(OsStr::to_str) + .map(String::from); + if current_shell_name != shell_name + && let Some(path) = run(&shell_path, login).await + { + paths.push(path); + } + } + + if paths.is_empty() { + return None; + } + + paths + .into_iter() + .map(OsString::from) + .reduce(|a, b| merge_paths(&a, &b)) + .map(|merged| merged.to_string_lossy().into_owned()) +} + +#[cfg(windows)] +async fn get_fresh_path() -> Option { + tokio::task::spawn_blocking(get_fresh_path_blocking) + .await + .ok() + .flatten() +} + +#[cfg(windows)] +fn get_fresh_path_blocking() -> Option { + use std::{ + ffi::{OsStr, OsString}, + os::windows::ffi::{OsStrExt, OsStringExt}, + }; + + use winreg::{HKEY, RegKey, enums::*}; + + // Expand %VARS% for registry PATH entries + fn expand_env_vars(input: &OsStr) -> OsString { + use windows_sys::Win32::System::Environment::ExpandEnvironmentStringsW; + + let wide: Vec = input.encode_wide().chain(Some(0)).collect(); + unsafe { + let needed = ExpandEnvironmentStringsW(wide.as_ptr(), std::ptr::null_mut(), 0); + if needed == 0 { + return input.to_os_string(); + } + let mut buf = vec![0u16; needed as usize]; + let written = ExpandEnvironmentStringsW(wide.as_ptr(), buf.as_mut_ptr(), needed); + if written == 0 { + return input.to_os_string(); + } + // written includes the trailing NUL when it fits + OsString::from_wide(&buf[..(written as usize).saturating_sub(1)]) + } + } + + fn read_registry_path(root: HKEY, subkey: &str) -> Option { + let key = RegKey::predef(root) + .open_subkey_with_flags(subkey, KEY_READ) + .ok()?; + key.get_value::("Path").ok().map(OsString::from) + } + + let mut paths: Vec = Vec::new(); + + if let Some(user_path) = read_registry_path(HKEY_CURRENT_USER, "Environment") { + paths.push(expand_env_vars(&user_path)); + } + + if let Some(machine_path) = read_registry_path( + HKEY_LOCAL_MACHINE, + r"System\CurrentControlSet\Control\Session Manager\Environment", + ) { + paths.push(expand_env_vars(&machine_path)); + } + + if paths.is_empty() { + return None; + } + + paths + .into_iter() + .map(OsString::from) + .reduce(|a, b| merge_paths(&a, &b)) + .map(|merged| merged.to_string_lossy().into_owned()) } diff --git a/crates/utils/src/tokio.rs b/crates/utils/src/tokio.rs new file mode 100644 index 00000000..a35b9f6e --- /dev/null +++ b/crates/utils/src/tokio.rs @@ -0,0 +1,37 @@ +use std::{future::Future, sync::OnceLock}; + +use tokio::runtime::{Builder, Handle, Runtime, RuntimeFlavor}; + +fn rt() -> &'static Runtime { + static RT: OnceLock = OnceLock::new(); + RT.get_or_init(|| { + Builder::new_multi_thread() + .enable_all() + .build() + .expect("failed to build global Tokio runtime") + }) +} + +/// Run an async future from sync code safely. +/// If already inside a Tokio runtime, it will use that runtime. +pub fn block_on(fut: F) -> T +where + F: Future + Send, + T: Send, +{ + match Handle::try_current() { + // Already inside a Tokio runtime + Ok(h) => match h.runtime_flavor() { + // Use block_in_place so other tasks keep running. + RuntimeFlavor::MultiThread => tokio::task::block_in_place(|| rt().block_on(fut)), + // Spawn a new thread to avoid freezing a single-thread runtime. + RuntimeFlavor::CurrentThread | _ => std::thread::scope(|s| { + s.spawn(|| rt().block_on(fut)) + .join() + .expect("thread panicked") + }), + }, + // Outside Tokio: block normally. + Err(_) => rt().block_on(fut), + } +}