Resume support for Gemini-CLI (#119)
This commit is contained in:
@@ -10,6 +10,10 @@ use crate::executors::{
|
|||||||
AmpExecutor, ClaudeExecutor, EchoExecutor, GeminiExecutor, OpencodeExecutor,
|
AmpExecutor, ClaudeExecutor, EchoExecutor, GeminiExecutor, OpencodeExecutor,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Constants for database streaming
|
||||||
|
const STDOUT_UPDATE_THRESHOLD: usize = 1;
|
||||||
|
const BUFFER_SIZE_THRESHOLD: usize = 1024;
|
||||||
|
|
||||||
/// Normalized conversation representation for different executor formats
|
/// Normalized conversation representation for different executor formats
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
|
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
|
||||||
#[ts(export)]
|
#[ts(export)]
|
||||||
@@ -139,6 +143,8 @@ pub enum ExecutorError {
|
|||||||
},
|
},
|
||||||
TaskNotFound,
|
TaskNotFound,
|
||||||
DatabaseError(sqlx::Error),
|
DatabaseError(sqlx::Error),
|
||||||
|
ContextCollectionFailed(String),
|
||||||
|
GitError(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for ExecutorError {
|
impl std::fmt::Display for ExecutorError {
|
||||||
@@ -173,6 +179,10 @@ impl std::fmt::Display for ExecutorError {
|
|||||||
}
|
}
|
||||||
ExecutorError::TaskNotFound => write!(f, "Task not found"),
|
ExecutorError::TaskNotFound => write!(f, "Task not found"),
|
||||||
ExecutorError::DatabaseError(e) => write!(f, "Database error: {}", e),
|
ExecutorError::DatabaseError(e) => write!(f, "Database error: {}", e),
|
||||||
|
ExecutorError::ContextCollectionFailed(msg) => {
|
||||||
|
write!(f, "Context collection failed: {}", msg)
|
||||||
|
}
|
||||||
|
ExecutorError::GitError(msg) => write!(f, "Git operation error: {}", msg),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -185,6 +195,37 @@ impl From<sqlx::Error> for ExecutorError {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<crate::models::task_attempt::TaskAttemptError> for ExecutorError {
|
||||||
|
fn from(err: crate::models::task_attempt::TaskAttemptError) -> Self {
|
||||||
|
match err {
|
||||||
|
crate::models::task_attempt::TaskAttemptError::Database(e) => {
|
||||||
|
ExecutorError::DatabaseError(e)
|
||||||
|
}
|
||||||
|
crate::models::task_attempt::TaskAttemptError::Git(e) => {
|
||||||
|
ExecutorError::GitError(format!("Git operation failed: {}", e))
|
||||||
|
}
|
||||||
|
crate::models::task_attempt::TaskAttemptError::TaskNotFound => {
|
||||||
|
ExecutorError::TaskNotFound
|
||||||
|
}
|
||||||
|
crate::models::task_attempt::TaskAttemptError::ProjectNotFound => {
|
||||||
|
ExecutorError::ContextCollectionFailed("Project not found".to_string())
|
||||||
|
}
|
||||||
|
crate::models::task_attempt::TaskAttemptError::ValidationError(msg) => {
|
||||||
|
ExecutorError::ContextCollectionFailed(format!("Validation failed: {}", msg))
|
||||||
|
}
|
||||||
|
crate::models::task_attempt::TaskAttemptError::BranchNotFound(branch) => {
|
||||||
|
ExecutorError::GitError(format!("Branch '{}' not found", branch))
|
||||||
|
}
|
||||||
|
crate::models::task_attempt::TaskAttemptError::GitService(e) => {
|
||||||
|
ExecutorError::GitError(format!("Git service error: {}", e))
|
||||||
|
}
|
||||||
|
crate::models::task_attempt::TaskAttemptError::GitHubService(e) => {
|
||||||
|
ExecutorError::GitError(format!("GitHub service error: {}", e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl ExecutorError {
|
impl ExecutorError {
|
||||||
/// Create a new SpawnFailed error with context
|
/// Create a new SpawnFailed error with context
|
||||||
pub fn spawn_failed(error: std::io::Error, context: SpawnContext) -> Self {
|
pub fn spawn_failed(error: std::io::Error, context: SpawnContext) -> Self {
|
||||||
@@ -462,8 +503,10 @@ async fn stream_stdout_to_db(
|
|||||||
accumulated_output.push_str(&line);
|
accumulated_output.push_str(&line);
|
||||||
update_counter += 1;
|
update_counter += 1;
|
||||||
|
|
||||||
// Update database every 1 lines or when we have a significant amount of data
|
// Update database every threshold lines or when we have a significant amount of data
|
||||||
if update_counter >= 1 || accumulated_output.len() > 1024 {
|
if update_counter >= STDOUT_UPDATE_THRESHOLD
|
||||||
|
|| accumulated_output.len() > BUFFER_SIZE_THRESHOLD
|
||||||
|
{
|
||||||
if let Err(e) = ExecutionProcess::append_output(
|
if let Err(e) = ExecutionProcess::append_output(
|
||||||
&pool,
|
&pool,
|
||||||
execution_process_id,
|
execution_process_id,
|
||||||
@@ -516,7 +559,8 @@ async fn stream_stderr_to_db(
|
|||||||
let mut reader = BufReader::new(output);
|
let mut reader = BufReader::new(output);
|
||||||
let mut line = String::new();
|
let mut line = String::new();
|
||||||
let mut accumulated_output = String::new();
|
let mut accumulated_output = String::new();
|
||||||
const STDERR_FLUSH_TIMEOUT: Duration = Duration::from_millis(1000); // 1000ms timeout
|
const STDERR_FLUSH_TIMEOUT_MS: u64 = 1000;
|
||||||
|
const STDERR_FLUSH_TIMEOUT: Duration = Duration::from_millis(STDERR_FLUSH_TIMEOUT_MS); // 1000ms timeout
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
line.clear();
|
line.clear();
|
||||||
|
|||||||
@@ -13,12 +13,15 @@ use crate::{
|
|||||||
utils::shell::get_shell_command,
|
utils::shell::get_shell_command,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Constants for configuration
|
||||||
|
const PATTERN_BREAK_TIMEOUT_SECS: u64 = 5;
|
||||||
|
|
||||||
/// An executor that uses Gemini CLI to process tasks
|
/// An executor that uses Gemini CLI to process tasks
|
||||||
pub struct GeminiExecutor;
|
pub struct GeminiExecutor;
|
||||||
|
|
||||||
/// An executor that resumes a Gemini session
|
/// An executor that continues a Gemini task with context from previous execution
|
||||||
pub struct GeminiFollowupExecutor {
|
pub struct GeminiFollowupExecutor {
|
||||||
pub session_id: String,
|
pub attempt_id: Uuid,
|
||||||
pub prompt: String,
|
pub prompt: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -118,6 +121,27 @@ Task title: {}"#,
|
|||||||
attempt_id
|
attempt_id
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Update ExecutorSession with the session_id immediately
|
||||||
|
if let Err(e) = crate::models::executor_session::ExecutorSession::update_session_id(
|
||||||
|
pool,
|
||||||
|
execution_process_id,
|
||||||
|
&attempt_id.to_string(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!(
|
||||||
|
"Failed to update session ID for Gemini execution process {}: {}",
|
||||||
|
execution_process_id,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
tracing::info!(
|
||||||
|
"Updated session ID {} for Gemini execution process {}",
|
||||||
|
attempt_id,
|
||||||
|
execution_process_id
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
let mut child = self.spawn(pool, task_id, worktree_path).await?;
|
let mut child = self.spawn(pool, task_id, worktree_path).await?;
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
@@ -175,39 +199,40 @@ Task title: {}"#,
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
match serde_json::from_str::<NormalizedEntry>(trimmed) {
|
// Try to parse as JSON first (for NormalizedEntry format)
|
||||||
Ok(entry) => {
|
if trimmed.starts_with('{') {
|
||||||
entries.push(entry);
|
match serde_json::from_str::<NormalizedEntry>(trimmed) {
|
||||||
}
|
Ok(entry) => {
|
||||||
Err(e) => {
|
entries.push(entry);
|
||||||
tracing::warn!(
|
}
|
||||||
"Failed to parse JSONL line {} in Gemini logs: {} - Line: {}",
|
Err(e) => {
|
||||||
line_num + 1,
|
tracing::warn!(
|
||||||
e,
|
"Failed to parse JSONL line {} in Gemini logs: {} - Line: {}",
|
||||||
trimmed
|
line_num + 1,
|
||||||
);
|
e,
|
||||||
parse_errors.push(format!("Line {}: {}", line_num + 1, e));
|
trimmed
|
||||||
|
);
|
||||||
|
parse_errors.push(format!("Line {}: {}", line_num + 1, e));
|
||||||
|
|
||||||
// If this is clearly a JSONL line (starts with {), create a fallback entry
|
// Create a fallback entry for unrecognized JSON
|
||||||
if trimmed.starts_with('{') {
|
|
||||||
let fallback_entry = NormalizedEntry {
|
let fallback_entry = NormalizedEntry {
|
||||||
timestamp: Some(chrono::Utc::now().to_rfc3339()),
|
timestamp: Some(chrono::Utc::now().to_rfc3339()),
|
||||||
entry_type: NormalizedEntryType::SystemMessage,
|
entry_type: NormalizedEntryType::SystemMessage,
|
||||||
content: format!("Parse error for JSONL line: {}", trimmed),
|
content: format!("Raw output: {}", trimmed),
|
||||||
metadata: None,
|
metadata: None,
|
||||||
};
|
};
|
||||||
entries.push(fallback_entry);
|
entries.push(fallback_entry);
|
||||||
} else {
|
|
||||||
// For non-JSON lines, treat as plain text content
|
|
||||||
let text_entry = NormalizedEntry {
|
|
||||||
timestamp: Some(chrono::Utc::now().to_rfc3339()),
|
|
||||||
entry_type: NormalizedEntryType::AssistantMessage,
|
|
||||||
content: trimmed.to_string(),
|
|
||||||
metadata: None,
|
|
||||||
};
|
|
||||||
entries.push(text_entry);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// For non-JSON lines, treat as plain text content
|
||||||
|
let text_entry = NormalizedEntry {
|
||||||
|
timestamp: Some(chrono::Utc::now().to_rfc3339()),
|
||||||
|
entry_type: NormalizedEntryType::AssistantMessage,
|
||||||
|
content: trimmed.to_string(),
|
||||||
|
metadata: None,
|
||||||
|
};
|
||||||
|
entries.push(text_entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -227,7 +252,7 @@ Task title: {}"#,
|
|||||||
|
|
||||||
Ok(NormalizedConversation {
|
Ok(NormalizedConversation {
|
||||||
entries,
|
entries,
|
||||||
session_id: None, // session_id is not available in the current Gemini implementation
|
session_id: None, // Session ID is managed directly via database, not extracted from logs
|
||||||
executor_type: "gemini".to_string(),
|
executor_type: "gemini".to_string(),
|
||||||
prompt: None,
|
prompt: None,
|
||||||
summary: None,
|
summary: None,
|
||||||
@@ -436,7 +461,8 @@ impl GeminiExecutor {
|
|||||||
if let Some(&next_ch) = chars.peek() {
|
if let Some(&next_ch) = chars.peek() {
|
||||||
let is_capital = next_ch.is_uppercase() && next_ch.is_alphabetic();
|
let is_capital = next_ch.is_uppercase() && next_ch.is_alphabetic();
|
||||||
let is_space = next_ch.is_whitespace();
|
let is_space = next_ch.is_whitespace();
|
||||||
let should_force_break = is_space && last_emit_time.elapsed().as_secs() > 5;
|
let should_force_break =
|
||||||
|
is_space && last_emit_time.elapsed().as_secs() > PATTERN_BREAK_TIMEOUT_SECS;
|
||||||
|
|
||||||
if is_capital || should_force_break {
|
if is_capital || should_force_break {
|
||||||
// Pattern break detected - current segment ends here
|
// Pattern break detected - current segment ends here
|
||||||
@@ -510,20 +536,97 @@ impl GeminiExecutor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
impl GeminiFollowupExecutor {
|
||||||
impl Executor for GeminiFollowupExecutor {
|
async fn load_task(
|
||||||
async fn spawn(
|
|
||||||
&self,
|
&self,
|
||||||
_pool: &sqlx::SqlitePool,
|
pool: &sqlx::SqlitePool,
|
||||||
_task_id: Uuid,
|
task_id: Uuid,
|
||||||
worktree_path: &str,
|
) -> Result<Task, ExecutorError> {
|
||||||
) -> Result<AsyncGroupChild, ExecutorError> {
|
Task::find_by_id(pool, task_id)
|
||||||
// --resume is currently not supported by the gemini-cli. This will error!
|
.await?
|
||||||
// TODO: Check again when this issue has been addressed: https://github.com/google-gemini/gemini-cli/issues/2222
|
.ok_or(ExecutorError::TaskNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
// Use shell command for cross-platform compatibility
|
async fn collect_resume_context(
|
||||||
|
&self,
|
||||||
|
pool: &sqlx::SqlitePool,
|
||||||
|
task: &Task,
|
||||||
|
) -> Result<crate::models::task_attempt::AttemptResumeContext, ExecutorError> {
|
||||||
|
crate::models::task_attempt::TaskAttempt::get_attempt_resume_context(
|
||||||
|
pool,
|
||||||
|
self.attempt_id,
|
||||||
|
task.id,
|
||||||
|
task.project_id,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(ExecutorError::from)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_comprehensive_prompt(
|
||||||
|
&self,
|
||||||
|
task: &Task,
|
||||||
|
resume_context: &crate::models::task_attempt::AttemptResumeContext,
|
||||||
|
) -> String {
|
||||||
|
format!(
|
||||||
|
r#"RESUME CONTEXT FOR CONTINUING TASK
|
||||||
|
|
||||||
|
=== TASK INFORMATION ===
|
||||||
|
Project ID: {}
|
||||||
|
Task ID: {}
|
||||||
|
Task Title: {}
|
||||||
|
Task Description: {}
|
||||||
|
|
||||||
|
=== EXECUTION HISTORY ===
|
||||||
|
The following is the execution history from this task attempt:
|
||||||
|
|
||||||
|
{}
|
||||||
|
|
||||||
|
=== CURRENT CHANGES ===
|
||||||
|
The following git diff shows changes made from the base branch to the current state:
|
||||||
|
|
||||||
|
```diff
|
||||||
|
{}
|
||||||
|
```
|
||||||
|
|
||||||
|
=== CURRENT REQUEST ===
|
||||||
|
{}
|
||||||
|
|
||||||
|
=== INSTRUCTIONS ===
|
||||||
|
You are continuing work on the above task. The execution history shows what has been done previously, and the git diff shows the current state of all changes. Please continue from where the previous execution left off, taking into account all the context provided above.
|
||||||
|
"#,
|
||||||
|
task.project_id,
|
||||||
|
task.id,
|
||||||
|
task.title,
|
||||||
|
task.description
|
||||||
|
.as_deref()
|
||||||
|
.unwrap_or("No description provided"),
|
||||||
|
if resume_context.execution_history.trim().is_empty() {
|
||||||
|
"(No previous execution history)"
|
||||||
|
} else {
|
||||||
|
&resume_context.execution_history
|
||||||
|
},
|
||||||
|
if resume_context.cumulative_diffs.trim().is_empty() {
|
||||||
|
"(No changes detected)"
|
||||||
|
} else {
|
||||||
|
&resume_context.cumulative_diffs
|
||||||
|
},
|
||||||
|
self.prompt
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn spawn_process(
|
||||||
|
&self,
|
||||||
|
worktree_path: &str,
|
||||||
|
comprehensive_prompt: &str,
|
||||||
|
) -> Result<AsyncGroupChild, ExecutorError> {
|
||||||
let (shell_cmd, shell_arg) = get_shell_command();
|
let (shell_cmd, shell_arg) = get_shell_command();
|
||||||
let gemini_command = format!("npx @google/gemini-cli --yolo --resume={}", self.session_id);
|
let gemini_command = "npx @google/gemini-cli --yolo";
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
"Spawning Gemini followup execution for attempt {} with resume context ({} chars)",
|
||||||
|
self.attempt_id,
|
||||||
|
comprehensive_prompt.len()
|
||||||
|
);
|
||||||
|
|
||||||
let mut command = Command::new(shell_cmd);
|
let mut command = Command::new(shell_cmd);
|
||||||
command
|
command
|
||||||
@@ -533,41 +636,157 @@ impl Executor for GeminiFollowupExecutor {
|
|||||||
.stderr(Stdio::piped())
|
.stderr(Stdio::piped())
|
||||||
.current_dir(worktree_path)
|
.current_dir(worktree_path)
|
||||||
.arg(shell_arg)
|
.arg(shell_arg)
|
||||||
.arg(&gemini_command)
|
.arg(gemini_command)
|
||||||
.env("NODE_NO_WARNINGS", "1");
|
.env("NODE_NO_WARNINGS", "1");
|
||||||
|
|
||||||
let mut child = command
|
let mut child = command.group_spawn().map_err(|e| {
|
||||||
.group_spawn() // Create new process group so we can kill entire tree
|
crate::executor::SpawnContext::from_command(&command, "Gemini")
|
||||||
.map_err(|e| {
|
.with_context(format!(
|
||||||
crate::executor::SpawnContext::from_command(&command, "Gemini")
|
"Gemini CLI followup execution with context for attempt {}",
|
||||||
|
self.attempt_id
|
||||||
|
))
|
||||||
|
.spawn_error(e)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
self.send_prompt_to_stdin(&mut child, &command, comprehensive_prompt)
|
||||||
|
.await?;
|
||||||
|
Ok(child)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send_prompt_to_stdin(
|
||||||
|
&self,
|
||||||
|
child: &mut AsyncGroupChild,
|
||||||
|
command: &Command,
|
||||||
|
comprehensive_prompt: &str,
|
||||||
|
) -> Result<(), ExecutorError> {
|
||||||
|
if let Some(mut stdin) = child.inner().stdin.take() {
|
||||||
|
tracing::debug!(
|
||||||
|
"Sending resume context to Gemini for attempt {}: {} characters",
|
||||||
|
self.attempt_id,
|
||||||
|
comprehensive_prompt.len()
|
||||||
|
);
|
||||||
|
|
||||||
|
stdin
|
||||||
|
.write_all(comprehensive_prompt.as_bytes())
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
let context = crate::executor::SpawnContext::from_command(command, "Gemini")
|
||||||
|
.with_context(format!(
|
||||||
|
"Failed to write resume prompt to Gemini CLI stdin for attempt {}",
|
||||||
|
self.attempt_id
|
||||||
|
));
|
||||||
|
ExecutorError::spawn_failed(e, context)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
stdin.shutdown().await.map_err(|e| {
|
||||||
|
let context = crate::executor::SpawnContext::from_command(command, "Gemini")
|
||||||
.with_context(format!(
|
.with_context(format!(
|
||||||
"Gemini CLI followup execution for session {}",
|
"Failed to close Gemini CLI stdin for attempt {}",
|
||||||
self.session_id
|
self.attempt_id
|
||||||
))
|
));
|
||||||
.spawn_error(e)
|
ExecutorError::spawn_failed(e, context)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
// Send the prompt via stdin instead of command line arguments
|
tracing::info!(
|
||||||
// This avoids Windows command line parsing issues
|
"Successfully sent resume context to Gemini for attempt {}",
|
||||||
if let Some(mut stdin) = child.inner().stdin.take() {
|
self.attempt_id
|
||||||
stdin.write_all(self.prompt.as_bytes()).await.map_err(|e| {
|
);
|
||||||
let context = crate::executor::SpawnContext::from_command(&command, "Gemini")
|
|
||||||
.with_context(format!(
|
|
||||||
"Failed to write prompt to Gemini CLI stdin for session {}",
|
|
||||||
self.session_id
|
|
||||||
));
|
|
||||||
ExecutorError::spawn_failed(e, context)
|
|
||||||
})?;
|
|
||||||
stdin.shutdown().await.map_err(|e| {
|
|
||||||
let context = crate::executor::SpawnContext::from_command(&command, "Gemini")
|
|
||||||
.with_context(format!(
|
|
||||||
"Failed to close Gemini CLI stdin for session {}",
|
|
||||||
self.session_id
|
|
||||||
));
|
|
||||||
ExecutorError::spawn_failed(e, context)
|
|
||||||
})?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Executor for GeminiFollowupExecutor {
|
||||||
|
async fn spawn(
|
||||||
|
&self,
|
||||||
|
pool: &sqlx::SqlitePool,
|
||||||
|
task_id: Uuid,
|
||||||
|
worktree_path: &str,
|
||||||
|
) -> Result<AsyncGroupChild, ExecutorError> {
|
||||||
|
let task = self.load_task(pool, task_id).await?;
|
||||||
|
let resume_context = self.collect_resume_context(pool, &task).await?;
|
||||||
|
let comprehensive_prompt = self.build_comprehensive_prompt(&task, &resume_context);
|
||||||
|
self.spawn_process(worktree_path, &comprehensive_prompt)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn execute_streaming(
|
||||||
|
&self,
|
||||||
|
pool: &sqlx::SqlitePool,
|
||||||
|
task_id: Uuid,
|
||||||
|
attempt_id: Uuid,
|
||||||
|
execution_process_id: Uuid,
|
||||||
|
worktree_path: &str,
|
||||||
|
) -> Result<AsyncGroupChild, ExecutorError> {
|
||||||
|
tracing::info!(
|
||||||
|
"Starting Gemini followup execution for task {} attempt {} with resume context",
|
||||||
|
task_id,
|
||||||
|
attempt_id
|
||||||
|
);
|
||||||
|
|
||||||
|
// Update ExecutorSession with the session_id immediately
|
||||||
|
if let Err(e) = crate::models::executor_session::ExecutorSession::update_session_id(
|
||||||
|
pool,
|
||||||
|
execution_process_id,
|
||||||
|
&self.attempt_id.to_string(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!(
|
||||||
|
"Failed to update session ID for Gemini followup execution process {}: {}",
|
||||||
|
execution_process_id,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
tracing::info!(
|
||||||
|
"Updated session ID {} for Gemini followup execution process {}",
|
||||||
|
self.attempt_id,
|
||||||
|
execution_process_id
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut child = self.spawn(pool, task_id, worktree_path).await?;
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
"Gemini followup process spawned successfully for attempt {}, PID: {:?}",
|
||||||
|
attempt_id,
|
||||||
|
child.inner().id()
|
||||||
|
);
|
||||||
|
|
||||||
|
// Take stdout and stderr pipes for streaming
|
||||||
|
let stdout = child
|
||||||
|
.inner()
|
||||||
|
.stdout
|
||||||
|
.take()
|
||||||
|
.expect("Failed to take stdout from child process");
|
||||||
|
let stderr = child
|
||||||
|
.inner()
|
||||||
|
.stderr
|
||||||
|
.take()
|
||||||
|
.expect("Failed to take stderr from child process");
|
||||||
|
|
||||||
|
// Start streaming tasks with Gemini-specific line-based message updates
|
||||||
|
let pool_clone1 = pool.clone();
|
||||||
|
let pool_clone2 = pool.clone();
|
||||||
|
|
||||||
|
tokio::spawn(GeminiExecutor::stream_gemini_with_lines(
|
||||||
|
stdout,
|
||||||
|
pool_clone1,
|
||||||
|
attempt_id,
|
||||||
|
execution_process_id,
|
||||||
|
true,
|
||||||
|
));
|
||||||
|
// Use default stderr streaming (no custom parsing)
|
||||||
|
tokio::spawn(crate::executor::stream_output_to_db(
|
||||||
|
stderr,
|
||||||
|
pool_clone2,
|
||||||
|
attempt_id,
|
||||||
|
execution_process_id,
|
||||||
|
false,
|
||||||
|
));
|
||||||
|
|
||||||
Ok(child)
|
Ok(child)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -14,6 +14,10 @@ use crate::services::{
|
|||||||
GitServiceError, ProcessService,
|
GitServiceError, ProcessService,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Constants for git diff operations
|
||||||
|
const GIT_DIFF_CONTEXT_LINES: u32 = 3;
|
||||||
|
const GIT_DIFF_INTERHUNK_LINES: u32 = 0;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum TaskAttemptError {
|
pub enum TaskAttemptError {
|
||||||
Database(sqlx::Error),
|
Database(sqlx::Error),
|
||||||
@@ -23,6 +27,7 @@ pub enum TaskAttemptError {
|
|||||||
TaskNotFound,
|
TaskNotFound,
|
||||||
ProjectNotFound,
|
ProjectNotFound,
|
||||||
ValidationError(String),
|
ValidationError(String),
|
||||||
|
BranchNotFound(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for TaskAttemptError {
|
impl std::fmt::Display for TaskAttemptError {
|
||||||
@@ -35,6 +40,7 @@ impl std::fmt::Display for TaskAttemptError {
|
|||||||
TaskAttemptError::TaskNotFound => write!(f, "Task not found"),
|
TaskAttemptError::TaskNotFound => write!(f, "Task not found"),
|
||||||
TaskAttemptError::ProjectNotFound => write!(f, "Project not found"),
|
TaskAttemptError::ProjectNotFound => write!(f, "Project not found"),
|
||||||
TaskAttemptError::ValidationError(e) => write!(f, "Validation error: {}", e),
|
TaskAttemptError::ValidationError(e) => write!(f, "Validation error: {}", e),
|
||||||
|
TaskAttemptError::BranchNotFound(branch) => write!(f, "Branch '{}' not found", branch),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -191,6 +197,13 @@ pub struct TaskAttemptState {
|
|||||||
pub coding_agent_process_id: Option<String>,
|
pub coding_agent_process_id: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Context data for resume operations (simplified)
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct AttemptResumeContext {
|
||||||
|
pub execution_history: String,
|
||||||
|
pub cumulative_diffs: String,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct TaskAttemptContext {
|
pub struct TaskAttemptContext {
|
||||||
pub task_attempt: TaskAttempt,
|
pub task_attempt: TaskAttempt,
|
||||||
@@ -1043,4 +1056,112 @@ impl TaskAttempt {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get execution history from current attempt only (simplified)
|
||||||
|
pub async fn get_attempt_execution_history(
|
||||||
|
pool: &SqlitePool,
|
||||||
|
attempt_id: Uuid,
|
||||||
|
) -> Result<String, TaskAttemptError> {
|
||||||
|
// Get all coding agent processes for this attempt
|
||||||
|
let processes =
|
||||||
|
crate::models::execution_process::ExecutionProcess::find_by_task_attempt_id(
|
||||||
|
pool, attempt_id,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// Filter to coding agent processes only and aggregate stdout
|
||||||
|
let coding_processes: Vec<_> = processes
|
||||||
|
.into_iter()
|
||||||
|
.filter(|p| {
|
||||||
|
matches!(
|
||||||
|
p.process_type,
|
||||||
|
crate::models::execution_process::ExecutionProcessType::CodingAgent
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut history = String::new();
|
||||||
|
for process in coding_processes {
|
||||||
|
if let Some(stdout) = process.stdout {
|
||||||
|
if !stdout.trim().is_empty() {
|
||||||
|
history.push_str(&stdout);
|
||||||
|
history.push('\n');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(history)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get diff between base_branch and current attempt (simplified)
|
||||||
|
pub async fn get_attempt_diff(
|
||||||
|
pool: &SqlitePool,
|
||||||
|
attempt_id: Uuid,
|
||||||
|
project_id: Uuid,
|
||||||
|
) -> Result<String, TaskAttemptError> {
|
||||||
|
// Get the task attempt with base_branch
|
||||||
|
let attempt = Self::find_by_id(pool, attempt_id)
|
||||||
|
.await?
|
||||||
|
.ok_or(TaskAttemptError::TaskNotFound)?;
|
||||||
|
|
||||||
|
// Get the project
|
||||||
|
let project = Project::find_by_id(pool, project_id)
|
||||||
|
.await?
|
||||||
|
.ok_or(TaskAttemptError::ProjectNotFound)?;
|
||||||
|
|
||||||
|
// Open the main repository
|
||||||
|
let repo = Repository::open(&project.git_repo_path)?;
|
||||||
|
|
||||||
|
// Get base branch commit
|
||||||
|
let base_branch = repo
|
||||||
|
.find_branch(&attempt.base_branch, git2::BranchType::Local)
|
||||||
|
.map_err(|_| TaskAttemptError::BranchNotFound(attempt.base_branch.clone()))?;
|
||||||
|
let base_commit = base_branch.get().peel_to_commit()?;
|
||||||
|
|
||||||
|
// Get current branch commit
|
||||||
|
let current_branch = repo
|
||||||
|
.find_branch(&attempt.branch, git2::BranchType::Local)
|
||||||
|
.map_err(|_| TaskAttemptError::BranchNotFound(attempt.branch.clone()))?;
|
||||||
|
let current_commit = current_branch.get().peel_to_commit()?;
|
||||||
|
|
||||||
|
// Create diff between base and current
|
||||||
|
let base_tree = base_commit.tree()?;
|
||||||
|
let current_tree = current_commit.tree()?;
|
||||||
|
|
||||||
|
let mut diff_opts = git2::DiffOptions::new();
|
||||||
|
diff_opts.context_lines(GIT_DIFF_CONTEXT_LINES);
|
||||||
|
diff_opts.interhunk_lines(GIT_DIFF_INTERHUNK_LINES);
|
||||||
|
|
||||||
|
let diff =
|
||||||
|
repo.diff_tree_to_tree(Some(&base_tree), Some(¤t_tree), Some(&mut diff_opts))?;
|
||||||
|
|
||||||
|
// Convert to text format
|
||||||
|
let mut diff_text = String::new();
|
||||||
|
diff.print(git2::DiffFormat::Patch, |_delta, _hunk, line| {
|
||||||
|
let content = std::str::from_utf8(line.content()).unwrap_or("");
|
||||||
|
diff_text.push_str(&format!("{}{}", line.origin(), content));
|
||||||
|
true
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Ok(diff_text)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get comprehensive resume context for Gemini followup execution (simplified)
|
||||||
|
pub async fn get_attempt_resume_context(
|
||||||
|
pool: &SqlitePool,
|
||||||
|
attempt_id: Uuid,
|
||||||
|
_task_id: Uuid,
|
||||||
|
project_id: Uuid,
|
||||||
|
) -> Result<AttemptResumeContext, TaskAttemptError> {
|
||||||
|
// Get execution history from current attempt only
|
||||||
|
let execution_history = Self::get_attempt_execution_history(pool, attempt_id).await?;
|
||||||
|
|
||||||
|
// Get diff between base_branch and current attempt
|
||||||
|
let cumulative_diffs = Self::get_attempt_diff(pool, attempt_id, project_id).await?;
|
||||||
|
|
||||||
|
Ok(AttemptResumeContext {
|
||||||
|
execution_history,
|
||||||
|
cumulative_diffs,
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -827,14 +827,11 @@ impl ProcessService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
crate::executor::ExecutorConfig::Gemini => {
|
crate::executor::ExecutorConfig::Gemini => {
|
||||||
if let Some(sid) = session_id {
|
// For Gemini, we don't use real session IDs, we pass the context directly
|
||||||
Box::new(GeminiFollowupExecutor {
|
Box::new(GeminiFollowupExecutor {
|
||||||
session_id: sid.clone(),
|
attempt_id,
|
||||||
prompt: prompt.clone(),
|
prompt: prompt.clone(),
|
||||||
})
|
})
|
||||||
} else {
|
|
||||||
return Err(TaskAttemptError::TaskNotFound); // No session ID for followup
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
crate::executor::ExecutorConfig::Echo => {
|
crate::executor::ExecutorConfig::Echo => {
|
||||||
// Echo doesn't support followup, use regular echo
|
// Echo doesn't support followup, use regular echo
|
||||||
|
|||||||
@@ -32,14 +32,14 @@ export function MarkdownRenderer({
|
|||||||
</em>
|
</em>
|
||||||
),
|
),
|
||||||
p: ({ children, ...props }) => (
|
p: ({ children, ...props }) => (
|
||||||
<p {...props} className="mb-0 last:mb-0 leading-tight">
|
<p {...props} className="mb-4 last:mb-0 leading-loose">
|
||||||
{children}
|
{children}
|
||||||
</p>
|
</p>
|
||||||
),
|
),
|
||||||
h1: ({ children, ...props }) => (
|
h1: ({ children, ...props }) => (
|
||||||
<h1
|
<h1
|
||||||
{...props}
|
{...props}
|
||||||
className="text-lg font-bold mb-0 mt-1 first:mt-0 leading-tight"
|
className="text-lg font-bold mb-4 mt-6 first:mt-0 leading-relaxed"
|
||||||
>
|
>
|
||||||
{children}
|
{children}
|
||||||
</h1>
|
</h1>
|
||||||
@@ -47,7 +47,7 @@ export function MarkdownRenderer({
|
|||||||
h2: ({ children, ...props }) => (
|
h2: ({ children, ...props }) => (
|
||||||
<h2
|
<h2
|
||||||
{...props}
|
{...props}
|
||||||
className="text-base font-bold mb-0 mt-1 first:mt-0 leading-tight"
|
className="text-base font-bold mb-3 mt-5 first:mt-0 leading-relaxed"
|
||||||
>
|
>
|
||||||
{children}
|
{children}
|
||||||
</h2>
|
</h2>
|
||||||
@@ -55,23 +55,23 @@ export function MarkdownRenderer({
|
|||||||
h3: ({ children, ...props }) => (
|
h3: ({ children, ...props }) => (
|
||||||
<h3
|
<h3
|
||||||
{...props}
|
{...props}
|
||||||
className="text-sm font-bold mb-0 mt-1 first:mt-0 leading-tight"
|
className="text-sm font-bold mb-3 mt-4 first:mt-0 leading-relaxed"
|
||||||
>
|
>
|
||||||
{children}
|
{children}
|
||||||
</h3>
|
</h3>
|
||||||
),
|
),
|
||||||
ul: ({ children, ...props }) => (
|
ul: ({ children, ...props }) => (
|
||||||
<ul {...props} className="list-disc ml-4 mb-0 -space-y-1">
|
<ul {...props} className="list-disc ml-4 mb-2 space-y-1">
|
||||||
{children}
|
{children}
|
||||||
</ul>
|
</ul>
|
||||||
),
|
),
|
||||||
ol: ({ children, ...props }) => (
|
ol: ({ children, ...props }) => (
|
||||||
<ol {...props} className="list-decimal ml-4 mb-0 -space-y-1">
|
<ol {...props} className="list-decimal ml-4 mb-2 space-y-1">
|
||||||
{children}
|
{children}
|
||||||
</ol>
|
</ol>
|
||||||
),
|
),
|
||||||
li: ({ children, ...props }) => (
|
li: ({ children, ...props }) => (
|
||||||
<li {...props} className="mb-0 leading-tight -my-0.5">
|
<li {...props} className="mb-1 leading-relaxed">
|
||||||
{children}
|
{children}
|
||||||
</li>
|
</li>
|
||||||
),
|
),
|
||||||
|
|||||||
Reference in New Issue
Block a user