diff --git a/backend/src/executor.rs b/backend/src/executor.rs index 37592c2f..22cc2e29 100644 --- a/backend/src/executor.rs +++ b/backend/src/executor.rs @@ -44,6 +44,7 @@ pub enum NormalizedEntryType { action_type: ActionType, }, SystemMessage, + ErrorMessage, Thinking, } @@ -400,6 +401,20 @@ pub async fn stream_output_to_db( attempt_id: Uuid, execution_process_id: Uuid, is_stdout: bool, +) { + if is_stdout { + stream_stdout_to_db(output, pool, attempt_id, execution_process_id).await; + } else { + stream_stderr_to_db(output, pool, attempt_id, execution_process_id).await; + } +} + +/// Stream stdout from a child process to the database (immediate updates) +async fn stream_stdout_to_db( + output: impl tokio::io::AsyncRead + Unpin, + pool: sqlx::SqlitePool, + attempt_id: Uuid, + execution_process_id: Uuid, ) { use crate::models::{execution_process::ExecutionProcess, executor_session::ExecutorSession}; @@ -414,8 +429,8 @@ pub async fn stream_output_to_db( match reader.read_line(&mut line).await { Ok(0) => break, // EOF Ok(_) => { - // Parse session ID from the first JSONL line (stdout only) - if is_stdout && !session_id_parsed { + // Parse session ID from the first JSONL line + if !session_id_parsed { if let Some(external_session_id) = parse_session_id_from_line(&line) { if let Err(e) = ExecutorSession::update_session_id( &pool, @@ -448,22 +463,13 @@ pub async fn stream_output_to_db( if let Err(e) = ExecutionProcess::append_output( &pool, execution_process_id, - if is_stdout { - Some(&accumulated_output) - } else { - None - }, - if !is_stdout { - Some(&accumulated_output) - } else { - None - }, + Some(&accumulated_output), + None, ) .await { tracing::error!( - "Failed to update {} for attempt {}: {}", - if is_stdout { "stdout" } else { "stderr" }, + "Failed to update stdout for attempt {}: {}", attempt_id, e ); @@ -473,12 +479,7 @@ pub async fn stream_output_to_db( } } Err(e) => { - tracing::error!( - "Error reading {} for attempt {}: {}", - if is_stdout { "stdout" } else { "stderr" }, - attempt_id, - e - ); + tracing::error!("Error reading stdout for attempt {}: {}", attempt_id, e); break; } } @@ -489,29 +490,110 @@ pub async fn stream_output_to_db( if let Err(e) = ExecutionProcess::append_output( &pool, execution_process_id, - if is_stdout { - Some(&accumulated_output) - } else { - None - }, - if !is_stdout { - Some(&accumulated_output) - } else { - None - }, + Some(&accumulated_output), + None, ) .await { - tracing::error!( - "Failed to flush {} for attempt {}: {}", - if is_stdout { "stdout" } else { "stderr" }, - attempt_id, - e - ); + tracing::error!("Failed to flush stdout for attempt {}: {}", attempt_id, e); } } } +/// Stream stderr from a child process to the database (buffered with timeout) +async fn stream_stderr_to_db( + output: impl tokio::io::AsyncRead + Unpin, + pool: sqlx::SqlitePool, + attempt_id: Uuid, + execution_process_id: Uuid, +) { + use tokio::time::{timeout, Duration}; + + let mut reader = BufReader::new(output); + let mut line = String::new(); + let mut accumulated_output = String::new(); + const STDERR_FLUSH_TIMEOUT: Duration = Duration::from_millis(1000); // 1000ms timeout + + loop { + line.clear(); + + // Try to read a line with a timeout + let read_result = timeout(STDERR_FLUSH_TIMEOUT, reader.read_line(&mut line)).await; + + match read_result { + Ok(Ok(0)) => { + // EOF - flush remaining output and break + break; + } + Ok(Ok(_)) => { + // Successfully read a line - just accumulate it + accumulated_output.push_str(&line); + } + Ok(Err(e)) => { + tracing::error!("Error reading stderr for attempt {}: {}", attempt_id, e); + break; + } + Err(_) => { + // Timeout occurred - flush accumulated output if any + if !accumulated_output.is_empty() { + flush_stderr_chunk( + &pool, + execution_process_id, + &accumulated_output, + attempt_id, + ) + .await; + accumulated_output.clear(); + } + } + } + } + + // Final flush for any remaining output + if !accumulated_output.is_empty() { + flush_stderr_chunk(&pool, execution_process_id, &accumulated_output, attempt_id).await; + } +} + +/// Flush a chunk of stderr output to the database +async fn flush_stderr_chunk( + pool: &sqlx::SqlitePool, + execution_process_id: Uuid, + content: &str, + attempt_id: Uuid, +) { + use crate::models::execution_process::ExecutionProcess; + + let trimmed = content.trim(); + if trimmed.is_empty() { + return; + } + + // Add a delimiter to separate chunks in the database + let chunk_with_delimiter = format!("{}\n---STDERR_CHUNK_BOUNDARY---\n", trimmed); + + if let Err(e) = ExecutionProcess::append_output( + pool, + execution_process_id, + None, + Some(&chunk_with_delimiter), + ) + .await + { + tracing::error!( + "Failed to flush stderr chunk for attempt {}: {}", + attempt_id, + e + ); + } else { + tracing::debug!( + "Flushed stderr chunk ({} chars) for process {}", + trimmed.len(), + execution_process_id + ); + } +} + /// Parse assistant message from executor logs (JSONL format) pub fn parse_assistant_message_from_logs(logs: &str) -> Option { use serde_json::Value; diff --git a/backend/src/executors/gemini.rs b/backend/src/executors/gemini.rs index 2e35824c..9aed2d3a 100644 --- a/backend/src/executors/gemini.rs +++ b/backend/src/executors/gemini.rs @@ -1,4 +1,4 @@ -use std::{collections::VecDeque, process::Stdio}; +use std::{collections::VecDeque, process::Stdio, time::Instant}; use async_trait::async_trait; use command_group::{AsyncCommandGroup, AsyncGroupChild}; @@ -6,7 +6,9 @@ use tokio::{io::AsyncWriteExt, process::Command}; use uuid::Uuid; use crate::{ - executor::{Executor, ExecutorError}, + executor::{ + Executor, ExecutorError, NormalizedConversation, NormalizedEntry, NormalizedEntryType, + }, models::{execution_process::ExecutionProcess, task::Task}, utils::shell::get_shell_command, }; @@ -72,6 +74,11 @@ impl Executor for GeminiExecutor { // Write prompt to stdin if let Some(mut stdin) = child.inner().stdin.take() { + tracing::debug!( + "Writing prompt to Gemini stdin for task {}: {:?}", + task_id, + prompt + ); stdin.write_all(prompt.as_bytes()).await.map_err(|e| { let context = crate::executor::SpawnContext::from_command(&command, "Gemini") .with_task(task_id, Some(task.title.clone())) @@ -84,6 +91,10 @@ impl Executor for GeminiExecutor { .with_context("Failed to close Gemini CLI stdin"); ExecutorError::spawn_failed(e, context) })?; + tracing::info!( + "Successfully sent prompt to Gemini stdin for task {}", + task_id + ); } Ok(child) @@ -97,8 +108,20 @@ impl Executor for GeminiExecutor { execution_process_id: Uuid, worktree_path: &str, ) -> Result { + tracing::info!( + "Starting Gemini execution for task {} attempt {}", + task_id, + attempt_id + ); + let mut child = self.spawn(pool, task_id, worktree_path).await?; + tracing::info!( + "Gemini process spawned successfully for attempt {}, PID: {:?}", + attempt_id, + child.inner().id() + ); + // Take stdout and stderr pipes for streaming let stdout = child .inner() @@ -122,7 +145,8 @@ impl Executor for GeminiExecutor { execution_process_id, true, )); - tokio::spawn(Self::stream_gemini_with_lines( + // Use default stderr streaming (no custom parsing) + tokio::spawn(crate::executor::stream_output_to_db( stderr, pool_clone2, attempt_id, @@ -132,6 +156,75 @@ impl Executor for GeminiExecutor { Ok(child) } + + fn normalize_logs(&self, logs: &str) -> Result { + let mut entries: Vec = Vec::new(); + let mut parse_errors = Vec::new(); + + for (line_num, line) in logs.lines().enumerate() { + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + match serde_json::from_str::(trimmed) { + Ok(entry) => { + entries.push(entry); + } + Err(e) => { + tracing::warn!( + "Failed to parse JSONL line {} in Gemini logs: {} - Line: {}", + line_num + 1, + e, + trimmed + ); + parse_errors.push(format!("Line {}: {}", line_num + 1, e)); + + // If this is clearly a JSONL line (starts with {), create a fallback entry + if trimmed.starts_with('{') { + let fallback_entry = NormalizedEntry { + timestamp: Some(chrono::Utc::now().to_rfc3339()), + entry_type: NormalizedEntryType::SystemMessage, + content: format!("Parse error for JSONL line: {}", trimmed), + metadata: None, + }; + entries.push(fallback_entry); + } else { + // For non-JSON lines, treat as plain text content + let text_entry = NormalizedEntry { + timestamp: Some(chrono::Utc::now().to_rfc3339()), + entry_type: NormalizedEntryType::AssistantMessage, + content: trimmed.to_string(), + metadata: None, + }; + entries.push(text_entry); + } + } + } + } + + if !parse_errors.is_empty() { + tracing::warn!( + "Gemini normalize_logs encountered {} parse errors: {}", + parse_errors.len(), + parse_errors.join("; ") + ); + } + + tracing::debug!( + "Gemini normalize_logs processed {} lines, created {} entries", + logs.lines().count(), + entries.len() + ); + + Ok(NormalizedConversation { + entries, + session_id: None, // session_id is not available in the current Gemini implementation + executor_type: "gemini".to_string(), + prompt: None, + summary: None, + }) + } } impl GeminiExecutor { @@ -161,7 +254,7 @@ impl GeminiExecutor { } let mut reader = BufReader::new(output); - let mut message_index = 0; + let mut last_emit_time = Instant::now(); let mut full_raw_output = String::new(); let mut segment_queue: VecDeque = VecDeque::new(); let mut incomplete_line_buffer = String::new(); @@ -176,14 +269,18 @@ impl GeminiExecutor { // First, drain any pending segments from the queue while let Some(segment_content) = segment_queue.pop_front() { if !segment_content.trim().is_empty() { - Self::emit_jsonl_message( + tracing::debug!( + "Emitting segment for attempt {}: {:?}", + attempt_id, + segment_content + ); + Self::emit_normalized_message( &pool, execution_process_id, - message_index, &segment_content, + &mut last_emit_time, ) .await; - message_index += 1; } } @@ -191,10 +288,20 @@ impl GeminiExecutor { match reader.read(&mut buffer).await { Ok(0) => { // EOF - process any remaining content + tracing::info!( + "Gemini stdout reached EOF for attempt {}, processing final content", + attempt_id + ); break; } Ok(n) => { let chunk_str = String::from_utf8_lossy(&buffer[..n]); + tracing::debug!( + "Gemini stdout chunk received for attempt {} ({} bytes): {:?}", + attempt_id, + n, + chunk_str + ); full_raw_output.push_str(&chunk_str); // Process the chunk and add segments to queue @@ -202,10 +309,16 @@ impl GeminiExecutor { &chunk_str, &mut segment_queue, &mut incomplete_line_buffer, + &mut last_emit_time, ); } - Err(_) => { - // Error - break and let queue drain on next iteration + Err(e) => { + // Error - log the error and break + tracing::error!( + "Error reading stdout for Gemini attempt {}: {}", + attempt_id, + e + ); break; } } @@ -213,7 +326,8 @@ impl GeminiExecutor { // Process any remaining incomplete line at EOF if !incomplete_line_buffer.is_empty() { - let segments = Self::split_by_pattern_breaks(&incomplete_line_buffer); + let segments = + Self::split_by_pattern_breaks(&incomplete_line_buffer, &mut last_emit_time); for segment in segments.iter() { if !segment.trim().is_empty() { segment_queue.push_back(segment.to_string()); @@ -222,31 +336,36 @@ impl GeminiExecutor { } // Final drain of any remaining segments + tracing::info!( + "Final drain - {} segments remaining for attempt {}", + segment_queue.len(), + attempt_id + ); while let Some(segment_content) = segment_queue.pop_front() { if !segment_content.trim().is_empty() { - Self::emit_jsonl_message( + tracing::debug!( + "Final drain segment for attempt {}: {:?}", + attempt_id, + segment_content + ); + Self::emit_normalized_message( &pool, execution_process_id, - message_index, &segment_content, + &mut last_emit_time, ) .await; - message_index += 1; } } - // After the loop, store the full raw output in stderr for the "raw" view - if !full_raw_output.is_empty() { - if let Err(e) = - ExecutionProcess::append_stderr(&pool, execution_process_id, &full_raw_output).await - { - tracing::error!( - "Failed to store full raw output for attempt {}: {}", - attempt_id, - e - ); - } - } + // Note: We don't store the full raw output in stderr anymore since we're already + // processing it into normalized stdout messages. Storing it in stderr would cause + // the normalization route to treat it as error messages. + tracing::info!( + "Gemini processing complete for attempt {} ({} bytes processed)", + attempt_id, + full_raw_output.len() + ); tracing::info!( "Gemini line-based stdout streaming ended for attempt {}", @@ -259,6 +378,7 @@ impl GeminiExecutor { chunk: &str, queue: &mut VecDeque, incomplete_line_buffer: &mut String, + last_emit_time: &mut Instant, ) { // Combine any incomplete line from previous chunk with current chunk let text_to_process = incomplete_line_buffer.clone() + chunk; @@ -277,7 +397,7 @@ impl GeminiExecutor { // This is a complete line - process it if !line.is_empty() { // Check for pattern breaks within the line - let segments = Self::split_by_pattern_breaks(line); + let segments = Self::split_by_pattern_breaks(line, last_emit_time); for segment in segments.iter() { if !segment.trim().is_empty() { @@ -295,7 +415,7 @@ impl GeminiExecutor { } /// Split text by pattern breaks (period + capital letter) - fn split_by_pattern_breaks(text: &str) -> Vec { + fn split_by_pattern_breaks(text: &str, last_emit_time: &mut Instant) -> Vec { let mut segments = Vec::new(); let mut current_segment = String::new(); let mut chars = text.chars().peekable(); @@ -303,10 +423,14 @@ impl GeminiExecutor { while let Some(ch) = chars.next() { current_segment.push(ch); - // Check for pattern break: period followed by capital letter + // Check for pattern break: period followed by capital letter or space if ch == '.' { if let Some(&next_ch) = chars.peek() { - if next_ch.is_uppercase() && next_ch.is_alphabetic() { + let is_capital = next_ch.is_uppercase() && next_ch.is_alphabetic(); + let is_space = next_ch.is_whitespace(); + let should_force_break = is_space && last_emit_time.elapsed().as_secs() > 5; + + if is_capital || should_force_break { // Pattern break detected - current segment ends here segments.push(current_segment.clone()); current_segment.clear(); @@ -328,49 +452,51 @@ impl GeminiExecutor { segments } - /// Emits a JSONL message to the database stdout stream. - async fn emit_jsonl_message( + /// Emits a normalized message to the database stdout stream. + async fn emit_normalized_message( pool: &sqlx::SqlitePool, execution_process_id: Uuid, - message_index: u32, content: &str, + last_emit_time: &mut Instant, ) { if content.is_empty() { return; } - // Create AMP-like format with streaming extensions for Gemini - let jsonl_message = serde_json::json!({ - "type": "messages", - "messages": [ - [ - message_index, - { - "role": "assistant", - "content": [ - { - "type": "text", - "text": content - } - ], - "meta": { - "sentAt": chrono::Utc::now().timestamp_millis() - } - } - ] - ], - "messageKey": message_index, - "isStreaming": true - }); + let entry = NormalizedEntry { + timestamp: Some(chrono::Utc::now().to_rfc3339()), + entry_type: NormalizedEntryType::AssistantMessage, + content: content.to_string(), + metadata: None, + }; - if let Ok(jsonl_line) = serde_json::to_string(&jsonl_message) { - let formatted_line = format!("{}\n", jsonl_line); + match serde_json::to_string(&entry) { + Ok(jsonl_line) => { + let formatted_line = format!("{}\n", jsonl_line); - // Store as stdout to make it available to conversation viewer - if let Err(e) = - ExecutionProcess::append_stdout(pool, execution_process_id, &formatted_line).await - { - tracing::error!("Failed to emit JSONL message: {}", e); + tracing::debug!( + "Storing normalized message to DB for execution {}: {}", + execution_process_id, + jsonl_line + ); + + // Store as stdout to make it available to conversation viewer + if let Err(e) = + ExecutionProcess::append_stdout(pool, execution_process_id, &formatted_line) + .await + { + tracing::error!("Failed to emit normalized message: {}", e); + } else { + *last_emit_time = Instant::now(); + tracing::debug!("Successfully stored normalized message to DB"); + } + } + Err(e) => { + tracing::error!( + "Failed to serialize normalized entry for content: {:?} - Error: {}", + content, + e + ); } } } @@ -437,47 +563,9 @@ impl Executor for GeminiFollowupExecutor { Ok(child) } - async fn execute_streaming( - &self, - pool: &sqlx::SqlitePool, - task_id: Uuid, - attempt_id: Uuid, - execution_process_id: Uuid, - worktree_path: &str, - ) -> Result { - let mut child = self.spawn(pool, task_id, worktree_path).await?; - - // Take stdout and stderr pipes for streaming - let stdout = child - .inner() - .stdout - .take() - .expect("Failed to take stdout from child process"); - let stderr = child - .inner() - .stderr - .take() - .expect("Failed to take stderr from child process"); - - // Start streaming tasks with Gemini-specific line-based message updates - let pool_clone1 = pool.clone(); - let pool_clone2 = pool.clone(); - - tokio::spawn(GeminiExecutor::stream_gemini_with_lines( - stdout, - pool_clone1, - attempt_id, - execution_process_id, - true, - )); - tokio::spawn(GeminiExecutor::stream_gemini_with_lines( - stderr, - pool_clone2, - attempt_id, - execution_process_id, - false, - )); - - Ok(child) + fn normalize_logs(&self, logs: &str) -> Result { + // Reuse the same logic as the main GeminiExecutor + let main_executor = GeminiExecutor; + main_executor.normalize_logs(logs) } } diff --git a/backend/src/routes/task_attempts.rs b/backend/src/routes/task_attempts.rs index d45f815c..5a1ba8bd 100644 --- a/backend/src/routes/task_attempts.rs +++ b/backend/src/routes/task_attempts.rs @@ -10,7 +10,7 @@ use uuid::Uuid; use crate::{ app_state::AppState, - executor::{ExecutorConfig, NormalizedConversation}, + executor::{ExecutorConfig, NormalizedConversation, NormalizedEntry, NormalizedEntryType}, models::{ config::Config, execution_process::{ExecutionProcess, ExecutionProcessStatus, ExecutionProcessSummary}, @@ -1078,73 +1078,6 @@ pub async fn get_execution_process_normalized_logs( } }; - // Get logs from the execution process - let logs = match process.stdout { - Some(stdout) => stdout, - None => { - // If the process is still running, return empty logs instead of an error - if process.status == ExecutionProcessStatus::Running { - // Get executor session data for this execution process - let executor_session = match ExecutorSession::find_by_execution_process_id( - &app_state.db_pool, - process_id, - ) - .await - { - Ok(session) => session, - Err(e) => { - tracing::error!( - "Failed to fetch executor session for process {}: {}", - process_id, - e - ); - None - } - }; - - return Ok(ResponseJson(ApiResponse { - success: true, - data: Some(NormalizedConversation { - entries: vec![], - session_id: None, - executor_type: process - .executor_type - .clone() - .unwrap_or("unknown".to_string()), - prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()), - summary: executor_session.as_ref().and_then(|s| s.summary.clone()), - }), - message: None, - })); - } - - return Ok(ResponseJson(ApiResponse { - success: false, - data: None, - message: Some("No logs available for this execution process".to_string()), - })); - } - }; - - // Determine executor type and create appropriate executor for normalization - let executor_type = process.executor_type.as_deref().unwrap_or("unknown"); - let executor_config = match executor_type { - "amp" => ExecutorConfig::Amp, - "claude" => ExecutorConfig::Claude, - "echo" => ExecutorConfig::Echo, - "gemini" => ExecutorConfig::Gemini, - "opencode" => ExecutorConfig::Opencode, - _ => { - return Ok(ResponseJson(ApiResponse { - success: false, - data: None, - message: Some(format!("Unsupported executor type: {}", executor_type)), - })); - } - }; - - let executor = executor_config.create_executor(); - // Get executor session data for this execution process let executor_session = match ExecutorSession::find_by_execution_process_id(&app_state.db_pool, process_id).await { @@ -1159,30 +1092,150 @@ pub async fn get_execution_process_normalized_logs( } }; - // Normalize the logs - match executor.normalize_logs(&logs) { - Ok(mut normalized) => { - // Add prompt and summary from executor session - if let Some(session) = executor_session { - normalized.prompt = session.prompt; - normalized.summary = session.summary; - } + // Handle the case where no logs are available + let has_stdout = + process.stdout.is_some() && !process.stdout.as_ref().unwrap().trim().is_empty(); + let has_stderr = + process.stderr.is_some() && !process.stderr.as_ref().unwrap().trim().is_empty(); - Ok(ResponseJson(ApiResponse { - success: true, - data: Some(normalized), - message: None, - })) - } - Err(e) => { - tracing::error!("Failed to normalize logs for process {}: {}", process_id, e); - Ok(ResponseJson(ApiResponse { - success: false, - data: None, - message: Some(format!("Failed to normalize logs: {}", e)), - })) + // If the process is still running and has no stdout/stderr, return empty logs + if process.status == ExecutionProcessStatus::Running && !has_stdout && !has_stderr { + return Ok(ResponseJson(ApiResponse { + success: true, + data: Some(NormalizedConversation { + entries: vec![], + session_id: None, + executor_type: process + .executor_type + .clone() + .unwrap_or("unknown".to_string()), + prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()), + summary: executor_session.as_ref().and_then(|s| s.summary.clone()), + }), + message: None, + })); + } + + // If process is completed but has no logs, return appropriate error + if process.status != ExecutionProcessStatus::Running && !has_stdout && !has_stderr { + return Ok(ResponseJson(ApiResponse { + success: false, + data: None, + message: Some("No logs available for this execution process".to_string()), + })); + } + + // Parse stdout as JSONL using executor normalization + let mut stdout_entries = Vec::new(); + if let Some(stdout) = &process.stdout { + if !stdout.trim().is_empty() { + // Determine executor type and create appropriate executor for normalization + let executor_type = process.executor_type.as_deref().unwrap_or("unknown"); + let executor_config = match executor_type { + "amp" => ExecutorConfig::Amp, + "claude" => ExecutorConfig::Claude, + "echo" => ExecutorConfig::Echo, + "gemini" => ExecutorConfig::Gemini, + "opencode" => ExecutorConfig::Opencode, + _ => { + tracing::warn!( + "Unsupported executor type: {}, cannot normalize logs properly", + executor_type + ); + return Ok(ResponseJson(ApiResponse { + success: false, + data: None, + message: Some(format!("Unsupported executor type: {}", executor_type)), + })); + } + }; + + let executor = executor_config.create_executor(); + + // Normalize stdout logs with error handling + match executor.normalize_logs(stdout) { + Ok(normalized) => { + stdout_entries = normalized.entries; + tracing::debug!( + "Successfully normalized {} stdout entries for process {}", + stdout_entries.len(), + process_id + ); + } + Err(e) => { + tracing::error!( + "Failed to normalize stdout for process {}: {}", + process_id, + e + ); + return Ok(ResponseJson(ApiResponse { + success: false, + data: None, + message: Some(format!("Failed to normalize logs: {}", e)), + })); + } + } } } + + // Parse stderr chunks separated by boundary markers + let mut stderr_entries = Vec::new(); + if let Some(stderr) = &process.stderr { + let trimmed = stderr.trim(); + if !trimmed.is_empty() { + // Split stderr by chunk boundaries and create separate error messages + let chunks: Vec<&str> = trimmed.split("---STDERR_CHUNK_BOUNDARY---").collect(); + + for chunk in chunks { + let chunk_trimmed = chunk.trim(); + if !chunk_trimmed.is_empty() { + stderr_entries.push(NormalizedEntry { + timestamp: Some(chrono::Utc::now().to_rfc3339()), + entry_type: NormalizedEntryType::ErrorMessage, + content: chunk_trimmed.to_string(), + metadata: None, + }); + } + } + + tracing::debug!( + "Processed stderr content into {} error messages for process {}", + stderr_entries.len(), + process_id + ); + } + } + + // Merge stdout and stderr entries chronologically + let mut all_entries = Vec::new(); + all_entries.extend(stdout_entries); + all_entries.extend(stderr_entries); + + // Sort by timestamp (entries without timestamps go to the end) + all_entries.sort_by(|a, b| match (&a.timestamp, &b.timestamp) { + (Some(a_ts), Some(b_ts)) => a_ts.cmp(b_ts), + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + (None, None) => std::cmp::Ordering::Equal, + }); + + // Create final normalized conversation + let normalized_conversation = NormalizedConversation { + entries: all_entries, + session_id: None, + executor_type: process + .executor_type + .clone() + .unwrap_or("unknown".to_string()), + prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()), + summary: executor_session.as_ref().and_then(|s| s.summary.clone()), + }; + + Ok(ResponseJson(ApiResponse { + success: true, + data: Some(normalized_conversation), + message: None, + })) } pub fn task_attempts_router() -> Router { diff --git a/frontend/src/components/tasks/NormalizedConversationViewer.tsx b/frontend/src/components/tasks/NormalizedConversationViewer.tsx index 0712cc11..d1a08619 100644 --- a/frontend/src/components/tasks/NormalizedConversationViewer.tsx +++ b/frontend/src/components/tasks/NormalizedConversationViewer.tsx @@ -11,6 +11,9 @@ import { Settings, Brain, Hammer, + AlertCircle, + ChevronRight, + ChevronUp, } from 'lucide-react'; import { makeRequest } from '@/lib/api'; import type { @@ -39,6 +42,9 @@ const getEntryIcon = (entryType: NormalizedEntryType) => { if (entryType.type === 'thinking') { return ; } + if (entryType.type === 'error_message') { + return ; + } if (entryType.type === 'tool_use') { const { action_type } = entryType; if (action_type.action === 'file_read') { @@ -74,6 +80,10 @@ const getContentClassName = (entryType: NormalizedEntryType) => { return `${baseClasses} font-mono`; } + if (entryType.type === 'error_message') { + return `${baseClasses} text-red-600 font-mono bg-red-50 dark:bg-red-950/20 px-2 py-1 rounded`; + } + return baseClasses; }; @@ -86,6 +96,19 @@ export function NormalizedConversationViewer({ useState(null); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); + const [expandedErrors, setExpandedErrors] = useState>(new Set()); + + const toggleErrorExpansion = (index: number) => { + setExpandedErrors((prev) => { + const newSet = new Set(prev); + if (newSet.has(index)) { + newSet.delete(index); + } else { + newSet.add(index); + } + return newSet; + }); + }; const fetchNormalizedLogs = useCallback( async (isPolling = false) => { @@ -203,18 +226,64 @@ export function NormalizedConversationViewer({ {/* Display conversation entries */}
- {conversation.entries.map((entry, index) => ( -
-
- {getEntryIcon(entry.entry_type)} -
-
-
- {entry.content} + {conversation.entries.map((entry, index) => { + const isErrorMessage = entry.entry_type.type === 'error_message'; + const isExpanded = expandedErrors.has(index); + const hasMultipleLines = + isErrorMessage && entry.content.includes('\n'); + + return ( +
+
+ {isErrorMessage && hasMultipleLines ? ( + + ) : ( + getEntryIcon(entry.entry_type) + )} +
+
+ {isErrorMessage && hasMultipleLines ? ( +
+
+ {isExpanded ? ( + entry.content + ) : ( + <> + {entry.content.split('\n')[0]} + + + )} +
+ {isExpanded && ( + + )} +
+ ) : ( +
+ {entry.content} +
+ )}
-
- ))} + ); + })}
); diff --git a/frontend/src/components/tasks/TaskDetailsPanel.tsx b/frontend/src/components/tasks/TaskDetailsPanel.tsx index 518c98fa..7f1a6eb1 100644 --- a/frontend/src/components/tasks/TaskDetailsPanel.tsx +++ b/frontend/src/components/tasks/TaskDetailsPanel.tsx @@ -578,7 +578,7 @@ export function TaskDetailsPanel({ ); } - // When setup failed, show error message and stderr + // When setup failed, show error message and conversation if (isSetupFailed) { const setupProcess = executionState.setup_process_id ? attemptData.runningProcessDetails[executionState.setup_process_id] @@ -598,20 +598,17 @@ export function TaskDetailsPanel({
{setupProcess && ( -
- {(() => { - const stderr = setupProcess.stderr || ''; - const stdout = setupProcess.stdout || ''; - const combined = [stderr, stdout].filter(Boolean).join('\n'); - return combined || 'No error output available'; - })()} -
+ )} ); } - // When coding agent failed, show error message and stderr + // When coding agent failed, show error message and conversation if (isCodingAgentFailed) { const codingAgentProcess = executionState.coding_agent_process_id ? attemptData.runningProcessDetails[ @@ -633,14 +630,11 @@ export function TaskDetailsPanel({ {codingAgentProcess && ( -
- {(() => { - const stderr = codingAgentProcess.stderr || ''; - const stdout = codingAgentProcess.stdout || ''; - const combined = [stderr, stdout].filter(Boolean).join('\n'); - return combined || 'No error output available'; - })()} -
+ )} ); diff --git a/package-lock.json b/package-lock.json index 113c5063..9cde8baf 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "vibe-kanban", - "version": "0.0.32", + "version": "0.0.37-ersion.3", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "vibe-kanban", - "version": "0.0.32", + "version": "0.0.37-ersion.3", "devDependencies": { "concurrently": "^8.2.2", "vite": "^6.3.5" diff --git a/shared/types.ts b/shared/types.ts index 83346977..18522e27 100644 --- a/shared/types.ts +++ b/shared/types.ts @@ -108,7 +108,7 @@ export type NormalizedConversation = { entries: Array, session_ export type NormalizedEntry = { timestamp: string | null, entry_type: NormalizedEntryType, content: string, }; -export type NormalizedEntryType = { "type": "user_message" } | { "type": "assistant_message" } | { "type": "tool_use", tool_name: string, action_type: ActionType, } | { "type": "system_message" } | { "type": "thinking" }; +export type NormalizedEntryType = { "type": "user_message" } | { "type": "assistant_message" } | { "type": "tool_use", tool_name: string, action_type: ActionType, } | { "type": "system_message" } | { "type": "error_message" } | { "type": "thinking" }; export type ActionType = { "action": "file_read", path: string, } | { "action": "file_write", path: string, } | { "action": "command_run", command: string, } | { "action": "search", query: string, } | { "action": "web_fetch", url: string, } | { "action": "task_create", description: string, } | { "action": "other", description: string, };