diff --git a/backend/src/routes/stream.rs b/backend/src/routes/stream.rs index 229eab9a..a9d97630 100644 --- a/backend/src/routes/stream.rs +++ b/backend/src/routes/stream.rs @@ -69,6 +69,48 @@ pub async fn normalized_logs_stream( let since = query.since_batch_id.unwrap_or(1); let mut fallback_batch_id: u64 = since + 1; + // Fast catch-up phase for resumable streaming + if let Some(since_batch) = query.since_batch_id { + if !is_gemini { + // Load current process state to get all available entries + if let Ok(Some(proc)) = ExecutionProcess::find_by_id(&app_state.db_pool, process_id).await { + if let Some(stdout) = &proc.stdout { + // Create executor and normalize logs to get all entries + if let Some(executor) = proc.executor_type + .as_deref() + .unwrap_or("unknown") + .parse::() + .ok() + .map(|cfg| cfg.create_executor()) + { + if let Ok(normalized) = executor.normalize_logs(stdout, &proc.working_directory) { + // Send all entries after since_batch_id immediately + let start_entry = since_batch as usize; + let catch_up_entries = normalized.entries.get(start_entry..).unwrap_or(&[]); + + for (i, entry) in catch_up_entries.iter().enumerate() { + let batch_data = BatchData { + batch_id: since_batch + 1 + i as u64, + patches: vec![serde_json::json!({ + "op": "add", + "path": "/entries/-", + "value": entry + })], + }; + yield Ok(Event::default().event("patch").data(serde_json::to_string(&batch_data).unwrap_or_default())); + } + + // Update cursors to current state + last_entry_count = normalized.entries.len(); + fallback_batch_id = since_batch + 1 + catch_up_entries.len() as u64; + last_len = stdout.len(); + } + } + } + } + } + } + loop { interval.tick().await; @@ -156,10 +198,10 @@ pub async fn normalized_logs_stream( } // 5. Compute patches for any new entries - let new_entries = [&normalized.entries[last_entry_count - 1]]; - if new_entries.is_empty() { + if last_entry_count >= normalized.entries.len() { continue; } + let new_entries = [&normalized.entries[last_entry_count]]; let patches: Vec = new_entries .iter() .map(|entry| serde_json::json!({