From 6d2819490f832cf5e3b66edbbe7f9b060cc8128c Mon Sep 17 00:00:00 2001 From: Solomon Date: Tue, 25 Nov 2025 11:16:12 +0000 Subject: [PATCH] Remove log truncation (#1368) `[truncated]` corrupts json logs. --- crates/local-deployment/src/container.rs | 3 +- crates/utils/src/lib.rs | 1 - crates/utils/src/stream_ext.rs | 155 ----------------------- 3 files changed, 1 insertion(+), 158 deletions(-) delete mode 100644 crates/utils/src/stream_ext.rs diff --git a/crates/local-deployment/src/container.rs b/crates/local-deployment/src/container.rs index 855c342a..4a191e87 100644 --- a/crates/local-deployment/src/container.rs +++ b/crates/local-deployment/src/container.rs @@ -532,8 +532,7 @@ impl LocalContainerService { // Merge and forward into the store let merged = select(out, err); // Stream> - let debounced = utils::stream_ext::debounce_logs(merged); - store.clone().spawn_forwarder(debounced); + store.clone().spawn_forwarder(merged); let mut map = self.msg_stores().write().await; map.insert(id, store); diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 5eb5e0c2..bad90be9 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -16,7 +16,6 @@ pub mod port_file; pub mod response; pub mod sentry; pub mod shell; -pub mod stream_ext; pub mod stream_lines; pub mod text; pub mod tokio; diff --git a/crates/utils/src/stream_ext.rs b/crates/utils/src/stream_ext.rs deleted file mode 100644 index 374adc8e..00000000 --- a/crates/utils/src/stream_ext.rs +++ /dev/null @@ -1,155 +0,0 @@ -use std::io; - -use futures::{Stream, StreamExt}; -use tokio::time::{Duration, Instant, sleep_until}; - -use crate::log_msg::LogMsg; - -const WINDOW_MS: u64 = 10; -const WINDOW_LIMIT: usize = 100 * 1024; // 100 KiB per window -// To avoid unbounded growth within a window, cap accumulation. -// We allow collecting more than WINDOW_LIMIT to preserve both head and tail, -// then apply middle truncation on flush. -const COLLECT_LIMIT: usize = WINDOW_LIMIT * 2; - -const TRUNC_MARKER: &str = " [truncated] "; - -fn middle_truncate_bytes(bytes: &[u8], limit: usize, marker: &str) -> String { - if bytes.len() <= limit { - return String::from_utf8_lossy(bytes).into_owned(); - } - let m = marker.as_bytes(); - let mlen = m.len(); - if limit <= mlen { - // Degenerate case: not enough room; return a cut marker - return String::from_utf8_lossy(&m[..limit]).into_owned(); - } - let keep_prefix = (limit - mlen) / 2; - let keep_suffix = limit - mlen - keep_prefix; - - let mut out = Vec::with_capacity(limit); - out.extend_from_slice(&bytes[..keep_prefix]); - out.extend_from_slice(m); - out.extend_from_slice(&bytes[bytes.len() - keep_suffix..]); - String::from_utf8_lossy(&out).into_owned() -} - -fn shrink_middle(buf: &mut Vec, target_len: usize) { - if buf.len() <= target_len { - return; - } - let extra = buf.len() - target_len; - let mid = buf.len() / 2; - let start = mid.saturating_sub(extra / 2); - let end = start + extra; - buf.drain(start..end); -} - -// Helper that flushes buffer, inserting a middle [truncated] marker when needed -fn flush_buf( - buf: &mut Vec, - kind: Option, - truncated_in_window: &mut bool, -) -> Option { - if buf.is_empty() && !*truncated_in_window { - return None; - } - - let needs_marker = *truncated_in_window || buf.len() > WINDOW_LIMIT; - let out = if needs_marker { - middle_truncate_bytes(buf, WINDOW_LIMIT, TRUNC_MARKER) - } else { - String::from_utf8_lossy(buf).into_owned() - }; - - buf.clear(); - *truncated_in_window = false; - - match kind { - Some(true) => Some(LogMsg::Stdout(out)), - Some(false) => Some(LogMsg::Stderr(out)), - None => None, - } -} - -pub fn debounce_logs(input: S) -> impl Stream> -where - S: Stream> + Unpin, -{ - async_stream::stream! { - // Single accumulation buffer per window; we trim from the middle when exceeding COLLECT_LIMIT - let mut buf: Vec = Vec::with_capacity(WINDOW_LIMIT); - let mut current_stream_type: Option = None; // Some(true)=stdout, Some(false)=stderr - let mut timer = Instant::now() + Duration::from_millis(WINDOW_MS); - - // per-window accounting - let mut truncated_in_window: bool = false; - - tokio::pin!(input); - - loop { - tokio::select! { - maybe = input.next() => { - let msg = match maybe { - Some(Ok(v)) => v, - Some(Err(e)) => { yield Err(e); continue; } - None => break, - }; - - match &msg { - LogMsg::Stdout(s) | LogMsg::Stderr(s) => { - let is_stdout = matches!(msg, LogMsg::Stdout(_)); - - // Flush if switching stream kind - if current_stream_type != Some(is_stdout) { - if let Some(flushed) = flush_buf(&mut buf, current_stream_type, &mut truncated_in_window) { - yield Ok(flushed); - } - current_stream_type = Some(is_stdout); - buf.clear(); - truncated_in_window = false; - } - - let bytes = s.as_bytes(); - buf.extend_from_slice(bytes); - if buf.len() > COLLECT_LIMIT { - truncated_in_window = true; - shrink_middle(&mut buf, COLLECT_LIMIT); - } - } - - _ => { - // Flush accumulated stdout/stderr before passing through other messages - if let Some(flushed) = flush_buf(&mut buf, current_stream_type, &mut truncated_in_window) { - yield Ok(flushed); - } - current_stream_type = None; - yield Ok(msg); - } - } - } - - _ = sleep_until(timer) => { - if let Some(flushed) = { - let kind = current_stream_type; - flush_buf(&mut buf, kind, &mut truncated_in_window) - } { - yield Ok(flushed); - } - // Start a fresh time window - timer = Instant::now() + Duration::from_millis(WINDOW_MS); - buf.clear(); - truncated_in_window = false; - } - } - } - - // Final flush on stream end - if let Some(flushed) = { - let kind = current_stream_type; - flush_buf(&mut buf, kind, &mut truncated_in_window) - } { - yield Ok(flushed); - } - } -}