From 0fdc73f8b7228c42e6ce2f6880536d071ffab612 Mon Sep 17 00:00:00 2001 From: Louis Knight-Webb Date: Tue, 12 Aug 2025 16:31:56 +0100 Subject: [PATCH] Improve normalized logs (#455) * Increase broadcast channel size * Use stdout_lines_stream * WIP fix amp logs containing all previous conversation turns * Mark raw and normalized log streams from the DB as finished * Update event source manager to handle removed entries * Clippy * Cargo fmt --- crates/executors/src/executors/amp.rs | 191 ++++++++---------- .../executors/src/logs/utils/entry_index.rs | 4 + crates/executors/src/logs/utils/patch.rs | 4 +- crates/local-deployment/src/container.rs | 6 +- .../src/services/config/versions/v3.rs | 8 +- crates/services/src/services/container.rs | 6 + crates/utils/src/msg_store.rs | 2 +- frontend/src/hooks/useEventSourceManager.ts | 3 + 8 files changed, 109 insertions(+), 115 deletions(-) diff --git a/crates/executors/src/executors/amp.rs b/crates/executors/src/executors/amp.rs index a079da87..08307b3a 100644 --- a/crates/executors/src/executors/amp.rs +++ b/crates/executors/src/executors/amp.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, path::PathBuf, process::Stdio, sync::Arc, time::Duration}; +use std::{collections::HashMap, path::PathBuf, process::Stdio, sync::Arc}; use async_trait::async_trait; use command_group::{AsyncCommandGroup, AsyncGroupChild}; @@ -7,9 +7,7 @@ use json_patch::Patch; use serde::{Deserialize, Serialize}; use tokio::{io::AsyncWriteExt, process::Command}; use ts_rs::TS; -use utils::{ - log_msg::LogMsg, msg_store::MsgStore, path::make_path_relative, shell::get_shell_command, -}; +use utils::{msg_store::MsgStore, path::make_path_relative, shell::get_shell_command}; use crate::{ command::{AgentProfiles, CommandBuilder}, @@ -108,115 +106,100 @@ impl StandardCodingAgentExecutor for Amp { // Process stdout logs (Amp's JSON output) let current_dir = current_dir.clone(); tokio::spawn(async move { - let mut s = raw_logs_msg_store.history_plus_stream(); - 
let mut buf = String::new(); - // 1 amp message id = multiple patch entry ids + let mut s = raw_logs_msg_store.stdout_lines_stream(); + let mut seen_amp_message_ids: HashMap<usize, Vec<usize>> = HashMap::new(); - while let Some(Ok(m)) = s.next().await { - let chunk = match m { - LogMsg::Stdout(x) => x, - LogMsg::JsonPatch(_) | LogMsg::SessionId(_) | LogMsg::Stderr(_) => { - continue; - } - LogMsg::Finished => break, - }; - buf.push_str(&chunk); + while let Some(Ok(line)) = s.next().await { + let trimmed = line.trim(); + match serde_json::from_str(trimmed) { + Ok(amp_json) => match amp_json { + AmpJson::Messages { + messages, + tool_results, + } => { + for (amp_message_id, message) in messages { + let role = &message.role; - // Print complete lines; keep the trailing partial (if any) - for line in buf - .split_inclusive('\n') - .filter(|l| l.ends_with('\n')) - .map(str::to_owned) - .collect::<Vec<_>>() - { - let trimmed = line.trim(); - match serde_json::from_str(trimmed) { - Ok(amp_json) => { - match amp_json { - AmpJson::Messages { - messages, - tool_results: _, - } => { - for (amp_message_id, message) in messages { - let role = &message.role; + for (content_index, content_item) in + message.content.iter().enumerate() + { + let mut has_patch_ids = + seen_amp_message_ids.get_mut(&amp_message_id); - for (content_index, content_item) in - message.content.iter().enumerate() + if let Some(entry) = content_item.to_normalized_entry( + role, + &message, + &current_dir.to_string_lossy(), + ) { + // Text + if matches!(&content_item, AmpContentItem::Text { .. 
}) + && role == "user" { - let mut has_patch_ids = - seen_amp_message_ids.get_mut(&amp_message_id); - - if let Some(entry) = content_item.to_normalized_entry( - role, - &message, - &current_dir.to_string_lossy(), - ) { - let patch: Patch = match &mut has_patch_ids { - None => { - let new_id = entry_index_provider.next(); - seen_amp_message_ids - .entry(amp_message_id) - .or_default() - .push(new_id); - ConversationPatch::add_normalized_entry( - new_id, entry, - ) - } - Some(patch_ids) => { - match patch_ids.get(content_index) { - Some(patch_id) => { - ConversationPatch::replace( - *patch_id, entry, - ) - } - None => { - let new_id = - entry_index_provider.next(); - patch_ids.push(new_id); - ConversationPatch::add_normalized_entry(new_id, entry) - } - } - } - }; - - raw_logs_msg_store.push_patch(patch); - // TODO: debug this race condition - tokio::time::sleep(Duration::from_millis(1)).await; + // Remove all previous roles + for index_to_remove in 0..entry_index_provider.current() + { + raw_logs_msg_store.push_patch( + ConversationPatch::remove_diff( + index_to_remove.to_string(), + ), + ); } + entry_index_provider.reset(); } - } - } - AmpJson::Initial { thread_id } => { - if let Some(thread_id) = thread_id { - raw_logs_msg_store.push_session_id(thread_id); - } - } - _ => {} - } - } - Err(_) => { - let trimmed = line.trim(); - if !trimmed.is_empty() { - let entry = NormalizedEntry { - timestamp: None, - entry_type: NormalizedEntryType::SystemMessage, - content: format!("Raw output: {trimmed}"), - metadata: None, - }; - let new_id = entry_index_provider.next(); - let patch = ConversationPatch::add_normalized_entry(new_id, entry); - raw_logs_msg_store.push_patch(patch); - // TODO: debug this race condition - tokio::time::sleep(Duration::from_millis(1)).await; + let patch: Patch = match &mut has_patch_ids { + None => { + let new_id = entry_index_provider.next(); + seen_amp_message_ids + .entry(amp_message_id) + .or_default() + .push(new_id); + 
ConversationPatch::add_normalized_entry( + new_id, entry, + ) + } + Some(patch_ids) => match patch_ids.get(content_index) { + Some(patch_id) => { + ConversationPatch::replace(*patch_id, entry) + } + None => { + let new_id = entry_index_provider.next(); + patch_ids.push(new_id); + ConversationPatch::add_normalized_entry( + new_id, entry, + ) + } + }, + }; + + raw_logs_msg_store.push_patch(patch); + } + } } } - }; - } - buf = buf.rsplit('\n').next().unwrap_or("").to_owned(); - } - if !buf.is_empty() { - print!("{buf}"); + AmpJson::Initial { thread_id } => { + if let Some(thread_id) = thread_id { + raw_logs_msg_store.push_session_id(thread_id); + } + } + _ => {} + }, + Err(_) => { + let trimmed = line.trim(); + if !trimmed.is_empty() { + let entry = NormalizedEntry { + timestamp: None, + entry_type: NormalizedEntryType::SystemMessage, + content: format!("Raw output: {trimmed}"), + metadata: None, + }; + + let new_id = entry_index_provider.next(); + let patch = ConversationPatch::add_normalized_entry(new_id, entry); + raw_logs_msg_store.push_patch(patch); + } + } + }; } }); } diff --git a/crates/executors/src/logs/utils/entry_index.rs b/crates/executors/src/logs/utils/entry_index.rs index 559c6ec3..58b0b9ed 100644 --- a/crates/executors/src/logs/utils/entry_index.rs +++ b/crates/executors/src/logs/utils/entry_index.rs @@ -24,6 +24,10 @@ impl EntryIndexProvider { pub fn current(&self) -> usize { self.0.load(Ordering::Relaxed) } + + pub fn reset(&self) { + self.0.store(0, Ordering::Relaxed); + } } impl Default for EntryIndexProvider { diff --git a/crates/executors/src/logs/utils/patch.rs b/crates/executors/src/logs/utils/patch.rs index fe3cd2a3..737907d5 100644 --- a/crates/executors/src/logs/utils/patch.rs +++ b/crates/executors/src/logs/utils/patch.rs @@ -94,10 +94,10 @@ impl ConversationPatch { } /// Create a REMOVE patch for removing a diff - pub fn remove_diff(entry_index: String, path: &str) -> Patch { + pub fn remove_diff(entry_index: String) -> Patch { 
from_value(json!([{ "op": PatchOperation::Remove, - path: format!("/entries/{entry_index}"), + "path": format!("/entries/{entry_index}"), }])) .unwrap() } diff --git a/crates/local-deployment/src/container.rs b/crates/local-deployment/src/container.rs index 5c5bd0f3..33ad280c 100644 --- a/crates/local-deployment/src/container.rs +++ b/crates/local-deployment/src/container.rs @@ -617,10 +617,8 @@ impl LocalContainerService { // Remove files that changed but no longer have diffs for changed_path in changed_paths { if !files_with_diffs.contains(changed_path) { - let patch = ConversationPatch::remove_diff( - escape_json_pointer_segment(changed_path), - changed_path, - ); + let patch = + ConversationPatch::remove_diff(escape_json_pointer_segment(changed_path)); let event = LogMsg::JsonPatch(patch).to_sse_event(); events.push(event); } diff --git a/crates/services/src/services/config/versions/v3.rs b/crates/services/src/services/config/versions/v3.rs index 2267bf8e..1f44f0f6 100644 --- a/crates/services/src/services/config/versions/v3.rs +++ b/crates/services/src/services/config/versions/v3.rs @@ -51,10 +51,10 @@ impl Config { impl From<String> for Config { fn from(raw_config: String) -> Self { - if let Ok(config) = serde_json::from_str::<Config>(&raw_config) { - if config.config_version == "v3" { - return config; - } + if let Ok(config) = serde_json::from_str::<Config>(&raw_config) + && config.config_version == "v3" + { + return config; } match Self::from_previous_version(&raw_config) { diff --git a/crates/services/src/services/container.rs b/crates/services/src/services/container.rs index 0b242a8b..b8ecd775 100644 --- a/crates/services/src/services/container.rs +++ b/crates/services/src/services/container.rs @@ -207,6 +207,9 @@ pub trait ContainerService { Ok::<_, std::io::Error>(event) }), ) + .chain(futures::stream::once(async { + Ok::<_, std::io::Error>(LogMsg::Finished.to_sse_event()) + })) .boxed(); Some(stream) @@ -331,6 +334,9 @@ pub trait ContainerService { .history_plus_stream() 
.filter(|msg| future::ready(matches!(msg, Ok(LogMsg::JsonPatch(..))))) .map_ok(|m| m.to_sse_event()) + .chain(futures::stream::once(async { + Ok::<_, std::io::Error>(LogMsg::Finished.to_sse_event()) + })) .boxed(), ) } diff --git a/crates/utils/src/msg_store.rs b/crates/utils/src/msg_store.rs index dcadeba6..906f3549 100644 --- a/crates/utils/src/msg_store.rs +++ b/crates/utils/src/msg_store.rs @@ -37,7 +37,7 @@ impl Default for MsgStore { impl MsgStore { pub fn new() -> Self { - let (sender, _) = broadcast::channel(100); + let (sender, _) = broadcast::channel(10000); Self { inner: RwLock::new(Inner { history: VecDeque::with_capacity(32), diff --git a/frontend/src/hooks/useEventSourceManager.ts b/frontend/src/hooks/useEventSourceManager.ts index f7d01e16..8ca6a7d0 100644 --- a/frontend/src/hooks/useEventSourceManager.ts +++ b/frontend/src/hooks/useEventSourceManager.ts @@ -113,6 +113,9 @@ export const useEventSourceManager = ({ return false; // Already processed } processedSet.add(entryIndex); + } else if (match && patch.op === 'remove') { + const entryIndex = parseInt(match[1], 10); + processedSet.delete(entryIndex); } // Always allow replace operations and non-entry patches return true;