Debug stream (#272)

* debug-stream

* 1 index

* Clippy
This commit is contained in:
Louis Knight-Webb
2025-07-19 17:03:32 +01:00
committed by GitHub
parent bf2d344abb
commit 123e24220b

View File

@@ -1,4 +1,4 @@
use std::{str::FromStr, time::Duration}; use std::time::Duration;
use axum::{ use axum::{
extract::{Path, Query, State}, extract::{Path, Query, State},
@@ -61,10 +61,13 @@ pub async fn normalized_logs_stream(
let stream = async_stream::stream! { let stream = async_stream::stream! {
// Track previous stdout length and entry count for database polling fallback // Track previous stdout length and entry count for database polling fallback
let mut last_len: usize = 0; let mut last_len: usize = 0;
let mut last_entry_count: usize = 0; let mut last_entry_count: usize = query.since_batch_id.unwrap_or(1) as usize;
let mut interval = tokio::time::interval(Duration::from_millis(poll_interval)); let mut interval = tokio::time::interval(Duration::from_millis(poll_interval));
let mut last_seen_batch_id: u64 = query.since_batch_id.unwrap_or(0); // Cursor for WAL streaming let mut last_seen_batch_id: u64 = query.since_batch_id.unwrap_or(0); // Cursor for WAL streaming
let mut fallback_batch_id: u64 = query.since_batch_id.map(|id| id + 1).unwrap_or(1); // Monotonic batch ID for fallback polling
// Monotonic batch ID for fallback polling (always start at 1)
let since = query.since_batch_id.unwrap_or(1);
let mut fallback_batch_id: u64 = since + 1;
loop { loop {
interval.tick().await; interval.tick().await;
@@ -97,58 +100,87 @@ pub async fn normalized_logs_stream(
} }
} else { } else {
// Fallback: Database polling for non-streaming executors // Fallback: Database polling for non-streaming executors
let patch_result = ExecutionProcess::find_by_id(&app_state.db_pool, process_id) // 1. Load the process
let proc = match ExecutionProcess::find_by_id(&app_state.db_pool, process_id)
.await .await
.ok() .ok()
.and_then(|proc_option| proc_option) .flatten()
.filter(|proc| { {
proc.stdout Some(p) => p,
.as_ref() None => {
.is_some_and(|stdout| stdout.len() > last_len && !stdout[last_len..].trim().is_empty()) tracing::warn!("Execution process {} not found during SSE polling", process_id);
}) continue;
.and_then(|proc| { }
let executor_type = proc.executor_type.as_deref().unwrap_or("unknown"); };
crate::executor::ExecutorConfig::from_str(executor_type)
.ok()
.map(|config| (config.create_executor(), proc))
})
.and_then(|(executor, proc)| {
let stdout = proc.stdout.unwrap_or_default();
executor.normalize_logs(&stdout, &proc.working_directory)
.ok()
.map(|normalized| (normalized, stdout.len()))
})
.and_then(|(normalized, new_len)| {
let new_entries = &normalized.entries[last_entry_count..];
(!new_entries.is_empty()).then(|| {
let patch = new_entries
.iter()
.map(|entry| serde_json::json!({
"op": "add",
"path": "/entries/-",
"value": entry
}))
.collect::<Vec<_>>();
(patch, normalized.entries.len(), new_len) // 2. Grab the stdout and check if there's new content
}) let stdout = match proc.stdout {
}) Some(ref s) if s.len() > last_len && !s[last_len..].trim().is_empty() => s.clone(),
.filter(|(patch, _, _): &(Vec<Value>, usize, usize)| !patch.is_empty()); _ => continue, // no new output
};
if let Some((patch, entries_len, new_len)) = patch_result { // 3. Instantiate the right executor
// Use same format as fast-path for backward compatibility let executor = match proc.executor_type
let batch_data = BatchData { .as_deref()
batch_id: fallback_batch_id, .unwrap_or("unknown")
patches: patch, .parse::<crate::executor::ExecutorConfig>()
}; .ok()
let json = serde_json::to_string(&batch_data).unwrap_or_default(); .map(|cfg| cfg.create_executor())
yield Ok(Event::default().event("patch").data(json)); {
Some(exec) => exec,
None => {
tracing::warn!(
"Unknown executor '{}' for process {}",
proc.executor_type.unwrap_or_default(),
process_id
);
continue;
}
};
// Update tracking variables after successful send // 4. Normalize logs
fallback_batch_id += 1; let normalized = match executor.normalize_logs(&stdout, &proc.working_directory) {
last_entry_count = entries_len; Ok(norm) => norm,
last_len = new_len; Err(err) => {
tracing::error!(
"Failed to normalize logs for process {}: {}",
process_id,
err
);
continue;
}
};
if last_entry_count > normalized.entries.len() {
continue;
} }
// 5. Compute patches for any new entries
let new_entries = [&normalized.entries[last_entry_count - 1]];
if new_entries.is_empty() {
continue;
}
let patches: Vec<Value> = new_entries
.iter()
.map(|entry| serde_json::json!({
"op": "add",
"path": "/entries/-",
"value": entry
}))
.collect();
// 6. Emit the batch
let batch_data = BatchData {
batch_id: fallback_batch_id - 1,
patches,
};
let json = serde_json::to_string(&batch_data).unwrap_or_default();
yield Ok(Event::default().event("patch").data(json));
// 7. Update our cursors
fallback_batch_id += 1;
last_entry_count += 1;
last_len = stdout.len();
} }
// Stop streaming when process completed // Stop streaming when process completed