feat: Enhance executable resolution by refreshing PATH (#1098)

* Refresh path on executable lookup

* Make resolve_executable_path async

* Handle task attempt start failure gracefully

* clippy fix

* Remove unused to_shell_string

* Lint

---------

Co-authored-by: Alex Netsch <alex@bloop.ai>
This commit is contained in:
Solomon
2025-11-03 15:57:53 +00:00
committed by GitHub
parent c59ffdd0ab
commit 99f7d9a4bc
21 changed files with 532 additions and 163 deletions

20
Cargo.lock generated
View File

@@ -1306,6 +1306,7 @@ dependencies = [
"ts-rs 11.0.1",
"utils",
"uuid",
"winsplit",
"xdg",
]
@@ -5246,7 +5247,6 @@ dependencies = [
"futures-util",
"git2",
"json-patch",
"lazy_static",
"libc",
"open",
"regex",
@@ -5265,6 +5265,8 @@ dependencies = [
"ts-rs 11.0.1",
"uuid",
"which",
"windows-sys 0.61.2",
"winreg",
]
[[package]]
@@ -5926,12 +5928,28 @@ dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.55.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb5a765337c50e9ec252c2069be9bf91c7df47afb103b642ba3a53bf8101be97"
dependencies = [
"cfg-if",
"windows-sys 0.59.0",
]
[[package]]
name = "winsafe"
version = "0.0.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904"
[[package]]
name = "winsplit"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ab703352da6a72f35c39a533526393725640575bb211f61987a2748323ad956"
[[package]]
name = "wit-bindgen"
version = "0.46.0"

View File

@@ -48,3 +48,6 @@ codex-app-server-protocol = { git = "https://github.com/openai/codex.git", packa
codex-mcp-types = { git = "https://github.com/openai/codex.git", package = "mcp-types", rev = "488ec061bf4d36916b8f477c700ea4fde4162a7a" }
sha2 = "0.10"
derivative = "2.2.0"
[target.'cfg(windows)'.dependencies]
winsplit = "0.1.0"

View File

@@ -1,6 +1,42 @@
use std::path::PathBuf;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use ts_rs::TS;
use workspace_utils::shell::resolve_executable_path;
use crate::executors::ExecutorError;
#[derive(Debug, Error)]
pub enum CommandBuildError {
#[error("base command cannot be parsed: {0}")]
InvalidBase(String),
#[error("base command is empty after parsing")]
EmptyCommand,
#[error("failed to quote command: {0}")]
QuoteError(#[from] shlex::QuoteError),
}
#[derive(Debug, Clone)]
pub struct CommandParts {
program: String,
args: Vec<String>,
}
impl CommandParts {
pub fn new(program: String, args: Vec<String>) -> Self {
Self { program, args }
}
pub async fn into_resolved(self) -> Result<(PathBuf, Vec<String>), ExecutorError> {
let CommandParts { program, args } = self;
let executable = resolve_executable_path(&program)
.await
.ok_or(ExecutorError::ExecutableNotFound { program })?;
Ok((executable, args))
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema, Default)]
pub struct CmdOverrides {
@@ -60,15 +96,26 @@ impl CommandBuilder {
}
self
}
pub fn build_initial(&self) -> String {
let mut parts = vec![self.base.clone()];
if let Some(ref params) = self.params {
parts.extend(params.clone());
}
parts.join(" ")
pub fn build_initial(&self) -> Result<CommandParts, CommandBuildError> {
self.build(&[])
}
pub fn build_follow_up(&self, additional_args: &[String]) -> String {
pub fn build_follow_up(
&self,
additional_args: &[String],
) -> Result<CommandParts, CommandBuildError> {
self.build(additional_args)
}
fn build(&self, additional_args: &[String]) -> Result<CommandParts, CommandBuildError> {
let mut parts = split_command_line(&self.simple_join(additional_args))?;
let program = parts.remove(0);
Ok(CommandParts::new(program, parts))
}
fn simple_join(&self, additional_args: &[String]) -> String {
let mut parts = vec![self.base.clone()];
if let Some(ref params) = self.params {
parts.extend(params.clone());
@@ -78,6 +125,23 @@ impl CommandBuilder {
}
}
fn split_command_line(input: &str) -> Result<Vec<String>, CommandBuildError> {
#[cfg(windows)]
{
let parts = winsplit::split(input);
if parts.is_empty() {
Err(CommandBuildError::EmptyCommand)
} else {
Ok(parts)
}
}
#[cfg(not(windows))]
{
shlex::split(input).ok_or_else(|| CommandBuildError::InvalidBase(input.to_string()))
}
}
pub fn apply_overrides(builder: CommandBuilder, overrides: &CmdOverrides) -> CommandBuilder {
let builder = if let Some(ref base) = overrides.base_command_override {
builder.override_base(base.clone())

View File

@@ -14,10 +14,13 @@ use tokio_util::{
io::ReaderStream,
};
use tracing::error;
use workspace_utils::{shell::get_shell_command, stream_lines::LinesStreamExt};
use workspace_utils::stream_lines::LinesStreamExt;
use super::{AcpClient, SessionManager};
use crate::executors::{ExecutorError, SpawnedChild, acp::AcpEvent};
use crate::{
command::CommandParts,
executors::{ExecutorError, SpawnedChild, acp::AcpEvent},
};
/// Reusable harness for ACP-based conns (Gemini, Qwen, etc.)
pub struct AcpAgentHarness {
@@ -50,18 +53,17 @@ impl AcpAgentHarness {
&self,
current_dir: &Path,
prompt: String,
full_command: String,
command_parts: CommandParts,
) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let mut command = Command::new(shell_cmd);
let (program_path, args) = command_parts.into_resolved().await?;
let mut command = Command::new(program_path);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(full_command)
.args(&args)
.env("NODE_NO_WARNINGS", "1");
let mut child = command.group_spawn()?;
@@ -88,18 +90,17 @@ impl AcpAgentHarness {
current_dir: &Path,
prompt: String,
session_id: &str,
full_command: String,
command_parts: CommandParts,
) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let mut command = Command::new(shell_cmd);
let (program_path, args) = command_parts.into_resolved().await?;
let mut command = Command::new(program_path);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(full_command)
.args(&args)
.env("NODE_NO_WARNINGS", "1");
let mut child = command.group_spawn()?;

View File

@@ -6,7 +6,7 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tokio::{io::AsyncWriteExt, process::Command};
use ts_rs::TS;
use workspace_utils::{msg_store::MsgStore, shell::get_shell_command};
use workspace_utils::msg_store::MsgStore;
use crate::{
command::{CmdOverrides, CommandBuilder, apply_overrides},
@@ -45,20 +45,19 @@ impl Amp {
#[async_trait]
impl StandardCodingAgentExecutor for Amp {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let amp_command = self.build_command_builder().build_initial();
let command_parts = self.build_command_builder().build_initial()?;
let (executable_path, args) = command_parts.into_resolved().await?;
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let mut command = Command::new(shell_cmd);
let mut command = Command::new(executable_path);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&amp_command);
.args(&args);
let mut child = command.group_spawn()?;
@@ -77,22 +76,20 @@ impl StandardCodingAgentExecutor for Amp {
prompt: &str,
session_id: &str,
) -> Result<SpawnedChild, ExecutorError> {
// Use shell command for cross-platform compatibility
let (shell_cmd, shell_arg) = get_shell_command();
// 1) Fork the thread synchronously to obtain new thread id
let fork_cmd = self.build_command_builder().build_follow_up(&[
let builder = self.build_command_builder();
let fork_line = builder.build_follow_up(&[
"threads".to_string(),
"fork".to_string(),
session_id.to_string(),
]);
let fork_output = Command::new(shell_cmd)
])?;
let (fork_program, fork_args) = fork_line.into_resolved().await?;
let fork_output = Command::new(fork_program)
.kill_on_drop(true)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&fork_cmd)
.args(&fork_args)
.output()
.await?;
let stdout_str = String::from_utf8_lossy(&fork_output.stdout);
@@ -112,23 +109,23 @@ impl StandardCodingAgentExecutor for Amp {
tracing::debug!("AMP threads fork -> new thread id: {}", new_thread_id);
// 2) Continue using the new thread id
let continue_cmd = self.build_command_builder().build_follow_up(&[
let continue_line = builder.build_follow_up(&[
"threads".to_string(),
"continue".to_string(),
new_thread_id.clone(),
]);
])?;
let (continue_program, continue_args) = continue_line.into_resolved().await?;
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let mut command = Command::new(shell_cmd);
let mut command = Command::new(continue_program);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&continue_cmd);
.args(&continue_args);
let mut child = command.group_spawn()?;

View File

@@ -18,13 +18,12 @@ use workspace_utils::{
log_msg::LogMsg,
msg_store::MsgStore,
path::make_path_relative,
shell::get_shell_command,
};
use self::{client::ClaudeAgentClient, protocol::ProtocolPeer, types::PermissionMode};
use crate::{
approvals::ExecutorApprovalService,
command::{CmdOverrides, CommandBuilder, apply_overrides},
command::{CmdOverrides, CommandBuilder, CommandParts, apply_overrides},
executors::{
AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor,
codex::client::LogWriter,
@@ -156,8 +155,9 @@ impl StandardCodingAgentExecutor for ClaudeCode {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let command_builder = self.build_command_builder().await;
let base_command = command_builder.build_initial();
self.spawn_internal(current_dir, prompt, base_command).await
let command_parts = command_builder.build_initial()?;
self.spawn_internal(current_dir, prompt, command_parts)
.await
}
async fn spawn_follow_up(
@@ -167,12 +167,13 @@ impl StandardCodingAgentExecutor for ClaudeCode {
session_id: &str,
) -> Result<SpawnedChild, ExecutorError> {
let command_builder = self.build_command_builder().await;
let base_command = command_builder.build_follow_up(&[
let command_parts = command_builder.build_follow_up(&[
"--fork-session".to_string(),
"--resume".to_string(),
session_id.to_string(),
]);
self.spawn_internal(current_dir, prompt, base_command).await
])?;
self.spawn_internal(current_dir, prompt, command_parts)
.await
}
fn normalize_logs(&self, msg_store: Arc<MsgStore>, current_dir: &Path) {
@@ -201,20 +202,19 @@ impl ClaudeCode {
&self,
current_dir: &Path,
prompt: &str,
base_command: String,
command_parts: CommandParts,
) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let (program_path, args) = command_parts.into_resolved().await?;
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let mut command = Command::new(shell_cmd);
let mut command = Command::new(program_path);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&base_command);
.args(&args);
let mut child = command.group_spawn()?;
let child_stdout = child.inner().stdout.take().ok_or_else(|| {

View File

@@ -21,7 +21,7 @@ use serde_json::Value;
use strum_macros::AsRefStr;
use tokio::process::Command;
use ts_rs::TS;
use workspace_utils::{msg_store::MsgStore, shell::get_shell_command};
use workspace_utils::msg_store::MsgStore;
use self::{
client::{AppServerClient, LogWriter},
@@ -31,7 +31,7 @@ use self::{
};
use crate::{
approvals::ExecutorApprovalService,
command::{CmdOverrides, CommandBuilder, apply_overrides},
command::{CmdOverrides, CommandBuilder, CommandParts, apply_overrides},
executors::{
AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor,
codex::{jsonrpc::ExitSignalSender, normalize_logs::Error},
@@ -142,8 +142,8 @@ impl StandardCodingAgentExecutor for Codex {
}
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let command = self.build_command_builder().build_initial();
self.spawn(current_dir, prompt, command, None).await
let command_parts = self.build_command_builder().build_initial()?;
self.spawn(current_dir, prompt, command_parts, None).await
}
async fn spawn_follow_up(
@@ -152,8 +152,8 @@ impl StandardCodingAgentExecutor for Codex {
prompt: &str,
session_id: &str,
) -> Result<SpawnedChild, ExecutorError> {
let command = self.build_command_builder().build_follow_up(&[]);
self.spawn(current_dir, prompt, command, Some(session_id))
let command_parts = self.build_command_builder().build_follow_up(&[])?;
self.spawn(current_dir, prompt, command_parts, Some(session_id))
.await
}
@@ -247,21 +247,20 @@ impl Codex {
&self,
current_dir: &Path,
prompt: &str,
command: String,
command_parts: CommandParts,
resume_session: Option<&str>,
) -> Result<SpawnedChild, ExecutorError> {
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let (shell_cmd, shell_arg) = get_shell_command();
let (program_path, args) = command_parts.into_resolved().await?;
let mut process = Command::new(shell_cmd);
let mut process = Command::new(program_path);
process
.kill_on_drop(true)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&command)
.args(&args)
.env("NODE_NO_WARNINGS", "1")
.env("NO_COLOR", "1")
.env("RUST_LOG", "error");

View File

@@ -18,9 +18,7 @@ use tokio::{
};
use ts_rs::TS;
use uuid::Uuid;
use workspace_utils::{
msg_store::MsgStore, path::get_vibe_kanban_temp_dir, shell::get_shell_command,
};
use workspace_utils::{msg_store::MsgStore, path::get_vibe_kanban_temp_dir};
use crate::{
command::{CmdOverrides, CommandBuilder, apply_overrides},
@@ -97,23 +95,22 @@ impl Copilot {
#[async_trait]
impl StandardCodingAgentExecutor for Copilot {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let log_dir = Self::create_temp_log_dir(current_dir).await?;
let copilot_command = self
let command_parts = self
.build_command_builder(&log_dir.to_string_lossy())
.build_initial();
.build_initial()?;
let (program_path, args) = command_parts.into_resolved().await?;
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let mut command = Command::new(shell_cmd);
let mut command = Command::new(program_path);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(copilot_command)
.args(&args)
.env("NODE_NO_WARNINGS", "1");
let mut child = command.group_spawn()?;
@@ -136,15 +133,15 @@ impl StandardCodingAgentExecutor for Copilot {
prompt: &str,
session_id: &str,
) -> Result<SpawnedChild, ExecutorError> {
let (shell_cmd, shell_arg) = get_shell_command();
let log_dir = Self::create_temp_log_dir(current_dir).await?;
let copilot_command = self
let command_parts = self
.build_command_builder(&log_dir.to_string_lossy())
.build_follow_up(&["--resume".to_string(), session_id.to_string()]);
.build_follow_up(&["--resume".to_string(), session_id.to_string()])?;
let (program_path, args) = command_parts.into_resolved().await?;
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let mut command = Command::new(shell_cmd);
let mut command = Command::new(program_path);
command
.kill_on_drop(true)
@@ -152,8 +149,7 @@ impl StandardCodingAgentExecutor for Copilot {
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(copilot_command)
.args(&args)
.env("NODE_NO_WARNINGS", "1");
let mut child = command.group_spawn()?;

View File

@@ -15,7 +15,7 @@ use workspace_utils::{
},
msg_store::MsgStore,
path::make_path_relative,
shell::{get_shell_command, resolve_executable_path},
shell::resolve_executable_path,
};
use crate::{
@@ -67,20 +67,19 @@ impl StandardCodingAgentExecutor for CursorAgent {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
mcp::ensure_mcp_server_trust(self, current_dir).await;
let (shell_cmd, shell_arg) = get_shell_command();
let agent_cmd = self.build_command_builder().build_initial();
let command_parts = self.build_command_builder().build_initial()?;
let (executable_path, args) = command_parts.into_resolved().await?;
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let mut command = Command::new(shell_cmd);
let mut command = Command::new(executable_path);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&agent_cmd);
.args(&args);
let mut child = command.group_spawn()?;
@@ -100,22 +99,21 @@ impl StandardCodingAgentExecutor for CursorAgent {
) -> Result<SpawnedChild, ExecutorError> {
mcp::ensure_mcp_server_trust(self, current_dir).await;
let (shell_cmd, shell_arg) = get_shell_command();
let agent_cmd = self
let command_parts = self
.build_command_builder()
.build_follow_up(&["--resume".to_string(), session_id.to_string()]);
.build_follow_up(&["--resume".to_string(), session_id.to_string()])?;
let (executable_path, args) = command_parts.into_resolved().await?;
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let mut command = Command::new(shell_cmd);
let mut command = Command::new(executable_path);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&agent_cmd);
.args(&args);
let mut child = command.group_spawn()?;
@@ -445,7 +443,7 @@ impl StandardCodingAgentExecutor for CursorAgent {
}
async fn check_availability(&self) -> bool {
resolve_executable_path("cursor-agent").is_some()
resolve_executable_path("cursor-agent").await.is_some()
}
}

View File

@@ -65,7 +65,7 @@ impl StandardCodingAgentExecutor for Gemini {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let harness = AcpAgentHarness::new();
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let gemini_command = self.build_command_builder().build_initial();
let gemini_command = self.build_command_builder().build_initial()?;
harness
.spawn_with_command(current_dir, combined_prompt, gemini_command)
.await
@@ -79,7 +79,7 @@ impl StandardCodingAgentExecutor for Gemini {
) -> Result<SpawnedChild, ExecutorError> {
let harness = AcpAgentHarness::new();
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let gemini_command = self.build_command_builder().build_follow_up(&[]);
let gemini_command = self.build_command_builder().build_follow_up(&[])?;
harness
.spawn_follow_up_with_command(current_dir, combined_prompt, session_id, gemini_command)
.await

View File

@@ -14,6 +14,7 @@ use workspace_utils::msg_store::MsgStore;
use crate::{
approvals::ExecutorApprovalService,
command::CommandBuildError,
executors::{
amp::Amp, claude::ClaudeCode, codex::Codex, copilot::Copilot, cursor::CursorAgent,
gemini::Gemini, opencode::Opencode, qwen::QwenCode,
@@ -55,6 +56,10 @@ pub enum ExecutorError {
TomlDeserialize(#[from] toml::de::Error),
#[error(transparent)]
ExecutorApprovalError(#[from] crate::approvals::ExecutorApprovalError),
#[error(transparent)]
CommandBuild(#[from] CommandBuildError),
#[error("Executable `{program}` not found in PATH")]
ExecutableNotFound { program: String },
}
#[enum_dispatch]

View File

@@ -16,7 +16,7 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tokio::{io::AsyncWriteExt, process::Command};
use ts_rs::TS;
use workspace_utils::{msg_store::MsgStore, path::make_path_relative, shell::get_shell_command};
use workspace_utils::{msg_store::MsgStore, path::make_path_relative};
use crate::{
command::{CmdOverrides, CommandBuilder, apply_overrides},
@@ -131,20 +131,19 @@ impl StandardCodingAgentExecutor for Opencode {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
// Start a dedicated local share bridge bound to this opencode process
let bridge = ShareBridge::start().await.map_err(ExecutorError::Io)?;
let (shell_cmd, shell_arg) = get_shell_command();
let opencode_command = self.build_command_builder().build_initial();
let command_parts = self.build_command_builder().build_initial()?;
let (program_path, args) = command_parts.into_resolved().await?;
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let mut command = Command::new(shell_cmd);
let mut command = Command::new(program_path);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped()) // Keep stdout but we won't use it
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(opencode_command)
.args(&args)
.env("NODE_NO_WARNINGS", "1")
.env("OPENCODE_AUTO_SHARE", "1")
.env("OPENCODE_API", bridge.base_url.clone());
@@ -196,22 +195,21 @@ impl StandardCodingAgentExecutor for Opencode {
) -> Result<SpawnedChild, ExecutorError> {
// Start a dedicated local share bridge bound to this opencode process
let bridge = ShareBridge::start().await.map_err(ExecutorError::Io)?;
let (shell_cmd, shell_arg) = get_shell_command();
let opencode_command = self
let command_parts = self
.build_command_builder()
.build_follow_up(&["--session".to_string(), session_id.to_string()]);
.build_follow_up(&["--session".to_string(), session_id.to_string()])?;
let (program_path, args) = command_parts.into_resolved().await?;
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let mut command = Command::new(shell_cmd);
let mut command = Command::new(program_path);
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped()) // Keep stdout but we won't use it
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&opencode_command)
.args(&args)
.env("NODE_NO_WARNINGS", "1")
.env("OPENCODE_AUTO_SHARE", "1")
.env("OPENCODE_API", bridge.base_url.clone());

View File

@@ -39,7 +39,7 @@ impl QwenCode {
#[async_trait]
impl StandardCodingAgentExecutor for QwenCode {
async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result<SpawnedChild, ExecutorError> {
let qwen_command = self.build_command_builder().build_initial();
let qwen_command = self.build_command_builder().build_initial()?;
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let harness = AcpAgentHarness::with_session_namespace("qwen_sessions");
harness
@@ -53,7 +53,7 @@ impl StandardCodingAgentExecutor for QwenCode {
prompt: &str,
session_id: &str,
) -> Result<SpawnedChild, ExecutorError> {
let qwen_command = self.build_command_builder().build_follow_up(&[]);
let qwen_command = self.build_command_builder().build_follow_up(&[])?;
let combined_prompt = self.append_prompt.combine_prompt(prompt);
let harness = AcpAgentHarness::with_session_namespace("qwen_sessions");
harness

View File

@@ -169,10 +169,13 @@ pub async fn create_task_attempt(
)
.await?;
let execution_process = deployment
if let Err(err) = deployment
.container()
.start_attempt(&task_attempt, executor_profile_id.clone())
.await?;
.await
{
tracing::error!("Failed to start task attempt: {}", err);
}
deployment
.track_if_analytics_allowed(
@@ -186,7 +189,7 @@ pub async fn create_task_attempt(
)
.await;
tracing::info!("Started execution process {}", execution_process.id);
tracing::info!("Created attempt for task {}", task.id);
Ok(ResponseJson(ApiResponse::success(task_attempt)))
}

View File

@@ -180,10 +180,12 @@ pub async fn create_task_and_start(
task.id,
)
.await?;
let execution_process = deployment
let is_attempt_running = deployment
.container()
.start_attempt(&task_attempt, payload.executor_profile_id.clone())
.await?;
.await
.inspect_err(|err| tracing::error!("Failed to start task attempt: {}", err))
.is_ok();
deployment
.track_if_analytics_allowed(
"task_attempt_started",
@@ -200,10 +202,10 @@ pub async fn create_task_and_start(
.await?
.ok_or(ApiError::Database(SqlxError::RowNotFound))?;
tracing::info!("Started execution process {}", execution_process.id);
tracing::info!("Started attempt for task {}", task.id);
Ok(ResponseJson(ApiResponse::success(TaskWithAttemptStatus {
task,
has_in_progress_attempt: true,
has_in_progress_attempt: is_attempt_running,
has_merged_attempt: false,
last_attempt_failed: false,
executor: task_attempt.executor,

View File

@@ -23,7 +23,7 @@ use std::{
use base64::{Engine, engine::general_purpose::STANDARD as BASE64_STANDARD};
use thiserror::Error;
use utils::shell::resolve_executable_path;
use utils::shell::resolve_executable_path_blocking; // TODO: make GitCli async
use crate::services::git::Commit;
@@ -497,7 +497,8 @@ impl GitCli {
/// Return true if there are staged changes (index differs from HEAD)
pub fn has_staged_changes(&self, repo_path: &Path) -> Result<bool, GitCliError> {
// `git diff --cached --quiet` returns exit code 1 if there are differences
let out = Command::new(resolve_executable_path("git").ok_or(GitCliError::NotAvailable)?)
let out =
Command::new(resolve_executable_path_blocking("git").ok_or(GitCliError::NotAvailable)?)
.arg("-C")
.arg(repo_path)
.arg("diff")
@@ -625,7 +626,7 @@ impl GitCli {
/// Ensure `git` is available on PATH
fn ensure_available(&self) -> Result<(), GitCliError> {
let git = resolve_executable_path("git").ok_or(GitCliError::NotAvailable)?;
let git = resolve_executable_path_blocking("git").ok_or(GitCliError::NotAvailable)?;
let out = Command::new(&git)
.arg("--version")
.output()
@@ -656,7 +657,7 @@ impl GitCli {
S: AsRef<OsStr>,
{
self.ensure_available()?;
let git = resolve_executable_path("git").ok_or(GitCliError::NotAvailable)?;
let git = resolve_executable_path_blocking("git").ok_or(GitCliError::NotAvailable)?;
let mut cmd = Command::new(&git);
cmd.arg("-C").arg(repo_path);
for a in args {
@@ -684,7 +685,7 @@ impl GitCli {
S: AsRef<OsStr>,
{
self.ensure_available()?;
let git = resolve_executable_path("git").ok_or(GitCliError::NotAvailable)?;
let git = resolve_executable_path_blocking("git").ok_or(GitCliError::NotAvailable)?;
let mut cmd = Command::new(&git);
cmd.arg("-C").arg(repo_path);
for (k, v) in envs {

View File

@@ -7,7 +7,7 @@ use std::{
use git2::{Error as GitError, Repository};
use thiserror::Error;
use tracing::{debug, info};
use utils::shell::get_shell_command;
use utils::shell::resolve_executable_path;
use super::{
git::{GitService, GitServiceError},
@@ -429,14 +429,13 @@ impl WorktreeManager {
// Try using git rev-parse --git-common-dir from within the worktree
let worktree_path_owned = worktree_path.to_path_buf();
tokio::task::spawn_blocking(move || {
let (shell_cmd, shell_arg) = get_shell_command();
let git_command = "git rev-parse --git-common-dir";
let git_path = resolve_executable_path("git").await?;
let output = std::process::Command::new(shell_cmd)
.args([shell_arg, git_command])
let output = tokio::process::Command::new(git_path)
.args(["rev-parse", "--git-common-dir"])
.current_dir(&worktree_path_owned)
.output()
.await
.ok()?;
if output.status.success() {
@@ -454,10 +453,6 @@ impl WorktreeManager {
} else {
None
}
})
.await
.ok()
.flatten()
}
/// Simple worktree cleanup when we can't determine the main repo

View File

@@ -21,7 +21,6 @@ open = "5.3.2"
regex = "1.11.1"
sentry = { version = "0.41.0", features = ["anyhow", "backtrace", "panic", "debug-images"] }
sentry-tracing = { version = "0.41.0", features = ["backtrace"] }
lazy_static = "1.4"
futures-util = "0.3"
json-patch = "2.0"
base64 = "0.22"
@@ -33,3 +32,7 @@ shellexpand = "3.1.1"
which = "8.0.0"
similar = "2"
git2 = "0.18"
[target.'cfg(windows)'.dependencies]
winreg = "0.55"
windows-sys = { version = "0.61", features = ["Win32_System_Environment"] }

View File

@@ -17,6 +17,7 @@ pub mod shell;
pub mod stream_ext;
pub mod stream_lines;
pub mod text;
pub mod tokio;
pub mod version;
/// Cache for WSL2 detection result

View File

@@ -1,30 +1,278 @@
//! Cross-platform shell command utilities
use std::{
collections::HashSet,
env::{join_paths, split_paths},
ffi::{OsStr, OsString},
path::{Path, PathBuf},
time::Duration,
};
use crate::tokio::block_on;
/// Returns the appropriate shell command and argument for the current platform.
///
/// Returns (shell_program, shell_arg) where:
/// - Windows: ("cmd", "/C")
/// - Unix-like: ("sh", "-c") or ("bash", "-c") if available
pub fn get_shell_command() -> (&'static str, &'static str) {
pub fn get_shell_command() -> (String, &'static str) {
if cfg!(windows) {
("cmd", "/C")
("cmd".into(), "/C")
} else {
// Prefer SHELL env var if set and valid
if let Ok(shell) = std::env::var("SHELL") {
let path = Path::new(&shell);
if path.is_absolute() && path.is_file() {
return (shell, "-c");
}
}
// Prefer zsh or bash if available, fallback to sh
if std::path::Path::new("/bin/zsh").exists() {
("zsh", "-c")
("zsh".into(), "-c")
} else if std::path::Path::new("/bin/bash").exists() {
("bash", "-c")
("bash".into(), "-c")
} else {
("sh", "-c")
("sh".into(), "-c")
}
}
}
/// Resolves the full path of an executable using the system's PATH environment variable.
/// Note: On Windows, resolving the executable path can be necessary before passing
/// it to `std::process::Command::new`, as the latter has been deficient in finding executables.
pub fn resolve_executable_path(executable: &str) -> Option<String> {
which::which(executable)
.ok()
.map(|p| p.to_string_lossy().to_string())
/// Resolve an executable by name, falling back to a refreshed PATH if needed.
///
/// The search order is:
/// 1. Explicit paths (absolute or containing a separator).
/// 2. The current process PATH via `which`.
/// 3. A platform-specific refresh of PATH (login shell on Unix, PowerShell on Windows),
/// after which we re-run the `which` lookup and update the process PATH for future calls.
pub async fn resolve_executable_path(executable: &str) -> Option<PathBuf> {
if executable.trim().is_empty() {
return None;
}
let path = Path::new(executable);
if path.is_absolute() && path.is_file() {
return Some(path.to_path_buf());
}
if let Some(found) = which(executable).await {
return Some(found);
}
if refresh_path().await
&& let Some(found) = which(executable).await
{
return Some(found);
}
None
}
pub fn resolve_executable_path_blocking(executable: &str) -> Option<PathBuf> {
block_on(resolve_executable_path(executable))
}
/// Merge two PATH strings into a single, de-duplicated PATH.
///
/// - Keeps the order of entries from `primary`.
/// - Appends only *unseen* entries from `secondary`.
/// - Ignores empty components.
/// - Returns a platform-correct PATH string (using the OS separator).
pub fn merge_paths(primary: impl AsRef<OsStr>, secondary: impl AsRef<OsStr>) -> OsString {
let mut seen = HashSet::<PathBuf>::new();
let mut merged = Vec::<PathBuf>::new();
for p in split_paths(primary.as_ref()).chain(split_paths(secondary.as_ref())) {
if !p.as_os_str().is_empty() && seen.insert(p.clone()) {
merged.push(p);
}
}
join_paths(merged).unwrap_or_default()
}
async fn refresh_path() -> bool {
let Some(refreshed) = get_fresh_path().await else {
return false;
};
let existing = std::env::var_os("PATH").unwrap_or_default();
let refreshed_os = OsString::from(&refreshed);
let merged = merge_paths(&existing, refreshed_os);
if merged == existing {
return false;
}
tracing::debug!(?existing, ?refreshed, ?merged, "Refreshed PATH");
unsafe {
std::env::set_var("PATH", &merged);
}
true
}
async fn which(executable: &str) -> Option<PathBuf> {
let executable = executable.to_string();
tokio::task::spawn_blocking(move || which::which(executable))
.await
.ok()
.and_then(|result| result.ok())
}
#[cfg(not(windows))]
async fn get_fresh_path() -> Option<String> {
use tokio::process::Command;
async fn run(shell: &Path, login: bool) -> Option<String> {
let mut cmd = Command::new(shell);
if login {
cmd.arg("-l");
}
cmd.arg("-c")
.arg("printf '%s' \"$PATH\"")
.env("TERM", "dumb")
.kill_on_drop(true);
const PATH_REFRESH_COMMAND_TIMEOUT: Duration = Duration::from_secs(5);
let child = cmd.spawn().ok()?;
let output = match tokio::time::timeout(
PATH_REFRESH_COMMAND_TIMEOUT,
child.wait_with_output(),
)
.await
{
Ok(Ok(output)) => output,
Ok(Err(err)) => {
tracing::debug!(
shell = %shell.display(),
?err,
"Failed to retrieve PATH from login shell"
);
return None;
}
Err(_) => {
tracing::warn!(
shell = %shell.display(),
timeout_secs = PATH_REFRESH_COMMAND_TIMEOUT.as_secs(),
"Timed out retrieving PATH from login shell"
);
return None;
}
};
if !output.status.success() {
return None;
}
let path = String::from_utf8(output.stdout).ok()?.trim().to_string();
if path.is_empty() { None } else { Some(path) }
}
let mut paths = Vec::new();
let shells = vec![
(PathBuf::from("/bin/zsh"), true),
(PathBuf::from("/bin/bash"), true),
(PathBuf::from("/bin/sh"), false),
];
let mut current_shell_name = None;
if let Ok(shell) = std::env::var("SHELL") {
let path = Path::new(&shell);
if path.is_absolute() && path.is_file() {
current_shell_name = path.file_name().and_then(OsStr::to_str).map(String::from);
if let Some(path) = run(path, true).await {
paths.push(path);
}
}
}
for (shell_path, login) in shells {
if !shell_path.exists() {
continue;
}
let shell_name = shell_path
.file_name()
.and_then(OsStr::to_str)
.map(String::from);
if current_shell_name != shell_name
&& let Some(path) = run(&shell_path, login).await
{
paths.push(path);
}
}
if paths.is_empty() {
return None;
}
paths
.into_iter()
.map(OsString::from)
.reduce(|a, b| merge_paths(&a, &b))
.map(|merged| merged.to_string_lossy().into_owned())
}
#[cfg(windows)]
async fn get_fresh_path() -> Option<String> {
tokio::task::spawn_blocking(get_fresh_path_blocking)
.await
.ok()
.flatten()
}
#[cfg(windows)]
fn get_fresh_path_blocking() -> Option<String> {
use std::{
ffi::{OsStr, OsString},
os::windows::ffi::{OsStrExt, OsStringExt},
};
use winreg::{HKEY, RegKey, enums::*};
// Expand %VARS% for registry PATH entries
fn expand_env_vars(input: &OsStr) -> OsString {
use windows_sys::Win32::System::Environment::ExpandEnvironmentStringsW;
let wide: Vec<u16> = input.encode_wide().chain(Some(0)).collect();
unsafe {
let needed = ExpandEnvironmentStringsW(wide.as_ptr(), std::ptr::null_mut(), 0);
if needed == 0 {
return input.to_os_string();
}
let mut buf = vec![0u16; needed as usize];
let written = ExpandEnvironmentStringsW(wide.as_ptr(), buf.as_mut_ptr(), needed);
if written == 0 {
return input.to_os_string();
}
// written includes the trailing NUL when it fits
OsString::from_wide(&buf[..(written as usize).saturating_sub(1)])
}
}
fn read_registry_path(root: HKEY, subkey: &str) -> Option<OsString> {
let key = RegKey::predef(root)
.open_subkey_with_flags(subkey, KEY_READ)
.ok()?;
key.get_value::<String, _>("Path").ok().map(OsString::from)
}
let mut paths: Vec<OsString> = Vec::new();
if let Some(user_path) = read_registry_path(HKEY_CURRENT_USER, "Environment") {
paths.push(expand_env_vars(&user_path));
}
if let Some(machine_path) = read_registry_path(
HKEY_LOCAL_MACHINE,
r"System\CurrentControlSet\Control\Session Manager\Environment",
) {
paths.push(expand_env_vars(&machine_path));
}
if paths.is_empty() {
return None;
}
paths
.into_iter()
.map(OsString::from)
.reduce(|a, b| merge_paths(&a, &b))
.map(|merged| merged.to_string_lossy().into_owned())
}

37
crates/utils/src/tokio.rs Normal file
View File

@@ -0,0 +1,37 @@
use std::{future::Future, sync::OnceLock};
use tokio::runtime::{Builder, Handle, Runtime, RuntimeFlavor};
fn rt() -> &'static Runtime {
static RT: OnceLock<Runtime> = OnceLock::new();
RT.get_or_init(|| {
Builder::new_multi_thread()
.enable_all()
.build()
.expect("failed to build global Tokio runtime")
})
}
/// Run an async future from sync code safely.
/// If already inside a Tokio runtime, it will use that runtime.
pub fn block_on<F, T>(fut: F) -> T
where
F: Future<Output = T> + Send,
T: Send,
{
match Handle::try_current() {
// Already inside a Tokio runtime
Ok(h) => match h.runtime_flavor() {
// Use block_in_place so other tasks keep running.
RuntimeFlavor::MultiThread => tokio::task::block_in_place(|| rt().block_on(fut)),
// Spawn a new thread to avoid freezing a single-thread runtime.
RuntimeFlavor::CurrentThread | _ => std::thread::scope(|s| {
s.spawn(|| rt().block_on(fut))
.join()
.expect("thread panicked")
}),
},
// Outside Tokio: block normally.
Err(_) => rt().block_on(fut),
}
}