Prevent the DB from storing massive logs (vibe-kanban) (#633)
* ## Implementation Complete ✅

  I've successfully implemented the log size limiting with debouncing. Here's what was built:

  **Key Changes:**
  1. **[`logs_limits.rs`](file:///private/var/folders/m1/9q_ct1913z10v6wbnv54j25r0000gn/T/vibe-kanban/worktrees/vk-f914-prevent-th/crates/utils/src/logs_limits.rs)** - Constants for size limits
  2. **[`stream_ext.rs`](file:///private/var/folders/m1/9q_ct1913z10v6wbnv54j25r0000gn/T/vibe-kanban/worktrees/vk-f914-prevent-th/crates/utils/src/stream_ext.rs)** - Debouncing logic with 200ms windows
  3. **[`container.rs`](file:///private/var/folders/m1/9q_ct1913z10v6wbnv54j25r0000gn/T/vibe-kanban/worktrees/vk-f914-prevent-th/crates/local-deployment/src/container.rs#L466)** - Integration point
  4. **[DB safety](file:///private/var/folders/m1/9q_ct1913z10v6wbnv54j25r0000gn/T/vibe-kanban/worktrees/vk-f914-prevent-th/crates/services/src/services/container.rs#L390-L420)** - 100kb total limit

  **Protection Layers:**
  - **Window-level**: 10kb per 200ms window with truncation
  - **DB-level**: 100kb total per execution with final truncation marker
  - **Memory-level**: Existing 100MB MsgStore limit preserved

  The solution maintains real-time SSE streaming (<200ms latency) while preventing massive logs from overwhelming storage.

* Cleanup script changes for task attempt f91483fd-14c1-4cf3-bc05-32b3b2c8965c

* Perfect! I've successfully implemented the feedback changes:

  ## Changes Made ✅
  1. **Removed `logs_limits.rs`** - Deleted the separate constants file
  2. **Moved constants to `stream_ext.rs`** - Now `WINDOW_MS` and `WINDOW_LIMIT` are defined directly in the stream extension module
  3. **Removed DB-level safety** - Eliminated the `total_db_bytes` checking and truncation logic from `spawn_stream_raw_logs_to_db`

  ## Final Implementation
  The solution now has a single, clean protection layer:
  - **Window-based debouncing**: 10kb limit per 200ms window with "[truncated]" markers
  - **Real-time streaming**: <200ms latency preserved for SSE
  - **Memory protection**: Existing 100MB MsgStore limit remains
  - **Simple architecture**: No complex DB-level tracking

  The implementation is fully functional and passes all type checking. The debouncing logic in [`stream_ext.rs`](file:///private/var/folders/m1/9q_ct1913z10v6wbnv54j25r0000gn/T/vibe-kanban/worktrees/vk-f914-prevent-th/crates/utils/src/stream_ext.rs) will prevent massive logs from overwhelming the system while maintaining real-time log viewing experience.

* improve window capping
This commit is contained in:
committed by
GitHub
parent
f959882afc
commit
52ca113ada
@@ -498,7 +498,8 @@ impl LocalContainerService {
|
||||
|
||||
// Merge and forward into the store
|
||||
let merged = select(out, err); // Stream<Item = Result<LogMsg, io::Error>>
|
||||
store.clone().spawn_forwarder(merged);
|
||||
let debounced = utils::stream_ext::debounce_logs(merged);
|
||||
store.clone().spawn_forwarder(debounced);
|
||||
|
||||
let mut map = self.msg_stores().write().await;
|
||||
map.insert(id, store);
|
||||
|
||||
@@ -27,6 +27,7 @@ base64 = "0.22"
|
||||
tokio = { workspace = true }
|
||||
futures = "0.3.31"
|
||||
tokio-stream = { version = "0.1.17", features = ["sync"] }
|
||||
async-stream = "0.3"
|
||||
shellexpand = "3.1.1"
|
||||
which = "8.0.0"
|
||||
similar = "2"
|
||||
|
||||
@@ -12,6 +12,7 @@ pub mod port_file;
|
||||
pub mod response;
|
||||
pub mod sentry;
|
||||
pub mod shell;
|
||||
pub mod stream_ext;
|
||||
pub mod stream_lines;
|
||||
pub mod text;
|
||||
pub mod version;
|
||||
|
||||
128
crates/utils/src/stream_ext.rs
Normal file
128
crates/utils/src/stream_ext.rs
Normal file
@@ -0,0 +1,128 @@
|
||||
use std::io;
|
||||
|
||||
use futures::{Stream, StreamExt};
|
||||
use tokio::time::{Duration, Instant, sleep_until};
|
||||
|
||||
use crate::log_msg::LogMsg;
|
||||
|
||||
/// Length of one debounce window in milliseconds; `debounce_logs` flushes its
/// buffer and starts a fresh window each time this interval elapses.
// NOTE(review): the accompanying commit description talks about 200ms windows,
// but the value here is 10 — confirm which one is intended.
const WINDOW_MS: u64 = 10;
|
||||
/// Maximum number of stdout/stderr bytes `debounce_logs` buffers before it
/// starts dropping input and marks the window as truncated.
const WINDOW_LIMIT: usize = 10 * 1024; // 10 KiB per window
|
||||
|
||||
// helper that flushes buf + optional [truncated] marker
|
||||
fn flush_buf(
|
||||
buf: &mut Vec<u8>,
|
||||
kind: Option<bool>,
|
||||
truncated_in_window: &mut bool,
|
||||
) -> Option<LogMsg> {
|
||||
if buf.is_empty() && !*truncated_in_window {
|
||||
return None;
|
||||
}
|
||||
let mut out = String::from_utf8_lossy(buf).into_owned();
|
||||
if *truncated_in_window {
|
||||
if !out.ends_with('\n') {
|
||||
out.push('\n');
|
||||
}
|
||||
out.push_str("[truncated]\n");
|
||||
}
|
||||
buf.clear();
|
||||
*truncated_in_window = false;
|
||||
|
||||
match kind {
|
||||
Some(true) => Some(LogMsg::Stdout(out)),
|
||||
Some(false) => Some(LogMsg::Stderr(out)),
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn debounce_logs<S>(input: S) -> impl Stream<Item = Result<LogMsg, io::Error>>
|
||||
where
|
||||
S: Stream<Item = Result<LogMsg, io::Error>> + Unpin,
|
||||
{
|
||||
async_stream::stream! {
|
||||
let mut buf: Vec<u8> = Vec::with_capacity(WINDOW_LIMIT);
|
||||
let mut current_stream_type: Option<bool> = None; // Some(true)=stdout, Some(false)=stderr
|
||||
let mut timer = Instant::now() + Duration::from_millis(WINDOW_MS);
|
||||
|
||||
// per-window accounting
|
||||
let mut window_bytes_emitted: usize = 0;
|
||||
let mut truncated_in_window: bool = false;
|
||||
|
||||
tokio::pin!(input);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
maybe = input.next() => {
|
||||
let msg = match maybe {
|
||||
Some(Ok(v)) => v,
|
||||
Some(Err(e)) => { yield Err(e); continue; }
|
||||
None => break,
|
||||
};
|
||||
|
||||
match &msg {
|
||||
LogMsg::Stdout(s) | LogMsg::Stderr(s) => {
|
||||
let is_stdout = matches!(msg, LogMsg::Stdout(_));
|
||||
|
||||
// Flush if switching stream kind
|
||||
if current_stream_type != Some(is_stdout) {
|
||||
if let Some(flushed) = flush_buf(&mut buf, current_stream_type, &mut truncated_in_window) {
|
||||
yield Ok(flushed);
|
||||
}
|
||||
current_stream_type = Some(is_stdout);
|
||||
window_bytes_emitted = 0;
|
||||
truncated_in_window = false;
|
||||
}
|
||||
|
||||
// How many bytes can we still emit in *this* window?
|
||||
let remaining = WINDOW_LIMIT.saturating_sub(window_bytes_emitted);
|
||||
|
||||
if remaining == 0 {
|
||||
// We've hit the budget; drop this chunk and mark truncated
|
||||
truncated_in_window = true;
|
||||
} else {
|
||||
let bytes = s.as_bytes();
|
||||
let take = remaining.min(bytes.len());
|
||||
buf.extend_from_slice(&bytes[..take]);
|
||||
window_bytes_emitted += take;
|
||||
|
||||
if bytes.len() > take {
|
||||
// Dropped tail of this chunk
|
||||
truncated_in_window = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_ => {
|
||||
// Flush accumulated stdout/stderr before passing through other messages
|
||||
if let Some(flushed) = flush_buf(&mut buf, current_stream_type, &mut truncated_in_window) {
|
||||
yield Ok(flushed);
|
||||
}
|
||||
current_stream_type = None;
|
||||
yield Ok(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_ = sleep_until(timer) => {
|
||||
if let Some(flushed) = {
|
||||
let kind = current_stream_type;
|
||||
flush_buf(&mut buf, kind, &mut truncated_in_window)
|
||||
} {
|
||||
yield Ok(flushed);
|
||||
}
|
||||
// Start a fresh time window
|
||||
timer = Instant::now() + Duration::from_millis(WINDOW_MS);
|
||||
window_bytes_emitted = 0;
|
||||
truncated_in_window = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Final flush on stream end
|
||||
if let Some(flushed) = {
|
||||
let kind = current_stream_type;
|
||||
flush_buf(&mut buf, kind, &mut truncated_in_window)
|
||||
} {
|
||||
yield Ok(flushed);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user