Update codex to the latest (#724)

This commit is contained in:
Solomon
2025-09-17 10:03:25 +01:00
committed by GitHub
parent e9edef6e89
commit 5f37bc258f
2 changed files with 347 additions and 160 deletions

View File

@@ -1,3 +1,5 @@
mod session;
use std::{
path::{Path, PathBuf},
process::Stdio,
@@ -7,7 +9,6 @@ use std::{
use async_trait::async_trait;
use command_group::{AsyncCommandGroup, AsyncGroupChild};
use futures::StreamExt;
use regex::Regex;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use strum_macros::AsRefStr;
@@ -22,7 +23,9 @@ use utils::{
use crate::{
command::{CmdOverrides, CommandBuilder, apply_overrides},
executors::{AppendPrompt, ExecutorError, StandardCodingAgentExecutor},
executors::{
AppendPrompt, ExecutorError, StandardCodingAgentExecutor, codex::session::SessionHandler,
},
logs::{
ActionType, FileChange, NormalizedEntry, NormalizedEntryType,
utils::{EntryIndexProvider, patch::ConversationPatch},
@@ -72,158 +75,6 @@ pub enum ReasoningSummary {
None,
}
/// Handles session management for Codex executor
pub struct SessionHandler;
impl SessionHandler {
/// Start monitoring stderr lines for session ID extraction
pub fn start_session_id_extraction(msg_store: Arc<MsgStore>) {
tokio::spawn(async move {
let mut stderr_lines_stream = msg_store.stderr_lines_stream();
while let Some(Ok(line)) = stderr_lines_stream.next().await {
if let Some(session_id) = Self::extract_session_id_from_line(&line) {
msg_store.push_session_id(session_id);
}
}
});
}
/// Extract session ID from codex stderr output
pub fn extract_session_id_from_line(line: &str) -> Option<String> {
// Look for session_id in the log format:
// 2025-07-23T15:47:59.877058Z INFO codex_exec: Codex initialized with event: Event { id: "0", msg: SessionConfigured(SessionConfiguredEvent { session_id: 3cdcc4df-c7c3-4cca-8902-48c3d4a0f96b, model: "codex-mini-latest", history_log_id: 9104228, history_entry_count: 1 }) }
static SESSION_ID_REGEX: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let regex = SESSION_ID_REGEX.get_or_init(|| {
Regex::new(r"session_id:\s*([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})").unwrap()
});
regex
.captures(line)
.and_then(|cap| cap.get(1))
.map(|m| m.as_str().to_string())
}
/// Find codex rollout file path for given session_id. Used during follow-up execution.
pub fn find_rollout_file_path(session_id: &str) -> Result<PathBuf, String> {
let home_dir = dirs::home_dir().ok_or("Could not determine home directory")?;
let sessions_dir = home_dir.join(".codex").join("sessions");
// Scan the sessions directory recursively for rollout files matching the session_id
// Pattern: rollout-{YYYY}-{MM}-{DD}T{HH}-{mm}-{ss}-{session_id}.jsonl
Self::scan_directory(&sessions_dir, session_id)
}
// Helper for `find_rollout_file_path`.
// Recursively scan directory for rollout files matching the session_id
fn scan_directory(dir: &PathBuf, session_id: &str) -> Result<PathBuf, String> {
if !dir.exists() {
return Err(format!(
"Sessions directory does not exist: {}",
dir.display()
));
}
let entries = std::fs::read_dir(dir)
.map_err(|e| format!("Failed to read directory {}: {}", dir.display(), e))?;
for entry in entries {
let entry = entry.map_err(|e| format!("Failed to read directory entry: {e}"))?;
let path = entry.path();
if path.is_dir() {
// Recursively search subdirectories
if let Ok(found) = Self::scan_directory(&path, session_id) {
return Ok(found);
}
} else if path.is_file()
&& let Some(filename) = path.file_name()
&& let Some(filename_str) = filename.to_str()
&& filename_str.contains(session_id)
&& filename_str.starts_with("rollout-")
&& filename_str.ends_with(".jsonl")
{
return Ok(path);
}
}
Err(format!(
"Could not find rollout file for session_id: {session_id}"
))
}
/// Fork a Codex rollout file by copying it to a temp location and assigning a new session id.
/// Returns (new_rollout_path, new_session_id).
pub fn fork_rollout_file(session_id: &str) -> Result<(PathBuf, String), String> {
use std::io::{BufRead, BufReader, Write};
let original = Self::find_rollout_file_path(session_id)?;
let file = std::fs::File::open(&original)
.map_err(|e| format!("Failed to open rollout file {}: {e}", original.display()))?;
let mut reader = BufReader::new(file);
let mut first_line = String::new();
reader
.read_line(&mut first_line)
.map_err(|e| format!("Failed to read first line from {}: {e}", original.display()))?;
let mut meta: serde_json::Value = serde_json::from_str(first_line.trim()).map_err(|e| {
format!(
"Failed to parse first line JSON in {}: {e}",
original.display()
)
})?;
// Generate new UUID for forked session
let new_id = uuid::Uuid::new_v4().to_string();
if let serde_json::Value::Object(ref mut map) = meta {
map.insert("id".to_string(), serde_json::Value::String(new_id.clone()));
} else {
return Err("First line of rollout file is not a JSON object".to_string());
}
// Prepare destination path in the same directory, following Codex rollout naming convention:
// rollout-<YYYY>-<MM>-<DD>T<HH>-<mm>-<ss>-<session_id>.jsonl
let parent_dir = original
.parent()
.ok_or_else(|| format!("Unexpected path with no parent: {}", original.display()))?;
let filename = original
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("rollout.jsonl");
let new_filename = if filename.starts_with("rollout-") && filename.ends_with(".jsonl") {
let stem = &filename[..filename.len() - ".jsonl".len()];
if let Some(idx) = stem.rfind('-') {
// Replace the trailing session id with the new id, keep timestamp intact
format!("{}-{}.jsonl", &stem[..idx], new_id)
} else {
format!("rollout-{new_id}.jsonl")
}
} else {
format!("rollout-{new_id}.jsonl")
};
let dest = parent_dir.join(new_filename);
// Write new file with modified first line and copy the rest as-is
let mut writer = std::fs::File::create(&dest)
.map_err(|e| format!("Failed to create forked rollout {}: {e}", dest.display()))?;
let meta_line = serde_json::to_string(&meta)
.map_err(|e| format!("Failed to serialize modified meta: {e}"))?;
writeln!(writer, "{meta_line}")
.map_err(|e| format!("Failed to write meta to {}: {e}", dest.display()))?;
for line in reader.lines() {
let line =
line.map_err(|e| format!("I/O error reading {}: {e}", original.display()))?;
writeln!(writer, "{line}")
.map_err(|e| format!("Failed to write to {}: {e}", dest.display()))?;
}
Ok((dest, new_id))
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, TS, JsonSchema)]
pub struct Codex {
#[serde(default)]
@@ -246,7 +97,7 @@ pub struct Codex {
impl Codex {
fn build_command_builder(&self) -> CommandBuilder {
let mut builder = CommandBuilder::new("npx -y @openai/codex@0.29.0 exec")
let mut builder = CommandBuilder::new("npx -y @openai/codex@latest exec")
.params(["--json", "--skip-git-repo-check"]);
if let Some(approval) = &self.approval {
@@ -332,14 +183,13 @@ impl StandardCodingAgentExecutor for Codex {
session_id: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
// Fork rollout: copy and assign a new session id so each execution has a unique session
let (rollout_file_path, _new_session_id) = SessionHandler::fork_rollout_file(session_id)
let (_rollout_file_path, new_session_id) = SessionHandler::fork_rollout_file(session_id)
.map_err(|e| ExecutorError::SpawnError(std::io::Error::other(e)))?;
let (shell_cmd, shell_arg) = get_shell_command();
let codex_command = self.build_command_builder().build_follow_up(&[
"-c".to_string(),
format!("experimental_resume={}", rollout_file_path.display()),
]);
let codex_command = self
.build_command_builder()
.build_follow_up(&["resume".to_string(), new_session_id]);
let combined_prompt = self.append_prompt.combine_prompt(prompt);
@@ -1053,6 +903,18 @@ mod tests {
assert_eq!(session_id, None);
}
#[test]
fn test_extract_session_id_from_line_new_format() {
    // Newer Codex versions wrap the UUID in ConversationId(...)
    let log_line = "2025-09-12T14:36:32.515901Z INFO codex_exec: Codex initialized with event: SessionConfiguredEvent { session_id: ConversationId(bd823d48-4bd8-4d9e-9d87-93a66afbf4d2), model: \"gpt-5\", history_log_id: 0, history_entry_count: 0, initial_messages: None, rollout_path: \"/home/user/.codex/sessions/2025/09/12/rollout-2025-09-12T14-36-32-bd823d48-4bd8-4d9e-9d87-93a66afbf4d2.jsonl\" }";

    let extracted = SessionHandler::extract_session_id_from_line(log_line);

    // The wrapper must be stripped, leaving only the bare UUID.
    assert_eq!(
        extracted.as_deref(),
        Some("bd823d48-4bd8-4d9e-9d87-93a66afbf4d2")
    );
}
#[test]
fn test_normalize_logs_basic() {
let logs = r#"{"id":"1","msg":{"type":"task_started"}}
@@ -1349,4 +1211,55 @@ invalid json line here
let entries = parsed.to_normalized_entries(&current_dir);
assert!(entries.is_none()); // Should return None
}
#[test]
fn test_set_session_id_in_rollout_meta_old_format() {
    // Old-format header: the session id lives at the top level of the object.
    let replacement_id = "11111111-2222-3333-4444-555555555555";
    let mut header = serde_json::json!({
        "id": "8724aa3f-efb7-4bbb-96a4-63fb3cb7ee90",
        "timestamp": "2025-09-09T16:46:39.250Z",
        "instructions": "# ...",
        "git": {
            "commit_hash": "70497c4cb9d64473e1e7602083badf338e59e75a",
            "branch": "vk/9986-retry-with",
            "repository_url": "https://github.com/bloopai/vibe-kanban"
        }
    });

    SessionHandler::set_session_id_in_rollout_meta(&mut header, replacement_id).unwrap();

    // Migration must rewrite the header into the new format...
    assert_eq!(header["type"].as_str(), Some("session_meta"));
    assert_eq!(header["payload"]["id"].as_str(), Some(replacement_id));
    // ...carry instructions and git metadata into the payload when present...
    assert_eq!(header["payload"]["instructions"].as_str(), Some("# ..."));
    assert!(header["payload"]["git"].is_object());
    // ...and drop the old top-level id entirely.
    assert_eq!(header.get("id").and_then(|v| v.as_str()), None);
}
#[test]
fn test_set_session_id_in_rollout_meta_new_format() {
    // New-format header: the session id lives under `payload.id`.
    let fresh_id = "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee";
    let mut header = serde_json::json!({
        "timestamp": "2025-09-12T15:34:41.080Z",
        "type": "session_meta",
        "payload": {
            "id": "0c2061fc-1da8-4733-b33f-70159b4c57f2",
            "timestamp": "2025-09-12T15:34:41.068Z",
            "cwd": "/var/tmp/vibe-kanban-dev/worktrees/vk-f625-hi",
            "originator": "codex_cli_rs",
            "cli_version": "0.34.0",
            "instructions": "# ...",
            "git": {
                "commit_hash": "07fad5465fcdca9b719cea965372a0ea39f42d15",
                "branch": "vk/f625-hi",
                "repository_url": "https://github.com/bloopai/vibe-kanban"
            }
        }
    });

    SessionHandler::set_session_id_in_rollout_meta(&mut header, fresh_id).unwrap();

    // New format takes precedence: only payload.id is swapped out.
    assert_eq!(header["payload"]["id"].as_str(), Some(fresh_id));
    // No top-level id may be introduced by the update.
    assert_eq!(header["id"].as_str(), None);
}
}

View File

@@ -0,0 +1,274 @@
use std::{path::PathBuf, sync::Arc};
use futures::StreamExt;
use regex::Regex;
use utils::msg_store::MsgStore;
/// Handles session management for Codex
pub struct SessionHandler;

impl SessionHandler {
    /// Start monitoring stderr lines for session ID extraction.
    ///
    /// Spawns a detached task that reads the `MsgStore` stderr stream and
    /// pushes every session ID it can parse back into the store.
    pub fn start_session_id_extraction(msg_store: Arc<MsgStore>) {
        tokio::spawn(async move {
            let mut stderr_lines_stream = msg_store.stderr_lines_stream();
            while let Some(Ok(line)) = stderr_lines_stream.next().await {
                if let Some(session_id) = Self::extract_session_id_from_line(&line) {
                    msg_store.push_session_id(session_id);
                }
            }
        });
    }

    /// Extract session ID from codex stderr output. Supports:
    /// - Old: session_id: <uuid>
    /// - New: session_id: ConversationId(<uuid>)
    pub fn extract_session_id_from_line(line: &str) -> Option<String> {
        // Compiled once; subsequent calls reuse the cached regex.
        static SESSION_ID_REGEX: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
        let regex = SESSION_ID_REGEX.get_or_init(|| {
            // The optional `ConversationId(` prefix / `)` suffix cover the newer
            // format; the named `id` group captures the bare UUID either way.
            Regex::new(r"session_id:\s*(?:ConversationId\()?(?P<id>[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\)?").unwrap()
        });

        regex
            .captures(line)
            .and_then(|cap| cap.name("id"))
            .map(|m| m.as_str().to_string())
    }

    /// Find codex rollout file path for given session_id. Used during follow-up execution.
    pub fn find_rollout_file_path(session_id: &str) -> Result<PathBuf, String> {
        let home_dir = dirs::home_dir().ok_or("Could not determine home directory")?;
        let sessions_dir = home_dir.join(".codex").join("sessions");

        // Scan the sessions directory recursively for rollout files matching the session_id
        // Pattern: rollout-{YYYY}-{MM}-{DD}T{HH}-{mm}-{ss}-{session_id}.jsonl
        Self::scan_directory(&sessions_dir, session_id)
    }

    // Recursively scan directory for rollout files matching the session_id.
    // Errors inside a subdirectory are ignored (via `if let Ok`) so one
    // unreadable subtree does not abort the whole search.
    fn scan_directory(dir: &PathBuf, session_id: &str) -> Result<PathBuf, String> {
        if !dir.exists() {
            return Err(format!(
                "Sessions directory does not exist: {}",
                dir.display()
            ));
        }

        let entries = std::fs::read_dir(dir)
            .map_err(|e| format!("Failed to read directory {}: {}", dir.display(), e))?;

        for entry in entries {
            let entry = entry.map_err(|e| format!("Failed to read directory entry: {e}"))?;
            let path = entry.path();

            if path.is_dir() {
                // Recursively search subdirectories
                if let Ok(found) = Self::scan_directory(&path, session_id) {
                    return Ok(found);
                }
            } else if path.is_file()
                && let Some(filename) = path.file_name()
                && let Some(filename_str) = filename.to_str()
                && filename_str.contains(session_id)
                && filename_str.starts_with("rollout-")
                && filename_str.ends_with(".jsonl")
            {
                return Ok(path);
            }
        }

        Err(format!(
            "Could not find rollout file for session_id: {session_id}"
        ))
    }

    /// Fork a Codex rollout file by copying it to a temp location and assigning a new session id.
    /// Returns (new_rollout_path, new_session_id).
    ///
    /// Migration behavior:
    /// - If the original header is old format, it is converted to new format on write.
    /// - Subsequent lines:
    ///   - If already new RolloutLine, pass through unchanged.
    ///   - If object contains "record_type", skip it (ignored in old impl).
    ///   - Otherwise, wrap as RolloutLine of type "response_item" with payload = original JSON.
    pub fn fork_rollout_file(session_id: &str) -> Result<(PathBuf, String), String> {
        use std::io::{BufRead, BufReader, Write};

        let original = Self::find_rollout_file_path(session_id)?;
        let file = std::fs::File::open(&original)
            .map_err(|e| format!("Failed to open rollout file {}: {e}", original.display()))?;
        let mut reader = BufReader::new(file);

        // The first line is the session meta header; it must parse as JSON.
        let mut first_line = String::new();
        reader
            .read_line(&mut first_line)
            .map_err(|e| format!("Failed to read first line from {}: {e}", original.display()))?;
        let mut meta: serde_json::Value = serde_json::from_str(first_line.trim()).map_err(|e| {
            format!(
                "Failed to parse first line JSON in {}: {e}",
                original.display()
            )
        })?;

        // Generate new UUID for forked session
        let new_id = uuid::Uuid::new_v4().to_string();
        Self::set_session_id_in_rollout_meta(&mut meta, &new_id)?;

        // Prepare destination path in the same directory, following Codex rollout naming convention:
        // rollout-<YYYY>-<MM>-<DD>T<HH>-<mm>-<ss>-<session_id>.jsonl
        let parent_dir = original
            .parent()
            .ok_or_else(|| format!("Unexpected path with no parent: {}", original.display()))?;
        let filename = original
            .file_name()
            .and_then(|s| s.to_str())
            .unwrap_or("rollout.jsonl");
        let new_filename = if filename.starts_with("rollout-") && filename.ends_with(".jsonl") {
            let stem = &filename[..filename.len() - ".jsonl".len()];
            if let Some(idx) = stem.rfind('-') {
                // Replace the trailing session id with the new id, keep timestamp intact
                format!("{}-{}.jsonl", &stem[..idx], new_id)
            } else {
                // No '-' separator; fall back to a minimal valid rollout name.
                format!("rollout-{new_id}.jsonl")
            }
        } else {
            format!("rollout-{new_id}.jsonl")
        };
        let dest = parent_dir.join(new_filename);

        // Write new file with modified first line and copy the rest with migration as needed
        let mut writer = std::fs::File::create(&dest)
            .map_err(|e| format!("Failed to create forked rollout {}: {e}", dest.display()))?;
        let meta_line = serde_json::to_string(&meta)
            .map_err(|e| format!("Failed to serialize modified meta: {e}"))?;
        writeln!(writer, "{meta_line}")
            .map_err(|e| format!("Failed to write meta to {}: {e}", dest.display()))?;

        // Wrap subsequent lines
        for line in reader.lines() {
            let line =
                line.map_err(|e| format!("I/O error reading {}: {e}", original.display()))?;
            let trimmed = line.trim();
            if trimmed.is_empty() {
                continue;
            }

            // Try parse as JSON
            let parsed: Result<serde_json::Value, _> = serde_json::from_str(trimmed);
            let value = match parsed {
                Ok(v) => v,
                Err(_) => {
                    // Skip invalid JSON lines during migration
                    continue;
                }
            };

            // If already a RolloutLine (has timestamp + type/payload or flattened item), pass through
            let is_rollout_line = value.get("timestamp").is_some()
                && (value.get("type").is_some() || value.get("payload").is_some());
            if is_rollout_line {
                writeln!(writer, "{value}")
                    .map_err(|e| format!("Failed to write to {}: {e}", dest.display()))?;
                continue;
            }

            // Ignore legacy bookkeeping lines like {"record_type": ...}
            if value.get("record_type").is_some() {
                continue;
            }

            // Otherwise, wrap as a new RolloutLine containing a ResponseItem payload.
            // NOTE(review): the wrap timestamp is "now", not the original event
            // time — presumably acceptable for migrated lines; confirm.
            let timestamp = chrono::Utc::now()
                .format("%Y-%m-%dT%H:%M:%S%.3fZ")
                .to_string();
            let envelope = serde_json::json!({
                "timestamp": timestamp,
                "type": "response_item",
                "payload": value,
            });
            writeln!(writer, "{envelope}")
                .map_err(|e| format!("Failed to write to {}: {e}", dest.display()))?;
        }

        Ok((dest, new_id))
    }

    // Update session id inside the first-line JSON meta, supporting both old and new formats.
    // - Old format: top-level { "id": "<uuid>", ... } -> convert to new format
    // - New format: { "type": "session_meta", "payload": { "id": "<uuid>", ... }, ... }
    // If both are somehow present, new format takes precedence.
    pub(crate) fn set_session_id_in_rollout_meta(
        meta: &mut serde_json::Value,
        new_id: &str,
    ) -> Result<(), String> {
        match meta {
            serde_json::Value::Object(map) => {
                // If already new format, update payload.id and return
                if let Some(serde_json::Value::Object(payload)) = map.get_mut("payload") {
                    payload.insert(
                        "id".to_string(),
                        serde_json::Value::String(new_id.to_string()),
                    );
                    return Ok(());
                }

                // Convert old format to new format header
                let top_timestamp = map.get("timestamp").cloned();
                let instructions = map.get("instructions").cloned();
                let git = map.get("git").cloned();

                let mut new_top = serde_json::Map::new();
                // Keep the original top-level timestamp when one exists.
                if let Some(ts) = top_timestamp.clone() {
                    new_top.insert("timestamp".to_string(), ts);
                }
                new_top.insert(
                    "type".to_string(),
                    serde_json::Value::String("session_meta".to_string()),
                );

                let mut payload = serde_json::Map::new();
                payload.insert(
                    "id".to_string(),
                    serde_json::Value::String(new_id.to_string()),
                );
                // The timestamp is duplicated into the payload, mirroring the
                // shape of headers written by newer Codex versions.
                if let Some(ts) = top_timestamp {
                    payload.insert("timestamp".to_string(), ts);
                }
                if let Some(instr) = instructions {
                    payload.insert("instructions".to_string(), instr);
                }
                if let Some(git_val) = git {
                    payload.insert("git".to_string(), git_val);
                }

                // Required fields in new format: cwd, originator, cli_version.
                // NOTE(review): placeholder values are written when the old
                // header lacks them — presumably Codex tolerates these; confirm.
                if !payload.contains_key("cwd") {
                    payload.insert(
                        "cwd".to_string(),
                        serde_json::Value::String(".".to_string()),
                    );
                }
                if !payload.contains_key("originator") {
                    payload.insert(
                        "originator".to_string(),
                        serde_json::Value::String("vibe_kanban_migrated".to_string()),
                    );
                }
                if !payload.contains_key("cli_version") {
                    payload.insert(
                        "cli_version".to_string(),
                        serde_json::Value::String("0.0.0-migrated".to_string()),
                    );
                }

                new_top.insert("payload".to_string(), serde_json::Value::Object(payload));
                *map = new_top; // replace the old map with the new-format one
                Ok(())
            }
            _ => Err("First line of rollout file is not a JSON object".to_string()),
        }
    }
}