Improve normalized logs (#455)

* Increase broadcast channel size

* Use stdout_lines_stream

* WIP fix amp logs containing all previous conversation turns

* Mark raw and normalized log streams from the DB as finished

* Update event source manager to handle removed entries

* Clippy

* Cargo fmt
Author: Louis Knight-Webb
Date:   2025-08-12 16:31:56 +01:00
Committed by: GitHub
Parent: 74db7161b6
Commit: 0fdc73f8b7

8 changed files with 109 additions and 115 deletions

View File

@@ -1,4 +1,4 @@
-use std::{collections::HashMap, path::PathBuf, process::Stdio, sync::Arc, time::Duration};
+use std::{collections::HashMap, path::PathBuf, process::Stdio, sync::Arc};

 use async_trait::async_trait;
 use command_group::{AsyncCommandGroup, AsyncGroupChild};
@@ -7,9 +7,7 @@ use json_patch::Patch;
 use serde::{Deserialize, Serialize};
 use tokio::{io::AsyncWriteExt, process::Command};
 use ts_rs::TS;
-use utils::{
-    log_msg::LogMsg, msg_store::MsgStore, path::make_path_relative, shell::get_shell_command,
-};
+use utils::{msg_store::MsgStore, path::make_path_relative, shell::get_shell_command};

 use crate::{
     command::{AgentProfiles, CommandBuilder},
@@ -108,34 +106,16 @@ impl StandardCodingAgentExecutor for Amp {
         // Process stdout logs (Amp's JSON output)
         let current_dir = current_dir.clone();
         tokio::spawn(async move {
-            let mut s = raw_logs_msg_store.history_plus_stream();
-            let mut buf = String::new();
+            let mut s = raw_logs_msg_store.stdout_lines_stream();
             // 1 amp message id = multiple patch entry ids
             let mut seen_amp_message_ids: HashMap<usize, Vec<usize>> = HashMap::new();
-            while let Some(Ok(m)) = s.next().await {
-                let chunk = match m {
-                    LogMsg::Stdout(x) => x,
-                    LogMsg::JsonPatch(_) | LogMsg::SessionId(_) | LogMsg::Stderr(_) => {
-                        continue;
-                    }
-                    LogMsg::Finished => break,
-                };
-                buf.push_str(&chunk);
-                // Print complete lines; keep the trailing partial (if any)
-                for line in buf
-                    .split_inclusive('\n')
-                    .filter(|l| l.ends_with('\n'))
-                    .map(str::to_owned)
-                    .collect::<Vec<_>>()
-                {
-                    let trimmed = line.trim();
-                    match serde_json::from_str(trimmed) {
-                        Ok(amp_json) => {
-                            match amp_json {
-                                AmpJson::Messages {
-                                    messages,
-                                    tool_results: _,
-                                } => {
-                                    for (amp_message_id, message) in messages {
-                                        let role = &message.role;
+            while let Some(Ok(line)) = s.next().await {
+                let trimmed = line.trim();
+                match serde_json::from_str(trimmed) {
+                    Ok(amp_json) => match amp_json {
+                        AmpJson::Messages {
+                            messages,
+                            tool_results,
+                        } => {
+                            for (amp_message_id, message) in messages {
+                                let role = &message.role;
@@ -151,6 +131,22 @@ impl StandardCodingAgentExecutor for Amp {
                                         &message,
                                         &current_dir.to_string_lossy(),
                                     ) {
+                                        // Text
+                                        if matches!(&content_item, AmpContentItem::Text { .. })
+                                            && role == "user"
+                                        {
+                                            // Remove all previous roles
+                                            for index_to_remove in 0..entry_index_provider.current()
+                                            {
+                                                raw_logs_msg_store.push_patch(
+                                                    ConversationPatch::remove_diff(
+                                                        index_to_remove.to_string(),
+                                                    ),
+                                                );
+                                            }
+                                            entry_index_provider.reset();
+                                        }
                                         let patch: Patch = match &mut has_patch_ids {
                                             None => {
                                                 let new_id = entry_index_provider.next();
@@ -162,26 +158,21 @@ impl StandardCodingAgentExecutor for Amp {
                                                     new_id, entry,
                                                 )
                                             }
-                                            Some(patch_ids) => {
-                                                match patch_ids.get(content_index) {
-                                                    Some(patch_id) => {
-                                                        ConversationPatch::replace(
-                                                            *patch_id, entry,
-                                                        )
-                                                    }
-                                                    None => {
-                                                        let new_id =
-                                                            entry_index_provider.next();
-                                                        patch_ids.push(new_id);
-                                                        ConversationPatch::add_normalized_entry(new_id, entry)
-                                                    }
-                                                }
-                                            }
+                                            Some(patch_ids) => match patch_ids.get(content_index) {
+                                                Some(patch_id) => {
+                                                    ConversationPatch::replace(*patch_id, entry)
+                                                }
+                                                None => {
+                                                    let new_id = entry_index_provider.next();
+                                                    patch_ids.push(new_id);
+                                                    ConversationPatch::add_normalized_entry(
+                                                        new_id, entry,
+                                                    )
+                                                }
+                                            },
                                         };
                                         raw_logs_msg_store.push_patch(patch);
-                                        // TODO: debug this race condition
-                                        tokio::time::sleep(Duration::from_millis(1)).await;
                                     }
                                 }
                             }
@@ -192,8 +183,7 @@ impl StandardCodingAgentExecutor for Amp {
                             }
                         }
                         _ => {}
-                        }
-                    }
+                    },
                     Err(_) => {
                         let trimmed = line.trim();
                         if !trimmed.is_empty() {
@@ -207,17 +197,10 @@ impl StandardCodingAgentExecutor for Amp {
                             let new_id = entry_index_provider.next();
                             let patch = ConversationPatch::add_normalized_entry(new_id, entry);
                             raw_logs_msg_store.push_patch(patch);
-                            // TODO: debug this race condition
-                            tokio::time::sleep(Duration::from_millis(1)).await;
                         }
                     }
                 };
             }
-                buf = buf.rsplit('\n').next().unwrap_or("").to_owned();
-            }
-            if !buf.is_empty() {
-                print!("{buf}");
-            }
         });
     }
 }
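
Note: the `stdout_lines_stream` helper that replaces the hand-rolled buffering above is not part of this diff; presumably it folds the same chunk-to-line reassembly into `MsgStore`. A minimal standalone sketch of the technique (a stream of stdout chunks becomes a stream of complete lines); the name `lines_stream` and the types here are illustrative, not the real API:

use futures::{stream, Stream, StreamExt};

/// Turn a stream of arbitrary stdout chunks into a stream of complete lines,
/// buffering any trailing partial line until its newline arrives.
fn lines_stream(chunks: impl Stream<Item = String>) -> impl Stream<Item = String> {
    chunks
        .scan(String::new(), |buf, chunk| {
            buf.push_str(&chunk);
            // Drain every complete line; the partial remainder stays in `buf`.
            let mut lines = Vec::new();
            while let Some(pos) = buf.find('\n') {
                lines.push(buf.drain(..=pos).collect::<String>());
            }
            futures::future::ready(Some(stream::iter(lines)))
        })
        .flatten()
}

#[tokio::main]
async fn main() {
    let chunks = stream::iter(vec!["par".to_string(), "tial\nline\n".to_string()]);
    let lines: Vec<String> = lines_stream(chunks).collect().await;
    assert_eq!(lines, ["partial\n", "line\n"]);
}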

View File

@@ -24,6 +24,10 @@ impl EntryIndexProvider {
     pub fn current(&self) -> usize {
         self.0.load(Ordering::Relaxed)
     }
+
+    pub fn reset(&self) {
+        self.0.store(0, Ordering::Relaxed);
+    }
 }

 impl Default for EntryIndexProvider {
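
For context on `current` and the new `reset`: the provider is a shared atomic counter that hands out normalized-entry indices. A minimal sketch, assuming the `Arc<AtomicUsize>` newtype implied by `self.0.load(...)` (the actual field layout and `next` semantics are a guess):

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

#[derive(Clone)]
struct EntryIndexProvider(Arc<AtomicUsize>);

impl EntryIndexProvider {
    fn next(&self) -> usize {
        self.0.fetch_add(1, Ordering::Relaxed)
    }
    fn current(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
    fn reset(&self) {
        self.0.store(0, Ordering::Relaxed);
    }
}

fn main() {
    let p = EntryIndexProvider(Arc::new(AtomicUsize::new(0)));
    assert_eq!(p.next(), 0);
    assert_eq!(p.current(), 1);
    p.reset(); // used above after wiping previous conversation turns
    assert_eq!(p.current(), 0);
}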

View File

@@ -94,10 +94,10 @@ impl ConversationPatch {
     }

     /// Create a REMOVE patch for removing a diff
-    pub fn remove_diff(entry_index: String, path: &str) -> Patch {
+    pub fn remove_diff(entry_index: String) -> Patch {
         from_value(json!([{
             "op": PatchOperation::Remove,
-            path: format!("/entries/{entry_index}"),
+            "path": format!("/entries/{entry_index}"),
         }]))
         .unwrap()
     }
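
The quoting change above evidently fixes a real bug: in `serde_json::json!`, a bare identifier key is treated as an expression, so the old code used the value of its `path` argument as the object key instead of the literal string "path". A small self-contained demonstration:

use serde_json::json;

fn main() {
    let path = "src/main.rs";
    // Expression key: the *value* of `path` becomes the key (the old, buggy form).
    let old = json!({ path: "/entries/0" });
    // String-literal key: the key really is "path" (the fixed form).
    let new = json!({ "path": "/entries/0" });
    assert_eq!(old.to_string(), r#"{"src/main.rs":"/entries/0"}"#);
    assert_eq!(new.to_string(), r#"{"path":"/entries/0"}"#);
}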

View File

@@ -617,10 +617,8 @@ impl LocalContainerService {
             // Remove files that changed but no longer have diffs
             for changed_path in changed_paths {
                 if !files_with_diffs.contains(changed_path) {
-                    let patch = ConversationPatch::remove_diff(
-                        escape_json_pointer_segment(changed_path),
-                        changed_path,
-                    );
+                    let patch =
+                        ConversationPatch::remove_diff(escape_json_pointer_segment(changed_path));
                     let event = LogMsg::JsonPatch(patch).to_sse_event();
                     events.push(event);
                 }
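
`escape_json_pointer_segment` is not shown in this diff; presumably it applies RFC 6901 escaping so a file path can serve as a single JSON Pointer segment in the remove patch. A plausible sketch:

/// Escape a string for use as one JSON Pointer segment (RFC 6901):
/// '~' becomes "~0" and '/' becomes "~1". '~' must be replaced first so
/// the '~' introduced while escaping '/' is not escaped a second time.
fn escape_json_pointer_segment(segment: &str) -> String {
    segment.replace('~', "~0").replace('/', "~1")
}

fn main() {
    assert_eq!(escape_json_pointer_segment("src/main.rs"), "src~1main.rs");
    assert_eq!(escape_json_pointer_segment("a~b"), "a~0b");
}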

View File

@@ -51,11 +51,11 @@ impl Config {
 impl From<String> for Config {
     fn from(raw_config: String) -> Self {
-        if let Ok(config) = serde_json::from_str::<Config>(&raw_config) {
-            if config.config_version == "v3" {
-                return config;
-            }
-        }
+        if let Ok(config) = serde_json::from_str::<Config>(&raw_config)
+            && config.config_version == "v3"
+        {
+            return config;
+        }
         match Self::from_previous_version(&raw_config) {
             Ok(config) => {
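
This Config change is the Clippy-driven rewrite of a nested `if let` plus inner `if` into a single let-chain, available since Rust 1.88 on edition 2024. The shape of the pattern in isolation:

// Requires edition 2024: an `if let` binding chained with `&&`.
fn parse_even(s: &str) -> Option<i32> {
    if let Ok(n) = s.parse::<i32>()
        && n % 2 == 0
    {
        return Some(n);
    }
    None
}

fn main() {
    assert_eq!(parse_even("4"), Some(4));
    assert_eq!(parse_even("5"), None);
    assert_eq!(parse_even("x"), None);
}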

View File

@@ -207,6 +207,9 @@ pub trait ContainerService {
                     Ok::<_, std::io::Error>(event)
                 }),
             )
+            .chain(futures::stream::once(async {
+                Ok::<_, std::io::Error>(LogMsg::Finished.to_sse_event())
+            }))
             .boxed();

         Some(stream)
@@ -331,6 +334,9 @@ pub trait ContainerService {
                 .history_plus_stream()
                 .filter(|msg| future::ready(matches!(msg, Ok(LogMsg::JsonPatch(..)))))
                 .map_ok(|m| m.to_sse_event())
+                .chain(futures::stream::once(async {
+                    Ok::<_, std::io::Error>(LogMsg::Finished.to_sse_event())
+                }))
                 .boxed(),
         )
     }
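
Both hunks above use the same technique to mark DB-backed log streams as finished: `futures::stream::once` wraps a single future, and `chain` appends it after the source stream completes. A self-contained sketch, with string items standing in for SSE events:

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    let events = stream::iter(vec!["patch-1", "patch-2"])
        // Emitted exactly once, after the source stream ends.
        .chain(stream::once(async { "finished" }));
    let collected: Vec<_> = events.collect().await;
    assert_eq!(collected, ["patch-1", "patch-2", "finished"]);
}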

View File

@@ -37,7 +37,7 @@ impl Default for MsgStore {
 impl MsgStore {
     pub fn new() -> Self {
-        let (sender, _) = broadcast::channel(100);
+        let (sender, _) = broadcast::channel(10000);
         Self {
             inner: RwLock::new(Inner {
                 history: VecDeque::with_capacity(32),
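
The capacity bump matters because a tokio broadcast channel evicts its oldest messages once full; a receiver that falls behind then observes `RecvError::Lagged` and has silently lost data, which is presumably why the buffer grows from 100 to 10000 here. A minimal demonstration:

use tokio::sync::broadcast;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = broadcast::channel(2); // tiny buffer to force overflow
    for i in 0..4 {
        tx.send(i).unwrap();
    }
    // The two oldest values were evicted; the receiver learns it lagged...
    assert!(matches!(
        rx.recv().await,
        Err(broadcast::error::RecvError::Lagged(2))
    ));
    // ...and resumes at the oldest value still buffered.
    assert_eq!(rx.recv().await.unwrap(), 2);
}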

View File

@@ -113,6 +113,9 @@ export const useEventSourceManager = ({
           return false; // Already processed
         }
         processedSet.add(entryIndex);
+      } else if (match && patch.op === 'remove') {
+        const entryIndex = parseInt(match[1], 10);
+        processedSet.delete(entryIndex);
       }
       // Always allow replace operations and non-entry patches
       return true;