Capture logs
This commit is contained in:
@@ -99,12 +99,12 @@ pub async fn execution_monitor(app_state: AppState) {
|
||||
}
|
||||
};
|
||||
|
||||
// Get the executor and spawn the process
|
||||
// Get the executor and start streaming execution
|
||||
let executor = task_attempt.get_executor();
|
||||
let child = match executor.spawn(&app_state.db_pool, task_attempt.task_id, &task_attempt.worktree_path).await {
|
||||
let child = match executor.execute_streaming(&app_state.db_pool, task_attempt.task_id, attempt_id, &task_attempt.worktree_path).await {
|
||||
Ok(child) => child,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to spawn command for task attempt {}: {}", attempt_id, e);
|
||||
tracing::error!("Failed to start streaming execution for task attempt {}: {}", attempt_id, e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use async_trait::async_trait;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::process::Child;
|
||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||
use ts_rs::TS;
|
||||
use uuid::Uuid;
|
||||
|
||||
@@ -31,6 +32,14 @@ impl From<sqlx::Error> for ExecutorError {
|
||||
}
|
||||
}
|
||||
|
||||
/// Result of executing a command
|
||||
#[derive(Debug)]
|
||||
pub struct ExecutionResult {
|
||||
pub stdout: String,
|
||||
pub stderr: String,
|
||||
pub exit_code: Option<i32>,
|
||||
}
|
||||
|
||||
/// Trait for defining CLI commands that can be executed for task attempts
|
||||
#[async_trait]
|
||||
pub trait Executor: Send + Sync {
|
||||
@@ -40,6 +49,71 @@ pub trait Executor: Send + Sync {
|
||||
/// Spawn the command for a given task attempt
|
||||
async fn spawn(&self, pool: &sqlx::PgPool, task_id: Uuid, worktree_path: &str) -> Result<Child, ExecutorError>;
|
||||
|
||||
/// Execute the command and stream output to database in real-time
|
||||
async fn execute_streaming(&self, pool: &sqlx::PgPool, task_id: Uuid, attempt_id: Uuid, worktree_path: &str) -> Result<Child, ExecutorError> {
|
||||
let mut child = self.spawn(pool, task_id, worktree_path).await?;
|
||||
|
||||
// Take stdout and stderr pipes for streaming
|
||||
let stdout = child.stdout.take().expect("Failed to take stdout from child process");
|
||||
let stderr = child.stderr.take().expect("Failed to take stderr from child process");
|
||||
|
||||
// Start streaming tasks
|
||||
let pool_clone1 = pool.clone();
|
||||
let pool_clone2 = pool.clone();
|
||||
|
||||
tokio::spawn(stream_output_to_db(stdout, pool_clone1, attempt_id, true));
|
||||
tokio::spawn(stream_output_to_db(stderr, pool_clone2, attempt_id, false));
|
||||
|
||||
Ok(child)
|
||||
}
|
||||
|
||||
/// Execute the command and capture output, then store in database (for backward compatibility)
|
||||
async fn execute(&self, pool: &sqlx::PgPool, task_id: Uuid, attempt_id: Uuid, worktree_path: &str) -> Result<ExecutionResult, ExecutorError> {
|
||||
use tokio::io::AsyncReadExt;
|
||||
use crate::models::task_attempt::TaskAttempt;
|
||||
|
||||
let mut child = self.spawn(pool, task_id, worktree_path).await?;
|
||||
|
||||
// Take stdout and stderr pipes
|
||||
let mut stdout = child.stdout.take().unwrap_or_else(|| {
|
||||
panic!("Failed to take stdout from child process")
|
||||
});
|
||||
let mut stderr = child.stderr.take().unwrap_or_else(|| {
|
||||
panic!("Failed to take stderr from child process")
|
||||
});
|
||||
|
||||
// Read stdout and stderr concurrently
|
||||
let mut stdout_buf = String::new();
|
||||
let mut stderr_buf = String::new();
|
||||
|
||||
let (stdout_result, stderr_result, exit_result) = tokio::join!(
|
||||
stdout.read_to_string(&mut stdout_buf),
|
||||
stderr.read_to_string(&mut stderr_buf),
|
||||
child.wait()
|
||||
);
|
||||
|
||||
// Handle potential errors
|
||||
stdout_result.map_err(ExecutorError::SpawnFailed)?;
|
||||
stderr_result.map_err(ExecutorError::SpawnFailed)?;
|
||||
let exit_status = exit_result.map_err(ExecutorError::SpawnFailed)?;
|
||||
|
||||
let result = ExecutionResult {
|
||||
stdout: stdout_buf,
|
||||
stderr: stderr_buf,
|
||||
exit_code: exit_status.code(),
|
||||
};
|
||||
|
||||
// Store output in database
|
||||
TaskAttempt::update_output(
|
||||
pool,
|
||||
attempt_id,
|
||||
Some(&result.stdout),
|
||||
Some(&result.stderr),
|
||||
).await?;
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Get a human-readable description of what this executor does
|
||||
fn description(&self) -> &'static str;
|
||||
}
|
||||
@@ -69,4 +143,63 @@ impl ExecutorConfig {
|
||||
}
|
||||
}
|
||||
|
||||
/// Stream output from a child process to the database
|
||||
async fn stream_output_to_db(
|
||||
output: impl tokio::io::AsyncRead + Unpin,
|
||||
pool: sqlx::PgPool,
|
||||
attempt_id: Uuid,
|
||||
is_stdout: bool,
|
||||
) {
|
||||
use crate::models::task_attempt::TaskAttempt;
|
||||
|
||||
let mut reader = BufReader::new(output);
|
||||
let mut line = String::new();
|
||||
let mut accumulated_output = String::new();
|
||||
let mut update_counter = 0;
|
||||
|
||||
loop {
|
||||
line.clear();
|
||||
match reader.read_line(&mut line).await {
|
||||
Ok(0) => break, // EOF
|
||||
Ok(_) => {
|
||||
accumulated_output.push_str(&line);
|
||||
update_counter += 1;
|
||||
|
||||
// Update database every 10 lines or when we have a significant amount of data
|
||||
if update_counter >= 10 || accumulated_output.len() > 1024 {
|
||||
if let Err(e) = TaskAttempt::append_output(
|
||||
&pool,
|
||||
attempt_id,
|
||||
if is_stdout { Some(&accumulated_output) } else { None },
|
||||
if !is_stdout { Some(&accumulated_output) } else { None },
|
||||
).await {
|
||||
tracing::error!("Failed to update {} for attempt {}: {}",
|
||||
if is_stdout { "stdout" } else { "stderr" }, attempt_id, e);
|
||||
}
|
||||
accumulated_output.clear();
|
||||
update_counter = 0;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Error reading {} for attempt {}: {}",
|
||||
if is_stdout { "stdout" } else { "stderr" }, attempt_id, e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Flush any remaining output
|
||||
if !accumulated_output.is_empty() {
|
||||
if let Err(e) = TaskAttempt::append_output(
|
||||
&pool,
|
||||
attempt_id,
|
||||
if is_stdout { Some(&accumulated_output) } else { None },
|
||||
if !is_stdout { Some(&accumulated_output) } else { None },
|
||||
).await {
|
||||
tracing::error!("Failed to flush {} for attempt {}: {}",
|
||||
if is_stdout { "stdout" } else { "stderr" }, attempt_id, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -31,9 +31,24 @@ impl Executor for EchoExecutor {
|
||||
task.description.as_deref().unwrap_or("No description")
|
||||
);
|
||||
|
||||
let child = Command::new("echo")
|
||||
// For demonstration of streaming, we can use a shell command that outputs multiple lines
|
||||
let script = format!(
|
||||
r#"echo "Starting task: {}"
|
||||
for i in {{1..5}}; do
|
||||
echo "Progress line $i"
|
||||
sleep 1
|
||||
done
|
||||
echo "Task completed: {}""#,
|
||||
task.title,
|
||||
task.title
|
||||
);
|
||||
|
||||
let child = Command::new("sh")
|
||||
.kill_on_drop(true)
|
||||
.arg(&message)
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::piped())
|
||||
.arg("-c")
|
||||
.arg(&script)
|
||||
.spawn()
|
||||
.map_err(ExecutorError::SpawnFailed)?;
|
||||
|
||||
@@ -41,6 +56,6 @@ impl Executor for EchoExecutor {
|
||||
}
|
||||
|
||||
fn description(&self) -> &'static str {
|
||||
"Echoes the task title and description"
|
||||
"Demonstrates streaming output with a multi-line shell script"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,4 +189,52 @@ impl TaskAttempt {
|
||||
// Default to echo executor
|
||||
ExecutorConfig::Echo.create_executor()
|
||||
}
|
||||
|
||||
/// Update stdout and stderr for this task attempt
|
||||
pub async fn update_output(
|
||||
pool: &PgPool,
|
||||
id: Uuid,
|
||||
stdout: Option<&str>,
|
||||
stderr: Option<&str>,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
sqlx::query!(
|
||||
"UPDATE task_attempts SET stdout = $1, stderr = $2, updated_at = NOW() WHERE id = $3",
|
||||
stdout,
|
||||
stderr,
|
||||
id
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Append to stdout and stderr for this task attempt (for streaming updates)
|
||||
pub async fn append_output(
|
||||
pool: &PgPool,
|
||||
id: Uuid,
|
||||
stdout_append: Option<&str>,
|
||||
stderr_append: Option<&str>,
|
||||
) -> Result<(), sqlx::Error> {
|
||||
if let Some(stdout_data) = stdout_append {
|
||||
sqlx::query!(
|
||||
"UPDATE task_attempts SET stdout = COALESCE(stdout, '') || $1, updated_at = NOW() WHERE id = $2",
|
||||
stdout_data,
|
||||
id
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
}
|
||||
|
||||
if let Some(stderr_data) = stderr_append {
|
||||
sqlx::query!(
|
||||
"UPDATE task_attempts SET stderr = COALESCE(stderr, '') || $1, updated_at = NOW() WHERE id = $2",
|
||||
stderr_data,
|
||||
id
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user