Add user message to normalized log history (#521)

Solomon
2025-08-19 13:52:34 +01:00
committed by GitHub
parent 01662fbd52
commit 85cc1d6211
9 changed files with 100 additions and 40 deletions

View File

@@ -100,7 +100,7 @@ impl StandardCodingAgentExecutor for Amp {
}
fn normalize_logs(&self, raw_logs_msg_store: Arc<MsgStore>, current_dir: &PathBuf) {
- let entry_index_provider = EntryIndexProvider::new();
+ let entry_index_provider = EntryIndexProvider::seeded_from_msg_store(&raw_logs_msg_store);
// Process stderr logs using the standard stderr processor
normalize_stderr_logs(raw_logs_msg_store.clone(), entry_index_provider.clone());
@@ -410,7 +410,7 @@ impl AmpContentItem {
match self {
AmpContentItem::Text { text } => {
let entry_type = match role {
"user" => NormalizedEntryType::UserMessage,
"user" => return None,
"assistant" => NormalizedEntryType::AssistantMessage,
_ => return None,
};
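
Variations of this change repeat for the executors below: each index provider is now seeded from whatever the MsgStore already contains, and executors that used to emit the user turn themselves (Claude, Cursor) stop doing so, because the container (last file in this commit) injects the prompt instead. A minimal sketch of the resulting index arithmetic, reusing the types visible elsewhere in this diff; the MsgStore constructor and exact import paths are assumptions, not part of the commit:

// Container side: the prompt becomes normalized entry 0 (constructor assumed).
let msg_store = MsgStore::new();
let prompt_entry = NormalizedEntry {
    timestamp: None,
    entry_type: NormalizedEntryType::UserMessage,
    content: "initial prompt".to_string(),
    metadata: None,
};
msg_store.push_patch(ConversationPatch::add_normalized_entry(0, prompt_entry));

// Executor side: seeding scans the history above, so the first
// executor-produced entry lands at index 1 instead of clobbering 0.
let provider = EntryIndexProvider::seeded_from_msg_store(&msg_store);
assert_eq!(provider.next(), 1);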

View File

@@ -112,7 +112,7 @@ impl StandardCodingAgentExecutor for ClaudeCode {
}
fn normalize_logs(&self, msg_store: Arc<MsgStore>, current_dir: &PathBuf) {
- let entry_index_provider = EntryIndexProvider::new();
+ let entry_index_provider = EntryIndexProvider::seeded_from_msg_store(&msg_store);
// Process stdout logs (Claude's JSON output)
ClaudeLogProcessor::process_logs(
@@ -329,16 +329,8 @@ impl ClaudeLogProcessor {
}
entries
}
- ClaudeJson::User { message, .. } => {
- let mut entries = Vec::new();
- for content_item in &message.content {
- if let Some(entry) =
- Self::content_item_to_normalized_entry(content_item, "user", worktree_path)
- {
- entries.push(entry);
- }
- }
- entries
+ ClaudeJson::User { .. } => {
+ vec![]
}
ClaudeJson::ToolUse { tool_data, .. } => {
let tool_name = tool_data.get_name();
@@ -386,7 +378,6 @@ impl ClaudeLogProcessor {
match content_item {
ClaudeContentItem::Text { text } => {
let entry_type = match role {
"user" => NormalizedEntryType::UserMessage,
"assistant" => NormalizedEntryType::AssistantMessage,
_ => return None,
};

View File

@@ -190,7 +190,7 @@ impl StandardCodingAgentExecutor for Codex {
}
fn normalize_logs(&self, msg_store: Arc<MsgStore>, current_dir: &PathBuf) {
- let entry_index_provider = EntryIndexProvider::new();
+ let entry_index_provider = EntryIndexProvider::seeded_from_msg_store(&msg_store);
// Process stderr logs for session extraction only (errors come through JSONL)
SessionHandler::start_session_id_extraction(msg_store.clone());

View File

@@ -100,7 +100,7 @@ impl StandardCodingAgentExecutor for Cursor {
}
fn normalize_logs(&self, msg_store: Arc<MsgStore>, worktree_path: &PathBuf) {
- let entry_index_provider = EntryIndexProvider::new();
+ let entry_index_provider = EntryIndexProvider::seeded_from_msg_store(&msg_store);
// Process Cursor stdout JSONL with typed serde models
let current_dir = worktree_path.clone();
@@ -116,7 +116,7 @@ impl StandardCodingAgentExecutor for Cursor {
metadata: None,
}))
.time_gap(Duration::from_secs(2)) // Break messages if they are 2 seconds apart
- .index_provider(EntryIndexProvider::new())
+ .index_provider(entry_index_provider.clone())
.build();
// Assistant streaming coalescer state
@@ -183,19 +183,7 @@ impl StandardCodingAgentExecutor for Cursor {
}
}
- CursorJson::User { message, .. } => {
- if let Some(text) = message.concat_text() {
- let entry = NormalizedEntry {
- timestamp: None,
- entry_type: NormalizedEntryType::UserMessage,
- content: text,
- metadata: None,
- };
- let id = entry_index_provider.next();
- msg_store
- .push_patch(ConversationPatch::add_normalized_entry(id, entry));
- }
- }
+ CursorJson::User { .. } => {}
CursorJson::Assistant { message, .. } => {
if let Some(chunk) = message.concat_text() {

View File

@@ -135,7 +135,7 @@ impl StandardCodingAgentExecutor for Gemini {
/// - stderr via [`normalize_stderr_logs`]
/// - stdout via [`PlainTextLogProcessor`] with Gemini-specific formatting and default heuristics
fn normalize_logs(&self, msg_store: Arc<MsgStore>, worktree_path: &PathBuf) {
- let entry_index_counter = EntryIndexProvider::new();
+ let entry_index_counter = EntryIndexProvider::seeded_from_msg_store(&msg_store);
normalize_stderr_logs(msg_store.clone(), entry_index_counter.clone());
// Send session ID to msg_store to enable follow-ups

View File

@@ -108,7 +108,7 @@ impl StandardCodingAgentExecutor for Opencode {
/// 3. Main normalizer thread: read stderr by line, filter out log lines, send lines (with '\n' appended) to plain text normalizer,
/// then define predicate for split and create appropriate normalized entry (either assistant or tool call).
fn normalize_logs(&self, msg_store: Arc<MsgStore>, worktree_path: &PathBuf) {
- let entry_index_counter = EntryIndexProvider::new();
+ let entry_index_counter = EntryIndexProvider::seeded_from_msg_store(&msg_store);
let worktree_path = worktree_path.clone();
let stderr_lines = msg_store

View File

@@ -393,7 +393,7 @@ mod tests {
let mut processor = PlainTextLogProcessor::builder()
.normalized_entry_producer(producer)
- .index_provider(EntryIndexProvider::new())
+ .index_provider(EntryIndexProvider::test_new())
.build();
let patches = processor.process("hello world\n".to_string());
@@ -429,7 +429,7 @@ mod tests {
let mut processor = PlainTextLogProcessor::builder()
.normalized_entry_producer(tool_producer)
- .index_provider(EntryIndexProvider::new())
+ .index_provider(EntryIndexProvider::test_new())
.build();
let patches = processor.process("TOOL: file_read\n".to_string());

View File

@@ -5,13 +5,16 @@ use std::sync::{
atomic::{AtomicUsize, Ordering},
};
+ use json_patch::PatchOperation;
+ use utils::{log_msg::LogMsg, msg_store::MsgStore};
/// Thread-safe provider for monotonically increasing entry indexes
#[derive(Debug, Clone)]
pub struct EntryIndexProvider(Arc<AtomicUsize>);
impl EntryIndexProvider {
- /// Create a new index provider starting from 0
- pub fn new() -> Self {
+ /// Create a new index provider starting from 0 (private; prefer seeding)
+ fn new() -> Self {
Self(Arc::new(AtomicUsize::new(0)))
}
@@ -28,6 +31,36 @@ impl EntryIndexProvider {
pub fn reset(&self) {
self.0.store(0, Ordering::Relaxed);
}
+ /// Create a provider seeded from the maximum existing normalized-entry index
+ /// observed in prior JSON patches in `MsgStore`.
+ pub fn seeded_from_msg_store(msg_store: &MsgStore) -> Self {
+ let provider = EntryIndexProvider::new();
+ let max_index: Option<usize> = msg_store
+ .get_history()
+ .iter()
+ .filter_map(|msg| {
+ if let LogMsg::JsonPatch(patch) = msg {
+ patch.iter().find_map(|op| {
+ if let PatchOperation::Add(add) = op {
+ add.path
+ .strip_prefix("/entries/")
+ .and_then(|n_str| n_str.parse::<usize>().ok())
+ } else {
+ None
+ }
+ })
+ } else {
+ None
+ }
+ })
+ .max();
+ let start_at = max_index.map_or(0, |n| n.saturating_add(1));
+ provider.0.store(start_at, Ordering::Relaxed);
+ provider
+ }
}
impl Default for EntryIndexProvider {
@@ -42,7 +75,7 @@ mod tests {
#[test]
fn test_entry_index_provider() {
- let provider = EntryIndexProvider::new();
+ let provider = EntryIndexProvider::test_new();
assert_eq!(provider.next(), 0);
assert_eq!(provider.next(), 1);
assert_eq!(provider.next(), 2);
@@ -50,7 +83,7 @@ mod tests {
#[test]
fn test_entry_index_provider_clone() {
- let provider1 = EntryIndexProvider::new();
+ let provider1 = EntryIndexProvider::test_new();
let provider2 = provider1.clone();
assert_eq!(provider1.next(), 0);
@@ -60,7 +93,7 @@ mod tests {
#[test]
fn test_current_index() {
- let provider = EntryIndexProvider::new();
+ let provider = EntryIndexProvider::test_new();
assert_eq!(provider.current(), 0);
provider.next();
@@ -70,3 +103,11 @@ mod tests {
assert_eq!(provider.current(), 2);
}
}
+ #[cfg(test)]
+ impl EntryIndexProvider {
+ /// Test-only constructor for a fresh provider starting at 0
+ pub fn test_new() -> Self {
+ Self::new()
+ }
+ }
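
A hedged, test-style sketch of the seeding rule above (not part of this commit): the provider resumes after the highest "/entries/N" index found in the history, not after the number of entries. MsgStore::new() and the sample_entry() helper are illustrative assumptions, as are the imports of NormalizedEntry, NormalizedEntryType and ConversationPatch into this test module:

fn sample_entry() -> NormalizedEntry {
    NormalizedEntry {
        timestamp: None,
        entry_type: NormalizedEntryType::AssistantMessage,
        content: "hi".to_string(),
        metadata: None,
    }
}

#[test]
fn seeds_past_the_highest_existing_index() {
    let store = MsgStore::new(); // constructor assumed
    // History holds entries 0 and 3 (e.g. left over from an earlier run).
    store.push_patch(ConversationPatch::add_normalized_entry(0, sample_entry()));
    store.push_patch(ConversationPatch::add_normalized_entry(3, sample_entry()));

    // Highest existing index is 3, so indexing resumes at 4, not at 2 (the count).
    let provider = EntryIndexProvider::seeded_from_msg_store(&store);
    assert_eq!(provider.next(), 4);
}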

View File

@@ -30,7 +30,7 @@ use executors::{
script::{ScriptContext, ScriptRequest, ScriptRequestLanguage},
},
executors::{CodingAgent, ExecutorError, StandardCodingAgentExecutor},
- logs::utils::patch::ConversationPatch,
+ logs::{NormalizedEntry, NormalizedEntryType, utils::patch::ConversationPatch},
profile::ProfileVariantLabel,
};
use futures::{StreamExt, TryStreamExt, future};
@@ -321,6 +321,16 @@ pub trait ContainerService {
if let Ok(executor) =
CodingAgent::from_profile_variant_label(&request.profile_variant_label)
{
+ // Inject the initial user prompt before normalization (DB fallback path)
+ let user_entry = NormalizedEntry {
+ timestamp: None,
+ entry_type: NormalizedEntryType::UserMessage,
+ content: request.prompt.clone(),
+ metadata: None,
+ };
+ temp_store
+ .push_patch(ConversationPatch::add_normalized_entry(0, user_entry));
executor.normalize_logs(temp_store.clone(), &current_dir);
} else {
tracing::error!(
@@ -333,6 +343,16 @@ pub trait ContainerService {
if let Ok(executor) =
CodingAgent::from_profile_variant_label(&request.profile_variant_label)
{
+ // Inject the follow-up user prompt before normalization (DB fallback path)
+ let user_entry = NormalizedEntry {
+ timestamp: None,
+ entry_type: NormalizedEntryType::UserMessage,
+ content: request.prompt.clone(),
+ metadata: None,
+ };
+ temp_store
+ .push_patch(ConversationPatch::add_normalized_entry(0, user_entry));
executor.normalize_logs(temp_store.clone(), &current_dir);
} else {
tracing::error!(
@@ -578,6 +598,16 @@ pub trait ContainerService {
if let Ok(executor) =
CodingAgent::from_profile_variant_label(&request.profile_variant_label)
{
+ // Prepend the initial user prompt as a normalized entry
+ let user_entry = NormalizedEntry {
+ timestamp: None,
+ entry_type: NormalizedEntryType::UserMessage,
+ content: request.prompt.clone(),
+ metadata: None,
+ };
+ msg_store
+ .push_patch(ConversationPatch::add_normalized_entry(0, user_entry));
executor.normalize_logs(
msg_store,
&self.task_attempt_to_current_dir(task_attempt),
@@ -595,6 +625,16 @@ pub trait ContainerService {
if let Ok(executor) =
CodingAgent::from_profile_variant_label(&request.profile_variant_label)
{
+ // Prepend the follow-up user prompt as a normalized entry
+ let user_entry = NormalizedEntry {
+ timestamp: None,
+ entry_type: NormalizedEntryType::UserMessage,
+ content: request.prompt.clone(),
+ metadata: None,
+ };
+ msg_store
+ .push_patch(ConversationPatch::add_normalized_entry(0, user_entry));
executor.normalize_logs(
msg_store,
&self.task_attempt_to_current_dir(task_attempt),
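
The injection block above is repeated verbatim across four branches (initial and follow-up prompts, live and DB-fallback paths). A hypothetical consolidation, not part of this commit and with an illustrative name, would reduce each call site to a single line before executor.normalize_logs(..):

/// Hypothetical helper: record the user's prompt as the first normalized entry
/// so executors never have to reconstruct it from their own log output.
fn inject_user_prompt(store: &MsgStore, prompt: &str) {
    let user_entry = NormalizedEntry {
        timestamp: None,
        entry_type: NormalizedEntryType::UserMessage,
        content: prompt.to_string(),
        metadata: None,
    };
    store.push_patch(ConversationPatch::add_normalized_entry(0, user_entry));
}

Each branch would then call inject_user_prompt(&msg_store, &request.prompt) (or &temp_store in the fallback paths) before handing the store to the executor.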