Alex/refactor command runner (#323)

* feat: implement CommandRunner and integrate with executors

refactor: replace command_group::AsyncGroupChild with command_runner::CommandProcess in executor and process_service

Migrate traits and claude to command_runner

Migrate gemini to command_runner

Migrate sst_opencode

Migrate ccr

Migrate amp

Migrate charm opencode

Migrate cleanup_script

Migrate executor (vibe-kanban 28b4ede6)

I've added an abstract command runner to enable local and remote execution later. I already migrated the amp executor; please go ahead and replace the process handling with the new command runner @backend/src/command_runner.rs. If there are any missing functions, ask me about them. Migrate backend/src/executors/echo.rs to be compatible.

Migrate executor (vibe-kanban 9dc48bc8)

I've added an abstract command runner to enable local and remote execution later. I already migrated the amp executor; please go ahead and replace the process handling with the new command runner @backend/src/command_runner.rs. If there are any missing functions, ask me about them. Migrate @backend/src/executors/dev_server.rs to be compatible.

Migrate executor (vibe-kanban d3ac2aa5)

I've added an abstract command runner to enable local and remote execution later. I already migrated the amp executor; please go ahead and replace the process handling with the new command runner @backend/src/command_runner.rs. If there are any missing functions, ask me about them. Migrate backend/src/executors/setup_script.rs to be compatible.
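Each of these migrations follows the same pattern. A minimal before/after sketch, using a hypothetical executor function and the CommandRunner API from backend/src/command_runner.rs shown further below:

// Before: spawn directly via tokio::process::Command + command_group.
// After: go through the executor-agnostic CommandRunner builder.
use vibe_kanban::command_runner::{CommandError, CommandProcess, CommandRunner};

async fn spawn_echo(working_dir: &str) -> Result<CommandProcess, CommandError> {
    let mut runner = CommandRunner::new(); // picks local or remote execution
    runner
        .command("echo")
        .arg("hello")
        .working_dir(working_dir);
    runner.start().await
}

The returned CommandProcess handle then serves kill/wait/stream regardless of which executor backs it.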

Fmt + lint

* Refactor CommandRunner initialization to use new() method for improved environment handling
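A minimal sketch of what that environment handling looks like from the caller's side (the accepted ENVIRONMENT values are an assumption based on the Environment enum used in command_runner.rs below):

use vibe_kanban::command_runner::CommandRunner;

fn select_executor() {
    // Local execution: the default when ENVIRONMENT is unset or unparsable.
    std::env::set_var("ENVIRONMENT", "local");
    let _local = CommandRunner::new(); // backed by LocalCommandExecutor

    // Cloud execution: assuming Environment's FromStr accepts "cloud";
    // the remote executor reads CLOUD_SERVER_URL (default http://localhost:8000).
    std::env::set_var("ENVIRONMENT", "cloud");
    std::env::set_var("CLOUD_SERVER_URL", "http://localhost:8000");
    let _remote = CommandRunner::new(); // backed by RemoteCommandExecutor
}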

* Add basic cloud runner and test scripts

Enhance cloud runner and command runner for true streaming support

- Refactor process management in cloud runner to use ProcessEntry struct for better handling of stdout and stderr streams.
- Implement true chunk-based streaming for command output via HTTP in command runner.
- Update test_remote to verify streaming functionality with real-time output capture.
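On the consuming side, chunk-based delivery means a client can read output while the process is still running. A hedged client sketch (reqwest's bytes_stream requires its `stream` feature; the URL matches the cloud runner's stdout endpoint below):

use futures_util::StreamExt; // for bytes_stream()

async fn tail_stdout(process_id: &str) -> Result<(), Box<dyn std::error::Error>> {
    let url = format!("http://localhost:8000/commands/{}/stdout", process_id);
    let response = reqwest::get(&url).await?;
    let mut chunks = response.bytes_stream();
    // Chunks arrive as the process writes them, not after it exits.
    while let Some(chunk) = chunks.next().await {
        print!("{}", String::from_utf8_lossy(&chunk?));
    }
    Ok(())
}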

Clippy and fmt

Refactor CommandStream and CommandProcess to remove dead code and improve stream handling

Refactor cloud runner and command runner to improve API response handling and streamline process status management

Change stream setup to be async

* Revert "Change stream setup to be async"

This reverts commit 79b5cde12aefafe9e669b93167036c8c6adf9145.

Revert "Refactor cloud runner and command runner to improve API response handling and streamline process status management"

This reverts commit 3cc03ff82424bd715a6f20f3124bd7bf80bc2d72.

Revert "Refactor CommandStream and CommandProcess to remove dead code and improve stream handling"

This reverts commit dcab0fcd9622416b7881af4add513b371894e408.

* refactor: remove unused imports and update command execution to use CommandProcess

* refactor: clean up CommandRunner and CommandProcess by removing dead code and updating initialization logic

* Fix imports

* refactor: split command executors into local and remote

* refactor: update stream methods to be asynchronous across command execution components

* refactor: update command runner references; remove remote test binary; remove debug script

* Remove unused stdout alias

* Clippy

* refactor: consolidate CommandExitStatus implementations for local and remote processes

* refactor: replace CreateCommandRequest with CommandRunnerArgs in command execution

* refactor: optimize stream creation by using concurrent HTTP requests
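Presumably the optimization is to open the stdout and stderr streams in parallel rather than back-to-back; a minimal sketch of that pattern against the cloud runner's endpoints (hypothetical helper, not the committed code):

async fn open_streams(
    base_url: &str,
    process_id: &str,
) -> Result<(reqwest::Response, reqwest::Response), reqwest::Error> {
    let client = reqwest::Client::new();
    // Build both futures first, then await them concurrently.
    let stdout = client
        .get(format!("{}/commands/{}/stdout", base_url, process_id))
        .send();
    let stderr = client
        .get(format!("{}/commands/{}/stderr", base_url, process_id))
        .send();
    tokio::try_join!(stdout, stderr)
}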
Alex Netsch authored 2025-07-24 11:44:57 +01:00, committed by GitHub
parent 96f27ff8bc · commit 2197dd064d
22 changed files with 2752 additions and 585 deletions

View File

@@ -9,11 +9,17 @@ build = "build.rs"
 name = "vibe_kanban"
 path = "src/lib.rs"
+[[bin]]
+name = "cloud-runner"
+path = "src/bin/cloud_runner.rs"
 [lints.clippy]
 uninlined-format-args = "allow"
 [dependencies]
 tokio = { workspace = true }
+tokio-util = { version = "0.7", features = ["io"] }
+bytes = "1.0"
 axum = { workspace = true }
 tower-http = { workspace = true }
 serde = { workspace = true }

View File

@@ -1,11 +1,10 @@
-use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
+use std::{collections::HashMap, path::PathBuf, sync::Arc};
-#[cfg(unix)]
-use nix::{sys::signal::Signal, unistd::Pid};
 use tokio::sync::{Mutex, RwLock as TokioRwLock};
 use uuid::Uuid;
 use crate::{
+    command_runner,
     models::Environment,
     services::{generate_user_id, AnalyticsConfig, AnalyticsService},
 };
@@ -22,7 +21,7 @@ pub enum ExecutionType {
 pub struct RunningExecution {
     pub task_attempt_id: Uuid,
     pub _execution_type: ExecutionType,
-    pub child: command_group::AsyncGroupChild,
+    pub child: command_runner::CommandProcess,
 }
 #[derive(Debug, Clone)]
@@ -91,7 +90,7 @@ impl AppState {
         let mut completed_executions = Vec::new();
         for (execution_id, running_exec) in executions.iter_mut() {
-            match running_exec.child.try_wait() {
+            match running_exec.child.try_wait().await {
                 Ok(Some(status)) => {
                     let success = status.success();
                     let exit_code = status.code().map(|c| c as i64);
@@ -140,24 +139,11 @@
             return Ok(false);
         };
-        // hit the whole process group, not just the leader
-        #[cfg(unix)]
-        {
-            use nix::{sys::signal::killpg, unistd::getpgid};
-            let pgid = getpgid(Some(Pid::from_raw(exec.child.id().unwrap() as i32)))?;
-            for sig in [Signal::SIGINT, Signal::SIGTERM, Signal::SIGKILL] {
-                killpg(pgid, sig)?;
-                tokio::time::sleep(Duration::from_secs(2)).await;
-                if exec.child.try_wait()?.is_some() {
-                    break; // gone!
-                }
-            }
-        }
-        // final fallback command_group already targets the group
-        exec.child.kill().await.ok();
-        exec.child.wait().await.ok(); // reap
+        // Kill the process using CommandRunner's kill method
+        exec.child
+            .kill()
+            .await
+            .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
         // only NOW remove it
         executions.remove(&execution_id);

View File

@@ -0,0 +1,401 @@
use std::{collections::HashMap, sync::Arc};
use axum::{
body::Body,
extract::{Path, State},
http::StatusCode,
response::{Json, Response},
routing::{delete, get, post},
Router,
};
use serde::Serialize;
use tokio::sync::Mutex;
use tokio_util::io::ReaderStream;
use tracing_subscriber::prelude::*;
use uuid::Uuid;
use vibe_kanban::command_runner::{CommandProcess, CommandRunner, CommandRunnerArgs};
// Structure to hold process and its streams
struct ProcessEntry {
process: CommandProcess,
// Store the actual stdout/stderr streams for direct streaming
stdout_stream: Option<Box<dyn tokio::io::AsyncRead + Unpin + Send>>,
stderr_stream: Option<Box<dyn tokio::io::AsyncRead + Unpin + Send>>,
completed: Arc<Mutex<bool>>,
}
impl std::fmt::Debug for ProcessEntry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ProcessEntry")
.field("process", &self.process)
.field("stdout_stream", &self.stdout_stream.is_some())
.field("stderr_stream", &self.stderr_stream.is_some())
.field("completed", &self.completed)
.finish()
}
}
// Application state to manage running processes
#[derive(Clone)]
struct AppState {
processes: Arc<Mutex<HashMap<String, ProcessEntry>>>,
}
// Response type for API responses
#[derive(Debug, Serialize)]
struct ApiResponse<T> {
success: bool,
data: Option<T>,
error: Option<String>,
}
impl<T> ApiResponse<T> {
fn success(data: T) -> Self {
Self {
success: true,
data: Some(data),
error: None,
}
}
#[allow(dead_code)]
fn error(message: String) -> Self {
Self {
success: false,
data: None,
error: Some(message),
}
}
}
// Response type for command creation
#[derive(Debug, Serialize)]
struct CreateCommandResponse {
process_id: String,
}
// Response type for process status
#[derive(Debug, Serialize)]
struct ProcessStatusResponse {
process_id: String,
running: bool,
exit_code: Option<i32>,
success: Option<bool>,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize tracing
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "cloud_runner=info".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
// Create application state
let app_state = AppState {
processes: Arc::new(Mutex::new(HashMap::new())),
};
// Build router
let app = Router::new()
.route("/health", get(health_check))
.route("/commands", post(create_command))
.route("/commands/:process_id", delete(kill_command))
.route("/commands/:process_id/status", get(get_process_status))
.route("/commands/:process_id/stdout", get(get_process_stdout))
.route("/commands/:process_id/stderr", get(get_process_stderr))
.with_state(app_state);
// Get port from environment or default to 8000
let port = std::env::var("PORT").unwrap_or_else(|_| "8000".to_string());
let addr = format!("0.0.0.0:{}", port);
tracing::info!("Cloud Runner server starting on {}", addr);
// Start the server
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app).await?;
Ok(())
}
// Health check endpoint
async fn health_check() -> Json<ApiResponse<String>> {
Json(ApiResponse::success("Cloud Runner is healthy".to_string()))
}
// Create and start a new command
async fn create_command(
State(state): State<AppState>,
Json(request): Json<CommandRunnerArgs>,
) -> Result<Json<ApiResponse<CreateCommandResponse>>, StatusCode> {
tracing::info!("Creating command: {} {:?}", request.command, request.args);
// Create a local command runner from the request
let runner = CommandRunner::from_args(request);
// Start the process
let mut process = match runner.start().await {
Ok(process) => process,
Err(e) => {
tracing::error!("Failed to start command: {}", e);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
};
// Generate unique process ID
let process_id = Uuid::new_v4().to_string();
// Create completion flag
let completed = Arc::new(Mutex::new(false));
// Get the streams from the process - we'll store them directly
let mut streams = match process.stream().await {
Ok(streams) => streams,
Err(e) => {
tracing::error!("Failed to get process streams: {}", e);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
};
// Extract the streams for direct use
let stdout_stream = streams.stdout.take();
let stderr_stream = streams.stderr.take();
// Spawn a task to monitor process completion
{
let process_id_for_completion = process_id.clone();
let completed_flag = completed.clone();
let processes_ref = state.processes.clone();
tokio::spawn(async move {
// Wait for the process to complete
if let Ok(mut processes) = processes_ref.try_lock() {
if let Some(entry) = processes.get_mut(&process_id_for_completion) {
let _ = entry.process.wait().await;
*completed_flag.lock().await = true;
tracing::debug!("Marked process {} as completed", process_id_for_completion);
}
}
});
}
// Create process entry
let entry = ProcessEntry {
process,
stdout_stream,
stderr_stream,
completed: completed.clone(),
};
// Store the process entry
{
let mut processes = state.processes.lock().await;
processes.insert(process_id.clone(), entry);
}
tracing::info!("Command started with process_id: {}", process_id);
Ok(Json(ApiResponse::success(CreateCommandResponse {
process_id,
})))
}
// Kill a running command
async fn kill_command(
State(state): State<AppState>,
Path(process_id): Path<String>,
) -> Result<Json<ApiResponse<String>>, StatusCode> {
tracing::info!("Killing command with process_id: {}", process_id);
let mut processes = state.processes.lock().await;
if let Some(mut entry) = processes.remove(&process_id) {
// First check if the process has already finished
match entry.process.status().await {
Ok(Some(_)) => {
// Process already finished, consider kill successful
tracing::info!(
"Process {} already completed, kill considered successful",
process_id
);
Ok(Json(ApiResponse::success(
"Process was already completed".to_string(),
)))
}
Ok(None) => {
// Process still running, attempt to kill
match entry.process.kill().await {
Ok(()) => {
tracing::info!("Successfully killed process: {}", process_id);
Ok(Json(ApiResponse::success(
"Process killed successfully".to_string(),
)))
}
Err(e) => {
tracing::error!("Failed to kill process {}: {}", process_id, e);
// Check if it's a "No such process" error (process finished during kill)
if e.to_string().contains("No such process") {
tracing::info!("Process {} finished during kill attempt", process_id);
Ok(Json(ApiResponse::success(
"Process finished during kill attempt".to_string(),
)))
} else {
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
}
Err(e) => {
tracing::error!("Failed to check status for process {}: {}", process_id, e);
// Still attempt to kill
match entry.process.kill().await {
Ok(()) => {
tracing::info!("Successfully killed process: {}", process_id);
Ok(Json(ApiResponse::success(
"Process killed successfully".to_string(),
)))
}
Err(e) => {
tracing::error!("Failed to kill process {}: {}", process_id, e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
}
} else {
tracing::warn!("Process not found: {}", process_id);
Err(StatusCode::NOT_FOUND)
}
}
// Get status of a running command
async fn get_process_status(
State(state): State<AppState>,
Path(process_id): Path<String>,
) -> Result<Json<ApiResponse<ProcessStatusResponse>>, StatusCode> {
tracing::info!("Getting status for process_id: {}", process_id);
let mut processes = state.processes.lock().await;
if let Some(entry) = processes.get_mut(&process_id) {
match entry.process.status().await {
Ok(Some(exit_status)) => {
// Process has completed
let response = ProcessStatusResponse {
process_id: process_id.clone(),
running: false,
exit_code: exit_status.code(),
success: Some(exit_status.success()),
};
Ok(Json(ApiResponse::success(response)))
}
Ok(None) => {
// Process is still running
let response = ProcessStatusResponse {
process_id: process_id.clone(),
running: true,
exit_code: None,
success: None,
};
Ok(Json(ApiResponse::success(response)))
}
Err(e) => {
tracing::error!("Failed to get status for process {}: {}", process_id, e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
} else {
tracing::warn!("Process not found: {}", process_id);
Err(StatusCode::NOT_FOUND)
}
}
// Get stdout stream for a running command (direct streaming, no buffering)
async fn get_process_stdout(
State(state): State<AppState>,
Path(process_id): Path<String>,
) -> Result<Response, StatusCode> {
tracing::info!(
"Starting direct stdout stream for process_id: {}",
process_id
);
let mut processes = state.processes.lock().await;
if let Some(entry) = processes.get_mut(&process_id) {
// Take ownership of stdout directly for streaming
if let Some(stdout) = entry.stdout_stream.take() {
drop(processes); // Release the lock early
// Convert the AsyncRead (stdout) directly into an HTTP stream
let stream = ReaderStream::new(stdout);
let response = Response::builder()
.header("content-type", "application/octet-stream")
.header("cache-control", "no-cache")
.body(Body::from_stream(stream))
.map_err(|e| {
tracing::error!("Failed to build response stream: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(response)
} else {
tracing::error!(
"Stdout already taken or unavailable for process {}",
process_id
);
Err(StatusCode::GONE)
}
} else {
tracing::warn!("Process not found for stdout: {}", process_id);
Err(StatusCode::NOT_FOUND)
}
}
// Get stderr stream for a running command (direct streaming, no buffering)
async fn get_process_stderr(
State(state): State<AppState>,
Path(process_id): Path<String>,
) -> Result<Response, StatusCode> {
tracing::info!(
"Starting direct stderr stream for process_id: {}",
process_id
);
let mut processes = state.processes.lock().await;
if let Some(entry) = processes.get_mut(&process_id) {
// Take ownership of stderr directly for streaming
if let Some(stderr) = entry.stderr_stream.take() {
drop(processes); // Release the lock early
// Convert the AsyncRead (stderr) directly into an HTTP stream
let stream = ReaderStream::new(stderr);
let response = Response::builder()
.header("content-type", "application/octet-stream")
.header("cache-control", "no-cache")
.body(Body::from_stream(stream))
.map_err(|e| {
tracing::error!("Failed to build response stream: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(response)
} else {
tracing::error!(
"Stderr already taken or unavailable for process {}",
process_id
);
Err(StatusCode::GONE)
}
} else {
tracing::warn!("Process not found for stderr: {}", process_id);
Err(StatusCode::NOT_FOUND)
}
}

View File

@@ -0,0 +1,659 @@
use std::env;
use vibe_kanban::command_runner::CommandRunner;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set up remote execution
env::set_var("CLOUD_EXECUTION", "1");
env::set_var("CLOUD_SERVER_URL", "http://localhost:8000");
println!("🚀 Testing remote CommandRunner...");
// Test 1: Simple echo command
println!("\n📝 Test 1: Echo command");
let mut runner = CommandRunner::new();
let mut process = runner
.command("echo")
.arg("Hello from remote!")
.start()
.await?;
println!("✅ Successfully started remote echo command!");
// Kill it (though echo probably finished already)
match process.kill().await {
Ok(()) => println!("✅ Successfully killed echo process"),
Err(e) => println!("⚠️ Kill failed (probably already finished): {}", e),
}
// Test 2: Long-running command
println!("\n⏰ Test 2: Sleep command (5 seconds)");
let mut runner2 = CommandRunner::new();
let mut process2 = runner2.command("sleep").arg("5").start().await?;
println!("✅ Successfully started remote sleep command!");
// Wait a bit then kill it
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
process2.kill().await?;
println!("✅ Successfully killed sleep process!");
// Test 3: Command with environment variables
println!("\n🌍 Test 3: Environment variables");
let mut runner3 = CommandRunner::new();
let mut process3 = runner3
.command("printenv")
.arg("TEST_VAR")
.env("TEST_VAR", "remote_test_value")
.start()
.await?;
println!("✅ Successfully started remote printenv command!");
process3.kill().await.ok(); // Don't fail if already finished
// Test 4: Working directory
println!("\n📁 Test 4: Working directory");
let mut runner4 = CommandRunner::new();
let mut process4 = runner4.command("pwd").working_dir("/tmp").start().await?;
println!("✅ Successfully started remote pwd command!");
process4.kill().await.ok(); // Don't fail if already finished
// Test 5: Process Status Checking (TDD - These will FAIL initially)
println!("\n📊 Test 5: Process Status Checking (TDD)");
// Test 5a: Status of running process
let mut runner5a = CommandRunner::new();
let mut process5a = runner5a.command("sleep").arg("3").start().await?;
println!("✅ Started sleep process for status testing");
// This should return None (still running)
match process5a.status().await {
Ok(None) => println!("✅ Status correctly shows process still running"),
Ok(Some(status)) => println!(
"⚠️ Process finished unexpectedly with status: {:?}",
status
),
Err(e) => println!("❌ Status check failed (expected for now): {}", e),
}
// Test try_wait (non-blocking)
match process5a.try_wait().await {
Ok(None) => println!("✅ try_wait correctly shows process still running"),
Ok(Some(status)) => println!(
"⚠️ Process finished unexpectedly with status: {:?}",
status
),
Err(e) => println!("❌ try_wait failed (expected for now): {}", e),
}
// Kill the process to test status of completed process
process5a.kill().await.ok();
// Test 5b: Status of completed process
let mut runner5b = CommandRunner::new();
let mut process5b = runner5b.command("echo").arg("status test").start().await?;
println!("✅ Started echo process for completion status testing");
// Wait for process to complete
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
match process5b.status().await {
Ok(Some(status)) => {
println!(
"✅ Status correctly shows completed process: success={}, code={:?}",
status.success(),
status.code()
);
}
Ok(None) => println!("⚠️ Process still running (might need more time)"),
Err(e) => println!("❌ Status check failed (expected for now): {}", e),
}
// Test 5c: Wait for process completion
let mut runner5c = CommandRunner::new();
let mut process5c = runner5c.command("echo").arg("wait test").start().await?;
println!("✅ Started echo process for wait testing");
match process5c.wait().await {
Ok(status) => {
println!(
"✅ Wait completed successfully: success={}, code={:?}",
status.success(),
status.code()
);
}
Err(e) => println!("❌ Wait failed (expected for now): {}", e),
}
// Test 6: Output Streaming (TDD - These will FAIL initially)
println!("\n🌊 Test 6: Output Streaming (TDD)");
// Test 6a: Stdout streaming
let mut runner6a = CommandRunner::new();
let mut process6a = runner6a
.command("echo")
.arg("Hello stdout streaming!")
.start()
.await?;
println!("✅ Started echo process for stdout streaming test");
// Give the server a moment to capture output from fast commands like echo
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
match process6a.stream().await {
Ok(mut stream) => {
println!("✅ Got streams from process");
if let Some(stdout) = &mut stream.stdout {
use tokio::io::AsyncReadExt;
let mut buffer = Vec::new();
match stdout.read_to_end(&mut buffer).await {
Ok(bytes_read) => {
let output = String::from_utf8_lossy(&buffer);
if bytes_read > 0 && output.contains("Hello stdout streaming") {
println!("✅ Successfully read stdout: '{}'", output.trim());
} else if bytes_read == 0 {
println!(
"❌ No stdout data received (expected for now - empty streams)"
);
} else {
println!("⚠️ Unexpected stdout content: '{}'", output);
}
}
Err(e) => println!("❌ Failed to read stdout: {}", e),
}
} else {
println!("❌ No stdout stream available (expected for now)");
}
}
Err(e) => println!("❌ Failed to get streams: {}", e),
}
// Test 6b: Stderr streaming
let mut runner6b = CommandRunner::new();
let mut process6b = runner6b
.command("bash")
.arg("-c")
.arg("echo 'Error message' >&2")
.start()
.await?;
println!("✅ Started bash process for stderr streaming test");
// Give the server a moment to capture output from fast commands
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
match process6b.stream().await {
Ok(mut stream) => {
if let Some(stderr) = &mut stream.stderr {
use tokio::io::AsyncReadExt;
let mut buffer = Vec::new();
match stderr.read_to_end(&mut buffer).await {
Ok(bytes_read) => {
let output = String::from_utf8_lossy(&buffer);
if bytes_read > 0 && output.contains("Error message") {
println!("✅ Successfully read stderr: '{}'", output.trim());
} else if bytes_read == 0 {
println!(
"❌ No stderr data received (expected for now - empty streams)"
);
} else {
println!("⚠️ Unexpected stderr content: '{}'", output);
}
}
Err(e) => println!("❌ Failed to read stderr: {}", e),
}
} else {
println!("❌ No stderr stream available (expected for now)");
}
}
Err(e) => println!("❌ Failed to get streams: {}", e),
}
// Test 6c: Streaming from long-running process
let mut runner6c = CommandRunner::new();
let mut process6c = runner6c
.command("bash")
.arg("-c")
.arg("for i in {1..3}; do echo \"Line $i\"; sleep 0.1; done")
.start()
.await?;
println!("✅ Started bash process for streaming test");
// Give the server a moment to capture output from the command
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
match process6c.stream().await {
Ok(mut stream) => {
if let Some(stdout) = &mut stream.stdout {
use tokio::io::AsyncReadExt;
let mut buffer = [0u8; 1024];
// Try to read some data (this tests real-time streaming)
match tokio::time::timeout(
tokio::time::Duration::from_secs(2),
stdout.read(&mut buffer),
)
.await
{
Ok(Ok(bytes_read)) => {
let output = String::from_utf8_lossy(&buffer[..bytes_read]);
if bytes_read > 0 {
println!("✅ Successfully streamed output: '{}'", output.trim());
} else {
println!("❌ No streaming data received (expected for now)");
}
}
Ok(Err(e)) => println!("❌ Stream read error: {}", e),
Err(_) => {
println!("❌ Stream read timeout (expected for now - no real streaming)")
}
}
} else {
println!("❌ No stdout stream available for streaming test");
}
}
Err(e) => println!("❌ Failed to get streams for streaming test: {}", e),
}
// Clean up
process6c.kill().await.ok();
// Test 7: Server Status API Endpoint (TDD - These will FAIL initially)
println!("\n🔍 Test 7: Server Status API Endpoint (TDD)");
// Create a process first
let client = reqwest::Client::new();
let command_request = serde_json::json!({
"command": "sleep",
"args": ["5"],
"working_dir": null,
"env_vars": [],
"stdin": null
});
let response = client
.post("http://localhost:8000/commands")
.json(&command_request)
.send()
.await?;
if response.status().is_success() {
let body: serde_json::Value = response.json().await?;
if let Some(process_id) = body["data"]["process_id"].as_str() {
println!("✅ Created process for status API test: {}", process_id);
// Test 7a: GET /commands/{id}/status for running process
let status_url = format!("http://localhost:8000/commands/{}/status", process_id);
match client.get(&status_url).send().await {
Ok(response) => {
if response.status().is_success() {
match response.json::<serde_json::Value>().await {
Ok(status_body) => {
println!("✅ Got status response: {}", status_body);
// Check expected structure
if let Some(data) = status_body.get("data") {
if let Some(running) =
data.get("running").and_then(|v| v.as_bool())
{
if running {
println!(
"✅ Status correctly shows process is running"
);
} else {
println!("⚠️ Process already finished");
}
} else {
println!("❌ Missing 'running' field in status response");
}
} else {
println!("❌ Missing 'data' field in status response");
}
}
Err(e) => println!("❌ Failed to parse status JSON: {}", e),
}
} else {
println!(
"❌ Status API returned error: {} (expected for now)",
response.status()
);
}
}
Err(e) => println!("❌ Status API request failed (expected for now): {}", e),
}
// Kill the process
let _ = client
.delete(format!("http://localhost:8000/commands/{}", process_id))
.send()
.await;
}
}
// Test 7b: Status of completed process
let quick_command = serde_json::json!({
"command": "echo",
"args": ["quick command"],
"working_dir": null,
"env_vars": [],
"stdin": null
});
let response = client
.post("http://localhost:8000/commands")
.json(&quick_command)
.send()
.await?;
if response.status().is_success() {
let body: serde_json::Value = response.json().await?;
if let Some(process_id) = body["data"]["process_id"].as_str() {
println!(
"✅ Created quick process for completed status test: {}",
process_id
);
// Wait for it to complete
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
let status_url = format!("http://localhost:8000/commands/{}/status", process_id);
match client.get(&status_url).send().await {
Ok(response) => {
if response.status().is_success() {
match response.json::<serde_json::Value>().await {
Ok(status_body) => {
println!("✅ Got completed status response: {}", status_body);
if let Some(data) = status_body.get("data") {
if let Some(exit_code) = data.get("exit_code") {
println!("✅ Status includes exit code: {}", exit_code);
}
if let Some(success) = data.get("success") {
println!("✅ Status includes success flag: {}", success);
}
}
}
Err(e) => println!("❌ Failed to parse completed status JSON: {}", e),
}
} else {
println!(
"❌ Completed status API returned error: {}",
response.status()
);
}
}
Err(e) => println!("❌ Completed status API request failed: {}", e),
}
}
}
// Test 7c: Status of non-existent process (error handling)
let fake_id = "non-existent-process-id";
let status_url = format!("http://localhost:8000/commands/{}/status", fake_id);
match client.get(&status_url).send().await {
Ok(response) => {
if response.status() == reqwest::StatusCode::NOT_FOUND {
println!("✅ Status API correctly returns 404 for non-existent process");
} else {
println!(
"❌ Status API should return 404 for non-existent process, got: {}",
response.status()
);
}
}
Err(e) => println!("❌ Error testing non-existent process status: {}", e),
}
// Test 8: Server Streaming API Endpoint (TDD - These will FAIL initially)
println!("\n📡 Test 8: Server Streaming API Endpoint (TDD)");
// Create a process that generates output
let stream_command = serde_json::json!({
"command": "bash",
"args": ["-c", "for i in {1..3}; do echo \"Stream line $i\"; sleep 0.1; done"],
"working_dir": null,
"env_vars": [],
"stdin": null
});
let response = client
.post("http://localhost:8000/commands")
.json(&stream_command)
.send()
.await?;
if response.status().is_success() {
let body: serde_json::Value = response.json().await?;
if let Some(process_id) = body["data"]["process_id"].as_str() {
println!("✅ Created streaming process: {}", process_id);
// Test 8a: GET /commands/{id}/stream endpoint
let stream_url = format!("http://localhost:8000/commands/{}/stream", process_id);
match client.get(&stream_url).send().await {
Ok(response) => {
if response.status().is_success() {
println!("✅ Stream endpoint accessible");
if let Some(content_type) = response.headers().get("content-type") {
println!("✅ Content-Type: {:?}", content_type);
}
// Try to read the response body
match response.text().await {
Ok(text) => {
if !text.is_empty() {
println!("✅ Received streaming data: '{}'", text.trim());
} else {
println!("❌ No streaming data received (expected for now)");
}
}
Err(e) => println!("❌ Failed to read stream response: {}", e),
}
} else {
println!(
"❌ Stream endpoint returned error: {} (expected for now)",
response.status()
);
}
}
Err(e) => println!("❌ Stream API request failed (expected for now): {}", e),
}
// Clean up
let _ = client
.delete(format!("http://localhost:8000/commands/{}", process_id))
.send()
.await;
}
}
// Test 8b: Streaming from non-existent process
let fake_stream_url = format!("http://localhost:8000/commands/{}/stream", "fake-id");
match client.get(&fake_stream_url).send().await {
Ok(response) => {
if response.status() == reqwest::StatusCode::NOT_FOUND {
println!("✅ Stream API correctly returns 404 for non-existent process");
} else {
println!(
"❌ Stream API should return 404 for non-existent process, got: {}",
response.status()
);
}
}
Err(e) => println!("❌ Error testing non-existent process stream: {}", e),
}
// Test 9: True Chunk-Based Streaming Verification (Fixed)
println!("\n🌊 Test 9: True Chunk-Based Streaming Verification");
// Create a longer-running process to avoid timing issues
let stream_command = serde_json::json!({
"command": "bash",
"args": ["-c", "for i in {1..6}; do echo \"Chunk $i at $(date +%H:%M:%S.%3N)\"; sleep 0.5; done"],
"working_dir": null,
"env_vars": [],
"stdin": null
});
let response = client
.post("http://localhost:8000/commands")
.json(&stream_command)
.send()
.await?;
if response.status().is_success() {
let body: serde_json::Value = response.json().await?;
if let Some(process_id) = body["data"]["process_id"].as_str() {
println!(
"✅ Created streaming process: {} (will run ~3 seconds)",
process_id
);
// Test chunk-based streaming with the /stream endpoint
let stream_url = format!("http://localhost:8000/commands/{}/stream", process_id);
// Small delay to let the process start generating output
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let stream_response = client.get(&stream_url).send().await;
match stream_response {
Ok(response) => {
if response.status().is_success() {
println!("✅ Stream endpoint accessible");
let start_time = std::time::Instant::now();
println!("🔍 Reading streaming response:");
// Try to read the response in chunks using a simpler approach
let bytes = match tokio::time::timeout(
tokio::time::Duration::from_secs(4),
response.bytes(),
)
.await
{
Ok(Ok(bytes)) => bytes,
Ok(Err(e)) => {
println!(" ❌ Failed to read response: {}", e);
return Ok(());
}
Err(_) => {
println!(" ❌ Response read timeout");
return Ok(());
}
};
let response_text = String::from_utf8_lossy(&bytes);
let lines: Vec<&str> =
response_text.lines().filter(|l| !l.is_empty()).collect();
println!("📊 Response analysis:");
println!(" Total response size: {} bytes", bytes.len());
println!(" Number of lines: {}", lines.len());
println!(
" Read duration: {:.1}s",
start_time.elapsed().as_secs_f32()
);
if !lines.is_empty() {
println!(" Lines received:");
for (i, line) in lines.iter().enumerate() {
println!(" {}: '{}'", i + 1, line);
}
}
// The key insight: if we got multiple lines with different timestamps,
// it proves they were generated over time, even if delivered in one HTTP response
if lines.len() > 1 {
// Check if timestamps show progression
let first_line = lines[0];
let last_line = lines[lines.len() - 1];
if first_line != last_line {
println!("✅ STREAMING VERIFIED: {} lines with different content/timestamps!", lines.len());
println!(
" This proves the server captured streaming output over time"
);
if lines.len() >= 3 {
println!(" First: '{}'", first_line);
println!(" Last: '{}'", last_line);
}
} else {
println!(
"⚠️ Multiple identical lines - may indicate buffering issue"
);
}
} else if lines.len() == 1 {
println!("⚠️ Only 1 line received: '{}'", lines[0]);
println!(
" This suggests the process finished too quickly or timing issue"
);
} else {
println!("❌ No output lines received");
}
} else {
println!("❌ Stream endpoint error: {}", response.status());
}
}
Err(e) => println!("❌ Stream request failed: {}", e),
}
// Wait for process to complete, then verify final output
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
println!("\n🔍 Verification: Testing completed process output:");
let stdout_url = format!("http://localhost:8000/commands/{}/stdout", process_id);
match client.get(&stdout_url).send().await {
Ok(response) if response.status().is_success() => {
if let Ok(text) = response.text().await {
let final_lines: Vec<&str> =
text.lines().filter(|l| !l.is_empty()).collect();
println!(
"✅ Final stdout: {} lines, {} bytes",
final_lines.len(),
text.len()
);
if final_lines.len() >= 6 {
println!(
"✅ Process completed successfully - all expected output captured"
);
} else {
println!(
"⚠️ Expected 6 lines, got {} - process may have been interrupted",
final_lines.len()
);
}
}
}
_ => println!("⚠️ Final stdout check failed"),
}
// Clean up
let _ = client
.delete(format!("http://localhost:8000/commands/{}", process_id))
.send()
.await;
}
}
println!("\n🎉 All TDD tests completed!");
println!("💡 Expected failures show what needs to be implemented:");
println!(" 📊 Remote status/wait methods");
println!(" 🌊 Real output streaming");
println!(" 🔍 GET /commands/:id/status endpoint");
println!(" 📡 GET /commands/:id/stream endpoint");
println!("🔧 Time to make the tests pass! 🚀");
Ok(())
}

View File

@@ -0,0 +1,291 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncRead;
use crate::models::Environment;
mod local;
mod remote;
pub use local::LocalCommandExecutor;
pub use remote::RemoteCommandExecutor;
// Core trait that defines the interface for command execution
#[async_trait]
pub trait CommandExecutor: Send + Sync {
/// Start a process and return a handle to it
async fn start(
&self,
request: &CommandRunnerArgs,
) -> Result<Box<dyn ProcessHandle>, CommandError>;
}
// Trait for managing running processes
#[async_trait]
pub trait ProcessHandle: Send + Sync {
/// Check if the process is still running, return exit status if finished
async fn try_wait(&mut self) -> Result<Option<CommandExitStatus>, CommandError>;
/// Wait for the process to complete and return exit status
async fn wait(&mut self) -> Result<CommandExitStatus, CommandError>;
/// Kill the process
async fn kill(&mut self) -> Result<(), CommandError>;
/// Get streams for stdout and stderr
async fn stream(&mut self) -> Result<CommandStream, CommandError>;
/// Get process identifier (for debugging/logging)
fn process_id(&self) -> String;
/// Check current status (alias for try_wait for backward compatibility)
async fn status(&mut self) -> Result<Option<CommandExitStatus>, CommandError> {
self.try_wait().await
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommandRunnerArgs {
pub command: String,
pub args: Vec<String>,
pub working_dir: Option<String>,
pub env_vars: Vec<(String, String)>,
pub stdin: Option<String>,
}
pub struct CommandRunner {
executor: Box<dyn CommandExecutor>,
command: Option<String>,
args: Vec<String>,
working_dir: Option<String>,
env_vars: Vec<(String, String)>,
stdin: Option<String>,
}
impl Default for CommandRunner {
fn default() -> Self {
Self::new()
}
}
pub struct CommandProcess {
handle: Box<dyn ProcessHandle>,
}
impl std::fmt::Debug for CommandProcess {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CommandProcess")
.field("process_id", &self.handle.process_id())
.finish()
}
}
#[derive(Debug)]
pub enum CommandError {
SpawnFailed {
command: String,
error: std::io::Error,
},
StatusCheckFailed {
error: std::io::Error,
},
KillFailed {
error: std::io::Error,
},
ProcessNotStarted,
NoCommandSet,
IoError {
error: std::io::Error,
},
}
impl From<std::io::Error> for CommandError {
fn from(error: std::io::Error) -> Self {
CommandError::IoError { error }
}
}
impl std::fmt::Display for CommandError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
CommandError::SpawnFailed { command, error } => {
write!(f, "Failed to spawn command '{}': {}", command, error)
}
CommandError::StatusCheckFailed { error } => {
write!(f, "Failed to check command status: {}", error)
}
CommandError::KillFailed { error } => {
write!(f, "Failed to kill command: {}", error)
}
CommandError::ProcessNotStarted => {
write!(f, "Process has not been started yet")
}
CommandError::NoCommandSet => {
write!(f, "No command has been set")
}
CommandError::IoError { error } => {
write!(f, "Failed to spawn command: {}", error)
}
}
}
}
impl std::error::Error for CommandError {}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandExitStatus {
/// Exit code (0 for success on most platforms)
code: Option<i32>,
/// Whether the process exited successfully
success: bool,
/// Unix signal that terminated the process (Unix only)
#[cfg(unix)]
signal: Option<i32>,
/// Optional remote process identifier for cloud execution
remote_process_id: Option<String>,
/// Optional session identifier for remote execution tracking
remote_session_id: Option<String>,
}
impl CommandExitStatus {
/// Returns true if the process exited successfully
pub fn success(&self) -> bool {
self.success
}
/// Returns the exit code of the process, if available
pub fn code(&self) -> Option<i32> {
self.code
}
}
pub struct CommandStream {
pub stdout: Option<Box<dyn AsyncRead + Unpin + Send>>,
pub stderr: Option<Box<dyn AsyncRead + Unpin + Send>>,
}
impl CommandRunner {
pub fn new() -> Self {
let env = std::env::var("ENVIRONMENT").unwrap_or_else(|_| "local".to_string());
let mode = env.parse().unwrap_or(Environment::Local);
match mode {
Environment::Cloud => CommandRunner {
executor: Box::new(RemoteCommandExecutor::new()),
command: None,
args: Vec::new(),
working_dir: None,
env_vars: Vec::new(),
stdin: None,
},
Environment::Local => CommandRunner {
executor: Box::new(LocalCommandExecutor::new()),
command: None,
args: Vec::new(),
working_dir: None,
env_vars: Vec::new(),
stdin: None,
},
}
}
pub fn command(&mut self, cmd: &str) -> &mut Self {
self.command = Some(cmd.to_string());
self
}
pub fn get_program(&self) -> &str {
self.command.as_deref().unwrap_or("")
}
pub fn get_args(&self) -> &[String] {
&self.args
}
pub fn get_current_dir(&self) -> Option<&str> {
self.working_dir.as_deref()
}
pub fn arg(&mut self, arg: &str) -> &mut Self {
self.args.push(arg.to_string());
self
}
pub fn stdin(&mut self, prompt: &str) -> &mut Self {
self.stdin = Some(prompt.to_string());
self
}
pub fn working_dir(&mut self, dir: &str) -> &mut Self {
self.working_dir = Some(dir.to_string());
self
}
pub fn env(&mut self, key: &str, val: &str) -> &mut Self {
self.env_vars.push((key.to_string(), val.to_string()));
self
}
/// Convert the current CommandRunner state to CommandRunnerArgs
pub fn to_args(&self) -> Option<CommandRunnerArgs> {
Some(CommandRunnerArgs {
command: self.command.clone()?,
args: self.args.clone(),
working_dir: self.working_dir.clone(),
env_vars: self.env_vars.clone(),
stdin: self.stdin.clone(),
})
}
/// Create a CommandRunner from CommandRunnerArgs, respecting the environment
#[allow(dead_code)]
pub fn from_args(request: CommandRunnerArgs) -> Self {
let mut runner = Self::new();
runner.command(&request.command);
for arg in &request.args {
runner.arg(arg);
}
if let Some(dir) = &request.working_dir {
runner.working_dir(dir);
}
for (key, value) in &request.env_vars {
runner.env(key, value);
}
if let Some(stdin) = &request.stdin {
runner.stdin(stdin);
}
runner
}
pub async fn start(&self) -> Result<CommandProcess, CommandError> {
let request = self.to_args().ok_or(CommandError::NoCommandSet)?;
let handle = self.executor.start(&request).await?;
Ok(CommandProcess { handle })
}
}
impl CommandProcess {
#[allow(dead_code)]
pub async fn status(&mut self) -> Result<Option<CommandExitStatus>, CommandError> {
self.handle.status().await
}
pub async fn try_wait(&mut self) -> Result<Option<CommandExitStatus>, CommandError> {
self.handle.try_wait().await
}
pub async fn kill(&mut self) -> Result<(), CommandError> {
self.handle.kill().await
}
pub async fn stream(&mut self) -> Result<CommandStream, CommandError> {
self.handle.stream().await
}
#[allow(dead_code)]
pub async fn wait(&mut self) -> Result<CommandExitStatus, CommandError> {
self.handle.wait().await
}
}
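For reference, a minimal end-to-end use of this API (mirroring the tests further below; ENVIRONMENT=local assumed):

use tokio::io::AsyncReadExt;
use vibe_kanban::command_runner::CommandRunner;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut runner = CommandRunner::new();
    let mut process = runner.command("echo").arg("hi").start().await?;

    // Read stdout through the executor-agnostic stream interface.
    let mut stream = process.stream().await?;
    let mut out = String::new();
    if let Some(stdout) = &mut stream.stdout {
        stdout.read_to_string(&mut out).await?;
    }

    let status = process.wait().await?;
    assert!(status.success());
    println!("output: {}", out.trim());
    Ok(())
}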

View File

@@ -0,0 +1,703 @@
use std::{process::Stdio, time::Duration};
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
#[cfg(unix)]
use nix::{
sys::signal::{killpg, Signal},
unistd::{getpgid, Pid},
};
use tokio::process::Command;
use crate::command_runner::{
CommandError, CommandExecutor, CommandExitStatus, CommandRunnerArgs, CommandStream,
ProcessHandle,
};
pub struct LocalCommandExecutor;
impl Default for LocalCommandExecutor {
fn default() -> Self {
Self::new()
}
}
impl LocalCommandExecutor {
pub fn new() -> Self {
Self
}
}
#[async_trait]
impl CommandExecutor for LocalCommandExecutor {
async fn start(
&self,
request: &CommandRunnerArgs,
) -> Result<Box<dyn ProcessHandle>, CommandError> {
let mut cmd = Command::new(&request.command);
cmd.args(&request.args)
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
if let Some(dir) = &request.working_dir {
cmd.current_dir(dir);
}
for (key, val) in &request.env_vars {
cmd.env(key, val);
}
let mut child = cmd.group_spawn().map_err(|e| CommandError::SpawnFailed {
command: format!("{} {}", request.command, request.args.join(" ")),
error: e,
})?;
if let Some(prompt) = &request.stdin {
// Write prompt to stdin safely
if let Some(mut stdin) = child.inner().stdin.take() {
use tokio::io::AsyncWriteExt;
stdin.write_all(prompt.as_bytes()).await?;
stdin.shutdown().await?;
}
}
Ok(Box::new(LocalProcessHandle::new(child)))
}
}
pub struct LocalProcessHandle {
child: Option<AsyncGroupChild>,
process_id: String,
}
impl LocalProcessHandle {
pub fn new(mut child: AsyncGroupChild) -> Self {
let process_id = child
.inner()
.id()
.map(|id| id.to_string())
.unwrap_or_else(|| "unknown".to_string());
Self {
child: Some(child),
process_id,
}
}
}
#[async_trait]
impl ProcessHandle for LocalProcessHandle {
async fn try_wait(&mut self) -> Result<Option<CommandExitStatus>, CommandError> {
match &mut self.child {
Some(child) => match child
.inner()
.try_wait()
.map_err(|e| CommandError::StatusCheckFailed { error: e })?
{
Some(status) => Ok(Some(CommandExitStatus::from_local(status))),
None => Ok(None),
},
None => Err(CommandError::ProcessNotStarted),
}
}
async fn wait(&mut self) -> Result<CommandExitStatus, CommandError> {
match &mut self.child {
Some(child) => {
let status = child
.wait()
.await
.map_err(|e| CommandError::IoError { error: e })?;
Ok(CommandExitStatus::from_local(status))
}
None => Err(CommandError::ProcessNotStarted),
}
}
async fn kill(&mut self) -> Result<(), CommandError> {
match &mut self.child {
Some(child) => {
// hit the whole process group, not just the leader
#[cfg(unix)]
{
if let Some(pid) = child.inner().id() {
let pgid = getpgid(Some(Pid::from_raw(pid as i32))).map_err(|e| {
CommandError::KillFailed {
error: std::io::Error::other(e),
}
})?;
for sig in [Signal::SIGINT, Signal::SIGTERM, Signal::SIGKILL] {
if let Err(e) = killpg(pgid, sig) {
tracing::warn!(
"Failed to send signal {:?} to process group {}: {}",
sig,
pgid,
e
);
}
tokio::time::sleep(Duration::from_secs(2)).await;
if child
.inner()
.try_wait()
.map_err(|e| CommandError::StatusCheckFailed { error: e })?
.is_some()
{
break; // gone!
}
}
}
}
// final fallback: command_group already targets the group
child
.kill()
.await
.map_err(|e| CommandError::KillFailed { error: e })?;
child
.wait()
.await
.map_err(|e| CommandError::KillFailed { error: e })?; // reap
// Clear the handle after successful kill
self.child = None;
Ok(())
}
None => Err(CommandError::ProcessNotStarted),
}
}
async fn stream(&mut self) -> Result<CommandStream, CommandError> {
match &mut self.child {
Some(child) => {
let stdout = child.inner().stdout.take();
let stderr = child.inner().stderr.take();
Ok(CommandStream::from_local(stdout, stderr))
}
None => Err(CommandError::ProcessNotStarted),
}
}
fn process_id(&self) -> String {
self.process_id.clone()
}
}
// Local-specific implementations for shared types
impl CommandExitStatus {
/// Create a CommandExitStatus from a std::process::ExitStatus (for local processes)
pub fn from_local(status: std::process::ExitStatus) -> Self {
Self {
code: status.code(),
success: status.success(),
#[cfg(unix)]
signal: {
use std::os::unix::process::ExitStatusExt;
status.signal()
},
remote_process_id: None,
remote_session_id: None,
}
}
}
impl CommandStream {
/// Create a CommandStream from local process streams
pub fn from_local(
stdout: Option<tokio::process::ChildStdout>,
stderr: Option<tokio::process::ChildStderr>,
) -> Self {
Self {
stdout: stdout.map(|s| Box::new(s) as Box<dyn tokio::io::AsyncRead + Unpin + Send>),
stderr: stderr.map(|s| Box::new(s) as Box<dyn tokio::io::AsyncRead + Unpin + Send>),
}
}
}
#[cfg(test)]
mod tests {
use std::process::Stdio;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
process::Command,
};
use crate::command_runner::*;
// Helper function to create a comparison tokio::process::Command
async fn create_tokio_command(
cmd: &str,
args: &[&str],
working_dir: Option<&str>,
env_vars: &[(String, String)],
stdin_data: Option<&str>,
) -> Result<AsyncGroupChild, std::io::Error> {
let mut command = Command::new(cmd);
command
.args(args)
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
if let Some(dir) = working_dir {
command.current_dir(dir);
}
for (key, val) in env_vars {
command.env(key, val);
}
let mut child = command.group_spawn()?;
// Write stdin data if provided
if let Some(data) = stdin_data {
if let Some(mut stdin) = child.inner().stdin.take() {
stdin.write_all(data.as_bytes()).await?;
stdin.shutdown().await?;
}
}
Ok(child)
}
#[tokio::test]
async fn test_command_execution_comparison() {
// Ensure we're using local execution for this test
std::env::set_var("ENVIRONMENT", "local");
let test_message = "hello world";
// Test with CommandRunner
let mut runner = CommandRunner::new();
let mut process = runner
.command("echo")
.arg(test_message)
.start()
.await
.expect("CommandRunner should start echo command");
let mut stream = process.stream().await.expect("Should get stream");
let mut stdout_data = Vec::new();
if let Some(stdout) = &mut stream.stdout {
stdout
.read_to_end(&mut stdout_data)
.await
.expect("Should read stdout");
}
let runner_output = String::from_utf8(stdout_data).expect("Should be valid UTF-8");
// Test with tokio::process::Command
let mut tokio_child = create_tokio_command("echo", &[test_message], None, &[], None)
.await
.expect("Should start tokio command");
let mut tokio_stdout_data = Vec::new();
if let Some(stdout) = tokio_child.inner().stdout.take() {
let mut stdout = stdout;
stdout
.read_to_end(&mut tokio_stdout_data)
.await
.expect("Should read tokio stdout");
}
let tokio_output = String::from_utf8(tokio_stdout_data).expect("Should be valid UTF-8");
// Both should produce the same output
assert_eq!(runner_output.trim(), tokio_output.trim());
assert_eq!(runner_output.trim(), test_message);
}
#[tokio::test]
async fn test_stdin_handling() {
// Ensure we're using local execution for this test
std::env::set_var("ENVIRONMENT", "local");
let test_input = "test input data\n";
// Test with CommandRunner (using cat to echo stdin)
let mut runner = CommandRunner::new();
let mut process = runner
.command("cat")
.stdin(test_input)
.start()
.await
.expect("CommandRunner should start cat command");
let mut stream = process.stream().await.expect("Should get stream");
let mut stdout_data = Vec::new();
if let Some(stdout) = &mut stream.stdout {
stdout
.read_to_end(&mut stdout_data)
.await
.expect("Should read stdout");
}
let runner_output = String::from_utf8(stdout_data).expect("Should be valid UTF-8");
// Test with tokio::process::Command
let mut tokio_child = create_tokio_command("cat", &[], None, &[], Some(test_input))
.await
.expect("Should start tokio command");
let mut tokio_stdout_data = Vec::new();
if let Some(stdout) = tokio_child.inner().stdout.take() {
let mut stdout = stdout;
stdout
.read_to_end(&mut tokio_stdout_data)
.await
.expect("Should read tokio stdout");
}
let tokio_output = String::from_utf8(tokio_stdout_data).expect("Should be valid UTF-8");
// Both should echo the input
assert_eq!(runner_output, tokio_output);
assert_eq!(runner_output, test_input);
}
#[tokio::test]
async fn test_working_directory() {
// Use pwd command to check working directory
let test_dir = "/tmp";
// Test with CommandRunner
std::env::set_var("ENVIRONMENT", "local");
let mut runner = CommandRunner::new();
let mut process = runner
.command("pwd")
.working_dir(test_dir)
.start()
.await
.expect("CommandRunner should start pwd command");
let mut stream = process.stream().await.expect("Should get stream");
let mut stdout_data = Vec::new();
if let Some(stdout) = &mut stream.stdout {
stdout
.read_to_end(&mut stdout_data)
.await
.expect("Should read stdout");
}
let runner_output = String::from_utf8(stdout_data).expect("Should be valid UTF-8");
// Test with tokio::process::Command
let mut tokio_child = create_tokio_command("pwd", &[], Some(test_dir), &[], None)
.await
.expect("Should start tokio command");
let mut tokio_stdout_data = Vec::new();
if let Some(stdout) = tokio_child.inner().stdout.take() {
let mut stdout = stdout;
stdout
.read_to_end(&mut tokio_stdout_data)
.await
.expect("Should read tokio stdout");
}
let tokio_output = String::from_utf8(tokio_stdout_data).expect("Should be valid UTF-8");
// Both should show the same working directory
assert_eq!(runner_output.trim(), tokio_output.trim());
assert!(runner_output.trim().contains(test_dir));
}
#[tokio::test]
async fn test_environment_variables() {
let test_var = "TEST_VAR";
let test_value = "test_value_123";
// Test with CommandRunner
std::env::set_var("ENVIRONMENT", "local");
let mut runner = CommandRunner::new();
let mut process = runner
.command("printenv")
.arg(test_var)
.env(test_var, test_value)
.start()
.await
.expect("CommandRunner should start printenv command");
let mut stream = process.stream().await.expect("Should get stream");
let mut stdout_data = Vec::new();
if let Some(stdout) = &mut stream.stdout {
stdout
.read_to_end(&mut stdout_data)
.await
.expect("Should read stdout");
}
let runner_output = String::from_utf8(stdout_data).expect("Should be valid UTF-8");
// Test with tokio::process::Command
let env_vars = vec![(test_var.to_string(), test_value.to_string())];
let mut tokio_child = create_tokio_command("printenv", &[test_var], None, &env_vars, None)
.await
.expect("Should start tokio command");
let mut tokio_stdout_data = Vec::new();
if let Some(stdout) = tokio_child.inner().stdout.take() {
let mut stdout = stdout;
stdout
.read_to_end(&mut tokio_stdout_data)
.await
.expect("Should read tokio stdout");
}
let tokio_output = String::from_utf8(tokio_stdout_data).expect("Should be valid UTF-8");
// Both should show the same environment variable
assert_eq!(runner_output.trim(), tokio_output.trim());
assert_eq!(runner_output.trim(), test_value);
}
#[tokio::test]
async fn test_process_group_creation() {
// Test that both CommandRunner and tokio::process::Command create process groups
// We'll use a sleep command that can be easily killed
// Test with CommandRunner
std::env::set_var("ENVIRONMENT", "local");
let mut runner = CommandRunner::new();
let mut process = runner
.command("sleep")
.arg("10") // Sleep for 10 seconds
.start()
.await
.expect("CommandRunner should start sleep command");
// Check that process is running
let status = process.status().await.expect("Should check status");
assert!(status.is_none(), "Process should still be running");
// Kill the process (might fail if already exited)
let _ = process.kill().await;
// Wait a moment for the kill to take effect
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let final_status = process.status().await.expect("Should check final status");
assert!(
final_status.is_some(),
"Process should have exited after kill"
);
// Test with tokio::process::Command for comparison
let mut tokio_child = create_tokio_command("sleep", &["10"], None, &[], None)
.await
.expect("Should start tokio sleep command");
// Check that process is running
let tokio_status = tokio_child
.inner()
.try_wait()
.expect("Should check tokio status");
assert!(
tokio_status.is_none(),
"Tokio process should still be running"
);
// Kill the tokio process
tokio_child.kill().await.expect("Should kill tokio process");
// Wait a moment for the kill to take effect
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let tokio_final_status = tokio_child
.inner()
.try_wait()
.expect("Should check tokio final status");
assert!(
tokio_final_status.is_some(),
"Tokio process should have exited after kill"
);
}
#[tokio::test]
async fn test_kill_operation() {
// Test killing processes with both implementations
// Test CommandRunner kill
std::env::set_var("ENVIRONMENT", "local");
let mut runner = CommandRunner::new();
let mut process = runner
.command("sleep")
.arg("60") // Long sleep
.start()
.await
.expect("Should start CommandRunner sleep");
// Verify it's running
assert!(process
.status()
.await
.expect("Should check status")
.is_none());
// Kill and verify it stops (might fail if already exited)
let _ = process.kill().await;
// Give it time to die
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
let exit_status = process.status().await.expect("Should get exit status");
assert!(exit_status.is_some(), "Process should have exited");
// Test tokio::process::Command kill for comparison
let mut tokio_child = create_tokio_command("sleep", &["60"], None, &[], None)
.await
.expect("Should start tokio sleep");
// Verify it's running
assert!(tokio_child
.inner()
.try_wait()
.expect("Should check tokio status")
.is_none());
// Kill and verify it stops
tokio_child.kill().await.expect("Should kill tokio process");
// Give it time to die
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
let tokio_exit_status = tokio_child
.inner()
.try_wait()
.expect("Should get tokio exit status");
assert!(
tokio_exit_status.is_some(),
"Tokio process should have exited"
);
}
#[tokio::test]
async fn test_status_monitoring() {
// Test status monitoring with a quick command
// Test with CommandRunner
std::env::set_var("ENVIRONMENT", "local");
let mut runner = CommandRunner::new();
let mut process = runner
.command("echo")
.arg("quick test")
.start()
.await
.expect("Should start CommandRunner echo");
// Initially might be running or might have finished quickly
let _initial_status = process.status().await.expect("Should check initial status");
// Wait for completion
let exit_status = process.wait().await.expect("Should wait for completion");
assert!(exit_status.success(), "Echo command should succeed");
// After wait, status should show completion
let final_status = process.status().await.expect("Should check final status");
assert!(
final_status.is_some(),
"Should have exit status after completion"
);
assert!(
final_status.unwrap().success(),
"Should show successful exit"
);
// Test with tokio::process::Command for comparison
let mut tokio_child = create_tokio_command("echo", &["quick test"], None, &[], None)
.await
.expect("Should start tokio echo");
// Wait for completion
let tokio_exit_status = tokio_child
.wait()
.await
.expect("Should wait for tokio completion");
assert!(
tokio_exit_status.success(),
"Tokio echo command should succeed"
);
// After wait, status should show completion
let tokio_final_status = tokio_child
.inner()
.try_wait()
.expect("Should check tokio final status");
assert!(
tokio_final_status.is_some(),
"Should have tokio exit status after completion"
);
assert!(
tokio_final_status.unwrap().success(),
"Should show tokio successful exit"
);
}
#[tokio::test]
async fn test_wait_for_completion() {
// Test waiting for process completion with specific exit codes
// Test successful command (exit code 0)
std::env::set_var("ENVIRONMENT", "local");
let mut runner = CommandRunner::new();
let mut process = runner
.command("true") // Command that exits with 0
.start()
.await
.expect("Should start true command");
let exit_status = process
.wait()
.await
.expect("Should wait for true completion");
assert!(exit_status.success(), "true command should succeed");
assert_eq!(exit_status.code(), Some(0), "true should exit with code 0");
// Test failing command (exit code 1)
let mut runner2 = CommandRunner::new();
let mut process2 = runner2
.command("false") // Command that exits with 1
.start()
.await
.expect("Should start false command");
let exit_status2 = process2
.wait()
.await
.expect("Should wait for false completion");
assert!(!exit_status2.success(), "false command should fail");
assert_eq!(
exit_status2.code(),
Some(1),
"false should exit with code 1"
);
// Compare with tokio::process::Command
let mut tokio_child = create_tokio_command("true", &[], None, &[], None)
.await
.expect("Should start tokio true");
let tokio_exit_status = tokio_child
.wait()
.await
.expect("Should wait for tokio true");
assert!(tokio_exit_status.success(), "tokio true should succeed");
assert_eq!(
tokio_exit_status.code(),
Some(0),
"tokio true should exit with code 0"
);
let mut tokio_child2 = create_tokio_command("false", &[], None, &[], None)
.await
.expect("Should start tokio false");
let tokio_exit_status2 = tokio_child2
.wait()
.await
.expect("Should wait for tokio false");
assert!(!tokio_exit_status2.success(), "tokio false should fail");
assert_eq!(
tokio_exit_status2.code(),
Some(1),
"tokio false should exit with code 1"
);
}
}
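
Review note: these tests pin ENVIRONMENT=local before building the runner, so
CommandRunner::new() is evidently where local vs. remote execution is decided.
The dispatch itself lives in backend/src/command_runner.rs (not shown in this
hunk); a minimal sketch of the assumed contract, with LocalCommandExecutor as a
placeholder name for the in-process implementation:

fn select_executor() -> Box<dyn CommandExecutor> {
    // Assumed contract: ENVIRONMENT=local (or unset) runs commands in-process;
    // anything else routes them to the cloud runner over HTTP.
    match std::env::var("ENVIRONMENT").as_deref() {
        Ok("local") | Err(_) => Box::new(LocalCommandExecutor::default()),
        Ok(_) => Box::new(RemoteCommandExecutor::new()),
    }
}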

View File

@@ -0,0 +1,402 @@
use std::{
pin::Pin,
task::{Context, Poll},
};
use async_trait::async_trait;
use tokio::io::AsyncRead;
use crate::command_runner::{
CommandError, CommandExecutor, CommandExitStatus, CommandRunnerArgs, CommandStream,
ProcessHandle,
};
pub struct RemoteCommandExecutor {
cloud_server_url: String,
}
impl Default for RemoteCommandExecutor {
fn default() -> Self {
Self::new()
}
}
impl RemoteCommandExecutor {
pub fn new() -> Self {
let cloud_server_url = std::env::var("CLOUD_SERVER_URL")
.unwrap_or_else(|_| "http://localhost:8000".to_string());
Self { cloud_server_url }
}
}
#[async_trait]
impl CommandExecutor for RemoteCommandExecutor {
async fn start(
&self,
request: &CommandRunnerArgs,
) -> Result<Box<dyn ProcessHandle>, CommandError> {
let client = reqwest::Client::new();
let response = client
.post(format!("{}/commands", self.cloud_server_url))
.json(request)
.send()
.await
.map_err(|e| CommandError::IoError {
error: std::io::Error::other(e),
})?;
let result: serde_json::Value =
response.json().await.map_err(|e| CommandError::IoError {
error: std::io::Error::other(e),
})?;
let process_id =
result["data"]["process_id"]
.as_str()
.ok_or_else(|| CommandError::IoError {
error: std::io::Error::other(format!(
"Missing process_id in response: {}",
result
)),
})?;
Ok(Box::new(RemoteProcessHandle::new(
process_id.to_string(),
self.cloud_server_url.clone(),
)))
}
}
pub struct RemoteProcessHandle {
process_id: String,
cloud_server_url: String,
}
impl RemoteProcessHandle {
pub fn new(process_id: String, cloud_server_url: String) -> Self {
Self {
process_id,
cloud_server_url,
}
}
}
#[async_trait]
impl ProcessHandle for RemoteProcessHandle {
async fn try_wait(&mut self) -> Result<Option<CommandExitStatus>, CommandError> {
// Make HTTP request to get status from cloud server
let client = reqwest::Client::new();
let response = client
.get(format!(
"{}/commands/{}/status",
self.cloud_server_url, self.process_id
))
.send()
.await
.map_err(|e| CommandError::StatusCheckFailed {
error: std::io::Error::other(e),
})?;
if !response.status().is_success() {
if response.status() == reqwest::StatusCode::NOT_FOUND {
return Err(CommandError::StatusCheckFailed {
error: std::io::Error::new(std::io::ErrorKind::NotFound, "Process not found"),
});
} else {
return Err(CommandError::StatusCheckFailed {
error: std::io::Error::other("Status check failed"),
});
}
}
let result: serde_json::Value =
response
.json()
.await
.map_err(|e| CommandError::StatusCheckFailed {
error: std::io::Error::other(e),
})?;
let data = result["data"]
.as_object()
.ok_or_else(|| CommandError::StatusCheckFailed {
error: std::io::Error::other("Invalid response format"),
})?;
let running = data["running"].as_bool().unwrap_or(false);
if running {
Ok(None) // Still running
} else {
// Process completed, extract exit status
let exit_code = data["exit_code"].as_i64().map(|c| c as i32);
let success = data["success"].as_bool().unwrap_or(false);
Ok(Some(CommandExitStatus::from_remote(
exit_code,
success,
Some(self.process_id.clone()),
None,
)))
}
}
async fn wait(&mut self) -> Result<CommandExitStatus, CommandError> {
// Poll the status endpoint until the process completes,
// reusing one HTTP client across iterations instead of rebuilding it each poll
let client = reqwest::Client::new();
loop {
let response = client
.get(format!(
"{}/commands/{}/status",
self.cloud_server_url, self.process_id
))
.send()
.await
.map_err(|e| CommandError::StatusCheckFailed {
error: std::io::Error::other(e),
})?;
if !response.status().is_success() {
if response.status() == reqwest::StatusCode::NOT_FOUND {
return Err(CommandError::StatusCheckFailed {
error: std::io::Error::new(
std::io::ErrorKind::NotFound,
"Process not found",
),
});
} else {
return Err(CommandError::StatusCheckFailed {
error: std::io::Error::other("Status check failed"),
});
}
}
let result: serde_json::Value =
response
.json()
.await
.map_err(|e| CommandError::StatusCheckFailed {
error: std::io::Error::other(e),
})?;
let data =
result["data"]
.as_object()
.ok_or_else(|| CommandError::StatusCheckFailed {
error: std::io::Error::other("Invalid response format"),
})?;
let running = data["running"].as_bool().unwrap_or(false);
if !running {
// Process completed, extract exit status and return
let exit_code = data["exit_code"].as_i64().map(|c| c as i32);
let success = data["success"].as_bool().unwrap_or(false);
return Ok(CommandExitStatus::from_remote(
exit_code,
success,
Some(self.process_id.clone()),
None,
));
}
// Wait a bit before polling again
tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
}
}
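// Review sketch: wait() above re-implements the whole parse-and-check block
// from try_wait(). Since both take &mut self, wait() could (assumed to be
// behaviourally equivalent) collapse to polling try_wait():
//
//     async fn wait(&mut self) -> Result<CommandExitStatus, CommandError> {
//         loop {
//             if let Some(status) = self.try_wait().await? {
//                 return Ok(status);
//             }
//             tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
//         }
//     }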
async fn kill(&mut self) -> Result<(), CommandError> {
let client = reqwest::Client::new();
let response = client
.delete(format!(
"{}/commands/{}",
self.cloud_server_url, self.process_id
))
.send()
.await
.map_err(|e| CommandError::KillFailed {
error: std::io::Error::other(e),
})?;
if !response.status().is_success() {
if response.status() == reqwest::StatusCode::NOT_FOUND {
// Process not found, might have already finished - treat as success
return Ok(());
}
return Err(CommandError::KillFailed {
error: std::io::Error::other(format!(
"Remote kill failed with status: {}",
response.status()
)),
});
}
// Check if server indicates process was already completed
if let Ok(result) = response.json::<serde_json::Value>().await {
if let Some(data) = result.get("data") {
if let Some(message) = data.as_str() {
tracing::info!("Kill result: {}", message);
}
}
}
Ok(())
}
async fn stream(&mut self) -> Result<CommandStream, CommandError> {
// Create HTTP streams for stdout and stderr concurrently
let stdout_url = format!(
"{}/commands/{}/stdout",
self.cloud_server_url, self.process_id
);
let stderr_url = format!(
"{}/commands/{}/stderr",
self.cloud_server_url, self.process_id
);
// Create both streams concurrently using tokio::try_join!
let (stdout_result, stderr_result) =
tokio::try_join!(HTTPStream::new(stdout_url), HTTPStream::new(stderr_url))?;
let stdout_stream: Option<Box<dyn AsyncRead + Unpin + Send>> =
Some(Box::new(stdout_result) as Box<dyn AsyncRead + Unpin + Send>);
let stderr_stream: Option<Box<dyn AsyncRead + Unpin + Send>> =
Some(Box::new(stderr_result) as Box<dyn AsyncRead + Unpin + Send>);
Ok(CommandStream {
stdout: stdout_stream,
stderr: stderr_stream,
})
}
fn process_id(&self) -> String {
self.process_id.clone()
}
}
/// HTTP-based AsyncRead wrapper for true streaming
pub struct HTTPStream {
stream: Pin<Box<dyn futures_util::Stream<Item = Result<Vec<u8>, reqwest::Error>> + Send>>,
current_chunk: Vec<u8>,
chunk_position: usize,
finished: bool,
}
// HTTPStream must be Unpin to satisfy the AsyncRead trait-object bounds used by
// CommandStream; every field is already Unpin (the inner stream is boxed), so
// this impl only spells the guarantee out explicitly.
impl Unpin for HTTPStream {}
impl HTTPStream {
pub async fn new(url: String) -> Result<Self, CommandError> {
let client = reqwest::Client::new();
let response = client
.get(&url)
.send()
.await
.map_err(|e| CommandError::IoError {
error: std::io::Error::other(e),
})?;
if !response.status().is_success() {
return Err(CommandError::IoError {
error: std::io::Error::other(format!(
"HTTP request failed with status: {}",
response.status()
)),
});
}
// Use chunk() method to create a stream
Ok(Self {
stream: Box::pin(futures_util::stream::unfold(
response,
|mut resp| async move {
match resp.chunk().await {
Ok(Some(chunk)) => Some((Ok(chunk.to_vec()), resp)),
Ok(None) => None,
Err(e) => Some((Err(e), resp)),
}
},
)),
current_chunk: Vec::new(),
chunk_position: 0,
finished: false,
})
}
}
impl AsyncRead for HTTPStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
if self.finished {
return Poll::Ready(Ok(()));
}
// First, try to read from current chunk if available
if self.chunk_position < self.current_chunk.len() {
let remaining_in_chunk = self.current_chunk.len() - self.chunk_position;
let to_read = std::cmp::min(remaining_in_chunk, buf.remaining());
let chunk_data =
&self.current_chunk[self.chunk_position..self.chunk_position + to_read];
buf.put_slice(chunk_data);
self.chunk_position += to_read;
return Poll::Ready(Ok(()));
}
// Current chunk is exhausted, try to get the next chunk
match self.stream.as_mut().poll_next(cx) {
Poll::Ready(Some(Ok(chunk))) => {
if chunk.is_empty() {
// Empty chunk, mark as finished
self.finished = true;
Poll::Ready(Ok(()))
} else {
// New chunk available
self.current_chunk = chunk;
self.chunk_position = 0;
// Read from the new chunk
let to_read = std::cmp::min(self.current_chunk.len(), buf.remaining());
let chunk_data = &self.current_chunk[..to_read];
buf.put_slice(chunk_data);
self.chunk_position = to_read;
Poll::Ready(Ok(()))
}
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Err(std::io::Error::other(e))),
Poll::Ready(None) => {
// Stream ended
self.finished = true;
Poll::Ready(Ok(()))
}
Poll::Pending => Poll::Pending,
}
}
}
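// Usage sketch (illustrative, not part of this file): because HTTPStream
// implements AsyncRead, callers can wrap a remote stdout exactly like a local
// pipe, e.g. with a line-based reader. The URL below is a placeholder.
#[allow(dead_code)]
async fn print_remote_stdout(url: String) -> Result<(), CommandError> {
    use tokio::io::AsyncBufReadExt;
    let stream = HTTPStream::new(url).await?;
    let mut lines = tokio::io::BufReader::new(stream).lines();
    while let Some(line) = lines
        .next_line()
        .await
        .map_err(|e| CommandError::IoError { error: e })?
    {
        println!("{line}");
    }
    Ok(())
}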
// Remote-specific implementations for shared types
impl CommandExitStatus {
/// Create a CommandExitStatus for remote processes
pub fn from_remote(
code: Option<i32>,
success: bool,
remote_process_id: Option<String>,
remote_session_id: Option<String>,
) -> Self {
Self {
code,
success,
#[cfg(unix)]
signal: None,
remote_process_id,
remote_session_id,
}
}
}
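
Review note: the handle above parses every response as an untyped
serde_json::Value. For reference, the wire format implied by those field
accesses looks like the following (a sketch inferred from this file; the cloud
runner's actual type names are not shown in this PR). The POST /commands body
is the serialized CommandRunnerArgs itself.

use serde::Deserialize;

/// Implied envelope for POST /commands
#[derive(Deserialize)]
struct StartResponse {
    data: StartData,
}

#[derive(Deserialize)]
struct StartData {
    process_id: String,
}

/// Implied envelope for GET /commands/{id}/status
#[derive(Deserialize)]
struct StatusResponse {
    data: StatusData,
}

#[derive(Deserialize)]
struct StatusData {
    // the client defaults both booleans to false when absent
    #[serde(default)]
    running: bool,
    exit_code: Option<i32>,
    #[serde(default)]
    success: bool,
}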

View File

@@ -6,9 +6,12 @@ use tokio::io::{AsyncBufReadExt, BufReader};
use ts_rs::TS;
use uuid::Uuid;
use crate::executors::{
AiderExecutor, AmpExecutor, CCRExecutor, CharmOpencodeExecutor, ClaudeExecutor, EchoExecutor,
GeminiExecutor, SetupScriptExecutor, SstOpencodeExecutor,
use crate::{
command_runner::{CommandError, CommandProcess, CommandRunner},
executors::{
AiderExecutor, AmpExecutor, CCRExecutor, CharmOpencodeExecutor, ClaudeExecutor,
EchoExecutor, GeminiExecutor, SetupScriptExecutor, SstOpencodeExecutor,
},
};
// Constants for database streaming - fast for near-real-time updates
@@ -106,37 +109,28 @@ impl SpawnContext {
self.additional_context = Some(context.into());
self
}
/// Create SpawnContext from a CommandRunner, then use builder methods for additional context
pub fn from_command(
command: &tokio::process::Command,
executor_type: impl Into<String>,
) -> Self {
pub fn from_command(command: &CommandRunner, executor_type: impl Into<String>) -> Self {
Self::from(command).with_executor_type(executor_type)
}
/// Finalize the context and create an ExecutorError
pub fn spawn_error(self, error: std::io::Error) -> ExecutorError {
pub fn spawn_error(self, error: CommandError) -> ExecutorError {
ExecutorError::spawn_failed(error, self)
}
}
/// Extract SpawnContext from a CommandRunner
/// This automatically captures all available information from the runner's configuration
impl From<&tokio::process::Command> for SpawnContext {
fn from(command: &tokio::process::Command) -> Self {
let program = command.as_std().get_program().to_string_lossy().to_string();
let args = command
.as_std()
.get_args()
.map(|s| s.to_string_lossy().to_string())
.collect();
impl From<&CommandRunner> for SpawnContext {
fn from(command: &CommandRunner) -> Self {
let program = command.get_program().to_string();
let args = command.get_args().to_vec();
let working_dir = command
.as_std()
.get_current_dir()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|| "current_dir".to_string());
.unwrap_or("current_dir")
.to_string();
Self {
executor_type: "Unknown".to_string(), // Must be set using with_executor_type()
@@ -153,7 +147,7 @@ impl From<&tokio::process::Command> for SpawnContext {
#[derive(Debug)]
pub enum ExecutorError {
SpawnFailed {
error: std::io::Error,
error: CommandError,
context: SpawnContext,
},
TaskNotFound,
@@ -249,7 +243,7 @@ impl From<crate::models::task_attempt::TaskAttemptError> for ExecutorError {
impl ExecutorError {
/// Create a new SpawnFailed error with context
pub fn spawn_failed(error: std::io::Error, context: SpawnContext) -> Self {
pub fn spawn_failed(error: CommandError, context: SpawnContext) -> Self {
ExecutorError::SpawnFailed { error, context }
}
}
@@ -263,7 +257,7 @@ pub trait Executor: Send + Sync {
pool: &sqlx::SqlitePool,
task_id: Uuid,
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError>;
) -> Result<CommandProcess, ExecutorError>;
/// Spawn a follow-up session for executors that support it
///
@@ -277,10 +271,9 @@ pub trait Executor: Send + Sync {
_session_id: &str,
_prompt: &str,
_worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
Err(ExecutorError::FollowUpNotSupported)
}
/// Normalize executor logs into a standard format
fn normalize_logs(
&self,
@@ -298,22 +291,22 @@ pub trait Executor: Send + Sync {
}
#[allow(clippy::result_large_err)]
fn setup_streaming(
async fn setup_streaming(
&self,
child: &mut command_group::AsyncGroupChild,
child: &mut CommandProcess,
pool: &sqlx::SqlitePool,
attempt_id: Uuid,
execution_process_id: Uuid,
) -> Result<(), ExecutorError> {
let stdout = child
.inner()
let mut streams = child
.stream()
.await
.expect("Failed to get stdio from child process");
let stdout = streams
.stdout
.take()
.expect("Failed to take stdout from child process");
let stderr = child
.inner()
let stderr = streams
.stderr
.take()
.expect("Failed to take stderr from child process");
let pool_clone1 = pool.clone();
@@ -345,9 +338,9 @@ pub trait Executor: Send + Sync {
attempt_id: Uuid,
execution_process_id: Uuid,
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
let mut child = self.spawn(pool, task_id, worktree_path).await?;
Self::setup_streaming(self, &mut child, pool, attempt_id, execution_process_id)?;
Self::setup_streaming(self, &mut child, pool, attempt_id, execution_process_id).await?;
Ok(child)
}
@@ -362,11 +355,11 @@ pub trait Executor: Send + Sync {
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
let mut child = self
.spawn_followup(pool, task_id, session_id, prompt, worktree_path)
.await?;
Self::setup_streaming(self, &mut child, pool, attempt_id, execution_process_id)?;
Self::setup_streaming(self, &mut child, pool, attempt_id, execution_process_id).await?;
Ok(child)
}
}
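
Review note: the forwarding tasks that setup_streaming hands those stdout and
stderr halves to are elided from this hunk. For context, each half feeds a line
loop of roughly the following shape (a sketch; the real tasks persist each line
to the execution process row via the cloned pool):

use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};

async fn forward_lines<R, F>(reader: R, mut persist: F)
where
    R: AsyncRead + Unpin,
    F: FnMut(String),
{
    let mut lines = BufReader::new(reader).lines();
    // Hand each line onward as it arrives for near-real-time updates
    while let Ok(Some(line)) = lines.next_line().await {
        persist(line);
    }
}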

View File

@@ -1,13 +1,10 @@
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use serde_json::Value;
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
};
use tokio::io::{AsyncBufReadExt, BufReader};
use uuid::Uuid;
use crate::{
command_runner::{CommandProcess, CommandRunner},
executor::{
ActionType, Executor, ExecutorError, NormalizedConversation, NormalizedEntry,
NormalizedEntryType,
@@ -478,7 +475,7 @@ impl Executor for AiderExecutor {
pool: &sqlx::SqlitePool,
task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// Get the task to fetch its description
let task = Task::find_by_id(pool, task_id)
.await?
@@ -526,40 +523,33 @@ impl Executor for AiderExecutor {
message_file.to_string_lossy()
);
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.current_dir(worktree_path)
.env("COLUMNS", "1000") // Prevent line wrapping in aider output
.arg(shell_arg)
.arg(&aider_command);
tracing::debug!("Spawning Aider command: {}", &aider_command);
// Write message file after command is prepared for better error context
tokio::fs::write(&message_file, prompt.as_bytes())
.await
.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_task(task_id, Some(task.title.clone()))
.with_context(format!(
"Failed to write message file {}",
message_file.display()
));
ExecutorError::spawn_failed(e, context)
ExecutorError::ContextCollectionFailed(format!(
"Failed to write message file {}: {}",
message_file.display(),
e
))
})?;
let child = command
.group_spawn() // Create new process group so we can kill entire tree
.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_task(task_id, Some(task.title.clone()))
.with_context(format!("{} CLI execution for new task", self.executor_type))
.spawn_error(e)
})?;
tracing::debug!("Spawning Aider command: {}", &aider_command);
let mut command = CommandRunner::new();
command
.command(shell_cmd)
.arg(shell_arg)
.arg(&aider_command)
.working_dir(worktree_path)
.env("COLUMNS", "1000"); // Prevent line wrapping in aider output
let child = command.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_task(task_id, Some(task.title.clone()))
.with_context(format!("{} CLI execution for new task", self.executor_type))
.spawn_error(e)
})?;
tracing::debug!(
"Started Aider with message file {} for task {}: {:?}",
@@ -579,7 +569,7 @@ impl Executor for AiderExecutor {
attempt_id: Uuid,
execution_process_id: Uuid,
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// Generate our own session ID and store it in the database immediately
let session_id = format!("aider_task_{}", task_id);
if let Err(e) =
@@ -601,16 +591,15 @@ impl Executor for AiderExecutor {
let mut child = self.spawn(pool, task_id, worktree_path).await?;
// Take stdout and stderr pipes for Aider filtering
let stdout = child
.inner()
let mut streams = child
.stream()
.await
.expect("Failed to get stdio from child process");
let stdout = streams
.stdout
.take()
.expect("Failed to take stdout from child process");
let stderr = child
.inner()
let stderr = streams
.stderr
.take()
.expect("Failed to take stderr from child process");
// Start Aider filtering task
@@ -666,7 +655,7 @@ impl Executor for AiderExecutor {
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// Update session ID for this execution process to ensure continuity
if let Err(e) =
ExecutorSession::update_session_id(pool, execution_process_id, session_id).await
@@ -689,16 +678,15 @@ impl Executor for AiderExecutor {
.await?;
// Take stdout and stderr pipes for Aider filtering
let stdout = child
.inner()
let mut streams = child
.stream()
.await
.expect("Failed to get stdio from child process");
let stdout = streams
.stdout
.take()
.expect("Failed to take stdout from child process");
let stderr = child
.inner()
let stderr = streams
.stderr
.take()
.expect("Failed to take stderr from child process");
// Start Aider filtering task
@@ -723,7 +711,7 @@ impl Executor for AiderExecutor {
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
let base_dir = TaskAttempt::get_worktree_base_dir();
// Create session directory if it doesn't exist
@@ -759,32 +747,28 @@ impl Executor for AiderExecutor {
message_file.to_string_lossy()
);
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.current_dir(worktree_path)
.env("COLUMNS", "1000") // Prevent line wrapping in aider output
.arg(shell_arg)
.arg(&aider_command);
tracing::debug!("Spawning Aider command: {}", &aider_command);
// Write message file after command is prepared for better error context
tokio::fs::write(&message_file, prompt.as_bytes())
.await
.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"Failed to write followup message file {}",
message_file.display()
));
ExecutorError::spawn_failed(e, context)
ExecutorError::ContextCollectionFailed(format!(
"Failed to write followup message file {}: {}",
message_file.display(),
e
))
})?;
let child = command.group_spawn().map_err(|e| {
tracing::debug!("Spawning Aider command: {}", &aider_command);
let mut command = CommandRunner::new();
command
.command(shell_cmd)
.arg(shell_arg)
.arg(&aider_command)
.working_dir(worktree_path)
.env("COLUMNS", "1000"); // Prevent line wrapping in aider output
let child = command.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"{} CLI followup execution for session {}",

View File

@@ -1,11 +1,12 @@
use std::path::Path;
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::{
command_runner::{CommandProcess, CommandRunner},
executor,
executor::{
ActionType, Executor, ExecutorError, NormalizedConversation, NormalizedEntry,
NormalizedEntryType,
@@ -193,16 +194,12 @@ impl Executor for AmpExecutor {
pool: &sqlx::SqlitePool,
task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// Get the task to fetch its description
let task = Task::find_by_id(pool, task_id)
.await?
.ok_or(ExecutorError::TaskNotFound)?;
use std::process::Stdio;
use tokio::{io::AsyncWriteExt, process::Command};
let prompt = if let Some(task_description) = task.description {
format!(
r#"project_id: {}
@@ -225,32 +222,22 @@ Task title: {}"#,
// --format=jsonl is deprecated in latest versions of Amp CLI
let amp_command = "npx @sourcegraph/amp@0.0.1752148945-gd8844f --format=jsonl";
let mut command = Command::new(shell_cmd);
let mut command = CommandRunner::new();
command
.kill_on_drop(true)
.stdin(Stdio::piped()) // <-- open a pipe
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(worktree_path)
.command(shell_cmd)
.arg(shell_arg)
.arg(amp_command);
.arg(amp_command)
.stdin(&prompt)
.working_dir(worktree_path);
let mut child = command
.group_spawn() // Create new process group so we can kill entire tree
.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "Amp")
.with_task(task_id, Some(task.title.clone()))
.with_context("Amp CLI execution for new task")
.spawn_error(e)
})?;
let proc = command.start().await.map_err(|e| {
executor::SpawnContext::from_command(&command, "Amp")
.with_task(task_id, Some(task.title.clone()))
.with_context("Amp CLI execution for new task")
.spawn_error(e)
})?;
// feed the prompt in, then close the pipe so `amp` sees EOF
if let Some(mut stdin) = child.inner().stdin.take() {
stdin.write_all(prompt.as_bytes()).await.unwrap();
stdin.shutdown().await.unwrap(); // or `drop(stdin);`
}
Ok(child)
Ok(proc)
}
async fn spawn_followup(
@@ -260,11 +247,7 @@ Task title: {}"#,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
use std::process::Stdio;
use tokio::{io::AsyncWriteExt, process::Command};
) -> Result<CommandProcess, ExecutorError> {
// Use shell command for cross-platform compatibility
let (shell_cmd, shell_arg) = get_shell_command();
let amp_command = format!(
@@ -272,17 +255,15 @@ Task title: {}"#,
session_id
);
let mut command = Command::new(shell_cmd);
let mut command = CommandRunner::new();
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(worktree_path)
.command(shell_cmd)
.arg(shell_arg)
.arg(&amp_command);
.arg(&amp_command)
.stdin(prompt)
.working_dir(worktree_path);
let mut child = command.group_spawn().map_err(|e| {
let proc = command.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "Amp")
.with_context(format!(
"Amp CLI followup execution for thread {}",
@@ -291,27 +272,7 @@ Task title: {}"#,
.spawn_error(e)
})?;
// Feed the prompt in, then close the pipe so amp sees EOF
if let Some(mut stdin) = child.inner().stdin.take() {
stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "Amp")
.with_context(format!(
"Failed to write prompt to Amp CLI stdin for thread {}",
session_id
))
.spawn_error(e)
})?;
stdin.shutdown().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "Amp")
.with_context(format!(
"Failed to close Amp CLI stdin for thread {}",
session_id
))
.spawn_error(e)
})?;
}
Ok(child)
Ok(proc)
}
fn normalize_logs(
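
Review note: the .stdin(&prompt) builder above replaces the old manual
write_all/shutdown dance on the child's stdin pipe. The runner is assumed to
buffer the string and perform the write and EOF itself after spawning, roughly
like this (names hypothetical, sketched against tokio::process):

async fn feed_stdin(child: &mut tokio::process::Child, buf: &str) -> std::io::Result<()> {
    use tokio::io::AsyncWriteExt;
    if let Some(mut stdin) = child.stdin.take() {
        stdin.write_all(buf.as_bytes()).await?;
        stdin.shutdown().await?; // close the pipe so the CLI sees EOF
    }
    Ok(())
}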

View File

@@ -1,8 +1,8 @@
use async_trait::async_trait;
use command_group::AsyncGroupChild;
use uuid::Uuid;
use crate::{
command_runner::CommandProcess,
executor::{Executor, ExecutorError, NormalizedConversation},
executors::ClaudeExecutor,
};
@@ -33,7 +33,7 @@ impl Executor for CCRExecutor {
pool: &sqlx::SqlitePool,
task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
self.0.spawn(pool, task_id, worktree_path).await
}
@@ -44,7 +44,7 @@ impl Executor for CCRExecutor {
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
self.0
.spawn_followup(pool, task_id, session_id, prompt, worktree_path)
.await

View File

@@ -1,8 +1,8 @@
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use uuid::Uuid;
use crate::{
command_runner::{CommandProcess, CommandRunner},
executor::{Executor, ExecutorError},
models::task::Task,
utils::shell::get_shell_command,
@@ -18,16 +18,12 @@ impl Executor for CharmOpencodeExecutor {
pool: &sqlx::SqlitePool,
task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// Get the task to fetch its description
let task = Task::find_by_id(pool, task_id)
.await?
.ok_or(ExecutorError::TaskNotFound)?;
use std::process::Stdio;
use tokio::process::Command;
let prompt = if let Some(task_description) = task.description {
format!(
r#"project_id: {}
@@ -52,25 +48,21 @@ Task title: {}"#,
prompt.replace('"', "\\\"")
);
let mut command = Command::new(shell_cmd);
let mut command = CommandRunner::new();
command
.kill_on_drop(true)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(worktree_path)
.command(shell_cmd)
.arg(shell_arg)
.arg(opencode_command);
.arg(&opencode_command)
.working_dir(worktree_path);
let child = command
.group_spawn() // Create new process group so we can kill entire tree
.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "CharmOpenCode")
.with_task(task_id, Some(task.title.clone()))
.with_context("CharmOpenCode CLI execution for new task")
.spawn_error(e)
})?;
let proc = command.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "CharmOpenCode")
.with_task(task_id, Some(task.title.clone()))
.with_context("CharmOpenCode CLI execution for new task")
.spawn_error(e)
})?;
Ok(child)
Ok(proc)
}
async fn spawn_followup(
@@ -80,11 +72,7 @@ Task title: {}"#,
_session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
use std::process::Stdio;
use tokio::process::Command;
) -> Result<CommandProcess, ExecutorError> {
// CharmOpencode doesn't support session-based followup, so we ignore session_id
// and just run with the new prompt
let (shell_cmd, shell_arg) = get_shell_command();
@@ -93,21 +81,19 @@ Task title: {}"#,
prompt.replace('"', "\\\"")
);
let mut command = Command::new(shell_cmd);
let mut command = CommandRunner::new();
command
.kill_on_drop(true)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(worktree_path)
.command(shell_cmd)
.arg(shell_arg)
.arg(&opencode_command);
.arg(&opencode_command)
.working_dir(worktree_path);
let child = command.group_spawn().map_err(|e| {
let proc = command.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "CharmOpenCode")
.with_context("CharmOpenCode CLI followup execution")
.spawn_error(e)
})?;
Ok(child)
Ok(proc)
}
}
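
Review note: both spawn paths above still interpolate the prompt into a shell
string with only double quotes escaped, which leaves $, backticks and
backslashes live inside the command. If this path stays, the usual fix on POSIX
shells is single-quote wrapping (sketch below; Windows cmd needs separate
handling, and, where the CLI accepts stdin input, passing the prompt via
.stdin() as the other executors now do would sidestep quoting entirely):

fn shell_single_quote(s: &str) -> String {
    // Close the quote, emit an escaped literal quote, then reopen: ' -> '"'"'
    format!("'{}'", s.replace('\'', r#"'"'"'"#))
}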

View File

@@ -1,11 +1,10 @@
use std::path::Path;
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use tokio::process::Command;
use uuid::Uuid;
use crate::{
command_runner::{CommandProcess, CommandRunner},
executor::{
ActionType, Executor, ExecutorError, NormalizedConversation, NormalizedEntry,
NormalizedEntryType,
@@ -84,7 +83,7 @@ impl Executor for ClaudeExecutor {
pool: &sqlx::SqlitePool,
task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// Get the task to fetch its description
let task = Task::find_by_id(pool, task_id)
.await?
@@ -112,54 +111,22 @@ Task title: {}"#,
// Pass prompt via stdin instead of command line to avoid shell escaping issues
let claude_command = &self.command;
let mut command = Command::new(shell_cmd);
let mut command = CommandRunner::new();
command
.kill_on_drop(true)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.current_dir(worktree_path)
.command(shell_cmd)
.arg(shell_arg)
.arg(claude_command)
.stdin(&prompt)
.working_dir(worktree_path)
.env("NODE_NO_WARNINGS", "1");
let mut child = command
.group_spawn() // Create new process group so we can kill entire tree
.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_task(task_id, Some(task.title.clone()))
.with_context(format!("{} CLI execution for new task", self.executor_type))
.spawn_error(e)
})?;
// Write prompt to stdin safely
if let Some(mut stdin) = child.inner().stdin.take() {
use tokio::io::AsyncWriteExt;
tracing::debug!(
"Writing prompt to Claude stdin for task {}: {:?}",
task_id,
prompt
);
stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_task(task_id, Some(task.title.clone()))
.with_context(format!(
"Failed to write prompt to {} CLI stdin",
self.executor_type
));
ExecutorError::spawn_failed(e, context)
})?;
stdin.shutdown().await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_task(task_id, Some(task.title.clone()))
.with_context(format!("Failed to close {} CLI stdin", self.executor_type));
ExecutorError::spawn_failed(e, context)
})?;
}
Ok(child)
let proc = command.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_task(task_id, Some(task.title.clone()))
.with_context(format!("{} CLI execution for new task", self.executor_type))
.spawn_error(e)
})?;
Ok(proc)
}
async fn spawn_followup(
@@ -169,7 +136,7 @@ Task title: {}"#,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// Use shell command for cross-platform compatibility
let (shell_cmd, shell_arg) = get_shell_command();
@@ -184,18 +151,16 @@ Task title: {}"#,
format!("{} --resume={}", self.command, session_id)
};
let mut command = Command::new(shell_cmd);
let mut command = CommandRunner::new();
command
.kill_on_drop(true)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.current_dir(worktree_path)
.command(shell_cmd)
.arg(shell_arg)
.arg(&claude_command)
.stdin(prompt)
.working_dir(worktree_path)
.env("NODE_NO_WARNINGS", "1");
let mut child = command.group_spawn().map_err(|e| {
let proc = command.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"{} CLI followup execution for session {}",
@@ -204,36 +169,7 @@ Task title: {}"#,
.spawn_error(e)
})?;
// Write prompt to stdin safely
if let Some(mut stdin) = child.inner().stdin.take() {
use tokio::io::AsyncWriteExt;
tracing::debug!(
"Writing prompt to {} stdin for session {}: {:?}",
self.executor_type,
session_id,
prompt
);
stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"Failed to write prompt to {} CLI stdin for session {}",
self.executor_type, session_id
));
ExecutorError::spawn_failed(e, context)
})?;
stdin.shutdown().await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"Failed to close {} CLI stdin for session {}",
self.executor_type, session_id
));
ExecutorError::spawn_failed(e, context)
})?;
}
Ok(child)
Ok(proc)
}
fn normalize_logs(

View File

@@ -1,9 +1,8 @@
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use tokio::process::Command;
use uuid::Uuid;
use crate::{
command_runner::{CommandProcess, CommandRunner},
executor::{Executor, ExecutorError},
models::{project::Project, task::Task},
utils::shell::get_shell_command,
@@ -21,7 +20,7 @@ impl Executor for CleanupScriptExecutor {
pool: &sqlx::SqlitePool,
task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// Validate the task and project exist
let task = Task::find_by_id(pool, task_id)
.await?
@@ -32,23 +31,21 @@ impl Executor for CleanupScriptExecutor {
.ok_or(ExecutorError::TaskNotFound)?; // Reuse TaskNotFound for simplicity
let (shell_cmd, shell_arg) = get_shell_command();
let mut command = Command::new(shell_cmd);
let mut command = CommandRunner::new();
command
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.command(shell_cmd)
.arg(shell_arg)
.arg(&self.script)
.current_dir(worktree_path);
.working_dir(worktree_path);
let child = command.group_spawn().map_err(|e| {
let proc = command.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "CleanupScript")
.with_task(task_id, Some(task.title.clone()))
.with_context("Cleanup script execution")
.spawn_error(e)
})?;
Ok(child)
Ok(proc)
}
/// Normalize cleanup script logs into a readable format

View File

@@ -1,9 +1,8 @@
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use tokio::process::Command;
use uuid::Uuid;
use crate::{
command_runner::{CommandProcess, CommandRunner},
executor::{Executor, ExecutorError},
models::{project::Project, task::Task},
utils::shell::get_shell_command,
@@ -21,7 +20,7 @@ impl Executor for DevServerExecutor {
pool: &sqlx::SqlitePool,
task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// Validate the task and project exist
let task = Task::find_by_id(pool, task_id)
.await?
@@ -32,22 +31,20 @@ impl Executor for DevServerExecutor {
.ok_or(ExecutorError::TaskNotFound)?; // Reuse TaskNotFound for simplicity
let (shell_cmd, shell_arg) = get_shell_command();
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
let mut runner = CommandRunner::new();
runner
.command(shell_cmd)
.arg(shell_arg)
.arg(&self.script)
.current_dir(worktree_path);
.working_dir(worktree_path);
let child = command.group_spawn().map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "DevServer")
let process = runner.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&runner, "DevServer")
.with_task(task_id, Some(task.title.clone()))
.with_context("Development server execution")
.spawn_error(e)
})?;
Ok(child)
Ok(process)
}
}

View File

@@ -1,10 +1,9 @@
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use tokio::process::Command;
use uuid::Uuid;
use crate::{
executor::{Executor, ExecutorError},
command_runner::{CommandProcess, CommandRunner},
executor::{Executor, ExecutorError, SpawnContext},
models::task::Task,
utils::shell::get_shell_command,
};
@@ -19,7 +18,7 @@ impl Executor for EchoExecutor {
pool: &sqlx::SqlitePool,
task_id: Uuid,
_worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// Get the task to fetch its description
let task = Task::find_by_id(pool, task_id)
.await?
@@ -57,22 +56,18 @@ echo "Task completed: {}""#,
)
};
let mut command = Command::new(shell_cmd);
command
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
let mut command_runner = CommandRunner::new();
command_runner
.command(shell_cmd)
.arg(shell_arg)
.arg(&script);
let child = command
.group_spawn() // Create new process group so we can kill entire tree
.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "Echo")
.with_task(task_id, Some(task.title.clone()))
.with_context("Shell script execution for echo demo")
.spawn_error(e)
})?;
let child = command_runner.start().await.map_err(|e| {
SpawnContext::from_command(&command_runner, "Echo")
.with_task(task_id, Some(task.title.clone()))
.with_context("Shell script execution for echo demo")
.spawn_error(e)
})?;
Ok(child)
}

View File

@@ -5,10 +5,9 @@
mod config;
mod streaming;
use std::{process::Stdio, time::Instant};
use std::time::Instant;
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use config::{
max_chunk_size, max_display_size, max_latency_ms, max_message_size, GeminiStreamConfig,
};
@@ -16,10 +15,10 @@ use config::{
use serde_json::Value;
pub use streaming::GeminiPatchBatch;
use streaming::GeminiStreaming;
use tokio::{io::AsyncWriteExt, process::Command};
use uuid::Uuid;
use crate::{
command_runner::{CommandProcess, CommandRunner},
executor::{
Executor, ExecutorError, NormalizedConversation, NormalizedEntry, NormalizedEntryType,
},
@@ -37,7 +36,7 @@ impl Executor for GeminiExecutor {
pool: &sqlx::SqlitePool,
task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// Get the task to fetch its description
let task = Task::find_by_id(pool, task_id)
.await?
@@ -61,42 +60,18 @@ Task title: {}"#,
};
let mut command = Self::create_gemini_command(worktree_path);
command.stdin(&prompt);
let mut child = command
.group_spawn() // Create new process group so we can kill entire tree
.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "Gemini")
.with_task(task_id, Some(task.title.clone()))
.with_context("Gemini CLI execution for new task")
.spawn_error(e)
})?;
let proc = command.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "Gemini")
.with_task(task_id, Some(task.title.clone()))
.with_context("Gemini CLI execution for new task")
.spawn_error(e)
})?;
// Write prompt to stdin
if let Some(mut stdin) = child.inner().stdin.take() {
tracing::debug!(
"Writing prompt to Gemini stdin for task {}: {:?}",
task_id,
prompt
);
stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
let context = crate::executor::SpawnContext::from_command(&command, "Gemini")
.with_task(task_id, Some(task.title.clone()))
.with_context("Failed to write prompt to Gemini CLI stdin");
ExecutorError::spawn_failed(e, context)
})?;
stdin.shutdown().await.map_err(|e| {
let context = crate::executor::SpawnContext::from_command(&command, "Gemini")
.with_task(task_id, Some(task.title.clone()))
.with_context("Failed to close Gemini CLI stdin");
ExecutorError::spawn_failed(e, context)
})?;
tracing::info!(
"Successfully sent prompt to Gemini stdin for task {}",
task_id
);
}
tracing::info!("Successfully started Gemini process for task {}", task_id);
Ok(child)
Ok(proc)
}
async fn execute_streaming(
@@ -106,7 +81,7 @@ Task title: {}"#,
attempt_id: Uuid,
execution_process_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
tracing::info!(
"Starting Gemini execution for task {} attempt {}",
task_id,
@@ -115,17 +90,16 @@ Task title: {}"#,
Self::update_session_id(pool, execution_process_id, &attempt_id.to_string()).await;
let mut child = self.spawn(pool, task_id, worktree_path).await?;
let mut proc = self.spawn(pool, task_id, worktree_path).await?;
tracing::info!(
"Gemini process spawned successfully for attempt {}, PID: {:?}",
attempt_id,
child.inner().id()
"Gemini process spawned successfully for attempt {}",
attempt_id
);
Self::setup_streaming(pool, &mut child, attempt_id, execution_process_id);
Self::setup_streaming(pool, &mut proc, attempt_id, execution_process_id).await;
Ok(child)
Ok(proc)
}
async fn spawn_followup(
@@ -135,7 +109,7 @@ Task title: {}"#,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// For Gemini, session_id is the attempt_id
let attempt_id = Uuid::parse_str(session_id)
.map_err(|_| ExecutorError::InvalidSessionId(session_id.to_string()))?;
@@ -156,7 +130,7 @@ Task title: {}"#,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
tracing::info!(
"Starting Gemini follow-up execution for attempt {} (session {})",
attempt_id,
@@ -166,19 +140,18 @@ Task title: {}"#,
// For Gemini, session_id is the attempt_id - update it in the database
Self::update_session_id(pool, execution_process_id, session_id).await;
let mut child = self
let mut proc = self
.spawn_followup(pool, task_id, session_id, prompt, worktree_path)
.await?;
tracing::info!(
"Gemini follow-up process spawned successfully for attempt {}, PID: {:?}",
attempt_id,
child.inner().id()
"Gemini follow-up process spawned successfully for attempt {}",
attempt_id
);
Self::setup_streaming(pool, &mut child, attempt_id, execution_process_id);
Self::setup_streaming(pool, &mut proc, attempt_id, execution_process_id).await;
Ok(child)
Ok(proc)
}
fn normalize_logs(
@@ -261,19 +234,16 @@ Task title: {}"#,
impl GeminiExecutor {
/// Create a standardized Gemini CLI command
fn create_gemini_command(worktree_path: &str) -> Command {
fn create_gemini_command(worktree_path: &str) -> CommandRunner {
let (shell_cmd, shell_arg) = get_shell_command();
let gemini_command = "npx @google/gemini-cli@latest --yolo";
let mut command = Command::new(shell_cmd);
let mut command = CommandRunner::new();
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(worktree_path)
.command(shell_cmd)
.arg(shell_arg)
.arg(gemini_command)
.working_dir(worktree_path)
.env("NODE_NO_WARNINGS", "1");
command
}
@@ -306,23 +276,25 @@ impl GeminiExecutor {
}
/// Setup streaming for both stdout and stderr
fn setup_streaming(
async fn setup_streaming(
pool: &sqlx::SqlitePool,
child: &mut AsyncGroupChild,
proc: &mut CommandProcess,
attempt_id: Uuid,
execution_process_id: Uuid,
) {
// Take stdout and stderr pipes for streaming
let stdout = child
.inner()
// Get stdout and stderr streams from CommandProcess
let mut stream = proc
.stream()
.await
.expect("Failed to get streams from command process");
let stdout = stream
.stdout
.take()
.expect("Failed to take stdout from child process");
let stderr = child
.inner()
.expect("Failed to get stdout from command stream");
let stderr = stream
.stderr
.take()
.expect("Failed to take stderr from child process");
.expect("Failed to get stderr from command stream");
// Start streaming tasks with Gemini-specific line-based message updates
let pool_clone1 = pool.clone();
@@ -521,7 +493,7 @@ You are continuing work on the above task. The execution history shows what has
worktree_path: &str,
comprehensive_prompt: &str,
attempt_id: Uuid,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
tracing::info!(
"Spawning Gemini followup execution for attempt {} with resume context ({} chars)",
attempt_id,
@@ -529,8 +501,9 @@ You are continuing work on the above task. The execution history shows what has
);
let mut command = GeminiExecutor::create_gemini_command(worktree_path);
command.stdin(comprehensive_prompt);
let mut child = command.group_spawn().map_err(|e| {
let proc = command.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "Gemini")
.with_context(format!(
"Gemini CLI followup execution with context for attempt {}",
@@ -539,53 +512,12 @@ You are continuing work on the above task. The execution history shows what has
.spawn_error(e)
})?;
self.send_prompt_to_stdin(&mut child, &command, comprehensive_prompt, attempt_id)
.await?;
Ok(child)
}
tracing::info!(
"Successfully started Gemini followup process for attempt {}",
attempt_id
);
async fn send_prompt_to_stdin(
&self,
child: &mut AsyncGroupChild,
command: &Command,
comprehensive_prompt: &str,
attempt_id: Uuid,
) -> Result<(), ExecutorError> {
if let Some(mut stdin) = child.inner().stdin.take() {
tracing::debug!(
"Sending resume context to Gemini for attempt {}: {} characters",
attempt_id,
comprehensive_prompt.len()
);
stdin
.write_all(comprehensive_prompt.as_bytes())
.await
.map_err(|e| {
let context = crate::executor::SpawnContext::from_command(command, "Gemini")
.with_context(format!(
"Failed to write resume prompt to Gemini CLI stdin for attempt {}",
attempt_id
));
ExecutorError::spawn_failed(e, context)
})?;
stdin.shutdown().await.map_err(|e| {
let context = crate::executor::SpawnContext::from_command(command, "Gemini")
.with_context(format!(
"Failed to close Gemini CLI stdin for attempt {}",
attempt_id
));
ExecutorError::spawn_failed(e, context)
})?;
tracing::info!(
"Successfully sent resume context to Gemini for attempt {}",
attempt_id
);
}
Ok(())
Ok(proc)
}
/// Format Gemini CLI output by inserting line breaks where periods are directly

View File

@@ -1,9 +1,8 @@
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use tokio::process::Command;
use uuid::Uuid;
use crate::{
command_runner::{CommandProcess, CommandRunner},
executor::{Executor, ExecutorError},
models::{project::Project, task::Task},
utils::shell::get_shell_command,
@@ -27,7 +26,7 @@ impl Executor for SetupScriptExecutor {
pool: &sqlx::SqlitePool,
task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// Validate the task and project exist
let task = Task::find_by_id(pool, task_id)
.await?
@@ -38,23 +37,21 @@ impl Executor for SetupScriptExecutor {
.ok_or(ExecutorError::TaskNotFound)?; // Reuse TaskNotFound for simplicity
let (shell_cmd, shell_arg) = get_shell_command();
let mut command = Command::new(shell_cmd);
let mut command = CommandRunner::new();
command
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.command(shell_cmd)
.arg(shell_arg)
.arg(&self.script)
.current_dir(worktree_path);
.working_dir(worktree_path);
let child = command.group_spawn().map_err(|e| {
let proc = command.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, "SetupScript")
.with_task(task_id, Some(task.title.clone()))
.with_context("Setup script execution")
.spawn_error(e)
})?;
Ok(child)
Ok(proc)
}
/// Normalize setup script logs into a readable format

View File

@@ -1,13 +1,10 @@
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use serde_json::{json, Value};
use tokio::{
io::{AsyncBufReadExt, BufReader},
process::Command,
};
use tokio::io::{AsyncBufReadExt, BufReader};
use uuid::Uuid;
use crate::{
command_runner::{CommandProcess, CommandRunner},
executor::{Executor, ExecutorError, NormalizedConversation, NormalizedEntry},
models::{execution_process::ExecutionProcess, executor_session::ExecutorSession, task::Task},
utils::shell::get_shell_command,
@@ -244,7 +241,7 @@ impl Executor for SstOpencodeExecutor {
pool: &sqlx::SqlitePool,
task_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
) -> Result<CommandProcess, ExecutorError> {
// Get the task to fetch its description
let task = Task::find_by_id(pool, task_id)
.await?
@@ -271,54 +268,23 @@ Task title: {}"#,
let (shell_cmd, shell_arg) = get_shell_command();
let opencode_command = &self.command;
let mut command = Command::new(shell_cmd);
let mut command = CommandRunner::new();
command
.kill_on_drop(true)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::null()) // Ignore stdout for OpenCode
.stderr(std::process::Stdio::piped())
.current_dir(worktree_path)
.command(shell_cmd)
.arg(shell_arg)
.arg(opencode_command)
.stdin(&prompt)
.working_dir(worktree_path)
.env("NODE_NO_WARNINGS", "1");
let mut child = command
.group_spawn() // Create new process group so we can kill entire tree
.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_task(task_id, Some(task.title.clone()))
.with_context(format!("{} CLI execution for new task", self.executor_type))
.spawn_error(e)
})?;
let proc = command.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_task(task_id, Some(task.title.clone()))
.with_context(format!("{} CLI execution for new task", self.executor_type))
.spawn_error(e)
})?;
// Write prompt to stdin safely
if let Some(mut stdin) = child.inner().stdin.take() {
use tokio::io::AsyncWriteExt;
tracing::debug!(
"Writing prompt to OpenCode stdin for task {}: {:?}",
task_id,
prompt
);
stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_task(task_id, Some(task.title.clone()))
.with_context(format!(
"Failed to write prompt to {} CLI stdin",
self.executor_type
));
ExecutorError::spawn_failed(e, context)
})?;
stdin.shutdown().await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_task(task_id, Some(task.title.clone()))
.with_context(format!("Failed to close {} CLI stdin", self.executor_type));
ExecutorError::spawn_failed(e, context)
})?;
}
Ok(child)
Ok(proc)
}
/// Execute with OpenCode filtering for stderr
@@ -329,15 +295,18 @@ Task title: {}"#,
attempt_id: Uuid,
execution_process_id: Uuid,
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError> {
let mut child = self.spawn(pool, task_id, worktree_path).await?;
) -> Result<CommandProcess, ExecutorError> {
let mut proc = self.spawn(pool, task_id, worktree_path).await?;
// Take stderr pipe for OpenCode filtering
let stderr = child
.inner()
// Get stderr stream from CommandProcess for OpenCode filtering
let mut stream = proc
.stream()
.await
.expect("Failed to get streams from command process");
let stderr = stream
.stderr
.take()
.expect("Failed to take stderr from child process");
.expect("Failed to get stderr from command stream");
// Start OpenCode stderr filtering task
let pool_clone = pool.clone();
@@ -350,7 +319,7 @@ Task title: {}"#,
worktree_path_clone,
));
Ok(child)
Ok(proc)
}
fn normalize_logs(
@@ -391,17 +360,20 @@ Task title: {}"#,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, ExecutorError> {
let mut child = self
) -> Result<CommandProcess, ExecutorError> {
let mut proc = self
.spawn_followup(pool, task_id, session_id, prompt, worktree_path)
.await?;
// Take stderr pipe for OpenCode filtering
let stderr = child
.inner()
// Get stderr stream from CommandProcess for OpenCode filtering
let mut stream = proc
.stream()
.await
.expect("Failed to get streams from command process");
let stderr = stream
.stderr
.take()
.expect("Failed to take stderr from child process");
.expect("Failed to get stderr from command stream");
// Start OpenCode stderr filtering task
let pool_clone = pool.clone();
@@ -414,7 +386,7 @@ Task title: {}"#,
worktree_path_clone,
));
Ok(child)
Ok(proc)
}
async fn spawn_followup(
@@ -424,27 +396,21 @@ Task title: {}"#,
session_id: &str,
prompt: &str,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
use std::process::Stdio;
use tokio::io::AsyncWriteExt;
) -> Result<CommandProcess, ExecutorError> {
// Use shell command for cross-platform compatibility
let (shell_cmd, shell_arg) = get_shell_command();
let opencode_command = format!("{} --session {}", self.command, session_id);
let mut command = Command::new(shell_cmd);
let mut command = CommandRunner::new();
command
.kill_on_drop(true)
.stdin(Stdio::piped())
.stdout(Stdio::null()) // Ignore stdout for OpenCode
.stderr(Stdio::piped())
.current_dir(worktree_path)
.command(shell_cmd)
.arg(shell_arg)
.arg(&opencode_command)
.stdin(prompt)
.working_dir(worktree_path)
.env("NODE_NO_WARNINGS", "1");
let mut child = command.group_spawn().map_err(|e| {
let proc = command.start().await.map_err(|e| {
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"{} CLI followup execution for session {}",
@@ -453,35 +419,7 @@ Task title: {}"#,
.spawn_error(e)
})?;
// Write prompt to stdin safely
if let Some(mut stdin) = child.inner().stdin.take() {
tracing::debug!(
"Writing prompt to {} stdin for session {}: {:?}",
self.executor_type,
session_id,
prompt
);
stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"Failed to write prompt to {} CLI stdin for session {}",
self.executor_type, session_id
));
ExecutorError::spawn_failed(e, context)
})?;
stdin.shutdown().await.map_err(|e| {
let context =
crate::executor::SpawnContext::from_command(&command, &self.executor_type)
.with_context(format!(
"Failed to close {} CLI stdin for session {}",
self.executor_type, session_id
));
ExecutorError::spawn_failed(e, context)
})?;
}
Ok(child)
Ok(proc)
}
}
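
Review note: the old code pointed OpenCode's stdout at Stdio::null(); the
CommandRunner presumably pipes it, and the streaming paths above only ever take
the stderr half. If nothing drains the stdout pipe, a chatty child can block
once the pipe buffer fills. A cheap guard (a sketch, assuming the stream's
stdout half is available to hand off) is to spawn a drain task:

use tokio::io::{self, AsyncRead};

fn drain_to_null<R>(reader: R)
where
    R: AsyncRead + Unpin + Send + 'static,
{
    tokio::spawn(async move {
        let mut reader = reader;
        let mut sink = io::sink();
        // Discard everything so the child never stalls on a full stdout pipe
        let _ = io::copy(&mut reader, &mut sink).await;
    });
}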

View File

@@ -3,6 +3,7 @@ use sentry_tracing::{EventFilter, SentryLayer};
use tracing::Level;
pub mod app_state;
pub mod command_runner;
pub mod execution_monitor;
pub mod executor;
pub mod executors;

View File

@@ -17,6 +17,7 @@ use tracing_subscriber::{filter::LevelFilter, prelude::*};
use vibe_kanban::{sentry_layer, Assets, ScriptAssets, SoundAssets};
mod app_state;
mod command_runner;
mod execution_monitor;
mod executor;
mod executors;

View File

@@ -3,6 +3,7 @@ use tracing::{debug, info};
use uuid::Uuid;
use crate::{
command_runner,
executor::Executor,
models::{
execution_process::{CreateExecutionProcess, ExecutionProcess, ExecutionProcessType},
@@ -803,7 +804,7 @@ impl ProcessService {
attempt_id: Uuid,
process_id: Uuid,
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, TaskAttemptError> {
) -> Result<command_runner::CommandProcess, TaskAttemptError> {
use crate::executors::{CleanupScriptExecutor, DevServerExecutor, SetupScriptExecutor};
let result = match executor_type {
@@ -863,7 +864,7 @@ impl ProcessService {
process_id: Uuid,
attempt_id: Uuid,
process_type: &ExecutionProcessType,
child: command_group::AsyncGroupChild,
child: command_runner::CommandProcess,
) {
let execution_type = match process_type {
ExecutionProcessType::SetupScript => crate::app_state::ExecutionType::SetupScript,
@@ -925,7 +926,7 @@ impl ProcessService {
attempt_id: Uuid,
process_id: Uuid,
worktree_path: &str,
) -> Result<command_group::AsyncGroupChild, TaskAttemptError> {
) -> Result<command_runner::CommandProcess, TaskAttemptError> {
use crate::executors::SetupScriptExecutor;
let executor = SetupScriptExecutor {