@@ -7,8 +7,45 @@ use crate::log_msg::LogMsg;
|
||||
|
||||
const WINDOW_MS: u64 = 10;
|
||||
const WINDOW_LIMIT: usize = 100 * 1024; // 100 KiB per window
|
||||
// To avoid unbounded growth within a window, cap accumulation.
|
||||
// We allow collecting more than WINDOW_LIMIT to preserve both head and tail,
|
||||
// then apply middle truncation on flush.
|
||||
const COLLECT_LIMIT: usize = WINDOW_LIMIT * 2;
|
||||
|
||||
// helper that flushes buf + optional [truncated] marker
|
||||
const TRUNC_MARKER: &str = " [truncated] ";
|
||||
|
||||
fn middle_truncate_bytes(bytes: &[u8], limit: usize, marker: &str) -> String {
|
||||
if bytes.len() <= limit {
|
||||
return String::from_utf8_lossy(bytes).into_owned();
|
||||
}
|
||||
let m = marker.as_bytes();
|
||||
let mlen = m.len();
|
||||
if limit <= mlen {
|
||||
// Degenerate case: not enough room; return a cut marker
|
||||
return String::from_utf8_lossy(&m[..limit]).into_owned();
|
||||
}
|
||||
let keep_prefix = (limit - mlen) / 2;
|
||||
let keep_suffix = limit - mlen - keep_prefix;
|
||||
|
||||
let mut out = Vec::with_capacity(limit);
|
||||
out.extend_from_slice(&bytes[..keep_prefix]);
|
||||
out.extend_from_slice(m);
|
||||
out.extend_from_slice(&bytes[bytes.len() - keep_suffix..]);
|
||||
String::from_utf8_lossy(&out).into_owned()
|
||||
}
|
||||
|
||||
fn shrink_middle(buf: &mut Vec<u8>, target_len: usize) {
|
||||
if buf.len() <= target_len {
|
||||
return;
|
||||
}
|
||||
let extra = buf.len() - target_len;
|
||||
let mid = buf.len() / 2;
|
||||
let start = mid.saturating_sub(extra / 2);
|
||||
let end = start + extra;
|
||||
buf.drain(start..end);
|
||||
}
|
||||
|
||||
// Helper that flushes buffer, inserting a middle [truncated] marker when needed
|
||||
fn flush_buf(
|
||||
buf: &mut Vec<u8>,
|
||||
kind: Option<bool>,
|
||||
@@ -17,13 +54,14 @@ fn flush_buf(
|
||||
if buf.is_empty() && !*truncated_in_window {
|
||||
return None;
|
||||
}
|
||||
let mut out = String::from_utf8_lossy(buf).into_owned();
|
||||
if *truncated_in_window {
|
||||
if !out.ends_with('\n') {
|
||||
out.push('\n');
|
||||
}
|
||||
out.push_str("[truncated]\n");
|
||||
}
|
||||
|
||||
let needs_marker = *truncated_in_window || buf.len() > WINDOW_LIMIT;
|
||||
let out = if needs_marker {
|
||||
middle_truncate_bytes(buf, WINDOW_LIMIT, TRUNC_MARKER)
|
||||
} else {
|
||||
String::from_utf8_lossy(buf).into_owned()
|
||||
};
|
||||
|
||||
buf.clear();
|
||||
*truncated_in_window = false;
|
||||
|
||||
@@ -39,12 +77,12 @@ where
|
||||
S: Stream<Item = Result<LogMsg, io::Error>> + Unpin,
|
||||
{
|
||||
async_stream::stream! {
|
||||
// Single accumulation buffer per window; we trim from the middle when exceeding COLLECT_LIMIT
|
||||
let mut buf: Vec<u8> = Vec::with_capacity(WINDOW_LIMIT);
|
||||
let mut current_stream_type: Option<bool> = None; // Some(true)=stdout, Some(false)=stderr
|
||||
let mut timer = Instant::now() + Duration::from_millis(WINDOW_MS);
|
||||
|
||||
// per-window accounting
|
||||
let mut window_bytes_emitted: usize = 0;
|
||||
let mut truncated_in_window: bool = false;
|
||||
|
||||
tokio::pin!(input);
|
||||
@@ -68,26 +106,15 @@ where
|
||||
yield Ok(flushed);
|
||||
}
|
||||
current_stream_type = Some(is_stdout);
|
||||
window_bytes_emitted = 0;
|
||||
buf.clear();
|
||||
truncated_in_window = false;
|
||||
}
|
||||
|
||||
// How many bytes can we still emit in *this* window?
|
||||
let remaining = WINDOW_LIMIT.saturating_sub(window_bytes_emitted);
|
||||
|
||||
if remaining == 0 {
|
||||
// We've hit the budget; drop this chunk and mark truncated
|
||||
let bytes = s.as_bytes();
|
||||
buf.extend_from_slice(bytes);
|
||||
if buf.len() > COLLECT_LIMIT {
|
||||
truncated_in_window = true;
|
||||
} else {
|
||||
let bytes = s.as_bytes();
|
||||
let take = remaining.min(bytes.len());
|
||||
buf.extend_from_slice(&bytes[..take]);
|
||||
window_bytes_emitted += take;
|
||||
|
||||
if bytes.len() > take {
|
||||
// Dropped tail of this chunk
|
||||
truncated_in_window = true;
|
||||
}
|
||||
shrink_middle(&mut buf, COLLECT_LIMIT);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,7 +138,7 @@ where
|
||||
}
|
||||
// Start a fresh time window
|
||||
timer = Instant::now() + Duration::from_millis(WINDOW_MS);
|
||||
window_bytes_emitted = 0;
|
||||
buf.clear();
|
||||
truncated_in_window = false;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user