From 6a51020fd983739babe7f7350595a01d209fca74 Mon Sep 17 00:00:00 2001 From: Solomon Date: Wed, 16 Jul 2025 13:31:49 +0100 Subject: [PATCH] Streaming support for conversation history with SSE (#167) * Streaming support with SSE The main focus was on Gemini-CLI token streaming, which uses the standard JSON-Patch format to stream real-time updates to the frontend via SSE. There is also a default database-backed SSE implementation which covers the remaining executors like Claude-code. * minor refactorings --- backend/Cargo.toml | 4 + backend/src/executor.rs | 12 +- backend/src/executors/claude.rs | 59 +- backend/src/executors/gemini.rs | 766 +++++++++--------- backend/src/executors/gemini/config.rs | 67 ++ backend/src/executors/gemini/streaming.rs | 363 +++++++++ backend/src/main.rs | 3 +- backend/src/routes/mod.rs | 1 + backend/src/routes/stream.rs | 170 ++++ backend/src/routes/task_attempts.rs | 41 +- frontend/package-lock.json | 14 + frontend/package.json | 2 + .../LogsTab/NormalizedConversationViewer.tsx | 550 +++++++++---- pnpm-lock.yaml | 16 + 14 files changed, 1463 insertions(+), 605 deletions(-) create mode 100644 backend/src/executors/gemini/config.rs create mode 100644 backend/src/executors/gemini/streaming.rs create mode 100644 backend/src/routes/stream.rs diff --git a/backend/Cargo.toml b/backend/Cargo.toml index f6a97ce6..bd48ce76 100644 --- a/backend/Cargo.toml +++ b/backend/Cargo.toml @@ -50,9 +50,13 @@ reqwest = { version = "0.11", features = ["json"] } strip-ansi-escapes = "0.2.1" urlencoding = "2.1.3" lazy_static = "1.4" +futures-util = "0.3" +async-stream = "0.3" +json-patch = "2.0" [dev-dependencies] tempfile = "3.8" +tower = { version = "0.4", features = ["util"] } [build-dependencies] dotenv = "0.15" diff --git a/backend/src/executor.rs b/backend/src/executor.rs index 573ade29..4cff6d39 100644 --- a/backend/src/executor.rs +++ b/backend/src/executor.rs @@ -11,9 +11,9 @@ use crate::executors::{ SetupScriptExecutor, }; -// Constants for database streaming +// Constants for database streaming - fast for near-real-time updates const STDOUT_UPDATE_THRESHOLD: usize = 1; -const BUFFER_SIZE_THRESHOLD: usize = 1024; +const BUFFER_SIZE_THRESHOLD: usize = 256; /// Normalized conversation representation for different executor formats #[derive(Debug, Clone, Serialize, Deserialize, TS)] @@ -277,6 +277,10 @@ pub trait Executor: Send + Sync { }) } + // Note: Fast-path streaming is now handled by the Gemini WAL system. + // The Gemini executor uses its own push_patch() method to emit patches, + // which are automatically served via SSE endpoints with resumable streaming.
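+    // A client subscribes to
+    //   GET /api/projects/:project_id/execution-processes/:process_id/normalized-logs/stream
+    // and receives `patch` events whose data is one batch, shaped roughly like
+    //   {"batch_id": 7, "patches": [{"op": "replace", "path": "/entries/0", "value": {...}}]}
+    // (field names from BatchData in routes/stream.rs; the values here are illustrative only).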
+ /// Execute the command and stream output to database in real-time async fn execute_streaming( &self, @@ -588,8 +592,8 @@ async fn stream_stderr_to_db( let mut reader = BufReader::new(output); let mut line = String::new(); let mut accumulated_output = String::new(); - const STDERR_FLUSH_TIMEOUT_MS: u64 = 1000; - const STDERR_FLUSH_TIMEOUT: Duration = Duration::from_millis(STDERR_FLUSH_TIMEOUT_MS); // 1000ms timeout + const STDERR_FLUSH_TIMEOUT_MS: u64 = 100; // Fast flush for near-real-time streaming + const STDERR_FLUSH_TIMEOUT: Duration = Duration::from_millis(STDERR_FLUSH_TIMEOUT_MS); loop { line.clear(); diff --git a/backend/src/executors/claude.rs b/backend/src/executors/claude.rs index 7182326b..ab53bd52 100644 --- a/backend/src/executors/claude.rs +++ b/backend/src/executors/claude.rs @@ -349,6 +349,7 @@ impl ClaudeExecutor { /// Convert absolute paths to relative paths based on worktree path fn make_path_relative(&self, path: &str, worktree_path: &str) -> String { let path_obj = Path::new(path); + let worktree_path_obj = Path::new(worktree_path); tracing::debug!("Making path relative: {} -> {}", path, worktree_path); @@ -358,13 +359,59 @@ impl ClaudeExecutor { } // Try to make path relative to the worktree path - let worktree_path_obj = Path::new(worktree_path); - if let Ok(relative_path) = path_obj.strip_prefix(worktree_path_obj) { - return relative_path.to_string_lossy().to_string(); - } + match path_obj.strip_prefix(worktree_path_obj) { + Ok(relative_path) => { + let result = relative_path.to_string_lossy().to_string(); + tracing::debug!("Successfully made relative: '{}' -> '{}'", path, result); + result + } + Err(_) => { + // Handle symlinks by resolving canonical paths + let canonical_path = std::fs::canonicalize(path); + let canonical_worktree = std::fs::canonicalize(worktree_path); - // If we can't make it relative, return the original path - path.to_string() + match (canonical_path, canonical_worktree) { + (Ok(canon_path), Ok(canon_worktree)) => { + tracing::debug!( + "Trying canonical path resolution: '{}' -> '{}', '{}' -> '{}'", + path, + canon_path.display(), + worktree_path, + canon_worktree.display() + ); + + match canon_path.strip_prefix(&canon_worktree) { + Ok(relative_path) => { + let result = relative_path.to_string_lossy().to_string(); + tracing::debug!( + "Successfully made relative with canonical paths: '{}' -> '{}'", + path, + result + ); + result + } + Err(e) => { + tracing::warn!( + "Failed to make canonical path relative: '{}' relative to '{}', error: {}, returning original", + canon_path.display(), + canon_worktree.display(), + e + ); + path.to_string() + } + } + } + _ => { + tracing::debug!( + "Could not canonicalize paths (paths may not exist): '{}', '{}', returning original", + path, + worktree_path + ); + path.to_string() + } + } + } + } } fn generate_concise_content( diff --git a/backend/src/executors/gemini.rs b/backend/src/executors/gemini.rs index 70ee0115..dff708af 100644 --- a/backend/src/executors/gemini.rs +++ b/backend/src/executors/gemini.rs @@ -1,7 +1,21 @@ -use std::{collections::VecDeque, process::Stdio, time::Instant}; +//! Gemini executor implementation +//! +//! This module provides Gemini CLI-based task execution with streaming support. 
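+//!
+//! The streaming design is dual-buffer: small chunks are pushed as JSON patches
+//! into an in-memory write-ahead log (the `streaming` module) for SSE consumers,
+//! while larger message-sized chunks are flushed to the database at the
+//! boundaries configured in the `config` module.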
+ +mod config; +mod streaming; + +use std::{process::Stdio, time::Instant}; use async_trait::async_trait; use command_group::{AsyncCommandGroup, AsyncGroupChild}; +use config::{ + max_chunk_size, max_display_size, max_latency_ms, max_message_size, GeminiStreamConfig, +}; +// Re-export for external use +use serde_json::Value; +pub use streaming::GeminiPatchBatch; +use streaming::GeminiStreaming; use tokio::{io::AsyncWriteExt, process::Command}; use uuid::Uuid; @@ -9,13 +23,10 @@ use crate::{ executor::{ Executor, ExecutorError, NormalizedConversation, NormalizedEntry, NormalizedEntryType, }, - models::{execution_process::ExecutionProcess, task::Task}, + models::task::Task, utils::shell::get_shell_command, }; -// Constants for configuration -const PATTERN_BREAK_TIMEOUT_SECS: u64 = 5; - /// An executor that uses Gemini CLI to process tasks pub struct GeminiExecutor; @@ -55,20 +66,7 @@ Task title: {}"#, ) }; - // Use shell command for cross-platform compatibility - let (shell_cmd, shell_arg) = get_shell_command(); - let gemini_command = "npx @google/gemini-cli@latest --yolo"; - - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(worktree_path) - .arg(shell_arg) - .arg(gemini_command) - .env("NODE_NO_WARNINGS", "1"); + let mut command = Self::create_gemini_command(worktree_path); let mut child = command .group_spawn() // Create new process group so we can kill entire tree @@ -121,26 +119,7 @@ Task title: {}"#, attempt_id ); - // Update ExecutorSession with the session_id immediately - if let Err(e) = crate::models::executor_session::ExecutorSession::update_session_id( - pool, - execution_process_id, - &attempt_id.to_string(), - ) - .await - { - tracing::error!( - "Failed to update session ID for Gemini execution process {}: {}", - execution_process_id, - e - ); - } else { - tracing::info!( - "Updated session ID {} for Gemini execution process {}", - attempt_id, - execution_process_id - ); - } + Self::update_session_id(pool, execution_process_id, &attempt_id.to_string()).await; let mut child = self.spawn(pool, task_id, worktree_path).await?; @@ -150,37 +129,7 @@ Task title: {}"#, child.inner().id() ); - // Take stdout and stderr pipes for streaming - let stdout = child - .inner() - .stdout - .take() - .expect("Failed to take stdout from child process"); - let stderr = child - .inner() - .stderr - .take() - .expect("Failed to take stderr from child process"); - - // Start streaming tasks with Gemini-specific line-based message updates - let pool_clone1 = pool.clone(); - let pool_clone2 = pool.clone(); - - tokio::spawn(Self::stream_gemini_with_lines( - stdout, - pool_clone1, - attempt_id, - execution_process_id, - true, - )); - // Use default stderr streaming (no custom parsing) - tokio::spawn(crate::executor::stream_output_to_db( - stderr, - pool_clone2, - attempt_id, - execution_process_id, - false, - )); + Self::setup_streaming(pool, &mut child, attempt_id, execution_process_id); Ok(child) } @@ -258,280 +207,193 @@ Task title: {}"#, summary: None, }) } + + // Note: Gemini streaming is handled by the Gemini-specific WAL system. + // See emit_content_batch() method which calls GeminiExecutor::push_patch(). } impl GeminiExecutor { - /// Stream Gemini output with real-time, line-by-line message updates using a queue-based approach. 
- async fn stream_gemini_with_lines( - output: impl tokio::io::AsyncRead + Unpin, - pool: sqlx::SqlitePool, - attempt_id: Uuid, - execution_process_id: Uuid, - is_stdout: bool, - ) { - use std::collections::VecDeque; + /// Create a standardized Gemini CLI command + fn create_gemini_command(worktree_path: &str) -> Command { + let (shell_cmd, shell_arg) = get_shell_command(); + let gemini_command = "npx @google/gemini-cli@latest --yolo"; - use tokio::io::{AsyncReadExt, BufReader}; - - if !is_stdout { - // For stderr, use the default database streaming without special formatting - crate::executor::stream_output_to_db( - output, - pool, - attempt_id, - execution_process_id, - false, - ) - .await; - return; - } - - let mut reader = BufReader::new(output); - let mut last_emit_time = Instant::now(); - let mut full_raw_output = String::new(); - let mut segment_queue: VecDeque = VecDeque::new(); - let mut incomplete_line_buffer = String::new(); - - tracing::info!( - "Starting Gemini line-based stdout streaming for attempt {}", - attempt_id - ); - - let mut buffer = [0; 1024]; // Read in chunks for performance and UTF-8 safety - loop { - // First, drain any pending segments from the queue - while let Some(segment_content) = segment_queue.pop_front() { - if !segment_content.trim().is_empty() { - tracing::debug!( - "Emitting segment for attempt {}: {:?}", - attempt_id, - segment_content - ); - Self::emit_normalized_message( - &pool, - execution_process_id, - &segment_content, - &mut last_emit_time, - ) - .await; - } - } - - // Then read new content from the reader - match reader.read(&mut buffer).await { - Ok(0) => { - // EOF - process any remaining content - tracing::info!( - "Gemini stdout reached EOF for attempt {}, processing final content", - attempt_id - ); - break; - } - Ok(n) => { - let chunk_str = String::from_utf8_lossy(&buffer[..n]); - tracing::debug!( - "Gemini stdout chunk received for attempt {} ({} bytes): {:?}", - attempt_id, - n, - chunk_str - ); - full_raw_output.push_str(&chunk_str); - - // Process the chunk and add segments to queue - Self::process_chunk_to_queue( - &chunk_str, - &mut segment_queue, - &mut incomplete_line_buffer, - &mut last_emit_time, - ); - } - Err(e) => { - // Error - log the error and break - tracing::error!( - "Error reading stdout for Gemini attempt {}: {}", - attempt_id, - e - ); - break; - } - } - } - - // Process any remaining incomplete line at EOF - if !incomplete_line_buffer.is_empty() { - let segments = - Self::split_by_pattern_breaks(&incomplete_line_buffer, &mut last_emit_time); - for segment in segments.iter() { - if !segment.trim().is_empty() { - segment_queue.push_back(segment.to_string()); - } - } - } - - // Final drain of any remaining segments - tracing::info!( - "Final drain - {} segments remaining for attempt {}", - segment_queue.len(), - attempt_id - ); - while let Some(segment_content) = segment_queue.pop_front() { - if !segment_content.trim().is_empty() { - tracing::debug!( - "Final drain segment for attempt {}: {:?}", - attempt_id, - segment_content - ); - Self::emit_normalized_message( - &pool, - execution_process_id, - &segment_content, - &mut last_emit_time, - ) - .await; - } - } - - // Note: We don't store the full raw output in stderr anymore since we're already - // processing it into normalized stdout messages. Storing it in stderr would cause - // the normalization route to treat it as error messages. 
- tracing::info!( - "Gemini processing complete for attempt {} ({} bytes processed)", - attempt_id, - full_raw_output.len() - ); - - tracing::info!( - "Gemini line-based stdout streaming ended for attempt {}", - attempt_id - ); + let mut command = Command::new(shell_cmd); + command + .kill_on_drop(true) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .current_dir(worktree_path) + .arg(shell_arg) + .arg(gemini_command) + .env("NODE_NO_WARNINGS", "1"); + command } - /// Process a chunk of text and add segments to the queue based on break behavior - fn process_chunk_to_queue( - chunk: &str, - queue: &mut VecDeque, - incomplete_line_buffer: &mut String, - last_emit_time: &mut Instant, - ) { - // Combine any incomplete line from previous chunk with current chunk - let text_to_process = incomplete_line_buffer.clone() + chunk; - incomplete_line_buffer.clear(); - - // Split by newlines - let lines: Vec<&str> = text_to_process.split('\n').collect(); - - for (i, line) in lines.iter().enumerate() { - let is_last_line = i == lines.len() - 1; - - if is_last_line && !chunk.ends_with('\n') { - // This is an incomplete line - store it in the buffer for next chunk - incomplete_line_buffer.push_str(line); - } else { - // This is a complete line - process it - if !line.is_empty() { - // Check for pattern breaks within the line - let segments = Self::split_by_pattern_breaks(line, last_emit_time); - - for segment in segments.iter() { - if !segment.trim().is_empty() { - queue.push_back(segment.to_string()); - } - } - } - - // Add newline as separate segment (except for the last line if chunk doesn't end with newline) - if !is_last_line || chunk.ends_with('\n') { - queue.push_back("\n".to_string()); - } - } - } - } - - /// Split text by pattern breaks (period + capital letter) - fn split_by_pattern_breaks(text: &str, last_emit_time: &mut Instant) -> Vec { - let mut segments = Vec::new(); - let mut current_segment = String::new(); - let mut chars = text.chars().peekable(); - - while let Some(ch) = chars.next() { - current_segment.push(ch); - - // Check for pattern break: period followed by capital letter or space - if ch == '.' { - if let Some(&next_ch) = chars.peek() { - let is_capital = next_ch.is_uppercase() && next_ch.is_alphabetic(); - let is_space = next_ch.is_whitespace(); - let should_force_break = - is_space && last_emit_time.elapsed().as_secs() > PATTERN_BREAK_TIMEOUT_SECS; - - if is_capital || should_force_break { - // Pattern break detected - current segment ends here - segments.push(current_segment.clone()); - current_segment.clear(); - } - } - } - } - - // Add the final segment if it's not empty - if !current_segment.is_empty() { - segments.push(current_segment); - } - - // If no segments were created, return the original text - if segments.is_empty() { - segments.push(text.to_string()); - } - - segments - } - - /// Emits a normalized message to the database stdout stream. 
- async fn emit_normalized_message( + /// Update executor session ID with error handling + async fn update_session_id( pool: &sqlx::SqlitePool, execution_process_id: Uuid, - content: &str, - last_emit_time: &mut Instant, + session_id: &str, ) { - if content.is_empty() { + if let Err(e) = crate::models::executor_session::ExecutorSession::update_session_id( + pool, + execution_process_id, + session_id, + ) + .await + { + tracing::error!( + "Failed to update session ID for Gemini execution process {}: {}", + execution_process_id, + e + ); + } else { + tracing::info!( + "Updated session ID {} for Gemini execution process {}", + session_id, + execution_process_id + ); + } + } + + /// Setup streaming for both stdout and stderr + fn setup_streaming( + pool: &sqlx::SqlitePool, + child: &mut AsyncGroupChild, + attempt_id: Uuid, + execution_process_id: Uuid, + ) { + // Take stdout and stderr pipes for streaming + let stdout = child + .inner() + .stdout + .take() + .expect("Failed to take stdout from child process"); + let stderr = child + .inner() + .stderr + .take() + .expect("Failed to take stderr from child process"); + + // Start streaming tasks with Gemini-specific line-based message updates + let pool_clone1 = pool.clone(); + let pool_clone2 = pool.clone(); + + tokio::spawn(Self::stream_gemini_chunked( + stdout, + pool_clone1, + attempt_id, + execution_process_id, + )); + // Use default stderr streaming (no custom parsing) + tokio::spawn(crate::executor::stream_output_to_db( + stderr, + pool_clone2, + attempt_id, + execution_process_id, + false, + )); + } + + /// Push patches to the Gemini WAL system + pub fn push_patch(execution_process_id: Uuid, patches: Vec, content_length: usize) { + GeminiStreaming::push_patch(execution_process_id, patches, content_length); + } + + /// Get WAL batches for an execution process, optionally filtering by cursor + pub fn get_wal_batches( + execution_process_id: Uuid, + after_batch_id: Option, + ) -> Option> { + GeminiStreaming::get_wal_batches(execution_process_id, after_batch_id) + } + + /// Clean up WAL when execution process finishes + pub async fn finalize_execution( + pool: &sqlx::SqlitePool, + execution_process_id: Uuid, + final_buffer: &str, + ) { + GeminiStreaming::finalize_execution(pool, execution_process_id, final_buffer).await; + } + + /// Find the best boundary to split a chunk (newline preferred, sentence fallback) + pub fn find_chunk_boundary(buffer: &str, max_size: usize) -> usize { + GeminiStreaming::find_chunk_boundary(buffer, max_size) + } + + /// Conditionally flush accumulated content to database in chunks + pub async fn maybe_flush_chunk( + pool: &sqlx::SqlitePool, + execution_process_id: Uuid, + buffer: &mut String, + config: &GeminiStreamConfig, + ) { + GeminiStreaming::maybe_flush_chunk(pool, execution_process_id, buffer, config).await; + } + + /// Emit JSON patch for current message state - either "replace" for growing message or "add" for new message. 
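+    /// While a message is still growing, the same entry index is rewritten in place,
+    /// e.g. `{"op": "replace", "path": "/entries/0", ...}`; once a boundary forces a
+    /// new message, an "add" operation appends the next `/entries/{n}` entry.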
+ fn emit_message_patch( + execution_process_id: Uuid, + current_message: &str, + entry_count: &mut usize, + force_new_message: bool, + ) { + if current_message.is_empty() { return; } - let entry = NormalizedEntry { - timestamp: Some(chrono::Utc::now().to_rfc3339()), - entry_type: NormalizedEntryType::AssistantMessage, - content: content.to_string(), - metadata: None, - }; - - match serde_json::to_string(&entry) { - Ok(jsonl_line) => { - let formatted_line = format!("{}\n", jsonl_line); - - tracing::debug!( - "Storing normalized message to DB for execution {}: {}", - execution_process_id, - jsonl_line - ); - - // Store as stdout to make it available to conversation viewer - if let Err(e) = - ExecutionProcess::append_stdout(pool, execution_process_id, &formatted_line) - .await - { - tracing::error!("Failed to emit normalized message: {}", e); - } else { - *last_emit_time = Instant::now(); - tracing::debug!("Successfully stored normalized message to DB"); + if force_new_message && *entry_count > 0 { + // Start new message: add new entry to array + *entry_count += 1; + let patch_vec = vec![serde_json::json!({ + "op": "add", + "path": format!("/entries/{}", *entry_count - 1), + "value": { + "timestamp": chrono::Utc::now().to_rfc3339(), + "entry_type": {"type": "assistant_message"}, + "content": current_message, + "metadata": null, } + })]; + + Self::push_patch(execution_process_id, patch_vec, current_message.len()); + } else { + // Growing message: replace current entry + if *entry_count == 0 { + *entry_count = 1; // Initialize first message } - Err(e) => { - tracing::error!( - "Failed to serialize normalized entry for content: {:?} - Error: {}", - content, - e - ); - } + + let patch_vec = vec![serde_json::json!({ + "op": "replace", + "path": format!("/entries/{}", *entry_count - 1), + "value": { + "timestamp": chrono::Utc::now().to_rfc3339(), + "entry_type": {"type": "assistant_message"}, + "content": current_message, + "metadata": null, + } + })]; + + Self::push_patch(execution_process_id, patch_vec, current_message.len()); + } + } + + /// Emit final content when stream ends + async fn emit_final_content( + execution_process_id: Uuid, + remaining_content: &str, + entry_count: &mut usize, + ) { + if !remaining_content.trim().is_empty() { + Self::emit_message_patch( + execution_process_id, + remaining_content, + entry_count, + false, // Don't force new message for final content + ); } } } @@ -619,25 +481,13 @@ You are continuing work on the above task. 
The execution history shows what has worktree_path: &str, comprehensive_prompt: &str, ) -> Result { - let (shell_cmd, shell_arg) = get_shell_command(); - let gemini_command = "npx @google/gemini-cli@latest --yolo"; - tracing::info!( "Spawning Gemini followup execution for attempt {} with resume context ({} chars)", self.attempt_id, comprehensive_prompt.len() ); - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(worktree_path) - .arg(shell_arg) - .arg(gemini_command) - .env("NODE_NO_WARNINGS", "1"); + let mut command = GeminiExecutor::create_gemini_command(worktree_path); let mut child = command.group_spawn().map_err(|e| { crate::executor::SpawnContext::from_command(&command, "Gemini") @@ -727,25 +577,8 @@ impl Executor for GeminiFollowupExecutor { ); // Update ExecutorSession with the session_id immediately - if let Err(e) = crate::models::executor_session::ExecutorSession::update_session_id( - pool, - execution_process_id, - &self.attempt_id.to_string(), - ) - .await - { - tracing::error!( - "Failed to update session ID for Gemini followup execution process {}: {}", - execution_process_id, - e - ); - } else { - tracing::info!( - "Updated session ID {} for Gemini followup execution process {}", - self.attempt_id, - execution_process_id - ); - } + GeminiExecutor::update_session_id(pool, execution_process_id, &self.attempt_id.to_string()) + .await; let mut child = self.spawn(pool, task_id, worktree_path).await?; @@ -755,37 +588,7 @@ impl Executor for GeminiFollowupExecutor { child.inner().id() ); - // Take stdout and stderr pipes for streaming - let stdout = child - .inner() - .stdout - .take() - .expect("Failed to take stdout from child process"); - let stderr = child - .inner() - .stderr - .take() - .expect("Failed to take stderr from child process"); - - // Start streaming tasks with Gemini-specific line-based message updates - let pool_clone1 = pool.clone(); - let pool_clone2 = pool.clone(); - - tokio::spawn(GeminiExecutor::stream_gemini_with_lines( - stdout, - pool_clone1, - attempt_id, - execution_process_id, - true, - )); - // Use default stderr streaming (no custom parsing) - tokio::spawn(crate::executor::stream_output_to_db( - stderr, - pool_clone2, - attempt_id, - execution_process_id, - false, - )); + GeminiExecutor::setup_streaming(pool, &mut child, attempt_id, execution_process_id); Ok(child) } @@ -800,3 +603,180 @@ impl Executor for GeminiFollowupExecutor { main_executor.normalize_logs(logs, worktree_path) } } + +impl GeminiExecutor { + /// Format Gemini CLI output by inserting line breaks where periods are directly + /// followed by capital letters (common Gemini CLI formatting issue). + /// Handles both intra-chunk and cross-chunk period-to-capital transitions. 
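+    /// For example, a chunk containing `"is done.Next we update"` is rewritten as
+    /// `"is done.\nNext we update"`, restoring the sentence break the CLI dropped.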
+ fn format_gemini_output(content: &str, accumulated_message: &str) -> String { + let mut result = String::with_capacity(content.len() + 100); // Reserve some extra space for potential newlines + let chars: Vec<char> = content.chars().collect(); + + // Check for cross-chunk boundary: previous chunk ended with period, current starts with capital + if !accumulated_message.is_empty() && !content.is_empty() { + let ends_with_period = accumulated_message.ends_with('.'); + let starts_with_capital = chars + .first() + .map(|&c| c.is_uppercase() && c.is_alphabetic()) + .unwrap_or(false); + + if ends_with_period && starts_with_capital { + result.push('\n'); + } + } + + // Handle intra-chunk period-to-capital transitions + for i in 0..chars.len() { + result.push(chars[i]); + + // Check if current char is '.' and next char is uppercase letter (no space between) + if chars[i] == '.' && i + 1 < chars.len() { + let next_char = chars[i + 1]; + if next_char.is_uppercase() && next_char.is_alphabetic() { + result.push('\n'); + } + } + } + + result + } + + /// Stream Gemini output with dual-buffer approach: chunks for UI updates, messages for storage. + /// + /// **Chunks** (~2KB): Frequent UI updates using "replace" patches for smooth streaming + /// **Messages** (~8KB): Logical boundaries using "add" patches for new entries + /// **Consistent WAL/DB**: Both systems see same message structure via JSON patches + pub async fn stream_gemini_chunked( + mut output: impl tokio::io::AsyncRead + Unpin, + pool: sqlx::SqlitePool, + attempt_id: Uuid, + execution_process_id: Uuid, + ) { + use tokio::io::{AsyncReadExt, BufReader}; + + let chunk_limit = max_chunk_size(); + let display_chunk_size = max_display_size(); // ~2KB for UI updates + let message_boundary_size = max_message_size(); // ~8KB for new message boundaries + let max_latency = std::time::Duration::from_millis(max_latency_ms()); + + let mut reader = BufReader::new(&mut output); + + // Dual buffers: chunk buffer for UI, message buffer for DB + let mut current_message = String::new(); // Current assistant message content + let mut db_buffer = String::new(); // Buffer for database storage (using ChunkStore) + let mut entry_count = 0usize; // Track assistant message entries + + let mut read_buf = vec![0u8; chunk_limit]; // Read buffer sized by the configurable chunk limit + let mut last_chunk_emit = Instant::now(); + + // Configuration for WAL and DB management + let config = GeminiStreamConfig::default(); + + tracing::info!( + "Starting dual-buffer Gemini streaming for attempt {} (chunks: {}B, messages: {}B)", + attempt_id, + display_chunk_size, + message_boundary_size + ); + + loop { + match reader.read(&mut read_buf).await { + Ok(0) => { + // EOF: emit final content and flush to database + Self::emit_final_content( + execution_process_id, + &current_message, + &mut entry_count, + ) + .await; + + // Flush any remaining database buffer + Self::finalize_execution(&pool, execution_process_id, &db_buffer).await; + break; + } + Ok(n) => { + // Convert bytes to string and apply Gemini-specific formatting + let raw_chunk = String::from_utf8_lossy(&read_buf[..n]); + let formatted_chunk = Self::format_gemini_output(&raw_chunk, &current_message); + + // Add to both buffers + current_message.push_str(&formatted_chunk); + db_buffer.push_str(&formatted_chunk); + + // 1.
Check for chunk emission (frequent UI updates ~2KB) + let should_emit_chunk = current_message.len() >= display_chunk_size + || (last_chunk_emit.elapsed() >= max_latency + && !current_message.is_empty()); + + if should_emit_chunk { + // Emit "replace" patch for growing message (smooth UI) + Self::emit_message_patch( + execution_process_id, + ¤t_message, + &mut entry_count, + false, // Not forcing new message + ); + last_chunk_emit = Instant::now(); + } + + // 2. Check for message boundary (new assistant message ~8KB) + let should_start_new_message = current_message.len() >= message_boundary_size; + + if should_start_new_message { + // Find optimal boundary for new message + let boundary = + Self::find_chunk_boundary(¤t_message, message_boundary_size); + + if boundary > 0 && boundary < current_message.len() { + // Split at boundary: complete current message, start new one + let completed_message = current_message[..boundary].to_string(); + let remaining_content = current_message[boundary..].to_string(); + + // CRITICAL FIX: Only emit "replace" patch to complete current message + // Do NOT emit "add" patch as it shifts existing database entries + Self::emit_message_patch( + execution_process_id, + &completed_message, + &mut entry_count, + false, // Complete current message + ); + + // Store the completed message to database + // This ensures the database gets the completed content at the boundary + Self::maybe_flush_chunk( + &pool, + execution_process_id, + &mut db_buffer, + &config, + ) + .await; + + // Start fresh message with remaining content (no WAL patch yet) + // Next chunk emission will create "replace" patch for entry_count + 1 + current_message = remaining_content; + entry_count += 1; // Move to next entry index for future patches + } + } + + // 3. Flush to database (same boundary detection) + Self::maybe_flush_chunk(&pool, execution_process_id, &mut db_buffer, &config) + .await; + } + Err(e) => { + tracing::error!( + "Error reading stdout for Gemini attempt {}: {}", + attempt_id, + e + ); + break; + } + } + } + + tracing::info!( + "Dual-buffer Gemini streaming completed for attempt {} ({} messages)", + attempt_id, + entry_count + ); + } +} diff --git a/backend/src/executors/gemini/config.rs b/backend/src/executors/gemini/config.rs new file mode 100644 index 00000000..04675dd5 --- /dev/null +++ b/backend/src/executors/gemini/config.rs @@ -0,0 +1,67 @@ +//! Gemini executor configuration and environment variable resolution +//! +//! This module contains configuration structures and functions for the Gemini executor, +//! including environment variable resolution for runtime parameters. 
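+//!
+//! Each limit falls back to a `DEFAULT_*` constant below and can be overridden via
+//! an environment variable, e.g. `GEMINI_CLI_MAX_DISPLAY_SIZE=4096` for larger UI
+//! chunks (illustrative value; see the resolver functions for the exact names).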
+ +/// Configuration for Gemini WAL compaction and DB chunking +#[derive(Debug, Clone)] +pub struct GeminiStreamConfig { + pub max_db_chunk_size: usize, + pub wal_compaction_threshold: usize, + pub wal_compaction_size: usize, + pub wal_compaction_interval_ms: u64, + pub max_wal_batches: usize, + pub max_wal_total_size: usize, +} + +impl Default for GeminiStreamConfig { + fn default() -> Self { + Self { + max_db_chunk_size: max_message_size(), + wal_compaction_threshold: 40, + wal_compaction_size: max_message_size() * 2, + wal_compaction_interval_ms: 30000, + max_wal_batches: 100, + max_wal_total_size: 1024 * 1024, // 1MB per process + } + } +} + +// Constants for configuration +/// Size-based streaming configuration +pub const DEFAULT_MAX_CHUNK_SIZE: usize = 5120; // bytes (read buffer size) +pub const DEFAULT_MAX_DISPLAY_SIZE: usize = 2000; // bytes (SSE emission threshold for smooth UI) +pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 8000; // bytes (message boundary for new assistant entries) +pub const DEFAULT_MAX_LATENCY_MS: u64 = 50; // milliseconds + +/// Resolve MAX_CHUNK_SIZE from env or fallback +pub fn max_chunk_size() -> usize { + std::env::var("GEMINI_CLI_MAX_CHUNK_SIZE") + .ok() + .and_then(|v| v.parse::<usize>().ok()) + .unwrap_or(DEFAULT_MAX_CHUNK_SIZE) +} + +/// Resolve MAX_DISPLAY_SIZE from env or fallback +pub fn max_display_size() -> usize { + std::env::var("GEMINI_CLI_MAX_DISPLAY_SIZE") + .ok() + .and_then(|v| v.parse::<usize>().ok()) + .unwrap_or(DEFAULT_MAX_DISPLAY_SIZE) +} + +/// Resolve MAX_MESSAGE_SIZE from env or fallback +pub fn max_message_size() -> usize { + std::env::var("GEMINI_CLI_MAX_MESSAGE_SIZE") + .ok() + .and_then(|v| v.parse::<usize>().ok()) + .unwrap_or(DEFAULT_MAX_MESSAGE_SIZE) +} + +/// Resolve MAX_LATENCY_MS from env or fallback +pub fn max_latency_ms() -> u64 { + std::env::var("GEMINI_CLI_MAX_LATENCY_MS") + .ok() + .and_then(|v| v.parse::<u64>().ok()) + .unwrap_or(DEFAULT_MAX_LATENCY_MS) +} diff --git a/backend/src/executors/gemini/streaming.rs b/backend/src/executors/gemini/streaming.rs new file mode 100644 index 00000000..9fcee6f8 --- /dev/null +++ b/backend/src/executors/gemini/streaming.rs @@ -0,0 +1,363 @@ +//! Gemini streaming functionality with WAL and chunked storage +//! +//! This module provides real-time streaming support for Gemini execution processes +//! with Write-Ahead Log (WAL) capabilities for resumable streaming. + +use std::{collections::HashMap, sync::Mutex, time::Instant}; + +use json_patch::{patch, Patch, PatchOperation}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use uuid::Uuid; + +use super::config::GeminiStreamConfig; +use crate::{ + executor::{NormalizedEntry, NormalizedEntryType}, + models::execution_process::ExecutionProcess, +}; + +lazy_static::lazy_static!
{ /// Write-Ahead Log: Maps execution_process_id → WAL state (Gemini-specific) static ref GEMINI_WAL_MAP: Mutex<HashMap<Uuid, GeminiWalState>> = Mutex::new(HashMap::new()); } /// A batch of JSON patches for Gemini streaming #[derive(Debug, Clone, Serialize, Deserialize)] pub struct GeminiPatchBatch { /// Monotonic batch identifier for cursor-based streaming pub batch_id: u64, /// Array of JSON Patch operations (RFC 6902 format) pub patches: Vec<Value>, /// ISO 8601 timestamp when this batch was created pub timestamp: String, /// Total content length after applying all patches in this batch pub content_length: usize, } /// WAL state for a single Gemini execution process #[derive(Debug)] pub struct GeminiWalState { pub batches: Vec<GeminiPatchBatch>, pub total_content_length: usize, pub next_batch_id: u64, pub last_compaction: Instant, pub last_db_flush: Instant, pub last_access: Instant, } impl Default for GeminiWalState { fn default() -> Self { Self::new() } } impl GeminiWalState { pub fn new() -> Self { let now = Instant::now(); Self { batches: Vec::new(), total_content_length: 0, next_batch_id: 1, last_compaction: now, last_db_flush: now, last_access: now, } } } /// Gemini streaming utilities pub struct GeminiStreaming; impl GeminiStreaming { /// Push patches to the Gemini WAL system pub fn push_patch(execution_process_id: Uuid, patches: Vec<Value>, content_length: usize) { let mut wal_map = GEMINI_WAL_MAP.lock().unwrap(); let wal_state = wal_map.entry(execution_process_id).or_default(); let config = GeminiStreamConfig::default(); // Update access time for orphan cleanup wal_state.last_access = Instant::now(); // Enforce size limits - force compaction instead of clearing to prevent data loss if wal_state.batches.len() >= config.max_wal_batches || wal_state.total_content_length >= config.max_wal_total_size { tracing::warn!( "WAL size limits exceeded for process {} (batches: {}, size: {}), forcing compaction", execution_process_id, wal_state.batches.len(), wal_state.total_content_length ); // Force compaction to preserve data instead of losing it Self::compact_wal(wal_state); // If still over limits after compaction, keep only the most recent batches if wal_state.batches.len() >= config.max_wal_batches { let keep_count = config.max_wal_batches / 2; // Keep half let remove_count = wal_state.batches.len() - keep_count; wal_state.batches.drain(..remove_count); tracing::warn!( "After compaction still over limit, kept {} most recent batches", keep_count ); } } let batch = GeminiPatchBatch { batch_id: wal_state.next_batch_id, patches, timestamp: chrono::Utc::now().to_rfc3339(), content_length, }; wal_state.next_batch_id += 1; wal_state.batches.push(batch); wal_state.total_content_length = content_length; // Check if compaction is needed if Self::should_compact(wal_state, &config) { Self::compact_wal(wal_state); } } /// Get WAL batches for an execution process, optionally filtering by cursor pub fn get_wal_batches( execution_process_id: Uuid, after_batch_id: Option<u64>, ) -> Option<Vec<GeminiPatchBatch>> { GEMINI_WAL_MAP.lock().ok().and_then(|mut wal_map| { wal_map.get_mut(&execution_process_id).map(|wal_state| { // Update access time when WAL is retrieved wal_state.last_access = Instant::now(); match after_batch_id { Some(cursor) => { // Return only batches with batch_id > cursor wal_state .batches .iter() .filter(|batch| batch.batch_id > cursor) .cloned()
.collect() + } + None => { + // Return all batches + wal_state.batches.clone() + } + } + }) + }) + } + + /// Clean up WAL when execution process finishes + pub async fn finalize_execution( + pool: &sqlx::SqlitePool, + execution_process_id: Uuid, + final_buffer: &str, + ) { + // Flush any remaining content to database + if !final_buffer.trim().is_empty() { + Self::store_chunk_to_db(pool, execution_process_id, final_buffer).await; + } + + // Remove WAL entry + Self::purge_wal(execution_process_id); + } + + /// Remove WAL entry for a specific execution process + pub fn purge_wal(execution_process_id: Uuid) { + if let Ok(mut wal_map) = GEMINI_WAL_MAP.lock() { + wal_map.remove(&execution_process_id); + tracing::debug!( + "Cleaned up WAL for execution process {}", + execution_process_id + ); + } + } + + /// Find the best boundary to split a chunk (newline preferred, sentence fallback) + pub fn find_chunk_boundary(buffer: &str, max_size: usize) -> usize { + if buffer.len() <= max_size { + return buffer.len(); + } + + let search_window = &buffer[..max_size]; + + // First preference: newline boundary + if let Some(pos) = search_window.rfind('\n') { + return pos + 1; // Include the newline + } + + // Second preference: sentence boundary (., !, ?) + if let Some(pos) = search_window.rfind(&['.', '!', '?'][..]) { + if pos + 1 < search_window.len() { + return pos + 1; + } + } + + // Fallback: word boundary + if let Some(pos) = search_window.rfind(' ') { + return pos + 1; + } + + // Last resort: split at max_size + max_size + } + + /// Store a chunk to the database + async fn store_chunk_to_db(pool: &sqlx::SqlitePool, execution_process_id: Uuid, content: &str) { + if content.trim().is_empty() { + return; + } + + let entry = NormalizedEntry { + timestamp: Some(chrono::Utc::now().to_rfc3339()), + entry_type: NormalizedEntryType::AssistantMessage, + content: content.to_string(), + metadata: None, + }; + + match serde_json::to_string(&entry) { + Ok(jsonl_line) => { + let formatted_line = format!("{}\n", jsonl_line); + if let Err(e) = + ExecutionProcess::append_stdout(pool, execution_process_id, &formatted_line) + .await + { + tracing::error!("Failed to store chunk to database: {}", e); + } else { + tracing::debug!("Stored {}B chunk to database", content.len()); + } + } + Err(e) => { + tracing::error!("Failed to serialize chunk: {}", e); + } + } + } + + /// Conditionally flush accumulated content to database in chunks + pub async fn maybe_flush_chunk( + pool: &sqlx::SqlitePool, + execution_process_id: Uuid, + buffer: &mut String, + config: &GeminiStreamConfig, + ) { + if buffer.len() < config.max_db_chunk_size { + return; + } + + // Find the best split point (newline preferred, sentence boundary fallback) + let split_point = Self::find_chunk_boundary(buffer, config.max_db_chunk_size); + + if split_point > 0 { + let chunk = buffer[..split_point].to_string(); + buffer.drain(..split_point); + + // Store chunk to database + Self::store_chunk_to_db(pool, execution_process_id, &chunk).await; + + // Update WAL flush time + if let Ok(mut wal_map) = GEMINI_WAL_MAP.lock() { + if let Some(wal_state) = wal_map.get_mut(&execution_process_id) { + wal_state.last_db_flush = Instant::now(); + } + } + } + } + + /// Check if WAL compaction is needed based on configured thresholds + fn should_compact(wal_state: &GeminiWalState, config: &GeminiStreamConfig) -> bool { + wal_state.batches.len() >= config.wal_compaction_threshold + || wal_state.total_content_length >= config.wal_compaction_size + || 
wal_state.last_compaction.elapsed().as_millis() as u64 >= config.wal_compaction_interval_ms } /// Compact WAL by losslessly merging older patches into a snapshot fn compact_wal(wal_state: &mut GeminiWalState) { // Need at least a few batches to make compaction worthwhile if wal_state.batches.len() <= 5 { return; } // Keep the most recent 3 batches for smooth incremental updates let recent_count = 3; let compact_count = wal_state.batches.len() - recent_count; if compact_count <= 1 { return; // Not enough to compact } // Start with an empty conversation and apply all patches sequentially let mut conversation_value = serde_json::json!({ "entries": [], "session_id": null, "executor_type": "gemini", "prompt": null, "summary": null }); let mut total_content_length = 0; let oldest_batch_id = wal_state.batches[0].batch_id; let compact_timestamp = chrono::Utc::now().to_rfc3339(); // Apply patches from oldest to newest (excluding recent ones) using json-patch crate for batch in &wal_state.batches[..compact_count] { // Convert Vec<Value> to json_patch::Patch let patch_operations: Result<Vec<PatchOperation>, _> = batch .patches .iter() .map(|p| serde_json::from_value(p.clone())) .collect(); match patch_operations { Ok(ops) => { let patch_obj = Patch(ops); if let Err(e) = patch(&mut conversation_value, &patch_obj) { tracing::warn!("Failed to apply patch during compaction: {}, skipping", e); continue; } } Err(e) => { tracing::warn!("Failed to deserialize patch operations: {}, skipping", e); continue; } } total_content_length = batch.content_length; // Use the final length } // Extract the final entries array for the snapshot let final_entries = conversation_value .get("entries") .and_then(|v| v.as_array()) .cloned() .unwrap_or_default(); // Create a single snapshot patch that replaces the entire entries array let snapshot_patch = GeminiPatchBatch { batch_id: oldest_batch_id, // Use the oldest batch_id to maintain cursor compatibility patches: vec![serde_json::json!({ "op": "replace", "path": "/entries", "value": final_entries })], timestamp: compact_timestamp, content_length: total_content_length, }; // Replace old batches with snapshot + keep recent batches let mut new_batches = vec![snapshot_patch]; new_batches.extend_from_slice(&wal_state.batches[compact_count..]); wal_state.batches = new_batches; wal_state.last_compaction = Instant::now(); tracing::info!( "Losslessly compacted WAL: {} batches → {} (1 snapshot + {} recent), preserving all content", compact_count + recent_count, wal_state.batches.len(), recent_count ); } } diff --git a/backend/src/main.rs b/backend/src/main.rs index 5713b060..f539fbd8 100644 --- a/backend/src/main.rs +++ b/backend/src/main.rs @@ -29,7 +29,7 @@ mod utils; use app_state::AppState; use execution_monitor::execution_monitor; use models::{ApiResponse, Config}; -use routes::{auth, config, filesystem, health, projects, task_attempts, tasks}; +use routes::{auth, config, filesystem, health, projects, stream, task_attempts, tasks}; use services::PrMonitorService; async fn echo_handler( @@ -198,6 +198,7 @@ fn main() -> anyhow::Result<()> { .merge(projects::projects_router()) .merge(tasks::tasks_router()) .merge(task_attempts::task_attempts_router()) + .merge(stream::stream_router()) .merge(filesystem::filesystem_router()) .merge(config::config_router()) .merge(auth::auth_router()) diff --git a/backend/src/routes/mod.rs
b/backend/src/routes/mod.rs index 92b93d94..31290d3e 100644 --- a/backend/src/routes/mod.rs +++ b/backend/src/routes/mod.rs @@ -3,5 +3,6 @@ pub mod config; pub mod filesystem; pub mod health; pub mod projects; +pub mod stream; pub mod task_attempts; pub mod tasks; diff --git a/backend/src/routes/stream.rs b/backend/src/routes/stream.rs new file mode 100644 index 00000000..18814813 --- /dev/null +++ b/backend/src/routes/stream.rs @@ -0,0 +1,170 @@ +use std::{str::FromStr, time::Duration}; + +use axum::{ + extract::{Path, Query, State}, + response::sse::{Event, Sse}, + routing::get, + Router, +}; +use futures_util::stream::Stream; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use uuid::Uuid; + +use crate::{ + app_state::AppState, + executors::gemini::GeminiExecutor, + models::execution_process::{ExecutionProcess, ExecutionProcessStatus}, +}; + +/// Interval for DB tail polling (ms); short enough for near-real-time updates +const TAIL_INTERVAL_MS: u64 = 100; + +/// Structured batch data for SSE streaming +#[derive(Serialize)] +struct BatchData { + batch_id: u64, + patches: Vec<Value>, +} + +/// Query parameters for resumable SSE streaming +#[derive(Debug, Deserialize)] +pub struct StreamQuery { + /// Optional cursor to resume streaming from specific batch ID + since_batch_id: Option<u64>, +} + +/// SSE handler for incremental normalized-logs JSON-Patch streaming +/// +/// GET /api/projects/:project_id/execution-processes/:process_id/normalized-logs/stream?since_batch_id=123 +pub async fn normalized_logs_stream( + Path((_project_id, process_id)): Path<(Uuid, Uuid)>, + Query(query): Query<StreamQuery>, + State(app_state): State<AppState>, +) -> Sse<impl Stream<Item = Result<Event, axum::Error>>> { + // Check if this is a Gemini executor (only executor with streaming support) + let is_gemini = match ExecutionProcess::find_by_id(&app_state.db_pool, process_id).await { + Ok(Some(process)) => process.executor_type.as_deref() == Some("gemini"), + _ => { + tracing::warn!( + "Failed to find execution process {} for SSE streaming", + process_id + ); + false + } + }; + + // Use a shorter polling interval for Gemini (the only executor with WAL streaming) + let poll_interval = if is_gemini { 50 } else { TAIL_INTERVAL_MS }; + + // Stream that yields patches from WAL (fast-path) or DB tail (fallback) + let stream = async_stream::stream!
{ // Track previous stdout length and entry count for database polling fallback let mut last_len: usize = 0; let mut last_entry_count: usize = 0; let mut interval = tokio::time::interval(Duration::from_millis(poll_interval)); let mut last_seen_batch_id: u64 = query.since_batch_id.unwrap_or(0); // Cursor for WAL streaming let mut fallback_batch_id: u64 = query.since_batch_id.map(|id| id + 1).unwrap_or(1); // Monotonic batch ID for fallback polling loop { interval.tick().await; // Check process status first let process_status = match ExecutionProcess::find_by_id(&app_state.db_pool, process_id).await { Ok(Some(proc)) => proc.status, _ => { tracing::warn!("Execution process {} not found during SSE streaming", process_id); break; } }; if is_gemini { // Gemini streaming: Read from Gemini WAL using cursor let cursor = if last_seen_batch_id == 0 { None } else { Some(last_seen_batch_id) }; if let Some(new_batches) = GeminiExecutor::get_wal_batches(process_id, cursor) { // Send any new batches since last cursor for batch in &new_batches { // Send full batch including batch_id for cursor tracking let batch_data = BatchData { batch_id: batch.batch_id, patches: batch.patches.clone(), }; let json = serde_json::to_string(&batch_data).unwrap_or_default(); yield Ok(Event::default().event("patch").data(json)); // Update cursor to highest batch_id seen last_seen_batch_id = batch.batch_id.max(last_seen_batch_id); } } } else { // Fallback: Database polling for non-streaming executors let patch_result = ExecutionProcess::find_by_id(&app_state.db_pool, process_id) .await .ok() .and_then(|proc_option| proc_option) .filter(|proc| { proc.stdout .as_ref() .is_some_and(|stdout| stdout.len() > last_len && !stdout[last_len..].trim().is_empty()) }) .and_then(|proc| { let executor_type = proc.executor_type.as_deref().unwrap_or("unknown"); crate::executor::ExecutorConfig::from_str(executor_type) .ok() .map(|config| (config.create_executor(), proc)) }) .and_then(|(executor, proc)| { let stdout = proc.stdout.unwrap_or_default(); executor.normalize_logs(&stdout, &proc.working_directory) .ok() .map(|normalized| (normalized, stdout.len())) }) .and_then(|(normalized, new_len)| { let new_entries = &normalized.entries[last_entry_count..]; (!new_entries.is_empty()).then(|| { let patch = new_entries .iter() .map(|entry| serde_json::json!({ "op": "add", "path": "/entries/-", "value": entry })) .collect::<Vec<_>>(); (patch, normalized.entries.len(), new_len) }) }) .filter(|(patch, _, _): &(Vec<Value>, usize, usize)| !patch.is_empty()); if let Some((patch, entries_len, new_len)) = patch_result { // Use same format as fast-path for backward compatibility let batch_data = BatchData { batch_id: fallback_batch_id, patches: patch, }; let json = serde_json::to_string(&batch_data).unwrap_or_default(); yield Ok(Event::default().event("patch").data(json)); // Update tracking variables after successful send fallback_batch_id += 1; last_entry_count = entries_len; last_len = new_len; } } // Stop streaming when process completed if process_status != ExecutionProcessStatus::Running { break; } } }; Sse::new(stream).keep_alive(axum::response::sse::KeepAlive::default()) } + +/// Router exposing `/normalized-logs/stream` +pub fn stream_router() -> Router { + Router::new().route( + "/projects/:project_id/execution-processes/:process_id/normalized-logs/stream",
get(normalized_logs_stream), + ) +} diff --git a/backend/src/routes/task_attempts.rs b/backend/src/routes/task_attempts.rs index c5aae7b0..60987bc5 100644 --- a/backend/src/routes/task_attempts.rs +++ b/backend/src/routes/task_attempts.rs @@ -13,9 +13,7 @@ use crate::{ executor::{ExecutorConfig, NormalizedConversation, NormalizedEntry, NormalizedEntryType}, models::{ config::Config, - execution_process::{ - ExecutionProcess, ExecutionProcessStatus, ExecutionProcessSummary, ExecutionProcessType, - }, + execution_process::{ExecutionProcess, ExecutionProcessSummary, ExecutionProcessType}, executor_session::ExecutorSession, task::Task, task_attempt::{ @@ -1178,39 +1176,32 @@ pub async fn get_execution_process_normalized_logs( } }; - // Handle the case where no logs are available + // Check if logs are available let has_stdout = process.stdout.is_some() && !process.stdout.as_ref().unwrap().trim().is_empty(); let has_stderr = process.stderr.is_some() && !process.stderr.as_ref().unwrap().trim().is_empty(); - // If the process is still running and has no stdout/stderr, return empty logs - if process.status == ExecutionProcessStatus::Running && !has_stdout && !has_stderr { + // If no logs available, return empty conversation + if !has_stdout && !has_stderr { + let empty_conversation = NormalizedConversation { + entries: vec![], + session_id: None, + executor_type: process + .executor_type + .clone() + .unwrap_or("unknown".to_string()), + prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()), + summary: executor_session.as_ref().and_then(|s| s.summary.clone()), + }; + return Ok(ResponseJson(ApiResponse { success: true, - data: Some(NormalizedConversation { - entries: vec![], - session_id: None, - executor_type: process - .executor_type - .clone() - .unwrap_or("unknown".to_string()), - prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()), - summary: executor_session.as_ref().and_then(|s| s.summary.clone()), - }), + data: Some(empty_conversation), message: None, })); } - // If process is completed but has no logs, return appropriate error - if process.status != ExecutionProcessStatus::Running && !has_stdout && !has_stderr { - return Ok(ResponseJson(ApiResponse { - success: false, - data: None, - message: Some("No logs available for this execution process".to_string()), - })); - } - // Parse stdout as JSONL using executor normalization let mut stdout_entries = Vec::new(); if let Some(stdout) = &process.stdout { diff --git a/frontend/package-lock.json b/frontend/package-lock.json index dbfa23fb..45ffc7f4 100644 --- a/frontend/package-lock.json +++ b/frontend/package-lock.json @@ -10,6 +10,7 @@ "dependencies": { "@dnd-kit/core": "^6.3.1", "@dnd-kit/modifiers": "^9.0.0", + "@microsoft/fetch-event-source": "^2.0.1", "@radix-ui/react-dropdown-menu": "^2.1.15", "@radix-ui/react-label": "^2.1.7", "@radix-ui/react-portal": "^1.1.9", @@ -23,6 +24,7 @@ "class-variance-authority": "^0.7.0", "click-to-react-component": "^1.1.2", "clsx": "^2.0.0", + "fast-json-patch": "^3.1.1", "lucide-react": "^0.303.0", "react": "^18.2.0", "react-dom": "^18.2.0", @@ -1131,6 +1133,12 @@ "@jridgewell/sourcemap-codec": "^1.4.14" } }, + "node_modules/@microsoft/fetch-event-source": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz", + "integrity": "sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA==", + "license": "MIT" + }, "node_modules/@nodelib/fs.scandir": { "version": "2.1.5", 
"resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz", @@ -3961,6 +3969,12 @@ "node": ">= 6" } }, + "node_modules/fast-json-patch": { + "version": "3.1.1", + "resolved": "https://registry.npmjs.org/fast-json-patch/-/fast-json-patch-3.1.1.tgz", + "integrity": "sha512-vf6IHUX2SBcA+5/+4883dsIjpBTqmfBjmYiWK1savxQmFk4JfBMLa7ynTYOs1Rolp/T1betJxHiGD3g1Mn8lUQ==", + "license": "MIT" + }, "node_modules/fast-json-stable-stringify": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/fast-json-stable-stringify/-/fast-json-stable-stringify-2.1.0.tgz", diff --git a/frontend/package.json b/frontend/package.json index 0adf6b33..31b0baff 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -15,6 +15,7 @@ "dependencies": { "@dnd-kit/core": "^6.3.1", "@dnd-kit/modifiers": "^9.0.0", + "@microsoft/fetch-event-source": "^2.0.1", "@radix-ui/react-dropdown-menu": "^2.1.15", "@radix-ui/react-label": "^2.1.7", "@radix-ui/react-portal": "^1.1.9", @@ -28,6 +29,7 @@ "class-variance-authority": "^0.7.0", "click-to-react-component": "^1.1.2", "clsx": "^2.0.0", + "fast-json-patch": "^3.1.1", "lucide-react": "^0.303.0", "react": "^18.2.0", "react-dom": "^18.2.0", diff --git a/frontend/src/components/tasks/TaskDetails/LogsTab/NormalizedConversationViewer.tsx b/frontend/src/components/tasks/TaskDetails/LogsTab/NormalizedConversationViewer.tsx index 3e5cb418..ba0b8100 100644 --- a/frontend/src/components/tasks/TaskDetails/LogsTab/NormalizedConversationViewer.tsx +++ b/frontend/src/components/tasks/TaskDetails/LogsTab/NormalizedConversationViewer.tsx @@ -1,8 +1,17 @@ -import { useCallback, useContext, useEffect, useMemo, useState } from 'react'; -import { Bot, Hammer, ToggleLeft, ToggleRight } from 'lucide-react'; +import { + useCallback, + useContext, + useEffect, + useState, + useMemo, + useRef, +} from 'react'; +import { Hammer } from 'lucide-react'; import { Loader } from '@/components/ui/loader.tsx'; import { executionProcessesApi } from '@/lib/api.ts'; import MarkdownRenderer from '@/components/ui/markdown-renderer.tsx'; +import { applyPatch } from 'fast-json-patch'; +import { fetchEventSource } from '@microsoft/fetch-event-source'; import type { ExecutionProcess, NormalizedConversation, @@ -20,125 +29,288 @@ interface NormalizedConversationViewerProps { diffDeletable?: boolean; } -// Configuration for Gemini message clustering -const GEMINI_CLUSTERING_CONFIG = { - enabled: true, - maxClusterSize: 5000, // Maximum characters per cluster - maxClusterCount: 50, // Maximum number of messages to cluster together - minClusterSize: 2, // Minimum number of messages to consider clustering -}; - -/** - * Utility function to cluster adjacent assistant messages for Gemini executor. - * - * This function merges consecutive assistant messages into larger chunks to improve - * readability while preserving the progressive nature of Gemini's output. - * - * Clustering rules: - * - Only assistant messages are clustered together - * - Non-assistant messages (errors, tool use, etc.) 
break clustering - * - Clusters are limited by size (characters) and count (number of messages) - * - Requires minimum of 2 messages to form a cluster - * - Original content and formatting is preserved - * - * @param entries - Original conversation entries - * @param enabled - Whether clustering is enabled - * @returns - Processed entries with clustering applied - */ -const clusterGeminiMessages = ( - entries: NormalizedEntry[], - enabled: boolean -): NormalizedEntry[] => { - if (!enabled) { - return entries; - } - - const clustered: NormalizedEntry[] = []; - let currentCluster: NormalizedEntry[] = []; - - const flushCluster = () => { - if (currentCluster.length === 0) return; - - if (currentCluster.length < GEMINI_CLUSTERING_CONFIG.minClusterSize) { - // Not enough messages to cluster, add them individually - clustered.push(...currentCluster); - } else { - // Merge multiple messages into one - // Join with newlines to preserve message boundaries and readability - const mergedContent = currentCluster - .map((entry) => entry.content) - .join('\n'); - - const mergedEntry: NormalizedEntry = { - timestamp: currentCluster[0].timestamp, // Use timestamp of first message - entry_type: currentCluster[0].entry_type, - content: mergedContent, - }; - clustered.push(mergedEntry); - } - currentCluster = []; - }; - - for (const entry of entries) { - const isAssistantMessage = entry.entry_type.type === 'assistant_message'; - - if (isAssistantMessage) { - // Check if we can add to current cluster - const wouldExceedSize = - currentCluster.length > 0 && - currentCluster.map((e) => e.content).join('').length + - entry.content.length > - GEMINI_CLUSTERING_CONFIG.maxClusterSize; - const wouldExceedCount = - currentCluster.length >= GEMINI_CLUSTERING_CONFIG.maxClusterCount; - - if (wouldExceedSize || wouldExceedCount) { - // Flush current cluster and start new one - flushCluster(); - } - - currentCluster.push(entry); - } else { - // Non-assistant message, flush current cluster and add this message separately - flushCluster(); - clustered.push(entry); - } - } - - // Flush any remaining cluster - flushCluster(); - - return clustered; -}; - export function NormalizedConversationViewer({ executionProcess, diffDeletable, onConversationUpdate, }: NormalizedConversationViewerProps) { const { projectId } = useContext(TaskDetailsContext); + + // Development-only logging helper + const debugLog = useCallback((message: string, ...args: any[]) => { + if (import.meta.env.DEV) { + console.log(message, ...args); + } + }, []); + const [conversation, setConversation] = useState<NormalizedConversation | null>(null); const [loading, setLoading] = useState(true); const [error, setError] = useState<string | null>(null); - const [clusteringEnabled, setClusteringEnabled] = useState( - GEMINI_CLUSTERING_CONFIG.enabled + + // Track fetched processes to prevent redundant database calls + const fetchedProcesses = useRef(new Set<string>()); + + // SSE Connection Manager - production-ready with reconnection and resilience + const sseManagerRef = useRef<{ + abortController: AbortController | null; + isActive: boolean; + highestBatchId: number; + reconnectAttempts: number; + reconnectTimeout: number | null; + processId: string; + processStatus: string; + patchFailureCount: number; + }>({ + abortController: null, + isActive: false, + highestBatchId: 0, + reconnectAttempts: 0, + reconnectTimeout: null, + processId: executionProcess.id, + processStatus: executionProcess.status, + patchFailureCount: 0, + }); + + // SSE Connection Manager with Production-Ready Resilience using fetch-event-source +
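+  // A dropped connection resumes from `highestBatchId`: the reconnect URL carries
+  // `?since_batch_id=<cursor>` so the server replays only the WAL batches this
+  // client has not applied yet (see scheduleReconnect below for the backoff).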
+  // SSE Connection Manager with Production-Ready Resilience using fetch-event-source
+  const createSSEConnection = useCallback(
+    (processId: string, projectId: string): AbortController => {
+      const manager = sseManagerRef.current;
+      // Build URL with resume cursor if we have processed batches
+      const baseUrl = `/api/projects/${projectId}/execution-processes/${processId}/normalized-logs/stream`;
+      const url =
+        manager.highestBatchId > 0
+          ? `${baseUrl}?since_batch_id=${manager.highestBatchId}`
+          : baseUrl;
+      debugLog(
+        `🚀 SSE: Creating connection for process ${processId} (cursor: ${manager.highestBatchId})`
+      );
+
+      const abortController = new AbortController();
+
+      fetchEventSource(url, {
+        signal: abortController.signal,
+        onopen: async (response) => {
+          if (response.ok) {
+            debugLog(`✅ SSE: Connected to ${processId}`);
+            manager.isActive = true;
+            manager.reconnectAttempts = 0; // Reset on successful connection
+            manager.patchFailureCount = 0; // Reset patch failure count
+
+            if (manager.reconnectTimeout) {
+              clearTimeout(manager.reconnectTimeout);
+              manager.reconnectTimeout = null;
+            }
+          } else {
+            throw new Error(`SSE connection failed: ${response.status}`);
+          }
+        },
+        onmessage: (event) => {
+          if (event.event === 'patch') {
+            try {
+              const batchData = JSON.parse(event.data);
+              const { batch_id, patches } = batchData;
+
+              // Skip duplicates - use manager's batch tracking
+              if (batch_id && batch_id <= manager.highestBatchId) {
+                debugLog(
+                  `⏭️ SSE: Skipping duplicate batch_id=${batch_id} (current=${manager.highestBatchId})`
+                );
+                return;
+              }
+
+              // Update cursor BEFORE processing
+              if (batch_id) {
+                manager.highestBatchId = batch_id;
+                debugLog(`📍 SSE: Processing batch_id=${batch_id}`);
+              }
+
+              setConversation((prev) => {
+                // Create empty conversation if none exists
+                const baseConversation = prev || {
+                  entries: [],
+                  session_id: null,
+                  executor_type: 'unknown',
+                  prompt: null,
+                  summary: null,
+                };
+
+                try {
+                  const updated = applyPatch(
+                    JSON.parse(JSON.stringify(baseConversation)),
+                    patches
+                  ).newDocument as NormalizedConversation;
+
+                  updated.entries = updated.entries.filter(Boolean);
+
+                  debugLog(
+                    `🔧 SSE: Applied batch_id=${batch_id}, entries: ${updated.entries.length}`
+                  );
+
+                  // Reset patch failure count on successful application
+                  manager.patchFailureCount = 0;
+
+                  // Clear loading state on first successful patch
+                  if (!prev) {
+                    setLoading(false);
+                    setError(null);
+                  }
+
+                  if (onConversationUpdate) {
+                    setTimeout(onConversationUpdate, 0);
+                  }
+
+                  return updated;
+                } catch (patchError) {
+                  console.warn('❌ SSE: Patch failed:', patchError);
+                  // Reset cursor on failure for potential retry
+                  if (batch_id && batch_id > 0) {
+                    manager.highestBatchId = batch_id - 1;
+                  }
+                  // Track patch failures for monitoring
+                  manager.patchFailureCount++;
+                  debugLog(
+                    `⚠️ SSE: Patch failure #${manager.patchFailureCount} for batch_id=${batch_id}`
+                  );
+                  return prev || baseConversation;
+                }
+              });
+            } catch (e) {
+              console.warn('❌ SSE: Parse failed:', e);
+            }
+          }
+        },
+        onerror: (err) => {
+          console.warn(`🔌 SSE: Connection error for ${processId}:`, err);
+          manager.isActive = false;
+
+          // Only attempt reconnection if process is still running
+          if (manager.processStatus === 'running') {
+            scheduleReconnect(processId, projectId);
+          }
+        },
+        onclose: () => {
+          debugLog(`🔌 SSE: Connection closed for ${processId}`);
+          manager.isActive = false;
+        },
+      }).catch((error) => {
+        if (error.name !== 'AbortError') {
+          console.warn(`❌ SSE: Fetch error for ${processId}:`, error);
+          manager.isActive = false;
+
+          // Only attempt reconnection if process is still running
+          if (manager.processStatus === 'running') {
+            scheduleReconnect(processId, projectId);
+          }
+        }
+      });
+
+      return abortController;
+    },
+    [onConversationUpdate, debugLog]
   );
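A note on the `applyPatch` call above: it comes from `fast-json-patch` (pinned in the lockfile hunks at the end of this patch), which mutates the document it receives by default and returns a result whose `newDocument` holds the patched value. That is why the handler clones `baseConversation` before applying. A minimal standalone sketch, with illustrative data:

```typescript
import { applyPatch, Operation } from 'fast-json-patch';

const doc = { entries: [] as Array<{ content: string }> };
const ops: Operation[] = [
  { op: 'add', path: '/entries/-', value: { content: 'streamed chunk' } },
];

// Clone first: applyPatch mutates its input unless told otherwise.
const next = applyPatch(JSON.parse(JSON.stringify(doc)), ops).newDocument;
console.log(next.entries.length, doc.entries.length); // 1 0
```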
-  const fetchNormalizedLogs = useCallback(
-    async (isPolling = false) => {
-      try {
-        if (!isPolling) {
-          setLoading(true);
-          setError(null);
+  const scheduleReconnect = useCallback(
+    (processId: string, projectId: string) => {
+      const manager = sseManagerRef.current;
+
+      // Clear any existing reconnection timeout
+      if (manager.reconnectTimeout) {
+        clearTimeout(manager.reconnectTimeout);
+      }
+
+      // Exponential backoff: 1s, 2s, 4s, 8s, max 30s
+      const delay = Math.min(
+        1000 * Math.pow(2, manager.reconnectAttempts),
+        30000
+      );
+      manager.reconnectAttempts++;
+
+      debugLog(
+        `🔄 SSE: Scheduling reconnect attempt ${manager.reconnectAttempts} in ${delay}ms`
+      );
+
+      manager.reconnectTimeout = window.setTimeout(() => {
+        if (manager.processStatus === 'running') {
+          debugLog(`🔄 SSE: Attempting reconnect for ${processId}`);
+          establishSSEConnection(processId, projectId);
         }
+      }, delay);
+    },
+    [debugLog]
+  );
+
+  const establishSSEConnection = useCallback(
+    (processId: string, projectId: string) => {
+      const manager = sseManagerRef.current;
+
+      // Close existing connection if any
+      if (manager.abortController) {
+        manager.abortController.abort();
+        manager.abortController = null;
+        manager.isActive = false;
+      }
+
+      const abortController = createSSEConnection(processId, projectId);
+      manager.abortController = abortController;
+
+      return abortController;
+    },
+    [createSSEConnection]
+  );
+
+  // Helper functions for SSE manager
+  const setProcessId = (id: string) => {
+    sseManagerRef.current.processId = id;
+  };
+  const setProcessStatus = (status: string) => {
+    sseManagerRef.current.processStatus = status;
+  };
+
+  // Consolidated cleanup function to avoid duplication
+  const cleanupSSEConnection = useCallback(() => {
+    const manager = sseManagerRef.current;
+
+    if (manager.abortController) {
+      manager.abortController.abort();
+      manager.abortController = null;
+      manager.isActive = false;
+    }
+
+    if (manager.reconnectTimeout) {
+      clearTimeout(manager.reconnectTimeout);
+      manager.reconnectTimeout = null;
+    }
+  }, []);
+
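The backoff in `scheduleReconnect` above grows as min(1000 · 2^attempts, 30000) milliseconds. Extracted as a pure function (the `backoffDelay` name is ours, not the component's) to make the schedule visible:

```typescript
// Reconnect delay for the nth attempt, mirroring scheduleReconnect above.
const backoffDelay = (attempt: number): number =>
  Math.min(1000 * Math.pow(2, attempt), 30_000);

// attempts 0..5 -> 1000, 2000, 4000, 8000, 16000, 30000 (32000 capped to 30s)
console.log([0, 1, 2, 3, 4, 5].map(backoffDelay));
```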
+  const fetchNormalizedLogsOnce = useCallback(
+    async (processId: string) => {
+      // Only fetch if not already fetched for this process
+      if (fetchedProcesses.current.has(processId)) {
+        debugLog(`📋 DB: Already fetched ${processId}, skipping`);
+        return;
+      }
+
+      try {
+        setLoading(true);
+        setError(null);
+        debugLog(`📋 DB: Fetching logs for ${processId}`);
+
         const result = await executionProcessesApi.getNormalizedLogs(
           projectId,
-          executionProcess.id
+          processId
         );
+
+        // Mark as fetched
+        fetchedProcesses.current.add(processId);
+
         setConversation((prev) => {
-          // Only update if content actually changed
-          if (!prev || JSON.stringify(prev) !== JSON.stringify(result)) {
+          // Only update if content actually changed - use lightweight comparison
+          if (
+            !prev ||
+            prev.entries.length !== result.entries.length ||
+            prev.prompt !== result.prompt
+          ) {
             // Notify parent component of conversation update
             if (onConversationUpdate) {
               // Use setTimeout to ensure state update happens first
@@ -149,55 +321,114 @@ export function NormalizedConversationViewer({
           return prev;
         });
       } catch (err) {
-        if (!isPolling) {
-          setError(
-            `Error fetching logs: ${err instanceof Error ? err.message : 'Unknown error'}`
-          );
-        }
+        // Remove from fetched set on error to allow retry
+        fetchedProcesses.current.delete(processId);
+        setError(
+          `Error fetching logs: ${err instanceof Error ? err.message : 'Unknown error'}`
+        );
       } finally {
-        if (!isPolling) {
-          setLoading(false);
-        }
+        setLoading(false);
       }
     },
-    [executionProcess.id, projectId, onConversationUpdate]
+    [projectId, onConversationUpdate, debugLog]
   );

-  // Initial fetch
+  // Process-based data fetching - fetch once from appropriate source
   useEffect(() => {
-    fetchNormalizedLogs();
-  }, [fetchNormalizedLogs]);
+    const processId = executionProcess.id;
+    const processStatus = executionProcess.status;

-  // Auto-refresh every 2 seconds when process is running
-  useEffect(() => {
-    if (executionProcess.status === 'running') {
-      const interval = setInterval(() => {
-        fetchNormalizedLogs(true);
-      }, 2000);
+    debugLog(`🎯 Data: Process ${processId} is ${processStatus}`);

-      return () => clearInterval(interval);
+    // Reset conversation state when switching processes
+    const manager = sseManagerRef.current;
+    if (manager.processId !== processId) {
+      setConversation(null);
+      setLoading(true);
+      setError(null);
+
+      // Clear fetch tracking for old processes (keep memory bounded)
+      if (fetchedProcesses.current.size > 10) {
+        fetchedProcesses.current.clear();
+      }
     }
-  }, [executionProcess.status, fetchNormalizedLogs]);
-
-  // Apply clustering for Gemini executor conversations
-  const isGeminiExecutor = useMemo(
-    () => conversation?.executor_type === 'gemini',
-    [conversation?.executor_type]
-  );
-  const hasAssistantMessages = useMemo(
-    () =>
-      conversation?.entries.some(
-        (entry) => entry.entry_type.type === 'assistant_message'
-      ),
-    [conversation?.entries]
-  );
-  const displayEntries = useMemo(
-    () =>
-      isGeminiExecutor && conversation?.entries
-        ? clusterGeminiMessages(conversation.entries, clusteringEnabled)
-        : conversation?.entries || [],
-    [isGeminiExecutor, conversation?.entries, clusteringEnabled]
-  );
+    if (processStatus === 'running') {
+      // Running processes: SSE will handle data (including initial state)
+      debugLog(`🚀 Data: Using SSE for running process ${processId}`);
+      // SSE connection will be established by the SSE management effect
+    } else {
+      // Completed processes: Single database fetch
+      debugLog(`📋 Data: Using database for completed process ${processId}`);
+      fetchNormalizedLogsOnce(processId);
+    }
+  }, [
+    executionProcess.id,
+    executionProcess.status,
+    fetchNormalizedLogsOnce,
+    debugLog,
+  ]);
+
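The fetch path above swaps the old `JSON.stringify` deep equality for a cheap structural check: for an append-only log, a changed entry count or prompt is a sufficient re-render signal. The same predicate in isolation (`ConversationLike` and `hasChanged` are illustrative names, not part of this patch):

```typescript
interface ConversationLike {
  entries: unknown[];
  prompt: string | null;
}

// Lightweight change detection, as used by fetchNormalizedLogsOnce above.
const hasChanged = (
  prev: ConversationLike | null,
  next: ConversationLike
): boolean =>
  !prev ||
  prev.entries.length !== next.entries.length ||
  prev.prompt !== next.prompt;

console.log(hasChanged(null, { entries: [], prompt: null })); // true
```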
+  // SSE connection management for running processes only
+  useEffect(() => {
+    const processId = executionProcess.id;
+    const processStatus = executionProcess.status;
+    const manager = sseManagerRef.current;
+
+    // Update manager state
+    setProcessId(processId);
+    setProcessStatus(processStatus);
+
+    // Only establish SSE for running processes
+    if (processStatus !== 'running') {
+      debugLog(
+        `🚫 SSE: Process ${processStatus}, cleaning up any existing connection`
+      );
+      cleanupSSEConnection();
+      return;
+    }
+
+    // Check if connection already exists for same process ID
+    if (manager.abortController && manager.processId === processId) {
+      debugLog(`⚠️ SSE: Connection already exists for ${processId}, reusing`);
+      return;
+    }
+
+    // Process changed - close existing and reset state
+    if (manager.abortController && manager.processId !== processId) {
+      debugLog(`🔄 SSE: Switching from ${manager.processId} to ${processId}`);
+      cleanupSSEConnection();
+      manager.highestBatchId = 0; // Reset cursor for new process
+      manager.reconnectAttempts = 0;
+      manager.patchFailureCount = 0; // Reset failure count for new process
+    }
+
+    // Update manager state
+    manager.processId = processId;
+    manager.processStatus = processStatus;
+
+    // Establish new connection
+    establishSSEConnection(processId, projectId);
+
+    return () => {
+      debugLog(`🔌 SSE: Cleanup connection for ${processId}`);
+
+      // Close connection if it belongs to this effect
+      if (manager.abortController && manager.processId === processId) {
+        cleanupSSEConnection();
+      }
+    };
+  }, [executionProcess.id, executionProcess.status]);
+
+  // Memoize display entries to avoid unnecessary re-renders
+  const displayEntries = useMemo(() => {
+    if (!conversation?.entries) return [];
+
+    // Filter out any null entries that may have been created by duplicate patch application
+    return conversation.entries.filter((entry): entry is NormalizedEntry =>
+      Boolean(entry && (entry as NormalizedEntry).entry_type)
+    );
+  }, [conversation?.entries]);
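The SSE-management effect above keys the connection to `executionProcess.id` and aborts it in the effect cleanup. The underlying pattern, an AbortController whose `abort()` runs on dependency change or unmount, shown in isolation (a sketch; `useAbortableStream` is not part of this patch):

```typescript
import { useEffect } from 'react';

// Sketch: tie a cancellable streaming fetch to an effect's lifetime,
// mirroring how cleanupSSEConnection aborts the SSE request above.
function useAbortableStream(url: string, onChunk: (chunk: string) => void) {
  useEffect(() => {
    const controller = new AbortController();
    const decoder = new TextDecoder();

    fetch(url, { signal: controller.signal })
      .then(async (res) => {
        const reader = res.body?.getReader();
        while (reader) {
          const { done, value } = await reader.read();
          if (done) break;
          onChunk(decoder.decode(value, { stream: true }));
        }
      })
      .catch((err: unknown) => {
        // AbortError is the expected outcome of cleanup, not a failure
        if ((err as Error).name !== 'AbortError') console.warn(err);
      });

    // Runs when deps change or the component unmounts
    return () => controller.abort();
  }, [url, onChunk]);
}
```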

   if (loading) {
     return (
@@ -228,39 +459,6 @@ export function NormalizedConversationViewer({
   return (
-      {/* Display clustering controls for Gemini */}
-      {isGeminiExecutor && hasAssistantMessages && (
-
-
-
-
-
-            {clusteringEnabled &&
-            displayEntries.length !== conversation.entries.length
-              ? `Messages clustered for better readability (${conversation.entries.length} → ${displayEntries.length} messages)`
-              : 'Gemini message clustering'}
-
-
-
-
-      )}
-
       {/* Display prompt if available */}
       {conversation.prompt && (
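The lockfile hunks below pin the two client libraries the viewer now depends on: `@microsoft/fetch-event-source` for the SSE transport and `fast-json-patch` for applying streamed patches. A condensed sketch of how they fit together; the endpoint path matches `createSSEConnection` above, while `followConversation` and the `Conversation` type are illustrative:

```typescript
import { fetchEventSource } from '@microsoft/fetch-event-source';
import { applyPatch, Operation } from 'fast-json-patch';

interface Conversation {
  entries: unknown[];
}

// Sketch: follow one process's normalized-log stream, folding each patch
// batch into a local document and skipping batches already applied.
async function followConversation(
  projectId: string,
  processId: string,
  onUpdate: (conversation: Conversation) => void
): Promise<void> {
  let doc: Conversation = { entries: [] };
  let cursor = 0;

  const base = `/api/projects/${projectId}/execution-processes/${processId}/normalized-logs/stream`;
  await fetchEventSource(cursor > 0 ? `${base}?since_batch_id=${cursor}` : base, {
    onmessage(event) {
      if (event.event !== 'patch') return;
      const { batch_id, patches } = JSON.parse(event.data) as {
        batch_id: number;
        patches: Operation[];
      };
      if (batch_id <= cursor) return; // duplicate or replayed batch
      cursor = batch_id;
      doc = applyPatch(JSON.parse(JSON.stringify(doc)), patches).newDocument;
      onUpdate(doc);
    },
  });
}
```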
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 673d1421..aaaf456c 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -23,6 +23,9 @@ importers:
       '@dnd-kit/modifiers':
         specifier: ^9.0.0
         version: 9.0.0(@dnd-kit/core@6.3.1(react-dom@18.3.1(react@18.3.1))(react@18.3.1))(react@18.3.1)
+      '@microsoft/fetch-event-source':
+        specifier: ^2.0.1
+        version: 2.0.1
       '@radix-ui/react-dropdown-menu':
         specifier: ^2.1.15
         version: 2.1.15(@types/react-dom@18.3.7(@types/react@18.3.23))(@types/react@18.3.23)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)
@@ -62,6 +65,9 @@ importers:
       clsx:
         specifier: ^2.0.0
         version: 2.1.1
+      fast-json-patch:
+        specifier: ^3.1.1
+        version: 3.1.1
       lucide-react:
         specifier: ^0.303.0
         version: 0.303.0(react@18.3.1)
@@ -620,6 +626,9 @@ packages:
   '@jridgewell/trace-mapping@0.3.25':
     resolution: {integrity: sha512-vNk6aEwybGtawWmy/PzwnGDOjCkLWSD2wqvjGGAgOAwCGWySYXfYoxt00IJkTF+8Lb57DwOb3Aa0o9CApepiYQ==}

+  '@microsoft/fetch-event-source@2.0.1':
+    resolution: {integrity: sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA==}
+
   '@nodelib/fs.scandir@2.1.5':
     resolution: {integrity: sha512-vq24Bq3ym5HEQm2NKCr3yXDwjc7vTsEThRDnkp2DK9p1uqLR+DHurm/NOTo0KG7HYHU7eppKZj3MyqYuMBf62g==}
     engines: {node: '>= 8'}
@@ -1634,6 +1643,9 @@ packages:
     resolution: {integrity: sha512-7MptL8U0cqcFdzIzwOTHoilX9x5BrNqye7Z/LuC7kCMRio1EMSyqRK3BEAUD7sXRq4iT4AzTVuZdhgQ2TCvYLg==}
     engines: {node: '>=8.6.0'}

+  fast-json-patch@3.1.1:
+    resolution: {integrity: sha512-vf6IHUX2SBcA+5/+4883dsIjpBTqmfBjmYiWK1savxQmFk4JfBMLa7ynTYOs1Rolp/T1betJxHiGD3g1Mn8lUQ==}
+
   fast-json-stable-stringify@2.1.0:
     resolution: {integrity: sha512-lhd/wF+Lk98HZoTCtlVraHtfh5XYijIjalXck7saUtuanSDyLMxnHhSXEDJqHxD7msR8D0uCmqlkwjCV8xvwHw==}
@@ -3089,6 +3101,8 @@ snapshots:
       '@jridgewell/resolve-uri': 3.1.2
       '@jridgewell/sourcemap-codec': 1.5.0

+  '@microsoft/fetch-event-source@2.0.1': {}
+
   '@nodelib/fs.scandir@2.1.5':
     dependencies:
       '@nodelib/fs.stat': 2.0.5
@@ -4141,6 +4155,8 @@ snapshots:
       merge2: 1.4.1
       micromatch: 4.0.8

+  fast-json-patch@3.1.1: {}
+
   fast-json-stable-stringify@2.1.0: {}

   fast-levenshtein@2.0.6: {}