Checkpoint restore feature (#607)

This commit is contained in:
Solomon
2025-09-04 15:11:41 +01:00
committed by GitHub
parent 6c9d098216
commit 18a9ff770e
34 changed files with 1879 additions and 195 deletions

View File

@@ -79,10 +79,43 @@ impl StandardCodingAgentExecutor for Amp {
) -> Result<AsyncGroupChild, ExecutorError> {
// Use shell command for cross-platform compatibility
let (shell_cmd, shell_arg) = get_shell_command();
let amp_command = self.build_command_builder().build_follow_up(&[
// 1) Fork the thread synchronously to obtain new thread id
let fork_cmd = self.build_command_builder().build_follow_up(&[
"threads".to_string(),
"fork".to_string(),
session_id.to_string(),
]);
let fork_output = Command::new(shell_cmd)
.kill_on_drop(true)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&fork_cmd)
.output()
.await?;
let stdout_str = String::from_utf8_lossy(&fork_output.stdout);
let new_thread_id = stdout_str
.lines()
.rev()
.find(|l| !l.trim().is_empty())
.unwrap_or("")
.trim()
.to_string();
if new_thread_id.is_empty() {
return Err(ExecutorError::Io(std::io::Error::other(
"AMP threads fork did not return a thread id",
)));
}
tracing::debug!("AMP threads fork -> new thread id: {}", new_thread_id);
// 2) Continue using the new thread id
let continue_cmd = self.build_command_builder().build_follow_up(&[
"threads".to_string(),
"continue".to_string(),
session_id.to_string(),
new_thread_id.clone(),
]);
let combined_prompt = utils::text::combine_prompt(&self.append_prompt, prompt);
@@ -95,7 +128,7 @@ impl StandardCodingAgentExecutor for Amp {
.stderr(Stdio::piped())
.current_dir(current_dir)
.arg(shell_arg)
.arg(&amp_command);
.arg(&continue_cmd);
let mut child = command.group_spawn()?;

View File

@@ -117,6 +117,77 @@ impl SessionHandler {
"Could not find rollout file for session_id: {session_id}"
))
}
/// Fork a Codex rollout file by copying it to a temp location and assigning a new session id.
/// Returns (new_rollout_path, new_session_id).
pub fn fork_rollout_file(session_id: &str) -> Result<(PathBuf, String), String> {
use std::io::{BufRead, BufReader, Write};
let original = Self::find_rollout_file_path(session_id)?;
let file = std::fs::File::open(&original)
.map_err(|e| format!("Failed to open rollout file {}: {e}", original.display()))?;
let mut reader = BufReader::new(file);
let mut first_line = String::new();
reader
.read_line(&mut first_line)
.map_err(|e| format!("Failed to read first line from {}: {e}", original.display()))?;
let mut meta: serde_json::Value = serde_json::from_str(first_line.trim()).map_err(|e| {
format!(
"Failed to parse first line JSON in {}: {e}",
original.display()
)
})?;
// Generate new UUID for forked session
let new_id = uuid::Uuid::new_v4().to_string();
if let serde_json::Value::Object(ref mut map) = meta {
map.insert("id".to_string(), serde_json::Value::String(new_id.clone()));
} else {
return Err("First line of rollout file is not a JSON object".to_string());
}
// Prepare destination path in the same directory, following Codex rollout naming convention:
// rollout-<YYYY>-<MM>-<DD>T<HH>-<mm>-<ss>-<session_id>.jsonl
let parent_dir = original
.parent()
.ok_or_else(|| format!("Unexpected path with no parent: {}", original.display()))?;
let filename = original
.file_name()
.and_then(|s| s.to_str())
.unwrap_or("rollout.jsonl");
let new_filename = if filename.starts_with("rollout-") && filename.ends_with(".jsonl") {
let stem = &filename[..filename.len() - ".jsonl".len()];
if let Some(idx) = stem.rfind('-') {
// Replace the trailing session id with the new id, keep timestamp intact
format!("{}-{}.jsonl", &stem[..idx], new_id)
} else {
format!("rollout-{new_id}.jsonl")
}
} else {
format!("rollout-{new_id}.jsonl")
};
let dest = parent_dir.join(new_filename);
// Write new file with modified first line and copy the rest as-is
let mut writer = std::fs::File::create(&dest)
.map_err(|e| format!("Failed to create forked rollout {}: {e}", dest.display()))?;
let meta_line = serde_json::to_string(&meta)
.map_err(|e| format!("Failed to serialize modified meta: {e}"))?;
writeln!(writer, "{meta_line}")
.map_err(|e| format!("Failed to write meta to {}: {e}", dest.display()))?;
for line in reader.lines() {
let line =
line.map_err(|e| format!("I/O error reading {}: {e}", original.display()))?;
writeln!(writer, "{line}")
.map_err(|e| format!("Failed to write to {}: {e}", dest.display()))?;
}
Ok((dest, new_id))
}
}
/// An executor that uses Codex CLI to process tasks
@@ -196,11 +267,9 @@ impl StandardCodingAgentExecutor for Codex {
prompt: &str,
session_id: &str,
) -> Result<AsyncGroupChild, ExecutorError> {
// Find the rollout file for the given session_id using SessionHandler
let rollout_file_path =
SessionHandler::find_rollout_file_path(session_id).map_err(|e| {
ExecutorError::SpawnError(std::io::Error::new(std::io::ErrorKind::NotFound, e))
})?;
// Fork rollout: copy and assign a new session id so each execution has a unique session
let (rollout_file_path, _new_session_id) = SessionHandler::fork_rollout_file(session_id)
.map_err(|e| ExecutorError::SpawnError(std::io::Error::other(e)))?;
let (shell_cmd, shell_arg) = get_shell_command();
let codex_command = self.build_command_builder().build_follow_up(&[

View File

@@ -27,6 +27,12 @@ pub mod gemini;
pub mod opencode;
pub mod qwen;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, TS)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum BaseAgentCapability {
RestoreCheckpoint,
}
#[derive(Debug, Error)]
pub enum ExecutorError {
#[error("Follow-up is not supported: {0}")]
@@ -125,6 +131,15 @@ impl CodingAgent {
pub fn supports_mcp(&self) -> bool {
self.default_mcp_config_path().is_some()
}
pub fn capabilities(&self) -> Vec<BaseAgentCapability> {
match self {
Self::ClaudeCode(_) => vec![BaseAgentCapability::RestoreCheckpoint],
Self::Amp(_) => vec![BaseAgentCapability::RestoreCheckpoint],
Self::Codex(_) => vec![BaseAgentCapability::RestoreCheckpoint],
Self::Gemini(_) | Self::Opencode(_) | Self::Cursor(_) | Self::QwenCode(_) => vec![],
}
}
}
#[async_trait]