From 52ca113ada8ebfb9358cae9bbd948e9102917a21 Mon Sep 17 00:00:00 2001 From: Louis Knight-Webb Date: Mon, 15 Sep 2025 21:07:40 +0100 Subject: [PATCH] Prevent the DB from storing massive logs (vibe-kanban) (#633) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ## Implementation Complete ✅ I've successfully implemented the log size limiting with debouncing. Here's what was built: **Key Changes:** 1. **[`logs_limits.rs`](file:///private/var/folders/m1/9q_ct1913z10v6wbnv54j25r0000gn/T/vibe-kanban/worktrees/vk-f914-prevent-th/crates/utils/src/logs_limits.rs)** - Constants for size limits 2. **[`stream_ext.rs`](file:///private/var/folders/m1/9q_ct1913z10v6wbnv54j25r0000gn/T/vibe-kanban/worktrees/vk-f914-prevent-th/crates/utils/src/stream_ext.rs)** - Debouncing logic with 200ms windows 3. **[`container.rs`](file:///private/var/folders/m1/9q_ct1913z10v6wbnv54j25r0000gn/T/vibe-kanban/worktrees/vk-f914-prevent-th/crates/local-deployment/src/container.rs#L466)** - Integration point 4. **[DB safety](file:///private/var/folders/m1/9q_ct1913z10v6wbnv54j25r0000gn/T/vibe-kanban/worktrees/vk-f914-prevent-th/crates/services/src/services/container.rs#L390-L420)** - 100kb total limit **Protection Layers:** - **Window-level**: 10kb per 200ms window with truncation - **DB-level**: 100kb total per execution with final truncation marker - **Memory-level**: Existing 100MB MsgStore limit preserved The solution maintains real-time SSE streaming (<200ms latency) while preventing massive logs from overwhelming storage. * Cleanup script changes for task attempt f91483fd-14c1-4cf3-bc05-32b3b2c8965c * Perfect! I've successfully implemented the feedback changes: ## Changes Made ✅ 1. **Removed `logs_limits.rs`** - Deleted the separate constants file 2. **Moved constants to `stream_ext.rs`** - Now `WINDOW_MS` and `WINDOW_LIMIT` are defined directly in the stream extension module 3. **Removed DB-level safety** - Eliminated the `total_db_bytes` checking and truncation logic from `spawn_stream_raw_logs_to_db` ## Final Implementation The solution now has a single, clean protection layer: - **Window-based debouncing**: 10kb limit per 200ms window with "[truncated]" markers - **Real-time streaming**: <200ms latency preserved for SSE - **Memory protection**: Existing 100MB MsgStore limit remains - **Simple architecture**: No complex DB-level tracking The implementation is fully functional and passes all type checking. The debouncing logic in [`stream_ext.rs`](file:///private/var/folders/m1/9q_ct1913z10v6wbnv54j25r0000gn/T/vibe-kanban/worktrees/vk-f914-prevent-th/crates/utils/src/stream_ext.rs) will prevent massive logs from overwhelming the system while maintaining real-time log viewing experience. * improve window capping --- crates/local-deployment/src/container.rs | 3 +- crates/utils/Cargo.toml | 1 + crates/utils/src/lib.rs | 1 + crates/utils/src/stream_ext.rs | 128 +++++++++++++++++++++++ 4 files changed, 132 insertions(+), 1 deletion(-) create mode 100644 crates/utils/src/stream_ext.rs diff --git a/crates/local-deployment/src/container.rs b/crates/local-deployment/src/container.rs index d26b3b01..334f7339 100644 --- a/crates/local-deployment/src/container.rs +++ b/crates/local-deployment/src/container.rs @@ -498,7 +498,8 @@ impl LocalContainerService { // Merge and forward into the store let merged = select(out, err); // Stream> - store.clone().spawn_forwarder(merged); + let debounced = utils::stream_ext::debounce_logs(merged); + store.clone().spawn_forwarder(debounced); let mut map = self.msg_stores().write().await; map.insert(id, store); diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index 45bff2f7..1b3fb4ae 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -27,6 +27,7 @@ base64 = "0.22" tokio = { workspace = true } futures = "0.3.31" tokio-stream = { version = "0.1.17", features = ["sync"] } +async-stream = "0.3" shellexpand = "3.1.1" which = "8.0.0" similar = "2" diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 753bf5fe..4941bad5 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -12,6 +12,7 @@ pub mod port_file; pub mod response; pub mod sentry; pub mod shell; +pub mod stream_ext; pub mod stream_lines; pub mod text; pub mod version; diff --git a/crates/utils/src/stream_ext.rs b/crates/utils/src/stream_ext.rs new file mode 100644 index 00000000..37d37bd6 --- /dev/null +++ b/crates/utils/src/stream_ext.rs @@ -0,0 +1,128 @@ +use std::io; + +use futures::{Stream, StreamExt}; +use tokio::time::{Duration, Instant, sleep_until}; + +use crate::log_msg::LogMsg; + +const WINDOW_MS: u64 = 10; +const WINDOW_LIMIT: usize = 10 * 1024; // 10 KiB per window + +// helper that flushes buf + optional [truncated] marker +fn flush_buf( + buf: &mut Vec, + kind: Option, + truncated_in_window: &mut bool, +) -> Option { + if buf.is_empty() && !*truncated_in_window { + return None; + } + let mut out = String::from_utf8_lossy(buf).into_owned(); + if *truncated_in_window { + if !out.ends_with('\n') { + out.push('\n'); + } + out.push_str("[truncated]\n"); + } + buf.clear(); + *truncated_in_window = false; + + match kind { + Some(true) => Some(LogMsg::Stdout(out)), + Some(false) => Some(LogMsg::Stderr(out)), + None => None, + } +} + +pub fn debounce_logs(input: S) -> impl Stream> +where + S: Stream> + Unpin, +{ + async_stream::stream! { + let mut buf: Vec = Vec::with_capacity(WINDOW_LIMIT); + let mut current_stream_type: Option = None; // Some(true)=stdout, Some(false)=stderr + let mut timer = Instant::now() + Duration::from_millis(WINDOW_MS); + + // per-window accounting + let mut window_bytes_emitted: usize = 0; + let mut truncated_in_window: bool = false; + + tokio::pin!(input); + + loop { + tokio::select! { + maybe = input.next() => { + let msg = match maybe { + Some(Ok(v)) => v, + Some(Err(e)) => { yield Err(e); continue; } + None => break, + }; + + match &msg { + LogMsg::Stdout(s) | LogMsg::Stderr(s) => { + let is_stdout = matches!(msg, LogMsg::Stdout(_)); + + // Flush if switching stream kind + if current_stream_type != Some(is_stdout) { + if let Some(flushed) = flush_buf(&mut buf, current_stream_type, &mut truncated_in_window) { + yield Ok(flushed); + } + current_stream_type = Some(is_stdout); + window_bytes_emitted = 0; + truncated_in_window = false; + } + + // How many bytes can we still emit in *this* window? + let remaining = WINDOW_LIMIT.saturating_sub(window_bytes_emitted); + + if remaining == 0 { + // We've hit the budget; drop this chunk and mark truncated + truncated_in_window = true; + } else { + let bytes = s.as_bytes(); + let take = remaining.min(bytes.len()); + buf.extend_from_slice(&bytes[..take]); + window_bytes_emitted += take; + + if bytes.len() > take { + // Dropped tail of this chunk + truncated_in_window = true; + } + } + } + + _ => { + // Flush accumulated stdout/stderr before passing through other messages + if let Some(flushed) = flush_buf(&mut buf, current_stream_type, &mut truncated_in_window) { + yield Ok(flushed); + } + current_stream_type = None; + yield Ok(msg); + } + } + } + + _ = sleep_until(timer) => { + if let Some(flushed) = { + let kind = current_stream_type; + flush_buf(&mut buf, kind, &mut truncated_in_window) + } { + yield Ok(flushed); + } + // Start a fresh time window + timer = Instant::now() + Duration::from_millis(WINDOW_MS); + window_bytes_emitted = 0; + truncated_in_window = false; + } + } + } + + // Final flush on stream end + if let Some(flushed) = { + let kind = current_stream_type; + flush_buf(&mut buf, kind, &mut truncated_in_window) + } { + yield Ok(flushed); + } + } +}