diff --git a/backend/src/executors/gemini.rs b/backend/src/executors/gemini.rs
index 43ed87c5..ef6d446e 100644
--- a/backend/src/executors/gemini.rs
+++ b/backend/src/executors/gemini.rs
@@ -1,4 +1,4 @@
-use std::process::Stdio;
+use std::{collections::VecDeque, process::Stdio};

 use async_trait::async_trait;
 use command_group::{AsyncCommandGroup, AsyncGroupChild};
@@ -7,7 +7,7 @@ use uuid::Uuid;

 use crate::{
     executor::{Executor, ExecutorError},
-    models::task::Task,
+    models::{execution_process::ExecutionProcess, task::Task},
     utils::shell::get_shell_command,
 };

@@ -65,8 +65,7 @@ impl Executor for GeminiExecutor {
                 .spawn_error(e)
         })?;

-        // Send the prompt via stdin instead of command line arguments
-        // This avoids Windows command line parsing issues
+        // Write prompt to stdin
         if let Some(mut stdin) = child.inner().stdin.take() {
             stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
                 let context = crate::executor::SpawnContext::from_command(&command, "Gemini")
@@ -84,6 +83,292 @@ impl Executor for GeminiExecutor {

         Ok(child)
     }
+
+    async fn execute_streaming(
+        &self,
+        pool: &sqlx::SqlitePool,
+        task_id: Uuid,
+        attempt_id: Uuid,
+        execution_process_id: Uuid,
+        worktree_path: &str,
+    ) -> Result<AsyncGroupChild, ExecutorError> {
+        let mut child = self.spawn(pool, task_id, worktree_path).await?;
+
+        // Take stdout and stderr pipes for streaming
+        let stdout = child
+            .inner()
+            .stdout
+            .take()
+            .expect("Failed to take stdout from child process");
+        let stderr = child
+            .inner()
+            .stderr
+            .take()
+            .expect("Failed to take stderr from child process");
+
+        // Start streaming tasks with Gemini-specific line-based message updates
+        let pool_clone1 = pool.clone();
+        let pool_clone2 = pool.clone();
+
+        tokio::spawn(Self::stream_gemini_with_lines(
+            stdout,
+            pool_clone1,
+            attempt_id,
+            execution_process_id,
+            true,
+        ));
+        tokio::spawn(Self::stream_gemini_with_lines(
+            stderr,
+            pool_clone2,
+            attempt_id,
+            execution_process_id,
+            false,
+        ));
+
+        Ok(child)
+    }
+}
+
+impl GeminiExecutor {
+    /// Stream Gemini output with real-time, line-by-line message updates using a queue-based approach.
+    async fn stream_gemini_with_lines(
+        output: impl tokio::io::AsyncRead + Unpin,
+        pool: sqlx::SqlitePool,
+        attempt_id: Uuid,
+        execution_process_id: Uuid,
+        is_stdout: bool,
+    ) {
+        use std::collections::VecDeque;
+
+        use tokio::io::{AsyncReadExt, BufReader};
+
+        if !is_stdout {
+            // For stderr, use the default database streaming without special formatting
+            crate::executor::stream_output_to_db(
+                output,
+                pool,
+                attempt_id,
+                execution_process_id,
+                false,
+            )
+            .await;
+            return;
+        }
+
+        let mut reader = BufReader::new(output);
+        let mut message_index = 0;
+        let mut full_raw_output = String::new();
+        let mut segment_queue: VecDeque<String> = VecDeque::new();
+        let mut incomplete_line_buffer = String::new();
+
+        tracing::info!(
+            "Starting Gemini line-based stdout streaming for attempt {}",
+            attempt_id
+        );
+
+        let mut buffer = [0; 1024]; // Read in chunks for performance and UTF-8 safety
+        loop {
+            // First, drain any pending segments from the queue
+            while let Some(segment_content) = segment_queue.pop_front() {
+                if !segment_content.trim().is_empty() {
+                    Self::emit_jsonl_message(
+                        &pool,
+                        execution_process_id,
+                        message_index,
+                        &segment_content,
+                    )
+                    .await;
+                    message_index += 1;
+                }
+            }
+
+            // Then read new content from the reader
+            match reader.read(&mut buffer).await {
+                Ok(0) => {
+                    // EOF - process any remaining content
+                    break;
+                }
+                Ok(n) => {
+                    let chunk_str = String::from_utf8_lossy(&buffer[..n]);
+                    full_raw_output.push_str(&chunk_str);
+
+                    // Process the chunk and add segments to queue
+                    Self::process_chunk_to_queue(
+                        &chunk_str,
+                        &mut segment_queue,
+                        &mut incomplete_line_buffer,
+                    );
+                }
+                Err(_) => {
+                    // Read error - stop reading; remaining segments are drained below
+                    break;
+                }
+            }
+        }
+
+        // Process any remaining incomplete line at EOF
+        if !incomplete_line_buffer.is_empty() {
+            let segments = Self::split_by_pattern_breaks(&incomplete_line_buffer);
+            for segment in segments.iter() {
+                if !segment.trim().is_empty() {
+                    segment_queue.push_back(segment.to_string());
+                }
+            }
+        }
+
+        // Final drain of any remaining segments
+        while let Some(segment_content) = segment_queue.pop_front() {
+            if !segment_content.trim().is_empty() {
+                Self::emit_jsonl_message(
+                    &pool,
+                    execution_process_id,
+                    message_index,
+                    &segment_content,
+                )
+                .await;
+                message_index += 1;
+            }
+        }
+
+        // After the loop, store the full raw output in stderr for the "raw" view
+        if !full_raw_output.is_empty() {
+            if let Err(e) =
+                ExecutionProcess::append_stderr(&pool, execution_process_id, &full_raw_output).await
+            {
+                tracing::error!(
+                    "Failed to store full raw output for attempt {}: {}",
+                    attempt_id,
+                    e
+                );
+            }
+        }
+
+        tracing::info!(
+            "Gemini line-based stdout streaming ended for attempt {}",
+            attempt_id
+        );
+    }
+
+    /// Process a chunk of text and add segments to the queue based on break behavior
+    fn process_chunk_to_queue(
+        chunk: &str,
+        queue: &mut VecDeque<String>,
+        incomplete_line_buffer: &mut String,
+    ) {
+        // Combine any incomplete line from previous chunk with current chunk
+        let text_to_process = incomplete_line_buffer.clone() + chunk;
+        incomplete_line_buffer.clear();
+
+        // Split by newlines
+        let lines: Vec<&str> = text_to_process.split('\n').collect();
+
+        for (i, line) in lines.iter().enumerate() {
+            let is_last_line = i == lines.len() - 1;
+
+            if is_last_line && !chunk.ends_with('\n') {
+                // This is an incomplete line - store it in the buffer for next chunk
+                incomplete_line_buffer.push_str(line);
+            } else {
+                // This is a complete line - process it
+                if !line.is_empty() {
+                    // Check for pattern breaks within the line
+                    let segments = Self::split_by_pattern_breaks(line);
+
+                    for segment in segments.iter() {
+                        if !segment.trim().is_empty() {
+                            queue.push_back(segment.to_string());
+                        }
+                    }
+                }
+
+                // Add newline as separate segment (except for the last line if chunk doesn't end with newline)
+                if !is_last_line || chunk.ends_with('\n') {
+                    queue.push_back("\n".to_string());
+                }
+            }
+        }
+    }
+
+    /// Split text by pattern breaks (period + capital letter)
+    fn split_by_pattern_breaks(text: &str) -> Vec<String> {
+        let mut segments = Vec::new();
+        let mut current_segment = String::new();
+        let mut chars = text.chars().peekable();
+
+        while let Some(ch) = chars.next() {
+            current_segment.push(ch);
+
+            // Check for pattern break: period followed by capital letter
+            if ch == '.' {
+                if let Some(&next_ch) = chars.peek() {
+                    if next_ch.is_uppercase() && next_ch.is_alphabetic() {
+                        // Pattern break detected - current segment ends here
+                        segments.push(current_segment.clone());
+                        current_segment.clear();
+                    }
+                }
+            }
+        }
+
+        // Add the final segment if it's not empty
+        if !current_segment.is_empty() {
+            segments.push(current_segment);
+        }
+
+        // If no segments were created, return the original text
+        if segments.is_empty() {
+            segments.push(text.to_string());
+        }
+
+        segments
+    }
+
+    /// Emits a JSONL message to the database stdout stream.
+    async fn emit_jsonl_message(
+        pool: &sqlx::SqlitePool,
+        execution_process_id: Uuid,
+        message_index: u32,
+        content: &str,
+    ) {
+        if content.is_empty() {
+            return;
+        }
+
+        // Create AMP-like format with streaming extensions for Gemini
+        let jsonl_message = serde_json::json!({
+            "type": "messages",
+            "messages": [
+                [
+                    message_index,
+                    {
+                        "role": "assistant",
+                        "content": [
+                            {
+                                "type": "text",
+                                "text": content
+                            }
+                        ],
+                        "meta": {
+                            "sentAt": chrono::Utc::now().timestamp_millis()
+                        }
+                    }
+                ]
+            ],
+            "messageKey": message_index,
+            "isStreaming": true
+        });
+
+        if let Ok(jsonl_line) = serde_json::to_string(&jsonl_message) {
+            let formatted_line = format!("{}\n", jsonl_line);
+
+            // Store as stdout to make it available to conversation viewer
+            if let Err(e) =
+                ExecutionProcess::append_stdout(pool, execution_process_id, &formatted_line).await
+            {
+                tracing::error!("Failed to emit JSONL message: {}", e);
+            }
+        }
+    }
 }

 #[async_trait]
@@ -146,4 +431,48 @@ impl Executor for GeminiFollowupExecutor {

         Ok(child)
     }
+
+    async fn execute_streaming(
+        &self,
+        pool: &sqlx::SqlitePool,
+        task_id: Uuid,
+        attempt_id: Uuid,
+        execution_process_id: Uuid,
+        worktree_path: &str,
+    ) -> Result<AsyncGroupChild, ExecutorError> {
+        let mut child = self.spawn(pool, task_id, worktree_path).await?;
+
+        // Take stdout and stderr pipes for streaming
+        let stdout = child
+            .inner()
+            .stdout
+            .take()
+            .expect("Failed to take stdout from child process");
+        let stderr = child
+            .inner()
+            .stderr
+            .take()
+            .expect("Failed to take stderr from child process");
+
+        // Start streaming tasks with Gemini-specific line-based message updates
+        let pool_clone1 = pool.clone();
+        let pool_clone2 = pool.clone();
+
+        tokio::spawn(GeminiExecutor::stream_gemini_with_lines(
+            stdout,
+            pool_clone1,
+            attempt_id,
+            execution_process_id,
+            true,
+        ));
+        tokio::spawn(GeminiExecutor::stream_gemini_with_lines(
+            stderr,
+            pool_clone2,
+            attempt_id,
+            execution_process_id,
+            false,
+        ));
+
+        Ok(child)
+    }
 }
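
Note on the emitted format: each segment that emit_jsonl_message appends to stdout becomes one JSONL line in an AMP-like shape, extended with the top-level messageKey and isStreaming fields that the ConversationViewer change below keys on. A minimal TypeScript sketch of one such line (not part of the patch); the chunk text, index, and timestamp are illustrative, while the field layout mirrors the json! call above:

// If Gemini printed "Reading config.Updating tests.\n", split_by_pattern_breaks
// breaks at ".U" (a period immediately followed by a capital letter; a period
// followed by a space does not split), so two JSONL lines are emitted and the
// whitespace-only newline segment is skipped. The first line looks roughly like:
const exampleStreamedLine = {
  type: 'messages',
  messages: [
    [
      0, // message_index of this segment
      {
        role: 'assistant',
        content: [{ type: 'text', text: 'Reading config.' }],
        meta: { sentAt: 1751234567890 }, // illustrative millisecond timestamp
      },
    ],
  ],
  messageKey: 0, // mirrors message_index; the viewer merges lines sharing this key
  isStreaming: true, // routes the line through the viewer's streaming-merge path
};
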
diff --git a/frontend/src/components/tasks/ConversationViewer.tsx b/frontend/src/components/tasks/ConversationViewer.tsx
index f9a3652e..38dca6a9 100644
--- a/frontend/src/components/tasks/ConversationViewer.tsx
+++ b/frontend/src/components/tasks/ConversationViewer.tsx
@@ -80,6 +80,8 @@ interface JSONLLine {
     }> | string;
   };
+  messageKey?: number;
+  isStreaming?: boolean;
   // Tool rejection message (string format)
   rejectionMessage?: string;
   usage?: {
@@ -207,6 +209,7 @@ export function ConversationViewer({ jsonlOutput }: ConversationViewerProps) {
   }, [jsonlOutput]);

   const conversation = useMemo(() => {
+    const streamingMessageMap = new Map();
     const items: Array<{
       type: 'message' | 'tool-rejection' | 'parse-error' | 'unknown';
       role?: 'user' | 'assistant';
@@ -253,14 +256,38 @@ export function ConversationViewer({ jsonlOutput }: ConversationViewerProps) {
       ) {
         // Amp format
         for (const [messageIndex, message] of line.messages) {
-          items.push({
-            type: 'message',
+          const messageItem = {
+            type: 'message' as const,
             role: message.role,
             content: message.content,
             timestamp: message.meta?.sentAt,
             messageIndex,
             lineIndex: line._lineIndex,
-          });
+          };
+
+          // Handle Gemini streaming via top-level messageKey and isStreaming
+          if (line.isStreaming && line.messageKey !== undefined) {
+            const existingMessage = streamingMessageMap.get(line.messageKey);
+            if (existingMessage) {
+              // Append new content to existing message
+              if (
+                existingMessage.content &&
+                existingMessage.content[0] &&
+                messageItem.content &&
+                messageItem.content[0]
+              ) {
+                existingMessage.content[0].text =
+                  (existingMessage.content[0].text || '') +
+                  (messageItem.content[0].text || '');
+                existingMessage.timestamp = messageItem.timestamp; // Update timestamp
+              }
+            } else {
+              // First segment for this message
+              streamingMessageMap.set(line.messageKey, messageItem);
+            }
+          } else {
+            items.push(messageItem);
+          }
         }
       } else if (
         (line.type === 'user' ||
@@ -342,8 +369,11 @@ export function ConversationViewer({ jsonlOutput }: ConversationViewerProps) {
       }
     }

+    const streamingMessages = Array.from(streamingMessageMap.values());
+    const finalItems = [...items, ...streamingMessages];
+
     // Sort by messageIndex for messages, then by lineIndex for everything else
-    items.sort((a, b) => {
+    finalItems.sort((a, b) => {
       if (a.type === 'message' && b.type === 'message') {
         return (a.messageIndex || 0) - (b.messageIndex || 0);
       }
@@ -351,7 +381,7 @@ export function ConversationViewer({ jsonlOutput }: ConversationViewerProps) {
     });

     return {
-      items,
+      items: finalItems,
       tokenUsages,
       states,
     };
diff --git a/package-lock.json b/package-lock.json
index 4d38af69..113c5063 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1,12 +1,12 @@
 {
   "name": "vibe-kanban",
-  "version": "0.0.24",
+  "version": "0.0.32",
   "lockfileVersion": 3,
   "requires": true,
   "packages": {
     "": {
       "name": "vibe-kanban",
-      "version": "0.0.24",
+      "version": "0.0.32",
       "devDependencies": {
         "concurrently": "^8.2.2",
         "vite": "^6.3.5"
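
For reference, a condensed TypeScript sketch of the merge step the ConversationViewer change performs, with simplified, illustrative types (SegmentLine, StreamedMessage, and mergeStreamingSegments are not names from the codebase): lines flagged isStreaming that share a messageKey have their text concatenated into a single assistant message; all other lines continue through the existing per-line path.

interface SegmentLine {
  messageKey?: number;
  isStreaming?: boolean;
  messages?: Array<
    [
      number,
      {
        role: string;
        content: Array<{ type: string; text?: string }>;
        meta?: { sentAt?: number };
      },
    ]
  >;
}

interface StreamedMessage {
  role: string;
  text: string;
  timestamp?: number;
}

// Coalesce streamed segments by messageKey; non-streaming lines are ignored
// here because the viewer already handles them line by line.
function mergeStreamingSegments(lines: SegmentLine[]): StreamedMessage[] {
  const byKey = new Map<number, StreamedMessage>();
  for (const line of lines) {
    if (!line.isStreaming || line.messageKey === undefined || !line.messages) continue;
    for (const [, message] of line.messages) {
      const text = message.content[0]?.text ?? '';
      const existing = byKey.get(line.messageKey);
      if (existing) {
        existing.text += text; // append the new segment's text
        existing.timestamp = message.meta?.sentAt; // keep the latest timestamp
      } else {
        byKey.set(line.messageKey, {
          role: message.role,
          text,
          timestamp: message.meta?.sentAt,
        });
      }
    }
  }
  return Array.from(byKey.values());
}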