Stderr Display & Gemini output display in conversation history (#78)
- Implement normalized output for Gemini
- Display stderr messages in conversation history
- Do not leak message history to stderr
@@ -44,6 +44,7 @@ pub enum NormalizedEntryType {
         action_type: ActionType,
     },
     SystemMessage,
+    ErrorMessage,
     Thinking,
 }
@@ -400,6 +401,20 @@ pub async fn stream_output_to_db(
     attempt_id: Uuid,
     execution_process_id: Uuid,
     is_stdout: bool,
 ) {
+    if is_stdout {
+        stream_stdout_to_db(output, pool, attempt_id, execution_process_id).await;
+    } else {
+        stream_stderr_to_db(output, pool, attempt_id, execution_process_id).await;
+    }
+}
+
+/// Stream stdout from a child process to the database (immediate updates)
+async fn stream_stdout_to_db(
+    output: impl tokio::io::AsyncRead + Unpin,
+    pool: sqlx::SqlitePool,
+    attempt_id: Uuid,
+    execution_process_id: Uuid,
+) {
     use crate::models::{execution_process::ExecutionProcess, executor_session::ExecutorSession};
 
@@ -414,8 +429,8 @@ pub async fn stream_output_to_db(
         match reader.read_line(&mut line).await {
             Ok(0) => break, // EOF
             Ok(_) => {
-                // Parse session ID from the first JSONL line (stdout only)
-                if is_stdout && !session_id_parsed {
+                // Parse session ID from the first JSONL line
+                if !session_id_parsed {
                     if let Some(external_session_id) = parse_session_id_from_line(&line) {
                         if let Err(e) = ExecutorSession::update_session_id(
                             &pool,
@@ -448,22 +463,13 @@ pub async fn stream_output_to_db(
                 if let Err(e) = ExecutionProcess::append_output(
                     &pool,
                     execution_process_id,
-                    if is_stdout {
-                        Some(&accumulated_output)
-                    } else {
-                        None
-                    },
-                    if !is_stdout {
-                        Some(&accumulated_output)
-                    } else {
-                        None
-                    },
+                    Some(&accumulated_output),
+                    None,
                 )
                 .await
                 {
                     tracing::error!(
-                        "Failed to update {} for attempt {}: {}",
-                        if is_stdout { "stdout" } else { "stderr" },
+                        "Failed to update stdout for attempt {}: {}",
                         attempt_id,
                         e
                     );
@@ -473,12 +479,7 @@ pub async fn stream_output_to_db(
                 }
             }
             Err(e) => {
-                tracing::error!(
-                    "Error reading {} for attempt {}: {}",
-                    if is_stdout { "stdout" } else { "stderr" },
-                    attempt_id,
-                    e
-                );
+                tracing::error!("Error reading stdout for attempt {}: {}", attempt_id, e);
                 break;
             }
         }
@@ -489,29 +490,110 @@ pub async fn stream_output_to_db(
         if let Err(e) = ExecutionProcess::append_output(
             &pool,
             execution_process_id,
-            if is_stdout {
-                Some(&accumulated_output)
-            } else {
-                None
-            },
-            if !is_stdout {
-                Some(&accumulated_output)
-            } else {
-                None
-            },
+            Some(&accumulated_output),
+            None,
         )
         .await
         {
-            tracing::error!(
-                "Failed to flush {} for attempt {}: {}",
-                if is_stdout { "stdout" } else { "stderr" },
-                attempt_id,
-                e
-            );
+            tracing::error!("Failed to flush stdout for attempt {}: {}", attempt_id, e);
         }
     }
 }
+
+/// Stream stderr from a child process to the database (buffered with timeout)
+async fn stream_stderr_to_db(
+    output: impl tokio::io::AsyncRead + Unpin,
+    pool: sqlx::SqlitePool,
+    attempt_id: Uuid,
+    execution_process_id: Uuid,
+) {
+    use tokio::time::{timeout, Duration};
+
+    let mut reader = BufReader::new(output);
+    let mut line = String::new();
+    let mut accumulated_output = String::new();
+    const STDERR_FLUSH_TIMEOUT: Duration = Duration::from_millis(1000); // 1000ms timeout
+
+    loop {
+        line.clear();
+
+        // Try to read a line with a timeout
+        let read_result = timeout(STDERR_FLUSH_TIMEOUT, reader.read_line(&mut line)).await;
+
+        match read_result {
+            Ok(Ok(0)) => {
+                // EOF - flush remaining output and break
+                break;
+            }
+            Ok(Ok(_)) => {
+                // Successfully read a line - just accumulate it
+                accumulated_output.push_str(&line);
+            }
+            Ok(Err(e)) => {
+                tracing::error!("Error reading stderr for attempt {}: {}", attempt_id, e);
+                break;
+            }
+            Err(_) => {
+                // Timeout occurred - flush accumulated output if any
+                if !accumulated_output.is_empty() {
+                    flush_stderr_chunk(
+                        &pool,
+                        execution_process_id,
+                        &accumulated_output,
+                        attempt_id,
+                    )
+                    .await;
+                    accumulated_output.clear();
+                }
+            }
+        }
+    }
+
+    // Final flush for any remaining output
+    if !accumulated_output.is_empty() {
+        flush_stderr_chunk(&pool, execution_process_id, &accumulated_output, attempt_id).await;
+    }
+}
+
+/// Flush a chunk of stderr output to the database
+async fn flush_stderr_chunk(
+    pool: &sqlx::SqlitePool,
+    execution_process_id: Uuid,
+    content: &str,
+    attempt_id: Uuid,
+) {
+    use crate::models::execution_process::ExecutionProcess;
+
+    let trimmed = content.trim();
+    if trimmed.is_empty() {
+        return;
+    }
+
+    // Add a delimiter to separate chunks in the database
+    let chunk_with_delimiter = format!("{}\n---STDERR_CHUNK_BOUNDARY---\n", trimmed);
+
+    if let Err(e) = ExecutionProcess::append_output(
+        pool,
+        execution_process_id,
+        None,
+        Some(&chunk_with_delimiter),
+    )
+    .await
+    {
+        tracing::error!(
+            "Failed to flush stderr chunk for attempt {}: {}",
+            attempt_id,
+            e
+        );
+    } else {
+        tracing::debug!(
+            "Flushed stderr chunk ({} chars) for process {}",
+            trimmed.len(),
+            execution_process_id
+        );
+    }
+}
 
 /// Parse assistant message from executor logs (JSONL format)
 pub fn parse_assistant_message_from_logs(logs: &str) -> Option<String> {
     use serde_json::Value;
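The `---STDERR_CHUNK_BOUNDARY---` marker is load-bearing: the normalized-logs route later splits the stored stderr column on it to recover discrete error messages. Below is a minimal, self-contained sketch of that framing; `append` and `split_chunks` are hypothetical helpers standing in for the database column, not functions from this codebase.

```rust
const BOUNDARY: &str = "---STDERR_CHUNK_BOUNDARY---";

// Write side, mirroring flush_stderr_chunk: trim the chunk, skip it if empty,
// and store it followed by the delimiter.
fn append(db_column: &mut String, chunk: &str) {
    let trimmed = chunk.trim();
    if trimmed.is_empty() {
        return;
    }
    db_column.push_str(&format!("{}\n{}\n", trimmed, BOUNDARY));
}

// Read side, mirroring the normalized-logs route: split on the marker and
// drop empty fragments.
fn split_chunks(db_column: &str) -> Vec<String> {
    db_column
        .split(BOUNDARY)
        .map(|c| c.trim().to_string())
        .filter(|c| !c.is_empty())
        .collect()
}

fn main() {
    let mut stored = String::new();
    append(&mut stored, "warning: something odd\n");
    append(&mut stored, "error: it broke\ndetails follow\n");
    let chunks = split_chunks(&stored);
    assert_eq!(chunks.len(), 2);
    assert_eq!(chunks[1], "error: it broke\ndetails follow");
}
```

The trim-before-frame step matches the early return in `flush_stderr_chunk`, which keeps whitespace-only stderr writes from ever becoming empty conversation entries.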
@@ -1,4 +1,4 @@
-use std::{collections::VecDeque, process::Stdio};
+use std::{collections::VecDeque, process::Stdio, time::Instant};
 
 use async_trait::async_trait;
 use command_group::{AsyncCommandGroup, AsyncGroupChild};
@@ -6,7 +6,9 @@ use tokio::{io::AsyncWriteExt, process::Command};
 use uuid::Uuid;
 
 use crate::{
-    executor::{Executor, ExecutorError},
+    executor::{
+        Executor, ExecutorError, NormalizedConversation, NormalizedEntry, NormalizedEntryType,
+    },
     models::{execution_process::ExecutionProcess, task::Task},
     utils::shell::get_shell_command,
 };
@@ -72,6 +74,11 @@ impl Executor for GeminiExecutor {
 
         // Write prompt to stdin
         if let Some(mut stdin) = child.inner().stdin.take() {
+            tracing::debug!(
+                "Writing prompt to Gemini stdin for task {}: {:?}",
+                task_id,
+                prompt
+            );
             stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
                 let context = crate::executor::SpawnContext::from_command(&command, "Gemini")
                     .with_task(task_id, Some(task.title.clone()))
@@ -84,6 +91,10 @@ impl Executor for GeminiExecutor {
                 .with_context("Failed to close Gemini CLI stdin");
                 ExecutorError::spawn_failed(e, context)
             })?;
+            tracing::info!(
+                "Successfully sent prompt to Gemini stdin for task {}",
+                task_id
+            );
         }
 
         Ok(child)
@@ -97,8 +108,20 @@ impl Executor for GeminiExecutor {
         execution_process_id: Uuid,
         worktree_path: &str,
     ) -> Result<AsyncGroupChild, ExecutorError> {
+        tracing::info!(
+            "Starting Gemini execution for task {} attempt {}",
+            task_id,
+            attempt_id
+        );
+
         let mut child = self.spawn(pool, task_id, worktree_path).await?;
 
+        tracing::info!(
+            "Gemini process spawned successfully for attempt {}, PID: {:?}",
+            attempt_id,
+            child.inner().id()
+        );
+
         // Take stdout and stderr pipes for streaming
         let stdout = child
             .inner()
@@ -122,7 +145,8 @@ impl Executor for GeminiExecutor {
             execution_process_id,
             true,
         ));
-        tokio::spawn(Self::stream_gemini_with_lines(
+        // Use default stderr streaming (no custom parsing)
+        tokio::spawn(crate::executor::stream_output_to_db(
             stderr,
             pool_clone2,
             attempt_id,
@@ -132,6 +156,75 @@ impl Executor for GeminiExecutor {
 
         Ok(child)
     }
+
+    fn normalize_logs(&self, logs: &str) -> Result<NormalizedConversation, String> {
+        let mut entries: Vec<NormalizedEntry> = Vec::new();
+        let mut parse_errors = Vec::new();
+
+        for (line_num, line) in logs.lines().enumerate() {
+            let trimmed = line.trim();
+            if trimmed.is_empty() {
+                continue;
+            }
+
+            match serde_json::from_str::<NormalizedEntry>(trimmed) {
+                Ok(entry) => {
+                    entries.push(entry);
+                }
+                Err(e) => {
+                    tracing::warn!(
+                        "Failed to parse JSONL line {} in Gemini logs: {} - Line: {}",
+                        line_num + 1,
+                        e,
+                        trimmed
+                    );
+                    parse_errors.push(format!("Line {}: {}", line_num + 1, e));
+
+                    // If this is clearly a JSONL line (starts with {), create a fallback entry
+                    if trimmed.starts_with('{') {
+                        let fallback_entry = NormalizedEntry {
+                            timestamp: Some(chrono::Utc::now().to_rfc3339()),
+                            entry_type: NormalizedEntryType::SystemMessage,
+                            content: format!("Parse error for JSONL line: {}", trimmed),
+                            metadata: None,
+                        };
+                        entries.push(fallback_entry);
+                    } else {
+                        // For non-JSON lines, treat as plain text content
+                        let text_entry = NormalizedEntry {
+                            timestamp: Some(chrono::Utc::now().to_rfc3339()),
+                            entry_type: NormalizedEntryType::AssistantMessage,
+                            content: trimmed.to_string(),
+                            metadata: None,
+                        };
+                        entries.push(text_entry);
+                    }
+                }
+            }
+        }
+
+        if !parse_errors.is_empty() {
+            tracing::warn!(
+                "Gemini normalize_logs encountered {} parse errors: {}",
+                parse_errors.len(),
+                parse_errors.join("; ")
+            );
+        }
+
+        tracing::debug!(
+            "Gemini normalize_logs processed {} lines, created {} entries",
+            logs.lines().count(),
+            entries.len()
+        );
+
+        Ok(NormalizedConversation {
+            entries,
+            session_id: None, // session_id is not available in the current Gemini implementation
+            executor_type: "gemini".to_string(),
+            prompt: None,
+            summary: None,
+        })
+    }
 }
 
 impl GeminiExecutor {
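For context on why `normalize_logs` can simply deserialize its own stdout: `emit_normalized_message` (further down in this diff) writes one serialized `NormalizedEntry` per line, so reading the logs back is a plain JSONL round trip. A pared-down sketch of that round trip, assuming simplified stand-ins for the real entry types (requires the `serde`, `serde_json`, and `chrono` crates):

```rust
use serde::{Deserialize, Serialize};

// Simplified mirrors of NormalizedEntry/NormalizedEntryType, enough to show
// the internally tagged representation ({"type": "assistant_message"}).
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
enum EntryType {
    AssistantMessage,
    ErrorMessage,
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Entry {
    timestamp: Option<String>,
    entry_type: EntryType,
    content: String,
}

fn main() {
    let entry = Entry {
        timestamp: Some(chrono::Utc::now().to_rfc3339()),
        entry_type: EntryType::AssistantMessage,
        content: "I updated the README.".to_string(),
    };
    // Write side: one JSON object per line of stdout.
    let line = serde_json::to_string(&entry).unwrap();
    // Read side: parse each non-empty line back into an entry.
    let parsed: Entry = serde_json::from_str(line.trim()).unwrap();
    assert_eq!(parsed, entry);
}
```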
@@ -161,7 +254,7 @@ impl GeminiExecutor {
     }
 
         let mut reader = BufReader::new(output);
         let mut message_index = 0;
+        let mut last_emit_time = Instant::now();
         let mut full_raw_output = String::new();
         let mut segment_queue: VecDeque<String> = VecDeque::new();
         let mut incomplete_line_buffer = String::new();
@@ -176,14 +269,18 @@ impl GeminiExecutor {
             // First, drain any pending segments from the queue
             while let Some(segment_content) = segment_queue.pop_front() {
                 if !segment_content.trim().is_empty() {
-                    Self::emit_jsonl_message(
+                    tracing::debug!(
+                        "Emitting segment for attempt {}: {:?}",
+                        attempt_id,
+                        segment_content
+                    );
+                    Self::emit_normalized_message(
                         &pool,
                         execution_process_id,
                         message_index,
                         &segment_content,
+                        &mut last_emit_time,
                     )
                     .await;
                     message_index += 1;
                 }
             }
 
@@ -191,10 +288,20 @@ impl GeminiExecutor {
             match reader.read(&mut buffer).await {
                 Ok(0) => {
                     // EOF - process any remaining content
+                    tracing::info!(
+                        "Gemini stdout reached EOF for attempt {}, processing final content",
+                        attempt_id
+                    );
                     break;
                 }
                 Ok(n) => {
                     let chunk_str = String::from_utf8_lossy(&buffer[..n]);
+                    tracing::debug!(
+                        "Gemini stdout chunk received for attempt {} ({} bytes): {:?}",
+                        attempt_id,
+                        n,
+                        chunk_str
+                    );
                     full_raw_output.push_str(&chunk_str);
 
                     // Process the chunk and add segments to queue
@@ -202,10 +309,16 @@ impl GeminiExecutor {
                         &chunk_str,
                         &mut segment_queue,
                         &mut incomplete_line_buffer,
+                        &mut last_emit_time,
                     );
                 }
-                Err(_) => {
-                    // Error - break and let queue drain on next iteration
+                Err(e) => {
+                    // Error - log the error and break
+                    tracing::error!(
+                        "Error reading stdout for Gemini attempt {}: {}",
+                        attempt_id,
+                        e
+                    );
                     break;
                 }
             }
@@ -213,7 +326,8 @@ impl GeminiExecutor {
 
         // Process any remaining incomplete line at EOF
         if !incomplete_line_buffer.is_empty() {
-            let segments = Self::split_by_pattern_breaks(&incomplete_line_buffer);
+            let segments =
+                Self::split_by_pattern_breaks(&incomplete_line_buffer, &mut last_emit_time);
             for segment in segments.iter() {
                 if !segment.trim().is_empty() {
                     segment_queue.push_back(segment.to_string());
@@ -222,31 +336,36 @@ impl GeminiExecutor {
         }
 
         // Final drain of any remaining segments
+        tracing::info!(
+            "Final drain - {} segments remaining for attempt {}",
+            segment_queue.len(),
+            attempt_id
+        );
         while let Some(segment_content) = segment_queue.pop_front() {
             if !segment_content.trim().is_empty() {
-                Self::emit_jsonl_message(
+                tracing::debug!(
+                    "Final drain segment for attempt {}: {:?}",
+                    attempt_id,
+                    segment_content
+                );
+                Self::emit_normalized_message(
                     &pool,
                     execution_process_id,
                     message_index,
                     &segment_content,
+                    &mut last_emit_time,
                 )
                 .await;
                 message_index += 1;
             }
         }
 
-        // After the loop, store the full raw output in stderr for the "raw" view
-        if !full_raw_output.is_empty() {
-            if let Err(e) =
-                ExecutionProcess::append_stderr(&pool, execution_process_id, &full_raw_output).await
-            {
-                tracing::error!(
-                    "Failed to store full raw output for attempt {}: {}",
-                    attempt_id,
-                    e
-                );
-            }
-        }
+        // Note: We don't store the full raw output in stderr anymore since we're already
+        // processing it into normalized stdout messages. Storing it in stderr would cause
+        // the normalization route to treat it as error messages.
+        tracing::info!(
+            "Gemini processing complete for attempt {} ({} bytes processed)",
+            attempt_id,
+            full_raw_output.len()
+        );
 
         tracing::info!(
             "Gemini line-based stdout streaming ended for attempt {}",
@@ -259,6 +378,7 @@ impl GeminiExecutor {
         chunk: &str,
         queue: &mut VecDeque<String>,
         incomplete_line_buffer: &mut String,
+        last_emit_time: &mut Instant,
     ) {
         // Combine any incomplete line from previous chunk with current chunk
         let text_to_process = incomplete_line_buffer.clone() + chunk;
@@ -277,7 +397,7 @@ impl GeminiExecutor {
                 // This is a complete line - process it
                 if !line.is_empty() {
                     // Check for pattern breaks within the line
-                    let segments = Self::split_by_pattern_breaks(line);
+                    let segments = Self::split_by_pattern_breaks(line, last_emit_time);
 
                     for segment in segments.iter() {
                         if !segment.trim().is_empty() {
@@ -295,7 +415,7 @@ impl GeminiExecutor {
     }
 
     /// Split text by pattern breaks (period + capital letter)
-    fn split_by_pattern_breaks(text: &str) -> Vec<String> {
+    fn split_by_pattern_breaks(text: &str, last_emit_time: &mut Instant) -> Vec<String> {
         let mut segments = Vec::new();
         let mut current_segment = String::new();
         let mut chars = text.chars().peekable();
@@ -303,10 +423,14 @@ impl GeminiExecutor {
         while let Some(ch) = chars.next() {
             current_segment.push(ch);
 
-            // Check for pattern break: period followed by capital letter
+            // Check for pattern break: period followed by capital letter or space
             if ch == '.' {
                 if let Some(&next_ch) = chars.peek() {
-                    if next_ch.is_uppercase() && next_ch.is_alphabetic() {
+                    let is_capital = next_ch.is_uppercase() && next_ch.is_alphabetic();
+                    let is_space = next_ch.is_whitespace();
+                    let should_force_break = is_space && last_emit_time.elapsed().as_secs() > 5;
+
+                    if is_capital || should_force_break {
                         // Pattern break detected - current segment ends here
                         segments.push(current_segment.clone());
                         current_segment.clear();
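The heuristic is easier to see without the timing state. A standalone sketch of the period-plus-capital rule, with the time-based force break omitted so it needs no extra state:

```rust
// Split after a '.' whenever the next character is an uppercase letter,
// as in the capital-letter arm of the real splitter above.
fn split_by_pattern_breaks(text: &str) -> Vec<String> {
    let mut segments = Vec::new();
    let mut current = String::new();
    let mut chars = text.chars().peekable();

    while let Some(ch) = chars.next() {
        current.push(ch);
        if ch == '.' {
            if let Some(&next) = chars.peek() {
                if next.is_uppercase() && next.is_alphabetic() {
                    // Pattern break: close out the current segment.
                    segments.push(std::mem::take(&mut current));
                }
            }
        }
    }
    if !current.is_empty() {
        segments.push(current);
    }
    segments
}

fn main() {
    let segments = split_by_pattern_breaks("First thought.Second thought.and a tail");
    assert_eq!(segments, vec!["First thought.", "Second thought.and a tail"]);
}
```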
@@ -328,49 +452,51 @@ impl GeminiExecutor {
         segments
     }
 
-    /// Emits a JSONL message to the database stdout stream.
-    async fn emit_jsonl_message(
+    /// Emits a normalized message to the database stdout stream.
+    async fn emit_normalized_message(
         pool: &sqlx::SqlitePool,
         execution_process_id: Uuid,
         message_index: u32,
         content: &str,
+        last_emit_time: &mut Instant,
     ) {
         if content.is_empty() {
             return;
         }
 
-        // Create AMP-like format with streaming extensions for Gemini
-        let jsonl_message = serde_json::json!({
-            "type": "messages",
-            "messages": [
-                [
-                    message_index,
-                    {
-                        "role": "assistant",
-                        "content": [
-                            {
-                                "type": "text",
-                                "text": content
-                            }
-                        ],
-                        "meta": {
-                            "sentAt": chrono::Utc::now().timestamp_millis()
-                        }
-                    }
-                ]
-            ],
-            "messageKey": message_index,
-            "isStreaming": true
-        });
+        let entry = NormalizedEntry {
+            timestamp: Some(chrono::Utc::now().to_rfc3339()),
+            entry_type: NormalizedEntryType::AssistantMessage,
+            content: content.to_string(),
+            metadata: None,
+        };
 
-        if let Ok(jsonl_line) = serde_json::to_string(&jsonl_message) {
-            let formatted_line = format!("{}\n", jsonl_line);
+        match serde_json::to_string(&entry) {
+            Ok(jsonl_line) => {
+                let formatted_line = format!("{}\n", jsonl_line);
 
-            // Store as stdout to make it available to conversation viewer
-            if let Err(e) =
-                ExecutionProcess::append_stdout(pool, execution_process_id, &formatted_line).await
-            {
-                tracing::error!("Failed to emit JSONL message: {}", e);
+                tracing::debug!(
+                    "Storing normalized message to DB for execution {}: {}",
+                    execution_process_id,
+                    jsonl_line
+                );
+
+                // Store as stdout to make it available to conversation viewer
+                if let Err(e) =
+                    ExecutionProcess::append_stdout(pool, execution_process_id, &formatted_line)
+                        .await
+                {
+                    tracing::error!("Failed to emit normalized message: {}", e);
+                } else {
+                    *last_emit_time = Instant::now();
+                    tracing::debug!("Successfully stored normalized message to DB");
+                }
+            }
+            Err(e) => {
+                tracing::error!(
+                    "Failed to serialize normalized entry for content: {:?} - Error: {}",
+                    content,
+                    e
+                );
+            }
+        }
     }
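The `last_emit_time` threading above implements an idle-based flush: a '.' followed by whitespace only forces a break when more than five seconds have passed since the last successful emit, and each successful store resets the clock. A tiny sketch of that predicate, with the elapsed time passed in explicitly instead of read from an `Instant`:

```rust
use std::time::Duration;

// `idle` stands in for `last_emit_time.elapsed()` in the real code.
fn should_force_break(next_ch: char, idle: Duration) -> bool {
    next_ch.is_whitespace() && idle > Duration::from_secs(5)
}

fn main() {
    assert!(!should_force_break(' ', Duration::from_secs(1))); // just emitted: keep buffering
    assert!(should_force_break(' ', Duration::from_secs(6))); // output stalled: flush on ". "
    assert!(!should_force_break('a', Duration::from_secs(6))); // not a ". " boundary at all
}
```

(The real code compares `as_secs() > 5`, which truncates to whole seconds, so the two checks differ only within the sixth second.)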
@@ -437,47 +563,9 @@ impl Executor for GeminiFollowupExecutor {
         Ok(child)
     }
 
-    async fn execute_streaming(
-        &self,
-        pool: &sqlx::SqlitePool,
-        task_id: Uuid,
-        attempt_id: Uuid,
-        execution_process_id: Uuid,
-        worktree_path: &str,
-    ) -> Result<AsyncGroupChild, ExecutorError> {
-        let mut child = self.spawn(pool, task_id, worktree_path).await?;
-
-        // Take stdout and stderr pipes for streaming
-        let stdout = child
-            .inner()
-            .stdout
-            .take()
-            .expect("Failed to take stdout from child process");
-        let stderr = child
-            .inner()
-            .stderr
-            .take()
-            .expect("Failed to take stderr from child process");
-
-        // Start streaming tasks with Gemini-specific line-based message updates
-        let pool_clone1 = pool.clone();
-        let pool_clone2 = pool.clone();
-
-        tokio::spawn(GeminiExecutor::stream_gemini_with_lines(
-            stdout,
-            pool_clone1,
-            attempt_id,
-            execution_process_id,
-            true,
-        ));
-        tokio::spawn(GeminiExecutor::stream_gemini_with_lines(
-            stderr,
-            pool_clone2,
-            attempt_id,
-            execution_process_id,
-            false,
-        ));
-
-        Ok(child)
+    fn normalize_logs(&self, logs: &str) -> Result<NormalizedConversation, String> {
+        // Reuse the same logic as the main GeminiExecutor
+        let main_executor = GeminiExecutor;
+        main_executor.normalize_logs(logs)
     }
 }
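Both Gemini executors share the same take-the-pipes-then-spawn shape: grab stdout and stderr from the child, then drive each on its own tokio task so parsing one stream can never block draining the other. A self-contained sketch of that pattern using plain `tokio::process` (the real code wraps the child in `command_group`'s `AsyncGroupChild` and streams into the database rather than printing); requires tokio with the `process`, `io-util`, `rt-multi-thread`, and `macros` features:

```rust
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    let mut child = Command::new("sh")
        .args(["-c", "echo out; echo err >&2"])
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    let stdout = child.stdout.take().expect("stdout pipe");
    let stderr = child.stderr.take().expect("stderr pipe");

    // One task per pipe, exactly like the executor's two tokio::spawn calls.
    let out_task = tokio::spawn(async move {
        let mut lines = BufReader::new(stdout).lines();
        while let Ok(Some(line)) = lines.next_line().await {
            println!("stdout: {line}");
        }
    });
    let err_task = tokio::spawn(async move {
        let mut lines = BufReader::new(stderr).lines();
        while let Ok(Some(line)) = lines.next_line().await {
            println!("stderr: {line}");
        }
    });

    let (_, _) = tokio::join!(out_task, err_task);
    child.wait().await?;
    Ok(())
}
```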
@@ -10,7 +10,7 @@ use uuid::Uuid;
 
 use crate::{
     app_state::AppState,
-    executor::{ExecutorConfig, NormalizedConversation},
+    executor::{ExecutorConfig, NormalizedConversation, NormalizedEntry, NormalizedEntryType},
     models::{
         config::Config,
         execution_process::{ExecutionProcess, ExecutionProcessStatus, ExecutionProcessSummary},
@@ -1078,73 +1078,6 @@ pub async fn get_execution_process_normalized_logs(
         }
     };
 
-    // Get logs from the execution process
-    let logs = match process.stdout {
-        Some(stdout) => stdout,
-        None => {
-            // If the process is still running, return empty logs instead of an error
-            if process.status == ExecutionProcessStatus::Running {
-                // Get executor session data for this execution process
-                let executor_session = match ExecutorSession::find_by_execution_process_id(
-                    &app_state.db_pool,
-                    process_id,
-                )
-                .await
-                {
-                    Ok(session) => session,
-                    Err(e) => {
-                        tracing::error!(
-                            "Failed to fetch executor session for process {}: {}",
-                            process_id,
-                            e
-                        );
-                        None
-                    }
-                };
-
-                return Ok(ResponseJson(ApiResponse {
-                    success: true,
-                    data: Some(NormalizedConversation {
-                        entries: vec![],
-                        session_id: None,
-                        executor_type: process
-                            .executor_type
-                            .clone()
-                            .unwrap_or("unknown".to_string()),
-                        prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()),
-                        summary: executor_session.as_ref().and_then(|s| s.summary.clone()),
-                    }),
-                    message: None,
-                }));
-            }
-
-            return Ok(ResponseJson(ApiResponse {
-                success: false,
-                data: None,
-                message: Some("No logs available for this execution process".to_string()),
-            }));
-        }
-    };
-
-    // Determine executor type and create appropriate executor for normalization
-    let executor_type = process.executor_type.as_deref().unwrap_or("unknown");
-    let executor_config = match executor_type {
-        "amp" => ExecutorConfig::Amp,
-        "claude" => ExecutorConfig::Claude,
-        "echo" => ExecutorConfig::Echo,
-        "gemini" => ExecutorConfig::Gemini,
-        "opencode" => ExecutorConfig::Opencode,
-        _ => {
-            return Ok(ResponseJson(ApiResponse {
-                success: false,
-                data: None,
-                message: Some(format!("Unsupported executor type: {}", executor_type)),
-            }));
-        }
-    };
-
-    let executor = executor_config.create_executor();
-
     // Get executor session data for this execution process
     let executor_session =
         match ExecutorSession::find_by_execution_process_id(&app_state.db_pool, process_id).await {
@@ -1159,30 +1092,150 @@ pub async fn get_execution_process_normalized_logs(
         }
     };
 
-    // Normalize the logs
-    match executor.normalize_logs(&logs) {
-        Ok(mut normalized) => {
-            // Add prompt and summary from executor session
-            if let Some(session) = executor_session {
-                normalized.prompt = session.prompt;
-                normalized.summary = session.summary;
-            }
+    // Handle the case where no logs are available
+    let has_stdout =
+        process.stdout.is_some() && !process.stdout.as_ref().unwrap().trim().is_empty();
+    let has_stderr =
+        process.stderr.is_some() && !process.stderr.as_ref().unwrap().trim().is_empty();
 
-            Ok(ResponseJson(ApiResponse {
-                success: true,
-                data: Some(normalized),
-                message: None,
-            }))
-        }
-        Err(e) => {
-            tracing::error!("Failed to normalize logs for process {}: {}", process_id, e);
-            Ok(ResponseJson(ApiResponse {
-                success: false,
-                data: None,
-                message: Some(format!("Failed to normalize logs: {}", e)),
-            }))
+    // If the process is still running and has no stdout/stderr, return empty logs
+    if process.status == ExecutionProcessStatus::Running && !has_stdout && !has_stderr {
+        return Ok(ResponseJson(ApiResponse {
+            success: true,
+            data: Some(NormalizedConversation {
+                entries: vec![],
+                session_id: None,
+                executor_type: process
+                    .executor_type
+                    .clone()
+                    .unwrap_or("unknown".to_string()),
+                prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()),
+                summary: executor_session.as_ref().and_then(|s| s.summary.clone()),
+            }),
+            message: None,
+        }));
+    }
+
+    // If process is completed but has no logs, return appropriate error
+    if process.status != ExecutionProcessStatus::Running && !has_stdout && !has_stderr {
+        return Ok(ResponseJson(ApiResponse {
+            success: false,
+            data: None,
+            message: Some("No logs available for this execution process".to_string()),
+        }));
+    }
+
+    // Parse stdout as JSONL using executor normalization
+    let mut stdout_entries = Vec::new();
+    if let Some(stdout) = &process.stdout {
+        if !stdout.trim().is_empty() {
+            // Determine executor type and create appropriate executor for normalization
+            let executor_type = process.executor_type.as_deref().unwrap_or("unknown");
+            let executor_config = match executor_type {
+                "amp" => ExecutorConfig::Amp,
+                "claude" => ExecutorConfig::Claude,
+                "echo" => ExecutorConfig::Echo,
+                "gemini" => ExecutorConfig::Gemini,
+                "opencode" => ExecutorConfig::Opencode,
+                _ => {
+                    tracing::warn!(
+                        "Unsupported executor type: {}, cannot normalize logs properly",
+                        executor_type
+                    );
+                    return Ok(ResponseJson(ApiResponse {
+                        success: false,
+                        data: None,
+                        message: Some(format!("Unsupported executor type: {}", executor_type)),
+                    }));
+                }
+            };
+
+            let executor = executor_config.create_executor();
+
+            // Normalize stdout logs with error handling
+            match executor.normalize_logs(stdout) {
+                Ok(normalized) => {
+                    stdout_entries = normalized.entries;
+                    tracing::debug!(
+                        "Successfully normalized {} stdout entries for process {}",
+                        stdout_entries.len(),
+                        process_id
+                    );
+                }
+                Err(e) => {
+                    tracing::error!(
+                        "Failed to normalize stdout for process {}: {}",
+                        process_id,
+                        e
+                    );
+                    return Ok(ResponseJson(ApiResponse {
+                        success: false,
+                        data: None,
+                        message: Some(format!("Failed to normalize logs: {}", e)),
+                    }));
+                }
+            }
+        }
+    }
+
+    // Parse stderr chunks separated by boundary markers
+    let mut stderr_entries = Vec::new();
+    if let Some(stderr) = &process.stderr {
+        let trimmed = stderr.trim();
+        if !trimmed.is_empty() {
+            // Split stderr by chunk boundaries and create separate error messages
+            let chunks: Vec<&str> = trimmed.split("---STDERR_CHUNK_BOUNDARY---").collect();
+
+            for chunk in chunks {
+                let chunk_trimmed = chunk.trim();
+                if !chunk_trimmed.is_empty() {
+                    stderr_entries.push(NormalizedEntry {
+                        timestamp: Some(chrono::Utc::now().to_rfc3339()),
+                        entry_type: NormalizedEntryType::ErrorMessage,
+                        content: chunk_trimmed.to_string(),
+                        metadata: None,
+                    });
+                }
+            }
+
+            tracing::debug!(
+                "Processed stderr content into {} error messages for process {}",
+                stderr_entries.len(),
+                process_id
+            );
+        }
+    }
+
+    // Merge stdout and stderr entries chronologically
+    let mut all_entries = Vec::new();
+    all_entries.extend(stdout_entries);
+    all_entries.extend(stderr_entries);
+
+    // Sort by timestamp (entries without timestamps go to the end)
+    all_entries.sort_by(|a, b| match (&a.timestamp, &b.timestamp) {
+        (Some(a_ts), Some(b_ts)) => a_ts.cmp(b_ts),
+        (Some(_), None) => std::cmp::Ordering::Less,
+        (None, Some(_)) => std::cmp::Ordering::Greater,
+        (None, None) => std::cmp::Ordering::Equal,
+    });
+
+    // Create final normalized conversation
+    let normalized_conversation = NormalizedConversation {
+        entries: all_entries,
+        session_id: None,
+        executor_type: process
+            .executor_type
+            .clone()
+            .unwrap_or("unknown".to_string()),
+        prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()),
+        summary: executor_session.as_ref().and_then(|s| s.summary.clone()),
+    };
+
+    Ok(ResponseJson(ApiResponse {
+        success: true,
+        data: Some(normalized_conversation),
+        message: None,
+    }))
 }
 
 pub fn task_attempts_router() -> Router<AppState> {
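One detail the merge step relies on: every timestamp is produced by `chrono::Utc::now().to_rfc3339()`, so the strings share the same UTC offset and digit layout, and plain lexicographic comparison orders them chronologically, with missing timestamps sinking to the end. A minimal sketch of the same comparator (timestamps hardcoded for the example):

```rust
#[derive(Debug)]
struct Entry {
    timestamp: Option<String>,
    content: &'static str,
}

fn main() {
    let mut entries = vec![
        Entry { timestamp: Some("2024-05-01T10:00:05+00:00".into()), content: "stderr chunk" },
        Entry { timestamp: None, content: "undated" },
        Entry { timestamp: Some("2024-05-01T10:00:01+00:00".into()), content: "assistant msg" },
    ];
    // Same ordering as the route: string comparison on RFC 3339 timestamps,
    // None sorts after Some.
    entries.sort_by(|a, b| match (&a.timestamp, &b.timestamp) {
        (Some(a_ts), Some(b_ts)) => a_ts.cmp(b_ts),
        (Some(_), None) => std::cmp::Ordering::Less,
        (None, Some(_)) => std::cmp::Ordering::Greater,
        (None, None) => std::cmp::Ordering::Equal,
    });
    assert_eq!(entries[0].content, "assistant msg");
    assert_eq!(entries[2].content, "undated");
}
```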
@@ -11,6 +11,9 @@ import {
   Settings,
   Brain,
   Hammer,
+  AlertCircle,
+  ChevronRight,
+  ChevronUp,
 } from 'lucide-react';
 import { makeRequest } from '@/lib/api';
 import type {
@@ -39,6 +42,9 @@ const getEntryIcon = (entryType: NormalizedEntryType) => {
   if (entryType.type === 'thinking') {
     return <Brain className="h-4 w-4 text-purple-600" />;
   }
+  if (entryType.type === 'error_message') {
+    return <AlertCircle className="h-4 w-4 text-red-600" />;
+  }
   if (entryType.type === 'tool_use') {
     const { action_type } = entryType;
     if (action_type.action === 'file_read') {
@@ -74,6 +80,10 @@ const getContentClassName = (entryType: NormalizedEntryType) => {
     return `${baseClasses} font-mono`;
   }
 
+  if (entryType.type === 'error_message') {
+    return `${baseClasses} text-red-600 font-mono bg-red-50 dark:bg-red-950/20 px-2 py-1 rounded`;
+  }
+
   return baseClasses;
 };
 
@@ -86,6 +96,19 @@ export function NormalizedConversationViewer({
     useState<NormalizedConversation | null>(null);
   const [loading, setLoading] = useState(true);
   const [error, setError] = useState<string | null>(null);
+  const [expandedErrors, setExpandedErrors] = useState<Set<number>>(new Set());
+
+  const toggleErrorExpansion = (index: number) => {
+    setExpandedErrors((prev) => {
+      const newSet = new Set(prev);
+      if (newSet.has(index)) {
+        newSet.delete(index);
+      } else {
+        newSet.add(index);
+      }
+      return newSet;
+    });
+  };
 
   const fetchNormalizedLogs = useCallback(
     async (isPolling = false) => {
@@ -203,18 +226,64 @@
 
           {/* Display conversation entries */}
           <div className="space-y-2">
-            {conversation.entries.map((entry, index) => (
-              <div key={index} className="flex items-start gap-3">
-                <div className="flex-shrink-0 mt-1">
-                  {getEntryIcon(entry.entry_type)}
-                </div>
-                <div className="flex-1 min-w-0">
-                  <div className={getContentClassName(entry.entry_type)}>
-                    {entry.content}
+            {conversation.entries.map((entry, index) => {
+              const isErrorMessage = entry.entry_type.type === 'error_message';
+              const isExpanded = expandedErrors.has(index);
+              const hasMultipleLines =
+                isErrorMessage && entry.content.includes('\n');
+
+              return (
+                <div key={index} className="flex items-start gap-3">
+                  <div className="flex-shrink-0 mt-1">
+                    {isErrorMessage && hasMultipleLines ? (
+                      <button
+                        onClick={() => toggleErrorExpansion(index)}
+                        className="transition-colors hover:opacity-70"
+                      >
+                        {getEntryIcon(entry.entry_type)}
+                      </button>
+                    ) : (
+                      getEntryIcon(entry.entry_type)
+                    )}
+                  </div>
+                  <div className="flex-1 min-w-0">
+                    {isErrorMessage && hasMultipleLines ? (
+                      <div className={isExpanded ? 'space-y-2' : ''}>
+                        <div className={getContentClassName(entry.entry_type)}>
+                          {isExpanded ? (
+                            entry.content
+                          ) : (
+                            <>
+                              {entry.content.split('\n')[0]}
+                              <button
+                                onClick={() => toggleErrorExpansion(index)}
+                                className="ml-2 inline-flex items-center gap-1 text-xs text-red-600 hover:text-red-700 dark:text-red-400 dark:hover:text-red-300 transition-colors"
+                              >
+                                <ChevronRight className="h-3 w-3" />
+                                Show more
+                              </button>
+                            </>
+                          )}
+                        </div>
+                        {isExpanded && (
+                          <button
+                            onClick={() => toggleErrorExpansion(index)}
+                            className="flex items-center gap-1 text-xs text-red-600 hover:text-red-700 dark:text-red-400 dark:hover:text-red-300 transition-colors"
+                          >
+                            <ChevronUp className="h-3 w-3" />
+                            Show less
+                          </button>
+                        )}
+                      </div>
+                    ) : (
+                      <div className={getContentClassName(entry.entry_type)}>
+                        {entry.content}
+                      </div>
+                    )}
                   </div>
                 </div>
-              </div>
-            ))}
+              );
+            })}
           </div>
         </div>
       );
@@ -578,7 +578,7 @@ export function TaskDetailsPanel({
       );
     }
 
-    // When setup failed, show error message and stderr
+    // When setup failed, show error message and conversation
     if (isSetupFailed) {
       const setupProcess = executionState.setup_process_id
         ? attemptData.runningProcessDetails[executionState.setup_process_id]
@@ -598,20 +598,17 @@ export function TaskDetailsPanel({
           </div>
 
           {setupProcess && (
-            <div className="font-mono text-sm whitespace-pre-wrap text-muted-foreground">
-              {(() => {
-                const stderr = setupProcess.stderr || '';
-                const stdout = setupProcess.stdout || '';
-                const combined = [stderr, stdout].filter(Boolean).join('\n');
-                return combined || 'No error output available';
-              })()}
-            </div>
+            <NormalizedConversationViewer
+              executionProcess={setupProcess}
+              projectId={projectId}
+              onConversationUpdate={handleConversationUpdate}
+            />
           )}
         </div>
       );
     }
 
-    // When coding agent failed, show error message and stderr
+    // When coding agent failed, show error message and conversation
     if (isCodingAgentFailed) {
       const codingAgentProcess = executionState.coding_agent_process_id
         ? attemptData.runningProcessDetails[
@@ -633,14 +630,11 @@ export function TaskDetailsPanel({
           </div>
 
          {codingAgentProcess && (
-            <div className="font-mono text-sm whitespace-pre-wrap text-muted-foreground">
-              {(() => {
-                const stderr = codingAgentProcess.stderr || '';
-                const stdout = codingAgentProcess.stdout || '';
-                const combined = [stderr, stdout].filter(Boolean).join('\n');
-                return combined || 'No error output available';
-              })()}
-            </div>
+            <NormalizedConversationViewer
+              executionProcess={codingAgentProcess}
+              projectId={projectId}
+              onConversationUpdate={handleConversationUpdate}
+            />
           )}
         </div>
       );
package-lock.json (generated)
@@ -1,12 +1,12 @@
 {
   "name": "vibe-kanban",
-  "version": "0.0.32",
+  "version": "0.0.37-ersion.3",
   "lockfileVersion": 3,
   "requires": true,
   "packages": {
     "": {
       "name": "vibe-kanban",
-      "version": "0.0.32",
+      "version": "0.0.37-ersion.3",
       "devDependencies": {
         "concurrently": "^8.2.2",
         "vite": "^6.3.5"
@@ -108,7 +108,7 @@ export type NormalizedConversation = { entries: Array<NormalizedEntry>, session_
 
 export type NormalizedEntry = { timestamp: string | null, entry_type: NormalizedEntryType, content: string, };
 
-export type NormalizedEntryType = { "type": "user_message" } | { "type": "assistant_message" } | { "type": "tool_use", tool_name: string, action_type: ActionType, } | { "type": "system_message" } | { "type": "thinking" };
+export type NormalizedEntryType = { "type": "user_message" } | { "type": "assistant_message" } | { "type": "tool_use", tool_name: string, action_type: ActionType, } | { "type": "system_message" } | { "type": "error_message" } | { "type": "thinking" };
 
 export type ActionType = { "action": "file_read", path: string, } | { "action": "file_write", path: string, } | { "action": "command_run", command: string, } | { "action": "search", query: string, } | { "action": "web_fetch", url: string, } | { "action": "task_create", description: string, } | { "action": "other", description: string, };