Improve Gemini-CLI progress display (#56)

Author: Solomon
Date: 2025-07-03 15:02:13 +01:00
Committed by: GitHub
Parent: d341ab74e4
Commit: 2170f86c9c
3 changed files with 370 additions and 11 deletions

@@ -1,4 +1,4 @@
use std::process::Stdio;
use std::{collections::VecDeque, process::Stdio};
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
@@ -7,7 +7,7 @@ use uuid::Uuid;
use crate::{
executor::{Executor, ExecutorError},
models::task::Task,
models::{execution_process::ExecutionProcess, task::Task},
utils::shell::get_shell_command,
};
@@ -65,8 +65,7 @@ impl Executor for GeminiExecutor {
.spawn_error(e)
})?;
// Send the prompt via stdin instead of command line arguments
// This avoids Windows command line parsing issues
// Write prompt to stdin
if let Some(mut stdin) = child.inner().stdin.take() {
stdin.write_all(prompt.as_bytes()).await.map_err(|e| {
let context = crate::executor::SpawnContext::from_command(&command, "Gemini")
@@ -84,6 +83,292 @@ impl Executor for GeminiExecutor {
Ok(child)
}
async fn execute_streaming(
&self,
pool: &sqlx::SqlitePool,
task_id: Uuid,
attempt_id: Uuid,
execution_process_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
let mut child = self.spawn(pool, task_id, worktree_path).await?;
// Take stdout and stderr pipes for streaming
let stdout = child
.inner()
.stdout
.take()
.expect("Failed to take stdout from child process");
let stderr = child
.inner()
.stderr
.take()
.expect("Failed to take stderr from child process");
// Start streaming tasks with Gemini-specific line-based message updates
let pool_clone1 = pool.clone();
let pool_clone2 = pool.clone();
tokio::spawn(Self::stream_gemini_with_lines(
stdout,
pool_clone1,
attempt_id,
execution_process_id,
true,
));
tokio::spawn(Self::stream_gemini_with_lines(
stderr,
pool_clone2,
attempt_id,
execution_process_id,
false,
));
Ok(child)
}
}
impl GeminiExecutor {
/// Stream Gemini output with real-time, line-by-line message updates using a queue-based approach.
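/// Stdout is read in chunks, split into display segments, and emitted as JSONL
/// assistant messages; stderr falls back to the default database streaming.
/// At EOF the full raw stdout is also appended to the process's stderr so the
/// raw log view is preserved.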
async fn stream_gemini_with_lines(
output: impl tokio::io::AsyncRead + Unpin,
pool: sqlx::SqlitePool,
attempt_id: Uuid,
execution_process_id: Uuid,
is_stdout: bool,
) {
use std::collections::VecDeque;
use tokio::io::{AsyncReadExt, BufReader};
if !is_stdout {
// For stderr, use the default database streaming without special formatting
crate::executor::stream_output_to_db(
output,
pool,
attempt_id,
execution_process_id,
false,
)
.await;
return;
}
let mut reader = BufReader::new(output);
let mut message_index = 0;
let mut full_raw_output = String::new();
let mut segment_queue: VecDeque<String> = VecDeque::new();
let mut incomplete_line_buffer = String::new();
tracing::info!(
"Starting Gemini line-based stdout streaming for attempt {}",
attempt_id
);
let mut buffer = [0; 1024]; // Read in 1 KiB chunks; a boundary may split a multi-byte UTF-8 character, which from_utf8_lossy replaces rather than erroring
loop {
// First, drain any pending segments from the queue
while let Some(segment_content) = segment_queue.pop_front() {
if !segment_content.trim().is_empty() {
Self::emit_jsonl_message(
&pool,
execution_process_id,
message_index,
&segment_content,
)
.await;
message_index += 1;
}
}
// Then read new content from the reader
match reader.read(&mut buffer).await {
Ok(0) => {
// EOF - stop reading; any buffered content is handled after the loop
break;
}
Ok(n) => {
let chunk_str = String::from_utf8_lossy(&buffer[..n]);
full_raw_output.push_str(&chunk_str);
// Process the chunk and add segments to queue
Self::process_chunk_to_queue(
&chunk_str,
&mut segment_queue,
&mut incomplete_line_buffer,
);
}
Err(_) => {
// Read error - exit the loop; any queued segments are drained below
break;
}
}
}
// Process any remaining incomplete line at EOF
if !incomplete_line_buffer.is_empty() {
let segments = Self::split_by_pattern_breaks(&incomplete_line_buffer);
for segment in segments.iter() {
if !segment.trim().is_empty() {
segment_queue.push_back(segment.to_string());
}
}
}
// Final drain of any remaining segments
while let Some(segment_content) = segment_queue.pop_front() {
if !segment_content.trim().is_empty() {
Self::emit_jsonl_message(
&pool,
execution_process_id,
message_index,
&segment_content,
)
.await;
message_index += 1;
}
}
// After the loop, store the full raw output in stderr for the "raw" view
if !full_raw_output.is_empty() {
if let Err(e) =
ExecutionProcess::append_stderr(&pool, execution_process_id, &full_raw_output).await
{
tracing::error!(
"Failed to store full raw output for attempt {}: {}",
attempt_id,
e
);
}
}
tracing::info!(
"Gemini line-based stdout streaming ended for attempt {}",
attempt_id
);
}
/// Process a chunk of text and add segments to the queue based on break behavior
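/// Complete lines are split further by `split_by_pattern_breaks`; a trailing
/// partial line (chunk not ending in '\n') is kept in `incomplete_line_buffer`
/// until the next chunk or EOF, and each newline is queued as its own "\n" segment.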
fn process_chunk_to_queue(
chunk: &str,
queue: &mut VecDeque<String>,
incomplete_line_buffer: &mut String,
) {
// Combine any incomplete line from previous chunk with current chunk
let text_to_process = incomplete_line_buffer.clone() + chunk;
incomplete_line_buffer.clear();
// Split by newlines
let lines: Vec<&str> = text_to_process.split('\n').collect();
for (i, line) in lines.iter().enumerate() {
let is_last_line = i == lines.len() - 1;
if is_last_line && !chunk.ends_with('\n') {
// This is an incomplete line - store it in the buffer for next chunk
incomplete_line_buffer.push_str(line);
} else {
// This is a complete line - process it
if !line.is_empty() {
// Check for pattern breaks within the line
let segments = Self::split_by_pattern_breaks(line);
for segment in segments.iter() {
if !segment.trim().is_empty() {
queue.push_back(segment.to_string());
}
}
}
// Add newline as separate segment (except for the last line if chunk doesn't end with newline)
if !is_last_line || chunk.ends_with('\n') {
queue.push_back("\n".to_string());
}
}
}
}
/// Split text by pattern breaks (period + capital letter)
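/// A break occurs only when a '.' is immediately followed by an uppercase letter
/// with no intervening space, e.g. "done.Next step" becomes ["done.", "Next step"],
/// while "done. Next step" stays a single segment.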
fn split_by_pattern_breaks(text: &str) -> Vec<String> {
let mut segments = Vec::new();
let mut current_segment = String::new();
let mut chars = text.chars().peekable();
while let Some(ch) = chars.next() {
current_segment.push(ch);
// Check for pattern break: period followed by capital letter
if ch == '.' {
if let Some(&next_ch) = chars.peek() {
if next_ch.is_uppercase() && next_ch.is_alphabetic() {
// Pattern break detected - current segment ends here
segments.push(current_segment.clone());
current_segment.clear();
}
}
}
}
// Add the final segment if it's not empty
if !current_segment.is_empty() {
segments.push(current_segment);
}
// If no segments were created, return the original text
if segments.is_empty() {
segments.push(text.to_string());
}
segments
}
/// Emits a JSONL message to the database stdout stream.
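/// The emitted line looks roughly like:
/// {"type":"messages","messages":[[0,{"role":"assistant","content":[{"type":"text","text":"..."}],"meta":{"sentAt":...}}]],"messageKey":0,"isStreaming":true}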
async fn emit_jsonl_message(
pool: &sqlx::SqlitePool,
execution_process_id: Uuid,
message_index: u32,
content: &str,
) {
if content.is_empty() {
return;
}
// Create AMP-like format with streaming extensions for Gemini
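// `messageKey` and `isStreaming` let the ConversationViewer merge all segments
// that share a key into a single assistant message as they stream in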
let jsonl_message = serde_json::json!({
"type": "messages",
"messages": [
[
message_index,
{
"role": "assistant",
"content": [
{
"type": "text",
"text": content
}
],
"meta": {
"sentAt": chrono::Utc::now().timestamp_millis()
}
}
]
],
"messageKey": message_index,
"isStreaming": true
});
if let Ok(jsonl_line) = serde_json::to_string(&jsonl_message) {
let formatted_line = format!("{}\n", jsonl_line);
// Store as stdout to make it available to conversation viewer
if let Err(e) =
ExecutionProcess::append_stdout(pool, execution_process_id, &formatted_line).await
{
tracing::error!("Failed to emit JSONL message: {}", e);
}
}
}
}
#[async_trait]
@@ -146,4 +431,48 @@ impl Executor for GeminiFollowupExecutor {
Ok(child)
}
async fn execute_streaming(
&self,
pool: &sqlx::SqlitePool,
task_id: Uuid,
attempt_id: Uuid,
execution_process_id: Uuid,
worktree_path: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
let mut child = self.spawn(pool, task_id, worktree_path).await?;
// Take stdout and stderr pipes for streaming
let stdout = child
.inner()
.stdout
.take()
.expect("Failed to take stdout from child process");
let stderr = child
.inner()
.stderr
.take()
.expect("Failed to take stderr from child process");
// Start streaming tasks with Gemini-specific line-based message updates
let pool_clone1 = pool.clone();
let pool_clone2 = pool.clone();
tokio::spawn(GeminiExecutor::stream_gemini_with_lines(
stdout,
pool_clone1,
attempt_id,
execution_process_id,
true,
));
tokio::spawn(GeminiExecutor::stream_gemini_with_lines(
stderr,
pool_clone2,
attempt_id,
execution_process_id,
false,
));
Ok(child)
}
}

@@ -80,6 +80,8 @@ interface JSONLLine {
}>
| string;
};
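// Gemini streaming extensions: segments sharing a messageKey are merged while isStreaming is true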
messageKey?: number;
isStreaming?: boolean;
// Tool rejection message (string format)
rejectionMessage?: string;
usage?: {
@@ -207,6 +209,7 @@ export function ConversationViewer({ jsonlOutput }: ConversationViewerProps) {
}, [jsonlOutput]);
const conversation = useMemo(() => {
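// Accumulates in-flight Gemini streaming segments, keyed by messageKey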
const streamingMessageMap = new Map<number, any>();
const items: Array<{
type: 'message' | 'tool-rejection' | 'parse-error' | 'unknown';
role?: 'user' | 'assistant';
@@ -253,14 +256,38 @@ export function ConversationViewer({ jsonlOutput }: ConversationViewerProps) {
) {
// Amp format
for (const [messageIndex, message] of line.messages) {
items.push({
type: 'message',
const messageItem = {
type: 'message' as const,
role: message.role,
content: message.content,
timestamp: message.meta?.sentAt,
messageIndex,
lineIndex: line._lineIndex,
});
};
// Handle Gemini streaming via top-level messageKey and isStreaming
if (line.isStreaming && line.messageKey !== undefined) {
const existingMessage = streamingMessageMap.get(line.messageKey);
if (existingMessage) {
// Append new content to existing message
if (
existingMessage.content &&
existingMessage.content[0] &&
messageItem.content &&
messageItem.content[0]
) {
existingMessage.content[0].text =
(existingMessage.content[0].text || '') +
(messageItem.content[0].text || '');
existingMessage.timestamp = messageItem.timestamp; // Update timestamp
}
} else {
// First segment for this message
streamingMessageMap.set(line.messageKey, messageItem);
}
} else {
items.push(messageItem);
}
}
} else if (
(line.type === 'user' ||
@@ -342,8 +369,11 @@ export function ConversationViewer({ jsonlOutput }: ConversationViewerProps) {
}
}
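// Merged streaming messages (one per messageKey) join the regular items before sorting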
const streamingMessages = Array.from(streamingMessageMap.values());
const finalItems = [...items, ...streamingMessages];
// Sort by messageIndex for messages, then by lineIndex for everything else
items.sort((a, b) => {
finalItems.sort((a, b) => {
if (a.type === 'message' && b.type === 'message') {
return (a.messageIndex || 0) - (b.messageIndex || 0);
}
@@ -351,7 +381,7 @@ export function ConversationViewer({ jsonlOutput }: ConversationViewerProps) {
});
return {
items,
items: finalItems,
tokenUsages,
states,
};

package-lock.json (generated)

@@ -1,12 +1,12 @@
{
"name": "vibe-kanban",
"version": "0.0.24",
"version": "0.0.32",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "vibe-kanban",
"version": "0.0.24",
"version": "0.0.32",
"devDependencies": {
"concurrently": "^8.2.2",
"vite": "^6.3.5"