diff --git a/backend/src/routes/stream.rs b/backend/src/routes/stream.rs index 18814813..229eab9a 100644 --- a/backend/src/routes/stream.rs +++ b/backend/src/routes/stream.rs @@ -1,4 +1,4 @@ -use std::{str::FromStr, time::Duration}; +use std::time::Duration; use axum::{ extract::{Path, Query, State}, @@ -61,10 +61,13 @@ pub async fn normalized_logs_stream( let stream = async_stream::stream! { // Track previous stdout length and entry count for database polling fallback let mut last_len: usize = 0; - let mut last_entry_count: usize = 0; + let mut last_entry_count: usize = query.since_batch_id.unwrap_or(1) as usize; let mut interval = tokio::time::interval(Duration::from_millis(poll_interval)); let mut last_seen_batch_id: u64 = query.since_batch_id.unwrap_or(0); // Cursor for WAL streaming - let mut fallback_batch_id: u64 = query.since_batch_id.map(|id| id + 1).unwrap_or(1); // Monotonic batch ID for fallback polling + + // Monotonic batch ID for fallback polling (always start at 1) + let since = query.since_batch_id.unwrap_or(1); + let mut fallback_batch_id: u64 = since + 1; loop { interval.tick().await; @@ -97,58 +100,87 @@ pub async fn normalized_logs_stream( } } else { // Fallback: Database polling for non-streaming executors - let patch_result = ExecutionProcess::find_by_id(&app_state.db_pool, process_id) + // 1. Load the process + let proc = match ExecutionProcess::find_by_id(&app_state.db_pool, process_id) .await .ok() - .and_then(|proc_option| proc_option) - .filter(|proc| { - proc.stdout - .as_ref() - .is_some_and(|stdout| stdout.len() > last_len && !stdout[last_len..].trim().is_empty()) - }) - .and_then(|proc| { - let executor_type = proc.executor_type.as_deref().unwrap_or("unknown"); - crate::executor::ExecutorConfig::from_str(executor_type) - .ok() - .map(|config| (config.create_executor(), proc)) - }) - .and_then(|(executor, proc)| { - let stdout = proc.stdout.unwrap_or_default(); - executor.normalize_logs(&stdout, &proc.working_directory) - .ok() - .map(|normalized| (normalized, stdout.len())) - }) - .and_then(|(normalized, new_len)| { - let new_entries = &normalized.entries[last_entry_count..]; - (!new_entries.is_empty()).then(|| { - let patch = new_entries - .iter() - .map(|entry| serde_json::json!({ - "op": "add", - "path": "/entries/-", - "value": entry - })) - .collect::>(); + .flatten() + { + Some(p) => p, + None => { + tracing::warn!("Execution process {} not found during SSE polling", process_id); + continue; + } + }; - (patch, normalized.entries.len(), new_len) - }) - }) - .filter(|(patch, _, _): &(Vec, usize, usize)| !patch.is_empty()); + // 2. Grab the stdout and check if there's new content + let stdout = match proc.stdout { + Some(ref s) if s.len() > last_len && !s[last_len..].trim().is_empty() => s.clone(), + _ => continue, // no new output + }; - if let Some((patch, entries_len, new_len)) = patch_result { - // Use same format as fast-path for backward compatibility - let batch_data = BatchData { - batch_id: fallback_batch_id, - patches: patch, - }; - let json = serde_json::to_string(&batch_data).unwrap_or_default(); - yield Ok(Event::default().event("patch").data(json)); + // 3. Instantiate the right executor + let executor = match proc.executor_type + .as_deref() + .unwrap_or("unknown") + .parse::() + .ok() + .map(|cfg| cfg.create_executor()) + { + Some(exec) => exec, + None => { + tracing::warn!( + "Unknown executor '{}' for process {}", + proc.executor_type.unwrap_or_default(), + process_id + ); + continue; + } + }; - // Update tracking variables after successful send - fallback_batch_id += 1; - last_entry_count = entries_len; - last_len = new_len; + // 4. Normalize logs + let normalized = match executor.normalize_logs(&stdout, &proc.working_directory) { + Ok(norm) => norm, + Err(err) => { + tracing::error!( + "Failed to normalize logs for process {}: {}", + process_id, + err + ); + continue; + } + }; + + if last_entry_count > normalized.entries.len() { + continue; } + + // 5. Compute patches for any new entries + let new_entries = [&normalized.entries[last_entry_count - 1]]; + if new_entries.is_empty() { + continue; + } + let patches: Vec = new_entries + .iter() + .map(|entry| serde_json::json!({ + "op": "add", + "path": "/entries/-", + "value": entry + })) + .collect(); + + // 6. Emit the batch + let batch_data = BatchData { + batch_id: fallback_batch_id - 1, + patches, + }; + let json = serde_json::to_string(&batch_data).unwrap_or_default(); + yield Ok(Event::default().event("patch").data(json)); + + // 7. Update our cursors + fallback_batch_id += 1; + last_entry_count += 1; + last_len = stdout.len(); } // Stop streaming when process completed