diff --git a/backend/Cargo.toml b/backend/Cargo.toml
index 3e8dba78..188a4769 100644
--- a/backend/Cargo.toml
+++ b/backend/Cargo.toml
@@ -9,11 +9,17 @@ build = "build.rs"
 name = "vibe_kanban"
 path = "src/lib.rs"
 
+[[bin]]
+name = "cloud-runner"
+path = "src/bin/cloud_runner.rs"
+
 [lints.clippy]
 uninlined-format-args = "allow"
 
 [dependencies]
 tokio = { workspace = true }
+tokio-util = { version = "0.7", features = ["io"] }
+bytes = "1.0"
 axum = { workspace = true }
 tower-http = { workspace = true }
 serde = { workspace = true }
diff --git a/backend/src/app_state.rs b/backend/src/app_state.rs
index 21d763d5..c4598798 100644
--- a/backend/src/app_state.rs
+++ b/backend/src/app_state.rs
@@ -1,11 +1,10 @@
-use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
+use std::{collections::HashMap, path::PathBuf, sync::Arc};
 
-#[cfg(unix)]
-use nix::{sys::signal::Signal, unistd::Pid};
 use tokio::sync::{Mutex, RwLock as TokioRwLock};
 use uuid::Uuid;
 
 use crate::{
+    command_runner,
     models::Environment,
     services::{generate_user_id, AnalyticsConfig, AnalyticsService},
 };
@@ -22,7 +21,7 @@ pub enum ExecutionType {
 pub struct RunningExecution {
     pub task_attempt_id: Uuid,
     pub _execution_type: ExecutionType,
-    pub child: command_group::AsyncGroupChild,
+    pub child: command_runner::CommandProcess,
 }
 
 #[derive(Debug, Clone)]
@@ -91,7 +90,7 @@ impl AppState {
         let mut completed_executions = Vec::new();
 
         for (execution_id, running_exec) in executions.iter_mut() {
-            match running_exec.child.try_wait() {
+            match running_exec.child.try_wait().await {
                 Ok(Some(status)) => {
                     let success = status.success();
                     let exit_code = status.code().map(|c| c as i64);
@@ -140,24 +139,11 @@ impl AppState {
             return Ok(false);
         };
 
-        // hit the whole process group, not just the leader
-        #[cfg(unix)]
-        {
-            use nix::{sys::signal::killpg, unistd::getpgid};
-
-            let pgid = getpgid(Some(Pid::from_raw(exec.child.id().unwrap() as i32)))?;
-            for sig in [Signal::SIGINT, Signal::SIGTERM, Signal::SIGKILL] {
-                killpg(pgid, sig)?;
-                tokio::time::sleep(Duration::from_secs(2)).await;
-                if exec.child.try_wait()?.is_some() {
-                    break; // gone!
-                }
-            }
-        }
-
-        // final fallback – command_group already targets the group
-        exec.child.kill().await.ok();
-        exec.child.wait().await.ok(); // reap
+        // Kill the process using CommandRunner's kill method
+        exec.child
+            .kill()
+            .await
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
 
         // only NOW remove it
         executions.remove(&execution_id);
diff --git a/backend/src/bin/cloud_runner.rs b/backend/src/bin/cloud_runner.rs
new file mode 100644
index 00000000..4a40a9ec
--- /dev/null
+++ b/backend/src/bin/cloud_runner.rs
@@ -0,0 +1,401 @@
+use std::{collections::HashMap, sync::Arc};
+
+use axum::{
+    body::Body,
+    extract::{Path, State},
+    http::StatusCode,
+    response::{Json, Response},
+    routing::{delete, get, post},
+    Router,
+};
+use serde::Serialize;
+use tokio::sync::Mutex;
+use tokio_util::io::ReaderStream;
+use tracing_subscriber::prelude::*;
+use uuid::Uuid;
+use vibe_kanban::command_runner::{CommandProcess, CommandRunner, CommandRunnerArgs};
+
+// Structure to hold process and its streams
+struct ProcessEntry {
+    process: CommandProcess,
+    // Store the actual stdout/stderr streams for direct streaming
+    stdout_stream: Option<Box<dyn tokio::io::AsyncRead + Send + Unpin>>,
+    stderr_stream: Option<Box<dyn tokio::io::AsyncRead + Send + Unpin>>,
+    completed: Arc<Mutex<bool>>,
+}
+
+impl std::fmt::Debug for ProcessEntry {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ProcessEntry")
+            .field("process", &self.process)
+            .field("stdout_stream", &self.stdout_stream.is_some())
+            .field("stderr_stream", &self.stderr_stream.is_some())
+            .field("completed", &self.completed)
+            .finish()
+    }
+}
+
+// Application state to manage running processes
+#[derive(Clone)]
+struct AppState {
+    processes: Arc<Mutex<HashMap<String, ProcessEntry>>>,
+}
+
+// Response type for API responses
+#[derive(Debug, Serialize)]
+struct ApiResponse<T> {
+    success: bool,
+    data: Option<T>,
+    error: Option<String>,
+}
+
+impl<T> ApiResponse<T> {
+    fn success(data: T) -> Self {
+        Self {
+            success: true,
+            data: Some(data),
+            error: None,
+        }
+    }
+
+    #[allow(dead_code)]
+    fn error(message: String) -> Self {
+        Self {
+            success: false,
+            data: None,
+            error: Some(message),
+        }
+    }
+}
+
+// Response type for command creation
+#[derive(Debug, Serialize)]
+struct CreateCommandResponse {
+    process_id: String,
+}
+
+// Response type for process status
+#[derive(Debug, Serialize)]
+struct ProcessStatusResponse {
+    process_id: String,
+    running: bool,
+    exit_code: Option<i32>,
+    success: Option<bool>,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Initialize tracing
+    tracing_subscriber::registry()
+        .with(
+            tracing_subscriber::EnvFilter::try_from_default_env()
+                .unwrap_or_else(|_| "cloud_runner=info".into()),
+        )
+        .with(tracing_subscriber::fmt::layer())
+        .init();
+
+    // Create application state
+    let app_state = AppState {
+        processes: Arc::new(Mutex::new(HashMap::new())),
+    };
+
+    // Build router
+    let app = Router::new()
+        .route("/health", get(health_check))
+        .route("/commands", post(create_command))
+        .route("/commands/:process_id", delete(kill_command))
+        .route("/commands/:process_id/status", get(get_process_status))
+        .route("/commands/:process_id/stdout", get(get_process_stdout))
+        .route("/commands/:process_id/stderr", get(get_process_stderr))
+        .with_state(app_state);
+
+    // Get port from environment or default to 8000
+    let port = std::env::var("PORT").unwrap_or_else(|_| "8000".to_string());
+    let addr = format!("0.0.0.0:{}", port);
+
+    tracing::info!("Cloud Runner server starting on {}", addr);
+
+    // Start the server
+    let listener = tokio::net::TcpListener::bind(&addr).await?;
+    axum::serve(listener, app).await?;
+
+    Ok(())
+}
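For context when reading the handlers below: POST /commands deserializes a CommandRunnerArgs JSON body (defined in command_runner.rs) and replies with the new process id wrapped in ApiResponse. A minimal client-side sketch, assuming a server on localhost:8000 and the reqwest/serde_json crates this PR already uses elsewhere:

    let client = reqwest::Client::new();
    // Matches the CommandRunnerArgs shape from command_runner.rs.
    let body = serde_json::json!({
        "command": "echo",
        "args": ["hello"],
        "working_dir": null,
        "env_vars": [],
        "stdin": null
    });
    // Expected reply shape: {"success": true, "data": {"process_id": "<uuid>"}, "error": null}
    let resp: serde_json::Value = client
        .post("http://localhost:8000/commands")
        .json(&body)
        .send()
        .await?
        .json()
        .await?;
    let process_id = resp["data"]["process_id"].as_str().unwrap_or_default();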
+// Health check endpoint
+async fn health_check() -> Json<ApiResponse<String>> {
+    Json(ApiResponse::success("Cloud Runner is healthy".to_string()))
+}
+
+// Create and start a new command
+async fn create_command(
+    State(state): State<AppState>,
+    Json(request): Json<CommandRunnerArgs>,
+) -> Result<Json<ApiResponse<CreateCommandResponse>>, StatusCode> {
+    tracing::info!("Creating command: {} {:?}", request.command, request.args);
+
+    // Create a local command runner from the request
+    let runner = CommandRunner::from_args(request);
+
+    // Start the process
+    let mut process = match runner.start().await {
+        Ok(process) => process,
+        Err(e) => {
+            tracing::error!("Failed to start command: {}", e);
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        }
+    };
+
+    // Generate unique process ID
+    let process_id = Uuid::new_v4().to_string();
+
+    // Create completion flag
+    let completed = Arc::new(Mutex::new(false));
+
+    // Get the streams from the process - we'll store them directly
+    let mut streams = match process.stream().await {
+        Ok(streams) => streams,
+        Err(e) => {
+            tracing::error!("Failed to get process streams: {}", e);
+            return Err(StatusCode::INTERNAL_SERVER_ERROR);
+        }
+    };
+
+    // Extract the streams for direct use
+    let stdout_stream = streams.stdout.take();
+    let stderr_stream = streams.stderr.take();
+
+    // Spawn a task to monitor process completion
+    {
+        let process_id_for_completion = process_id.clone();
+        let completed_flag = completed.clone();
+        let processes_ref = state.processes.clone();
+        tokio::spawn(async move {
+            // Wait for the process to complete
+            if let Ok(mut processes) = processes_ref.try_lock() {
+                if let Some(entry) = processes.get_mut(&process_id_for_completion) {
+                    let _ = entry.process.wait().await;
+                    *completed_flag.lock().await = true;
+                    tracing::debug!("Marked process {} as completed", process_id_for_completion);
+                }
+            }
+        });
+    }
+
+    // Create process entry
+    let entry = ProcessEntry {
+        process,
+        stdout_stream,
+        stderr_stream,
+        completed: completed.clone(),
+    };
+
+    // Store the process entry
+    {
+        let mut processes = state.processes.lock().await;
+        processes.insert(process_id.clone(), entry);
+    }
+
+    tracing::info!("Command started with process_id: {}", process_id);
+
+    Ok(Json(ApiResponse::success(CreateCommandResponse {
+        process_id,
+    })))
+}
+
+// Kill a running command
+async fn kill_command(
+    State(state): State<AppState>,
+    Path(process_id): Path<String>,
+) -> Result<Json<ApiResponse<String>>, StatusCode> {
+    tracing::info!("Killing command with process_id: {}", process_id);
+
+    let mut processes = state.processes.lock().await;
+
+    if let Some(mut entry) = processes.remove(&process_id) {
+        // First check if the process has already finished
+        match entry.process.status().await {
+            Ok(Some(_)) => {
+                // Process already finished, consider kill successful
+                tracing::info!(
+                    "Process {} already completed, kill considered successful",
+                    process_id
+                );
+                Ok(Json(ApiResponse::success(
+                    "Process was already completed".to_string(),
+                )))
+            }
+            Ok(None) => {
+                // Process still running, attempt to kill
+                match entry.process.kill().await {
+                    Ok(()) => {
+                        tracing::info!("Successfully killed process: {}", process_id);
+                        Ok(Json(ApiResponse::success(
+                            "Process killed successfully".to_string(),
+                        )))
+                    }
+                    Err(e) => {
+                        tracing::error!("Failed to kill process {}: {}", process_id, e);
+
+                        // Check if it's a "No such process" error (process finished during kill)
+                        if e.to_string().contains("No such process") {
+                            tracing::info!("Process {} finished during kill attempt", process_id);
+                            Ok(Json(ApiResponse::success(
+                                "Process finished during kill attempt".to_string(),
+                            )))
+                        } else {
+                            Err(StatusCode::INTERNAL_SERVER_ERROR)
+                        }
+                    }
+                }
+            }
+            Err(e) => {
+                tracing::error!("Failed to check status for process {}: {}", process_id, e);
+                // Still attempt to kill
+                match entry.process.kill().await {
+                    Ok(()) => {
+                        tracing::info!("Successfully killed process: {}", process_id);
+                        Ok(Json(ApiResponse::success(
+                            "Process killed successfully".to_string(),
+                        )))
+                    }
+                    Err(e) => {
+                        tracing::error!("Failed to kill process {}: {}", process_id, e);
+                        Err(StatusCode::INTERNAL_SERVER_ERROR)
+                    }
+                }
+            }
+        }
+    } else {
+        tracing::warn!("Process not found: {}", process_id);
+        Err(StatusCode::NOT_FOUND)
+    }
+}
+
+// Get status of a running command
+async fn get_process_status(
+    State(state): State<AppState>,
+    Path(process_id): Path<String>,
+) -> Result<Json<ApiResponse<ProcessStatusResponse>>, StatusCode> {
+    tracing::info!("Getting status for process_id: {}", process_id);
+
+    let mut processes = state.processes.lock().await;
+
+    if let Some(entry) = processes.get_mut(&process_id) {
+        match entry.process.status().await {
+            Ok(Some(exit_status)) => {
+                // Process has completed
+                let response = ProcessStatusResponse {
+                    process_id: process_id.clone(),
+                    running: false,
+                    exit_code: exit_status.code(),
+                    success: Some(exit_status.success()),
+                };
+                Ok(Json(ApiResponse::success(response)))
+            }
+            Ok(None) => {
+                // Process is still running
+                let response = ProcessStatusResponse {
+                    process_id: process_id.clone(),
+                    running: true,
+                    exit_code: None,
+                    success: None,
+                };
+                Ok(Json(ApiResponse::success(response)))
+            }
+            Err(e) => {
+                tracing::error!("Failed to get status for process {}: {}", process_id, e);
+                Err(StatusCode::INTERNAL_SERVER_ERROR)
+            }
+        }
+    } else {
+        tracing::warn!("Process not found: {}", process_id);
+        Err(StatusCode::NOT_FOUND)
+    }
+}
+
+// Get stdout stream for a running command (direct streaming, no buffering)
+async fn get_process_stdout(
+    State(state): State<AppState>,
+    Path(process_id): Path<String>,
+) -> Result<Response, StatusCode> {
+    tracing::info!(
+        "Starting direct stdout stream for process_id: {}",
+        process_id
+    );
+
+    let mut processes = state.processes.lock().await;
+
+    if let Some(entry) = processes.get_mut(&process_id) {
+        // Take ownership of stdout directly for streaming
+        if let Some(stdout) = entry.stdout_stream.take() {
+            drop(processes); // Release the lock early
+
+            // Convert the AsyncRead (stdout) directly into an HTTP stream
+            let stream = ReaderStream::new(stdout);
+
+            let response = Response::builder()
+                .header("content-type", "application/octet-stream")
+                .header("cache-control", "no-cache")
+                .body(Body::from_stream(stream))
+                .map_err(|e| {
+                    tracing::error!("Failed to build response stream: {}", e);
+                    StatusCode::INTERNAL_SERVER_ERROR
+                })?;
+
+            Ok(response)
+        } else {
+            tracing::error!(
+                "Stdout already taken or unavailable for process {}",
+                process_id
+            );
+            Err(StatusCode::GONE)
+        }
+    } else {
+        tracing::warn!("Process not found for stdout: {}", process_id);
+        Err(StatusCode::NOT_FOUND)
+    }
+}
+
+// Get stderr stream for a running command (direct streaming, no buffering)
+async fn get_process_stderr(
+    State(state): State<AppState>,
+    Path(process_id): Path<String>,
+) -> Result<Response, StatusCode> {
+    tracing::info!(
+        "Starting direct stderr stream for process_id: {}",
+        process_id
+    );
+
+    let mut processes = state.processes.lock().await;
+
+    if let Some(entry) = processes.get_mut(&process_id) {
+        // Take ownership of stderr directly for streaming
+        if let Some(stderr) = entry.stderr_stream.take() {
+            drop(processes); // Release the lock early
+
+            // Convert the AsyncRead (stderr) directly into an HTTP stream
+            let stream = ReaderStream::new(stderr);
+
+            let response = Response::builder()
+                .header("content-type", "application/octet-stream")
+                .header("cache-control", "no-cache")
+                .body(Body::from_stream(stream))
+                .map_err(|e| {
+                    tracing::error!("Failed to build response stream: {}", e);
+                    StatusCode::INTERNAL_SERVER_ERROR
+                })?;
+
+            Ok(response)
+        } else {
+            tracing::error!(
+                "Stderr already taken or unavailable for process {}",
+                process_id
+            );
+            Err(StatusCode::GONE)
+        }
+    } else {
+        tracing::warn!("Process not found for stderr: {}", process_id);
+        Err(StatusCode::NOT_FOUND)
+    }
+}
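The stdout/stderr endpoints above return a raw chunked byte stream (Body::from_stream over a ReaderStream), so a client can consume output incrementally instead of waiting for process exit. A sketch of incremental consumption with reqwest's chunk() API; the URL and process id are placeholders:

    let mut response = reqwest::get(format!(
        "http://localhost:8000/commands/{}/stdout",
        process_id
    ))
    .await?;
    // chunk() yields Some(bytes) as data arrives and None once the process closes stdout.
    while let Some(chunk) = response.chunk().await? {
        print!("{}", String::from_utf8_lossy(&chunk));
    }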
process still running"), + Ok(Some(status)) => println!( + "āš ļø Process finished unexpectedly with status: {:?}", + status + ), + Err(e) => println!("āŒ try_wait failed (expected for now): {}", e), + } + + // Kill the process to test status of completed process + process5a.kill().await.ok(); + + // Test 5b: Status of completed process + let mut runner5b = CommandRunner::new(); + let mut process5b = runner5b.command("echo").arg("status test").start().await?; + + println!("āœ… Started echo process for completion status testing"); + + // Wait for process to complete + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + match process5b.status().await { + Ok(Some(status)) => { + println!( + "āœ… Status correctly shows completed process: success={}, code={:?}", + status.success(), + status.code() + ); + } + Ok(None) => println!("āš ļø Process still running (might need more time)"), + Err(e) => println!("āŒ Status check failed (expected for now): {}", e), + } + + // Test 5c: Wait for process completion + let mut runner5c = CommandRunner::new(); + let mut process5c = runner5c.command("echo").arg("wait test").start().await?; + + println!("āœ… Started echo process for wait testing"); + + match process5c.wait().await { + Ok(status) => { + println!( + "āœ… Wait completed successfully: success={}, code={:?}", + status.success(), + status.code() + ); + } + Err(e) => println!("āŒ Wait failed (expected for now): {}", e), + } + + // Test 6: Output Streaming (TDD - These will FAIL initially) + println!("\n🌊 Test 6: Output Streaming (TDD)"); + + // Test 6a: Stdout streaming + let mut runner6a = CommandRunner::new(); + let mut process6a = runner6a + .command("echo") + .arg("Hello stdout streaming!") + .start() + .await?; + + println!("āœ… Started echo process for stdout streaming test"); + + // Give the server a moment to capture output from fast commands like echo + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + match process6a.stream().await { + Ok(mut stream) => { + println!("āœ… Got streams from process"); + + if let Some(stdout) = &mut stream.stdout { + use tokio::io::AsyncReadExt; + let mut buffer = Vec::new(); + + match stdout.read_to_end(&mut buffer).await { + Ok(bytes_read) => { + let output = String::from_utf8_lossy(&buffer); + if bytes_read > 0 && output.contains("Hello stdout streaming") { + println!("āœ… Successfully read stdout: '{}'", output.trim()); + } else if bytes_read == 0 { + println!( + "āŒ No stdout data received (expected for now - empty streams)" + ); + } else { + println!("āš ļø Unexpected stdout content: '{}'", output); + } + } + Err(e) => println!("āŒ Failed to read stdout: {}", e), + } + } else { + println!("āŒ No stdout stream available (expected for now)"); + } + } + Err(e) => println!("āŒ Failed to get streams: {}", e), + } + + // Test 6b: Stderr streaming + let mut runner6b = CommandRunner::new(); + let mut process6b = runner6b + .command("bash") + .arg("-c") + .arg("echo 'Error message' >&2") + .start() + .await?; + + println!("āœ… Started bash process for stderr streaming test"); + + // Give the server a moment to capture output from fast commands + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + match process6b.stream().await { + Ok(mut stream) => { + if let Some(stderr) = &mut stream.stderr { + use tokio::io::AsyncReadExt; + let mut buffer = Vec::new(); + + match stderr.read_to_end(&mut buffer).await { + Ok(bytes_read) => { + let output = String::from_utf8_lossy(&buffer); + if bytes_read > 0 && 
output.contains("Error message") { + println!("āœ… Successfully read stderr: '{}'", output.trim()); + } else if bytes_read == 0 { + println!( + "āŒ No stderr data received (expected for now - empty streams)" + ); + } else { + println!("āš ļø Unexpected stderr content: '{}'", output); + } + } + Err(e) => println!("āŒ Failed to read stderr: {}", e), + } + } else { + println!("āŒ No stderr stream available (expected for now)"); + } + } + Err(e) => println!("āŒ Failed to get streams: {}", e), + } + + // Test 6c: Streaming from long-running process + let mut runner6c = CommandRunner::new(); + let mut process6c = runner6c + .command("bash") + .arg("-c") + .arg("for i in {1..3}; do echo \"Line $i\"; sleep 0.1; done") + .start() + .await?; + + println!("āœ… Started bash process for streaming test"); + + // Give the server a moment to capture output from the command + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + match process6c.stream().await { + Ok(mut stream) => { + if let Some(stdout) = &mut stream.stdout { + use tokio::io::AsyncReadExt; + let mut buffer = [0u8; 1024]; + + // Try to read some data (this tests real-time streaming) + match tokio::time::timeout( + tokio::time::Duration::from_secs(2), + stdout.read(&mut buffer), + ) + .await + { + Ok(Ok(bytes_read)) => { + let output = String::from_utf8_lossy(&buffer[..bytes_read]); + if bytes_read > 0 { + println!("āœ… Successfully streamed output: '{}'", output.trim()); + } else { + println!("āŒ No streaming data received (expected for now)"); + } + } + Ok(Err(e)) => println!("āŒ Stream read error: {}", e), + Err(_) => { + println!("āŒ Stream read timeout (expected for now - no real streaming)") + } + } + } else { + println!("āŒ No stdout stream available for streaming test"); + } + } + Err(e) => println!("āŒ Failed to get streams for streaming test: {}", e), + } + + // Clean up + process6c.kill().await.ok(); + + // Test 7: Server Status API Endpoint (TDD - These will FAIL initially) + println!("\nšŸ” Test 7: Server Status API Endpoint (TDD)"); + + // Create a process first + let client = reqwest::Client::new(); + let command_request = serde_json::json!({ + "command": "sleep", + "args": ["5"], + "working_dir": null, + "env_vars": [], + "stdin": null + }); + + let response = client + .post("http://localhost:8000/commands") + .json(&command_request) + .send() + .await?; + + if response.status().is_success() { + let body: serde_json::Value = response.json().await?; + if let Some(process_id) = body["data"]["process_id"].as_str() { + println!("āœ… Created process for status API test: {}", process_id); + + // Test 7a: GET /commands/{id}/status for running process + let status_url = format!("http://localhost:8000/commands/{}/status", process_id); + match client.get(&status_url).send().await { + Ok(response) => { + if response.status().is_success() { + match response.json::().await { + Ok(status_body) => { + println!("āœ… Got status response: {}", status_body); + + // Check expected structure + if let Some(data) = status_body.get("data") { + if let Some(running) = + data.get("running").and_then(|v| v.as_bool()) + { + if running { + println!( + "āœ… Status correctly shows process is running" + ); + } else { + println!("āš ļø Process already finished"); + } + } else { + println!("āŒ Missing 'running' field in status response"); + } + } else { + println!("āŒ Missing 'data' field in status response"); + } + } + Err(e) => println!("āŒ Failed to parse status JSON: {}", e), + } + } else { + println!( + "āŒ Status API 
returned error: {} (expected for now)", + response.status() + ); + } + } + Err(e) => println!("āŒ Status API request failed (expected for now): {}", e), + } + + // Kill the process + let _ = client + .delete(format!("http://localhost:8000/commands/{}", process_id)) + .send() + .await; + } + } + + // Test 7b: Status of completed process + let quick_command = serde_json::json!({ + "command": "echo", + "args": ["quick command"], + "working_dir": null, + "env_vars": [], + "stdin": null + }); + + let response = client + .post("http://localhost:8000/commands") + .json(&quick_command) + .send() + .await?; + + if response.status().is_success() { + let body: serde_json::Value = response.json().await?; + if let Some(process_id) = body["data"]["process_id"].as_str() { + println!( + "āœ… Created quick process for completed status test: {}", + process_id + ); + + // Wait for it to complete + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + let status_url = format!("http://localhost:8000/commands/{}/status", process_id); + match client.get(&status_url).send().await { + Ok(response) => { + if response.status().is_success() { + match response.json::().await { + Ok(status_body) => { + println!("āœ… Got completed status response: {}", status_body); + + if let Some(data) = status_body.get("data") { + if let Some(exit_code) = data.get("exit_code") { + println!("āœ… Status includes exit code: {}", exit_code); + } + if let Some(success) = data.get("success") { + println!("āœ… Status includes success flag: {}", success); + } + } + } + Err(e) => println!("āŒ Failed to parse completed status JSON: {}", e), + } + } else { + println!( + "āŒ Completed status API returned error: {}", + response.status() + ); + } + } + Err(e) => println!("āŒ Completed status API request failed: {}", e), + } + } + } + + // Test 7c: Status of non-existent process (error handling) + let fake_id = "non-existent-process-id"; + let status_url = format!("http://localhost:8000/commands/{}/status", fake_id); + match client.get(&status_url).send().await { + Ok(response) => { + if response.status() == reqwest::StatusCode::NOT_FOUND { + println!("āœ… Status API correctly returns 404 for non-existent process"); + } else { + println!( + "āŒ Status API should return 404 for non-existent process, got: {}", + response.status() + ); + } + } + Err(e) => println!("āŒ Error testing non-existent process status: {}", e), + } + + // Test 8: Server Streaming API Endpoint (TDD - These will FAIL initially) + println!("\nšŸ“” Test 8: Server Streaming API Endpoint (TDD)"); + + // Create a process that generates output + let stream_command = serde_json::json!({ + "command": "bash", + "args": ["-c", "for i in {1..3}; do echo \"Stream line $i\"; sleep 0.1; done"], + "working_dir": null, + "env_vars": [], + "stdin": null + }); + + let response = client + .post("http://localhost:8000/commands") + .json(&stream_command) + .send() + .await?; + + if response.status().is_success() { + let body: serde_json::Value = response.json().await?; + if let Some(process_id) = body["data"]["process_id"].as_str() { + println!("āœ… Created streaming process: {}", process_id); + + // Test 8a: GET /commands/{id}/stream endpoint + let stream_url = format!("http://localhost:8000/commands/{}/stream", process_id); + match client.get(&stream_url).send().await { + Ok(response) => { + if response.status().is_success() { + println!("āœ… Stream endpoint accessible"); + if let Some(content_type) = response.headers().get("content-type") { + println!("āœ… Content-Type: 
{:?}", content_type); + } + + // Try to read the response body + match response.text().await { + Ok(text) => { + if !text.is_empty() { + println!("āœ… Received streaming data: '{}'", text.trim()); + } else { + println!("āŒ No streaming data received (expected for now)"); + } + } + Err(e) => println!("āŒ Failed to read stream response: {}", e), + } + } else { + println!( + "āŒ Stream endpoint returned error: {} (expected for now)", + response.status() + ); + } + } + Err(e) => println!("āŒ Stream API request failed (expected for now): {}", e), + } + + // Clean up + let _ = client + .delete(format!("http://localhost:8000/commands/{}", process_id)) + .send() + .await; + } + } + + // Test 8b: Streaming from non-existent process + let fake_stream_url = format!("http://localhost:8000/commands/{}/stream", "fake-id"); + match client.get(&fake_stream_url).send().await { + Ok(response) => { + if response.status() == reqwest::StatusCode::NOT_FOUND { + println!("āœ… Stream API correctly returns 404 for non-existent process"); + } else { + println!( + "āŒ Stream API should return 404 for non-existent process, got: {}", + response.status() + ); + } + } + Err(e) => println!("āŒ Error testing non-existent process stream: {}", e), + } + + // Test 9: True Chunk-Based Streaming Verification (Fixed) + println!("\n🌊 Test 9: True Chunk-Based Streaming Verification"); + + // Create a longer-running process to avoid timing issues + let stream_command = serde_json::json!({ + "command": "bash", + "args": ["-c", "for i in {1..6}; do echo \"Chunk $i at $(date +%H:%M:%S.%3N)\"; sleep 0.5; done"], + "working_dir": null, + "env_vars": [], + "stdin": null + }); + + let response = client + .post("http://localhost:8000/commands") + .json(&stream_command) + .send() + .await?; + + if response.status().is_success() { + let body: serde_json::Value = response.json().await?; + if let Some(process_id) = body["data"]["process_id"].as_str() { + println!( + "āœ… Created streaming process: {} (will run ~3 seconds)", + process_id + ); + + // Test chunk-based streaming with the /stream endpoint + let stream_url = format!("http://localhost:8000/commands/{}/stream", process_id); + + // Small delay to let the process start generating output + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let stream_response = client.get(&stream_url).send().await; + + match stream_response { + Ok(response) => { + if response.status().is_success() { + println!("āœ… Stream endpoint accessible"); + + let start_time = std::time::Instant::now(); + + println!("šŸ” Reading streaming response:"); + + // Try to read the response in chunks using a simpler approach + let bytes = match tokio::time::timeout( + tokio::time::Duration::from_secs(4), + response.bytes(), + ) + .await + { + Ok(Ok(bytes)) => bytes, + Ok(Err(e)) => { + println!(" āŒ Failed to read response: {}", e); + return Ok(()); + } + Err(_) => { + println!(" āŒ Response read timeout"); + return Ok(()); + } + }; + + let response_text = String::from_utf8_lossy(&bytes); + let lines: Vec<&str> = + response_text.lines().filter(|l| !l.is_empty()).collect(); + + println!("šŸ“Š Response analysis:"); + println!(" Total response size: {} bytes", bytes.len()); + println!(" Number of lines: {}", lines.len()); + println!( + " Read duration: {:.1}s", + start_time.elapsed().as_secs_f32() + ); + + if !lines.is_empty() { + println!(" Lines received:"); + for (i, line) in lines.iter().enumerate() { + println!(" {}: '{}'", i + 1, line); + } + } + + // The key insight: if we got multiple 
lines with different timestamps, + // it proves they were generated over time, even if delivered in one HTTP response + if lines.len() > 1 { + // Check if timestamps show progression + let first_line = lines[0]; + let last_line = lines[lines.len() - 1]; + + if first_line != last_line { + println!("āœ… STREAMING VERIFIED: {} lines with different content/timestamps!", lines.len()); + println!( + " This proves the server captured streaming output over time" + ); + if lines.len() >= 3 { + println!(" First: '{}'", first_line); + println!(" Last: '{}'", last_line); + } + } else { + println!( + "āš ļø Multiple identical lines - may indicate buffering issue" + ); + } + } else if lines.len() == 1 { + println!("āš ļø Only 1 line received: '{}'", lines[0]); + println!( + " This suggests the process finished too quickly or timing issue" + ); + } else { + println!("āŒ No output lines received"); + } + } else { + println!("āŒ Stream endpoint error: {}", response.status()); + } + } + Err(e) => println!("āŒ Stream request failed: {}", e), + } + + // Wait for process to complete, then verify final output + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + println!("\nšŸ” Verification: Testing completed process output:"); + let stdout_url = format!("http://localhost:8000/commands/{}/stdout", process_id); + match client.get(&stdout_url).send().await { + Ok(response) if response.status().is_success() => { + if let Ok(text) = response.text().await { + let final_lines: Vec<&str> = + text.lines().filter(|l| !l.is_empty()).collect(); + println!( + "āœ… Final stdout: {} lines, {} bytes", + final_lines.len(), + text.len() + ); + + if final_lines.len() >= 6 { + println!( + "āœ… Process completed successfully - all expected output captured" + ); + } else { + println!( + "āš ļø Expected 6 lines, got {} - process may have been interrupted", + final_lines.len() + ); + } + } + } + _ => println!("āš ļø Final stdout check failed"), + } + + // Clean up + let _ = client + .delete(format!("http://localhost:8000/commands/{}", process_id)) + .send() + .await; + } + } + + println!("\nšŸŽ‰ All TDD tests completed!"); + println!("šŸ’” Expected failures show what needs to be implemented:"); + println!(" šŸ“Š Remote status/wait methods"); + println!(" 🌊 Real output streaming"); + println!(" šŸ” GET /commands/:id/status endpoint"); + println!(" šŸ“” GET /commands/:id/stream endpoint"); + println!("šŸ”§ Time to make the tests pass! 
šŸš€"); + + Ok(()) +} diff --git a/backend/src/command_runner.rs b/backend/src/command_runner.rs new file mode 100644 index 00000000..c24e6bac --- /dev/null +++ b/backend/src/command_runner.rs @@ -0,0 +1,291 @@ +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use tokio::io::AsyncRead; + +use crate::models::Environment; + +mod local; +mod remote; + +pub use local::LocalCommandExecutor; +pub use remote::RemoteCommandExecutor; + +// Core trait that defines the interface for command execution +#[async_trait] +pub trait CommandExecutor: Send + Sync { + /// Start a process and return a handle to it + async fn start( + &self, + request: &CommandRunnerArgs, + ) -> Result, CommandError>; +} + +// Trait for managing running processes +#[async_trait] +pub trait ProcessHandle: Send + Sync { + /// Check if the process is still running, return exit status if finished + async fn try_wait(&mut self) -> Result, CommandError>; + + /// Wait for the process to complete and return exit status + async fn wait(&mut self) -> Result; + + /// Kill the process + async fn kill(&mut self) -> Result<(), CommandError>; + + /// Get streams for stdout and stderr + async fn stream(&mut self) -> Result; + + /// Get process identifier (for debugging/logging) + fn process_id(&self) -> String; + + /// Check current status (alias for try_wait for backward compatibility) + async fn status(&mut self) -> Result, CommandError> { + self.try_wait().await + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommandRunnerArgs { + pub command: String, + pub args: Vec, + pub working_dir: Option, + pub env_vars: Vec<(String, String)>, + pub stdin: Option, +} + +pub struct CommandRunner { + executor: Box, + command: Option, + args: Vec, + working_dir: Option, + env_vars: Vec<(String, String)>, + stdin: Option, +} +impl Default for CommandRunner { + fn default() -> Self { + Self::new() + } +} + +pub struct CommandProcess { + handle: Box, +} + +impl std::fmt::Debug for CommandProcess { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CommandProcess") + .field("process_id", &self.handle.process_id()) + .finish() + } +} + +#[derive(Debug)] +pub enum CommandError { + SpawnFailed { + command: String, + error: std::io::Error, + }, + StatusCheckFailed { + error: std::io::Error, + }, + KillFailed { + error: std::io::Error, + }, + ProcessNotStarted, + NoCommandSet, + IoError { + error: std::io::Error, + }, +} +impl From for CommandError { + fn from(error: std::io::Error) -> Self { + CommandError::IoError { error } + } +} +impl std::fmt::Display for CommandError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + CommandError::SpawnFailed { command, error } => { + write!(f, "Failed to spawn command '{}': {}", command, error) + } + CommandError::StatusCheckFailed { error } => { + write!(f, "Failed to check command status: {}", error) + } + CommandError::KillFailed { error } => { + write!(f, "Failed to kill command: {}", error) + } + CommandError::ProcessNotStarted => { + write!(f, "Process has not been started yet") + } + CommandError::NoCommandSet => { + write!(f, "No command has been set") + } + CommandError::IoError { error } => { + write!(f, "Failed to spawn command: {}", error) + } + } + } +} + +impl std::error::Error for CommandError {} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CommandExitStatus { + /// Exit code (0 for success on most platforms) + code: Option, + /// Whether the process exited successfully + success: 
diff --git a/backend/src/command_runner.rs b/backend/src/command_runner.rs
new file mode 100644
index 00000000..c24e6bac
--- /dev/null
+++ b/backend/src/command_runner.rs
@@ -0,0 +1,291 @@
+use async_trait::async_trait;
+use serde::{Deserialize, Serialize};
+use tokio::io::AsyncRead;
+
+use crate::models::Environment;
+
+mod local;
+mod remote;
+
+pub use local::LocalCommandExecutor;
+pub use remote::RemoteCommandExecutor;
+
+// Core trait that defines the interface for command execution
+#[async_trait]
+pub trait CommandExecutor: Send + Sync {
+    /// Start a process and return a handle to it
+    async fn start(
+        &self,
+        request: &CommandRunnerArgs,
+    ) -> Result<Box<dyn ProcessHandle>, CommandError>;
+}
+
+// Trait for managing running processes
+#[async_trait]
+pub trait ProcessHandle: Send + Sync {
+    /// Check if the process is still running, return exit status if finished
+    async fn try_wait(&mut self) -> Result<Option<CommandExitStatus>, CommandError>;
+
+    /// Wait for the process to complete and return exit status
+    async fn wait(&mut self) -> Result<CommandExitStatus, CommandError>;
+
+    /// Kill the process
+    async fn kill(&mut self) -> Result<(), CommandError>;
+
+    /// Get streams for stdout and stderr
+    async fn stream(&mut self) -> Result<CommandStream, CommandError>;
+
+    /// Get process identifier (for debugging/logging)
+    fn process_id(&self) -> String;
+
+    /// Check current status (alias for try_wait for backward compatibility)
+    async fn status(&mut self) -> Result<Option<CommandExitStatus>, CommandError> {
+        self.try_wait().await
+    }
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CommandRunnerArgs {
+    pub command: String,
+    pub args: Vec<String>,
+    pub working_dir: Option<String>,
+    pub env_vars: Vec<(String, String)>,
+    pub stdin: Option<String>,
+}
+
+pub struct CommandRunner {
+    executor: Box<dyn CommandExecutor>,
+    command: Option<String>,
+    args: Vec<String>,
+    working_dir: Option<String>,
+    env_vars: Vec<(String, String)>,
+    stdin: Option<String>,
+}
+
+impl Default for CommandRunner {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+pub struct CommandProcess {
+    handle: Box<dyn ProcessHandle>,
+}
+
+impl std::fmt::Debug for CommandProcess {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CommandProcess")
+            .field("process_id", &self.handle.process_id())
+            .finish()
+    }
+}
+
+#[derive(Debug)]
+pub enum CommandError {
+    SpawnFailed {
+        command: String,
+        error: std::io::Error,
+    },
+    StatusCheckFailed {
+        error: std::io::Error,
+    },
+    KillFailed {
+        error: std::io::Error,
+    },
+    ProcessNotStarted,
+    NoCommandSet,
+    IoError {
+        error: std::io::Error,
+    },
+}
+
+impl From<std::io::Error> for CommandError {
+    fn from(error: std::io::Error) -> Self {
+        CommandError::IoError { error }
+    }
+}
+
+impl std::fmt::Display for CommandError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            CommandError::SpawnFailed { command, error } => {
+                write!(f, "Failed to spawn command '{}': {}", command, error)
+            }
+            CommandError::StatusCheckFailed { error } => {
+                write!(f, "Failed to check command status: {}", error)
+            }
+            CommandError::KillFailed { error } => {
+                write!(f, "Failed to kill command: {}", error)
+            }
+            CommandError::ProcessNotStarted => {
+                write!(f, "Process has not been started yet")
+            }
+            CommandError::NoCommandSet => {
+                write!(f, "No command has been set")
+            }
+            CommandError::IoError { error } => {
+                write!(f, "Failed to spawn command: {}", error)
+            }
+        }
+    }
+}
+
+impl std::error::Error for CommandError {}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CommandExitStatus {
+    /// Exit code (0 for success on most platforms)
+    code: Option<i32>,
+    /// Whether the process exited successfully
+    success: bool,
+    /// Unix signal that terminated the process (Unix only)
+    #[cfg(unix)]
+    signal: Option<i32>,
+    /// Optional remote process identifier for cloud execution
+    remote_process_id: Option<String>,
+    /// Optional session identifier for remote execution tracking
+    remote_session_id: Option<String>,
+}
+
+impl CommandExitStatus {
+    /// Returns true if the process exited successfully
+    pub fn success(&self) -> bool {
+        self.success
+    }
+
+    /// Returns the exit code of the process, if available
+    pub fn code(&self) -> Option<i32> {
+        self.code
+    }
+}
+
+pub struct CommandStream {
+    pub stdout: Option<Box<dyn AsyncRead + Send + Unpin>>,
+    pub stderr: Option<Box<dyn AsyncRead + Send + Unpin>>,
+}
+
+impl CommandRunner {
+    pub fn new() -> Self {
+        let env = std::env::var("ENVIRONMENT").unwrap_or_else(|_| "local".to_string());
+        let mode = env.parse().unwrap_or(Environment::Local);
+        match mode {
+            Environment::Cloud => CommandRunner {
+                executor: Box::new(RemoteCommandExecutor::new()),
+                command: None,
+                args: Vec::new(),
+                working_dir: None,
+                env_vars: Vec::new(),
+                stdin: None,
+            },
+            Environment::Local => CommandRunner {
+                executor: Box::new(LocalCommandExecutor::new()),
+                command: None,
+                args: Vec::new(),
+                working_dir: None,
+                env_vars: Vec::new(),
+                stdin: None,
+            },
+        }
+    }
+
+    pub fn command(&mut self, cmd: &str) -> &mut Self {
+        self.command = Some(cmd.to_string());
+        self
+    }
+
+    pub fn get_program(&self) -> &str {
+        self.command.as_deref().unwrap_or("")
+    }
+
+    pub fn get_args(&self) -> &[String] {
+        &self.args
+    }
+
+    pub fn get_current_dir(&self) -> Option<&str> {
+        self.working_dir.as_deref()
+    }
+
+    pub fn arg(&mut self, arg: &str) -> &mut Self {
+        self.args.push(arg.to_string());
+        self
+    }
+
+    pub fn stdin(&mut self, prompt: &str) -> &mut Self {
+        self.stdin = Some(prompt.to_string());
+        self
+    }
+
+    pub fn working_dir(&mut self, dir: &str) -> &mut Self {
+        self.working_dir = Some(dir.to_string());
+        self
+    }
+
+    pub fn env(&mut self, key: &str, val: &str) -> &mut Self {
+        self.env_vars.push((key.to_string(), val.to_string()));
+        self
+    }
+
+    /// Convert the current CommandRunner state to a CommandRunnerArgs
+    pub fn to_args(&self) -> Option<CommandRunnerArgs> {
+        Some(CommandRunnerArgs {
+            command: self.command.clone()?,
+            args: self.args.clone(),
+            working_dir: self.working_dir.clone(),
+            env_vars: self.env_vars.clone(),
+            stdin: self.stdin.clone(),
+        })
+    }
+
+    /// Create a CommandRunner from a CommandRunnerArgs, respecting the environment
+    #[allow(dead_code)]
+    pub fn from_args(request: CommandRunnerArgs) -> Self {
+        let mut runner = Self::new();
+        runner.command(&request.command);
+
+        for arg in &request.args {
+            runner.arg(arg);
+        }
+
+        if let Some(dir) = &request.working_dir {
+            runner.working_dir(dir);
+        }
+
+        for (key, value) in &request.env_vars {
+            runner.env(key, value);
+        }
+
+        if let Some(stdin) = &request.stdin {
+            runner.stdin(stdin);
+        }
+
+        runner
+    }
+
+    pub async fn start(&self) -> Result<CommandProcess, CommandError> {
+        let request = self.to_args().ok_or(CommandError::NoCommandSet)?;
+        let handle = self.executor.start(&request).await?;
+
+        Ok(CommandProcess { handle })
+    }
+}
+
+impl CommandProcess {
+    #[allow(dead_code)]
+    pub async fn status(&mut self) -> Result<Option<CommandExitStatus>, CommandError> {
+        self.handle.status().await
+    }
+
+    pub async fn try_wait(&mut self) -> Result<Option<CommandExitStatus>, CommandError> {
+        self.handle.try_wait().await
+    }
+
+    pub async fn kill(&mut self) -> Result<(), CommandError> {
+        self.handle.kill().await
+    }
+
+    pub async fn stream(&mut self) -> Result<CommandStream, CommandError> {
+        self.handle.stream().await
+    }
+
+    #[allow(dead_code)]
+    pub async fn wait(&mut self) -> Result<CommandExitStatus, CommandError> {
+        self.handle.wait().await
+    }
+}
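Taken together, the API above is a thin builder: configure a CommandRunner, start() it to get a CommandProcess, then poll, stream, or wait on it. A minimal local-mode usage sketch (error handling elided, assuming the crate is used as a library as in the binaries above):

    use tokio::io::AsyncReadExt;
    use vibe_kanban::command_runner::CommandRunner;

    let mut runner = CommandRunner::new();
    runner.command("echo").arg("hello").env("FOO", "bar");

    let mut process = runner.start().await?; // spawns via the selected executor
    let mut stream = process.stream().await?; // take the stdout/stderr readers

    let mut out = String::new();
    if let Some(stdout) = &mut stream.stdout {
        // The streams are plain AsyncRead, so any tokio reader adapter works.
        stdout.read_to_string(&mut out).await?;
    }

    let status = process.wait().await?; // CommandExitStatus
    assert!(status.success());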
diff --git a/backend/src/command_runner/local.rs b/backend/src/command_runner/local.rs
new file mode 100644
index 00000000..0e951204
--- /dev/null
+++ b/backend/src/command_runner/local.rs
@@ -0,0 +1,703 @@
+use std::{process::Stdio, time::Duration};
+
+use async_trait::async_trait;
+use command_group::{AsyncCommandGroup, AsyncGroupChild};
+#[cfg(unix)]
+use nix::{
+    sys::signal::{killpg, Signal},
+    unistd::{getpgid, Pid},
+};
+use tokio::process::Command;
+
+use crate::command_runner::{
+    CommandError, CommandExecutor, CommandExitStatus, CommandRunnerArgs, CommandStream,
+    ProcessHandle,
+};
+
+pub struct LocalCommandExecutor;
+
+impl Default for LocalCommandExecutor {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl LocalCommandExecutor {
+    pub fn new() -> Self {
+        Self
+    }
+}
+
+#[async_trait]
+impl CommandExecutor for LocalCommandExecutor {
+    async fn start(
+        &self,
+        request: &CommandRunnerArgs,
+    ) -> Result<Box<dyn ProcessHandle>, CommandError> {
+        let mut cmd = Command::new(&request.command);
+
+        cmd.args(&request.args)
+            .kill_on_drop(true)
+            .stdin(Stdio::piped())
+            .stdout(Stdio::piped())
+            .stderr(Stdio::piped());
+
+        if let Some(dir) = &request.working_dir {
+            cmd.current_dir(dir);
+        }
+
+        for (key, val) in &request.env_vars {
+            cmd.env(key, val);
+        }
+
+        let mut child = cmd.group_spawn().map_err(|e| CommandError::SpawnFailed {
+            command: format!("{} {}", request.command, request.args.join(" ")),
+            error: e,
+        })?;
+
+        if let Some(prompt) = &request.stdin {
+            // Write prompt to stdin safely
+            if let Some(mut stdin) = child.inner().stdin.take() {
+                use tokio::io::AsyncWriteExt;
+                stdin.write_all(prompt.as_bytes()).await?;
+                stdin.shutdown().await?;
+            }
+        }
+
+        Ok(Box::new(LocalProcessHandle::new(child)))
+    }
+}
+
+pub struct LocalProcessHandle {
+    child: Option<AsyncGroupChild>,
+    process_id: String,
+}
+
+impl LocalProcessHandle {
+    pub fn new(mut child: AsyncGroupChild) -> Self {
+        let process_id = child
+            .inner()
+            .id()
+            .map(|id| id.to_string())
+            .unwrap_or_else(|| "unknown".to_string());
+
+        Self {
+            child: Some(child),
+            process_id,
+        }
+    }
+}
+
+#[async_trait]
+impl ProcessHandle for LocalProcessHandle {
+    async fn try_wait(&mut self) -> Result<Option<CommandExitStatus>, CommandError> {
+        match &mut self.child {
+            Some(child) => match child
+                .inner()
+                .try_wait()
+                .map_err(|e| CommandError::StatusCheckFailed { error: e })?
+            {
+                Some(status) => Ok(Some(CommandExitStatus::from_local(status))),
+                None => Ok(None),
+            },
+            None => Err(CommandError::ProcessNotStarted),
+        }
+    }
+
+    async fn wait(&mut self) -> Result<CommandExitStatus, CommandError> {
+        match &mut self.child {
+            Some(child) => {
+                let status = child
+                    .wait()
+                    .await
+                    .map_err(|e| CommandError::KillFailed { error: e })?;
+                Ok(CommandExitStatus::from_local(status))
+            }
+            None => Err(CommandError::ProcessNotStarted),
+        }
+    }
+
+    async fn kill(&mut self) -> Result<(), CommandError> {
+        match &mut self.child {
+            Some(child) => {
+                // hit the whole process group, not just the leader
+                #[cfg(unix)]
+                {
+                    if let Some(pid) = child.inner().id() {
+                        let pgid = getpgid(Some(Pid::from_raw(pid as i32))).map_err(|e| {
+                            CommandError::KillFailed {
+                                error: std::io::Error::other(e),
+                            }
+                        })?;
+
+                        for sig in [Signal::SIGINT, Signal::SIGTERM, Signal::SIGKILL] {
+                            if let Err(e) = killpg(pgid, sig) {
+                                tracing::warn!(
+                                    "Failed to send signal {:?} to process group {}: {}",
+                                    sig,
+                                    pgid,
+                                    e
+                                );
+                            }
+                            tokio::time::sleep(Duration::from_secs(2)).await;
+                            if child
+                                .inner()
+                                .try_wait()
+                                .map_err(|e| CommandError::StatusCheckFailed { error: e })?
+                                .is_some()
+                            {
+                                break; // gone!
+                            }
+                        }
+                    }
+                }
+
+                // final fallback – command_group already targets the group
+                child
+                    .kill()
+                    .await
+                    .map_err(|e| CommandError::KillFailed { error: e })?;
+                child
+                    .wait()
+                    .await
+                    .map_err(|e| CommandError::KillFailed { error: e })?; // reap
+
+                // Clear the handle after successful kill
+                self.child = None;
+                Ok(())
+            }
+            None => Err(CommandError::ProcessNotStarted),
+        }
+    }
+
+    async fn stream(&mut self) -> Result<CommandStream, CommandError> {
+        match &mut self.child {
+            Some(child) => {
+                let stdout = child.inner().stdout.take();
+                let stderr = child.inner().stderr.take();
+                Ok(CommandStream::from_local(stdout, stderr))
+            }
+            None => Err(CommandError::ProcessNotStarted),
+        }
+    }
+
+    fn process_id(&self) -> String {
+        self.process_id.clone()
+    }
+}
+
+// Local-specific implementations for shared types
+impl CommandExitStatus {
+    /// Create a CommandExitStatus from a std::process::ExitStatus (for local processes)
+    pub fn from_local(status: std::process::ExitStatus) -> Self {
+        Self {
+            code: status.code(),
+            success: status.success(),
+            #[cfg(unix)]
+            signal: {
+                use std::os::unix::process::ExitStatusExt;
+                status.signal()
+            },
+            remote_process_id: None,
+            remote_session_id: None,
+        }
+    }
+}
+
+impl CommandStream {
+    /// Create a CommandStream from local process streams
+    pub fn from_local(
+        stdout: Option<tokio::process::ChildStdout>,
+        stderr: Option<tokio::process::ChildStderr>,
+    ) -> Self {
+        Self {
+            stdout: stdout.map(|s| Box::new(s) as Box<dyn tokio::io::AsyncRead + Send + Unpin>),
+            stderr: stderr.map(|s| Box::new(s) as Box<dyn tokio::io::AsyncRead + Send + Unpin>),
+        }
+    }
+}
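One subtlety the tests below rely on: from_local preserves Unix signal information, and a child terminated by a signal reports no numeric exit code. So after kill() escalates to SIGKILL, callers should branch on success()/is_some rather than expecting code() to be populated. Illustrative values for a SIGKILLed child on Unix (hypothetical status, per std::os::unix::process::ExitStatusExt semantics):

    // status.success() == false
    // status.code()    == None    -- terminated by a signal, not by exit()
    // status.signal    == Some(9) -- SIGKILL, captured in from_local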
.read_to_end(&mut tokio_stdout_data) + .await + .expect("Should read tokio stdout"); + } + let tokio_output = String::from_utf8(tokio_stdout_data).expect("Should be valid UTF-8"); + + // Both should produce the same output + assert_eq!(runner_output.trim(), tokio_output.trim()); + assert_eq!(runner_output.trim(), test_message); + } + + #[tokio::test] + async fn test_stdin_handling() { + // Ensure we're using local execution for this test + std::env::set_var("ENVIRONMENT", "local"); + let test_input = "test input data\n"; + + // Test with CommandRunner (using cat to echo stdin) + let mut runner = CommandRunner::new(); + let mut process = runner + .command("cat") + .stdin(test_input) + .start() + .await + .expect("CommandRunner should start cat command"); + + let mut stream = process.stream().await.expect("Should get stream"); + let mut stdout_data = Vec::new(); + if let Some(stdout) = &mut stream.stdout { + stdout + .read_to_end(&mut stdout_data) + .await + .expect("Should read stdout"); + } + let runner_output = String::from_utf8(stdout_data).expect("Should be valid UTF-8"); + + // Test with tokio::process::Command + let mut tokio_child = create_tokio_command("cat", &[], None, &[], Some(test_input)) + .await + .expect("Should start tokio command"); + + let mut tokio_stdout_data = Vec::new(); + if let Some(stdout) = tokio_child.inner().stdout.take() { + let mut stdout = stdout; + stdout + .read_to_end(&mut tokio_stdout_data) + .await + .expect("Should read tokio stdout"); + } + let tokio_output = String::from_utf8(tokio_stdout_data).expect("Should be valid UTF-8"); + + // Both should echo the input + assert_eq!(runner_output, tokio_output); + assert_eq!(runner_output, test_input); + } + + #[tokio::test] + async fn test_working_directory() { + // Use pwd command to check working directory + let test_dir = "/tmp"; + + // Test with CommandRunner + std::env::set_var("ENVIRONMENT", "local"); + let mut runner = CommandRunner::new(); + let mut process = runner + .command("pwd") + .working_dir(test_dir) + .start() + .await + .expect("CommandRunner should start pwd command"); + + let mut stream = process.stream().await.expect("Should get stream"); + let mut stdout_data = Vec::new(); + if let Some(stdout) = &mut stream.stdout { + stdout + .read_to_end(&mut stdout_data) + .await + .expect("Should read stdout"); + } + let runner_output = String::from_utf8(stdout_data).expect("Should be valid UTF-8"); + + // Test with tokio::process::Command + let mut tokio_child = create_tokio_command("pwd", &[], Some(test_dir), &[], None) + .await + .expect("Should start tokio command"); + + let mut tokio_stdout_data = Vec::new(); + if let Some(stdout) = tokio_child.inner().stdout.take() { + let mut stdout = stdout; + stdout + .read_to_end(&mut tokio_stdout_data) + .await + .expect("Should read tokio stdout"); + } + let tokio_output = String::from_utf8(tokio_stdout_data).expect("Should be valid UTF-8"); + + // Both should show the same working directory + assert_eq!(runner_output.trim(), tokio_output.trim()); + assert!(runner_output.trim().contains(test_dir)); + } + + #[tokio::test] + async fn test_environment_variables() { + let test_var = "TEST_VAR"; + let test_value = "test_value_123"; + + // Test with CommandRunner + std::env::set_var("ENVIRONMENT", "local"); + let mut runner = CommandRunner::new(); + let mut process = runner + .command("printenv") + .arg(test_var) + .env(test_var, test_value) + .start() + .await + .expect("CommandRunner should start printenv command"); + + let mut stream = 
process.stream().await.expect("Should get stream"); + let mut stdout_data = Vec::new(); + if let Some(stdout) = &mut stream.stdout { + stdout + .read_to_end(&mut stdout_data) + .await + .expect("Should read stdout"); + } + let runner_output = String::from_utf8(stdout_data).expect("Should be valid UTF-8"); + + // Test with tokio::process::Command + let env_vars = vec![(test_var.to_string(), test_value.to_string())]; + let mut tokio_child = create_tokio_command("printenv", &[test_var], None, &env_vars, None) + .await + .expect("Should start tokio command"); + + let mut tokio_stdout_data = Vec::new(); + if let Some(stdout) = tokio_child.inner().stdout.take() { + let mut stdout = stdout; + stdout + .read_to_end(&mut tokio_stdout_data) + .await + .expect("Should read tokio stdout"); + } + let tokio_output = String::from_utf8(tokio_stdout_data).expect("Should be valid UTF-8"); + + // Both should show the same environment variable + assert_eq!(runner_output.trim(), tokio_output.trim()); + assert_eq!(runner_output.trim(), test_value); + } + + #[tokio::test] + async fn test_process_group_creation() { + // Test that both CommandRunner and tokio::process::Command create process groups + // We'll use a sleep command that can be easily killed + + // Test with CommandRunner + std::env::set_var("ENVIRONMENT", "local"); + let mut runner = CommandRunner::new(); + let mut process = runner + .command("sleep") + .arg("10") // Sleep for 10 seconds + .start() + .await + .expect("CommandRunner should start sleep command"); + + // Check that process is running + let status = process.status().await.expect("Should check status"); + assert!(status.is_none(), "Process should still be running"); + + // Kill the process (might fail if already exited) + let _ = process.kill().await; + + // Wait a moment for the kill to take effect + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let final_status = process.status().await.expect("Should check final status"); + assert!( + final_status.is_some(), + "Process should have exited after kill" + ); + + // Test with tokio::process::Command for comparison + let mut tokio_child = create_tokio_command("sleep", &["10"], None, &[], None) + .await + .expect("Should start tokio sleep command"); + + // Check that process is running + let tokio_status = tokio_child + .inner() + .try_wait() + .expect("Should check tokio status"); + assert!( + tokio_status.is_none(), + "Tokio process should still be running" + ); + + // Kill the tokio process + tokio_child.kill().await.expect("Should kill tokio process"); + + // Wait a moment for the kill to take effect + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + let tokio_final_status = tokio_child + .inner() + .try_wait() + .expect("Should check tokio final status"); + assert!( + tokio_final_status.is_some(), + "Tokio process should have exited after kill" + ); + } + + #[tokio::test] + async fn test_kill_operation() { + // Test killing processes with both implementations + + // Test CommandRunner kill + std::env::set_var("ENVIRONMENT", "local"); + let mut runner = CommandRunner::new(); + let mut process = runner + .command("sleep") + .arg("60") // Long sleep + .start() + .await + .expect("Should start CommandRunner sleep"); + + // Verify it's running + assert!(process + .status() + .await + .expect("Should check status") + .is_none()); + + // Kill and verify it stops (might fail if already exited) + let _ = process.kill().await; + + // Give it time to die + 
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + let exit_status = process.status().await.expect("Should get exit status"); + assert!(exit_status.is_some(), "Process should have exited"); + + // Test tokio::process::Command kill for comparison + let mut tokio_child = create_tokio_command("sleep", &["60"], None, &[], None) + .await + .expect("Should start tokio sleep"); + + // Verify it's running + assert!(tokio_child + .inner() + .try_wait() + .expect("Should check tokio status") + .is_none()); + + // Kill and verify it stops + tokio_child.kill().await.expect("Should kill tokio process"); + + // Give it time to die + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + let tokio_exit_status = tokio_child + .inner() + .try_wait() + .expect("Should get tokio exit status"); + assert!( + tokio_exit_status.is_some(), + "Tokio process should have exited" + ); + } + + #[tokio::test] + async fn test_status_monitoring() { + // Test status monitoring with a quick command + + // Test with CommandRunner + std::env::set_var("ENVIRONMENT", "local"); + let mut runner = CommandRunner::new(); + let mut process = runner + .command("echo") + .arg("quick test") + .start() + .await + .expect("Should start CommandRunner echo"); + + // Initially might be running or might have finished quickly + let _initial_status = process.status().await.expect("Should check initial status"); + + // Wait for completion + let exit_status = process.wait().await.expect("Should wait for completion"); + assert!(exit_status.success(), "Echo command should succeed"); + + // After wait, status should show completion + let final_status = process.status().await.expect("Should check final status"); + assert!( + final_status.is_some(), + "Should have exit status after completion" + ); + assert!( + final_status.unwrap().success(), + "Should show successful exit" + ); + + // Test with tokio::process::Command for comparison + let mut tokio_child = create_tokio_command("echo", &["quick test"], None, &[], None) + .await + .expect("Should start tokio echo"); + + // Wait for completion + let tokio_exit_status = tokio_child + .wait() + .await + .expect("Should wait for tokio completion"); + assert!( + tokio_exit_status.success(), + "Tokio echo command should succeed" + ); + + // After wait, status should show completion + let tokio_final_status = tokio_child + .inner() + .try_wait() + .expect("Should check tokio final status"); + assert!( + tokio_final_status.is_some(), + "Should have tokio exit status after completion" + ); + assert!( + tokio_final_status.unwrap().success(), + "Should show tokio successful exit" + ); + } + + #[tokio::test] + async fn test_wait_for_completion() { + // Test waiting for process completion with specific exit codes + + // Test successful command (exit code 0) + std::env::set_var("ENVIRONMENT", "local"); + let mut runner = CommandRunner::new(); + let mut process = runner + .command("true") // Command that exits with 0 + .start() + .await + .expect("Should start true command"); + + let exit_status = process + .wait() + .await + .expect("Should wait for true completion"); + assert!(exit_status.success(), "true command should succeed"); + assert_eq!(exit_status.code(), Some(0), "true should exit with code 0"); + + // Test failing command (exit code 1) + let mut runner2 = CommandRunner::new(); + let mut process2 = runner2 + .command("false") // Command that exits with 1 + .start() + .await + .expect("Should start false command"); + + let exit_status2 = process2 + .wait() + .await + 
.expect("Should wait for false completion"); + assert!(!exit_status2.success(), "false command should fail"); + assert_eq!( + exit_status2.code(), + Some(1), + "false should exit with code 1" + ); + + // Compare with tokio::process::Command + let mut tokio_child = create_tokio_command("true", &[], None, &[], None) + .await + .expect("Should start tokio true"); + + let tokio_exit_status = tokio_child + .wait() + .await + .expect("Should wait for tokio true"); + assert!(tokio_exit_status.success(), "tokio true should succeed"); + assert_eq!( + tokio_exit_status.code(), + Some(0), + "tokio true should exit with code 0" + ); + + let mut tokio_child2 = create_tokio_command("false", &[], None, &[], None) + .await + .expect("Should start tokio false"); + + let tokio_exit_status2 = tokio_child2 + .wait() + .await + .expect("Should wait for tokio false"); + assert!(!tokio_exit_status2.success(), "tokio false should fail"); + assert_eq!( + tokio_exit_status2.code(), + Some(1), + "tokio false should exit with code 1" + ); + } +} diff --git a/backend/src/command_runner/remote.rs b/backend/src/command_runner/remote.rs new file mode 100644 index 00000000..e04d73c5 --- /dev/null +++ b/backend/src/command_runner/remote.rs @@ -0,0 +1,402 @@ +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use async_trait::async_trait; +use tokio::io::AsyncRead; + +use crate::command_runner::{ + CommandError, CommandExecutor, CommandExitStatus, CommandRunnerArgs, CommandStream, + ProcessHandle, +}; + +pub struct RemoteCommandExecutor { + cloud_server_url: String, +} + +impl Default for RemoteCommandExecutor { + fn default() -> Self { + Self::new() + } +} + +impl RemoteCommandExecutor { + pub fn new() -> Self { + let cloud_server_url = std::env::var("CLOUD_SERVER_URL") + .unwrap_or_else(|_| "http://localhost:8000".to_string()); + Self { cloud_server_url } + } +} + +#[async_trait] +impl CommandExecutor for RemoteCommandExecutor { + async fn start( + &self, + request: &CommandRunnerArgs, + ) -> Result, CommandError> { + let client = reqwest::Client::new(); + let response = client + .post(format!("{}/commands", self.cloud_server_url)) + .json(request) + .send() + .await + .map_err(|e| CommandError::IoError { + error: std::io::Error::other(e), + })?; + + let result: serde_json::Value = + response.json().await.map_err(|e| CommandError::IoError { + error: std::io::Error::other(e), + })?; + + let process_id = + result["data"]["process_id"] + .as_str() + .ok_or_else(|| CommandError::IoError { + error: std::io::Error::other(format!( + "Missing process_id in response: {}", + result + )), + })?; + + Ok(Box::new(RemoteProcessHandle::new( + process_id.to_string(), + self.cloud_server_url.clone(), + ))) + } +} + +pub struct RemoteProcessHandle { + process_id: String, + cloud_server_url: String, +} + +impl RemoteProcessHandle { + pub fn new(process_id: String, cloud_server_url: String) -> Self { + Self { + process_id, + cloud_server_url, + } + } +} + +#[async_trait] +impl ProcessHandle for RemoteProcessHandle { + async fn try_wait(&mut self) -> Result, CommandError> { + // Make HTTP request to get status from cloud server + let client = reqwest::Client::new(); + let response = client + .get(format!( + "{}/commands/{}/status", + self.cloud_server_url, self.process_id + )) + .send() + .await + .map_err(|e| CommandError::StatusCheckFailed { + error: std::io::Error::other(e), + })?; + + if !response.status().is_success() { + if response.status() == reqwest::StatusCode::NOT_FOUND { + return Err(CommandError::StatusCheckFailed { + error: 
+                    error: std::io::Error::new(std::io::ErrorKind::NotFound, "Process not found"),
+                });
+            } else {
+                return Err(CommandError::StatusCheckFailed {
+                    error: std::io::Error::other("Status check failed"),
+                });
+            }
+        }
+
+        let result: serde_json::Value =
+            response
+                .json()
+                .await
+                .map_err(|e| CommandError::StatusCheckFailed {
+                    error: std::io::Error::other(e),
+                })?;
+
+        let data = result["data"]
+            .as_object()
+            .ok_or_else(|| CommandError::StatusCheckFailed {
+                error: std::io::Error::other("Invalid response format"),
+            })?;
+
+        let running = data["running"].as_bool().unwrap_or(false);
+
+        if running {
+            Ok(None) // Still running
+        } else {
+            // Process completed, extract exit status
+            let exit_code = data["exit_code"].as_i64().map(|c| c as i32);
+            let success = data["success"].as_bool().unwrap_or(false);
+
+            Ok(Some(CommandExitStatus::from_remote(
+                exit_code,
+                success,
+                Some(self.process_id.clone()),
+                None,
+            )))
+        }
+    }
+
+    async fn wait(&mut self) -> Result<CommandExitStatus, CommandError> {
+        // Poll the status endpoint until the process completes, reusing one client
+        let client = reqwest::Client::new();
+        loop {
+            let response = client
+                .get(format!(
+                    "{}/commands/{}/status",
+                    self.cloud_server_url, self.process_id
+                ))
+                .send()
+                .await
+                .map_err(|e| CommandError::StatusCheckFailed {
+                    error: std::io::Error::other(e),
+                })?;
+
+            if !response.status().is_success() {
+                if response.status() == reqwest::StatusCode::NOT_FOUND {
+                    return Err(CommandError::StatusCheckFailed {
+                        error: std::io::Error::new(
+                            std::io::ErrorKind::NotFound,
+                            "Process not found",
+                        ),
+                    });
+                } else {
+                    return Err(CommandError::StatusCheckFailed {
+                        error: std::io::Error::other("Status check failed"),
+                    });
+                }
+            }
+
+            let result: serde_json::Value =
+                response
+                    .json()
+                    .await
+                    .map_err(|e| CommandError::StatusCheckFailed {
+                        error: std::io::Error::other(e),
+                    })?;
+
+            let data =
+                result["data"]
+                    .as_object()
+                    .ok_or_else(|| CommandError::StatusCheckFailed {
+                        error: std::io::Error::other("Invalid response format"),
+                    })?;
+
+            let running = data["running"].as_bool().unwrap_or(false);
+
+            if !running {
+                // Process completed, extract exit status and return
+                let exit_code = data["exit_code"].as_i64().map(|c| c as i32);
+                let success = data["success"].as_bool().unwrap_or(false);
+
+                return Ok(CommandExitStatus::from_remote(
+                    exit_code,
+                    success,
+                    Some(self.process_id.clone()),
+                    None,
+                ));
+            }
+
+            // Wait a bit before polling again
+            tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
+        }
+    }
+
+    async fn kill(&mut self) -> Result<(), CommandError> {
+        let client = reqwest::Client::new();
+        let response = client
+            .delete(format!(
+                "{}/commands/{}",
+                self.cloud_server_url, self.process_id
+            ))
+            .send()
+            .await
+            .map_err(|e| CommandError::KillFailed {
+                error: std::io::Error::other(e),
+            })?;
+
+        if !response.status().is_success() {
+            if response.status() == reqwest::StatusCode::NOT_FOUND {
+                // Process not found, might have already finished - treat as success
+                return Ok(());
+            }
+
+            return Err(CommandError::KillFailed {
+                error: std::io::Error::other(format!(
+                    "Remote kill failed with status: {}",
+                    response.status()
+                )),
+            });
+        }
+
+        // Check if server indicates process was already completed
+        if let Ok(result) = response.json::<serde_json::Value>().await {
+            if let Some(data) = result.get("data") {
+                if let Some(message) = data.as_str() {
+                    tracing::info!("Kill result: {}", message);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
+    async fn stream(&mut self) -> Result<CommandStream, CommandError> {
+        // Create HTTP streams for stdout and stderr concurrently
+        let stdout_url = format!(
"{}/commands/{}/stdout", + self.cloud_server_url, self.process_id + ); + let stderr_url = format!( + "{}/commands/{}/stderr", + self.cloud_server_url, self.process_id + ); + + // Create both streams concurrently using tokio::try_join! + let (stdout_result, stderr_result) = + tokio::try_join!(HTTPStream::new(stdout_url), HTTPStream::new(stderr_url))?; + + let stdout_stream: Option> = + Some(Box::new(stdout_result) as Box); + let stderr_stream: Option> = + Some(Box::new(stderr_result) as Box); + + Ok(CommandStream { + stdout: stdout_stream, + stderr: stderr_stream, + }) + } + + fn process_id(&self) -> String { + self.process_id.clone() + } +} + +/// HTTP-based AsyncRead wrapper for true streaming +pub struct HTTPStream { + stream: Pin, reqwest::Error>> + Send>>, + current_chunk: Vec, + chunk_position: usize, + finished: bool, +} + +// HTTPStream needs to be Unpin to work with the AsyncRead trait bounds +impl Unpin for HTTPStream {} + +impl HTTPStream { + pub async fn new(url: String) -> Result { + let client = reqwest::Client::new(); + let response = client + .get(&url) + .send() + .await + .map_err(|e| CommandError::IoError { + error: std::io::Error::other(e), + })?; + + if !response.status().is_success() { + return Err(CommandError::IoError { + error: std::io::Error::other(format!( + "HTTP request failed with status: {}", + response.status() + )), + }); + } + + // Use chunk() method to create a stream + Ok(Self { + stream: Box::pin(futures_util::stream::unfold( + response, + |mut resp| async move { + match resp.chunk().await { + Ok(Some(chunk)) => Some((Ok(chunk.to_vec()), resp)), + Ok(None) => None, + Err(e) => Some((Err(e), resp)), + } + }, + )), + current_chunk: Vec::new(), + chunk_position: 0, + finished: false, + }) + } +} + +impl AsyncRead for HTTPStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + if self.finished { + return Poll::Ready(Ok(())); + } + + // First, try to read from current chunk if available + if self.chunk_position < self.current_chunk.len() { + let remaining_in_chunk = self.current_chunk.len() - self.chunk_position; + let to_read = std::cmp::min(remaining_in_chunk, buf.remaining()); + + let chunk_data = + &self.current_chunk[self.chunk_position..self.chunk_position + to_read]; + buf.put_slice(chunk_data); + self.chunk_position += to_read; + + return Poll::Ready(Ok(())); + } + + // Current chunk is exhausted, try to get the next chunk + match self.stream.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(chunk))) => { + if chunk.is_empty() { + // Empty chunk, mark as finished + self.finished = true; + Poll::Ready(Ok(())) + } else { + // New chunk available + self.current_chunk = chunk; + self.chunk_position = 0; + + // Read from the new chunk + let to_read = std::cmp::min(self.current_chunk.len(), buf.remaining()); + let chunk_data = &self.current_chunk[..to_read]; + buf.put_slice(chunk_data); + self.chunk_position = to_read; + + Poll::Ready(Ok(())) + } + } + Poll::Ready(Some(Err(e))) => Poll::Ready(Err(std::io::Error::other(e))), + Poll::Ready(None) => { + // Stream ended + self.finished = true; + Poll::Ready(Ok(())) + } + Poll::Pending => Poll::Pending, + } + } +} + +// Remote-specific implementations for shared types +impl CommandExitStatus { + /// Create a CommandExitStatus for remote processes + pub fn from_remote( + code: Option, + success: bool, + remote_process_id: Option, + remote_session_id: Option, + ) -> Self { + Self { + code, + success, + #[cfg(unix)] + signal: None, + 
+            remote_process_id,
+            remote_session_id,
+        }
+    }
+}
diff --git a/backend/src/executor.rs b/backend/src/executor.rs
index 66de5d54..9215974a 100644
--- a/backend/src/executor.rs
+++ b/backend/src/executor.rs
@@ -6,9 +6,12 @@ use tokio::io::{AsyncBufReadExt, BufReader};
 use ts_rs::TS;
 use uuid::Uuid;
 
-use crate::executors::{
-    AiderExecutor, AmpExecutor, CCRExecutor, CharmOpencodeExecutor, ClaudeExecutor, EchoExecutor,
-    GeminiExecutor, SetupScriptExecutor, SstOpencodeExecutor,
+use crate::{
+    command_runner::{CommandError, CommandProcess, CommandRunner},
+    executors::{
+        AiderExecutor, AmpExecutor, CCRExecutor, CharmOpencodeExecutor, ClaudeExecutor,
+        EchoExecutor, GeminiExecutor, SetupScriptExecutor, SstOpencodeExecutor,
+    },
 };
 
 // Constants for database streaming - fast for near-real-time updates
@@ -106,37 +109,28 @@ impl SpawnContext {
         self.additional_context = Some(context.into());
         self
     }
-    /// Create SpawnContext from Command, then use builder methods for additional context
-    pub fn from_command(
-        command: &tokio::process::Command,
-        executor_type: impl Into<String>,
-    ) -> Self {
+    pub fn from_command(command: &CommandRunner, executor_type: impl Into<String>) -> Self {
         Self::from(command).with_executor_type(executor_type)
     }
 
     /// Finalize the context and create an ExecutorError
-    pub fn spawn_error(self, error: std::io::Error) -> ExecutorError {
+    pub fn spawn_error(self, error: CommandError) -> ExecutorError {
         ExecutorError::spawn_failed(error, self)
     }
 }
 
-/// Extract SpawnContext from a tokio::process::Command
-/// This automatically captures all available information from the Command object
-impl From<&tokio::process::Command> for SpawnContext {
-    fn from(command: &tokio::process::Command) -> Self {
-        let program = command.as_std().get_program().to_string_lossy().to_string();
-        let args = command
-            .as_std()
-            .get_args()
-            .map(|s| s.to_string_lossy().to_string())
-            .collect();
+/// Extract SpawnContext from a CommandRunner
+/// This automatically captures all available information from the CommandRunner object
+impl From<&CommandRunner> for SpawnContext {
+    fn from(command: &CommandRunner) -> Self {
+        let program = command.get_program().to_string();
+        let args = command.get_args().to_vec();
         let working_dir = command
-            .as_std()
             .get_current_dir()
-            .map(|p| p.to_string_lossy().to_string())
-            .unwrap_or_else(|| "current_dir".to_string());
+            .unwrap_or("current_dir")
+            .to_string();
 
         Self {
             executor_type: "Unknown".to_string(), // Must be set using with_executor_type()
@@ -153,7 +147,7 @@ impl From<&tokio::process::Command> for SpawnContext {
 #[derive(Debug)]
 pub enum ExecutorError {
     SpawnFailed {
-        error: std::io::Error,
+        error: CommandError,
         context: SpawnContext,
     },
     TaskNotFound,
@@ -249,7 +243,7 @@ impl From for ExecutorError {
 impl ExecutorError {
     /// Create a new SpawnFailed error with context
-    pub fn spawn_failed(error: std::io::Error, context: SpawnContext) -> Self {
+    pub fn spawn_failed(error: CommandError, context: SpawnContext) -> Self {
         ExecutorError::SpawnFailed { error, context }
     }
 }
@@ -263,7 +257,7 @@ pub trait Executor: Send + Sync {
         pool: &sqlx::SqlitePool,
         task_id: Uuid,
         worktree_path: &str,
-    ) -> Result<command_group::AsyncGroupChild, ExecutorError>;
+    ) -> Result<CommandProcess, ExecutorError>;
 
     /// Spawn a follow-up session for executors that support it
     ///
@@ -277,10 +271,9 @@ pub trait Executor: Send + Sync {
         _session_id: &str,
         _prompt: &str,
         _worktree_path: &str,
-    ) -> Result<command_group::AsyncGroupChild, ExecutorError> {
+    ) -> Result<CommandProcess, ExecutorError> {
         Err(ExecutorError::FollowUpNotSupported)
     }
-
     /// Normalize executor logs into a standard format
     fn normalize_logs(
         &self,
@@ -298,22 +291,22 @@ pub trait Executor: Send + Sync {
     }
 
     #[allow(clippy::result_large_err)]
-    fn setup_streaming(
+    async fn setup_streaming(
        &self,
-        child: &mut
command_group::AsyncGroupChild, + child: &mut CommandProcess, pool: &sqlx::SqlitePool, attempt_id: Uuid, execution_process_id: Uuid, ) -> Result<(), ExecutorError> { - let stdout = child - .inner() + let streams = child + .stream() + .await + .expect("Failed to get stdio from child process"); + let stdout = streams .stdout - .take() .expect("Failed to take stdout from child process"); - let stderr = child - .inner() + let stderr = streams .stderr - .take() .expect("Failed to take stderr from child process"); let pool_clone1 = pool.clone(); @@ -345,9 +338,9 @@ pub trait Executor: Send + Sync { attempt_id: Uuid, execution_process_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { let mut child = self.spawn(pool, task_id, worktree_path).await?; - Self::setup_streaming(self, &mut child, pool, attempt_id, execution_process_id)?; + Self::setup_streaming(self, &mut child, pool, attempt_id, execution_process_id).await?; Ok(child) } @@ -362,11 +355,11 @@ pub trait Executor: Send + Sync { session_id: &str, prompt: &str, worktree_path: &str, - ) -> Result { + ) -> Result { let mut child = self .spawn_followup(pool, task_id, session_id, prompt, worktree_path) .await?; - Self::setup_streaming(self, &mut child, pool, attempt_id, execution_process_id)?; + Self::setup_streaming(self, &mut child, pool, attempt_id, execution_process_id).await?; Ok(child) } } diff --git a/backend/src/executors/aider.rs b/backend/src/executors/aider.rs index 9d8eb4a2..0f7c1cd6 100644 --- a/backend/src/executors/aider.rs +++ b/backend/src/executors/aider.rs @@ -1,13 +1,10 @@ use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; use serde_json::Value; -use tokio::{ - io::{AsyncBufReadExt, BufReader}, - process::Command, -}; +use tokio::io::{AsyncBufReadExt, BufReader}; use uuid::Uuid; use crate::{ + command_runner::{CommandProcess, CommandRunner}, executor::{ ActionType, Executor, ExecutorError, NormalizedConversation, NormalizedEntry, NormalizedEntryType, @@ -478,7 +475,7 @@ impl Executor for AiderExecutor { pool: &sqlx::SqlitePool, task_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { // Get the task to fetch its description let task = Task::find_by_id(pool, task_id) .await? 
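
For orientation, the streaming plumbing above (`setup_streaming`) boils down to wrapping each `AsyncRead` half of the `CommandStream` in a buffered line reader and forwarding lines to the database writers. A minimal sketch, assuming only the `AsyncRead + Unpin` bound visible in the hunks; the `forward_lines` helper and its `on_line` sink are illustrative, not part of this patch:

```rust
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};

// Drain one stream line by line; setup_streaming spawns one of these per
// stdout/stderr half, with the sink writing to the execution-process row.
async fn forward_lines<R>(reader: R, mut on_line: impl FnMut(String)) -> std::io::Result<()>
where
    R: AsyncRead + Unpin,
{
    let mut lines = BufReader::new(reader).lines();
    while let Some(line) = lines.next_line().await? {
        on_line(line);
    }
    Ok(())
}
```
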
@@ -526,40 +523,33 @@ impl Executor for AiderExecutor { message_file.to_string_lossy() ); - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .current_dir(worktree_path) - .env("COLUMNS", "1000") // Prevent line wrapping in aider output - .arg(shell_arg) - .arg(&aider_command); - - tracing::debug!("Spawning Aider command: {}", &aider_command); - // Write message file after command is prepared for better error context tokio::fs::write(&message_file, prompt.as_bytes()) .await .map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_task(task_id, Some(task.title.clone())) - .with_context(format!( - "Failed to write message file {}", - message_file.display() - )); - ExecutorError::spawn_failed(e, context) + ExecutorError::ContextCollectionFailed(format!( + "Failed to write message file {}: {}", + message_file.display(), + e + )) })?; - let child = command - .group_spawn() // Create new process group so we can kill entire tree - .map_err(|e| { - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_task(task_id, Some(task.title.clone())) - .with_context(format!("{} CLI execution for new task", self.executor_type)) - .spawn_error(e) - })?; + tracing::debug!("Spawning Aider command: {}", &aider_command); + + let mut command = CommandRunner::new(); + command + .command(shell_cmd) + .arg(shell_arg) + .arg(&aider_command) + .working_dir(worktree_path) + .env("COLUMNS", "1000"); // Prevent line wrapping in aider output + + let child = command.start().await.map_err(|e| { + crate::executor::SpawnContext::from_command(&command, &self.executor_type) + .with_task(task_id, Some(task.title.clone())) + .with_context(format!("{} CLI execution for new task", self.executor_type)) + .spawn_error(e) + })?; tracing::debug!( "Started Aider with message file {} for task {}: {:?}", @@ -579,7 +569,7 @@ impl Executor for AiderExecutor { attempt_id: Uuid, execution_process_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { // Generate our own session ID and store it in the database immediately let session_id = format!("aider_task_{}", task_id); if let Err(e) = @@ -601,16 +591,15 @@ impl Executor for AiderExecutor { let mut child = self.spawn(pool, task_id, worktree_path).await?; // Take stdout and stderr pipes for Aider filtering - let stdout = child - .inner() + let streams = child + .stream() + .await + .expect("Failed to get stdio from child process"); + let stdout = streams .stdout - .take() .expect("Failed to take stdout from child process"); - - let stderr = child - .inner() + let stderr = streams .stderr - .take() .expect("Failed to take stderr from child process"); // Start Aider filtering task @@ -666,7 +655,7 @@ impl Executor for AiderExecutor { session_id: &str, prompt: &str, worktree_path: &str, - ) -> Result { + ) -> Result { // Update session ID for this execution process to ensure continuity if let Err(e) = ExecutorSession::update_session_id(pool, execution_process_id, session_id).await @@ -689,16 +678,15 @@ impl Executor for AiderExecutor { .await?; // Take stdout and stderr pipes for Aider filtering - let stdout = child - .inner() + let streams = child + .stream() + .await + .expect("Failed to get stdio from child process"); + let stdout = streams .stdout - .take() .expect("Failed to take stdout from child process"); - - let stderr = child - .inner() + let stderr = streams .stderr - .take() 
.expect("Failed to take stderr from child process"); // Start Aider filtering task @@ -723,7 +711,7 @@ impl Executor for AiderExecutor { session_id: &str, prompt: &str, worktree_path: &str, - ) -> Result { + ) -> Result { let base_dir = TaskAttempt::get_worktree_base_dir(); // Create session directory if it doesn't exist @@ -759,32 +747,28 @@ impl Executor for AiderExecutor { message_file.to_string_lossy() ); - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .current_dir(worktree_path) - .env("COLUMNS", "1000") // Prevent line wrapping in aider output - .arg(shell_arg) - .arg(&aider_command); - - tracing::debug!("Spawning Aider command: {}", &aider_command); - // Write message file after command is prepared for better error context tokio::fs::write(&message_file, prompt.as_bytes()) .await .map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_context(format!( - "Failed to write followup message file {}", - message_file.display() - )); - ExecutorError::spawn_failed(e, context) + ExecutorError::ContextCollectionFailed(format!( + "Failed to write followup message file {}: {}", + message_file.display(), + e + )) })?; - let child = command.group_spawn().map_err(|e| { + tracing::debug!("Spawning Aider command: {}", &aider_command); + + let mut command = CommandRunner::new(); + command + .command(shell_cmd) + .arg(shell_arg) + .arg(&aider_command) + .working_dir(worktree_path) + .env("COLUMNS", "1000"); // Prevent line wrapping in aider output + + let child = command.start().await.map_err(|e| { crate::executor::SpawnContext::from_command(&command, &self.executor_type) .with_context(format!( "{} CLI followup execution for session {}", diff --git a/backend/src/executors/amp.rs b/backend/src/executors/amp.rs index 96377f34..56850072 100644 --- a/backend/src/executors/amp.rs +++ b/backend/src/executors/amp.rs @@ -1,11 +1,12 @@ use std::path::Path; use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::{ + command_runner::{CommandProcess, CommandRunner}, + executor, executor::{ ActionType, Executor, ExecutorError, NormalizedConversation, NormalizedEntry, NormalizedEntryType, @@ -193,16 +194,12 @@ impl Executor for AmpExecutor { pool: &sqlx::SqlitePool, task_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { // Get the task to fetch its description let task = Task::find_by_id(pool, task_id) .await? 
.ok_or(ExecutorError::TaskNotFound)?;
 
-        use std::process::Stdio;
-
-        use tokio::{io::AsyncWriteExt, process::Command};
-
         let prompt = if let Some(task_description) = task.description {
             format!(
                 r#"project_id: {}
@@ -225,32 +222,22 @@ Task title: {}"#,
         // --format=jsonl is deprecated in latest versions of Amp CLI
         let amp_command = "npx @sourcegraph/amp@0.0.1752148945-gd8844f --format=jsonl";
 
-        let mut command = Command::new(shell_cmd);
+        let mut command = CommandRunner::new();
         command
-            .kill_on_drop(true)
-            .stdin(Stdio::piped()) // <-- open a pipe
-            .stdout(Stdio::piped())
-            .stderr(Stdio::piped())
-            .current_dir(worktree_path)
+            .command(shell_cmd)
             .arg(shell_arg)
-            .arg(amp_command);
+            .arg(amp_command)
+            .stdin(&prompt)
+            .working_dir(worktree_path);
 
-        let mut child = command
-            .group_spawn() // Create new process group so we can kill entire tree
-            .map_err(|e| {
-                crate::executor::SpawnContext::from_command(&command, "Amp")
-                    .with_task(task_id, Some(task.title.clone()))
-                    .with_context("Amp CLI execution for new task")
-                    .spawn_error(e)
-            })?;
+        let proc = command.start().await.map_err(|e| {
+            executor::SpawnContext::from_command(&command, "Amp")
+                .with_task(task_id, Some(task.title.clone()))
+                .with_context("Amp CLI execution for new task")
+                .spawn_error(e)
+        })?;
 
-        // feed the prompt in, then close the pipe so `amp` sees EOF
-        if let Some(mut stdin) = child.inner().stdin.take() {
-            stdin.write_all(prompt.as_bytes()).await.unwrap();
-            stdin.shutdown().await.unwrap(); // or `drop(stdin);`
-        }
-
-        Ok(child)
+        Ok(proc)
     }
 
     async fn spawn_followup(
@@ -260,11 +247,7 @@ Task title: {}"#,
         session_id: &str,
         prompt: &str,
         worktree_path: &str,
-    ) -> Result<AsyncGroupChild, ExecutorError> {
-        use std::process::Stdio;
-
-        use tokio::{io::AsyncWriteExt, process::Command};
-
+    ) -> Result<CommandProcess, ExecutorError> {
         // Use shell command for cross-platform compatibility
         let (shell_cmd, shell_arg) = get_shell_command();
         let amp_command = format!(
@@ -272,17 +255,15 @@ Task title: {}"#,
             session_id
         );
 
-        let mut command = Command::new(shell_cmd);
+        let mut command = CommandRunner::new();
         command
-            .kill_on_drop(true)
-            .stdin(Stdio::piped())
-            .stdout(Stdio::piped())
-            .stderr(Stdio::piped())
-            .current_dir(worktree_path)
+            .command(shell_cmd)
             .arg(shell_arg)
-            .arg(&amp_command);
+            .arg(&amp_command)
+            .stdin(prompt)
+            .working_dir(worktree_path);
 
-        let mut child = command.group_spawn().map_err(|e| {
+        let proc = command.start().await.map_err(|e| {
             crate::executor::SpawnContext::from_command(&command, "Amp")
                 .with_context(format!(
                     "Amp CLI followup execution for thread {}",
@@ -291,27 +272,7 @@ Task title: {}"#,
                 .spawn_error(e)
         })?;
 
-        // Feed the prompt in, then close the pipe so amp sees EOF
-        if let Some(mut stdin) = child.inner().stdin.take() {
-            stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
-                crate::executor::SpawnContext::from_command(&command, "Amp")
-                    .with_context(format!(
-                        "Failed to write prompt to Amp CLI stdin for thread {}",
-                        session_id
-                    ))
-                    .spawn_error(e)
-            })?;
-            stdin.shutdown().await.map_err(|e| {
-                crate::executor::SpawnContext::from_command(&command, "Amp")
-                    .with_context(format!(
-                        "Failed to close Amp CLI stdin for thread {}",
-                        session_id
-                    ))
-                    .spawn_error(e)
-            })?;
-        }
-
-        Ok(child)
+        Ok(proc)
     }
 
     fn normalize_logs(
diff --git a/backend/src/executors/ccr.rs b/backend/src/executors/ccr.rs
index 74b52c93..9bc40435 100644
--- a/backend/src/executors/ccr.rs
+++ b/backend/src/executors/ccr.rs
@@ -1,8 +1,8 @@
 use async_trait::async_trait;
-use command_group::AsyncGroupChild;
 use uuid::Uuid;
 
 use crate::{
+
command_runner::CommandProcess, executor::{Executor, ExecutorError, NormalizedConversation}, executors::ClaudeExecutor, }; @@ -33,7 +33,7 @@ impl Executor for CCRExecutor { pool: &sqlx::SqlitePool, task_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { self.0.spawn(pool, task_id, worktree_path).await } @@ -44,7 +44,7 @@ impl Executor for CCRExecutor { session_id: &str, prompt: &str, worktree_path: &str, - ) -> Result { + ) -> Result { self.0 .spawn_followup(pool, task_id, session_id, prompt, worktree_path) .await diff --git a/backend/src/executors/charm_opencode.rs b/backend/src/executors/charm_opencode.rs index 21b40e35..81e00d83 100644 --- a/backend/src/executors/charm_opencode.rs +++ b/backend/src/executors/charm_opencode.rs @@ -1,8 +1,8 @@ use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; use uuid::Uuid; use crate::{ + command_runner::{CommandProcess, CommandRunner}, executor::{Executor, ExecutorError}, models::task::Task, utils::shell::get_shell_command, @@ -18,16 +18,12 @@ impl Executor for CharmOpencodeExecutor { pool: &sqlx::SqlitePool, task_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { // Get the task to fetch its description let task = Task::find_by_id(pool, task_id) .await? .ok_or(ExecutorError::TaskNotFound)?; - use std::process::Stdio; - - use tokio::process::Command; - let prompt = if let Some(task_description) = task.description { format!( r#"project_id: {} @@ -52,25 +48,21 @@ Task title: {}"#, prompt.replace('"', "\\\"") ); - let mut command = Command::new(shell_cmd); + let mut command = CommandRunner::new(); command - .kill_on_drop(true) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(worktree_path) + .command(shell_cmd) .arg(shell_arg) - .arg(opencode_command); + .arg(&opencode_command) + .working_dir(worktree_path); - let child = command - .group_spawn() // Create new process group so we can kill entire tree - .map_err(|e| { - crate::executor::SpawnContext::from_command(&command, "CharmOpenCode") - .with_task(task_id, Some(task.title.clone())) - .with_context("CharmOpenCode CLI execution for new task") - .spawn_error(e) - })?; + let proc = command.start().await.map_err(|e| { + crate::executor::SpawnContext::from_command(&command, "CharmOpenCode") + .with_task(task_id, Some(task.title.clone())) + .with_context("CharmOpenCode CLI execution for new task") + .spawn_error(e) + })?; - Ok(child) + Ok(proc) } async fn spawn_followup( @@ -80,11 +72,7 @@ Task title: {}"#, _session_id: &str, prompt: &str, worktree_path: &str, - ) -> Result { - use std::process::Stdio; - - use tokio::process::Command; - + ) -> Result { // CharmOpencode doesn't support session-based followup, so we ignore session_id // and just run with the new prompt let (shell_cmd, shell_arg) = get_shell_command(); @@ -93,21 +81,19 @@ Task title: {}"#, prompt.replace('"', "\\\"") ); - let mut command = Command::new(shell_cmd); + let mut command = CommandRunner::new(); command - .kill_on_drop(true) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(worktree_path) + .command(shell_cmd) .arg(shell_arg) - .arg(&opencode_command); + .arg(&opencode_command) + .working_dir(worktree_path); - let child = command.group_spawn().map_err(|e| { + let proc = command.start().await.map_err(|e| { crate::executor::SpawnContext::from_command(&command, "CharmOpenCode") .with_context("CharmOpenCode CLI followup execution") .spawn_error(e) })?; - Ok(child) + Ok(proc) } } diff --git a/backend/src/executors/claude.rs 
b/backend/src/executors/claude.rs index 8f27cf90..62f2001d 100644 --- a/backend/src/executors/claude.rs +++ b/backend/src/executors/claude.rs @@ -1,11 +1,10 @@ use std::path::Path; use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; -use tokio::process::Command; use uuid::Uuid; use crate::{ + command_runner::{CommandProcess, CommandRunner}, executor::{ ActionType, Executor, ExecutorError, NormalizedConversation, NormalizedEntry, NormalizedEntryType, @@ -84,7 +83,7 @@ impl Executor for ClaudeExecutor { pool: &sqlx::SqlitePool, task_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { // Get the task to fetch its description let task = Task::find_by_id(pool, task_id) .await? @@ -112,54 +111,22 @@ Task title: {}"#, // Pass prompt via stdin instead of command line to avoid shell escaping issues let claude_command = &self.command; - let mut command = Command::new(shell_cmd); + let mut command = CommandRunner::new(); command - .kill_on_drop(true) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .current_dir(worktree_path) + .command(shell_cmd) .arg(shell_arg) .arg(claude_command) + .stdin(&prompt) + .working_dir(worktree_path) .env("NODE_NO_WARNINGS", "1"); - let mut child = command - .group_spawn() // Create new process group so we can kill entire tree - .map_err(|e| { - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_task(task_id, Some(task.title.clone())) - .with_context(format!("{} CLI execution for new task", self.executor_type)) - .spawn_error(e) - })?; - - // Write prompt to stdin safely - if let Some(mut stdin) = child.inner().stdin.take() { - use tokio::io::AsyncWriteExt; - tracing::debug!( - "Writing prompt to Claude stdin for task {}: {:?}", - task_id, - prompt - ); - stdin.write_all(prompt.as_bytes()).await.map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_task(task_id, Some(task.title.clone())) - .with_context(format!( - "Failed to write prompt to {} CLI stdin", - self.executor_type - )); - ExecutorError::spawn_failed(e, context) - })?; - stdin.shutdown().await.map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_task(task_id, Some(task.title.clone())) - .with_context(format!("Failed to close {} CLI stdin", self.executor_type)); - ExecutorError::spawn_failed(e, context) - })?; - } - - Ok(child) + let proc = command.start().await.map_err(|e| { + crate::executor::SpawnContext::from_command(&command, &self.executor_type) + .with_task(task_id, Some(task.title.clone())) + .with_context(format!("{} CLI execution for new task", self.executor_type)) + .spawn_error(e) + })?; + Ok(proc) } async fn spawn_followup( @@ -169,7 +136,7 @@ Task title: {}"#, session_id: &str, prompt: &str, worktree_path: &str, - ) -> Result { + ) -> Result { // Use shell command for cross-platform compatibility let (shell_cmd, shell_arg) = get_shell_command(); @@ -184,18 +151,16 @@ Task title: {}"#, format!("{} --resume={}", self.command, session_id) }; - let mut command = Command::new(shell_cmd); + let mut command = CommandRunner::new(); command - .kill_on_drop(true) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) - .current_dir(worktree_path) + .command(shell_cmd) .arg(shell_arg) .arg(&claude_command) + .stdin(prompt) + .working_dir(worktree_path) 
.env("NODE_NO_WARNINGS", "1"); - let mut child = command.group_spawn().map_err(|e| { + let proc = command.start().await.map_err(|e| { crate::executor::SpawnContext::from_command(&command, &self.executor_type) .with_context(format!( "{} CLI followup execution for session {}", @@ -204,36 +169,7 @@ Task title: {}"#, .spawn_error(e) })?; - // Write prompt to stdin safely - if let Some(mut stdin) = child.inner().stdin.take() { - use tokio::io::AsyncWriteExt; - tracing::debug!( - "Writing prompt to {} stdin for session {}: {:?}", - self.executor_type, - session_id, - prompt - ); - stdin.write_all(prompt.as_bytes()).await.map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_context(format!( - "Failed to write prompt to {} CLI stdin for session {}", - self.executor_type, session_id - )); - ExecutorError::spawn_failed(e, context) - })?; - stdin.shutdown().await.map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_context(format!( - "Failed to close {} CLI stdin for session {}", - self.executor_type, session_id - )); - ExecutorError::spawn_failed(e, context) - })?; - } - - Ok(child) + Ok(proc) } fn normalize_logs( diff --git a/backend/src/executors/cleanup_script.rs b/backend/src/executors/cleanup_script.rs index 334da27c..c96dd710 100644 --- a/backend/src/executors/cleanup_script.rs +++ b/backend/src/executors/cleanup_script.rs @@ -1,9 +1,8 @@ use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; -use tokio::process::Command; use uuid::Uuid; use crate::{ + command_runner::{CommandProcess, CommandRunner}, executor::{Executor, ExecutorError}, models::{project::Project, task::Task}, utils::shell::get_shell_command, @@ -21,7 +20,7 @@ impl Executor for CleanupScriptExecutor { pool: &sqlx::SqlitePool, task_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { // Validate the task and project exist let task = Task::find_by_id(pool, task_id) .await? 
@@ -32,23 +31,21 @@ impl Executor for CleanupScriptExecutor { .ok_or(ExecutorError::TaskNotFound)?; // Reuse TaskNotFound for simplicity let (shell_cmd, shell_arg) = get_shell_command(); - let mut command = Command::new(shell_cmd); + let mut command = CommandRunner::new(); command - .kill_on_drop(true) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) + .command(shell_cmd) .arg(shell_arg) .arg(&self.script) - .current_dir(worktree_path); + .working_dir(worktree_path); - let child = command.group_spawn().map_err(|e| { + let proc = command.start().await.map_err(|e| { crate::executor::SpawnContext::from_command(&command, "CleanupScript") .with_task(task_id, Some(task.title.clone())) .with_context("Cleanup script execution") .spawn_error(e) })?; - Ok(child) + Ok(proc) } /// Normalize cleanup script logs into a readable format diff --git a/backend/src/executors/dev_server.rs b/backend/src/executors/dev_server.rs index d27d6640..2949501e 100644 --- a/backend/src/executors/dev_server.rs +++ b/backend/src/executors/dev_server.rs @@ -1,9 +1,8 @@ use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; -use tokio::process::Command; use uuid::Uuid; use crate::{ + command_runner::{CommandProcess, CommandRunner}, executor::{Executor, ExecutorError}, models::{project::Project, task::Task}, utils::shell::get_shell_command, @@ -21,7 +20,7 @@ impl Executor for DevServerExecutor { pool: &sqlx::SqlitePool, task_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { // Validate the task and project exist let task = Task::find_by_id(pool, task_id) .await? @@ -32,22 +31,20 @@ impl Executor for DevServerExecutor { .ok_or(ExecutorError::TaskNotFound)?; // Reuse TaskNotFound for simplicity let (shell_cmd, shell_arg) = get_shell_command(); - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) + let mut runner = CommandRunner::new(); + runner + .command(shell_cmd) .arg(shell_arg) .arg(&self.script) - .current_dir(worktree_path); + .working_dir(worktree_path); - let child = command.group_spawn().map_err(|e| { - crate::executor::SpawnContext::from_command(&command, "DevServer") + let process = runner.start().await.map_err(|e| { + crate::executor::SpawnContext::from_command(&runner, "DevServer") .with_task(task_id, Some(task.title.clone())) .with_context("Development server execution") .spawn_error(e) })?; - Ok(child) + Ok(process) } } diff --git a/backend/src/executors/echo.rs b/backend/src/executors/echo.rs index e0b77614..1021e729 100644 --- a/backend/src/executors/echo.rs +++ b/backend/src/executors/echo.rs @@ -1,10 +1,9 @@ use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; -use tokio::process::Command; use uuid::Uuid; use crate::{ - executor::{Executor, ExecutorError}, + command_runner::{CommandProcess, CommandRunner}, + executor::{Executor, ExecutorError, SpawnContext}, models::task::Task, utils::shell::get_shell_command, }; @@ -19,7 +18,7 @@ impl Executor for EchoExecutor { pool: &sqlx::SqlitePool, task_id: Uuid, _worktree_path: &str, - ) -> Result { + ) -> Result { // Get the task to fetch its description let task = Task::find_by_id(pool, task_id) .await? 
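
All of these executors funnel their command line through `get_shell_command()`. The helper itself lives in `utils::shell` and is outside this diff; a plausible stand-in, so the `(shell_cmd, shell_arg)` destructuring above reads concretely (the body is an assumption, not the real implementation):

```rust
// Hypothetical sketch of utils::shell::get_shell_command(): a
// (program, flag) pair that accepts one full command string.
fn get_shell_command_sketch() -> (&'static str, &'static str) {
    if cfg!(windows) {
        ("cmd", "/C") // cmd /C "<command line>"
    } else {
        ("sh", "-c") // sh -c '<command line>'
    }
}
```
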
@@ -57,22 +56,18 @@ echo "Task completed: {}""#, ) }; - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) + let mut command_runner = CommandRunner::new(); + command_runner + .command(shell_cmd) .arg(shell_arg) .arg(&script); - let child = command - .group_spawn() // Create new process group so we can kill entire tree - .map_err(|e| { - crate::executor::SpawnContext::from_command(&command, "Echo") - .with_task(task_id, Some(task.title.clone())) - .with_context("Shell script execution for echo demo") - .spawn_error(e) - })?; + let child = command_runner.start().await.map_err(|e| { + SpawnContext::from_command(&command_runner, "Echo") + .with_task(task_id, Some(task.title.clone())) + .with_context("Shell script execution for echo demo") + .spawn_error(e) + })?; Ok(child) } diff --git a/backend/src/executors/gemini.rs b/backend/src/executors/gemini.rs index 43ab80ec..b0e32475 100644 --- a/backend/src/executors/gemini.rs +++ b/backend/src/executors/gemini.rs @@ -5,10 +5,9 @@ mod config; mod streaming; -use std::{process::Stdio, time::Instant}; +use std::time::Instant; use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; use config::{ max_chunk_size, max_display_size, max_latency_ms, max_message_size, GeminiStreamConfig, }; @@ -16,10 +15,10 @@ use config::{ use serde_json::Value; pub use streaming::GeminiPatchBatch; use streaming::GeminiStreaming; -use tokio::{io::AsyncWriteExt, process::Command}; use uuid::Uuid; use crate::{ + command_runner::{CommandProcess, CommandRunner}, executor::{ Executor, ExecutorError, NormalizedConversation, NormalizedEntry, NormalizedEntryType, }, @@ -37,7 +36,7 @@ impl Executor for GeminiExecutor { pool: &sqlx::SqlitePool, task_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { // Get the task to fetch its description let task = Task::find_by_id(pool, task_id) .await? 
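
Note the shift in stdin handling here and across the other executors: the prompt is handed to the builder via `.stdin(&prompt)` before `start()`, instead of being written into the child's pipe afterwards. The deleted code in the hunk below shows what used to be inlined; for comparison, a self-contained sketch of that write-then-EOF pattern against plain tokio:

```rust
use tokio::io::AsyncWriteExt;
use tokio::process::{Child, Command};

// Spawn with a piped stdin, write the whole prompt, then close the pipe so
// the child sees EOF -- the logic the executors previously inlined.
async fn spawn_with_stdin(mut cmd: Command, input: &str) -> std::io::Result<Child> {
    cmd.stdin(std::process::Stdio::piped());
    let mut child = cmd.spawn()?;
    if let Some(mut stdin) = child.stdin.take() {
        stdin.write_all(input.as_bytes()).await?;
        stdin.shutdown().await?; // close the pipe: EOF for the child
    }
    Ok(child)
}
```
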
@@ -61,42 +60,18 @@ Task title: {}"#, }; let mut command = Self::create_gemini_command(worktree_path); + command.stdin(&prompt); - let mut child = command - .group_spawn() // Create new process group so we can kill entire tree - .map_err(|e| { - crate::executor::SpawnContext::from_command(&command, "Gemini") - .with_task(task_id, Some(task.title.clone())) - .with_context("Gemini CLI execution for new task") - .spawn_error(e) - })?; + let proc = command.start().await.map_err(|e| { + crate::executor::SpawnContext::from_command(&command, "Gemini") + .with_task(task_id, Some(task.title.clone())) + .with_context("Gemini CLI execution for new task") + .spawn_error(e) + })?; - // Write prompt to stdin - if let Some(mut stdin) = child.inner().stdin.take() { - tracing::debug!( - "Writing prompt to Gemini stdin for task {}: {:?}", - task_id, - prompt - ); - stdin.write_all(prompt.as_bytes()).await.map_err(|e| { - let context = crate::executor::SpawnContext::from_command(&command, "Gemini") - .with_task(task_id, Some(task.title.clone())) - .with_context("Failed to write prompt to Gemini CLI stdin"); - ExecutorError::spawn_failed(e, context) - })?; - stdin.shutdown().await.map_err(|e| { - let context = crate::executor::SpawnContext::from_command(&command, "Gemini") - .with_task(task_id, Some(task.title.clone())) - .with_context("Failed to close Gemini CLI stdin"); - ExecutorError::spawn_failed(e, context) - })?; - tracing::info!( - "Successfully sent prompt to Gemini stdin for task {}", - task_id - ); - } + tracing::info!("Successfully started Gemini process for task {}", task_id); - Ok(child) + Ok(proc) } async fn execute_streaming( @@ -106,7 +81,7 @@ Task title: {}"#, attempt_id: Uuid, execution_process_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { tracing::info!( "Starting Gemini execution for task {} attempt {}", task_id, @@ -115,17 +90,16 @@ Task title: {}"#, Self::update_session_id(pool, execution_process_id, &attempt_id.to_string()).await; - let mut child = self.spawn(pool, task_id, worktree_path).await?; + let mut proc = self.spawn(pool, task_id, worktree_path).await?; tracing::info!( - "Gemini process spawned successfully for attempt {}, PID: {:?}", - attempt_id, - child.inner().id() + "Gemini process spawned successfully for attempt {}", + attempt_id ); - Self::setup_streaming(pool, &mut child, attempt_id, execution_process_id); + Self::setup_streaming(pool, &mut proc, attempt_id, execution_process_id).await; - Ok(child) + Ok(proc) } async fn spawn_followup( @@ -135,7 +109,7 @@ Task title: {}"#, session_id: &str, prompt: &str, worktree_path: &str, - ) -> Result { + ) -> Result { // For Gemini, session_id is the attempt_id let attempt_id = Uuid::parse_str(session_id) .map_err(|_| ExecutorError::InvalidSessionId(session_id.to_string()))?; @@ -156,7 +130,7 @@ Task title: {}"#, session_id: &str, prompt: &str, worktree_path: &str, - ) -> Result { + ) -> Result { tracing::info!( "Starting Gemini follow-up execution for attempt {} (session {})", attempt_id, @@ -166,19 +140,18 @@ Task title: {}"#, // For Gemini, session_id is the attempt_id - update it in the database Self::update_session_id(pool, execution_process_id, session_id).await; - let mut child = self + let mut proc = self .spawn_followup(pool, task_id, session_id, prompt, worktree_path) .await?; tracing::info!( - "Gemini follow-up process spawned successfully for attempt {}, PID: {:?}", - attempt_id, - child.inner().id() + "Gemini follow-up process spawned successfully for attempt {}", + attempt_id ); - 
Self::setup_streaming(pool, &mut child, attempt_id, execution_process_id); + Self::setup_streaming(pool, &mut proc, attempt_id, execution_process_id).await; - Ok(child) + Ok(proc) } fn normalize_logs( @@ -261,19 +234,16 @@ Task title: {}"#, impl GeminiExecutor { /// Create a standardized Gemini CLI command - fn create_gemini_command(worktree_path: &str) -> Command { + fn create_gemini_command(worktree_path: &str) -> CommandRunner { let (shell_cmd, shell_arg) = get_shell_command(); let gemini_command = "npx @google/gemini-cli@latest --yolo"; - let mut command = Command::new(shell_cmd); + let mut command = CommandRunner::new(); command - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(worktree_path) + .command(shell_cmd) .arg(shell_arg) .arg(gemini_command) + .working_dir(worktree_path) .env("NODE_NO_WARNINGS", "1"); command } @@ -306,23 +276,25 @@ impl GeminiExecutor { } /// Setup streaming for both stdout and stderr - fn setup_streaming( + async fn setup_streaming( pool: &sqlx::SqlitePool, - child: &mut AsyncGroupChild, + proc: &mut CommandProcess, attempt_id: Uuid, execution_process_id: Uuid, ) { - // Take stdout and stderr pipes for streaming - let stdout = child - .inner() + // Get stdout and stderr streams from CommandProcess + let mut stream = proc + .stream() + .await + .expect("Failed to get streams from command process"); + let stdout = stream .stdout .take() - .expect("Failed to take stdout from child process"); - let stderr = child - .inner() + .expect("Failed to get stdout from command stream"); + let stderr = stream .stderr .take() - .expect("Failed to take stderr from child process"); + .expect("Failed to get stderr from command stream"); // Start streaming tasks with Gemini-specific line-based message updates let pool_clone1 = pool.clone(); @@ -521,7 +493,7 @@ You are continuing work on the above task. The execution history shows what has worktree_path: &str, comprehensive_prompt: &str, attempt_id: Uuid, - ) -> Result { + ) -> Result { tracing::info!( "Spawning Gemini followup execution for attempt {} with resume context ({} chars)", attempt_id, @@ -529,8 +501,9 @@ You are continuing work on the above task. The execution history shows what has ); let mut command = GeminiExecutor::create_gemini_command(worktree_path); + command.stdin(comprehensive_prompt); - let mut child = command.group_spawn().map_err(|e| { + let proc = command.start().await.map_err(|e| { crate::executor::SpawnContext::from_command(&command, "Gemini") .with_context(format!( "Gemini CLI followup execution with context for attempt {}", @@ -539,53 +512,12 @@ You are continuing work on the above task. 
The execution history shows what has .spawn_error(e) })?; - self.send_prompt_to_stdin(&mut child, &command, comprehensive_prompt, attempt_id) - .await?; - Ok(child) - } + tracing::info!( + "Successfully started Gemini followup process for attempt {}", + attempt_id + ); - async fn send_prompt_to_stdin( - &self, - child: &mut AsyncGroupChild, - command: &Command, - comprehensive_prompt: &str, - attempt_id: Uuid, - ) -> Result<(), ExecutorError> { - if let Some(mut stdin) = child.inner().stdin.take() { - tracing::debug!( - "Sending resume context to Gemini for attempt {}: {} characters", - attempt_id, - comprehensive_prompt.len() - ); - - stdin - .write_all(comprehensive_prompt.as_bytes()) - .await - .map_err(|e| { - let context = crate::executor::SpawnContext::from_command(command, "Gemini") - .with_context(format!( - "Failed to write resume prompt to Gemini CLI stdin for attempt {}", - attempt_id - )); - ExecutorError::spawn_failed(e, context) - })?; - - stdin.shutdown().await.map_err(|e| { - let context = crate::executor::SpawnContext::from_command(command, "Gemini") - .with_context(format!( - "Failed to close Gemini CLI stdin for attempt {}", - attempt_id - )); - ExecutorError::spawn_failed(e, context) - })?; - - tracing::info!( - "Successfully sent resume context to Gemini for attempt {}", - attempt_id - ); - } - - Ok(()) + Ok(proc) } /// Format Gemini CLI output by inserting line breaks where periods are directly diff --git a/backend/src/executors/setup_script.rs b/backend/src/executors/setup_script.rs index 3721df89..562ff6b6 100644 --- a/backend/src/executors/setup_script.rs +++ b/backend/src/executors/setup_script.rs @@ -1,9 +1,8 @@ use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; -use tokio::process::Command; use uuid::Uuid; use crate::{ + command_runner::{CommandProcess, CommandRunner}, executor::{Executor, ExecutorError}, models::{project::Project, task::Task}, utils::shell::get_shell_command, @@ -27,7 +26,7 @@ impl Executor for SetupScriptExecutor { pool: &sqlx::SqlitePool, task_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { // Validate the task and project exist let task = Task::find_by_id(pool, task_id) .await? 
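
Because `wait()` is now an ordinary async call on `CommandProcess`, callers can compose it with the usual tokio combinators. A hypothetical guard, not part of this patch, that bounds how long a setup script may run; it assumes `wait()` returns `Result<CommandExitStatus, CommandError>` and `kill()` returns `Result<(), CommandError>`, as the tests earlier in the diff suggest:

```rust
use std::time::Duration;
use tokio::time::timeout;
use vibe_kanban::command_runner::{CommandError, CommandProcess};

// Wait up to `limit`; on timeout, kill the process tree and report None.
async fn wait_with_timeout(
    process: &mut CommandProcess,
    limit: Duration,
) -> Result<Option<i32>, CommandError> {
    let waited = timeout(limit, process.wait()).await;
    match waited {
        Ok(status) => Ok(status?.code()),
        Err(_elapsed) => {
            process.kill().await?;
            Ok(None)
        }
    }
}
```
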
@@ -38,23 +37,21 @@ impl Executor for SetupScriptExecutor { .ok_or(ExecutorError::TaskNotFound)?; // Reuse TaskNotFound for simplicity let (shell_cmd, shell_arg) = get_shell_command(); - let mut command = Command::new(shell_cmd); + let mut command = CommandRunner::new(); command - .kill_on_drop(true) - .stdout(std::process::Stdio::piped()) - .stderr(std::process::Stdio::piped()) + .command(shell_cmd) .arg(shell_arg) .arg(&self.script) - .current_dir(worktree_path); + .working_dir(worktree_path); - let child = command.group_spawn().map_err(|e| { + let proc = command.start().await.map_err(|e| { crate::executor::SpawnContext::from_command(&command, "SetupScript") .with_task(task_id, Some(task.title.clone())) .with_context("Setup script execution") .spawn_error(e) })?; - Ok(child) + Ok(proc) } /// Normalize setup script logs into a readable format diff --git a/backend/src/executors/sst_opencode.rs b/backend/src/executors/sst_opencode.rs index b855e018..2a27e296 100644 --- a/backend/src/executors/sst_opencode.rs +++ b/backend/src/executors/sst_opencode.rs @@ -1,13 +1,10 @@ use async_trait::async_trait; -use command_group::{AsyncCommandGroup, AsyncGroupChild}; use serde_json::{json, Value}; -use tokio::{ - io::{AsyncBufReadExt, BufReader}, - process::Command, -}; +use tokio::io::{AsyncBufReadExt, BufReader}; use uuid::Uuid; use crate::{ + command_runner::{CommandProcess, CommandRunner}, executor::{Executor, ExecutorError, NormalizedConversation, NormalizedEntry}, models::{execution_process::ExecutionProcess, executor_session::ExecutorSession, task::Task}, utils::shell::get_shell_command, @@ -244,7 +241,7 @@ impl Executor for SstOpencodeExecutor { pool: &sqlx::SqlitePool, task_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { // Get the task to fetch its description let task = Task::find_by_id(pool, task_id) .await? 
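
One subtlety in the OpenCode executor below: the old code opened stdout as `Stdio::null()` because OpenCode's stdout is noise, while `CommandRunner` appears to pipe both streams. A piped stream that nobody reads can fill its OS buffer and stall the child, so if the runner does pipe stdout, explicitly draining it is safer than ignoring the handle. A small hedged sketch:

```rust
use tokio::io::{AsyncRead, AsyncReadExt};

// Read and discard until EOF so the child never blocks on a full pipe.
// Intended use: tokio::spawn(drain(stdout));
async fn drain<R: AsyncRead + Unpin>(mut reader: R) {
    let mut sink = [0u8; 4096];
    while matches!(reader.read(&mut sink).await, Ok(n) if n > 0) {}
}
```
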
@@ -271,54 +268,23 @@ Task title: {}"#, let (shell_cmd, shell_arg) = get_shell_command(); let opencode_command = &self.command; - let mut command = Command::new(shell_cmd); + let mut command = CommandRunner::new(); command - .kill_on_drop(true) - .stdin(std::process::Stdio::piped()) - .stdout(std::process::Stdio::null()) // Ignore stdout for OpenCode - .stderr(std::process::Stdio::piped()) - .current_dir(worktree_path) + .command(shell_cmd) .arg(shell_arg) .arg(opencode_command) + .stdin(&prompt) + .working_dir(worktree_path) .env("NODE_NO_WARNINGS", "1"); - let mut child = command - .group_spawn() // Create new process group so we can kill entire tree - .map_err(|e| { - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_task(task_id, Some(task.title.clone())) - .with_context(format!("{} CLI execution for new task", self.executor_type)) - .spawn_error(e) - })?; + let proc = command.start().await.map_err(|e| { + crate::executor::SpawnContext::from_command(&command, &self.executor_type) + .with_task(task_id, Some(task.title.clone())) + .with_context(format!("{} CLI execution for new task", self.executor_type)) + .spawn_error(e) + })?; - // Write prompt to stdin safely - if let Some(mut stdin) = child.inner().stdin.take() { - use tokio::io::AsyncWriteExt; - tracing::debug!( - "Writing prompt to OpenCode stdin for task {}: {:?}", - task_id, - prompt - ); - stdin.write_all(prompt.as_bytes()).await.map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_task(task_id, Some(task.title.clone())) - .with_context(format!( - "Failed to write prompt to {} CLI stdin", - self.executor_type - )); - ExecutorError::spawn_failed(e, context) - })?; - stdin.shutdown().await.map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_task(task_id, Some(task.title.clone())) - .with_context(format!("Failed to close {} CLI stdin", self.executor_type)); - ExecutorError::spawn_failed(e, context) - })?; - } - - Ok(child) + Ok(proc) } /// Execute with OpenCode filtering for stderr @@ -329,15 +295,18 @@ Task title: {}"#, attempt_id: Uuid, execution_process_id: Uuid, worktree_path: &str, - ) -> Result { - let mut child = self.spawn(pool, task_id, worktree_path).await?; + ) -> Result { + let mut proc = self.spawn(pool, task_id, worktree_path).await?; - // Take stderr pipe for OpenCode filtering - let stderr = child - .inner() + // Get stderr stream from CommandProcess for OpenCode filtering + let mut stream = proc + .stream() + .await + .expect("Failed to get streams from command process"); + let stderr = stream .stderr .take() - .expect("Failed to take stderr from child process"); + .expect("Failed to get stderr from command stream"); // Start OpenCode stderr filtering task let pool_clone = pool.clone(); @@ -350,7 +319,7 @@ Task title: {}"#, worktree_path_clone, )); - Ok(child) + Ok(proc) } fn normalize_logs( @@ -391,17 +360,20 @@ Task title: {}"#, session_id: &str, prompt: &str, worktree_path: &str, - ) -> Result { - let mut child = self + ) -> Result { + let mut proc = self .spawn_followup(pool, task_id, session_id, prompt, worktree_path) .await?; - // Take stderr pipe for OpenCode filtering - let stderr = child - .inner() + // Get stderr stream from CommandProcess for OpenCode filtering + let mut stream = proc + .stream() + .await + .expect("Failed to get streams from command process"); + let stderr = stream .stderr .take() - .expect("Failed to take stderr from child 
process"); + .expect("Failed to get stderr from command stream"); // Start OpenCode stderr filtering task let pool_clone = pool.clone(); @@ -414,7 +386,7 @@ Task title: {}"#, worktree_path_clone, )); - Ok(child) + Ok(proc) } async fn spawn_followup( @@ -424,27 +396,21 @@ Task title: {}"#, session_id: &str, prompt: &str, worktree_path: &str, - ) -> Result { - use std::process::Stdio; - - use tokio::io::AsyncWriteExt; - + ) -> Result { // Use shell command for cross-platform compatibility let (shell_cmd, shell_arg) = get_shell_command(); let opencode_command = format!("{} --session {}", self.command, session_id); - let mut command = Command::new(shell_cmd); + let mut command = CommandRunner::new(); command - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::null()) // Ignore stdout for OpenCode - .stderr(Stdio::piped()) - .current_dir(worktree_path) + .command(shell_cmd) .arg(shell_arg) .arg(&opencode_command) + .stdin(prompt) + .working_dir(worktree_path) .env("NODE_NO_WARNINGS", "1"); - let mut child = command.group_spawn().map_err(|e| { + let proc = command.start().await.map_err(|e| { crate::executor::SpawnContext::from_command(&command, &self.executor_type) .with_context(format!( "{} CLI followup execution for session {}", @@ -453,35 +419,7 @@ Task title: {}"#, .spawn_error(e) })?; - // Write prompt to stdin safely - if let Some(mut stdin) = child.inner().stdin.take() { - tracing::debug!( - "Writing prompt to {} stdin for session {}: {:?}", - self.executor_type, - session_id, - prompt - ); - stdin.write_all(prompt.as_bytes()).await.map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_context(format!( - "Failed to write prompt to {} CLI stdin for session {}", - self.executor_type, session_id - )); - ExecutorError::spawn_failed(e, context) - })?; - stdin.shutdown().await.map_err(|e| { - let context = - crate::executor::SpawnContext::from_command(&command, &self.executor_type) - .with_context(format!( - "Failed to close {} CLI stdin for session {}", - self.executor_type, session_id - )); - ExecutorError::spawn_failed(e, context) - })?; - } - - Ok(child) + Ok(proc) } } diff --git a/backend/src/lib.rs b/backend/src/lib.rs index 53e08d18..4d008feb 100644 --- a/backend/src/lib.rs +++ b/backend/src/lib.rs @@ -3,6 +3,7 @@ use sentry_tracing::{EventFilter, SentryLayer}; use tracing::Level; pub mod app_state; +pub mod command_runner; pub mod execution_monitor; pub mod executor; pub mod executors; diff --git a/backend/src/main.rs b/backend/src/main.rs index c5d57749..0cd77c71 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -17,6 +17,7 @@ use tracing_subscriber::{filter::LevelFilter, prelude::*}; use vibe_kanban::{sentry_layer, Assets, ScriptAssets, SoundAssets}; mod app_state; +mod command_runner; mod execution_monitor; mod executor; mod executors; diff --git a/backend/src/services/process_service.rs b/backend/src/services/process_service.rs index 45904808..8619c79c 100644 --- a/backend/src/services/process_service.rs +++ b/backend/src/services/process_service.rs @@ -3,6 +3,7 @@ use tracing::{debug, info}; use uuid::Uuid; use crate::{ + command_runner, executor::Executor, models::{ execution_process::{CreateExecutionProcess, ExecutionProcess, ExecutionProcessType}, @@ -803,7 +804,7 @@ impl ProcessService { attempt_id: Uuid, process_id: Uuid, worktree_path: &str, - ) -> Result { + ) -> Result { use crate::executors::{CleanupScriptExecutor, DevServerExecutor, SetupScriptExecutor}; let result = match executor_type { 
@@ -863,7 +864,7 @@ impl ProcessService {
         process_id: Uuid,
         attempt_id: Uuid,
         process_type: &ExecutionProcessType,
-        child: command_group::AsyncGroupChild,
+        child: command_runner::CommandProcess,
     ) {
         let execution_type = match process_type {
             ExecutionProcessType::SetupScript => crate::app_state::ExecutionType::SetupScript,
@@ -925,7 +926,7 @@ impl ProcessService {
         attempt_id: Uuid,
         process_id: Uuid,
         worktree_path: &str,
-    ) -> Result<command_group::AsyncGroupChild, ExecutorError> {
+    ) -> Result<command_runner::CommandProcess, ExecutorError> {
         use crate::executors::SetupScriptExecutor;
 
         let executor = SetupScriptExecutor {