diff --git a/crates/executors/Cargo.toml b/crates/executors/Cargo.toml
index 4ff8e8a5..e2e08517 100644
--- a/crates/executors/Cargo.toml
+++ b/crates/executors/Cargo.toml
@@ -43,3 +43,6 @@ sqlx = "0.8.6"
 axum = { workspace = true }
 shlex = "1.3.0"
 agent-client-protocol = "0.4"
+codex-protocol = { git = "https://github.com/openai/codex.git", package = "codex-protocol", rev = "488ec061bf4d36916b8f477c700ea4fde4162a7a" }
+codex-app-server-protocol = { git = "https://github.com/openai/codex.git", package = "codex-app-server-protocol", rev = "488ec061bf4d36916b8f477c700ea4fde4162a7a" }
+codex-mcp-types = { git = "https://github.com/openai/codex.git", package = "mcp-types", rev = "488ec061bf4d36916b8f477c700ea4fde4162a7a" }
diff --git a/crates/executors/src/executors/codex.rs b/crates/executors/src/executors/codex.rs
index 2f11eb7a..09c3ce9b 100644
--- a/crates/executors/src/executors/codex.rs
+++ b/crates/executors/src/executors/codex.rs
@@ -1,36 +1,39 @@
-mod session;
+pub mod client;
+pub mod jsonrpc;
+pub mod normalize_logs;
+pub mod session;
 
 use std::{
+    collections::HashMap,
     path::{Path, PathBuf},
-    process::Stdio,
     sync::Arc,
 };
 
 use async_trait::async_trait;
+use codex_app_server_protocol::NewConversationParams;
+use codex_protocol::config_types::SandboxMode as CodexSandboxMode;
 use command_group::AsyncCommandGroup;
-use futures::StreamExt;
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
+use serde_json::Value;
 use strum_macros::AsRefStr;
-use tokio::{io::AsyncWriteExt, process::Command};
+use tokio::process::Command;
 use ts_rs::TS;
-use workspace_utils::{
-    diff::{concatenate_diff_hunks, extract_unified_diff_hunks},
-    msg_store::MsgStore,
-    path::make_path_relative,
-    shell::get_shell_command,
-};
+use workspace_utils::{msg_store::MsgStore, shell::get_shell_command};
 
+use self::{
+    client::{AppServerClient, LogWriter},
+    jsonrpc::JsonRpcPeer,
+    normalize_logs::normalize_logs,
+    session::SessionHandler,
+};
 use crate::{
     command::{CmdOverrides, CommandBuilder, apply_overrides},
     executors::{
         AppendPrompt, ExecutorError, SpawnedChild, StandardCodingAgentExecutor,
-        codex::session::SessionHandler,
-    },
-    logs::{
-        ActionType, FileChange, NormalizedEntry, NormalizedEntryType, ToolStatus,
-        utils::{EntryIndexProvider, patch::ConversationPatch},
+        codex::{jsonrpc::ExitSignalSender, normalize_logs::Error},
     },
+    stdout_dup::create_stdout_pipe_writer,
 };
 
 /// Sandbox policy modes for Codex
@@ -90,90 +93,23 @@ pub struct Codex {
     pub model_reasoning_summary: Option<ReasoningSummary>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub model_reasoning_summary_format: Option<ReasoningSummaryFormat>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub profile: Option<String>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub base_instructions: Option<String>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub include_plan_tool: Option<bool>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub include_apply_patch_tool: Option<bool>,
     #[serde(flatten)]
     pub cmd: CmdOverrides,
 }
 
-impl Codex {
-    fn build_command_builder(&self) -> CommandBuilder {
-        let mut builder = CommandBuilder::new("npx -y @openai/codex@0.38.0 exec")
-            .params(["--json", "--skip-git-repo-check"]);
-
-        if let Some(sandbox) = &self.sandbox {
-            if sandbox == &SandboxMode::Auto {
-                builder = builder.extend_params(["--full-auto"]);
-            } else {
-                builder = builder.extend_params(["--sandbox", sandbox.as_ref()]);
-                if sandbox == &SandboxMode::DangerFullAccess {
-                    builder =
builder.extend_params(["--dangerously-bypass-approvals-and-sandbox"]); - } - } - } - - if self.oss.unwrap_or(false) { - builder = builder.extend_params(["--oss"]); - } - - if let Some(model) = &self.model { - builder = builder.extend_params(["--model", model]); - } - - if let Some(effort) = &self.model_reasoning_effort { - builder = builder.extend_params([ - "--config", - &format!("model_reasoning_effort={}", effort.as_ref()), - ]); - } - - if let Some(format) = &self.model_reasoning_summary_format - && format != &ReasoningSummaryFormat::None - { - builder = builder.extend_params([ - "--config", - &format!("model_reasoning_summary_format={}", format.as_ref()), - ]); - } - - if let Some(summary) = &self.model_reasoning_summary { - builder = builder.extend_params([ - "--config", - &format!("model_reasoning_summary={}", summary.as_ref()), - ]); - } - - apply_overrides(builder, &self.cmd) - } -} - #[async_trait] impl StandardCodingAgentExecutor for Codex { async fn spawn(&self, current_dir: &Path, prompt: &str) -> Result { - let (shell_cmd, shell_arg) = get_shell_command(); - let codex_command = self.build_command_builder().build_initial(); - - let combined_prompt = self.append_prompt.combine_prompt(prompt); - - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(current_dir) - .arg(shell_arg) - .arg(&codex_command) - .env("NODE_NO_WARNINGS", "1") - .env("RUST_LOG", "info"); - - let mut child = command.group_spawn()?; - - // Feed the prompt in, then close the pipe so codex sees EOF - if let Some(mut stdin) = child.inner().stdin.take() { - stdin.write_all(combined_prompt.as_bytes()).await?; - stdin.shutdown().await?; - } - - Ok(child.into()) + let command = self.build_command_builder().build_initial(); + self.spawn(current_dir, prompt, command, None).await } async fn spawn_follow_up( @@ -182,1108 +118,202 @@ impl StandardCodingAgentExecutor for Codex { prompt: &str, session_id: &str, ) -> Result { - // Fork rollout: copy and assign a new session id so each execution has a unique session - let (_rollout_file_path, new_session_id) = SessionHandler::fork_rollout_file(session_id) - .map_err(|e| ExecutorError::SpawnError(std::io::Error::other(e)))?; - - let (shell_cmd, shell_arg) = get_shell_command(); - let codex_command = self - .build_command_builder() - .build_follow_up(&["resume".to_string(), new_session_id]); - - let combined_prompt = self.append_prompt.combine_prompt(prompt); - - let mut command = Command::new(shell_cmd); - command - .kill_on_drop(true) - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .current_dir(current_dir) - .arg(shell_arg) - .arg(&codex_command) - .env("NODE_NO_WARNINGS", "1") - .env("RUST_LOG", "info"); - - let mut child = command.group_spawn()?; - - // Feed the prompt in, then close the pipe so codex sees EOF - if let Some(mut stdin) = child.inner().stdin.take() { - stdin.write_all(combined_prompt.as_bytes()).await?; - stdin.shutdown().await?; - } - - Ok(child.into()) + let command = self.build_command_builder().build_follow_up(&[]); + self.spawn(current_dir, prompt, command, Some(session_id)) + .await } - fn normalize_logs(&self, msg_store: Arc, current_dir: &Path) { - let entry_index_provider = EntryIndexProvider::start_from(&msg_store); - - // Process stderr logs for session extraction only (errors come through JSONL) - SessionHandler::start_session_id_extraction(msg_store.clone()); - - // Process stdout logs (Codex's JSONL 
-        let current_dir = current_dir.to_path_buf();
-        tokio::spawn(async move {
-            let mut stream = msg_store.stdout_lines_stream();
-            use std::collections::HashMap;
-            // Track exec call ids to entry index, tool_name, content, and command
-            let mut exec_info_map: HashMap<String, (usize, String, String, String)> =
-                HashMap::new();
-            // Track MCP calls to index, tool_name, args, and initial content
-            let mut mcp_info_map: HashMap<
-                String,
-                (usize, String, Option<serde_json::Value>, String),
-            > = HashMap::new();
-
-            while let Some(Ok(line)) = stream.next().await {
-                let trimmed = line.trim();
-                if trimmed.is_empty() {
-                    continue;
-                }
-
-                if let Ok(cj) = serde_json::from_str::<CodexJson>(trimmed) {
-                    // Handle result-carrying events that require replacement
-                    match &cj {
-                        CodexJson::StructuredMessage { msg, .. } => match msg {
-                            CodexMsgContent::ExecCommandBegin {
-                                call_id, command, ..
-                            } => {
-                                let command_str = command.join(" ");
-                                let entry = NormalizedEntry {
-                                    timestamp: None,
-                                    entry_type: NormalizedEntryType::ToolUse {
-                                        tool_name: if command_str.contains("bash") {
-                                            "bash".to_string()
-                                        } else {
-                                            "shell".to_string()
-                                        },
-                                        action_type: ActionType::CommandRun {
-                                            command: command_str.clone(),
-                                            result: None,
-                                        },
-                                        status: ToolStatus::Created,
-                                    },
-                                    content: format!("`{command_str}`"),
-                                    metadata: None,
-                                };
-                                let id = entry_index_provider.next();
-                                if let Some(cid) = call_id.as_ref() {
-                                    let tool_name = if command_str.contains("bash") {
-                                        "bash".to_string()
-                                    } else {
-                                        "shell".to_string()
-                                    };
-                                    exec_info_map.insert(
-                                        cid.clone(),
-                                        (id, tool_name, entry.content.clone(), command_str.clone()),
-                                    );
-                                }
-                                msg_store
-                                    .push_patch(ConversationPatch::add_normalized_entry(id, entry));
-                            }
-                            CodexMsgContent::ExecCommandEnd {
-                                call_id,
-                                stdout,
-                                stderr,
-                                success,
-                                exit_code,
-                            } => {
-                                if let Some(cid) = call_id.as_ref()
-                                    && let Some((idx, tool_name, prev_content, prev_command)) =
-                                        exec_info_map.get(cid).cloned()
-                                {
-                                    // Merge stdout and stderr for richer context
-                                    let output = match (stdout.as_ref(), stderr.as_ref()) {
-                                        (Some(sout), Some(serr)) => {
-                                            let sout_trim = sout.trim();
-                                            let serr_trim = serr.trim();
-                                            if sout_trim.is_empty() && serr_trim.is_empty() {
-                                                None
-                                            } else if sout_trim.is_empty() {
-                                                Some(serr.clone())
-                                            } else if serr_trim.is_empty() {
-                                                Some(sout.clone())
-                                            } else {
-                                                Some(format!(
-                                                    "STDOUT:\n{sout_trim}\n\nSTDERR:\n{serr_trim}"
-                                                ))
-                                            }
-                                        }
-                                        (Some(sout), None) => {
-                                            if sout.trim().is_empty() {
-                                                None
-                                            } else {
-                                                Some(sout.clone())
-                                            }
-                                        }
-                                        (None, Some(serr)) => {
-                                            if serr.trim().is_empty() {
-                                                None
-                                            } else {
-                                                Some(serr.clone())
-                                            }
-                                        }
-                                        (None, None) => None,
-                                    };
-                                    let exit_status = if let Some(s) = success {
-                                        Some(crate::logs::CommandExitStatus::Success {
-                                            success: *s,
-                                        })
-                                    } else {
-                                        exit_code.as_ref().map(|code| {
-                                            crate::logs::CommandExitStatus::ExitCode { code: *code }
-                                        })
-                                    };
-
-                                    let status = if let Some(s) = success {
-                                        if *s {
-                                            ToolStatus::Success
-                                        } else {
-                                            ToolStatus::Failed
-                                        }
-                                    } else if let Some(code) = exit_code {
-                                        if *code == 0 {
-                                            ToolStatus::Success
-                                        } else {
-                                            ToolStatus::Failed
-                                        }
-                                    } else {
-                                        // Default to failed status
-                                        ToolStatus::Failed
-                                    };
-
-                                    let entry = NormalizedEntry {
-                                        timestamp: None,
-                                        entry_type: NormalizedEntryType::ToolUse {
-                                            tool_name,
-                                            action_type: ActionType::CommandRun {
-                                                command: prev_command,
-                                                result: Some(crate::logs::CommandRunResult {
-                                                    exit_status,
-                                                    output,
-                                                }),
-                                            },
-                                            status,
-                                        },
-                                        content: prev_content,
-                                        metadata: None,
-                                    };
-                                    msg_store.push_patch(ConversationPatch::replace(idx, entry));
-                                }
-                            }
-                            CodexMsgContent::McpToolCallBegin {
-                                call_id,
-                                invocation,
-                            } => {
-                                let tool_name =
-                                    format!("mcp:{}:{}", invocation.server, invocation.tool);
-                                let content_str = invocation.tool.clone();
-                                let entry = NormalizedEntry {
-                                    timestamp: None,
-                                    entry_type: NormalizedEntryType::ToolUse {
-                                        tool_name: tool_name.clone(),
-                                        action_type: ActionType::Tool {
-                                            tool_name: tool_name.clone(),
-                                            arguments: invocation.arguments.clone(),
-                                            result: None,
-                                        },
-                                        status: ToolStatus::Created,
-                                    },
-                                    content: content_str.clone(),
-                                    metadata: None,
-                                };
-                                let id = entry_index_provider.next();
-                                mcp_info_map.insert(
-                                    call_id.clone(),
-                                    (
-                                        id,
-                                        tool_name.clone(),
-                                        invocation.arguments.clone(),
-                                        content_str,
-                                    ),
-                                );
-                                msg_store
-                                    .push_patch(ConversationPatch::add_normalized_entry(id, entry));
-                            }
-                            CodexMsgContent::McpToolCallEnd {
-                                call_id, result, ..
-                            } => {
-                                if let Some((idx, tool_name, args, prev_content)) =
-                                    mcp_info_map.remove(call_id)
-                                {
-                                    let entry = NormalizedEntry {
-                                        timestamp: None,
-                                        entry_type: NormalizedEntryType::ToolUse {
-                                            tool_name: tool_name.clone(),
-                                            action_type: ActionType::Tool {
-                                                tool_name,
-                                                arguments: args,
-                                                result: Some(crate::logs::ToolResult {
-                                                    r#type: crate::logs::ToolResultValueType::Json,
-                                                    value: result.clone(),
-                                                }),
-                                            },
-                                            status: ToolStatus::Success,
-                                        },
-                                        content: prev_content,
-                                        metadata: None,
-                                    };
-                                    msg_store.push_patch(ConversationPatch::replace(idx, entry));
-                                }
-                            }
-                            _ => {
-                                if let Some(entries) = cj.to_normalized_entries(&current_dir) {
-                                    for entry in entries {
-                                        let new_id = entry_index_provider.next();
-                                        let patch =
-                                            ConversationPatch::add_normalized_entry(new_id, entry);
-                                        msg_store.push_patch(patch);
-                                    }
-                                }
-                            }
-                        },
-                        _ => {
-                            if let Some(entries) = cj.to_normalized_entries(&current_dir) {
-                                for entry in entries {
-                                    let new_id = entry_index_provider.next();
-                                    let patch =
-                                        ConversationPatch::add_normalized_entry(new_id, entry);
-                                    msg_store.push_patch(patch);
-                                }
-                            }
-                        }
-                    }
-                } else {
-                    // Handle malformed JSON as raw output
-                    let entry = NormalizedEntry {
-                        timestamp: None,
-                        entry_type: NormalizedEntryType::SystemMessage,
-                        content: trimmed.to_string(),
-                        metadata: None,
-                    };
-
-                    let new_id = entry_index_provider.next();
-                    let patch = ConversationPatch::add_normalized_entry(new_id, entry);
-                    msg_store.push_patch(patch);
-                }
-            }
-        });
+    fn normalize_logs(&self, msg_store: Arc<MsgStore>, worktree_path: &Path) {
+        normalize_logs(msg_store, worktree_path);
     }
 
-    // MCP configuration methods
-    fn default_mcp_config_path(&self) -> Option<PathBuf> {
+    fn default_mcp_config_path(&self) -> Option<PathBuf> {
         dirs::home_dir().map(|home| home.join(".codex").join("config.toml"))
     }
 }
 
-// Data structures for parsing Codex's JSON output format
-#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
-#[serde(untagged)]
-pub enum CodexJson {
-    /// Structured message with id and msg fields
-    StructuredMessage { id: String, msg: CodexMsgContent },
-    /// Prompt message (user input)
-    Prompt { prompt: String },
-    /// System configuration message (first message with config fields)
-    SystemConfig {
-        #[serde(default)]
-        model: Option<String>,
-        #[serde(rename = "reasoning effort", default)]
-        reasoning_effort: Option<String>,
-        #[serde(default)]
-        provider: Option<String>,
-        #[serde(default)]
-        sandbox: Option<String>,
-        #[serde(default)]
-        approval: Option<String>,
-        #[serde(default)]
-        workdir: Option<String>,
-        #[serde(rename = "reasoning summaries", default)]
-        reasoning_summaries: Option<String>,
-        #[serde(flatten)]
-        other_fields: std::collections::HashMap<String, serde_json::Value>,
-    },
-}
+impl Codex {
+    fn build_command_builder(&self) -> CommandBuilder {
+        let mut builder = CommandBuilder::new("npx -y @openai/codex@0.44.0 app-server");
 
-#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
-pub struct McpInvocation {
-    pub server: String,
-    pub tool: String,
-    #[serde(default)]
-    pub arguments: Option<serde_json::Value>,
-}
+        if self.oss.unwrap_or(false) {
+            builder = builder.extend_params(["--oss"]);
+        }
 
-#[derive(Deserialize, Serialize, Debug, Clone, PartialEq)]
-#[serde(tag = "type")]
-pub enum CodexMsgContent {
-    #[serde(rename = "agent_message")]
-    AgentMessage { message: String },
+        apply_overrides(builder, &self.cmd)
+    }
 
-    #[serde(rename = "agent_reasoning")]
-    AgentReasoning { text: String },
+    fn build_new_conversation_params(&self, cwd: &Path) -> NewConversationParams {
+        let sandbox = match self.sandbox.as_ref() {
+            None | Some(SandboxMode::Auto) => None,
+            Some(SandboxMode::ReadOnly) => Some(CodexSandboxMode::ReadOnly),
+            Some(SandboxMode::WorkspaceWrite) => Some(CodexSandboxMode::WorkspaceWrite),
+            Some(SandboxMode::DangerFullAccess) => Some(CodexSandboxMode::DangerFullAccess),
+        };
 
-    #[serde(rename = "agent_reasoning_raw_content")]
-    AgentReasoningRawContent { text: String },
-
-    #[serde(rename = "agent_reasoning_raw_content_delta")]
-    AgentReasoningRawContentDelta { delta: String },
-
-    #[serde(rename = "error")]
-    Error { message: Option<String> },
-
-    #[serde(rename = "mcp_tool_call_begin")]
-    McpToolCallBegin {
-        call_id: String,
-        invocation: McpInvocation,
-    },
-
-    #[serde(rename = "mcp_tool_call_end")]
-    McpToolCallEnd {
-        call_id: String,
-        invocation: McpInvocation,
-        #[serde(default)]
-        duration: serde_json::Value,
-        result: serde_json::Value,
-    },
-
-    #[serde(rename = "exec_command_begin")]
-    ExecCommandBegin {
-        call_id: Option<String>,
-        command: Vec<String>,
-        cwd: Option<String>,
-    },
-
-    #[serde(rename = "exec_command_output_delta")]
-    ExecCommandOutputDelta {
-        call_id: Option<String>,
-        // "stdout" | "stderr" typically
-        stream: Option<String>,
-        // Could be bytes or string; keep flexible
-        chunk: Option<serde_json::Value>,
-    },
-
-    #[serde(rename = "exec_command_end")]
-    ExecCommandEnd {
-        call_id: Option<String>,
-        stdout: Option<String>,
-        stderr: Option<String>,
-        // Codex protocol has exit_code + duration; CLI may provide success; keep optional
-        success: Option<bool>,
-        #[serde(default)]
-        exit_code: Option<i32>,
-    },
-
-    #[serde(rename = "exec_approval_request")]
-    ExecApprovalRequest {
-        call_id: Option<String>,
-        command: Vec<String>,
-        cwd: Option<String>,
-        reason: Option<String>,
-    },
-
-    #[serde(rename = "apply_patch_approval_request")]
-    ApplyPatchApprovalRequest {
-        call_id: Option<String>,
-        changes: std::collections::HashMap<String, CodexFileChange>,
-        reason: Option<String>,
-        grant_root: Option<String>,
-    },
-
-    #[serde(rename = "background_event")]
-    BackgroundEvent { message: String },
-
-    #[serde(rename = "patch_apply_begin")]
-    PatchApplyBegin {
-        call_id: Option<String>,
-        auto_approved: Option<bool>,
-        changes: std::collections::HashMap<String, CodexFileChange>,
-    },
-
-    #[serde(rename = "patch_apply_end")]
-    PatchApplyEnd {
-        call_id: Option<String>,
-        stdout: Option<String>,
-        stderr: Option<String>,
-        success: Option<bool>,
-    },
-
-    #[serde(rename = "turn_diff")]
-    TurnDiff { unified_diff: String },
-
-    #[serde(rename = "get_history_entry_response")]
-    GetHistoryEntryResponse {
-        offset: Option<u64>,
-        log_id: Option<u64>,
-        entry: Option<serde_json::Value>,
-    },
-
-    #[serde(rename = "plan_update")]
-    PlanUpdate {
-        #[serde(flatten)]
-        value: serde_json::Value,
-    },
-
-    #[serde(rename = "task_started")]
-    TaskStarted,
-    #[serde(rename = "task_complete")]
-    TaskComplete { last_agent_message: Option<String> },
-    #[serde(rename = "token_count")]
-    TokenCount {
-        input_tokens: Option<u64>,
-        cached_input_tokens: Option<u64>,
-        output_tokens: Option<u64>,
-        reasoning_output_tokens: Option<u64>,
-        total_tokens: Option<u64>,
-    },
-
-    // Catch-all for unknown message types
-    #[serde(other)]
-    Unknown,
-}
-
-#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
-#[serde(rename_all = "snake_case")]
-pub enum CodexFileChange {
-    Add {
-        content: String,
-    },
-    Delete,
-    Update {
-        unified_diff: String,
-        move_path: Option<PathBuf>,
-    },
-}
-
-impl CodexJson {
-    /// Convert to normalized entries
-    pub fn to_normalized_entries(&self, current_dir: &Path) -> Option<Vec<NormalizedEntry>> {
-        match self {
-            CodexJson::SystemConfig { .. } => self.format_config_message().map(|content| {
-                vec![NormalizedEntry {
-                    timestamp: None,
-                    entry_type: NormalizedEntryType::SystemMessage,
-                    content,
-                    metadata: Some(serde_json::to_value(self).unwrap_or(serde_json::Value::Null)),
-                }]
-            }),
-            CodexJson::Prompt { .. } => None, // Skip prompt messages
-            CodexJson::StructuredMessage { msg, .. } => {
-                let this = &msg;
-
-                match this {
-                    CodexMsgContent::AgentMessage { message } => Some(vec![NormalizedEntry {
-                        timestamp: None,
-                        entry_type: NormalizedEntryType::AssistantMessage,
-                        content: message.clone(),
-                        metadata: None,
-                    }]),
-                    CodexMsgContent::AgentReasoning { text } => Some(vec![NormalizedEntry {
-                        timestamp: None,
-                        entry_type: NormalizedEntryType::Thinking,
-                        content: text.clone(),
-                        metadata: None,
-                    }]),
-                    CodexMsgContent::Error { message } => {
-                        let error_message = message
-                            .clone()
-                            .unwrap_or_else(|| "Unknown error occurred".to_string());
-                        Some(vec![NormalizedEntry {
-                            timestamp: None,
-                            entry_type: NormalizedEntryType::ErrorMessage,
-                            content: error_message,
-                            metadata: None,
-                        }])
-                    }
-                    CodexMsgContent::ExecCommandBegin { .. } => None,
-                    CodexMsgContent::PatchApplyBegin { changes, .. } => {
-                        let mut entries = Vec::new();
-
-                        for (file_path, change_data) in changes {
-                            // Make path relative to current directory
-                            let relative_path =
-                                make_path_relative(file_path, &current_dir.to_string_lossy());
-
-                            // Try to extract unified diff from change data
-                            let mut changes = vec![];
-
-                            match change_data {
-                                CodexFileChange::Update {
-                                    unified_diff,
-                                    move_path,
-                                } => {
-                                    let mut new_path = relative_path.clone();
-
-                                    if let Some(move_path) = move_path {
-                                        new_path = make_path_relative(
-                                            &move_path.to_string_lossy(),
-                                            &current_dir.to_string_lossy(),
-                                        );
-                                        changes.push(FileChange::Rename {
-                                            new_path: new_path.clone(),
-                                        });
-                                    }
-                                    if !unified_diff.is_empty() {
-                                        let hunks = extract_unified_diff_hunks(unified_diff);
-                                        changes.push(FileChange::Edit {
-                                            unified_diff: concatenate_diff_hunks(&new_path, &hunks),
-                                            has_line_numbers: true,
-                                        });
-                                    }
-                                }
-                                CodexFileChange::Add { content } => {
-                                    changes.push(FileChange::Write {
-                                        content: content.clone(),
-                                    });
-                                }
-                                CodexFileChange::Delete => {
-                                    changes.push(FileChange::Delete);
-                                }
-                            };
-
-                            entries.push(NormalizedEntry {
-                                timestamp: None,
-                                entry_type: NormalizedEntryType::ToolUse {
-                                    tool_name: "edit".to_string(),
-                                    action_type: ActionType::FileEdit {
-                                        path: relative_path.clone(),
-                                        changes,
-                                    },
-                                    status: ToolStatus::Success,
-                                },
-                                content: relative_path,
-                                metadata: None,
-                            });
-                        }
-
-                        Some(entries)
-                    }
-                    CodexMsgContent::McpToolCallBegin { .. } => None,
-                    CodexMsgContent::ExecApprovalRequest {
-                        command,
-                        cwd,
-                        reason,
-                        ..
-                    } => {
-                        let command_str = command.join(" ");
-                        let mut parts = vec![format!("command: `{}`", command_str)];
-                        if let Some(c) = cwd {
-                            parts.push(format!("cwd: {c}"));
-                        }
-                        if let Some(r) = reason {
-                            parts.push(format!("reason: {r}"));
-                        }
-                        let content =
-                            format!("Execution approval requested — {}", parts.join(" "));
-                        Some(vec![NormalizedEntry {
-                            timestamp: None,
-                            entry_type: NormalizedEntryType::SystemMessage,
-                            content,
-                            metadata: None,
-                        }])
-                    }
-                    CodexMsgContent::ApplyPatchApprovalRequest {
-                        changes,
-                        reason,
-                        grant_root,
-                        ..
-                    } => {
-                        let mut parts = vec![format!("files: {}", changes.len())];
-                        if let Some(root) = grant_root {
-                            parts.push(format!("grant_root: {root}"));
-                        }
-                        if let Some(r) = reason {
-                            parts.push(format!("reason: {r}"));
-                        }
-                        let content = format!("Patch approval requested — {}", parts.join(" "));
-                        Some(vec![NormalizedEntry {
-                            timestamp: None,
-                            entry_type: NormalizedEntryType::SystemMessage,
-                            content,
-                            metadata: None,
-                        }])
-                    }
-                    CodexMsgContent::PlanUpdate { value } => Some(vec![NormalizedEntry {
-                        timestamp: None,
-                        entry_type: NormalizedEntryType::SystemMessage,
-                        content: "Plan update".to_string(),
-                        metadata: Some(value.clone()),
-                    }]),
-
-                    // Ignored message types
-                    CodexMsgContent::AgentReasoningRawContent { .. }
-                    | CodexMsgContent::AgentReasoningRawContentDelta { .. }
-                    | CodexMsgContent::ExecCommandOutputDelta { .. }
-                    | CodexMsgContent::GetHistoryEntryResponse { .. }
-                    | CodexMsgContent::ExecCommandEnd { .. }
-                    | CodexMsgContent::PatchApplyEnd { .. }
-                    | CodexMsgContent::McpToolCallEnd { .. }
-                    | CodexMsgContent::TaskStarted
-                    | CodexMsgContent::TaskComplete { .. }
-                    | CodexMsgContent::TokenCount { .. }
-                    | CodexMsgContent::TurnDiff { .. }
-                    | CodexMsgContent::BackgroundEvent { .. }
-                    | CodexMsgContent::Unknown => None,
-                }
-            }
+        NewConversationParams {
+            model: self.model.clone(),
+            profile: self.profile.clone(),
+            cwd: Some(cwd.to_string_lossy().to_string()),
+            approval_policy: None,
+            sandbox,
+            config: self.build_config_overrides(),
+            base_instructions: self.base_instructions.clone(),
+            include_plan_tool: self.include_plan_tool,
+            include_apply_patch_tool: self.include_apply_patch_tool,
         }
     }
 
-    /// Format system configuration message for display
-    fn format_config_message(&self) -> Option<String> {
-        if let CodexJson::SystemConfig {
-            model,
-            reasoning_effort,
-            provider,
-            sandbox: _,
-            approval: _,
-            workdir: _,
-            reasoning_summaries: _,
-            other_fields: _,
-        } = self
+    fn build_config_overrides(&self) -> Option<HashMap<String, Value>> {
+        let mut overrides = HashMap::new();
+
+        if let Some(effort) = &self.model_reasoning_effort {
+            overrides.insert(
+                "model_reasoning_effort".to_string(),
+                Value::String(effort.as_ref().to_string()),
+            );
+        }
+
+        if let Some(summary) = &self.model_reasoning_summary {
+            overrides.insert(
+                "model_reasoning_summary".to_string(),
+                Value::String(summary.as_ref().to_string()),
+            );
+        }
+
+        if let Some(format) = &self.model_reasoning_summary_format
+            && format != &ReasoningSummaryFormat::None
         {
-            let mut params = vec![];
+            overrides.insert(
+                "model_reasoning_summary_format".to_string(),
+                Value::String(format.as_ref().to_string()),
+            );
+        }
 
-            if let Some(model) = model {
-                params.push(format!("model: {model}"));
-            }
-            if let Some(provider) = provider {
-                params.push(format!("provider: {provider}"));
-            }
-            if let Some(reasoning_effort) = reasoning_effort {
-                params.push(format!("reasoning effort: {reasoning_effort}"));
-            }
-
-            if params.is_empty() {
-                None
-            } else {
-                Some(params.join(" ").to_string())
-            }
-        } else {
+        if overrides.is_empty() {
             None
+        } else {
+            Some(overrides)
         }
     }
-}
 
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::logs::{ActionType, NormalizedEntry, NormalizedEntryType};
+    async fn spawn(
+        &self,
+        current_dir: &Path,
+        prompt: &str,
+        command: String,
+        resume_session: Option<&str>,
+    ) -> Result<SpawnedChild, ExecutorError> {
+        let combined_prompt = self.append_prompt.combine_prompt(prompt);
+        let (shell_cmd, shell_arg) = get_shell_command();
 
-    /// Test helper that directly tests the JSON parsing functions
-    fn parse_test_json_lines(input: &str) -> Vec<NormalizedEntry> {
-        let current_dir = PathBuf::from("/tmp");
-        let mut entries = Vec::new();
+        let mut process = Command::new(shell_cmd);
+        process
+            .kill_on_drop(true)
+            .stdin(std::process::Stdio::piped())
+            .stdout(std::process::Stdio::piped())
+            .stderr(std::process::Stdio::piped())
+            .current_dir(current_dir)
+            .arg(shell_arg)
+            .arg(&command)
+            .env("NODE_NO_WARNINGS", "1")
+            .env("NO_COLOR", "1")
+            .env("RUST_LOG", "error");
 
-        for line in input.lines() {
-            let trimmed = line.trim();
-            if trimmed.is_empty() {
-                continue;
-            }
+        let mut child = process.group_spawn()?;
 
-            if let Ok(parsed_entries) =
-                serde_json::from_str::<CodexJson>(trimmed).map(|codex_json| {
-                    codex_json
-                        .to_normalized_entries(&current_dir)
-                        .unwrap_or_default()
-                })
+        let child_stdout = child.inner().stdout.take().ok_or_else(|| {
+            ExecutorError::Io(std::io::Error::other("Codex app server missing stdout"))
+        })?;
+        let child_stdin = child.inner().stdin.take().ok_or_else(|| {
+            ExecutorError::Io(std::io::Error::other("Codex app server missing stdin"))
+        })?;
+
+        let new_stdout = create_stdout_pipe_writer(&mut child)?;
+        let (exit_signal_tx, exit_signal_rx) = tokio::sync::oneshot::channel();
+
+        let params = self.build_new_conversation_params(current_dir);
+        let resume_session = resume_session.map(|s| s.to_string());
+        tokio::spawn(async move {
+            let exit_signal_tx = ExitSignalSender::new(exit_signal_tx);
+            let log_writer = LogWriter::new(new_stdout);
+            if let Err(err) = Self::launch_codex_app_server(
+                params,
+                resume_session,
+                combined_prompt,
+                child_stdout,
+                child_stdin,
+                log_writer.clone(),
+                exit_signal_tx.clone(),
+            )
+            .await
             {
-                entries.extend(parsed_entries);
-            } else {
-                // Handle malformed JSON as raw output
-                entries.push(NormalizedEntry {
-                    timestamp: None,
-                    entry_type: NormalizedEntryType::SystemMessage,
-                    content: trimmed.to_string(),
-                    metadata: None,
-                });
-            }
-        }
-
-        entries
-    }
-
-    /// Test helper for testing CodexJson deserialization
-    fn test_codex_json_parsing(json_str: &str) -> Result<CodexJson, serde_json::Error> {
-        serde_json::from_str(json_str)
-    }
-
-    #[test]
-    fn test_extract_session_id_from_line() {
-        let line = "2025-07-23T15:47:59.877058Z INFO codex_exec: Codex initialized with event: Event { id: \"0\", msg: SessionConfigured(SessionConfiguredEvent { session_id: 3cdcc4df-c7c3-4cca-8902-48c3d4a0f96b, model: \"codex-mini-latest\", history_log_id: 9104228, history_entry_count: 1 }) }";
-
-        let session_id = SessionHandler::extract_session_id_from_line(line);
-        assert_eq!(
-            session_id,
-            Some("3cdcc4df-c7c3-4cca-8902-48c3d4a0f96b".to_string())
-        );
-    }
-
-    #[test]
-    fn test_extract_session_id_no_match() {
-        let line = "Some random log line without session id";
-        let session_id = SessionHandler::extract_session_id_from_line(line);
-        assert_eq!(session_id, None);
-    }
-
-    #[test]
-    fn test_extract_session_id_from_line_new_format() {
-        // Newer Codex versions wrap the UUID in ConversationId(...)
- let line = "2025-09-12T14:36:32.515901Z INFO codex_exec: Codex initialized with event: SessionConfiguredEvent { session_id: ConversationId(bd823d48-4bd8-4d9e-9d87-93a66afbf4d2), model: \"gpt-5\", history_log_id: 0, history_entry_count: 0, initial_messages: None, rollout_path: \"/home/user/.codex/sessions/2025/09/12/rollout-2025-09-12T14-36-32-bd823d48-4bd8-4d9e-9d87-93a66afbf4d2.jsonl\" }"; - - let session_id = SessionHandler::extract_session_id_from_line(line); - assert_eq!( - session_id, - Some("bd823d48-4bd8-4d9e-9d87-93a66afbf4d2".to_string()) - ); - } - - #[test] - fn test_normalize_logs_basic() { - let logs = r#"{"id":"1","msg":{"type":"task_started"}} -{"id":"1","msg":{"type":"agent_reasoning","text":"**Inspecting the directory tree**\n\nI want to check the root directory tree and I think using `ls -1` is acceptable since the guidelines don't explicitly forbid it, unlike `ls -R`, `find`, or `grep`. I could also consider using `rg --files`, but that might be too overwhelming if there are many files. Focusing on the top-level files and directories seems like a better approach. I'm particularly interested in `LICENSE`, `README.md`, and any relevant README files. So, let's start with `ls -1`."}} -{"id":"1","msg":{"type":"exec_command_begin","call_id":"call_I1o1QnQDtlLjGMg4Vd9HXJLd","command":["bash","-lc","ls -1"],"cwd":"/Users/user/dev/vk-wip"}} -{"id":"1","msg":{"type":"exec_command_end","call_id":"call_I1o1QnQDtlLjGMg4Vd9HXJLd","stdout":"AGENT.md\nCLAUDE.md\nCODE-OF-CONDUCT.md\nCargo.lock\nCargo.toml\nDockerfile\nLICENSE\nREADME.md\nbackend\nbuild-npm-package.sh\ndev_assets\ndev_assets_seed\nfrontend\nnode_modules\nnpx-cli\npackage-lock.json\npackage.json\npnpm-lock.yaml\npnpm-workspace.yaml\nrust-toolchain.toml\nrustfmt.toml\nscripts\nshared\ntest-npm-package.sh\n","stderr":"","exit_code":0}} -{"id":"1","msg":{"type":"task_complete","last_agent_message":"I can see the directory structure of your project. This appears to be a Rust project with a frontend/backend architecture, using pnpm for package management. 
The project includes various configuration files, documentation, and development assets."}}"#; - - let entries = parse_test_json_lines(logs); - - // Should have only agent_reasoning (task_started, exec_command_begin, task_complete are skipped in to_normalized_entries) - assert_eq!(entries.len(), 1); - - // Check agent reasoning (thinking) - assert!(matches!( - entries[0].entry_type, - NormalizedEntryType::Thinking - )); - assert!(entries[0].content.contains("Inspecting the directory tree")); - - // Command entries are handled in the streaming path, not to_normalized_entries - } - - #[test] - fn test_normalize_logs_shell_vs_bash_mapping() { - // Test shell command (not bash) - let shell_logs = r#"{"id":"1","msg":{"type":"exec_command_begin","call_id":"call_test","command":["sh","-c","echo hello"],"cwd":"/tmp"}}"#; - let entries = parse_test_json_lines(shell_logs); - // to_normalized_entries skips exec_command_begin; mapping is tested in streaming path - assert_eq!(entries.len(), 0); - - // Test bash command - let bash_logs = r#"{"id":"1","msg":{"type":"exec_command_begin","call_id":"call_test","command":["bash","-c","echo hello"],"cwd":"/tmp"}}"#; - let entries = parse_test_json_lines(bash_logs); - assert_eq!(entries.len(), 0); - - // Mapping to bash is exercised in the streaming path - } - - #[test] - fn test_normalize_logs_token_count_skipped() { - let logs = r#"{"id":"1","msg":{"type":"task_started"}} -{"id":"1","msg":{"type":"token_count","input_tokens":1674,"cached_input_tokens":1627,"output_tokens":384,"reasoning_output_tokens":384,"total_tokens":2058}} -{"id":"1","msg":{"type":"task_complete","last_agent_message":"Done!"}}"#; - - let entries = parse_test_json_lines(logs); - - // Should have: nothing (task_started, task_complete, and token_count all skipped) - assert_eq!(entries.len(), 0); - } - - #[test] - fn test_normalize_logs_malformed_json() { - let logs = r#"{"id":"1","msg":{"type":"task_started"}} -invalid json line here -{"id":"1","msg":{"type":"task_complete","last_agent_message":"Done!"}}"#; - - let entries = parse_test_json_lines(logs); - - // Should have: raw output only (task_started and task_complete skipped) - assert_eq!(entries.len(), 1); - - // Check that malformed JSON becomes raw output - assert!(matches!( - entries[0].entry_type, - NormalizedEntryType::SystemMessage - )); - assert!(entries[0].content.contains("invalid json line here")); - } - - #[test] - fn test_normalize_logs_prompt_ignored() { - let logs = r#"{"prompt":"project_id: f61fbd6a-9552-4b68-a1fe-10561f028dfc\n \nTask title: describe this repo"} -{"id":"1","msg":{"type":"task_started"}} -{"id":"1","msg":{"type":"agent_message","message":"Hello, I'll help you with that."}}"#; - - let entries = parse_test_json_lines(logs); - - // Should have 1 entry (prompt and task_started ignored, only agent_message) - assert_eq!(entries.len(), 1); - - // Check that we only have agent_message - assert!(matches!( - entries[0].entry_type, - NormalizedEntryType::AssistantMessage - )); - assert_eq!(entries[0].content, "Hello, I'll help you with that."); - } - - #[test] - fn test_normalize_logs_error_message() { - let logs = r#"{"id":"1","msg":{"type":"error","message":"Missing environment variable: `OPENAI_API_KEY`. 
Create an API key (https://platform.openai.com) and export it as an environment variable."}}"#; - - let entries = parse_test_json_lines(logs); - - // Should have 1 entry for the error message - assert_eq!(entries.len(), 1); - - // Check error message - assert!(matches!( - entries[0].entry_type, - NormalizedEntryType::ErrorMessage - )); - assert!( - entries[0] - .content - .contains("Missing environment variable: `OPENAI_API_KEY`") - ); - } - - #[test] - fn test_normalize_logs_error_message_no_content() { - let logs = r#"{"id":"1","msg":{"type":"error"}}"#; - - let entries = parse_test_json_lines(logs); - - // Should have 1 entry for the error message - assert_eq!(entries.len(), 1); - - // Check error message fallback - assert!(matches!( - entries[0].entry_type, - NormalizedEntryType::ErrorMessage - )); - assert_eq!(entries[0].content, "Unknown error occurred"); - } - - #[test] - fn test_normalize_logs_real_example() { - let logs = r#"{"sandbox":"danger-full-access","reasoning summaries":"auto","approval":"Never","provider":"openai","reasoning effort":"medium","workdir":"/private/var/folders/4m/6cwx14sx59lc2k9km5ph76gh0000gn/T/vibe-kanban-dev/vk-ec8b-describe-t","model":"codex-mini-latest"} -{"prompt":"project_id: f61fbd6a-9552-4b68-a1fe-10561f028dfc\n \nTask title: describe this repo"} -{"id":"1","msg":{"type":"task_started"}} -{"id":"1","msg":{"type":"error","message":"Missing environment variable: `OPENAI_API_KEY`. Create an API key (https://platform.openai.com) and export it as an environment variable."}}"#; - - let entries = parse_test_json_lines(logs); - - // Should have 2 entries: config, error (prompt and task_started ignored) - assert_eq!(entries.len(), 2); - - // Check configuration message - assert!(matches!( - entries[0].entry_type, - NormalizedEntryType::SystemMessage - )); - assert!(entries[0].content.contains("model")); - - // Check error message - assert!(matches!( - entries[1].entry_type, - NormalizedEntryType::ErrorMessage - )); - assert!(entries[1].content.contains("Missing environment variable")); - } - - #[test] - fn test_normalize_logs_partial_config() { - // Test with just model and provider (should still work) - let logs = r#"{"model":"codex-mini-latest","provider":"openai"}"#; - - let entries = parse_test_json_lines(logs); - - // Should have 1 entry for the configuration message - assert_eq!(entries.len(), 1); - - // Check configuration message contains available params - assert!(matches!( - entries[0].entry_type, - NormalizedEntryType::SystemMessage - )); - } - - #[test] - fn test_normalize_logs_agent_message() { - let logs = r#"{"id":"1","msg":{"type":"agent_message","message":"I've made a small restructuring of the top‐level README:\n\n- **Inserted a \"Table of Contents\"** under the screenshot, linking to all major sections (Overview, Installation, Documentation, Support, Contributing, Development → Prerequisites/Running/Build, Environment Variables, Custom OAuth, and License).\n- **Appended a \"License\" section** at the bottom pointing to the Apache 2.0 LICENSE file.\n\nThese tweaks should make navigation and licensing info more discoverable. 
Let me know if you'd like any other adjustments!"}}"#; - - let entries = parse_test_json_lines(logs); - - // Should have 1 entry for the agent message - assert_eq!(entries.len(), 1); - - // Check agent message - assert!(matches!( - entries[0].entry_type, - NormalizedEntryType::AssistantMessage - )); - assert!( - entries[0] - .content - .contains("I've made a small restructuring") - ); - assert!(entries[0].content.contains("Table of Contents")); - } - - #[test] - fn test_normalize_logs_patch_apply() { - let logs = r#"{"id":"1","msg":{"type":"patch_apply_begin","call_id":"call_zr84aWQuwJR3aWgJLkfv56Gl","auto_approved":true,"changes":{"/private/var/folders/4m/6cwx14sx59lc2k9km5ph76gh0000gn/T/vibe-kanban-dev/vk-a712-minor-rest/README.md":{"update":{"unified_diff":"@@ -18,2 +18,17 @@\n \n+## Table of Contents\n+\n+- [Overview](#overview)\n+- [Installation](#installation)","move_path":null}}}}} -{"id":"1","msg":{"type":"patch_apply_end","call_id":"call_zr84aWQuwJR3aWgJLkfv56Gl","stdout":"Success. Updated the following files:\nM /private/var/folders/4m/6cwx14sx59lc2k9km5ph76gh0000gn/T/vibe-kanban-dev/vk-a712-minor-rest/README.md\n","stderr":"","success":true}}"#; - - let entries = parse_test_json_lines(logs); - - // Should have 1 entry (patch_apply_begin, patch_apply_end skipped) - assert_eq!(entries.len(), 1); - - // Check edit tool use (follows claude.rs pattern) - assert!(matches!( - entries[0].entry_type, - NormalizedEntryType::ToolUse { .. } - )); - if let NormalizedEntryType::ToolUse { - tool_name, - action_type, - status: _, - } = &entries[0].entry_type - { - assert_eq!(tool_name, "edit"); - assert!(matches!(action_type, ActionType::FileEdit { .. })); - } - assert!(entries[0].content.contains("README.md")); - } - - #[test] - fn test_normalize_logs_skip_task_messages() { - let logs = r#"{"id":"1","msg":{"type":"task_started"}} -{"id":"1","msg":{"type":"agent_message","message":"Hello world"}} -{"id":"1","msg":{"type":"task_complete","last_agent_message":"Done!"}}"#; - - let entries = parse_test_json_lines(logs); - - // Should have 1 entry (task_started and task_complete skipped) - assert_eq!(entries.len(), 1); - - // Check that only agent_message remains - assert!(matches!( - entries[0].entry_type, - NormalizedEntryType::AssistantMessage - )); - assert_eq!(entries[0].content, "Hello world"); - } - - #[test] - fn test_normalize_logs_mcp_tool_calls() { - let logs = r#"{"id":"1","msg":{"type":"mcp_tool_call_begin","call_id":"call_KHwEJyaUuL5D8sO7lPfImx7I","invocation":{"server":"vibe_kanban","tool":"list_projects","arguments":{}}}} -{"id":"1","msg":{"type":"mcp_tool_call_end","call_id":"call_KHwEJyaUuL5D8sO7lPfImx7I","invocation":{"server":"vibe_kanban","tool":"list_projects","arguments":{}},"result":{"Ok":{"content":[{"text":"Projects listed successfully"}],"isError":false}}}} -{"id":"1","msg":{"type":"agent_message","message":"Here are your projects"}}"#; - - let entries = parse_test_json_lines(logs); - - // Should have only agent_message (mcp_tool_call_begin/end are skipped in to_normalized_entries) - assert_eq!(entries.len(), 1); - assert!(matches!( - entries[0].entry_type, - NormalizedEntryType::AssistantMessage - )); - assert_eq!(entries[0].content, "Here are your projects"); - } - - #[test] - fn test_normalize_logs_mcp_tool_call_multiple() { - let logs = r#"{"id":"1","msg":{"type":"mcp_tool_call_begin","call_id":"call_1","invocation":{"server":"vibe_kanban","tool":"create_task","arguments":{"title":"Test task"}}}} 
-{"id":"1","msg":{"type":"mcp_tool_call_end","call_id":"call_1","invocation":{"server":"vibe_kanban","tool":"create_task","arguments":{"title":"Test task"}},"result":{"Ok":{"content":[{"text":"Task created"}],"isError":false}}}} -{"id":"1","msg":{"type":"mcp_tool_call_begin","call_id":"call_2","invocation":{"server":"vibe_kanban","tool":"list_tasks","arguments":{}}}} -{"id":"1","msg":{"type":"mcp_tool_call_end","call_id":"call_2","invocation":{"server":"vibe_kanban","tool":"list_tasks","arguments":{}},"result":{"Ok":{"content":[{"text":"Tasks listed"}],"isError":false}}}}"#; - - let entries = parse_test_json_lines(logs); - - // to_normalized_entries skips mcp_tool_call_begin/end; expect none - assert_eq!(entries.len(), 0); - } - - #[test] - fn test_codex_json_system_config_parsing() { - let config_json = r#"{"sandbox":"danger-full-access","reasoning summaries":"auto","approval":"Never","provider":"openai","reasoning effort":"medium","workdir":"/tmp","model":"codex-mini-latest"}"#; - - let parsed = test_codex_json_parsing(config_json).unwrap(); - assert!(matches!(parsed, CodexJson::SystemConfig { .. })); - - let current_dir = PathBuf::from("/tmp"); - let entries = parsed.to_normalized_entries(¤t_dir).unwrap(); - assert_eq!(entries.len(), 1); - assert!(matches!( - entries[0].entry_type, - NormalizedEntryType::SystemMessage - )); - assert!(entries[0].content.contains("model: codex-mini-latest")); - } - - #[test] - fn test_codex_json_prompt_parsing() { - let prompt_json = r#"{"prompt":"project_id: f61fbd6a-9552-4b68-a1fe-10561f028dfc\n\nTask title: describe this repo"}"#; - - let parsed = test_codex_json_parsing(prompt_json).unwrap(); - assert!(matches!(parsed, CodexJson::Prompt { .. })); - - let current_dir = PathBuf::from("/tmp"); - let entries = parsed.to_normalized_entries(¤t_dir); - assert!(entries.is_none()); // Should return None - } - - #[test] - fn test_set_session_id_in_rollout_meta_old_format() { - let mut meta = serde_json::json!({ - "id": "8724aa3f-efb7-4bbb-96a4-63fb3cb7ee90", - "timestamp": "2025-09-09T16:46:39.250Z", - "instructions": "# ...", - "git": { - "commit_hash": "70497c4cb9d64473e1e7602083badf338e59e75a", - "branch": "vk/9986-retry-with", - "repository_url": "https://github.com/bloopai/vibe-kanban" - } - }); - let new_id = "11111111-2222-3333-4444-555555555555"; - SessionHandler::set_session_id_in_rollout_meta(&mut meta, new_id).unwrap(); - // After migration, we should write new-format header - assert_eq!(meta["type"].as_str(), Some("session_meta")); - assert_eq!(meta["payload"]["id"].as_str(), Some(new_id)); - // Preserve instructions and git inside payload when present - assert_eq!(meta["payload"]["instructions"].as_str(), Some("# ...")); - assert!(meta["payload"]["git"].is_object()); - // Top-level id should be absent in new format - assert_eq!(meta.get("id").and_then(|v| v.as_str()), None); - } - - #[test] - fn test_set_session_id_in_rollout_meta_new_format() { - let mut meta = serde_json::json!({ - "timestamp": "2025-09-12T15:34:41.080Z", - "type": "session_meta", - "payload": { - "id": "0c2061fc-1da8-4733-b33f-70159b4c57f2", - "timestamp": "2025-09-12T15:34:41.068Z", - "cwd": "/var/tmp/vibe-kanban-dev/worktrees/vk-f625-hi", - "originator": "codex_cli_rs", - "cli_version": "0.34.0", - "instructions": "# ...", - "git": { - "commit_hash": "07fad5465fcdca9b719cea965372a0ea39f42d15", - "branch": "vk/f625-hi", - "repository_url": "https://github.com/bloopai/vibe-kanban" + if matches!(&err, ExecutorError::Io(io_err) if io_err.kind() == std::io::ErrorKind::BrokenPipe) + 
+                {
+                    // Broken pipe likely means the parent process exited, so we can ignore it
+                    return;
+                }
+                tracing::error!("Codex spawn error: {}", err);
+                log_writer
+                    .log_raw(&Error::launch_error(err.to_string()).raw())
+                    .await
+                    .ok();
+                exit_signal_tx.send_exit_signal().await;
+            }
+        });
-                }
-            }
-        });
-        let new_id = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";
-        SessionHandler::set_session_id_in_rollout_meta(&mut meta, new_id).unwrap();
-        // New format takes precedence: payload.id updated
-        assert_eq!(meta["payload"]["id"].as_str(), Some(new_id));
-        // Top-level id should remain absent (new format only uses payload.id)
-        assert_eq!(meta["id"].as_str(), None);
-    }
-}
+
+        Ok(SpawnedChild {
+            child,
+            exit_signal: Some(exit_signal_rx),
+        })
+    }
+
+    async fn launch_codex_app_server(
+        conversation_params: NewConversationParams,
+        resume_session: Option<String>,
+        combined_prompt: String,
+        child_stdout: tokio::process::ChildStdout,
+        child_stdin: tokio::process::ChildStdin,
+        log_writer: LogWriter,
+        exit_signal_tx: ExitSignalSender,
+    ) -> Result<(), ExecutorError> {
+        let client = AppServerClient::new(log_writer);
+        let rpc_peer =
+            JsonRpcPeer::spawn(child_stdin, child_stdout, client.clone(), exit_signal_tx);
+        client.connect(rpc_peer);
+        client.initialize().await?;
+        match resume_session {
+            None => {
+                let params = conversation_params;
+                let response = client.new_conversation(params).await?;
+                client
+                    .add_conversation_listener(response.conversation_id)
+                    .await?;
+                client
+                    .send_user_message(response.conversation_id, combined_prompt)
+                    .await?;
+            }
+            Some(session_id) => {
+                let (rollout_path, _forked_session_id) =
+                    SessionHandler::fork_rollout_file(&session_id)
+                        .map_err(|e| ExecutorError::FollowUpNotSupported(e.to_string()))?;
+                let overrides = conversation_params;
+                let response = client
+                    .resume_conversation(rollout_path.clone(), overrides)
+                    .await?;
+                tracing::debug!(
+                    "resuming session using rollout file {}, response {:?}",
+                    rollout_path.display(),
+                    response
+                );
+                client
+                    .add_conversation_listener(response.conversation_id)
+                    .await?;
+                client
+                    .send_user_message(response.conversation_id, combined_prompt)
+                    .await?;
+            }
+        }
+        Ok(())
+    }
+}
diff --git a/crates/executors/src/executors/codex/client.rs b/crates/executors/src/executors/codex/client.rs
new file mode 100644
index 00000000..3fac37d1
--- /dev/null
+++ b/crates/executors/src/executors/codex/client.rs
@@ -0,0 +1,271 @@
+use std::{
+    io,
+    sync::{Arc, OnceLock},
+};
+
+use async_trait::async_trait;
+use codex_app_server_protocol::{
+    AddConversationListenerParams, AddConversationSubscriptionResponse, ApplyPatchApprovalResponse,
+    ClientInfo, ClientNotification, ClientRequest, ExecCommandApprovalResponse, InitializeParams,
+    InitializeResponse, InputItem, JSONRPCError, JSONRPCNotification, JSONRPCRequest,
+    JSONRPCResponse, NewConversationParams, NewConversationResponse, RequestId,
+    ResumeConversationParams, ResumeConversationResponse, SendUserMessageParams,
+    SendUserMessageResponse, ServerRequest,
+};
+use serde::{Serialize, de::DeserializeOwned};
+use serde_json::Value;
+use tokio::{
+    io::{AsyncWrite, AsyncWriteExt, BufWriter},
+    sync::Mutex,
+};
+
+use super::jsonrpc::{JsonRpcCallbacks, JsonRpcPeer};
+use crate::executors::ExecutorError;
+
+pub struct AppServerClient {
+    rpc: OnceLock<JsonRpcPeer>,
+    log_writer: LogWriter,
+}
+
+impl AppServerClient {
+    pub fn new(log_writer: LogWriter) -> Arc<Self> {
+        Arc::new(Self {
+            rpc: OnceLock::new(),
+            log_writer,
+        })
+    }
+
+    pub fn connect(&self, peer: JsonRpcPeer) {
+        let _ = self.rpc.set(peer);
+    }
+
+    fn rpc(&self) -> &JsonRpcPeer {
+        self.rpc.get().expect("Codex RPC peer not attached")
+    }
+
+    pub async fn initialize(&self) -> Result<(), ExecutorError> {
+        let request = ClientRequest::Initialize {
+            request_id: self.next_request_id(),
+            params: InitializeParams {
+                client_info: ClientInfo {
+                    name: "vibe-codex-executor".to_string(),
+                    title: None,
+                    version: env!("CARGO_PKG_VERSION").to_string(),
+                },
+            },
+        };
+
+        self.send_request::<InitializeResponse>(request, "initialize")
+            .await?;
+        self.send_message(&ClientNotification::Initialized).await
+    }
+
+    pub async fn new_conversation(
+        &self,
+        params: NewConversationParams,
+    ) -> Result<NewConversationResponse, ExecutorError> {
+        let request = ClientRequest::NewConversation {
+            request_id: self.next_request_id(),
+            params,
+        };
+        self.send_request(request, "newConversation").await
+    }
+
+    pub async fn resume_conversation(
+        &self,
+        rollout_path: std::path::PathBuf,
+        overrides: NewConversationParams,
+    ) -> Result<ResumeConversationResponse, ExecutorError> {
+        let request = ClientRequest::ResumeConversation {
+            request_id: self.next_request_id(),
+            params: ResumeConversationParams {
+                path: rollout_path,
+                overrides: Some(overrides),
+            },
+        };
+        self.send_request(request, "resumeConversation").await
+    }
+
+    pub async fn add_conversation_listener(
+        &self,
+        conversation_id: codex_protocol::ConversationId,
+    ) -> Result<AddConversationSubscriptionResponse, ExecutorError> {
+        let request = ClientRequest::AddConversationListener {
+            request_id: self.next_request_id(),
+            params: AddConversationListenerParams { conversation_id },
+        };
+        self.send_request(request, "addConversationListener").await
+    }
+
+    pub async fn send_user_message(
+        &self,
+        conversation_id: codex_protocol::ConversationId,
+        message: String,
+    ) -> Result<SendUserMessageResponse, ExecutorError> {
+        let request = ClientRequest::SendUserMessage {
+            request_id: self.next_request_id(),
+            params: SendUserMessageParams {
+                conversation_id,
+                items: vec![InputItem::Text { text: message }],
+            },
+        };
+        self.send_request(request, "sendUserMessage").await
+    }
+
+    async fn send_message<M>(&self, message: &M) -> Result<(), ExecutorError>
+    where
+        M: Serialize + Sync,
+    {
+        self.rpc().send(message).await
+    }
+
+    async fn send_request<R>(&self, request: ClientRequest, label: &str) -> Result<R, ExecutorError>
+    where
+        R: DeserializeOwned + std::fmt::Debug,
+    {
+        let request_id = request_id(&request);
+        self.rpc().request(request_id, &request, label).await
+    }
+
+    fn next_request_id(&self) -> RequestId {
+        self.rpc().next_request_id()
+    }
+}
+
+#[async_trait]
+impl JsonRpcCallbacks for AppServerClient {
+    async fn on_request(
+        &self,
+        peer: &JsonRpcPeer,
+        raw: &str,
+        request: JSONRPCRequest,
+    ) -> Result<(), ExecutorError> {
+        self.log_writer.log_raw(raw).await?;
+        match ServerRequest::try_from(request.clone()) {
+            Ok(server_request) => handle_server_request(peer, server_request).await,
+            Err(err) => {
+                tracing::debug!("Unhandled server request `{}`: {err}", request.method);
+                let response = JSONRPCResponse {
+                    id: request.id,
+                    result: Value::Null,
+                };
+                peer.send(&response).await
+            }
+        }
+    }
+
+    async fn on_response(
+        &self,
+        _peer: &JsonRpcPeer,
+        raw: &str,
+        _response: &JSONRPCResponse,
+    ) -> Result<(), ExecutorError> {
+        self.log_writer.log_raw(raw).await
+    }
+
+    async fn on_error(
+        &self,
+        _peer: &JsonRpcPeer,
+        raw: &str,
+        _error: &JSONRPCError,
+    ) -> Result<(), ExecutorError> {
+        self.log_writer.log_raw(raw).await
+    }
+
+    async fn on_notification(
+        &self,
+        _peer: &JsonRpcPeer,
+        raw: &str,
+        notification: JSONRPCNotification,
+    ) -> Result<bool, ExecutorError> {
+        self.log_writer.log_raw(raw).await?;
+        let method = notification.method.as_str();
+        if !method.starts_with("codex/event") {
+            return Ok(false);
+        }
+
+        let has_finished = method
+            .strip_prefix("codex/event/")
+            .is_some_and(|suffix| suffix == "task_complete");
+
+        Ok(has_finished)
+    }
+
+    async fn on_non_json(&self, raw: &str) -> Result<(), ExecutorError> {
+        self.log_writer.log_raw(raw).await?;
+        Ok(())
+    }
+}
+
+// Approvals
+async fn handle_server_request(
+    peer: &JsonRpcPeer,
+    request: ServerRequest,
+) -> Result<(), ExecutorError> {
+    match request {
+        ServerRequest::ApplyPatchApproval { request_id, .. } => {
+            let response = ApplyPatchApprovalResponse {
+                decision: codex_protocol::protocol::ReviewDecision::ApprovedForSession,
+            };
+            send_server_response(peer, request_id, response).await
+        }
+        ServerRequest::ExecCommandApproval { request_id, .. } => {
+            let response = ExecCommandApprovalResponse {
+                decision: codex_protocol::protocol::ReviewDecision::ApprovedForSession,
+            };
+            send_server_response(peer, request_id, response).await
+        }
+    }
+}
+
+async fn send_server_response<T>(
+    peer: &JsonRpcPeer,
+    request_id: RequestId,
+    response: T,
+) -> Result<(), ExecutorError>
+where
+    T: Serialize,
+{
+    let payload = JSONRPCResponse {
+        id: request_id,
+        result: serde_json::to_value(response)
+            .map_err(|err| ExecutorError::Io(io::Error::other(err.to_string())))?,
+    };
+
+    peer.send(&payload).await
+}
+
+fn request_id(request: &ClientRequest) -> RequestId {
+    match request {
+        ClientRequest::Initialize { request_id, .. }
+        | ClientRequest::NewConversation { request_id, .. }
+        | ClientRequest::ResumeConversation { request_id, .. }
+        | ClientRequest::AddConversationListener { request_id, .. }
+        | ClientRequest::SendUserMessage { request_id, .. } => request_id.clone(),
+        _ => unreachable!("request_id called for unsupported request variant"),
+    }
+}
+
+#[derive(Clone)]
+pub struct LogWriter {
+    writer: Arc<Mutex<BufWriter<Box<dyn AsyncWrite + Send + Unpin>>>>,
+}
+
+impl LogWriter {
+    pub fn new(writer: impl AsyncWrite + Send + Unpin + 'static) -> Self {
+        Self {
+            writer: Arc::new(Mutex::new(BufWriter::new(Box::new(writer)))),
+        }
+    }
+
+    pub async fn log_raw(&self, raw: &str) -> Result<(), ExecutorError> {
+        let mut guard = self.writer.lock().await;
+        guard
+            .write_all(raw.as_bytes())
+            .await
+            .map_err(ExecutorError::Io)?;
+        guard.write_all(b"\n").await.map_err(ExecutorError::Io)?;
+        guard.flush().await.map_err(ExecutorError::Io)?;
+        Ok(())
+    }
+}
diff --git a/crates/executors/src/executors/codex/jsonrpc.rs b/crates/executors/src/executors/codex/jsonrpc.rs
new file mode 100644
index 00000000..25c01a7d
--- /dev/null
+++ b/crates/executors/src/executors/codex/jsonrpc.rs
@@ -0,0 +1,281 @@
+//! Minimal JSON-RPC helper tailored for the Codex executor.
+//!
+//! We keep this bespoke layer because the codex-app-server client must handle server-initiated
+//! requests as well as client-initiated requests. When a bidirectional client that
+//! supports this pattern is available, this module should be straightforward to
+//! replace.
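+//!
+//! As a rough sketch, the intended call pattern looks like this (illustrative
+//! only; the surrounding process setup and the `request` value are assumed, not
+//! provided by this module):
+//!
+//! ```ignore
+//! // Spawn the reader task over the child process pipes.
+//! let peer = JsonRpcPeer::spawn(child_stdin, child_stdout, callbacks, exit_tx);
+//! let id = peer.next_request_id();
+//! // `request` here would be a message built with that same `id`, so the
+//! // server's reply can be matched back to this call: `request` registers a
+//! // oneshot channel under the id, writes the serialized message to stdin, and
+//! // waits for the reader task to route the response (or error/shutdown) back.
+//! let response: InitializeResponse = peer.request(id, &request, "initialize").await?;
+//! ```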
+
+use std::{
+    collections::HashMap,
+    fmt::Debug,
+    io,
+    sync::{
+        Arc,
+        atomic::{AtomicI64, Ordering},
+    },
+};
+
+use async_trait::async_trait;
+use codex_app_server_protocol::{
+    JSONRPCError, JSONRPCMessage, JSONRPCNotification, JSONRPCRequest, JSONRPCResponse, RequestId,
+};
+use serde::{Serialize, de::DeserializeOwned};
+use serde_json::Value;
+use tokio::{
+    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
+    process::{ChildStdin, ChildStdout},
+    sync::{Mutex, oneshot},
+};
+
+use crate::executors::ExecutorError;
+
+#[derive(Debug)]
+pub enum PendingResponse {
+    Result(Value),
+    Error(JSONRPCError),
+    Shutdown,
+}
+
+#[derive(Clone)]
+pub struct ExitSignalSender {
+    inner: Arc<Mutex<Option<oneshot::Sender<()>>>>,
+}
+
+impl ExitSignalSender {
+    pub fn new(sender: oneshot::Sender<()>) -> Self {
+        Self {
+            inner: Arc::new(Mutex::new(Some(sender))),
+        }
+    }
+    pub async fn send_exit_signal(&self) {
+        if let Some(sender) = self.inner.lock().await.take() {
+            let _ = sender.send(());
+        }
+    }
+}
+
+#[derive(Clone)]
+pub struct JsonRpcPeer {
+    stdin: Arc<Mutex<ChildStdin>>,
+    pending: Arc<Mutex<HashMap<RequestId, oneshot::Sender<PendingResponse>>>>,
+    id_counter: Arc<AtomicI64>,
+}
+
+impl JsonRpcPeer {
+    pub fn spawn(
+        stdin: ChildStdin,
+        stdout: ChildStdout,
+        callbacks: Arc<dyn JsonRpcCallbacks>,
+        exit_tx: ExitSignalSender,
+    ) -> Self {
+        let peer = Self {
+            stdin: Arc::new(Mutex::new(stdin)),
+            pending: Arc::new(Mutex::new(HashMap::new())),
+            id_counter: Arc::new(AtomicI64::new(1)),
+        };
+
+        let reader_peer = peer.clone();
+        let callbacks = callbacks.clone();
+
+        tokio::spawn(async move {
+            let mut reader = BufReader::new(stdout);
+            let mut buffer = String::new();
+
+            loop {
+                buffer.clear();
+                match reader.read_line(&mut buffer).await {
+                    Ok(0) => break,
+                    Ok(_) => {
+                        let line = buffer.trim_end_matches(['\n', '\r']);
+                        if line.is_empty() {
+                            continue;
+                        }
+
+                        match serde_json::from_str::<JSONRPCMessage>(line) {
+                            Ok(JSONRPCMessage::Response(response)) => {
+                                let request_id = response.id.clone();
+                                let result = response.result.clone();
+                                if callbacks
+                                    .on_response(&reader_peer, line, &response)
+                                    .await
+                                    .is_err()
+                                {
+                                    break;
+                                }
+                                reader_peer
+                                    .resolve(request_id, PendingResponse::Result(result))
+                                    .await;
+                            }
+                            Ok(JSONRPCMessage::Error(error)) => {
+                                let request_id = error.id.clone();
+                                if callbacks
+                                    .on_error(&reader_peer, line, &error)
+                                    .await
+                                    .is_err()
+                                {
+                                    break;
+                                }
+                                reader_peer
+                                    .resolve(request_id, PendingResponse::Error(error))
+                                    .await;
+                            }
+                            Ok(JSONRPCMessage::Request(request)) => {
+                                if callbacks
+                                    .on_request(&reader_peer, line, request)
+                                    .await
+                                    .is_err()
+                                {
+                                    break;
+                                }
+                            }
+                            Ok(JSONRPCMessage::Notification(notification)) => {
+                                match callbacks
+                                    .on_notification(&reader_peer, line, notification)
+                                    .await
+                                {
+                                    // finished
+                                    Ok(true) => break,
+                                    Ok(false) => {}
+                                    Err(_) => {
+                                        break;
+                                    }
+                                }
+                            }
+                            Err(_) => {
+                                if callbacks.on_non_json(line).await.is_err() {
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                    Err(err) => {
+                        tracing::warn!("Error reading Codex output: {err}");
+                        break;
+                    }
+                }
+            }
+
+            exit_tx.send_exit_signal().await;
+            let _ = reader_peer.shutdown().await;
+        });
+
+        peer
+    }
+
+    pub fn next_request_id(&self) -> RequestId {
+        RequestId::Integer(self.id_counter.fetch_add(1, Ordering::Relaxed))
+    }
+
+    pub async fn register(&self, request_id: RequestId) -> PendingReceiver {
+        let (sender, receiver) = oneshot::channel();
+        self.pending.lock().await.insert(request_id, sender);
+        receiver
+    }
+
+    pub async fn resolve(&self, request_id: RequestId, response: PendingResponse) {
+        if let Some(sender) = self.pending.lock().await.remove(&request_id) {
+            let _ = sender.send(response);
+        }
+    }
+
+    pub async fn shutdown(&self) -> Result<(), ExecutorError> {
+        let mut pending = self.pending.lock().await;
+        for (_, sender) in pending.drain() {
+            let _ = sender.send(PendingResponse::Shutdown);
+        }
+        Ok(())
+    }
+
+    pub async fn send<T>(&self, message: &T) -> Result<(), ExecutorError>
+    where
+        T: Serialize + Sync,
+    {
+        let raw = serde_json::to_string(message)
+            .map_err(|err| ExecutorError::Io(io::Error::other(err.to_string())))?;
+        self.send_raw(&raw).await
+    }
+
+    pub async fn request<T, R>(
+        &self,
+        request_id: RequestId,
+        message: &T,
+        label: &str,
+    ) -> Result<R, ExecutorError>
+    where
+        R: DeserializeOwned + Debug,
+        T: Serialize + Sync,
+    {
+        let receiver = self.register(request_id).await;
+        self.send(message).await?;
+        await_response(receiver, label).await
+    }
+
+    async fn send_raw(&self, payload: &str) -> Result<(), ExecutorError> {
+        let mut guard = self.stdin.lock().await;
+        guard
+            .write_all(payload.as_bytes())
+            .await
+            .map_err(ExecutorError::Io)?;
+        guard.write_all(b"\n").await.map_err(ExecutorError::Io)?;
+        guard.flush().await.map_err(ExecutorError::Io)?;
+        Ok(())
+    }
+}
+
+pub type PendingReceiver = oneshot::Receiver<PendingResponse>;
+
+pub async fn await_response<R>(receiver: PendingReceiver, label: &str) -> Result<R, ExecutorError>
+where
+    R: DeserializeOwned + Debug,
+{
+    match receiver.await {
+        Ok(PendingResponse::Result(value)) => serde_json::from_value(value).map_err(|err| {
+            ExecutorError::Io(io::Error::other(format!(
+                "failed to decode {label} response: {err}",
+            )))
+        }),
+        Ok(PendingResponse::Error(error)) => Err(ExecutorError::Io(io::Error::other(format!(
+            "{label} request failed: {}",
+            error.error.message
+        )))),
+        Ok(PendingResponse::Shutdown) => Err(ExecutorError::Io(io::Error::other(format!(
+            "server was shut down while waiting for {label} response",
+        )))),
+        Err(_) => Err(ExecutorError::Io(io::Error::other(format!(
+            "{label} request was dropped",
+        )))),
+    }
+}
+
+#[async_trait]
+pub trait JsonRpcCallbacks: Send + Sync {
+    async fn on_request(
+        &self,
+        peer: &JsonRpcPeer,
+        raw: &str,
+        request: JSONRPCRequest,
+    ) -> Result<(), ExecutorError>;
+
+    async fn on_response(
+        &self,
+        peer: &JsonRpcPeer,
+        raw: &str,
+        response: &JSONRPCResponse,
+    ) -> Result<(), ExecutorError>;
+
+    async fn on_error(
+        &self,
+        peer: &JsonRpcPeer,
+        raw: &str,
+        error: &JSONRPCError,
+    ) -> Result<(), ExecutorError>;
+
+    async fn on_notification(
+        &self,
+        peer: &JsonRpcPeer,
+        raw: &str,
+        notification: JSONRPCNotification,
+    ) -> Result<bool, ExecutorError>;
+
+    async fn on_non_json(&self, _raw: &str) -> Result<(), ExecutorError>;
+}
diff --git a/crates/executors/src/executors/codex/normalize_logs.rs b/crates/executors/src/executors/codex/normalize_logs.rs
new file mode 100644
index 00000000..cb60c867
--- /dev/null
+++ b/crates/executors/src/executors/codex/normalize_logs.rs
@@ -0,0 +1,833 @@
+use std::{
+    collections::HashMap,
+    path::{Path, PathBuf},
+    sync::Arc,
+};
+
+use codex_app_server_protocol::{
+    JSONRPCNotification, JSONRPCResponse, NewConversationResponse, ServerNotification,
+};
+use codex_mcp_types::ContentBlock;
+use codex_protocol::{
+    config_types::ReasoningEffort,
+    plan_tool::{StepStatus, UpdatePlanArgs},
+    protocol::{
+        AgentMessageDeltaEvent, AgentReasoningDeltaEvent, AgentReasoningSectionBreakEvent,
+        BackgroundEventEvent, ErrorEvent, EventMsg, ExecCommandBeginEvent, ExecCommandEndEvent,
+        ExecCommandOutputDeltaEvent, ExecOutputStream, FileChange as CodexProtoFileChange,
+        McpInvocation, McpToolCallBeginEvent, McpToolCallEndEvent, PatchApplyBeginEvent,
+        PatchApplyEndEvent, StreamErrorEvent, TokenUsageInfo, ViewImageToolCallEvent,
+use futures::StreamExt;
+use lazy_static::lazy_static;
+use regex::Regex;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use workspace_utils::{
+    diff::{concatenate_diff_hunks, extract_unified_diff_hunks},
+    msg_store::MsgStore,
+    path::make_path_relative,
+};
+
+use crate::{
+    executors::codex::session::SessionHandler,
+    logs::{
+        ActionType, CommandExitStatus, CommandRunResult, FileChange, NormalizedEntry,
+        NormalizedEntryType, TodoItem, ToolResult, ToolResultValueType, ToolStatus,
+        stderr_processor::normalize_stderr_logs,
+        utils::{ConversationPatch, EntryIndexProvider},
+    },
+};
+
+trait ToNormalizedEntry {
+    fn to_normalized_entry(&self) -> NormalizedEntry;
+}
+
+#[derive(Debug, Deserialize)]
+struct CodexNotificationParams {
+    #[serde(rename = "msg")]
+    msg: EventMsg,
+}
+
+#[derive(Default)]
+struct StreamingText {
+    index: usize,
+    content: String,
+}
+
+#[derive(Default)]
+struct CommandState {
+    index: Option<usize>,
+    command: String,
+    stdout: String,
+    stderr: String,
+    formatted_output: Option<String>,
+    status: ToolStatus,
+    exit_code: Option<i32>,
+}
+
+impl ToNormalizedEntry for CommandState {
+    fn to_normalized_entry(&self) -> NormalizedEntry {
+        NormalizedEntry {
+            timestamp: None,
+            entry_type: NormalizedEntryType::ToolUse {
+                tool_name: "bash".to_string(),
+                action_type: ActionType::CommandRun {
+                    command: self.command.clone(),
+                    result: Some(CommandRunResult {
+                        exit_status: self
+                            .exit_code
+                            .map(|code| CommandExitStatus::ExitCode { code }),
+                        output: if self.formatted_output.is_some() {
+                            self.formatted_output.clone()
+                        } else {
+                            build_command_output(Some(&self.stdout), Some(&self.stderr))
+                        },
+                    }),
+                },
+                status: self.status.clone(),
+            },
+            content: format!("`{}`", self.command),
+            metadata: None,
+        }
+    }
+}
+
+struct McpToolState {
+    index: Option<usize>,
+    invocation: McpInvocation,
+    result: Option<ToolResult>,
+    status: ToolStatus,
+}
+
+impl ToNormalizedEntry for McpToolState {
+    fn to_normalized_entry(&self) -> NormalizedEntry {
+        let tool_name = format!("mcp:{}:{}", self.invocation.server, self.invocation.tool);
+        NormalizedEntry {
+            timestamp: None,
+            entry_type: NormalizedEntryType::ToolUse {
+                tool_name: tool_name.clone(),
+                action_type: ActionType::Tool {
+                    tool_name,
+                    arguments: self.invocation.arguments.clone(),
+                    result: self.result.clone(),
+                },
+                status: self.status.clone(),
+            },
+            content: self.invocation.tool.clone(),
+            metadata: None,
+        }
+    }
+}
+
+#[derive(Default)]
+struct WebSearchState {
+    index: Option<usize>,
+    query: Option<String>,
+    status: ToolStatus,
+}
+
+impl WebSearchState {
+    fn new() -> Self {
+        Default::default()
+    }
+}
+
+impl ToNormalizedEntry for WebSearchState {
+    fn to_normalized_entry(&self) -> NormalizedEntry {
+        NormalizedEntry {
+            timestamp: None,
+            entry_type: NormalizedEntryType::ToolUse {
+                tool_name: "web_search".to_string(),
+                action_type: ActionType::WebFetch {
+                    url: self.query.clone().unwrap_or_else(|| "...".to_string()),
+                },
+                status: self.status.clone(),
+            },
+            content: self
+                .query
+                .clone()
+                .unwrap_or_else(|| "Web search".to_string()),
+            metadata: None,
+        }
+    }
+}
+
+#[derive(Default)]
+struct PatchState {
+    entries: Vec<PatchEntry>,
+}
+
+struct PatchEntry {
+    index: Option<usize>,
+    path: String,
+    changes: Vec<FileChange>,
+    status: ToolStatus,
+}
+
+impl ToNormalizedEntry for PatchEntry {
+    fn to_normalized_entry(&self) -> NormalizedEntry {
+        NormalizedEntry {
+            timestamp: None,
+            entry_type: NormalizedEntryType::ToolUse {
+                tool_name: "edit".to_string(),
+                action_type: ActionType::FileEdit {
+                    path: self.path.clone(),
+                    changes: self.changes.clone(),
+                },
+                status: self.status.clone(),
+            },
+            content: self.path.clone(),
+            metadata: None,
+        }
+    }
+}
+
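+/// Per-run normalization state: the currently streaming assistant/thinking
+/// entries plus in-flight tool calls, keyed by Codex `call_id`.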
+struct LogState {
+    entry_index: EntryIndexProvider,
+    assistant: Option<StreamingText>,
+    thinking: Option<StreamingText>,
+    commands: HashMap<String, CommandState>,
+    mcp_tools: HashMap<String, McpToolState>,
+    patches: HashMap<String, PatchState>,
+    web_searches: HashMap<String, WebSearchState>,
+    token_usage_info: Option<TokenUsageInfo>,
+}
+
+enum StreamingTextKind {
+    Assistant,
+    Thinking,
+}
+
+impl LogState {
+    fn new(entry_index: EntryIndexProvider) -> Self {
+        Self {
+            entry_index,
+            assistant: None,
+            thinking: None,
+            commands: HashMap::new(),
+            mcp_tools: HashMap::new(),
+            patches: HashMap::new(),
+            web_searches: HashMap::new(),
+            token_usage_info: None,
+        }
+    }
+
+    fn streaming_text_update(
+        &mut self,
+        content: String,
+        type_: StreamingTextKind,
+    ) -> (NormalizedEntry, usize, bool) {
+        let index_provider = &self.entry_index;
+        let entry = match type_ {
+            StreamingTextKind::Assistant => &mut self.assistant,
+            StreamingTextKind::Thinking => &mut self.thinking,
+        };
+        let is_new = entry.is_none();
+        let (content, index) = if entry.is_none() {
+            let index = index_provider.next();
+            *entry = Some(StreamingText { index, content });
+            (&entry.as_ref().unwrap().content, index)
+        } else {
+            let streaming_state = entry.as_mut().unwrap();
+            streaming_state.content.push_str(&content);
+            (&streaming_state.content, streaming_state.index)
+        };
+        let normalized_entry = NormalizedEntry {
+            timestamp: None,
+            entry_type: match type_ {
+                StreamingTextKind::Assistant => NormalizedEntryType::AssistantMessage,
+                StreamingTextKind::Thinking => NormalizedEntryType::Thinking,
+            },
+            content: content.clone(),
+            metadata: None,
+        };
+        (normalized_entry, index, is_new)
+    }
+
+    fn assistant_message_update(&mut self, content: String) -> (NormalizedEntry, usize, bool) {
+        self.streaming_text_update(content, StreamingTextKind::Assistant)
+    }
+
+    fn thinking_update(&mut self, content: String) -> (NormalizedEntry, usize, bool) {
+        self.streaming_text_update(content, StreamingTextKind::Thinking)
+    }
+}
+
+fn upsert_normalized_entry(
+    msg_store: &Arc<MsgStore>,
+    index: usize,
+    normalized_entry: NormalizedEntry,
+    is_new: bool,
+) {
+    if is_new {
+        msg_store.push_patch(ConversationPatch::add_normalized_entry(
+            index,
+            normalized_entry,
+        ));
+    } else {
+        msg_store.push_patch(ConversationPatch::replace(index, normalized_entry));
+    }
+}
+
+fn add_normalized_entry(
+    msg_store: &Arc<MsgStore>,
+    index_provider: &EntryIndexProvider,
+    normalized_entry: NormalizedEntry,
+) -> usize {
+    let index = index_provider.next();
+    upsert_normalized_entry(msg_store, index, normalized_entry, true);
+    index
+}
+
+fn replace_normalized_entry(
+    msg_store: &Arc<MsgStore>,
+    index: usize,
+    normalized_entry: NormalizedEntry,
+) {
+    upsert_normalized_entry(msg_store, index, normalized_entry, false);
+}
+
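+/// Convert Codex patch changes into worktree-relative normalized `FileChange`s,
+/// rebuilding unified diffs from their extracted hunks.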
+fn normalize_file_changes(
+    worktree_path: &str,
+    changes: &HashMap<PathBuf, CodexProtoFileChange>,
+) -> Vec<(String, Vec<FileChange>)> {
+    changes
+        .iter()
+        .map(|(path, change)| {
+            let path_str = path.to_string_lossy();
+            let relative = make_path_relative(path_str.as_ref(), worktree_path);
+            let file_changes = match change {
+                CodexProtoFileChange::Add { content } => vec![FileChange::Write {
+                    content: content.clone(),
+                }],
+                CodexProtoFileChange::Delete { .. } => vec![FileChange::Delete],
+                CodexProtoFileChange::Update {
+                    unified_diff,
+                    move_path,
+                } => {
+                    let mut edits = Vec::new();
+                    if let Some(dest) = move_path {
+                        let dest_rel =
+                            make_path_relative(dest.to_string_lossy().as_ref(), worktree_path);
+                        edits.push(FileChange::Rename { new_path: dest_rel });
+                    }
+                    let hunks = extract_unified_diff_hunks(unified_diff);
+                    let diff = concatenate_diff_hunks(&relative, &hunks);
+                    edits.push(FileChange::Edit {
+                        unified_diff: diff,
+                        has_line_numbers: true,
+                    });
+                    edits
+                }
+            };
+            (relative, file_changes)
+        })
+        .collect()
+}
+
+fn format_todo_status(status: &StepStatus) -> String {
+    match status {
+        StepStatus::Pending => "pending",
+        StepStatus::InProgress => "in_progress",
+        StepStatus::Completed => "completed",
+    }
+    .to_string()
+}
+
+pub fn normalize_logs(msg_store: Arc<MsgStore>, worktree_path: &Path) {
+    let entry_index = EntryIndexProvider::start_from(&msg_store);
+    normalize_stderr_logs(msg_store.clone(), entry_index.clone());
+
+    let worktree_path_str = worktree_path.to_string_lossy().to_string();
+    tokio::spawn(async move {
+        let mut state = LogState::new(entry_index.clone());
+        let mut stdout_lines = msg_store.stdout_lines_stream();
+
+        while let Some(Ok(line)) = stdout_lines.next().await {
+            if let Ok(error) = serde_json::from_str::<Error>(&line) {
+                add_normalized_entry(&msg_store, &entry_index, error.to_normalized_entry());
+                continue;
+            }
+
+            if let Ok(response) = serde_json::from_str::<JSONRPCResponse>(&line) {
+                handle_jsonrpc_response(response, &msg_store, &entry_index);
+                continue;
+            }
+
+            if let Ok(server_notification) = serde_json::from_str::<ServerNotification>(&line) {
+                if let ServerNotification::SessionConfigured(session_configured) =
+                    server_notification
+                {
+                    msg_store.push_session_id(session_configured.session_id.to_string());
+                    handle_model_params(
+                        session_configured.model,
+                        session_configured.reasoning_effort,
+                        &msg_store,
+                        &entry_index,
+                    );
+                };
+                continue;
+            } else if let Some(session_id) = line
+                .strip_prefix(r#"{"method":"sessionConfigured","params":{"sessionId":""#)
+                .and_then(|suffix| SESSION_ID.captures(suffix).and_then(|caps| caps.get(1)))
+            {
+                // Best-effort extraction of session ID from logs in case the JSON parsing fails.
+                // This could happen if the line is truncated due to size limits because it
+                // includes the full session history.
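+                // Hypothetical shape of such a truncated line:
+                //   {"method":"sessionConfigured","params":{"sessionId":"<uuid>", ...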
+                msg_store.push_session_id(session_id.as_str().to_string());
+                continue;
+            }
+
+            let notification: JSONRPCNotification = match serde_json::from_str(&line) {
+                Ok(value) => value,
+                Err(_) => continue,
+            };
+
+            if !notification.method.starts_with("codex/event") {
+                continue;
+            }
+
+            let Some(params) = notification
+                .params
+                .and_then(|p| serde_json::from_value::<CodexNotificationParams>(p).ok())
+            else {
+                continue;
+            };
+
+            let event = params.msg;
+            match event {
+                EventMsg::SessionConfigured(payload) => {
+                    msg_store.push_session_id(payload.session_id.to_string());
+                    handle_model_params(
+                        payload.model,
+                        payload.reasoning_effort,
+                        &msg_store,
+                        &entry_index,
+                    );
+                }
+                EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
+                    state.thinking = None;
+                    let (entry, index, is_new) = state.assistant_message_update(delta);
+                    upsert_normalized_entry(&msg_store, index, entry, is_new);
+                }
+                EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
+                    state.assistant = None;
+                    let (entry, index, is_new) = state.thinking_update(delta);
+                    upsert_normalized_entry(&msg_store, index, entry, is_new);
+                }
+                EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {}) => {
+                    state.assistant = None;
+                    state.thinking = None;
+                }
+                EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
+                    call_id, command, ..
+                }) => {
+                    state.assistant = None;
+                    state.thinking = None;
+                    let command_text = command.join(" ");
+                    if command_text.is_empty() {
+                        continue;
+                    }
+                    state.commands.insert(
+                        call_id.clone(),
+                        CommandState {
+                            index: None,
+                            command: command_text,
+                            stdout: String::new(),
+                            stderr: String::new(),
+                            formatted_output: None,
+                            status: ToolStatus::Created,
+                            exit_code: None,
+                        },
+                    );
+                    let command_state = state.commands.get_mut(&call_id).unwrap();
+                    let index = add_normalized_entry(
+                        &msg_store,
+                        &entry_index,
+                        command_state.to_normalized_entry(),
+                    );
+                    command_state.index = Some(index)
+                }
+                EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
+                    call_id,
+                    stream,
+                    chunk,
+                }) => {
+                    if let Some(command_state) = state.commands.get_mut(&call_id) {
+                        let chunk = String::from_utf8_lossy(&chunk);
+                        if chunk.is_empty() {
+                            continue;
+                        }
+                        match stream {
+                            ExecOutputStream::Stdout => command_state.stdout.push_str(&chunk),
+                            ExecOutputStream::Stderr => command_state.stderr.push_str(&chunk),
+                        }
+                        let Some(index) = command_state.index else {
+                            tracing::error!("missing entry index for existing command state");
+                            continue;
+                        };
+                        replace_normalized_entry(
+                            &msg_store,
+                            index,
+                            command_state.to_normalized_entry(),
+                        );
+                    }
+                }
+                EventMsg::ExecCommandEnd(ExecCommandEndEvent {
+                    call_id,
+                    stdout: _,
+                    stderr: _,
+                    aggregated_output: _,
+                    exit_code,
+                    duration: _,
+                    formatted_output,
+                }) => {
+                    if let Some(mut command_state) = state.commands.remove(&call_id) {
+                        command_state.formatted_output = Some(formatted_output);
+                        command_state.exit_code = Some(exit_code);
+                        command_state.status = if exit_code == 0 {
+                            ToolStatus::Success
+                        } else {
+                            ToolStatus::Failed
+                        };
+                        let Some(index) = command_state.index else {
+                            tracing::error!("missing entry index for existing command state");
+                            continue;
+                        };
+                        replace_normalized_entry(
+                            &msg_store,
+                            index,
+                            command_state.to_normalized_entry(),
+                        );
+                    }
+                }
+                EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
+                    add_normalized_entry(&msg_store, &entry_index, NormalizedEntry {
+                        timestamp: None,
+                        entry_type: NormalizedEntryType::SystemMessage,
+                        content: format!("Background event: {message}"),
+                        metadata: None,
+                    });
+                }
+                EventMsg::StreamError(StreamErrorEvent { message }) => {
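+                    // Transient stream failures are surfaced as error entries;
+                    // normalization continues with subsequent events.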
+                    add_normalized_entry(&msg_store, &entry_index, NormalizedEntry {
+                        timestamp: None,
+                        entry_type: NormalizedEntryType::ErrorMessage,
+                        content: format!("Stream error: {message}"),
+                        metadata: None,
+                    });
+                }
+                EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
+                    call_id,
+                    invocation,
+                }) => {
+                    state.assistant = None;
+                    state.thinking = None;
+                    state.mcp_tools.insert(
+                        call_id.clone(),
+                        McpToolState {
+                            index: None,
+                            invocation,
+                            result: None,
+                            status: ToolStatus::Created,
+                        },
+                    );
+                    let mcp_tool_state = state.mcp_tools.get_mut(&call_id).unwrap();
+                    let index = add_normalized_entry(
+                        &msg_store,
+                        &entry_index,
+                        mcp_tool_state.to_normalized_entry(),
+                    );
+                    mcp_tool_state.index = Some(index);
+                }
+                EventMsg::McpToolCallEnd(McpToolCallEndEvent {
+                    call_id, result, ..
+                }) => {
+                    if let Some(mut mcp_tool_state) = state.mcp_tools.remove(&call_id) {
+                        match result {
+                            Ok(value) => {
+                                mcp_tool_state.status = if value.is_error.unwrap_or(false) {
+                                    ToolStatus::Failed
+                                } else {
+                                    ToolStatus::Success
+                                };
+                                if value
+                                    .content
+                                    .iter()
+                                    .all(|block| matches!(block, ContentBlock::TextContent(_)))
+                                {
+                                    mcp_tool_state.result = Some(ToolResult {
+                                        r#type: ToolResultValueType::Markdown,
+                                        value: Value::String(
+                                            value
+                                                .content
+                                                .iter()
+                                                .map(|block| {
+                                                    if let ContentBlock::TextContent(content) =
+                                                        block
+                                                    {
+                                                        content.text.clone()
+                                                    } else {
+                                                        unreachable!()
+                                                    }
+                                                })
+                                                .collect::<Vec<String>>()
+                                                .join("\n"),
+                                        ),
+                                    });
+                                } else {
+                                    mcp_tool_state.result = Some(ToolResult {
+                                        r#type: ToolResultValueType::Json,
+                                        value: value.structured_content.unwrap_or_else(|| {
+                                            serde_json::to_value(value.content)
+                                                .unwrap_or_default()
+                                        }),
+                                    });
+                                }
+                            }
+                            Err(err) => {
+                                mcp_tool_state.status = ToolStatus::Failed;
+                                mcp_tool_state.result = Some(ToolResult {
+                                    r#type: ToolResultValueType::Markdown,
+                                    value: Value::String(err),
+                                });
+                            }
+                        };
+                        let Some(index) = mcp_tool_state.index else {
+                            tracing::error!("missing entry index for existing mcp tool state");
+                            continue;
+                        };
+                        replace_normalized_entry(
+                            &msg_store,
+                            index,
+                            mcp_tool_state.to_normalized_entry(),
+                        );
+                    }
+                }
+                EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
+                    call_id, changes, ..
+                }) => {
+                    state.assistant = None;
+                    state.thinking = None;
+                    let normalized = normalize_file_changes(&worktree_path_str, &changes);
+                    let mut patch_state = PatchState::default();
+                    for (path, file_changes) in normalized {
+                        patch_state.entries.push(PatchEntry {
+                            index: None,
+                            path,
+                            changes: file_changes,
+                            status: ToolStatus::Created,
+                        });
+                        let patch_entry = patch_state.entries.last_mut().unwrap();
+                        let index = add_normalized_entry(
+                            &msg_store,
+                            &entry_index,
+                            patch_entry.to_normalized_entry(),
+                        );
+                        patch_entry.index = Some(index);
+                    }
+                    state.patches.insert(call_id, patch_state);
+                }
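+                // A single apply_patch call can touch several files; resolve the
+                // final status on every entry recorded at PatchApplyBegin.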
+                EventMsg::PatchApplyEnd(PatchApplyEndEvent {
+                    call_id,
+                    stdout: _,
+                    stderr: _,
+                    success,
+                    ..
+                }) => {
+                    if let Some(patch_state) = state.patches.remove(&call_id) {
+                        let status = if success {
+                            ToolStatus::Success
+                        } else {
+                            ToolStatus::Failed
+                        };
+                        for mut entry in patch_state.entries {
+                            entry.status = status.clone();
+                            let Some(index) = entry.index else {
+                                tracing::error!("missing entry index for existing patch entry");
+                                continue;
+                            };
+                            replace_normalized_entry(&msg_store, index, entry.to_normalized_entry());
+                        }
+                    }
+                }
+                EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id }) => {
+                    state.assistant = None;
+                    state.thinking = None;
+                    state
+                        .web_searches
+                        .insert(call_id.clone(), WebSearchState::new());
+                    let web_search_state = state.web_searches.get_mut(&call_id).unwrap();
+                    let normalized_entry = web_search_state.to_normalized_entry();
+                    let index = add_normalized_entry(&msg_store, &entry_index, normalized_entry);
+                    web_search_state.index = Some(index);
+                }
+                EventMsg::WebSearchEnd(WebSearchEndEvent { call_id, query }) => {
+                    state.assistant = None;
+                    state.thinking = None;
+                    if let Some(mut entry) = state.web_searches.remove(&call_id) {
+                        entry.status = ToolStatus::Success;
+                        entry.query = Some(query.clone());
+                        let normalized_entry = entry.to_normalized_entry();
+                        let Some(index) = entry.index else {
+                            tracing::error!("missing entry index for existing websearch entry");
+                            continue;
+                        };
+                        replace_normalized_entry(&msg_store, index, normalized_entry);
+                    }
+                }
+                EventMsg::ViewImageToolCall(ViewImageToolCallEvent { call_id: _, path }) => {
+                    state.assistant = None;
+                    state.thinking = None;
+                    let path_str = path.to_string_lossy().to_string();
+                    let relative_path = make_path_relative(&path_str, &worktree_path_str);
+                    add_normalized_entry(
+                        &msg_store,
+                        &entry_index,
+                        NormalizedEntry {
+                            timestamp: None,
+                            entry_type: NormalizedEntryType::ToolUse {
+                                tool_name: "view_image".to_string(),
+                                action_type: ActionType::FileRead {
+                                    path: relative_path.clone(),
+                                },
+                                status: ToolStatus::Success,
+                            },
+                            content: format!("`{relative_path}`"),
+                            metadata: None,
+                        },
+                    );
+                }
+                EventMsg::PlanUpdate(UpdatePlanArgs { plan, explanation }) => {
+                    let todos: Vec<TodoItem> = plan
+                        .iter()
+                        .map(|item| TodoItem {
+                            content: item.step.clone(),
+                            status: format_todo_status(&item.status),
+                            priority: None,
+                        })
+                        .collect();
+                    let explanation = explanation
+                        .as_ref()
+                        .map(|text| text.trim())
+                        .filter(|text| !text.is_empty())
+                        .map(|text| text.to_string());
+                    let content = explanation.clone().unwrap_or_else(|| {
+                        if todos.is_empty() {
+                            "Plan updated".to_string()
+                        } else {
+                            format!("Plan updated ({} steps)", todos.len())
+                        }
+                    });
+
+                    add_normalized_entry(
+                        &msg_store,
+                        &entry_index,
+                        NormalizedEntry {
+                            timestamp: None,
+                            entry_type: NormalizedEntryType::ToolUse {
+                                tool_name: "plan".to_string(),
+                                action_type: ActionType::TodoManagement {
+                                    todos,
+                                    operation: "update".to_string(),
+                                },
+                                status: ToolStatus::Success,
+                            },
+                            content,
+                            metadata: None,
+                        },
+                    );
+                }
+                EventMsg::Error(ErrorEvent { message }) => {
+                    add_normalized_entry(&msg_store, &entry_index, NormalizedEntry {
+                        timestamp: None,
+                        entry_type: NormalizedEntryType::ErrorMessage,
+                        content: message,
+                        metadata: None,
+                    });
+                }
+                EventMsg::TokenCount(payload) => {
+                    if let Some(info) = payload.info {
+                        state.token_usage_info = Some(info);
+                    }
+                }
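+                // The remaining events are intentionally ignored: the full-message
+                // variants duplicate the streaming deltas handled above, and the rest
+                // carry nothing to render in the normalized log.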
+                EventMsg::AgentReasoning(..) // content duplicated with delta events
+                | EventMsg::AgentMessage(..) // ditto
+                | EventMsg::AgentReasoningRawContent(..)
+                | EventMsg::AgentReasoningRawContentDelta(..)
+                | EventMsg::TaskStarted(..)
+                | EventMsg::UserMessage(..)
+                | EventMsg::TurnDiff(..)
+                | EventMsg::GetHistoryEntryResponse(..)
+                | EventMsg::McpListToolsResponse(..)
+                | EventMsg::ListCustomPromptsResponse(..)
+                | EventMsg::TurnAborted(..)
+                | EventMsg::ShutdownComplete
+                | EventMsg::ConversationPath(..)
+                | EventMsg::EnteredReviewMode(..)
+                | EventMsg::ExitedReviewMode(..)
+                | EventMsg::TaskComplete(..)
+                | EventMsg::ExecApprovalRequest(..)
+                | EventMsg::ApplyPatchApprovalRequest(..) => {}
+            }
+        }
+    });
+}
+
+fn handle_jsonrpc_response(
+    response: JSONRPCResponse,
+    msg_store: &Arc<MsgStore>,
+    entry_index: &EntryIndexProvider,
+) {
+    let Ok(response) =
+        serde_json::from_value::<NewConversationResponse>(response.result.clone())
+    else {
+        return;
+    };
+
+    match SessionHandler::extract_session_id_from_rollout_path(response.rollout_path) {
+        Ok(session_id) => msg_store.push_session_id(session_id),
+        Err(err) => tracing::error!("failed to extract session id: {err}"),
+    }
+
+    handle_model_params(
+        response.model,
+        response.reasoning_effort,
+        msg_store,
+        entry_index,
+    );
+}
+
+fn handle_model_params(
+    model: String,
+    reasoning_effort: Option<ReasoningEffort>,
+    msg_store: &Arc<MsgStore>,
+    entry_index: &EntryIndexProvider,
+) {
+    let mut params = vec![format!("model: {model}")];
+    if let Some(reasoning_effort) = reasoning_effort {
+        params.push(format!("reasoning effort: {reasoning_effort}"));
+    }
+
+    add_normalized_entry(
+        msg_store,
+        entry_index,
+        NormalizedEntry {
+            timestamp: None,
+            entry_type: NormalizedEntryType::SystemMessage,
+            content: params.join(" "),
+            metadata: None,
+        },
+    );
+}
+
+fn build_command_output(stdout: Option<&str>, stderr: Option<&str>) -> Option<String> {
+    let mut sections = Vec::new();
+    if let Some(out) = stdout {
+        let cleaned = out.trim();
+        if !cleaned.is_empty() {
+            sections.push(format!("stdout:\n{cleaned}"));
+        }
+    }
+    if let Some(err) = stderr {
+        let cleaned = err.trim();
+        if !cleaned.is_empty() {
+            sections.push(format!("stderr:\n{cleaned}"));
+        }
+    }
+
+    if sections.is_empty() {
+        None
+    } else {
+        Some(sections.join("\n\n"))
+    }
+}
+
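+// Anchored UUID matcher used by the best-effort `sessionConfigured` fallback in
+// `normalize_logs` above.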
+lazy_static! {
+    static ref SESSION_ID: Regex = Regex::new(
+        r#"^([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"#
+    )
+    .expect("valid regex");
+}
+
+#[derive(Serialize, Deserialize, Debug)]
+pub enum Error {
+    LaunchError { error: String },
+}
+
+impl Error {
+    pub fn launch_error(error: String) -> Self {
+        Self::LaunchError { error }
+    }
+
+    pub fn raw(&self) -> String {
+        serde_json::to_string(self).unwrap_or_default()
+    }
+}
+
+impl ToNormalizedEntry for Error {
+    fn to_normalized_entry(&self) -> NormalizedEntry {
+        NormalizedEntry {
+            timestamp: None,
+            entry_type: NormalizedEntryType::ErrorMessage,
+            content: match self {
+                Error::LaunchError { error } => error.clone(),
+            },
+            metadata: None,
+        }
+    }
+}
diff --git a/crates/executors/src/executors/codex/session.rs b/crates/executors/src/executors/codex/session.rs
index 9e48a241..8b0a79f2 100644
--- a/crates/executors/src/executors/codex/session.rs
+++ b/crates/executors/src/executors/codex/session.rs
@@ -1,282 +1,244 @@
-use std::{path::PathBuf, sync::Arc};
+use std::{
+    fs::File,
+    io::{BufRead, BufReader, BufWriter, Write},
+    path::{Path, PathBuf},
+};
 
-use futures::StreamExt;
+use chrono::Local;
+use codex_protocol::protocol::SessionSource;
 use regex::Regex;
-use workspace_utils::msg_store::MsgStore;
+use serde_json::{Map, Value};
+use thiserror::Error;
+
+const FILENAME_TIMESTAMP_FORMAT: &str = "%Y-%m-%dT%H-%M-%S";
+
+#[derive(Debug, Error)]
+pub enum SessionError {
+    #[error("Session history format error: {0}")]
+    Format(String),
+
+    #[error("Session I/O error: {0}")]
+    Io(String),
+
+    #[error("Session not found: {0}")]
+    NotFound(String),
+}
 
 /// Handles session management for Codex
 pub struct SessionHandler;
 
 impl SessionHandler {
-    /// Start monitoring stderr lines for session ID extraction
-    pub fn start_session_id_extraction(msg_store: Arc<MsgStore>) {
-        tokio::spawn(async move {
-            let mut stderr_lines_stream = msg_store.stderr_lines_stream();
+    pub fn extract_session_id_from_rollout_path(
+        rollout_path: PathBuf,
+    ) -> Result<String, SessionError> {
+        // Extracts the session UUID from the end of the rollout file path.
+        // Pattern: rollout-{timestamp}-{uuid}.jsonl
+        let filename = rollout_path
+            .file_name()
+            .and_then(|f| f.to_str())
+            .ok_or_else(|| SessionError::Format("Invalid rollout path".to_string()))?;
 
-            while let Some(Ok(line)) = stderr_lines_stream.next().await {
-                if let Some(session_id) = Self::extract_session_id_from_line(&line) {
-                    msg_store.push_session_id(session_id);
-                }
-            }
-        });
-    }
+        // Match UUID before .jsonl extension
+        let re = Regex::new(
+            r"([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\.jsonl$",
+        )
+        .map_err(|e| SessionError::Format(format!("Regex error: {e}")))?;
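+        // e.g. "rollout-2025-01-01T12-00-00-{uuid}.jsonl" -> captures "{uuid}"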
-    /// Extract session ID from codex stderr output. Supports:
-    /// - Old: session_id: <uuid>
-    /// - New: session_id: ConversationId(<uuid>)
-    pub fn extract_session_id_from_line(line: &str) -> Option<String> {
-        static SESSION_ID_REGEX: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
-        let regex = SESSION_ID_REGEX.get_or_init(|| {
-            Regex::new(r"session_id:\s*(?:ConversationId\()?(?P<id>[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\)?").unwrap()
-        });
-
-        regex
-            .captures(line)
-            .and_then(|cap| cap.name("id"))
-            .map(|m| m.as_str().to_string())
+        re.captures(filename)
+            .and_then(|caps| caps.get(1))
+            .map(|uuid| uuid.as_str().to_string())
+            .ok_or_else(|| {
+                SessionError::Format(format!(
+                    "Could not extract session id from filename: {filename}"
+                ))
+            })
     }
 
     /// Find codex rollout file path for given session_id. Used during follow-up execution.
-    pub fn find_rollout_file_path(session_id: &str) -> Result<PathBuf, String> {
-        let home_dir = dirs::home_dir().ok_or("Could not determine home directory")?;
-        let sessions_dir = home_dir.join(".codex").join("sessions");
-
-        // Scan the sessions directory recursively for rollout files matching the session_id
-        // Pattern: rollout-{YYYY}-{MM}-{DD}T{HH}-{mm}-{ss}-{session_id}.jsonl
+    pub fn find_rollout_file_path(session_id: &str) -> Result<PathBuf, SessionError> {
+        let sessions_dir = Self::sessions_root()?;
         Self::scan_directory(&sessions_dir, session_id)
     }
 
-    // Recursively scan directory for rollout files matching the session_id
-    fn scan_directory(dir: &PathBuf, session_id: &str) -> Result<PathBuf, String> {
-        if !dir.exists() {
-            return Err(format!(
-                "Sessions directory does not exist: {}",
-                dir.display()
-            ));
+    /// Fork a Codex rollout file by copying it to a temp location and assigning a new session id.
+    /// Returns (new_rollout_path, new_session_id).
+    pub fn fork_rollout_file(session_id: &str) -> Result<(PathBuf, String), SessionError> {
+        let original = Self::find_rollout_file_path(session_id)?;
+        tracing::debug!("Forking rollout file: {}", original.display());
+        let file = File::open(&original).map_err(|e| {
+            SessionError::Io(format!(
+                "Failed to open rollout file {}: {e}",
+                original.display()
+            ))
+        })?;
+        let mut reader = BufReader::new(file);
+
+        let mut first_line = String::new();
+        reader.read_line(&mut first_line).map_err(|e| {
+            SessionError::Io(format!(
+                "Failed to read first line from {}: {e}",
+                original.display()
+            ))
+        })?;
+        let trimmed_header = first_line.trim();
+        if trimmed_header.is_empty() {
+            return Err(SessionError::Format(format!(
+                "Rollout file {} missing header line",
+                original.display()
+            )));
         }
 
-        let entries = std::fs::read_dir(dir)
-            .map_err(|e| format!("Failed to read directory {}: {}", dir.display(), e))?;
+        let mut meta: Value = serde_json::from_str(trimmed_header).map_err(|e| {
+            SessionError::Format(format!(
+                "Failed to parse first line JSON in {}: {e}",
+                original.display()
+            ))
+        })?;
+
+        let new_session_id = uuid::Uuid::new_v4().to_string();
+
+        let destination = Self::create_new_rollout_path(&new_session_id)?;
+        let dest_file = File::create(&destination).map_err(|e| {
+            SessionError::Io(format!(
+                "Failed to create forked rollout {}: {e}",
+                destination.display()
+            ))
+        })?;
+        let mut writer = BufWriter::new(dest_file);
+
+        Self::replace_session_id(&mut meta, &new_session_id)?;
+        let meta_line = serde_json::to_string(&meta)
+            .map_err(|e| SessionError::Format(format!("Failed to serialize modified meta: {e}")))?;
+        writeln!(writer, "{meta_line}").map_err(|e| {
+            SessionError::Io(format!(
+                "Failed to write meta to {}: {e}",
+                destination.display()
+            ))
+        })?;
+
+        // write all remaining lines as-is
+        for line in reader.lines() {
+            let line = line.map_err(|e| {
+                SessionError::Io(format!(
+                    "Failed to read line from {}: {e}",
+                    original.display()
+                ))
+            })?;
+            writeln!(writer, "{line}").map_err(|e| {
+                SessionError::Io(format!(
+                    "Failed to write line to {}: {e}",
+                    destination.display()
+                ))
+            })?;
+        }
+
+        writer.flush().map_err(|e| {
+            SessionError::Io(format!("Failed to flush {}: {e}", destination.display()))
+        })?;
+
+        Ok((destination, new_session_id))
+    }
+
+    pub(crate) fn replace_session_id(
+        session_meta: &mut Value,
+        new_id: &str,
+    ) -> Result<(), SessionError> {
+        let Value::Object(map) = session_meta else {
+            return Err(SessionError::Format(
+                "First line of rollout file is not a JSON object".to_string(),
+            ));
+        };
+
+        let Some(Value::Object(payload)) = map.get_mut("payload") else {
+            return Err(SessionError::Format(
+                "Rollout meta payload missing or not an object".to_string(),
+            ));
+        };
+
+        payload.insert("id".to_string(), Value::String(new_id.to_string()));
+
+        Self::ensure_required_payload_fields(payload);
+        Ok(())
+    }
+
+    fn ensure_required_payload_fields(payload: &mut Map<String, Value>) {
+        if !payload.contains_key("source") {
+            let Ok(value) = serde_json::to_value(SessionSource::default()) else {
+                tracing::error!("Failed to serialize default SessionSource");
+                return;
+            };
+            payload.insert("source".to_string(), value);
+        }
+    }
+
+    fn sessions_root() -> Result<PathBuf, SessionError> {
+        let home_dir = dirs::home_dir()
+            .ok_or_else(|| SessionError::Io("Could not determine home directory".to_string()))?;
+        Ok(home_dir.join(".codex").join("sessions"))
+    }
+
+    fn scan_directory(dir: &Path, session_id: &str) -> Result<PathBuf, SessionError> {
+        if !dir.exists() {
+            return Err(SessionError::Io(format!(
+                "Sessions directory does not exist: {}",
+                dir.display()
+            )));
+        }
+
+        let entries = std::fs::read_dir(dir).map_err(|e| {
+            SessionError::Io(format!("Failed to read directory {}: {e}", dir.display()))
+        })?;
 
         for entry in entries {
-            let entry = entry.map_err(|e| format!("Failed to read directory entry: {e}"))?;
+            let entry = entry
+                .map_err(|e| SessionError::Io(format!("Failed to read directory entry: {e}")))?;
             let path = entry.path();
 
             if path.is_dir() {
-                // Recursively search subdirectories
                 if let Ok(found) = Self::scan_directory(&path, session_id) {
                     return Ok(found);
                 }
             } else if path.is_file()
-                && let Some(filename) = path.file_name()
-                && let Some(filename_str) = filename.to_str()
-                && filename_str.contains(session_id)
-                && filename_str.starts_with("rollout-")
-                && filename_str.ends_with(".jsonl")
+                && path
+                    .file_name()
+                    .and_then(|name| name.to_str())
+                    .is_some_and(|filename| {
+                        filename.contains(session_id)
+                            && filename.starts_with("rollout-")
+                            && filename.ends_with(".jsonl")
+                    })
             {
                 return Ok(path);
             }
         }
 
-        Err(format!(
+        Err(SessionError::NotFound(format!(
             "Could not find rollout file for session_id: {session_id}"
-        ))
+        )))
     }
 
-    /// Fork a Codex rollout file by copying it to a temp location and assigning a new session id.
-    /// Returns (new_rollout_path, new_session_id).
-    ///
-    /// Migration behavior:
-    /// - If the original header is old format, it is converted to new format on write.
-    /// - Subsequent lines:
-    ///   - If already new RolloutLine, pass through unchanged.
-    ///   - If object contains "record_type", skip it (ignored in old impl).
-    ///   - Otherwise, wrap as RolloutLine of type "response_item" with payload = original JSON.
-    pub fn fork_rollout_file(session_id: &str) -> Result<(PathBuf, String), String> {
-        use std::io::{BufRead, BufReader, Write};
+    fn create_new_rollout_path(new_session_id: &str) -> Result<PathBuf, SessionError> {
+        let sessions_root = Self::sessions_root()?;
+        let now_local = Local::now();
 
-        let original = Self::find_rollout_file_path(session_id)?;
+        let dir = sessions_root
+            .join(now_local.format("%Y").to_string())
+            .join(now_local.format("%m").to_string())
+            .join(now_local.format("%d").to_string());
 
-        let file = std::fs::File::open(&original)
-            .map_err(|e| format!("Failed to open rollout file {}: {e}", original.display()))?;
-        let mut reader = BufReader::new(file);
-
-        let mut first_line = String::new();
-        reader
-            .read_line(&mut first_line)
-            .map_err(|e| format!("Failed to read first line from {}: {e}", original.display()))?;
-
-        let mut meta: serde_json::Value = serde_json::from_str(first_line.trim()).map_err(|e| {
-            format!(
-                "Failed to parse first line JSON in {}: {e}",
-                original.display()
-            )
+        std::fs::create_dir_all(&dir).map_err(|e| {
+            SessionError::Io(format!(
+                "Failed to create sessions directory {}: {e}",
+                dir.display()
+            ))
         })?;
 
-        // Generate new UUID for forked session
-        let new_id = uuid::Uuid::new_v4().to_string();
-        Self::set_session_id_in_rollout_meta(&mut meta, &new_id)?;
-
-        // Prepare destination path in the same directory, following Codex rollout naming convention.
-        // Always create a fresh filename: rollout-<YYYY>-<MM>-<DD>T<HH>-<mm>-<ss>-<session-id>.jsonl
-        let parent_dir = original
-            .parent()
-            .ok_or_else(|| format!("Unexpected path with no parent: {}", original.display()))?;
-        let new_filename = Self::new_rollout_filename(&new_id);
-        let dest = parent_dir.join(new_filename);
-
-        // Write new file with modified first line and copy the rest with migration as needed
-        let mut writer = std::fs::File::create(&dest)
-            .map_err(|e| format!("Failed to create forked rollout {}: {e}", dest.display()))?;
-        let meta_line = serde_json::to_string(&meta)
-            .map_err(|e| format!("Failed to serialize modified meta: {e}"))?;
-        writeln!(writer, "{meta_line}")
-            .map_err(|e| format!("Failed to write meta to {}: {e}", dest.display()))?;
-
-        // Wrap subsequent lines
-        for line in reader.lines() {
-            let line =
-                line.map_err(|e| format!("I/O error reading {}: {e}", original.display()))?;
-            let trimmed = line.trim();
-            if trimmed.is_empty() {
-                continue;
-            }
-
-            // Try parse as JSON
-            let parsed: Result<serde_json::Value, _> = serde_json::from_str(trimmed);
-            let value = match parsed {
-                Ok(v) => v,
-                Err(_) => {
-                    // Skip invalid JSON lines during migration
-                    continue;
-                }
-            };
-
-            // If already a RolloutLine (has timestamp + type/payload or flattened item), pass through
-            let is_rollout_line = value.get("timestamp").is_some()
-                && (value.get("type").is_some() || value.get("payload").is_some());
-            if is_rollout_line {
-                writeln!(writer, "{value}")
-                    .map_err(|e| format!("Failed to write to {}: {e}", dest.display()))?;
-                continue;
-            }
-
-            // Ignore legacy bookkeeping lines like {"record_type": ...}
-            if value.get("record_type").is_some() {
-                continue;
-            }
-
-            // Otherwise, wrap as a new RolloutLine containing a ResponseItem payload
-            let timestamp = chrono::Utc::now()
-                .format("%Y-%m-%dT%H:%M:%S%.3fZ")
-                .to_string();
-            let envelope = serde_json::json!({
-                "timestamp": timestamp,
-                "type": "response_item",
-                "payload": value,
-            });
-            writeln!(writer, "{envelope}")
-                .map_err(|e| format!("Failed to write to {}: {e}", dest.display()))?;
-        }
-
-        Ok((dest, new_id))
+        let filename = Self::rollout_filename_from_time(new_session_id, &now_local);
+        Ok(dir.join(filename))
     }
 
-    // Update session id inside the first-line JSON meta, supporting both old and new formats.
-    // - Old format: top-level { "id": "<uuid>", ... } -> convert to new format
-    // - New format: { "type": "session_meta", "payload": { "id": "<uuid>", ... }, ... }
-    // If both are somehow present, new format takes precedence.
-    pub(crate) fn set_session_id_in_rollout_meta(
-        meta: &mut serde_json::Value,
-        new_id: &str,
-    ) -> Result<(), String> {
-        match meta {
-            serde_json::Value::Object(map) => {
-                // If already new format, update payload.id and return
-                if let Some(serde_json::Value::Object(payload)) = map.get_mut("payload") {
-                    payload.insert(
-                        "id".to_string(),
-                        serde_json::Value::String(new_id.to_string()),
-                    );
-                    return Ok(());
-                }
-
-                // Convert old format to new format header
-                let top_timestamp = map.get("timestamp").cloned();
-                let instructions = map.get("instructions").cloned();
-                let git = map.get("git").cloned();
-
-                let mut new_top = serde_json::Map::new();
-                if let Some(ts) = top_timestamp.clone() {
-                    new_top.insert("timestamp".to_string(), ts);
-                }
-                new_top.insert(
-                    "type".to_string(),
-                    serde_json::Value::String("session_meta".to_string()),
-                );
-
-                let mut payload = serde_json::Map::new();
-                payload.insert(
-                    "id".to_string(),
-                    serde_json::Value::String(new_id.to_string()),
-                );
-                if let Some(ts) = top_timestamp {
-                    payload.insert("timestamp".to_string(), ts);
-                }
-                if let Some(instr) = instructions {
-                    payload.insert("instructions".to_string(), instr);
-                }
-                if let Some(git_val) = git {
-                    payload.insert("git".to_string(), git_val);
-                }
-                // Required fields in new format: cwd, originator, cli_version
-                if !payload.contains_key("cwd") {
-                    payload.insert(
-                        "cwd".to_string(),
-                        serde_json::Value::String(".".to_string()),
-                    );
-                }
-                if !payload.contains_key("originator") {
-                    payload.insert(
-                        "originator".to_string(),
-                        serde_json::Value::String("vibe_kanban_migrated".to_string()),
-                    );
-                }
-                if !payload.contains_key("cli_version") {
-                    payload.insert(
-                        "cli_version".to_string(),
-                        serde_json::Value::String("0.0.0-migrated".to_string()),
-                    );
-                }
-
-                new_top.insert("payload".to_string(), serde_json::Value::Object(payload));
-
-                *map = new_top; // replace the old map with the new-format one
-                Ok(())
-            }
-            _ => Err("First line of rollout file is not a JSON object".to_string()),
-        }
-    }
-
-    // Build a new rollout filename, ignoring any original name.
-    // Always returns: rollout-<timestamp>-<id>.jsonl
-    fn new_rollout_filename(new_id: &str) -> String {
-        let now_ts = chrono::Local::now().format("%Y-%m-%dT%H-%M-%S").to_string();
-        format!("rollout-{now_ts}-{new_id}.jsonl")
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::SessionHandler;
-
-    #[test]
-    fn test_new_rollout_filename_pattern() {
-        let id = "ID-123";
-        let out = SessionHandler::new_rollout_filename(id);
-        // rollout-YYYY-MM-DDTHH-MM-SS-ID-123.jsonl
-        let re = regex::Regex::new(r"^rollout-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}-ID-123\.jsonl$")
-            .unwrap();
-        assert!(re.is_match(&out), "Unexpected filename: {out}");
+    fn rollout_filename_from_time(new_id: &str, now_local: &chrono::DateTime<Local>) -> String {
+        let ts = now_local.format(FILENAME_TIMESTAMP_FORMAT).to_string();
+        format!("rollout-{ts}-{new_id}.jsonl")
     }
 }
diff --git a/crates/executors/src/logs/mod.rs b/crates/executors/src/logs/mod.rs
index 3cf3776d..440b689e 100644
--- a/crates/executors/src/logs/mod.rs
+++ b/crates/executors/src/logs/mod.rs
@@ -98,10 +98,11 @@ impl NormalizedEntry {
     }
 }
 
-#[derive(Debug, Clone, Serialize, Deserialize, TS)]
+#[derive(Debug, Clone, Serialize, Deserialize, TS, Default)]
 #[ts(export)]
 #[serde(tag = "status", rename_all = "snake_case")]
 pub enum ToolStatus {
+    #[default]
     Created,
     Success,
     Failed,
diff --git a/crates/executors/src/logs/stderr_processor.rs b/crates/executors/src/logs/stderr_processor.rs
index 04510cdb..d18e5d3a 100644
--- a/crates/executors/src/logs/stderr_processor.rs
+++ b/crates/executors/src/logs/stderr_processor.rs
@@ -42,7 +42,7 @@ pub fn normalize_stderr_logs(msg_store: Arc<MsgStore>, entry_index_provider: Ent
         .normalized_entry_producer(Box::new(|content: String| NormalizedEntry {
             timestamp: None,
             entry_type: NormalizedEntryType::ErrorMessage,
-            content,
+            content: strip_ansi_escapes::strip_str(&content),
             metadata: None,
         }))
         .time_gap(Duration::from_secs(2)) // Break messages if they are 2 seconds apart
diff --git a/shared/schemas/codex.json b/shared/schemas/codex.json
index 6c1b2c77..7ebb3790 100644
--- a/shared/schemas/codex.json
+++ b/shared/schemas/codex.json
@@ -76,6 +76,30 @@
         null
       ]
     },
+    "profile": {
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    "base_instructions": {
+      "type": [
+        "string",
+        "null"
+      ]
+    },
+    "include_plan_tool": {
+      "type": [
+        "boolean",
+        "null"
+      ]
+    },
+    "include_apply_patch_tool": {
+      "type": [
+        "boolean",
+        "null"
+      ]
+    },
     "base_command_override": {
       "title": "Base Command Override",
       "description": "Override the base command with a custom command",
diff --git a/shared/types.ts b/shared/types.ts
index 96e45099..41c5c730 100644
--- a/shared/types.ts
+++ b/shared/types.ts
@@ -162,7 +162,7 @@ export type GeminiModel = "default" | "flash";
 
 export type Amp = { append_prompt: AppendPrompt, dangerously_allow_all?: boolean | null, base_command_override?: string | null, additional_params?: Array<string> | null, };
 
-export type Codex = { append_prompt: AppendPrompt, sandbox?: SandboxMode | null, oss?: boolean | null, model?: string | null, model_reasoning_effort?: ReasoningEffort | null, model_reasoning_summary?: ReasoningSummary | null, model_reasoning_summary_format?: ReasoningSummaryFormat | null, base_command_override?: string | null, additional_params?: Array<string> | null, };
+export type Codex = { append_prompt: AppendPrompt, sandbox?: SandboxMode | null, oss?: boolean | null, model?: string | null, model_reasoning_effort?: ReasoningEffort | null, model_reasoning_summary?: ReasoningSummary | null, model_reasoning_summary_format?: ReasoningSummaryFormat | null, profile?: string | null, base_instructions?: string | null, include_plan_tool?: boolean | null, include_apply_patch_tool?: boolean | null, base_command_override?: string | null, additional_params?: Array<string> | null, };
 
 export type SandboxMode = "auto" | "read-only" | "workspace-write" | "danger-full-access";