Upgrade Codex to the latest version (#947)

* Upgrade Codex to the latest version

Use the new `app-server` protocol to interact with the Codex CLI.

* Fix spawn errors

* Simplify session forking

* Append spawn errors to the logs
Authored by Solomon on 2025-10-07 14:57:41 +01:00, committed by GitHub
parent 41eaa061fe
commit 7c10c00d93
10 changed files with 1815 additions and 1410 deletions

View File

@@ -43,3 +43,6 @@ sqlx = "0.8.6"
axum = { workspace = true }
shlex = "1.3.0"
agent-client-protocol = "0.4"
codex-protocol = { git = "https://github.com/openai/codex.git", package = "codex-protocol", rev = "488ec061bf4d36916b8f477c700ea4fde4162a7a" }
codex-app-server-protocol = { git = "https://github.com/openai/codex.git", package = "codex-app-server-protocol", rev = "488ec061bf4d36916b8f477c700ea4fde4162a7a" }
codex-mcp-types = { git = "https://github.com/openai/codex.git", package = "mcp-types", rev = "488ec061bf4d36916b8f477c700ea4fde4162a7a" }

File diff suppressed because it is too large.

View File

@@ -0,0 +1,271 @@
use std::{
io,
sync::{Arc, OnceLock},
};
use async_trait::async_trait;
use codex_app_server_protocol::{
AddConversationListenerParams, AddConversationSubscriptionResponse, ApplyPatchApprovalResponse,
ClientInfo, ClientNotification, ClientRequest, ExecCommandApprovalResponse, InitializeParams,
InitializeResponse, InputItem, JSONRPCError, JSONRPCNotification, JSONRPCRequest,
JSONRPCResponse, NewConversationParams, NewConversationResponse, RequestId,
ResumeConversationParams, ResumeConversationResponse, SendUserMessageParams,
SendUserMessageResponse, ServerRequest,
};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::Value;
use tokio::{
io::{AsyncWrite, AsyncWriteExt, BufWriter},
sync::Mutex,
};
use super::jsonrpc::{JsonRpcCallbacks, JsonRpcPeer};
use crate::executors::ExecutorError;
pub struct AppServerClient {
rpc: OnceLock<JsonRpcPeer>,
log_writer: LogWriter,
}
impl AppServerClient {
pub fn new(log_writer: LogWriter) -> Arc<Self> {
Arc::new(Self {
rpc: OnceLock::new(),
log_writer,
})
}
pub fn connect(&self, peer: JsonRpcPeer) {
let _ = self.rpc.set(peer);
}
fn rpc(&self) -> &JsonRpcPeer {
self.rpc.get().expect("Codex RPC peer not attached")
}
pub async fn initialize(&self) -> Result<(), ExecutorError> {
let request = ClientRequest::Initialize {
request_id: self.next_request_id(),
params: InitializeParams {
client_info: ClientInfo {
name: "vibe-codex-executor".to_string(),
title: None,
version: env!("CARGO_PKG_VERSION").to_string(),
},
},
};
self.send_request::<InitializeResponse>(request, "initialize")
.await?;
self.send_message(&ClientNotification::Initialized).await
}
pub async fn new_conversation(
&self,
params: NewConversationParams,
) -> Result<NewConversationResponse, ExecutorError> {
let request = ClientRequest::NewConversation {
request_id: self.next_request_id(),
params,
};
self.send_request(request, "newConversation").await
}
pub async fn resume_conversation(
&self,
rollout_path: std::path::PathBuf,
overrides: NewConversationParams,
) -> Result<ResumeConversationResponse, ExecutorError> {
let request = ClientRequest::ResumeConversation {
request_id: self.next_request_id(),
params: ResumeConversationParams {
path: rollout_path,
overrides: Some(overrides),
},
};
self.send_request(request, "resumeConversation").await
}
pub async fn add_conversation_listener(
&self,
conversation_id: codex_protocol::ConversationId,
) -> Result<AddConversationSubscriptionResponse, ExecutorError> {
let request = ClientRequest::AddConversationListener {
request_id: self.next_request_id(),
params: AddConversationListenerParams { conversation_id },
};
self.send_request(request, "addConversationListener").await
}
pub async fn send_user_message(
&self,
conversation_id: codex_protocol::ConversationId,
message: String,
) -> Result<SendUserMessageResponse, ExecutorError> {
let request = ClientRequest::SendUserMessage {
request_id: self.next_request_id(),
params: SendUserMessageParams {
conversation_id,
items: vec![InputItem::Text { text: message }],
},
};
self.send_request(request, "sendUserMessage").await
}
async fn send_message<M>(&self, message: &M) -> Result<(), ExecutorError>
where
M: Serialize + Sync,
{
self.rpc().send(message).await
}
async fn send_request<R>(&self, request: ClientRequest, label: &str) -> Result<R, ExecutorError>
where
R: DeserializeOwned + std::fmt::Debug,
{
let request_id = request_id(&request);
self.rpc().request(request_id, &request, label).await
}
fn next_request_id(&self) -> RequestId {
self.rpc().next_request_id()
}
}
#[async_trait]
impl JsonRpcCallbacks for AppServerClient {
async fn on_request(
&self,
peer: &JsonRpcPeer,
raw: &str,
request: JSONRPCRequest,
) -> Result<(), ExecutorError> {
self.log_writer.log_raw(raw).await?;
match ServerRequest::try_from(request.clone()) {
Ok(server_request) => handle_server_request(peer, server_request).await,
Err(err) => {
tracing::debug!("Unhandled server request `{}`: {err}", request.method);
let response = JSONRPCResponse {
id: request.id,
result: Value::Null,
};
peer.send(&response).await
}
}
}
async fn on_response(
&self,
_peer: &JsonRpcPeer,
raw: &str,
_response: &JSONRPCResponse,
) -> Result<(), ExecutorError> {
self.log_writer.log_raw(raw).await
}
async fn on_error(
&self,
_peer: &JsonRpcPeer,
raw: &str,
_error: &JSONRPCError,
) -> Result<(), ExecutorError> {
self.log_writer.log_raw(raw).await
}
async fn on_notification(
&self,
_peer: &JsonRpcPeer,
raw: &str,
notification: JSONRPCNotification,
) -> Result<bool, ExecutorError> {
self.log_writer.log_raw(raw).await?;
let method = notification.method.as_str();
if !method.starts_with("codex/event") {
return Ok(false);
}
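// A `task_complete` event ends the turn; returning `true` tells the reader loop to stop.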
let has_finished = method
.strip_prefix("codex/event/")
.is_some_and(|suffix| suffix == "task_complete");
Ok(has_finished)
}
async fn on_non_json(&self, raw: &str) -> Result<(), ExecutorError> {
self.log_writer.log_raw(raw).await?;
Ok(())
}
}
// Approvals: auto-approve patch and exec command requests for the session.
async fn handle_server_request(
peer: &JsonRpcPeer,
request: ServerRequest,
) -> Result<(), ExecutorError> {
match request {
ServerRequest::ApplyPatchApproval { request_id, .. } => {
let response = ApplyPatchApprovalResponse {
decision: codex_protocol::protocol::ReviewDecision::ApprovedForSession,
};
send_server_response(peer, request_id, response).await
}
ServerRequest::ExecCommandApproval { request_id, .. } => {
let response = ExecCommandApprovalResponse {
decision: codex_protocol::protocol::ReviewDecision::ApprovedForSession,
};
send_server_response(peer, request_id, response).await
}
}
}
async fn send_server_response<T>(
peer: &JsonRpcPeer,
request_id: RequestId,
response: T,
) -> Result<(), ExecutorError>
where
T: Serialize,
{
let payload = JSONRPCResponse {
id: request_id,
result: serde_json::to_value(response)
.map_err(|err| ExecutorError::Io(io::Error::other(err.to_string())))?,
};
peer.send(&payload).await
}
fn request_id(request: &ClientRequest) -> RequestId {
match request {
ClientRequest::Initialize { request_id, .. }
| ClientRequest::NewConversation { request_id, .. }
| ClientRequest::ResumeConversation { request_id, .. }
| ClientRequest::AddConversationListener { request_id, .. }
| ClientRequest::SendUserMessage { request_id, .. } => request_id.clone(),
_ => unreachable!("request_id called for unsupported request variant"),
}
}
#[derive(Clone)]
pub struct LogWriter {
writer: Arc<Mutex<BufWriter<Box<dyn AsyncWrite + Send + Unpin>>>>,
}
impl LogWriter {
pub fn new(writer: impl AsyncWrite + Send + Unpin + 'static) -> Self {
Self {
writer: Arc::new(Mutex::new(BufWriter::new(Box::new(writer)))),
}
}
pub async fn log_raw(&self, raw: &str) -> Result<(), ExecutorError> {
let mut guard = self.writer.lock().await;
guard
.write_all(raw.as_bytes())
.await
.map_err(ExecutorError::Io)?;
guard.write_all(b"\n").await.map_err(ExecutorError::Io)?;
guard.flush().await.map_err(ExecutorError::Io)?;
Ok(())
}
}
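
For orientation, a minimal usage sketch (editorial, not part of this commit) showing how the client above is wired to a spawned Codex process. It assumes the child's piped stdio handles (`child_stdin`, `child_stdout`) are already available and that `NewConversationParams` provides a `Default` impl:

    let log_writer = LogWriter::new(tokio::io::stdout());
    let client = AppServerClient::new(log_writer);
    let (exit_tx, _exit_rx) = tokio::sync::oneshot::channel();
    // The client itself implements JsonRpcCallbacks, so it receives server-initiated
    // requests and notifications read from the child's stdout.
    let peer = JsonRpcPeer::spawn(
        child_stdin,
        child_stdout,
        client.clone(),
        ExitSignalSender::new(exit_tx),
    );
    client.connect(peer);

    client.initialize().await?;
    let conversation_id = client
        .new_conversation(NewConversationParams::default())
        .await?
        .conversation_id;
    client.add_conversation_listener(conversation_id.clone()).await?;
    client
        .send_user_message(conversation_id, "Hello from the executor".to_string())
        .await?;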

View File

@@ -0,0 +1,281 @@
//! Minimal JSON-RPC helper tailored for the Codex executor.
//!
//! We keep this bespoke layer because the codex-app-server client must handle server-initiated
//! requests as well as client-initiated requests. When a bidirectional client that
//! supports this pattern is available, this module should be straightforward to
//! replace.
use std::{
collections::HashMap,
fmt::Debug,
io,
sync::{
Arc,
atomic::{AtomicI64, Ordering},
},
};
use async_trait::async_trait;
use codex_app_server_protocol::{
JSONRPCError, JSONRPCMessage, JSONRPCNotification, JSONRPCRequest, JSONRPCResponse, RequestId,
};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::Value;
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
process::{ChildStdin, ChildStdout},
sync::{Mutex, oneshot},
};
use crate::executors::ExecutorError;
#[derive(Debug)]
pub enum PendingResponse {
Result(Value),
Error(JSONRPCError),
Shutdown,
}
#[derive(Clone)]
pub struct ExitSignalSender {
inner: Arc<Mutex<Option<oneshot::Sender<()>>>>,
}
impl ExitSignalSender {
pub fn new(sender: oneshot::Sender<()>) -> Self {
Self {
inner: Arc::new(Mutex::new(Some(sender))),
}
}
pub async fn send_exit_signal(&self) {
if let Some(sender) = self.inner.lock().await.take() {
let _ = sender.send(());
}
}
}
#[derive(Clone)]
pub struct JsonRpcPeer {
stdin: Arc<Mutex<ChildStdin>>,
pending: Arc<Mutex<HashMap<RequestId, oneshot::Sender<PendingResponse>>>>,
id_counter: Arc<AtomicI64>,
}
impl JsonRpcPeer {
pub fn spawn(
stdin: ChildStdin,
stdout: ChildStdout,
callbacks: Arc<dyn JsonRpcCallbacks>,
exit_tx: ExitSignalSender,
) -> Self {
let peer = Self {
stdin: Arc::new(Mutex::new(stdin)),
pending: Arc::new(Mutex::new(HashMap::new())),
id_counter: Arc::new(AtomicI64::new(1)),
};
let reader_peer = peer.clone();
let callbacks = callbacks.clone();
tokio::spawn(async move {
let mut reader = BufReader::new(stdout);
let mut buffer = String::new();
loop {
buffer.clear();
match reader.read_line(&mut buffer).await {
Ok(0) => break,
Ok(_) => {
let line = buffer.trim_end_matches(['\n', '\r']);
if line.is_empty() {
continue;
}
match serde_json::from_str::<JSONRPCMessage>(line) {
Ok(JSONRPCMessage::Response(response)) => {
let request_id = response.id.clone();
let result = response.result.clone();
if callbacks
.on_response(&reader_peer, line, &response)
.await
.is_err()
{
break;
}
reader_peer
.resolve(request_id, PendingResponse::Result(result))
.await;
}
Ok(JSONRPCMessage::Error(error)) => {
let request_id = error.id.clone();
if callbacks
.on_error(&reader_peer, line, &error)
.await
.is_err()
{
break;
}
reader_peer
.resolve(request_id, PendingResponse::Error(error))
.await;
}
Ok(JSONRPCMessage::Request(request)) => {
if callbacks
.on_request(&reader_peer, line, request)
.await
.is_err()
{
break;
}
}
Ok(JSONRPCMessage::Notification(notification)) => {
match callbacks
.on_notification(&reader_peer, line, notification)
.await
{
// finished
Ok(true) => break,
Ok(false) => {}
Err(_) => {
break;
}
}
}
Err(_) => {
if callbacks.on_non_json(line).await.is_err() {
break;
}
}
}
}
Err(err) => {
tracing::warn!("Error reading Codex output: {err}");
break;
}
}
}
exit_tx.send_exit_signal().await;
let _ = reader_peer.shutdown().await;
});
peer
}
pub fn next_request_id(&self) -> RequestId {
RequestId::Integer(self.id_counter.fetch_add(1, Ordering::Relaxed))
}
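// Each outgoing request registers a oneshot sender keyed by its RequestId; the reader
// task resolves it when the matching response or error arrives, or signals Shutdown
// when the connection ends.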
pub async fn register(&self, request_id: RequestId) -> PendingReceiver {
let (sender, receiver) = oneshot::channel();
self.pending.lock().await.insert(request_id, sender);
receiver
}
pub async fn resolve(&self, request_id: RequestId, response: PendingResponse) {
if let Some(sender) = self.pending.lock().await.remove(&request_id) {
let _ = sender.send(response);
}
}
pub async fn shutdown(&self) -> Result<(), ExecutorError> {
let mut pending = self.pending.lock().await;
for (_, sender) in pending.drain() {
let _ = sender.send(PendingResponse::Shutdown);
}
Ok(())
}
pub async fn send<T>(&self, message: &T) -> Result<(), ExecutorError>
where
T: Serialize + Sync,
{
let raw = serde_json::to_string(message)
.map_err(|err| ExecutorError::Io(io::Error::other(err.to_string())))?;
self.send_raw(&raw).await
}
pub async fn request<R, T>(
&self,
request_id: RequestId,
message: &T,
label: &str,
) -> Result<R, ExecutorError>
where
R: DeserializeOwned + Debug,
T: Serialize + Sync,
{
let receiver = self.register(request_id).await;
self.send(message).await?;
await_response(receiver, label).await
}
async fn send_raw(&self, payload: &str) -> Result<(), ExecutorError> {
let mut guard = self.stdin.lock().await;
guard
.write_all(payload.as_bytes())
.await
.map_err(ExecutorError::Io)?;
guard.write_all(b"\n").await.map_err(ExecutorError::Io)?;
guard.flush().await.map_err(ExecutorError::Io)?;
Ok(())
}
}
pub type PendingReceiver = oneshot::Receiver<PendingResponse>;
pub async fn await_response<R>(receiver: PendingReceiver, label: &str) -> Result<R, ExecutorError>
where
R: DeserializeOwned + Debug,
{
match receiver.await {
Ok(PendingResponse::Result(value)) => serde_json::from_value(value).map_err(|err| {
ExecutorError::Io(io::Error::other(format!(
"failed to decode {label} response: {err}",
)))
}),
Ok(PendingResponse::Error(error)) => Err(ExecutorError::Io(io::Error::other(format!(
"{label} request failed: {}",
error.error.message
)))),
Ok(PendingResponse::Shutdown) => Err(ExecutorError::Io(io::Error::other(format!(
"server was shutdown while waiting for {label} response",
)))),
Err(_) => Err(ExecutorError::Io(io::Error::other(format!(
"{label} request was dropped",
)))),
}
}
#[async_trait]
pub trait JsonRpcCallbacks: Send + Sync {
async fn on_request(
&self,
peer: &JsonRpcPeer,
raw: &str,
request: JSONRPCRequest,
) -> Result<(), ExecutorError>;
async fn on_response(
&self,
peer: &JsonRpcPeer,
raw: &str,
response: &JSONRPCResponse,
) -> Result<(), ExecutorError>;
async fn on_error(
&self,
peer: &JsonRpcPeer,
raw: &str,
error: &JSONRPCError,
) -> Result<(), ExecutorError>;
async fn on_notification(
&self,
peer: &JsonRpcPeer,
raw: &str,
notification: JSONRPCNotification,
) -> Result<bool, ExecutorError>;
async fn on_non_json(&self, _raw: &str) -> Result<(), ExecutorError>;
}
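
For context (editorial, not part of this commit), the peer is intended to sit on the stdio of a Codex app-server child process. A sketch of that wiring, assuming the CLI exposes an `app-server` subcommand and that `callbacks: Arc<dyn JsonRpcCallbacks>` comes from the caller:

    use std::process::Stdio;
    use tokio::process::Command;

    let mut child = Command::new("codex")
        .arg("app-server") // assumed subcommand; adjust to however the server is launched
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()
        .map_err(ExecutorError::Io)?;
    let stdin = child.stdin.take().expect("piped stdin");
    let stdout = child.stdout.take().expect("piped stdout");
    let (exit_tx, exit_rx) = tokio::sync::oneshot::channel();
    let peer = JsonRpcPeer::spawn(stdin, stdout, callbacks, ExitSignalSender::new(exit_tx));
    // `exit_rx` resolves once the reader task observes EOF or a terminal event.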

View File

@@ -0,0 +1,833 @@
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::Arc,
};
use codex_app_server_protocol::{
JSONRPCNotification, JSONRPCResponse, NewConversationResponse, ServerNotification,
};
use codex_mcp_types::ContentBlock;
use codex_protocol::{
config_types::ReasoningEffort,
plan_tool::{StepStatus, UpdatePlanArgs},
protocol::{
AgentMessageDeltaEvent, AgentReasoningDeltaEvent, AgentReasoningSectionBreakEvent,
BackgroundEventEvent, ErrorEvent, EventMsg, ExecCommandBeginEvent, ExecCommandEndEvent,
ExecCommandOutputDeltaEvent, ExecOutputStream, FileChange as CodexProtoFileChange,
McpInvocation, McpToolCallBeginEvent, McpToolCallEndEvent, PatchApplyBeginEvent,
PatchApplyEndEvent, StreamErrorEvent, TokenUsageInfo, ViewImageToolCallEvent,
WebSearchBeginEvent, WebSearchEndEvent,
},
};
use futures::StreamExt;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use workspace_utils::{
diff::{concatenate_diff_hunks, extract_unified_diff_hunks},
msg_store::MsgStore,
path::make_path_relative,
};
use crate::{
executors::codex::session::SessionHandler,
logs::{
ActionType, CommandExitStatus, CommandRunResult, FileChange, NormalizedEntry,
NormalizedEntryType, TodoItem, ToolResult, ToolResultValueType, ToolStatus,
stderr_processor::normalize_stderr_logs,
utils::{ConversationPatch, EntryIndexProvider},
},
};
trait ToNormalizedEntry {
fn to_normalized_entry(&self) -> NormalizedEntry;
}
#[derive(Debug, Deserialize)]
struct CodexNotificationParams {
#[serde(rename = "msg")]
msg: EventMsg,
}
#[derive(Default)]
struct StreamingText {
index: usize,
content: String,
}
#[derive(Default)]
struct CommandState {
index: Option<usize>,
command: String,
stdout: String,
stderr: String,
formatted_output: Option<String>,
status: ToolStatus,
exit_code: Option<i32>,
}
impl ToNormalizedEntry for CommandState {
fn to_normalized_entry(&self) -> NormalizedEntry {
NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::ToolUse {
tool_name: "bash".to_string(),
action_type: ActionType::CommandRun {
command: self.command.clone(),
result: Some(CommandRunResult {
exit_status: self
.exit_code
.map(|code| CommandExitStatus::ExitCode { code }),
output: if self.formatted_output.is_some() {
self.formatted_output.clone()
} else {
build_command_output(Some(&self.stdout), Some(&self.stderr))
},
}),
},
status: self.status.clone(),
},
content: format!("`{}`", self.command),
metadata: None,
}
}
}
struct McpToolState {
index: Option<usize>,
invocation: McpInvocation,
result: Option<ToolResult>,
status: ToolStatus,
}
impl ToNormalizedEntry for McpToolState {
fn to_normalized_entry(&self) -> NormalizedEntry {
let tool_name = format!("mcp:{}:{}", self.invocation.server, self.invocation.tool);
NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::ToolUse {
tool_name: tool_name.clone(),
action_type: ActionType::Tool {
tool_name,
arguments: self.invocation.arguments.clone(),
result: self.result.clone(),
},
status: self.status.clone(),
},
content: self.invocation.tool.clone(),
metadata: None,
}
}
}
#[derive(Default)]
struct WebSearchState {
index: Option<usize>,
query: Option<String>,
status: ToolStatus,
}
impl WebSearchState {
fn new() -> Self {
Default::default()
}
}
impl ToNormalizedEntry for WebSearchState {
fn to_normalized_entry(&self) -> NormalizedEntry {
NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::ToolUse {
tool_name: "web_search".to_string(),
action_type: ActionType::WebFetch {
url: self.query.clone().unwrap_or_else(|| "...".to_string()),
},
status: self.status.clone(),
},
content: self
.query
.clone()
.unwrap_or_else(|| "Web search".to_string()),
metadata: None,
}
}
}
#[derive(Default)]
struct PatchState {
entries: Vec<PatchEntry>,
}
struct PatchEntry {
index: Option<usize>,
path: String,
changes: Vec<FileChange>,
status: ToolStatus,
}
impl ToNormalizedEntry for PatchEntry {
fn to_normalized_entry(&self) -> NormalizedEntry {
NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::ToolUse {
tool_name: "edit".to_string(),
action_type: ActionType::FileEdit {
path: self.path.clone(),
changes: self.changes.clone(),
},
status: self.status.clone(),
},
content: self.path.clone(),
metadata: None,
}
}
}
struct LogState {
entry_index: EntryIndexProvider,
assistant: Option<StreamingText>,
thinking: Option<StreamingText>,
commands: HashMap<String, CommandState>,
mcp_tools: HashMap<String, McpToolState>,
patches: HashMap<String, PatchState>,
web_searches: HashMap<String, WebSearchState>,
token_usage_info: Option<TokenUsageInfo>,
}
enum StreamingTextKind {
Assistant,
Thinking,
}
impl LogState {
fn new(entry_index: EntryIndexProvider) -> Self {
Self {
entry_index,
assistant: None,
thinking: None,
commands: HashMap::new(),
mcp_tools: HashMap::new(),
patches: HashMap::new(),
web_searches: HashMap::new(),
token_usage_info: None,
}
}
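/// Append a streaming delta to the active assistant/thinking entry, allocating a new
/// entry index when none is active. Returns the rebuilt entry, its index, and whether
/// the entry is new (add patch) or an update (replace patch).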
fn streaming_text_update(
&mut self,
content: String,
type_: StreamingTextKind,
) -> (NormalizedEntry, usize, bool) {
let index_provider = &self.entry_index;
let entry = match type_ {
StreamingTextKind::Assistant => &mut self.assistant,
StreamingTextKind::Thinking => &mut self.thinking,
};
let is_new = entry.is_none();
let (content, index) = if entry.is_none() {
let index = index_provider.next();
*entry = Some(StreamingText { index, content });
(&entry.as_ref().unwrap().content, index)
} else {
let streaming_state = entry.as_mut().unwrap();
streaming_state.content.push_str(&content);
(&streaming_state.content, streaming_state.index)
};
let normalized_entry = NormalizedEntry {
timestamp: None,
entry_type: match type_ {
StreamingTextKind::Assistant => NormalizedEntryType::AssistantMessage,
StreamingTextKind::Thinking => NormalizedEntryType::Thinking,
},
content: content.clone(),
metadata: None,
};
(normalized_entry, index, is_new)
}
fn assistant_message_update(&mut self, content: String) -> (NormalizedEntry, usize, bool) {
self.streaming_text_update(content, StreamingTextKind::Assistant)
}
fn thinking_update(&mut self, content: String) -> (NormalizedEntry, usize, bool) {
self.streaming_text_update(content, StreamingTextKind::Thinking)
}
}
fn upsert_normalized_entry(
msg_store: &Arc<MsgStore>,
index: usize,
normalized_entry: NormalizedEntry,
is_new: bool,
) {
if is_new {
msg_store.push_patch(ConversationPatch::add_normalized_entry(
index,
normalized_entry,
));
} else {
msg_store.push_patch(ConversationPatch::replace(index, normalized_entry));
}
}
fn add_normalized_entry(
msg_store: &Arc<MsgStore>,
index_provider: &EntryIndexProvider,
normalized_entry: NormalizedEntry,
) -> usize {
let index = index_provider.next();
upsert_normalized_entry(msg_store, index, normalized_entry, true);
index
}
fn replace_normalized_entry(
msg_store: &Arc<MsgStore>,
index: usize,
normalized_entry: NormalizedEntry,
) {
upsert_normalized_entry(msg_store, index, normalized_entry, false);
}
fn normalize_file_changes(
worktree_path: &str,
changes: &HashMap<PathBuf, CodexProtoFileChange>,
) -> Vec<(String, Vec<FileChange>)> {
changes
.iter()
.map(|(path, change)| {
let path_str = path.to_string_lossy();
let relative = make_path_relative(path_str.as_ref(), worktree_path);
let file_changes = match change {
CodexProtoFileChange::Add { content } => vec![FileChange::Write {
content: content.clone(),
}],
CodexProtoFileChange::Delete { .. } => vec![FileChange::Delete],
CodexProtoFileChange::Update {
unified_diff,
move_path,
} => {
let mut edits = Vec::new();
if let Some(dest) = move_path {
let dest_rel =
make_path_relative(dest.to_string_lossy().as_ref(), worktree_path);
edits.push(FileChange::Rename { new_path: dest_rel });
}
let hunks = extract_unified_diff_hunks(unified_diff);
let diff = concatenate_diff_hunks(&relative, &hunks);
edits.push(FileChange::Edit {
unified_diff: diff,
has_line_numbers: true,
});
edits
}
};
(relative, file_changes)
})
.collect()
}
fn format_todo_status(status: &StepStatus) -> String {
match status {
StepStatus::Pending => "pending",
StepStatus::InProgress => "in_progress",
StepStatus::Completed => "completed",
}
.to_string()
}
pub fn normalize_logs(msg_store: Arc<MsgStore>, worktree_path: &Path) {
let entry_index = EntryIndexProvider::start_from(&msg_store);
normalize_stderr_logs(msg_store.clone(), entry_index.clone());
let worktree_path_str = worktree_path.to_string_lossy().to_string();
tokio::spawn(async move {
let mut state = LogState::new(entry_index.clone());
let mut stdout_lines = msg_store.stdout_lines_stream();
while let Some(Ok(line)) = stdout_lines.next().await {
if let Ok(error) = serde_json::from_str::<Error>(&line) {
add_normalized_entry(&msg_store, &entry_index, error.to_normalized_entry());
continue;
}
if let Ok(response) = serde_json::from_str::<JSONRPCResponse>(&line) {
handle_jsonrpc_response(response, &msg_store, &entry_index);
continue;
}
if let Ok(server_notification) = serde_json::from_str::<ServerNotification>(&line) {
if let ServerNotification::SessionConfigured(session_configured) =
server_notification
{
msg_store.push_session_id(session_configured.session_id.to_string());
handle_model_params(
session_configured.model,
session_configured.reasoning_effort,
&msg_store,
&entry_index,
);
};
continue;
} else if let Some(session_id) = line
.strip_prefix(r#"{"method":"sessionConfigured","params":{"sessionId":""#)
.and_then(|suffix| SESSION_ID.captures(suffix).and_then(|caps| caps.get(1)))
{
// Best-effort extraction of session ID from logs in case the JSON parsing fails.
// This could happen if the line is truncated due to size limits because it includes the full session history.
msg_store.push_session_id(session_id.as_str().to_string());
continue;
}
let notification: JSONRPCNotification = match serde_json::from_str(&line) {
Ok(value) => value,
Err(_) => continue,
};
if !notification.method.starts_with("codex/event") {
continue;
}
let Some(params) = notification
.params
.and_then(|p| serde_json::from_value::<CodexNotificationParams>(p).ok())
else {
continue;
};
let event = params.msg;
match event {
EventMsg::SessionConfigured(payload) => {
msg_store.push_session_id(payload.session_id.to_string());
handle_model_params(payload.model, payload.reasoning_effort, &msg_store, &entry_index);
}
EventMsg::AgentMessageDelta(AgentMessageDeltaEvent { delta }) => {
state.thinking = None;
let (entry, index, is_new) = state.assistant_message_update(delta);
upsert_normalized_entry(&msg_store, index, entry, is_new);
}
EventMsg::AgentReasoningDelta(AgentReasoningDeltaEvent { delta }) => {
state.assistant = None;
let (entry, index, is_new) = state.thinking_update(delta);
upsert_normalized_entry(&msg_store, index, entry, is_new);
}
EventMsg::AgentReasoningSectionBreak(AgentReasoningSectionBreakEvent {}) => {
state.assistant = None;
state.thinking = None;
}
EventMsg::ExecCommandBegin(ExecCommandBeginEvent {
call_id, command, ..
}) => {
state.assistant = None;
state.thinking = None;
let command_text = command.join(" ");
if command_text.is_empty() {
continue;
}
state.commands.insert(
call_id.clone(),
CommandState {
index: None,
command: command_text,
stdout: String::new(),
stderr: String::new(),
formatted_output: None,
status: ToolStatus::Created,
exit_code: None,
},
);
let command_state = state.commands.get_mut(&call_id).unwrap();
let index = add_normalized_entry(&msg_store, &entry_index, command_state.to_normalized_entry());
command_state.index = Some(index)
}
EventMsg::ExecCommandOutputDelta(ExecCommandOutputDeltaEvent {
call_id,
stream,
chunk,
}) => {
if let Some(command_state) = state.commands.get_mut(&call_id) {
let chunk = String::from_utf8_lossy(&chunk);
if chunk.is_empty() {
continue;
}
match stream {
ExecOutputStream::Stdout => command_state.stdout.push_str(&chunk),
ExecOutputStream::Stderr => command_state.stderr.push_str(&chunk),
}
let Some(index) = command_state.index else {
tracing::error!("missing entry index for existing command state");
continue;
};
replace_normalized_entry(&msg_store, index, command_state.to_normalized_entry());
}
}
EventMsg::ExecCommandEnd(ExecCommandEndEvent {
call_id,
stdout: _,
stderr: _,
aggregated_output: _,
exit_code,
duration: _,
formatted_output,
}) => {
if let Some(mut command_state) = state.commands.remove(&call_id) {
command_state.formatted_output = Some(formatted_output);
command_state.exit_code = Some(exit_code);
command_state.status = if exit_code == 0 {
ToolStatus::Success
} else {
ToolStatus::Failed
};
let Some(index) = command_state.index else {
tracing::error!("missing entry index for existing command state");
continue;
};
replace_normalized_entry(&msg_store, index, command_state.to_normalized_entry());
}
}
EventMsg::BackgroundEvent(BackgroundEventEvent { message }) => {
add_normalized_entry(&msg_store, &entry_index, NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::SystemMessage,
content: format!("Background event: {message}"),
metadata: None,
});
}
EventMsg::StreamError(StreamErrorEvent { message }) => {
add_normalized_entry(&msg_store, &entry_index, NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::ErrorMessage,
content: format!("Stream error: {message}"),
metadata: None,
});
}
EventMsg::McpToolCallBegin(McpToolCallBeginEvent {
call_id,
invocation,
}) => {
state.assistant = None;
state.thinking = None;
state.mcp_tools.insert(
call_id.clone(),
McpToolState {
index: None,
invocation,
result: None,
status: ToolStatus::Created,
},
);
let mcp_tool_state = state.mcp_tools.get_mut(&call_id).unwrap();
let index = add_normalized_entry(&msg_store, &entry_index, mcp_tool_state.to_normalized_entry());
mcp_tool_state.index = Some(index);
}
EventMsg::McpToolCallEnd(McpToolCallEndEvent {
call_id, result, ..
}) => {
if let Some(mut mcp_tool_state) = state.mcp_tools.remove(&call_id) {
match result {
Ok(value) => {
mcp_tool_state.status =
if value.is_error.unwrap_or(false) {
ToolStatus::Failed
} else {
ToolStatus::Success
};
if value.content.iter().all(|block| matches!(block, ContentBlock::TextContent(_))) {
mcp_tool_state.result = Some(ToolResult {
r#type: ToolResultValueType::Markdown,
value: Value::String(value.content.iter().map(|block| {
if let ContentBlock::TextContent(content) = block {
content.text.clone()
} else {
unreachable!()
}
}).collect::<Vec<String>>().join("\n"))
});
} else {
mcp_tool_state.result = Some(ToolResult {
r#type: ToolResultValueType::Json,
value: value
.structured_content
.unwrap_or_else(|| serde_json::to_value(value.content).unwrap_or_default()),
});
}
}
Err(err) => {
mcp_tool_state.status = ToolStatus::Failed;
mcp_tool_state.result = Some(ToolResult {
r#type: ToolResultValueType::Markdown,
value: Value::String(err),
});
}
};
let Some(index) = mcp_tool_state.index else {
tracing::error!("missing entry index for existing mcp tool state");
continue;
};
replace_normalized_entry(&msg_store, index, mcp_tool_state.to_normalized_entry());
}
}
EventMsg::PatchApplyBegin(PatchApplyBeginEvent {
call_id, changes, ..
}) => {
state.assistant = None;
state.thinking = None;
let normalized = normalize_file_changes(&worktree_path_str, &changes);
let mut patch_state = PatchState::default();
for (path, file_changes) in normalized {
patch_state.entries.push(PatchEntry {
index: None,
path,
changes: file_changes,
status: ToolStatus::Created,
});
let patch_entry = patch_state.entries.last_mut().unwrap();
let index = add_normalized_entry(&msg_store, &entry_index, patch_entry.to_normalized_entry());
patch_entry.index = Some(index);
}
state.patches.insert(call_id, patch_state);
}
EventMsg::PatchApplyEnd(PatchApplyEndEvent {
call_id,
stdout: _,
stderr: _,
success,
..
}) => {
if let Some(patch_state) = state.patches.remove(&call_id) {
let status = if success {
ToolStatus::Success
} else {
ToolStatus::Failed
};
for mut entry in patch_state.entries {
entry.status = status.clone();
let Some(index) = entry.index else {
tracing::error!("missing entry index for existing patch entry");
continue;
};
replace_normalized_entry(&msg_store, index, entry.to_normalized_entry());
}
}
}
EventMsg::WebSearchBegin(WebSearchBeginEvent { call_id }) => {
state.assistant = None;
state.thinking = None;
state
.web_searches
.insert(call_id.clone(), WebSearchState::new());
let web_search_state = state.web_searches.get_mut(&call_id).unwrap();
let normalized_entry = web_search_state.to_normalized_entry();
let index = add_normalized_entry(&msg_store, &entry_index, normalized_entry);
web_search_state.index = Some(index);
}
EventMsg::WebSearchEnd(WebSearchEndEvent { call_id, query }) => {
state.assistant = None;
state.thinking = None;
if let Some(mut entry) = state.web_searches.remove(&call_id) {
entry.status = ToolStatus::Success;
entry.query = Some(query.clone());
let normalized_entry = entry.to_normalized_entry();
let Some(index) = entry.index else {
tracing::error!("missing entry index for existing websearch entry");
continue;
};
replace_normalized_entry(&msg_store, index, normalized_entry);
}
}
EventMsg::ViewImageToolCall(ViewImageToolCallEvent { call_id: _, path }) => {
state.assistant = None;
state.thinking = None;
let path_str = path.to_string_lossy().to_string();
let relative_path = make_path_relative(&path_str, &worktree_path_str);
add_normalized_entry(
&msg_store,
&entry_index,
NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::ToolUse {
tool_name: "view_image".to_string(),
action_type: ActionType::FileRead { path: relative_path.clone() },
status: ToolStatus::Success,
},
content: format!("`{relative_path}`"),
metadata: None,
},
);
}
EventMsg::PlanUpdate(UpdatePlanArgs { plan, explanation }) => {
let todos: Vec<TodoItem> = plan
.iter()
.map(|item| TodoItem {
content: item.step.clone(),
status: format_todo_status(&item.status),
priority: None,
})
.collect();
let explanation = explanation
.as_ref()
.map(|text| text.trim())
.filter(|text| !text.is_empty())
.map(|text| text.to_string());
let content = explanation.clone().unwrap_or_else(|| {
if todos.is_empty() {
"Plan updated".to_string()
} else {
format!("Plan updated ({} steps)", todos.len())
}
});
add_normalized_entry(
&msg_store,
&entry_index,
NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::ToolUse {
tool_name: "plan".to_string(),
action_type: ActionType::TodoManagement {
todos,
operation: "update".to_string(),
},
status: ToolStatus::Success,
},
content,
metadata: None,
},
);
}
EventMsg::Error(ErrorEvent { message }) => {
add_normalized_entry(&msg_store, &entry_index, NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::ErrorMessage,
content: message,
metadata: None,
});
}
EventMsg::TokenCount(payload) => {
if let Some(info) = payload.info {
state.token_usage_info = Some(info);
}
}
EventMsg::AgentReasoning(..) // content duplicated with delta events
| EventMsg::AgentMessage(..) // ditto
| EventMsg::AgentReasoningRawContent(..)
| EventMsg::AgentReasoningRawContentDelta(..)
| EventMsg::TaskStarted(..)
| EventMsg::UserMessage(..)
| EventMsg::TurnDiff(..)
| EventMsg::GetHistoryEntryResponse(..)
| EventMsg::McpListToolsResponse(..)
| EventMsg::ListCustomPromptsResponse(..)
| EventMsg::TurnAborted(..)
| EventMsg::ShutdownComplete
| EventMsg::ConversationPath(..)
| EventMsg::EnteredReviewMode(..)
| EventMsg::ExitedReviewMode(..)
| EventMsg::TaskComplete(..)
| EventMsg::ExecApprovalRequest(..)
| EventMsg::ApplyPatchApprovalRequest(..)
=> {}
}
}
});
}
fn handle_jsonrpc_response(
response: JSONRPCResponse,
msg_store: &Arc<MsgStore>,
entry_index: &EntryIndexProvider,
) {
let Ok(response) = serde_json::from_value::<NewConversationResponse>(response.result.clone())
else {
return;
};
match SessionHandler::extract_session_id_from_rollout_path(response.rollout_path) {
Ok(session_id) => msg_store.push_session_id(session_id),
Err(err) => tracing::error!("failed to extract session id: {err}"),
}
handle_model_params(
response.model,
response.reasoning_effort,
msg_store,
entry_index,
);
}
fn handle_model_params(
model: String,
reasoning_effort: Option<ReasoningEffort>,
msg_store: &Arc<MsgStore>,
entry_index: &EntryIndexProvider,
) {
let mut params = vec![];
params.push(format!("model: {model}"));
if let Some(reasoning_effort) = reasoning_effort {
params.push(format!("reasoning effort: {reasoning_effort}"));
}
add_normalized_entry(
msg_store,
entry_index,
NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::SystemMessage,
content: params.join(", "),
metadata: None,
},
);
}
fn build_command_output(stdout: Option<&str>, stderr: Option<&str>) -> Option<String> {
let mut sections = Vec::new();
if let Some(out) = stdout {
let cleaned = out.trim();
if !cleaned.is_empty() {
sections.push(format!("stdout:\n{cleaned}"));
}
}
if let Some(err) = stderr {
let cleaned = err.trim();
if !cleaned.is_empty() {
sections.push(format!("stderr:\n{cleaned}"));
}
}
if sections.is_empty() {
None
} else {
Some(sections.join("\n\n"))
}
}
lazy_static! {
static ref SESSION_ID: Regex = Regex::new(
r#"^([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"#
)
.expect("valid regex");
}
#[derive(Serialize, Deserialize, Debug)]
pub enum Error {
LaunchError { error: String },
}
impl Error {
pub fn launch_error(error: String) -> Self {
Self::LaunchError { error }
}
pub fn raw(&self) -> String {
serde_json::to_string(self).unwrap_or_default()
}
}
impl ToNormalizedEntry for Error {
fn to_normalized_entry(&self) -> NormalizedEntry {
NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::ErrorMessage,
content: match self {
Error::LaunchError { error } => error.clone(),
},
metadata: None,
}
}
}
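
An illustrative sketch (editorial, not part of this commit) of the command lifecycle the normalizer tracks; the command text and output below are made up:

    // ExecCommandBegin: register a fresh CommandState and emit its first entry.
    let mut cmd = CommandState {
        index: None,
        command: "cargo test".to_string(),
        stdout: String::new(),
        stderr: String::new(),
        formatted_output: None,
        status: ToolStatus::Created,
        exit_code: None,
    };
    // ExecCommandOutputDelta: chunks are appended and the same entry index is replaced in place.
    cmd.stdout.push_str("running 12 tests\n");
    // ExecCommandEnd: formatted output, exit code, and final status land in one last replace.
    cmd.exit_code = Some(0);
    cmd.status = ToolStatus::Success;
    let entry = cmd.to_normalized_entry(); // a `bash` ToolUse entry with a CommandRun result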

View File

@@ -1,282 +1,244 @@
use std::{path::PathBuf, sync::Arc};
use std::{
fs::File,
io::{BufRead, BufReader, BufWriter, Write},
path::{Path, PathBuf},
};
use futures::StreamExt;
use chrono::Local;
use codex_protocol::protocol::SessionSource;
use regex::Regex;
use workspace_utils::msg_store::MsgStore;
use serde_json::{Map, Value};
use thiserror::Error;
const FILENAME_TIMESTAMP_FORMAT: &str = "%Y-%m-%dT%H-%M-%S";
#[derive(Debug, Error)]
pub enum SessionError {
#[error("Session history format error: {0}")]
Format(String),
#[error("Session I/O error: {0}")]
Io(String),
#[error("Session not found: {0}")]
NotFound(String),
}
/// Handles session management for Codex
pub struct SessionHandler;
impl SessionHandler {
/// Start monitoring stderr lines for session ID extraction
pub fn start_session_id_extraction(msg_store: Arc<MsgStore>) {
tokio::spawn(async move {
let mut stderr_lines_stream = msg_store.stderr_lines_stream();
pub fn extract_session_id_from_rollout_path(
rollout_path: PathBuf,
) -> Result<String, SessionError> {
// Extracts the session UUID from the end of the rollout file path.
// Pattern: rollout-{timestamp}-{uuid}.jsonl
let filename = rollout_path
.file_name()
.and_then(|f| f.to_str())
.ok_or_else(|| SessionError::Format("Invalid rollout path".to_string()))?;
while let Some(Ok(line)) = stderr_lines_stream.next().await {
if let Some(session_id) = Self::extract_session_id_from_line(&line) {
msg_store.push_session_id(session_id);
}
}
});
}
// Match UUID before .jsonl extension
let re = Regex::new(
r"([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\.jsonl$",
)
.map_err(|e| SessionError::Format(format!("Regex error: {e}")))?;
/// Extract session ID from codex stderr output. Supports:
/// - Old: session_id: <uuid>
/// - New: session_id: ConversationId(<uuid>)
pub fn extract_session_id_from_line(line: &str) -> Option<String> {
static SESSION_ID_REGEX: std::sync::OnceLock<Regex> = std::sync::OnceLock::new();
let regex = SESSION_ID_REGEX.get_or_init(|| {
Regex::new(r"session_id:\s*(?:ConversationId\()?(?P<id>[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\)?").unwrap()
});
regex
.captures(line)
.and_then(|cap| cap.name("id"))
.map(|m| m.as_str().to_string())
re.captures(filename)
.and_then(|caps| caps.get(1))
.map(|uuid| uuid.as_str().to_string())
.ok_or_else(|| {
SessionError::Format(format!(
"Could not extract session id from filename: {filename}"
))
})
}
/// Find codex rollout file path for given session_id. Used during follow-up execution.
pub fn find_rollout_file_path(session_id: &str) -> Result<PathBuf, String> {
let home_dir = dirs::home_dir().ok_or("Could not determine home directory")?;
let sessions_dir = home_dir.join(".codex").join("sessions");
// Scan the sessions directory recursively for rollout files matching the session_id
// Pattern: rollout-{YYYY}-{MM}-{DD}T{HH}-{mm}-{ss}-{session_id}.jsonl
pub fn find_rollout_file_path(session_id: &str) -> Result<PathBuf, SessionError> {
let sessions_dir = Self::sessions_root()?;
Self::scan_directory(&sessions_dir, session_id)
}
// Recursively scan directory for rollout files matching the session_id
fn scan_directory(dir: &PathBuf, session_id: &str) -> Result<PathBuf, String> {
if !dir.exists() {
return Err(format!(
"Sessions directory does not exist: {}",
dir.display()
));
/// Fork a Codex rollout file by copying it to a temp location and assigning a new session id.
/// Returns (new_rollout_path, new_session_id).
pub fn fork_rollout_file(session_id: &str) -> Result<(PathBuf, String), SessionError> {
let original = Self::find_rollout_file_path(session_id)?;
tracing::debug!("Forking rollout file: {}", original.display());
let file = File::open(&original).map_err(|e| {
SessionError::Io(format!(
"Failed to open rollout file {}: {e}",
original.display()
))
})?;
let mut reader = BufReader::new(file);
let mut first_line = String::new();
reader.read_line(&mut first_line).map_err(|e| {
SessionError::Io(format!(
"Failed to read first line from {}: {e}",
original.display()
))
})?;
let trimmed_header = first_line.trim();
if trimmed_header.is_empty() {
return Err(SessionError::Format(format!(
"Rollout file {} missing header line",
original.display()
)));
}
let entries = std::fs::read_dir(dir)
.map_err(|e| format!("Failed to read directory {}: {}", dir.display(), e))?;
let mut meta: Value = serde_json::from_str(trimmed_header).map_err(|e| {
SessionError::Format(format!(
"Failed to parse first line JSON in {}: {e}",
original.display()
))
})?;
let new_session_id = uuid::Uuid::new_v4().to_string();
let destination = Self::create_new_rollout_path(&new_session_id)?;
let dest_file = File::create(&destination).map_err(|e| {
SessionError::Io(format!(
"Failed to create forked rollout {}: {e}",
destination.display()
))
})?;
let mut writer = BufWriter::new(dest_file);
Self::replace_session_id(&mut meta, &new_session_id)?;
let meta_line = serde_json::to_string(&meta)
.map_err(|e| SessionError::Format(format!("Failed to serialize modified meta: {e}")))?;
writeln!(writer, "{meta_line}").map_err(|e| {
SessionError::Io(format!(
"Failed to write meta to {}: {e}",
destination.display()
))
})?;
// write all remaining lines as-is
for line in reader.lines() {
let line = line.map_err(|e| {
SessionError::Io(format!(
"Failed to read line from {}: {e}",
original.display()
))
})?;
writeln!(writer, "{line}").map_err(|e| {
SessionError::Io(format!(
"Failed to write line to {}: {e}",
destination.display()
))
})?;
}
writer.flush().map_err(|e| {
SessionError::Io(format!("Failed to flush {}: {e}", destination.display()))
})?;
Ok((destination, new_session_id))
}
pub(crate) fn replace_session_id(
session_meta: &mut Value,
new_id: &str,
) -> Result<(), SessionError> {
let Value::Object(map) = session_meta else {
return Err(SessionError::Format(
"First line of rollout file is not a JSON object".to_string(),
));
};
let Some(Value::Object(payload)) = map.get_mut("payload") else {
return Err(SessionError::Format(
"Rollout meta payload missing or not an object".to_string(),
));
};
payload.insert("id".to_string(), Value::String(new_id.to_string()));
Self::ensure_required_payload_fields(payload);
Ok(())
}
fn ensure_required_payload_fields(payload: &mut Map<String, Value>) {
if !payload.contains_key("source") {
let Ok(value) = serde_json::to_value(SessionSource::default()) else {
tracing::error!("Failed to serialize default SessionSource");
return;
};
payload.insert("source".to_string(), value);
}
}
fn sessions_root() -> Result<PathBuf, SessionError> {
let home_dir = dirs::home_dir()
.ok_or_else(|| SessionError::Io("Could not determine home directory".to_string()))?;
Ok(home_dir.join(".codex").join("sessions"))
}
fn scan_directory(dir: &Path, session_id: &str) -> Result<PathBuf, SessionError> {
if !dir.exists() {
return Err(SessionError::Io(format!(
"Sessions directory does not exist: {}",
dir.display()
)));
}
let entries = std::fs::read_dir(dir).map_err(|e| {
SessionError::Io(format!("Failed to read directory {}: {e}", dir.display()))
})?;
for entry in entries {
let entry = entry.map_err(|e| format!("Failed to read directory entry: {e}"))?;
let entry = entry
.map_err(|e| SessionError::Io(format!("Failed to read directory entry: {e}")))?;
let path = entry.path();
if path.is_dir() {
// Recursively search subdirectories
if let Ok(found) = Self::scan_directory(&path, session_id) {
return Ok(found);
}
} else if path.is_file()
&& let Some(filename) = path.file_name()
&& let Some(filename_str) = filename.to_str()
&& filename_str.contains(session_id)
&& filename_str.starts_with("rollout-")
&& filename_str.ends_with(".jsonl")
&& path
.file_name()
.and_then(|name| name.to_str())
.is_some_and(|filename| {
filename.contains(session_id)
&& filename.starts_with("rollout-")
&& filename.ends_with(".jsonl")
})
{
return Ok(path);
}
}
Err(format!(
Err(SessionError::NotFound(format!(
"Could not find rollout file for session_id: {session_id}"
))
)))
}
/// Fork a Codex rollout file by copying it to a temp location and assigning a new session id.
/// Returns (new_rollout_path, new_session_id).
///
/// Migration behavior:
/// - If the original header is old format, it is converted to new format on write.
/// - Subsequent lines:
/// - If already new RolloutLine, pass through unchanged.
/// - If object contains "record_type", skip it (ignored in old impl).
/// - Otherwise, wrap as RolloutLine of type "response_item" with payload = original JSON.
pub fn fork_rollout_file(session_id: &str) -> Result<(PathBuf, String), String> {
use std::io::{BufRead, BufReader, Write};
fn create_new_rollout_path(new_session_id: &str) -> Result<PathBuf, SessionError> {
let sessions_root = Self::sessions_root()?;
let now_local = Local::now();
let original = Self::find_rollout_file_path(session_id)?;
let dir = sessions_root
.join(now_local.format("%Y").to_string())
.join(now_local.format("%m").to_string())
.join(now_local.format("%d").to_string());
let file = std::fs::File::open(&original)
.map_err(|e| format!("Failed to open rollout file {}: {e}", original.display()))?;
let mut reader = BufReader::new(file);
let mut first_line = String::new();
reader
.read_line(&mut first_line)
.map_err(|e| format!("Failed to read first line from {}: {e}", original.display()))?;
let mut meta: serde_json::Value = serde_json::from_str(first_line.trim()).map_err(|e| {
format!(
"Failed to parse first line JSON in {}: {e}",
original.display()
)
std::fs::create_dir_all(&dir).map_err(|e| {
SessionError::Io(format!(
"Failed to create sessions directory {}: {e}",
dir.display()
))
})?;
// Generate new UUID for forked session
let new_id = uuid::Uuid::new_v4().to_string();
Self::set_session_id_in_rollout_meta(&mut meta, &new_id)?;
// Prepare destination path in the same directory, following Codex rollout naming convention.
// Always create a fresh filename: rollout-<YYYY>-<MM>-<DD>T<HH>-<mm>-<ss>-<session_id>.jsonl
let parent_dir = original
.parent()
.ok_or_else(|| format!("Unexpected path with no parent: {}", original.display()))?;
let new_filename = Self::new_rollout_filename(&new_id);
let dest = parent_dir.join(new_filename);
// Write new file with modified first line and copy the rest with migration as needed
let mut writer = std::fs::File::create(&dest)
.map_err(|e| format!("Failed to create forked rollout {}: {e}", dest.display()))?;
let meta_line = serde_json::to_string(&meta)
.map_err(|e| format!("Failed to serialize modified meta: {e}"))?;
writeln!(writer, "{meta_line}")
.map_err(|e| format!("Failed to write meta to {}: {e}", dest.display()))?;
// Wrap subsequent lines
for line in reader.lines() {
let line =
line.map_err(|e| format!("I/O error reading {}: {e}", original.display()))?;
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
// Try parse as JSON
let parsed: Result<serde_json::Value, _> = serde_json::from_str(trimmed);
let value = match parsed {
Ok(v) => v,
Err(_) => {
// Skip invalid JSON lines during migration
continue;
}
};
// If already a RolloutLine (has timestamp + type/payload or flattened item), pass through
let is_rollout_line = value.get("timestamp").is_some()
&& (value.get("type").is_some() || value.get("payload").is_some());
if is_rollout_line {
writeln!(writer, "{value}")
.map_err(|e| format!("Failed to write to {}: {e}", dest.display()))?;
continue;
}
// Ignore legacy bookkeeping lines like {"record_type": ...}
if value.get("record_type").is_some() {
continue;
}
// Otherwise, wrap as a new RolloutLine containing a ResponseItem payload
let timestamp = chrono::Utc::now()
.format("%Y-%m-%dT%H:%M:%S%.3fZ")
.to_string();
let envelope = serde_json::json!({
"timestamp": timestamp,
"type": "response_item",
"payload": value,
});
writeln!(writer, "{envelope}")
.map_err(|e| format!("Failed to write to {}: {e}", dest.display()))?;
}
Ok((dest, new_id))
let filename = Self::rollout_filename_from_time(new_session_id, &now_local);
Ok(dir.join(filename))
}
// Update session id inside the first-line JSON meta, supporting both old and new formats.
// - Old format: top-level { "id": "<uuid>", ... } -> convert to new format
// - New format: { "type": "session_meta", "payload": { "id": "<uuid>", ... }, ... }
// If both are somehow present, new format takes precedence.
pub(crate) fn set_session_id_in_rollout_meta(
meta: &mut serde_json::Value,
new_id: &str,
) -> Result<(), String> {
match meta {
serde_json::Value::Object(map) => {
// If already new format, update payload.id and return
if let Some(serde_json::Value::Object(payload)) = map.get_mut("payload") {
payload.insert(
"id".to_string(),
serde_json::Value::String(new_id.to_string()),
);
return Ok(());
}
// Convert old format to new format header
let top_timestamp = map.get("timestamp").cloned();
let instructions = map.get("instructions").cloned();
let git = map.get("git").cloned();
let mut new_top = serde_json::Map::new();
if let Some(ts) = top_timestamp.clone() {
new_top.insert("timestamp".to_string(), ts);
}
new_top.insert(
"type".to_string(),
serde_json::Value::String("session_meta".to_string()),
);
let mut payload = serde_json::Map::new();
payload.insert(
"id".to_string(),
serde_json::Value::String(new_id.to_string()),
);
if let Some(ts) = top_timestamp {
payload.insert("timestamp".to_string(), ts);
}
if let Some(instr) = instructions {
payload.insert("instructions".to_string(), instr);
}
if let Some(git_val) = git {
payload.insert("git".to_string(), git_val);
}
// Required fields in new format: cwd, originator, cli_version
if !payload.contains_key("cwd") {
payload.insert(
"cwd".to_string(),
serde_json::Value::String(".".to_string()),
);
}
if !payload.contains_key("originator") {
payload.insert(
"originator".to_string(),
serde_json::Value::String("vibe_kanban_migrated".to_string()),
);
}
if !payload.contains_key("cli_version") {
payload.insert(
"cli_version".to_string(),
serde_json::Value::String("0.0.0-migrated".to_string()),
);
}
new_top.insert("payload".to_string(), serde_json::Value::Object(payload));
*map = new_top; // replace the old map with the new-format one
Ok(())
}
_ => Err("First line of rollout file is not a JSON object".to_string()),
}
}
// Build a new rollout filename, ignoring any original name.
// Always returns: rollout-<timestamp>-<id>.jsonl
fn new_rollout_filename(new_id: &str) -> String {
let now_ts = chrono::Local::now().format("%Y-%m-%dT%H-%M-%S").to_string();
format!("rollout-{now_ts}-{new_id}.jsonl")
}
}
#[cfg(test)]
mod tests {
use super::SessionHandler;
#[test]
fn test_new_rollout_filename_pattern() {
let id = "ID-123";
let out = SessionHandler::new_rollout_filename(id);
// rollout-YYYY-MM-DDTHH-MM-SS-ID-123.jsonl
let re = regex::Regex::new(r"^rollout-\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}-ID-123\.jsonl$")
.unwrap();
assert!(re.is_match(&out), "Unexpected filename: {out}");
fn rollout_filename_from_time(new_id: &str, now_local: &chrono::DateTime<Local>) -> String {
let ts = now_local.format(FILENAME_TIMESTAMP_FORMAT).to_string();
format!("rollout-{ts}-{new_id}.jsonl")
}
}
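
A sketch (editorial, not part of this commit) of how a follow-up run could combine the handler above with the app-server client: fork the stored rollout, then resume the conversation from the forked file. `client` is the AppServerClient shown earlier, `previous_session_id` is hypothetical, error handling is elided, and `NewConversationParams::default()` is assumed to mean "no overrides":

    let (rollout_path, new_session_id) =
        SessionHandler::fork_rollout_file(&previous_session_id)?;
    tracing::debug!("forked session {previous_session_id} -> {new_session_id}");
    let _resumed: ResumeConversationResponse = client
        .resume_conversation(rollout_path, NewConversationParams::default())
        .await?;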

View File

@@ -98,10 +98,11 @@ impl NormalizedEntry {
}
}
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[derive(Debug, Clone, Serialize, Deserialize, TS, Default)]
#[ts(export)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum ToolStatus {
#[default]
Created,
Success,
Failed,

View File

@@ -42,7 +42,7 @@ pub fn normalize_stderr_logs(msg_store: Arc<MsgStore>, entry_index_provider: Ent
.normalized_entry_producer(Box::new(|content: String| NormalizedEntry {
timestamp: None,
entry_type: NormalizedEntryType::ErrorMessage,
content,
content: strip_ansi_escapes::strip_str(&content),
metadata: None,
}))
.time_gap(Duration::from_secs(2)) // Break messages if they are 2 seconds apart

View File

@@ -76,6 +76,30 @@
null
]
},
"profile": {
"type": [
"string",
"null"
]
},
"base_instructions": {
"type": [
"string",
"null"
]
},
"include_plan_tool": {
"type": [
"boolean",
"null"
]
},
"include_apply_patch_tool": {
"type": [
"boolean",
"null"
]
},
"base_command_override": {
"title": "Base Command Override",
"description": "Override the base command with a custom command",

View File

@@ -162,7 +162,7 @@ export type GeminiModel = "default" | "flash";
export type Amp = { append_prompt: AppendPrompt, dangerously_allow_all?: boolean | null, base_command_override?: string | null, additional_params?: Array<string> | null, };
export type Codex = { append_prompt: AppendPrompt, sandbox?: SandboxMode | null, oss?: boolean | null, model?: string | null, model_reasoning_effort?: ReasoningEffort | null, model_reasoning_summary?: ReasoningSummary | null, model_reasoning_summary_format?: ReasoningSummaryFormat | null, base_command_override?: string | null, additional_params?: Array<string> | null, };
export type Codex = { append_prompt: AppendPrompt, sandbox?: SandboxMode | null, oss?: boolean | null, model?: string | null, model_reasoning_effort?: ReasoningEffort | null, model_reasoning_summary?: ReasoningSummary | null, model_reasoning_summary_format?: ReasoningSummaryFormat | null, profile?: string | null, base_instructions?: string | null, include_plan_tool?: boolean | null, include_apply_patch_tool?: boolean | null, base_command_override?: string | null, additional_params?: Array<string> | null, };
export type SandboxMode = "auto" | "read-only" | "workspace-write" | "danger-full-access";