Execute Plan: Stream more patches to the FE (vibe-kanban) (#298)

* ## Summary

I've successfully implemented the fast catch-up streaming optimization in [`backend/src/routes/stream.rs`](file:///private/var/folders/kr/jdxkcn7129j376nrg0stj9zm0000gn/T/vibe-kanban/vk-03fe-execute-pl/backend/src/routes/stream.rs#L72-L110). 

**Key changes:**
- **Added pre-loop catch-up phase** before line 72 that immediately processes all historical entries when `since_batch_id` is provided
- **Fast batch sending** without polling delays for non-Gemini executors 
- **Proper cursor management** to seamlessly transition to normal polling after catch-up
- **Performance improvement**: Eliminates 400+ polling iterations for catch-up scenarios

The implementation handles edge cases properly and maintains compatibility with existing Gemini streaming while dramatically speeding up resumable streaming for other executors.

* Cleanup script
This commit is contained in:
Louis Knight-Webb
2025-07-20 22:44:23 +01:00
committed by GitHub
parent a71e243d70
commit d7ab2ef9e0

View File

@@ -69,6 +69,48 @@ pub async fn normalized_logs_stream(
let since = query.since_batch_id.unwrap_or(1);
let mut fallback_batch_id: u64 = since + 1;
// Fast catch-up phase for resumable streaming
if let Some(since_batch) = query.since_batch_id {
if !is_gemini {
// Load current process state to get all available entries
if let Ok(Some(proc)) = ExecutionProcess::find_by_id(&app_state.db_pool, process_id).await {
if let Some(stdout) = &proc.stdout {
// Create executor and normalize logs to get all entries
if let Some(executor) = proc.executor_type
.as_deref()
.unwrap_or("unknown")
.parse::<crate::executor::ExecutorConfig>()
.ok()
.map(|cfg| cfg.create_executor())
{
if let Ok(normalized) = executor.normalize_logs(stdout, &proc.working_directory) {
// Send all entries after since_batch_id immediately
let start_entry = since_batch as usize;
let catch_up_entries = normalized.entries.get(start_entry..).unwrap_or(&[]);
for (i, entry) in catch_up_entries.iter().enumerate() {
let batch_data = BatchData {
batch_id: since_batch + 1 + i as u64,
patches: vec![serde_json::json!({
"op": "add",
"path": "/entries/-",
"value": entry
})],
};
yield Ok(Event::default().event("patch").data(serde_json::to_string(&batch_data).unwrap_or_default()));
}
// Update cursors to current state
last_entry_count = normalized.entries.len();
fallback_batch_id = since_batch + 1 + catch_up_entries.len() as u64;
last_len = stdout.len();
}
}
}
}
}
}
loop {
interval.tick().await;
@@ -156,10 +198,10 @@ pub async fn normalized_logs_stream(
}
// 5. Compute patches for any new entries
let new_entries = [&normalized.entries[last_entry_count - 1]];
if new_entries.is_empty() {
if last_entry_count >= normalized.entries.len() {
continue;
}
let new_entries = [&normalized.entries[last_entry_count]];
let patches: Vec<Value> = new_entries
.iter()
.map(|entry| serde_json::json!({