diff --git a/crates/db/.sqlx/query-1d406258fa90610bddb8973e25fd9dc4f59b0769d943d2cc74d9008e68670f3e.json b/crates/db/.sqlx/query-1d406258fa90610bddb8973e25fd9dc4f59b0769d943d2cc74d9008e68670f3e.json new file mode 100644 index 00000000..3e036d59 --- /dev/null +++ b/crates/db/.sqlx/query-1d406258fa90610bddb8973e25fd9dc4f59b0769d943d2cc74d9008e68670f3e.json @@ -0,0 +1,74 @@ +{ + "db_name": "SQLite", + "query": "SELECT \n id as \"id!: Uuid\",\n task_attempt_id as \"task_attempt_id!: Uuid\",\n prompt as \"prompt!: String\",\n queued as \"queued!: bool\",\n sending as \"sending!: bool\",\n variant,\n image_ids as \"image_ids?: String\",\n created_at as \"created_at!: DateTime\",\n updated_at as \"updated_at!: DateTime\",\n version as \"version!: i64\"\n FROM follow_up_drafts\n WHERE rowid = $1", + "describe": { + "columns": [ + { + "name": "id!: Uuid", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "task_attempt_id!: Uuid", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "prompt!: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "queued!: bool", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "sending!: bool", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "variant", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "image_ids?: String", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "created_at!: DateTime", + "ordinal": 7, + "type_info": "Datetime" + }, + { + "name": "updated_at!: DateTime", + "ordinal": 8, + "type_info": "Datetime" + }, + { + "name": "version!: i64", + "ordinal": 9, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + true, + false, + false, + false, + false, + true, + true, + false, + false, + false + ] + }, + "hash": "1d406258fa90610bddb8973e25fd9dc4f59b0769d943d2cc74d9008e68670f3e" +} diff --git a/crates/db/.sqlx/query-417b6e6333eb2164b4cb1d9869cf786f34fa0219b30461234c47a869945c2a79.json b/crates/db/.sqlx/query-417b6e6333eb2164b4cb1d9869cf786f34fa0219b30461234c47a869945c2a79.json new file mode 100644 index 00000000..d6c84afb --- /dev/null +++ b/crates/db/.sqlx/query-417b6e6333eb2164b4cb1d9869cf786f34fa0219b30461234c47a869945c2a79.json @@ -0,0 +1,74 @@ +{ + "db_name": "SQLite", + "query": "INSERT INTO follow_up_drafts (id, task_attempt_id, prompt, queued, variant, image_ids)\n VALUES ($1, $2, $3, $4, $5, $6)\n ON CONFLICT(task_attempt_id) DO UPDATE SET\n prompt = excluded.prompt,\n queued = excluded.queued,\n variant = excluded.variant,\n image_ids = excluded.image_ids\n RETURNING \n id as \"id!: Uuid\",\n task_attempt_id as \"task_attempt_id!: Uuid\",\n prompt as \"prompt!: String\",\n queued as \"queued!: bool\",\n sending as \"sending!: bool\",\n variant,\n image_ids as \"image_ids?: String\",\n created_at as \"created_at!: DateTime\",\n updated_at as \"updated_at!: DateTime\",\n version as \"version!: i64\"", + "describe": { + "columns": [ + { + "name": "id!: Uuid", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "task_attempt_id!: Uuid", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "prompt!: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "queued!: bool", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "sending!: bool", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "variant", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "image_ids?: String", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "created_at!: DateTime", + "ordinal": 7, + "type_info": "Datetime" + }, + { + "name": 
"updated_at!: DateTime", + "ordinal": 8, + "type_info": "Datetime" + }, + { + "name": "version!: i64", + "ordinal": 9, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 6 + }, + "nullable": [ + true, + false, + false, + false, + false, + true, + true, + false, + false, + false + ] + }, + "hash": "417b6e6333eb2164b4cb1d9869cf786f34fa0219b30461234c47a869945c2a79" +} diff --git a/crates/db/.sqlx/query-9778726648c310caa65a00d31e7f9ecc38ca88b7536300143a889eda327ed1a4.json b/crates/db/.sqlx/query-9778726648c310caa65a00d31e7f9ecc38ca88b7536300143a889eda327ed1a4.json new file mode 100644 index 00000000..1c152df1 --- /dev/null +++ b/crates/db/.sqlx/query-9778726648c310caa65a00d31e7f9ecc38ca88b7536300143a889eda327ed1a4.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "UPDATE follow_up_drafts\n SET sending = 1, updated_at = CURRENT_TIMESTAMP, version = version + 1\n WHERE task_attempt_id = $1\n AND queued = 1\n AND sending = 0\n AND TRIM(prompt) != ''", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "9778726648c310caa65a00d31e7f9ecc38ca88b7536300143a889eda327ed1a4" +} diff --git a/crates/db/.sqlx/query-9966caaf5d4427190b812b20bd76e5370bc0b0ba877192ac487ff7ba487b0fa1.json b/crates/db/.sqlx/query-9966caaf5d4427190b812b20bd76e5370bc0b0ba877192ac487ff7ba487b0fa1.json new file mode 100644 index 00000000..e29c44d5 --- /dev/null +++ b/crates/db/.sqlx/query-9966caaf5d4427190b812b20bd76e5370bc0b0ba877192ac487ff7ba487b0fa1.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "INSERT INTO task_images (id, task_id, image_id)\n SELECT $1, $2, $3\n WHERE NOT EXISTS (\n SELECT 1 FROM task_images WHERE task_id = $2 AND image_id = $3\n )", + "describe": { + "columns": [], + "parameters": { + "Right": 3 + }, + "nullable": [] + }, + "hash": "9966caaf5d4427190b812b20bd76e5370bc0b0ba877192ac487ff7ba487b0fa1" +} diff --git a/crates/db/.sqlx/query-c98097bb6edac80896cf320ca9f670f18db291bf4d626923b63dde3445fb4a3d.json b/crates/db/.sqlx/query-c98097bb6edac80896cf320ca9f670f18db291bf4d626923b63dde3445fb4a3d.json new file mode 100644 index 00000000..b7463496 --- /dev/null +++ b/crates/db/.sqlx/query-c98097bb6edac80896cf320ca9f670f18db291bf4d626923b63dde3445fb4a3d.json @@ -0,0 +1,74 @@ +{ + "db_name": "SQLite", + "query": "SELECT \n id as \"id!: Uuid\",\n task_attempt_id as \"task_attempt_id!: Uuid\",\n prompt as \"prompt!: String\",\n queued as \"queued!: bool\",\n sending as \"sending!: bool\",\n variant,\n image_ids as \"image_ids?: String\",\n created_at as \"created_at!: DateTime\",\n updated_at as \"updated_at!: DateTime\",\n version as \"version!: i64\"\n FROM follow_up_drafts\n WHERE task_attempt_id = $1", + "describe": { + "columns": [ + { + "name": "id!: Uuid", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "task_attempt_id!: Uuid", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "prompt!: String", + "ordinal": 2, + "type_info": "Text" + }, + { + "name": "queued!: bool", + "ordinal": 3, + "type_info": "Integer" + }, + { + "name": "sending!: bool", + "ordinal": 4, + "type_info": "Integer" + }, + { + "name": "variant", + "ordinal": 5, + "type_info": "Text" + }, + { + "name": "image_ids?: String", + "ordinal": 6, + "type_info": "Text" + }, + { + "name": "created_at!: DateTime", + "ordinal": 7, + "type_info": "Datetime" + }, + { + "name": "updated_at!: DateTime", + "ordinal": 8, + "type_info": "Datetime" + }, + { + "name": "version!: i64", + "ordinal": 9, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 1 + 
}, + "nullable": [ + true, + false, + false, + false, + false, + true, + true, + false, + false, + false + ] + }, + "hash": "c98097bb6edac80896cf320ca9f670f18db291bf4d626923b63dde3445fb4a3d" +} diff --git a/crates/db/.sqlx/query-d3bdec518c805d8eeb37c2c7d782ce05f7dd1d4df18dab306e91d83f874efe90.json b/crates/db/.sqlx/query-d3bdec518c805d8eeb37c2c7d782ce05f7dd1d4df18dab306e91d83f874efe90.json new file mode 100644 index 00000000..156b6b0e --- /dev/null +++ b/crates/db/.sqlx/query-d3bdec518c805d8eeb37c2c7d782ce05f7dd1d4df18dab306e91d83f874efe90.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "UPDATE follow_up_drafts \n SET prompt = '', queued = 0, sending = 0, image_ids = NULL, updated_at = CURRENT_TIMESTAMP, version = version + 1\n WHERE task_attempt_id = $1", + "describe": { + "columns": [], + "parameters": { + "Right": 1 + }, + "nullable": [] + }, + "hash": "d3bdec518c805d8eeb37c2c7d782ce05f7dd1d4df18dab306e91d83f874efe90" +} diff --git a/crates/db/migrations/20250906120000_add_follow_up_drafts.sql b/crates/db/migrations/20250906120000_add_follow_up_drafts.sql new file mode 100644 index 00000000..dd94fc17 --- /dev/null +++ b/crates/db/migrations/20250906120000_add_follow_up_drafts.sql @@ -0,0 +1,27 @@ +-- Follow-up drafts per task attempt +-- Stores a single draft prompt that can be queued for the next available run + +CREATE TABLE IF NOT EXISTS follow_up_drafts ( + id TEXT PRIMARY KEY, + task_attempt_id TEXT NOT NULL UNIQUE, + prompt TEXT NOT NULL DEFAULT '', + queued INTEGER NOT NULL DEFAULT 0, + sending INTEGER NOT NULL DEFAULT 0, + version INTEGER NOT NULL DEFAULT 0, + variant TEXT NULL, + image_ids TEXT NULL, -- JSON array of UUID strings + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY(task_attempt_id) REFERENCES task_attempts(id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_follow_up_drafts_task_attempt_id + ON follow_up_drafts(task_attempt_id); + +-- Trigger to keep updated_at current +CREATE TRIGGER IF NOT EXISTS trg_follow_up_drafts_updated_at +AFTER UPDATE ON follow_up_drafts +FOR EACH ROW +BEGIN + UPDATE follow_up_drafts SET updated_at = CURRENT_TIMESTAMP WHERE id = OLD.id; +END; diff --git a/crates/db/src/models/follow_up_draft.rs b/crates/db/src/models/follow_up_draft.rs new file mode 100644 index 00000000..e87f30af --- /dev/null +++ b/crates/db/src/models/follow_up_draft.rs @@ -0,0 +1,195 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::{FromRow, SqlitePool}; +use ts_rs::TS; +use uuid::Uuid; + +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +pub struct FollowUpDraft { + pub id: Uuid, + pub task_attempt_id: Uuid, + pub prompt: String, + pub queued: bool, + pub sending: bool, + pub variant: Option, + // Stored as JSON in the DB; serde handles Uuid <-> string in JSON + #[serde(skip_serializing_if = "Option::is_none")] + pub image_ids: Option>, + pub created_at: DateTime, + pub updated_at: DateTime, + pub version: i64, +} + +#[derive(Debug, Clone, FromRow)] +struct FollowUpDraftRow { + pub id: Uuid, + pub task_attempt_id: Uuid, + pub prompt: String, + pub queued: bool, + pub sending: bool, + pub variant: Option, + pub image_ids: Option, + pub created_at: DateTime, + pub updated_at: DateTime, + pub version: i64, +} + +impl From for FollowUpDraft { + fn from(r: FollowUpDraftRow) -> Self { + let image_ids = r + .image_ids + .as_deref() + .and_then(|s| serde_json::from_str::>(s).ok()); + FollowUpDraft { + id: r.id, + task_attempt_id: 
r.task_attempt_id,
+            prompt: r.prompt,
+            queued: r.queued,
+            sending: r.sending,
+            variant: r.variant,
+            image_ids,
+            created_at: r.created_at,
+            updated_at: r.updated_at,
+            version: r.version,
+        }
+    }
+}
+
+#[derive(Debug, Deserialize, TS)]
+pub struct UpsertFollowUpDraft {
+    pub task_attempt_id: Uuid,
+    pub prompt: String,
+    pub queued: bool,
+    pub variant: Option<String>,
+    pub image_ids: Option<Vec<Uuid>>,
+}
+
+impl FollowUpDraft {
+    pub async fn find_by_rowid(pool: &SqlitePool, rowid: i64) -> Result<Option<Self>, sqlx::Error> {
+        sqlx::query_as!(
+            FollowUpDraftRow,
+            r#"SELECT
+                id as "id!: Uuid",
+                task_attempt_id as "task_attempt_id!: Uuid",
+                prompt as "prompt!: String",
+                queued as "queued!: bool",
+                sending as "sending!: bool",
+                variant,
+                image_ids as "image_ids?: String",
+                created_at as "created_at!: DateTime<Utc>",
+                updated_at as "updated_at!: DateTime<Utc>",
+                version as "version!: i64"
+            FROM follow_up_drafts
+            WHERE rowid = $1"#,
+            rowid
+        )
+        .fetch_optional(pool)
+        .await
+        .map(|opt| opt.map(FollowUpDraft::from))
+    }
+    pub async fn find_by_task_attempt_id(
+        pool: &SqlitePool,
+        task_attempt_id: Uuid,
+    ) -> Result<Option<Self>, sqlx::Error> {
+        sqlx::query_as!(
+            FollowUpDraftRow,
+            r#"SELECT
+                id as "id!: Uuid",
+                task_attempt_id as "task_attempt_id!: Uuid",
+                prompt as "prompt!: String",
+                queued as "queued!: bool",
+                sending as "sending!: bool",
+                variant,
+                image_ids as "image_ids?: String",
+                created_at as "created_at!: DateTime<Utc>",
+                updated_at as "updated_at!: DateTime<Utc>",
+                version as "version!: i64"
+            FROM follow_up_drafts
+            WHERE task_attempt_id = $1"#,
+            task_attempt_id
+        )
+        .fetch_optional(pool)
+        .await
+        .map(|opt| opt.map(FollowUpDraft::from))
+    }
+
+    pub async fn upsert(
+        pool: &SqlitePool,
+        data: &UpsertFollowUpDraft,
+    ) -> Result<Self, sqlx::Error> {
+        let id = Uuid::new_v4();
+        {
+            let image_ids_json = data
+                .image_ids
+                .as_ref()
+                .map(|ids| serde_json::to_string(ids).unwrap_or_else(|_| "[]".to_string()));
+
+            sqlx::query_as!(
+                FollowUpDraftRow,
+                r#"INSERT INTO follow_up_drafts (id, task_attempt_id, prompt, queued, variant, image_ids)
+                VALUES ($1, $2, $3, $4, $5, $6)
+                ON CONFLICT(task_attempt_id) DO UPDATE SET
+                    prompt = excluded.prompt,
+                    queued = excluded.queued,
+                    variant = excluded.variant,
+                    image_ids = excluded.image_ids
+                RETURNING
+                    id as "id!: Uuid",
+                    task_attempt_id as "task_attempt_id!: Uuid",
+                    prompt as "prompt!: String",
+                    queued as "queued!: bool",
+                    sending as "sending!: bool",
+                    variant,
+                    image_ids as "image_ids?: String",
+                    created_at as "created_at!: DateTime<Utc>",
+                    updated_at as "updated_at!: DateTime<Utc>",
+                    version as "version!: i64""#,
+                id,
+                data.task_attempt_id,
+                data.prompt,
+                data.queued,
+                data.variant,
+                image_ids_json
+            )
+            .fetch_one(pool)
+            .await
+            .map(FollowUpDraft::from)
+        }
+    }
+
+    pub async fn clear_after_send(
+        pool: &SqlitePool,
+        task_attempt_id: Uuid,
+    ) -> Result<(), sqlx::Error> {
+        sqlx::query!(
+            r#"UPDATE follow_up_drafts
+            SET prompt = '', queued = 0, sending = 0, image_ids = NULL, updated_at = CURRENT_TIMESTAMP, version = version + 1
+            WHERE task_attempt_id = $1"#,
+            task_attempt_id
+        )
+        .execute(pool)
+        .await?;
+        Ok(())
+    }
+
+    /// Attempt to atomically mark this draft as "sending" if it's currently queued and non-empty.
+    /// Returns true if the row was updated (we acquired the send lock), false otherwise.
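+    ///
+    /// A minimal caller sketch (hypothetical; assumes a `pool: SqlitePool` and an
+    /// `attempt_id: Uuid` are in scope and a draft row already exists):
+    ///
+    /// ```ignore
+    /// if FollowUpDraft::try_mark_sending(&pool, attempt_id).await? {
+    ///     // We won the race: start the follow-up execution, then clear the draft.
+    ///     FollowUpDraft::clear_after_send(&pool, attempt_id).await?;
+    /// }
+    /// ```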
+ pub async fn try_mark_sending( + pool: &SqlitePool, + task_attempt_id: Uuid, + ) -> Result { + let result = sqlx::query!( + r#"UPDATE follow_up_drafts + SET sending = 1, updated_at = CURRENT_TIMESTAMP, version = version + 1 + WHERE task_attempt_id = $1 + AND queued = 1 + AND sending = 0 + AND TRIM(prompt) != ''"#, + task_attempt_id + ) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) + } +} diff --git a/crates/db/src/models/image.rs b/crates/db/src/models/image.rs index 687bef1d..a4c31cb7 100644 --- a/crates/db/src/models/image.rs +++ b/crates/db/src/models/image.rs @@ -185,6 +185,30 @@ impl TaskImage { Ok(()) } + /// Associate multiple images with a task, skipping duplicates. + pub async fn associate_many_dedup( + pool: &SqlitePool, + task_id: Uuid, + image_ids: &[Uuid], + ) -> Result<(), sqlx::Error> { + for &image_id in image_ids { + let id = Uuid::new_v4(); + sqlx::query!( + r#"INSERT INTO task_images (id, task_id, image_id) + SELECT $1, $2, $3 + WHERE NOT EXISTS ( + SELECT 1 FROM task_images WHERE task_id = $2 AND image_id = $3 + )"#, + id, + task_id, + image_id + ) + .execute(pool) + .await?; + } + Ok(()) + } + pub async fn delete_by_task_id(pool: &SqlitePool, task_id: Uuid) -> Result<(), sqlx::Error> { sqlx::query!(r#"DELETE FROM task_images WHERE task_id = $1"#, task_id) .execute(pool) diff --git a/crates/db/src/models/mod.rs b/crates/db/src/models/mod.rs index 58d3441a..7f5605aa 100644 --- a/crates/db/src/models/mod.rs +++ b/crates/db/src/models/mod.rs @@ -1,6 +1,7 @@ pub mod execution_process; pub mod execution_process_logs; pub mod executor_session; +pub mod follow_up_draft; pub mod image; pub mod merge; pub mod project; diff --git a/crates/local-deployment/src/container.rs b/crates/local-deployment/src/container.rs index 10002d17..d26b3b01 100644 --- a/crates/local-deployment/src/container.rs +++ b/crates/local-deployment/src/container.rs @@ -18,6 +18,8 @@ use db::{ ExecutionContext, ExecutionProcess, ExecutionProcessRunReason, ExecutionProcessStatus, }, executor_session::ExecutorSession, + follow_up_draft::FollowUpDraft, + image::TaskImage, merge::Merge, project::Project, task::{Task, TaskStatus}, @@ -397,6 +399,14 @@ impl LocalContainerService { if Self::should_finalize(&ctx) { Self::finalize_task(&db, &config, &ctx).await; + // After finalization, check if a queued follow-up exists and start it + if let Err(e) = container.try_consume_queued_followup(&ctx).await { + tracing::error!( + "Failed to start queued follow-up for attempt {}: {}", + ctx.task_attempt.id, + e + ); + } } // Fire event when CodingAgent execution has finished @@ -1193,4 +1203,162 @@ impl LocalContainerService { Ok(()) } + + /// If a queued follow-up draft exists for this attempt and nothing is running, + /// start it immediately and clear the draft. + async fn try_consume_queued_followup( + &self, + ctx: &ExecutionContext, + ) -> Result<(), ContainerError> { + // Only consider CodingAgent/cleanup chains; skip DevServer completions + if matches!( + ctx.execution_process.run_reason, + ExecutionProcessRunReason::DevServer + ) { + return Ok(()); + } + + // If anything is running for this attempt, bail + let procs = + ExecutionProcess::find_by_task_attempt_id(&self.db.pool, ctx.task_attempt.id).await?; + if procs + .iter() + .any(|p| matches!(p.status, ExecutionProcessStatus::Running)) + { + return Ok(()); + } + + // Load draft and ensure it's eligible + let Some(draft) = + FollowUpDraft::find_by_task_attempt_id(&self.db.pool, ctx.task_attempt.id).await? 
+ else { + return Ok(()); + }; + + if !draft.queued || draft.prompt.trim().is_empty() { + return Ok(()); + } + + // Atomically acquire sending lock; if not acquired, someone else is sending. + if !FollowUpDraft::try_mark_sending(&self.db.pool, ctx.task_attempt.id) + .await + .unwrap_or(false) + { + return Ok(()); + } + + // Ensure worktree exists + let container_ref = self.ensure_container_exists(&ctx.task_attempt).await?; + + // Get session id + let Some(session_id) = ExecutionProcess::find_latest_session_id_by_task_attempt( + &self.db.pool, + ctx.task_attempt.id, + ) + .await? + else { + tracing::warn!( + "No session id found for attempt {}. Cannot start queued follow-up.", + ctx.task_attempt.id + ); + return Ok(()); + }; + + // Get last coding agent process to inherit executor profile + let Some(latest) = ExecutionProcess::find_latest_by_task_attempt_and_run_reason( + &self.db.pool, + ctx.task_attempt.id, + &ExecutionProcessRunReason::CodingAgent, + ) + .await? + else { + tracing::warn!( + "No prior CodingAgent process for attempt {}. Cannot start queued follow-up.", + ctx.task_attempt.id + ); + return Ok(()); + }; + + use executors::actions::ExecutorActionType; + let initial_executor_profile_id = match &latest.executor_action()?.typ { + ExecutorActionType::CodingAgentInitialRequest(req) => req.executor_profile_id.clone(), + ExecutorActionType::CodingAgentFollowUpRequest(req) => req.executor_profile_id.clone(), + _ => { + tracing::warn!( + "Latest process for attempt {} is not a coding agent; skipping queued follow-up", + ctx.task_attempt.id + ); + return Ok(()); + } + }; + + let executor_profile_id = executors::profile::ExecutorProfileId { + executor: initial_executor_profile_id.executor, + variant: draft.variant.clone(), + }; + + // Prepare cleanup action + let cleanup_action = ctx + .task + .parent_project(&self.db.pool) + .await? 
+ .and_then(|p| p.cleanup_script) + .map(|script| { + Box::new(executors::actions::ExecutorAction::new( + executors::actions::ExecutorActionType::ScriptRequest( + executors::actions::script::ScriptRequest { + script, + language: executors::actions::script::ScriptRequestLanguage::Bash, + context: executors::actions::script::ScriptContext::CleanupScript, + }, + ), + None, + )) + }); + + // Handle images: associate, copy to worktree, canonicalize prompt + let mut prompt = draft.prompt.clone(); + if let Some(image_ids) = &draft.image_ids { + // Associate to task + let _ = TaskImage::associate_many_dedup(&self.db.pool, ctx.task.id, image_ids).await; + + // Copy to worktree and canonicalize + let worktree_path = std::path::PathBuf::from(&container_ref); + if let Err(e) = self + .image_service + .copy_images_by_ids_to_worktree(&worktree_path, image_ids) + .await + { + tracing::warn!("Failed to copy images to worktree: {}", e); + } else { + prompt = ImageService::canonicalise_image_paths(&prompt, &worktree_path); + } + } + + let follow_up_request = + executors::actions::coding_agent_follow_up::CodingAgentFollowUpRequest { + prompt, + session_id, + executor_profile_id, + }; + + let follow_up_action = executors::actions::ExecutorAction::new( + executors::actions::ExecutorActionType::CodingAgentFollowUpRequest(follow_up_request), + cleanup_action, + ); + + // Start the execution + let _ = self + .start_execution( + &ctx.task_attempt, + &follow_up_action, + &ExecutionProcessRunReason::CodingAgent, + ) + .await?; + + // Clear the draft to reflect that it has been consumed + let _ = FollowUpDraft::clear_after_send(&self.db.pool, ctx.task_attempt.id).await; + + Ok(()) + } } diff --git a/crates/server/src/bin/generate_types.rs b/crates/server/src/bin/generate_types.rs index 001a2706..11bdb1af 100644 --- a/crates/server/src/bin/generate_types.rs +++ b/crates/server/src/bin/generate_types.rs @@ -44,6 +44,8 @@ fn generate_types_content() -> String { server::routes::config::UpdateMcpServersBody::decl(), server::routes::config::GetMcpServerResponse::decl(), server::routes::task_attempts::CreateFollowUpAttempt::decl(), + server::routes::task_attempts::FollowUpDraftResponse::decl(), + server::routes::task_attempts::UpdateFollowUpDraftRequest::decl(), server::routes::task_attempts::CreateGitHubPrRequest::decl(), server::routes::images::ImageResponse::decl(), services::services::github_service::GitHubServiceError::decl(), @@ -100,6 +102,7 @@ fn generate_types_content() -> String { services::services::events::EventPatch::decl(), services::services::events::EventPatchInner::decl(), services::services::events::RecordTypes::decl(), + db::models::follow_up_draft::FollowUpDraft::decl(), executors::logs::CommandExitStatus::decl(), executors::logs::CommandRunResult::decl(), executors::logs::NormalizedConversation::decl(), diff --git a/crates/server/src/routes/task_attempts.rs b/crates/server/src/routes/task_attempts.rs index bf941ede..538e5f12 100644 --- a/crates/server/src/routes/task_attempts.rs +++ b/crates/server/src/routes/task_attempts.rs @@ -13,6 +13,7 @@ use axum::{ }; use db::models::{ execution_process::{ExecutionProcess, ExecutionProcessRunReason}, + follow_up_draft::FollowUpDraft, image::TaskImage, merge::{Merge, MergeStatus, PrMerge, PullRequestInfo}, project::{Project, ProjectError}, @@ -230,7 +231,7 @@ pub async fn follow_up( let mut prompt = payload.prompt; if let Some(image_ids) = &payload.image_ids { - TaskImage::associate_many(&deployment.db().pool, task.id, image_ids).await?; + 
TaskImage::associate_many_dedup(&deployment.db().pool, task.id, image_ids).await?; // Copy new images from the image cache to the worktree if let Some(container_ref) = &task_attempt.container_ref { @@ -276,9 +277,479 @@ pub async fn follow_up( ) .await?; + // Clear any persisted follow-up draft for this attempt to avoid stale UI after manual send + let _ = FollowUpDraft::clear_after_send(&deployment.db().pool, task_attempt.id).await; + Ok(ResponseJson(ApiResponse::success(execution_process))) } +#[derive(Debug, Serialize, TS)] +pub struct FollowUpDraftResponse { + pub task_attempt_id: Uuid, + pub prompt: String, + pub queued: bool, + pub variant: Option, + pub image_ids: Option>, // attachments + pub version: i64, +} + +#[derive(Debug, Deserialize, TS)] +pub struct UpdateFollowUpDraftRequest { + pub prompt: Option, + // Present with null explicitly clears variant; absent leaves unchanged + pub variant: Option>, + pub image_ids: Option>, // send empty array to clear; omit to leave unchanged + pub version: Option, // optimistic concurrency +} + +#[derive(Debug, Deserialize, TS)] +pub struct SetQueueRequest { + pub queued: bool, + pub expected_queued: Option, + pub expected_version: Option, +} + +async fn has_running_processes_for_attempt( + pool: &sqlx::SqlitePool, + attempt_id: Uuid, +) -> Result { + let processes = ExecutionProcess::find_by_task_attempt_id(pool, attempt_id).await?; + Ok(processes.into_iter().any(|p| { + matches!( + p.status, + db::models::execution_process::ExecutionProcessStatus::Running + ) + })) +} + +#[axum::debug_handler] +pub async fn get_follow_up_draft( + Extension(task_attempt): Extension, + State(deployment): State, +) -> Result>, ApiError> { + let pool = &deployment.db().pool; + let draft = FollowUpDraft::find_by_task_attempt_id(pool, task_attempt.id) + .await? + .map(|d| FollowUpDraftResponse { + task_attempt_id: d.task_attempt_id, + prompt: d.prompt, + queued: d.queued, + variant: d.variant, + image_ids: d.image_ids, + version: d.version, + }) + .unwrap_or(FollowUpDraftResponse { + task_attempt_id: task_attempt.id, + prompt: "".to_string(), + queued: false, + variant: None, + image_ids: None, + version: 0, + }); + Ok(ResponseJson(ApiResponse::success(draft))) +} + +#[axum::debug_handler] +pub async fn save_follow_up_draft( + Extension(task_attempt): Extension, + State(deployment): State, + Json(payload): Json, +) -> Result>, ApiError> { + let pool = &deployment.db().pool; + + // Enforce: cannot edit while queued + let d = match FollowUpDraft::find_by_task_attempt_id(pool, task_attempt.id).await? { + Some(d) => d, + None => { + // Create empty draft implicitly + let id = uuid::Uuid::new_v4(); + sqlx::query( + r#"INSERT INTO follow_up_drafts (id, task_attempt_id, prompt, queued, sending) + VALUES (?, ?, '', 0, 0)"#, + ) + .bind(id) + .bind(task_attempt.id) + .execute(pool) + .await?; + FollowUpDraft::find_by_task_attempt_id(pool, task_attempt.id) + .await? + .ok_or(SqlxError::RowNotFound)? 
+ } + }; + if d.queued { + return Err(ApiError::Conflict( + "Draft is queued; click Edit to unqueue before editing".to_string(), + )); + } + + // Optimistic concurrency check + if let Some(expected_version) = payload.version + && d.version != expected_version + { + return Err(ApiError::Conflict( + "Draft changed, please retry with latest".to_string(), + )); + } + + if payload.prompt.is_none() && payload.variant.is_none() && payload.image_ids.is_none() { + // nothing to change; return current + } else { + // Build a conservative UPDATE using positional binds to avoid SQL builder quirks + let mut set_clauses: Vec<&str> = Vec::new(); + let mut has_variant_null = false; + if payload.prompt.is_some() { + set_clauses.push("prompt = ?"); + } + if let Some(variant_opt) = &payload.variant { + match variant_opt { + Some(_) => set_clauses.push("variant = ?"), + None => { + has_variant_null = true; + set_clauses.push("variant = NULL"); + } + } + } + if payload.image_ids.is_some() { + set_clauses.push("image_ids = ?"); + } + // Always bump metadata when something changes + set_clauses.push("updated_at = CURRENT_TIMESTAMP"); + set_clauses.push("version = version + 1"); + + let mut sql = String::from("UPDATE follow_up_drafts SET "); + sql.push_str(&set_clauses.join(", ")); + sql.push_str(" WHERE task_attempt_id = ?"); + + let mut q = sqlx::query(&sql); + if let Some(prompt) = &payload.prompt { + q = q.bind(prompt); + } + if let Some(variant_opt) = &payload.variant + && let Some(v) = variant_opt + { + q = q.bind(v); + } + if let Some(image_ids) = &payload.image_ids { + let image_ids_json = + serde_json::to_string(image_ids).unwrap_or_else(|_| "[]".to_string()); + q = q.bind(image_ids_json); + } + // WHERE bind + q = q.bind(task_attempt.id); + q.execute(pool).await?; + let _ = has_variant_null; // silence unused (document intent) + } + + // Ensure images are associated with the task for preview/loading + if let Some(image_ids) = &payload.image_ids + && !image_ids.is_empty() + { + // get parent task + let task = task_attempt + .parent_task(&deployment.db().pool) + .await? + .ok_or(SqlxError::RowNotFound)?; + TaskImage::associate_many_dedup(pool, task.id, image_ids).await?; + } + + // If queued and no process running for this attempt, attempt to start immediately. + // Use an atomic sending lock to prevent duplicate starts when concurrent requests occur. + let current = FollowUpDraft::find_by_task_attempt_id(pool, task_attempt.id).await?; + let should_consider_start = current.as_ref().map(|c| c.queued).unwrap_or(false) + && !has_running_processes_for_attempt(pool, task_attempt.id).await?; + if should_consider_start { + if FollowUpDraft::try_mark_sending(pool, task_attempt.id) + .await + .unwrap_or(false) + { + // Start follow up with saved draft + let _ = + start_follow_up_from_draft(&deployment, &task_attempt, current.as_ref().unwrap()) + .await; + } else { + tracing::debug!( + "Follow-up draft for attempt {} already being sent or not eligible", + task_attempt.id + ); + } + } + + // Return current draft state (may have been cleared if started immediately) + let current = FollowUpDraft::find_by_task_attempt_id(pool, task_attempt.id) + .await? 
+ .map(|d| FollowUpDraftResponse { + task_attempt_id: d.task_attempt_id, + prompt: d.prompt, + queued: d.queued, + variant: d.variant, + image_ids: d.image_ids, + version: d.version, + }) + .unwrap_or(FollowUpDraftResponse { + task_attempt_id: task_attempt.id, + prompt: "".to_string(), + queued: false, + variant: None, + image_ids: None, + version: 0, + }); + + Ok(ResponseJson(ApiResponse::success(current))) +} + +#[axum::debug_handler] +pub async fn stream_follow_up_draft( + Extension(task_attempt): Extension, + State(deployment): State, +) -> Result< + Sse>>>, + ApiError, +> { + let stream = deployment + .events() + .stream_follow_up_draft_for_attempt(task_attempt.id) + .await + .map_err(|e| ApiError::from(deployment::DeploymentError::from(e)))?; + Ok( + Sse::new(stream.map_err(|e| -> Box { Box::new(e) })) + .keep_alive(KeepAlive::default()), + ) +} + +#[axum::debug_handler] +pub async fn set_follow_up_queue( + Extension(task_attempt): Extension, + State(deployment): State, + Json(payload): Json, +) -> Result>, ApiError> { + let pool = &deployment.db().pool; + let Some(d) = FollowUpDraft::find_by_task_attempt_id(pool, task_attempt.id).await? else { + return Err(ApiError::Conflict("No draft to queue".to_string())); + }; + + // Optimistic concurrency: ensure caller's view matches current state (if provided) + if let Some(expected) = payload.expected_queued + && d.queued != expected + { + return Err(ApiError::Conflict( + "Draft state changed, please refresh and try again".to_string(), + )); + } + if let Some(expected_v) = payload.expected_version + && d.version != expected_v + { + return Err(ApiError::Conflict( + "Draft changed, please refresh and try again".to_string(), + )); + } + + if payload.queued { + let should_queue = !d.prompt.trim().is_empty(); + sqlx::query( + r#"UPDATE follow_up_drafts + SET queued = ?, updated_at = CURRENT_TIMESTAMP, version = version + 1 + WHERE task_attempt_id = ?"#, + ) + .bind(should_queue as i64) + .bind(task_attempt.id) + .execute(pool) + .await?; + } else { + // Unqueue + sqlx::query( + r#"UPDATE follow_up_drafts + SET queued = 0, updated_at = CURRENT_TIMESTAMP, version = version + 1 + WHERE task_attempt_id = ?"#, + ) + .bind(task_attempt.id) + .execute(pool) + .await?; + } + + // If queued and no process running for this attempt, attempt to start immediately. + let current = FollowUpDraft::find_by_task_attempt_id(pool, task_attempt.id).await?; + let should_consider_start = current.as_ref().map(|c| c.queued).unwrap_or(false) + && !has_running_processes_for_attempt(pool, task_attempt.id).await?; + if should_consider_start { + if FollowUpDraft::try_mark_sending(pool, task_attempt.id) + .await + .unwrap_or(false) + { + let _ = + start_follow_up_from_draft(&deployment, &task_attempt, current.as_ref().unwrap()) + .await; + } else { + // Schedule a short delayed recheck to handle timing edges + let deployment_clone = deployment.clone(); + let task_attempt_clone = task_attempt.clone(); + tokio::spawn(async move { + use std::time::Duration; + tokio::time::sleep(Duration::from_millis(1200)).await; + let pool = &deployment_clone.db().pool; + // Still no running process? + let running = + match ExecutionProcess::find_by_task_attempt_id(pool, task_attempt_clone.id) + .await + { + Ok(procs) => procs.into_iter().any(|p| { + matches!( + p.status, + db::models::execution_process::ExecutionProcessStatus::Running + ) + }), + Err(_) => true, // assume running on error to avoid duplicate starts + }; + if running { + return; + } + // Still queued and eligible? 
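+                // Re-read the draft and re-check the same preconditions that
+                // `try_mark_sending` enforces (queued, not already sending, non-empty prompt),
+                // so this delayed recheck cannot double-start a follow-up.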
+ let draft = + match FollowUpDraft::find_by_task_attempt_id(pool, task_attempt_clone.id).await + { + Ok(Some(d)) if d.queued && !d.sending && !d.prompt.trim().is_empty() => d, + _ => return, + }; + if FollowUpDraft::try_mark_sending(pool, task_attempt_clone.id) + .await + .unwrap_or(false) + { + let _ = + start_follow_up_from_draft(&deployment_clone, &task_attempt_clone, &draft) + .await; + } + }); + } + } + + let d = FollowUpDraft::find_by_task_attempt_id(pool, task_attempt.id) + .await? + .ok_or(SqlxError::RowNotFound)?; + let resp = FollowUpDraftResponse { + task_attempt_id: d.task_attempt_id, + prompt: d.prompt, + queued: d.queued, + variant: d.variant, + image_ids: d.image_ids, + version: d.version, + }; + Ok(ResponseJson(ApiResponse::success(resp))) +} + +async fn start_follow_up_from_draft( + deployment: &DeploymentImpl, + task_attempt: &TaskAttempt, + draft: &FollowUpDraft, +) -> Result { + // Ensure worktree exists + deployment + .container() + .ensure_container_exists(task_attempt) + .await?; + + // Get latest session id (ignoring dropped) + let session_id = ExecutionProcess::find_latest_session_id_by_task_attempt( + &deployment.db().pool, + task_attempt.id, + ) + .await? + .ok_or(ApiError::TaskAttempt(TaskAttemptError::ValidationError( + "Couldn't find a prior session_id, please create a new task attempt".to_string(), + )))?; + + // Get latest coding agent process to inherit executor profile + let latest_execution_process = ExecutionProcess::find_latest_by_task_attempt_and_run_reason( + &deployment.db().pool, + task_attempt.id, + &ExecutionProcessRunReason::CodingAgent, + ) + .await? + .ok_or(ApiError::TaskAttempt(TaskAttemptError::ValidationError( + "Couldn't find initial coding agent process, has it run yet?".to_string(), + )))?; + let initial_executor_profile_id = match &latest_execution_process + .executor_action() + .map_err(|e| ApiError::TaskAttempt(TaskAttemptError::ValidationError(e.to_string())))? + .typ + { + ExecutorActionType::CodingAgentInitialRequest(request) => { + Ok(request.executor_profile_id.clone()) + } + ExecutorActionType::CodingAgentFollowUpRequest(request) => { + Ok(request.executor_profile_id.clone()) + } + _ => Err(ApiError::TaskAttempt(TaskAttemptError::ValidationError( + "Couldn't find profile from initial request".to_string(), + ))), + }?; + + // Inherit executor profile; override variant if provided in draft + let executor_profile_id = ExecutorProfileId { + executor: initial_executor_profile_id.executor, + variant: draft.variant.clone(), + }; + + // Get parent task -> project and cleanup action + let task = task_attempt + .parent_task(&deployment.db().pool) + .await? + .ok_or(SqlxError::RowNotFound)?; + let project = task + .parent_project(&deployment.db().pool) + .await? 
+ .ok_or(SqlxError::RowNotFound)?; + + let cleanup_action = project.cleanup_script.map(|script| { + Box::new(ExecutorAction::new( + ExecutorActionType::ScriptRequest(ScriptRequest { + script, + language: ScriptRequestLanguage::Bash, + context: ScriptContext::CleanupScript, + }), + None, + )) + }); + + // Handle images: associate to task, copy to worktree, and canonicalize paths in prompt + let mut prompt = draft.prompt.clone(); + if let Some(image_ids) = &draft.image_ids { + TaskImage::associate_many_dedup(&deployment.db().pool, task_attempt.task_id, image_ids) + .await?; + if let Some(container_ref) = &task_attempt.container_ref { + let worktree_path = std::path::PathBuf::from(container_ref); + deployment + .image() + .copy_images_by_ids_to_worktree(&worktree_path, image_ids) + .await?; + prompt = ImageService::canonicalise_image_paths(&prompt, &worktree_path); + } + } + + let follow_up_request = CodingAgentFollowUpRequest { + prompt, + session_id, + executor_profile_id, + }; + + let follow_up_action = ExecutorAction::new( + ExecutorActionType::CodingAgentFollowUpRequest(follow_up_request), + cleanup_action, + ); + + let execution_process = deployment + .container() + .start_execution( + task_attempt, + &follow_up_action, + &ExecutionProcessRunReason::CodingAgent, + ) + .await?; + + // Best-effort: clear the draft after scheduling the execution + let _ = FollowUpDraft::clear_after_send(&deployment.db().pool, task_attempt.id).await; + + Ok(execution_process) +} + #[axum::debug_handler] pub async fn restore_task_attempt( Extension(task_attempt): Extension, @@ -1071,6 +1542,12 @@ pub fn router(deployment: &DeploymentImpl) -> Router { let task_attempt_id_router = Router::new() .route("/", get(get_task_attempt)) .route("/follow-up", post(follow_up)) + .route( + "/follow-up-draft", + get(get_follow_up_draft).put(save_follow_up_draft), + ) + .route("/follow-up-draft/stream", get(stream_follow_up_draft)) + .route("/follow-up-draft/queue", post(set_follow_up_queue)) .route("/restore", post(restore_task_attempt)) .route("/commit-info", get(get_commit_info)) .route("/commit-compare", get(compare_commit_to_head)) diff --git a/crates/server/src/routes/tasks.rs b/crates/server/src/routes/tasks.rs index ae41957d..2ab6dc21 100644 --- a/crates/server/src/routes/tasks.rs +++ b/crates/server/src/routes/tasks.rs @@ -80,7 +80,7 @@ pub async fn create_task( let task = Task::create(&deployment.db().pool, &payload, id).await?; if let Some(image_ids) = &payload.image_ids { - TaskImage::associate_many(&deployment.db().pool, task.id, image_ids).await?; + TaskImage::associate_many_dedup(&deployment.db().pool, task.id, image_ids).await?; } deployment @@ -106,7 +106,7 @@ pub async fn create_task_and_start( let task = Task::create(&deployment.db().pool, &payload, task_id).await?; if let Some(image_ids) = &payload.image_ids { - TaskImage::associate_many(&deployment.db().pool, task.id, image_ids).await?; + TaskImage::associate_many_dedup(&deployment.db().pool, task.id, image_ids).await?; } deployment @@ -202,7 +202,7 @@ pub async fn update_task( if let Some(image_ids) = &payload.image_ids { TaskImage::delete_by_task_id(&deployment.db().pool, task.id).await?; - TaskImage::associate_many(&deployment.db().pool, task.id, image_ids).await?; + TaskImage::associate_many_dedup(&deployment.db().pool, task.id, image_ids).await?; } Ok(ResponseJson(ApiResponse::success(task))) diff --git a/crates/services/src/services/events.rs b/crates/services/src/services/events.rs index 34c5aae2..069d3378 100644 --- 
a/crates/services/src/services/events.rs +++ b/crates/services/src/services/events.rs @@ -93,6 +93,8 @@ enum HookTables { TaskAttempts, #[strum(to_string = "execution_processes")] ExecutionProcesses, + #[strum(to_string = "follow_up_drafts")] + FollowUpDrafts, } #[derive(Serialize, Deserialize, TS)] @@ -101,6 +103,7 @@ pub enum RecordTypes { Task(Task), TaskAttempt(TaskAttempt), ExecutionProcess(ExecutionProcess), + FollowUpDraft(db::models::follow_up_draft::FollowUpDraft), DeletedTask { rowid: i64, project_id: Option, @@ -114,6 +117,10 @@ pub enum RecordTypes { rowid: i64, task_attempt_id: Option, }, + DeletedFollowUpDraft { + rowid: i64, + task_attempt_id: Option, + }, } #[derive(Serialize, Deserialize, TS)] @@ -247,6 +254,41 @@ impl EventService { } } } + (HookTables::FollowUpDrafts, SqliteOperation::Delete) => { + // Try to get draft before deletion to capture attempt id + let attempt_id = + db::models::follow_up_draft::FollowUpDraft::find_by_rowid( + &db.pool, rowid, + ) + .await + .ok() + .flatten() + .map(|d| d.task_attempt_id); + RecordTypes::DeletedFollowUpDraft { + rowid, + task_attempt_id: attempt_id, + } + } + (HookTables::FollowUpDrafts, _) => { + match db::models::follow_up_draft::FollowUpDraft::find_by_rowid( + &db.pool, rowid, + ) + .await + { + Ok(Some(draft)) => RecordTypes::FollowUpDraft(draft), + Ok(None) => RecordTypes::DeletedFollowUpDraft { + rowid, + task_attempt_id: None, + }, + Err(e) => { + tracing::error!( + "Failed to fetch follow_up_draft: {:?}", + e + ); + return; + } + } + } }; let db_op: &str = match hook.operation { @@ -559,4 +601,106 @@ impl EventService { Ok(combined_stream) } + + /// Stream follow-up draft for a specific task attempt with initial snapshot + pub async fn stream_follow_up_draft_for_attempt( + &self, + task_attempt_id: Uuid, + ) -> Result>, EventError> + { + // Get initial snapshot of follow-up draft + let draft = db::models::follow_up_draft::FollowUpDraft::find_by_task_attempt_id( + &self.db.pool, + task_attempt_id, + ) + .await? + .unwrap_or(db::models::follow_up_draft::FollowUpDraft { + id: uuid::Uuid::new_v4(), + task_attempt_id, + prompt: String::new(), + queued: false, + sending: false, + variant: None, + image_ids: None, + created_at: chrono::Utc::now(), + updated_at: chrono::Utc::now(), + version: 0, + }); + + let initial_patch = json!([{ + "op": "replace", + "path": "/", + "value": { "follow_up_draft": draft } + }]); + let initial_msg = LogMsg::JsonPatch(serde_json::from_value(initial_patch).unwrap()); + + // Filtered live stream, mapped into direct JSON patches that update /follow_up_draft + let filtered_stream = BroadcastStream::new(self.msg_store.get_receiver()).filter_map( + move |msg_result| async move { + match msg_result { + Ok(LogMsg::JsonPatch(patch)) => { + if let Some(event_patch_op) = patch.0.first() + && let Ok(event_patch_value) = serde_json::to_value(event_patch_op) + && let Ok(event_patch) = + serde_json::from_value::(event_patch_value) + { + match &event_patch.value.record { + RecordTypes::FollowUpDraft(draft) => { + if draft.task_attempt_id == task_attempt_id { + // Build a direct patch to replace /follow_up_draft + let direct = json!([{ + "op": "replace", + "path": "/follow_up_draft", + "value": draft + }]); + let direct_patch = serde_json::from_value(direct).unwrap(); + return Some(Ok(LogMsg::JsonPatch(direct_patch))); + } + } + RecordTypes::DeletedFollowUpDraft { + task_attempt_id: Some(id), + .. 
+ } => { + if *id == task_attempt_id { + // Replace with empty draft state + let empty = json!({ + "id": uuid::Uuid::new_v4(), + "task_attempt_id": id, + "prompt": "", + "queued": false, + "sending": false, + "variant": null, + "image_ids": null, + "created_at": chrono::Utc::now(), + "updated_at": chrono::Utc::now(), + "version": 0 + }); + let direct = json!([{ + "op": "replace", + "path": "/follow_up_draft", + "value": empty + }]); + let direct_patch = serde_json::from_value(direct).unwrap(); + return Some(Ok(LogMsg::JsonPatch(direct_patch))); + } + } + _ => {} + } + } + None + } + Ok(other) => Some(Ok(other)), + Err(_) => None, + } + }, + ); + + let initial_stream = futures::stream::once(async move { Ok(initial_msg) }); + let combined_stream = initial_stream + .chain(filtered_stream) + .map_ok(|msg| msg.to_sse_event()) + .boxed(); + + Ok(combined_stream) + } } diff --git a/frontend/src/components/NormalizedConversation/DisplayConversationEntry.tsx b/frontend/src/components/NormalizedConversation/DisplayConversationEntry.tsx index afd6d14e..fa49f0a5 100644 --- a/frontend/src/components/NormalizedConversation/DisplayConversationEntry.tsx +++ b/frontend/src/components/NormalizedConversation/DisplayConversationEntry.tsx @@ -372,7 +372,7 @@ const ToolCallCard: React.FC<{ const HeaderWrapper: React.ElementType = hasExpandableDetails ? 'button' : 'div'; - const headerProps: any = hasExpandableDetails + const headerProps = hasExpandableDetails ? { onClick: (e: React.MouseEvent) => { e.preventDefault(); diff --git a/frontend/src/components/tasks/TaskFollowUpSection.tsx b/frontend/src/components/tasks/TaskFollowUpSection.tsx index 105c0e01..a02f0135 100644 --- a/frontend/src/components/tasks/TaskFollowUpSection.tsx +++ b/frontend/src/components/tasks/TaskFollowUpSection.tsx @@ -1,5 +1,8 @@ import { AlertCircle, + CheckCircle2, + WifiOff, + Clock, Send, ChevronDown, ImageIcon, @@ -10,11 +13,19 @@ import { ImageUploadSection } from '@/components/ui/ImageUploadSection'; import { Alert, AlertDescription } from '@/components/ui/alert'; import { FileSearchTextarea } from '@/components/ui/file-search-textarea'; import { useEffect, useMemo, useState, useRef, useCallback } from 'react'; -import { attemptsApi, imagesApi } from '@/lib/api.ts'; -import type { ImageResponse, TaskWithAttemptStatus } from 'shared/types'; +import { + attemptsApi, + imagesApi, + type UpdateFollowUpDraftRequest, +} from '@/lib/api.ts'; +import type { + ImageResponse, + TaskWithAttemptStatus, + FollowUpDraft, +} from 'shared/types'; import { useBranchStatus } from '@/hooks'; import { useAttemptExecution } from '@/hooks/useAttemptExecution'; -import { Loader } from '@/components/ui/loader'; +import { Loader2 } from 'lucide-react'; import { useUserSystem } from '@/components/config-provider'; import { DropdownMenu, @@ -25,6 +36,8 @@ import { import { cn } from '@/lib/utils'; import { useVariantCyclingShortcut } from '@/lib/keyboard-shortcuts'; import { useReview } from '@/contexts/ReviewProvider'; +import { useJsonPatchStream } from '@/hooks/useJsonPatchStream'; +import { inIframe } from '@/vscode/bridge'; interface TaskFollowUpSectionProps { task: TaskWithAttemptStatus; @@ -104,6 +117,70 @@ export function TaskFollowUpSection({ const [newlyUploadedImageIds, setNewlyUploadedImageIds] = useState( [] ); + const wrapperRef = useRef(null); + const [lockedMinHeight, setLockedMinHeight] = useState(null); + // Fade-out overlay for clearing text when sending begins + const [fadeOverlayText, setFadeOverlayText] = useState(''); + const 
[showFadeOverlay, setShowFadeOverlay] = useState(false); + const [overlayFadeClass, setOverlayFadeClass] = useState(''); + const overlayFadeTimerRef = useRef(undefined); + const overlayHideTimerRef = useRef(undefined); + const [isQueued, setIsQueued] = useState(false); + const [isDraftSending, setIsDraftSending] = useState(false); + const [isQueuing, setIsQueuing] = useState(false); + const [isUnqueuing, setIsUnqueuing] = useState(false); + const [isDraftReady, setIsDraftReady] = useState(false); + const saveTimeoutRef = useRef(undefined); + const [isSaving, setIsSaving] = useState(false); + const [saveStatus, setSaveStatus] = useState< + 'idle' | 'saving' | 'saved' | 'offline' | 'sent' + >('idle'); + const [isStatusFading, setIsStatusFading] = useState(false); + const statusFadeTimerRef = useRef(undefined); + const statusClearTimerRef = useRef(undefined); + const lastSentRef = useRef(''); + const suppressNextSaveRef = useRef(false); + const localDirtyRef = useRef(false); + // We auto-resolve conflicts silently by adopting server state. + const lastServerVersionRef = useRef(-1); + const prevSendingRef = useRef(false); + + // Helper to show a pleasant fade for transient "Saved" status + const scheduleSavedStatus = useCallback(() => { + // Clear pending timers + if (statusFadeTimerRef.current) + window.clearTimeout(statusFadeTimerRef.current); + if (statusClearTimerRef.current) + window.clearTimeout(statusClearTimerRef.current); + setIsStatusFading(false); + setSaveStatus('saved'); + // Fade out close to the end of visibility + statusFadeTimerRef.current = window.setTimeout( + () => setIsStatusFading(true), + 1800 + ); + statusClearTimerRef.current = window.setTimeout(() => { + setSaveStatus('idle'); + setIsStatusFading(false); + }, 2000); + }, []); + + const scheduleSentStatus = useCallback(() => { + if (statusFadeTimerRef.current) + window.clearTimeout(statusFadeTimerRef.current); + if (statusClearTimerRef.current) + window.clearTimeout(statusClearTimerRef.current); + setIsStatusFading(false); + setSaveStatus('sent'); + statusFadeTimerRef.current = window.setTimeout( + () => setIsStatusFading(true), + 1800 + ); + statusClearTimerRef.current = window.setTimeout(() => { + setSaveStatus('idle'); + setIsStatusFading(false); + }, 2000); + }, []); // Get the profile from the attempt data const selectedProfile = selectedAttemptProfile; @@ -154,6 +231,347 @@ export function TaskFollowUpSection({ setSelectedVariant(defaultFollowUpVariant); }, [defaultFollowUpVariant]); + // Subscribe to follow-up draft SSE stream for this attempt + type DraftStreamState = { follow_up_draft: FollowUpDraft }; + const draftStreamEndpoint = selectedAttemptId + ? 
`/api/task-attempts/${selectedAttemptId}/follow-up-draft/stream` + : undefined; + const makeInitialDraftData = useCallback( + () => ({ + follow_up_draft: { + id: '', + task_attempt_id: selectedAttemptId || '', + prompt: '', + queued: false, + sending: false, + variant: null, + image_ids: [], + // version used only for local comparison; server will patch real value + version: 0 as unknown, + created_at: new Date().toISOString() as unknown, + updated_at: new Date().toISOString() as unknown, + } as any, + }), + [selectedAttemptId] + ); + + const { + data: draftStream, + isConnected: draftStreamConnected, + error: draftStreamError, + } = useJsonPatchStream( + draftStreamEndpoint, + !!draftStreamEndpoint, + makeInitialDraftData + ); + + // One-shot hydration via REST to avoid waiting on SSE, and to handle environments + // where SSE connects but initial event is delayed or blocked. + useEffect(() => { + let cancelled = false; + const hydrateOnce = async () => { + if (!selectedAttemptId) return; + try { + const draft = await attemptsApi.getFollowUpDraft(selectedAttemptId); + if (cancelled) return; + suppressNextSaveRef.current = true; + const incomingVersion = draft?.version + ? Number(draft.version as unknown) + : 0; + lastServerVersionRef.current = incomingVersion; + setFollowUpMessage(draft.prompt || ''); + setIsQueued(!!draft.queued); + if (draft.variant !== undefined && draft.variant !== null) + setSelectedVariant(draft.variant); + // Load images if present + if (draft.image_ids && draft.image_ids.length > 0) { + const all = await imagesApi.getTaskImages(task.id); + if (cancelled) return; + const wantIds = new Set(draft.image_ids); + setImages(all.filter((img) => wantIds.has(img.id))); + } else { + setImages([]); + } + if (!isDraftReady) setIsDraftReady(true); + } catch { + // ignore, rely on SSE/poll fallback + } + }; + hydrateOnce(); + return () => { + cancelled = true; + }; + }, [selectedAttemptId]); + + // Cleanup timers on unmount + useEffect(() => { + return () => { + if (statusFadeTimerRef.current) + window.clearTimeout(statusFadeTimerRef.current); + if (statusClearTimerRef.current) + window.clearTimeout(statusClearTimerRef.current); + }; + }, []); + + useEffect(() => { + if (!draftStream) return; + const d = draftStream.follow_up_draft as FollowUpDraft; + // Ignore synthetic initial placeholder until real SSE snapshot arrives + if ((d as any).id === '') { + return; + } + const incomingVersion = d?.version ? Number(d.version as unknown) : 0; + + // Always reflect queued/sending flags immediately + setIsQueued(!!d.queued); + const sendingNow = !!(d as any).sending; + setIsDraftSending(sendingNow); + + // If server indicates we're sending, ensure the editor is cleared for clarity. 
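+      // The block below locks the wrapper height, briefly fades the previous text out in an
+      // overlay while the textarea is cleared, and drops any attached images so the editor
+      // reflects that the queued draft is now being sent.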
+ if (sendingNow) { + // Edge trigger: show Sent pill once + if (!prevSendingRef.current) { + scheduleSentStatus(); + } + // Show a quick fade-out of the prior content while clearing the actual textarea value + if (followUpMessage !== '') { + if (overlayFadeTimerRef.current) + window.clearTimeout(overlayFadeTimerRef.current); + if (overlayHideTimerRef.current) + window.clearTimeout(overlayHideTimerRef.current); + // Lock container height to avoid jump while autosize recomputes + if (wrapperRef.current) { + const h = wrapperRef.current.getBoundingClientRect().height; + setLockedMinHeight(h); + } + setFadeOverlayText(followUpMessage); + setShowFadeOverlay(true); + // Start fully visible + setOverlayFadeClass('opacity-100'); + // Clear textarea immediately under the overlay + setFollowUpMessage(''); + // Trigger fast fade on next tick (no motion), then remove overlay shortly after + overlayFadeTimerRef.current = window.setTimeout( + () => setOverlayFadeClass('opacity-0'), + 20 + ); + overlayHideTimerRef.current = window.setTimeout(() => { + setShowFadeOverlay(false); + setFadeOverlayText(''); + setOverlayFadeClass(''); + // Release height lock shortly after fade completes + setLockedMinHeight(null); + }, 180); + } + if (images.length > 0) setImages([]); + if (newlyUploadedImageIds.length > 0) setNewlyUploadedImageIds([]); + if (showImageUpload) setShowImageUpload(false); + } + prevSendingRef.current = sendingNow; + + // Skip if this is a duplicate of what we already processed + if (incomingVersion === lastServerVersionRef.current) { + if (!isDraftReady) setIsDraftReady(true); + return; + } + + // Mark that next local change shouldn't auto-save (we're syncing from server) + suppressNextSaveRef.current = true; + + // Initial hydration: avoid clobbering locally typed text with empty server prompt + if (lastServerVersionRef.current === -1) { + if (!localDirtyRef.current && !sendingNow) { + setFollowUpMessage(d.prompt || ''); + } + if (d.variant !== undefined) setSelectedVariant(d.variant); + lastServerVersionRef.current = incomingVersion; + } + + // Real server-side change: adopt new prompt/variant + if (incomingVersion > lastServerVersionRef.current) { + // If sending, keep the editor clear regardless of server prompt value + setFollowUpMessage(sendingNow ? 
'' : d.prompt || '');
+      if (d.variant !== undefined) setSelectedVariant(d.variant);
+      localDirtyRef.current = false;
+      lastServerVersionRef.current = incomingVersion;
+    }
+    if (!d.image_ids || d.image_ids.length === 0) {
+      setImages([]);
+      setNewlyUploadedImageIds([]);
+      setShowImageUpload(false);
+    } else {
+      // Load attached images for this draft by IDs
+      const wantIds = new Set(d.image_ids);
+      const haveIds = new Set(images.map((img) => img.id));
+      let mismatch = false;
+      if (images.length !== wantIds.size) mismatch = true;
+      else
+        for (const id of wantIds)
+          if (!haveIds.has(id)) {
+            mismatch = true;
+            break;
+          }
+      if (mismatch) {
+        imagesApi
+          .getTaskImages(task.id)
+          .then((all) => {
+            setImages(all.filter((img) => wantIds.has(img.id)));
+            setNewlyUploadedImageIds([]);
+          })
+          .catch(() => void 0);
+      }
+    }
+    if (!isDraftReady) setIsDraftReady(true);
+  }, [draftStream]);
+
+  // Cleanup overlay timers
+  useEffect(() => {
+    return () => {
+      if (overlayFadeTimerRef.current)
+        window.clearTimeout(overlayFadeTimerRef.current);
+      if (overlayHideTimerRef.current)
+        window.clearTimeout(overlayHideTimerRef.current);
+    };
+  }, []);
+
+  // Fallback: if running inside VSCode iframe and SSE isn't connected, poll the draft endpoint to keep UI in sync
+  const pollTimerRef = useRef(undefined);
+  useEffect(() => {
+    if (!selectedAttemptId) return;
+    const shouldPoll =
+      inIframe() && (!draftStreamConnected || !!draftStreamError);
+    if (!shouldPoll) {
+      if (pollTimerRef.current) window.clearInterval(pollTimerRef.current);
+      pollTimerRef.current = undefined;
+      return;
+    }
+    const pollOnce = async () => {
+      try {
+        const draft = await attemptsApi.getFollowUpDraft(selectedAttemptId);
+        // Update immediate state, similar to SSE handler
+        setIsQueued(!!draft.queued);
+        // Polling response does not include 'sending'; preserve previous sending state
+        const incomingVersion = draft?.version
+          ? Number(draft.version as unknown)
+          : 0;
+        if (incomingVersion !== lastServerVersionRef.current) {
+          suppressNextSaveRef.current = true;
+          setFollowUpMessage(draft.prompt || '');
+          if (draft.variant !== undefined && draft.variant !== null)
+            setSelectedVariant(draft.variant);
+          lastServerVersionRef.current = incomingVersion;
+          // images not included in response type for polling; leave as-is
+        }
+        if (!isDraftReady) setIsDraftReady(true);
+      } catch {
+        // ignore
+      }
+    };
+    // Prime once, then interval
+    pollOnce();
+    pollTimerRef.current = window.setInterval(pollOnce, 1000);
+    return () => {
+      if (pollTimerRef.current) window.clearInterval(pollTimerRef.current);
+      pollTimerRef.current = undefined;
+    };
+  }, [selectedAttemptId, draftStreamConnected, draftStreamError]);
+
+  // Debounced persist draft on message or variant change (only while not queued)
+  useEffect(() => {
+    if (!selectedAttemptId) return;
+    // skip saving if currently sending follow-up; it will be cleared on success
+    if (isSendingFollowUp) return;
+    // also skip while server is sending a queued draft
+    if (isDraftSending) return;
+    // skip saving while queue/unqueue transitions are in-flight
+    if (isQueuing || isUnqueuing) return;
+    if (suppressNextSaveRef.current) {
+      suppressNextSaveRef.current = false;
+      return;
+    }
+    // Only save when not queued (edit mode)
+    if (isQueued) return;
+
+    const saveDraft = async () => {
+      const d = draftStream?.follow_up_draft;
+      const payload: any = {} as UpdateFollowUpDraftRequest;
+      // prompt change
+      if (d && followUpMessage !== (d.prompt || '')) {
+        payload.prompt = followUpMessage;
+      }
+      // variant change (string | null)
+      if ((d?.variant ?? null) !== (selectedVariant ?? null)) {
+        payload.variant = selectedVariant as any; // may be null
+      }
+      // images change (compare ids)
+      const currentIds = images.map((img) => img.id);
+      const serverIds = (d?.image_ids as string[] | undefined) ?? [];
+      const idsEqual =
+        currentIds.length === serverIds.length &&
+        currentIds.every((id, i) => id === serverIds[i]);
+      if (!idsEqual) {
+        payload.image_ids = currentIds;
+      }
+
+      // If no field changed, skip network
+      const keys = Object.keys(payload).filter((k) => k !== 'version');
+      if (keys.length === 0) return;
+      const payloadKey = JSON.stringify(payload);
+      if (payloadKey === lastSentRef.current) return;
+      lastSentRef.current = payloadKey;
+      try {
+        setIsSaving(true);
+        setSaveStatus(navigator.onLine ? 'saving' : 'offline');
+        await attemptsApi.saveFollowUpDraft(selectedAttemptId, payload);
+        // pleasant linger + fade-out
+        scheduleSavedStatus();
+      } catch (e: unknown) {
+        // On conflict or error, silently adopt server state
+        try {
+          const draft = await attemptsApi.getFollowUpDraft(selectedAttemptId);
+          suppressNextSaveRef.current = true;
+          setFollowUpMessage(draft.prompt || '');
+          setIsQueued(!!draft.queued);
+          if (draft.variant !== undefined && draft.variant !== null) {
+            setSelectedVariant(draft.variant);
+          }
+          if (draft.version !== undefined && draft.version !== null) {
+            lastServerVersionRef.current = Number(draft.version as unknown);
+          }
+        } catch {
+          /* empty */
+        }
+        setSaveStatus(navigator.onLine ? 'idle' : 'offline');
+      } finally {
+        setIsSaving(false);
+      }
+    };
+
+    // debounce 400ms
+    if (saveTimeoutRef.current) window.clearTimeout(saveTimeoutRef.current);
+    saveTimeoutRef.current = (
+      window.setTimeout as unknown as (
+        handler: () => void,
+        timeout?: number
+      ) => number
+    )(saveDraft, 400);
+    return () => {
+      if (saveTimeoutRef.current) window.clearTimeout(saveTimeoutRef.current);
+    };
+  }, [
+    followUpMessage,
+    selectedVariant,
+    isQueued,
+    selectedAttemptId,
+    isSendingFollowUp,
+    isQueuing,
+    isUnqueuing,
+  ]);
+
+  // Remove BroadcastChannel — SSE is authoritative
+
+  // (removed duplicate SSE subscription block)
+
   const handleImageUploaded = useCallback((image: ImageResponse) => {
     const markdownText = `![${image.original_name}](${image.file_path})`;
     setFollowUpMessage((prev) => {
@@ -204,7 +622,9 @@ export function TaskFollowUpSection({
         image_ids: imageIds,
       });
       setFollowUpMessage('');
-      clearComments(); // Clear review comments after successful submission
+      // Clear review comments and reset queue state after successful submission
+      clearComments();
+      setIsQueued(false);
       // Clear images and newly uploaded IDs after successful submission
       setImages([]);
       setNewlyUploadedImageIds([]);
@@ -219,6 +639,100 @@ export function TaskFollowUpSection({
     }
   };
 
+  // Derived UI lock: disallow edits/actions while queued or transitioning
+  const isDraftLocked = isQueued || isQueuing || isUnqueuing || isDraftSending;
+  const isInputDisabled = isDraftLocked || !isDraftReady;
+
+  // Queue handler: ensure draft is persisted immediately, then toggle queued
+  const onQueue = async () => {
+    if (!selectedAttemptId) return;
+    if (isQueuing || isQueued) return;
+    const hasContent = followUpMessage.trim().length > 0;
+    if (!hasContent) return;
+    try {
+      // Prevent any pending debounced save from racing
+      if (saveTimeoutRef.current) window.clearTimeout(saveTimeoutRef.current);
+      suppressNextSaveRef.current = true;
+      setIsQueuing(true);
+      // Optimistically reflect queued state to block edits/buttons immediately
+      setIsQueued(true);
+      setIsSaving(true);
+      setSaveStatus(navigator.onLine ? 'saving' : 'offline');
+      // 1) Force-save current draft so the row exists and is up to date (no version to avoid conflicts)
+      const immediatePayload: any = {
+        // Do NOT send version here to avoid spurious 409; we'll use the returned version for queueing
+        prompt: followUpMessage,
+      } as UpdateFollowUpDraftRequest;
+      if (
+        (draftStream?.follow_up_draft?.variant ?? null) !==
+        (selectedVariant ?? null)
+      ) {
+        immediatePayload.variant = selectedVariant as any;
+      }
+      const currentIds = images.map((img) => img.id);
+      const serverIds =
+        (draftStream?.follow_up_draft?.image_ids as string[] | undefined) ??
+        [];
+      const idsEqual =
+        currentIds.length === serverIds.length &&
+        currentIds.every((id, i) => id === serverIds[i]);
+      if (!idsEqual) {
+        immediatePayload.image_ids = currentIds;
+      }
+      await attemptsApi.saveFollowUpDraft(selectedAttemptId, immediatePayload);
+
+      // 2) Queue with optimistic concurrency using latest version from save
+      try {
+        const resp = await attemptsApi.setFollowUpQueue(
+          selectedAttemptId,
+          true
+        );
+        // Immediate local sync to avoid waiting for SSE
+        if (resp?.version !== undefined) {
+          lastServerVersionRef.current = Number(resp.version as unknown);
+        }
+        setIsQueued(!!resp.queued);
+        if (resp.variant !== undefined && resp.variant !== null) {
+          setSelectedVariant(resp.variant);
+        }
+      } catch (err: unknown) {
+        // On any error, silently adopt server state
+        const latest = await attemptsApi.getFollowUpDraft(selectedAttemptId);
+        suppressNextSaveRef.current = true;
+        if (latest.version !== undefined && latest.version !== null) {
+          lastServerVersionRef.current = Number(latest.version as unknown);
+        }
+        setIsQueued(!!latest.queued);
+        if (latest.variant !== undefined && latest.variant !== null) {
+          setSelectedVariant(latest.variant);
+        }
+      }
+      // Do not show "Saved" for queue; right side shows Queued; a "Sent" pill will appear when sending starts
+      setSaveStatus('idle');
+    } catch (e: unknown) {
+      // On any error, hard refresh to server truth
+      try {
+        const draft = await attemptsApi.getFollowUpDraft(selectedAttemptId);
+        suppressNextSaveRef.current = true;
+        setFollowUpMessage(draft.prompt || '');
+        setIsQueued(!!draft.queued);
+        if (draft.variant !== undefined && draft.variant !== null) {
+          setSelectedVariant(draft.variant);
+        }
+        if (draft.version !== undefined && draft.version !== null) {
+          lastServerVersionRef.current = Number(draft.version as unknown);
+        }
+      } catch {
+        /* empty */
+      }
+      setSaveStatus(navigator.onLine ? 'idle' : 'offline');
+    } finally {
+      setIsSaving(false);
+      setIsQueuing(false);
+    }
+  };
+
+  // (Removed) auto-unqueue logic — editing is explicit and guarded by a lock now
+
   return (
     selectedAttemptId && (
@@ -238,7 +752,7 @@ export function TaskFollowUpSection({
             onUpload={imagesApi.upload}
             onDelete={imagesApi.delete}
             onImageUploaded={handleImageUploaded}
-            disabled={!canSendFollowUp}
+            disabled={!canSendFollowUp || isDraftLocked || !isDraftReady}
             collapsible={false}
             defaultExpanded={true}
           />
@@ -253,16 +767,27 @@ export function TaskFollowUpSection({
           )}
-
+
            {
              setFollowUpMessage(value);
+              localDirtyRef.current = true;
              if (followUpError) setFollowUpError(null);
            }}
            onKeyDown={(e) => {
@@ -271,23 +796,110 @@ export function TaskFollowUpSection({
                if (canSendFollowUp && !isSendingFollowUp) {
                  onSendFollowUp();
                }
+              } else if (e.key === 'Escape') {
+                // Clear input and auto-cancel queue
+                e.preventDefault();
+                setFollowUpMessage('');
              }
            }}
-            className="flex-1 min-h-[40px] resize-none"
-            disabled={!canTypeFollowUp}
+            className={cn(
+              'flex-1 min-h-[40px] resize-none',
+              showFadeOverlay && 'placeholder-transparent'
+            )}
+            // Edits are disallowed while queued or in transition
+            disabled={isInputDisabled}
            projectId={projectId}
            rows={1}
            maxRows={6}
          />
+          {showFadeOverlay && fadeOverlayText && (
+
+              {fadeOverlayText}
+
+          )}
+          {(isUnqueuing || !isDraftReady) && (
+
+
+
+          )}
-
+          {/* Status row: reserved space above action buttons to avoid layout shift */}
+
+            {/* Left side: save state or conflicts */}
+
+              {saveStatus === 'saving' ? (
+                Saving…
+              ) : saveStatus === 'offline' ? (
+                Offline — changes pending
+              ) : saveStatus === 'saved' ? (
+                Saved
+              ) : saveStatus === 'sent' ? (
+                Sent
+              ) : null}
+
+            {/* Right side: queued/sending status */}
+
+              {isUnqueuing ? (
+                Unlocking…
+              ) : !isDraftReady ? (
+                Loading
+                draft…
+              ) : isDraftSending ? (
+                Sending
+                follow-up…
+              ) : isQueued ? (
+                Queued for next turn. Edits
+                are locked.
+              ) : null}
+
+
+
+
{/* Image button */}
-
-          {comments.length > 0 && (
-
-          )}
-          {isAttemptRunning ? (
-
-          ) : (
+          {/* (removed) old inline notices now replaced by the status row above */}
+
+          {isAttemptRunning ? (
+
+          ) : (
+
+            {comments.length > 0 && (
+
+            )}
-          )}
-
+            {isQueued && (
+
+            )}
+
+          )}
+          {isAttemptRunning && (
+
+
+
+ )}
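Aside: the component above persists the draft with a debounced save and only flips the queued flag once that save has landed, adopting the server copy on any error. A minimal, hedged sketch of that flow in TypeScript, using the attemptsApi helpers added in frontend/src/lib/api.ts later in this diff; the '@/lib/api' import alias and the standalone queueFollowUpDraft function are assumptions for illustration, not code from this change:

import { attemptsApi } from '@/lib/api'; // assumed import alias
import type { FollowUpDraftResponse } from 'shared/types';

// Save the draft first (no version, so the row is created or updated without a
// conflict check), then ask the server to queue it; on failure fall back to
// whatever the server currently has rather than retrying blindly.
export async function queueFollowUpDraft(
  attemptId: string,
  prompt: string
): Promise<FollowUpDraftResponse> {
  await attemptsApi.saveFollowUpDraft(attemptId, {
    prompt,
    variant: null,    // left unset in this sketch
    image_ids: null,  // left unset in this sketch
    version: null,    // omit the version to avoid a spurious conflict
  });
  try {
    return await attemptsApi.setFollowUpQueue(attemptId, true);
  } catch {
    // Adopt server truth instead of assuming the queue call succeeded.
    return attemptsApi.getFollowUpDraft(attemptId);
  }
}
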
diff --git a/frontend/src/hooks/useJsonPatchStream.ts b/frontend/src/hooks/useJsonPatchStream.ts
index c0703064..a2e00c57 100644
--- a/frontend/src/hooks/useJsonPatchStream.ts
+++ b/frontend/src/hooks/useJsonPatchStream.ts
@@ -33,6 +33,20 @@ export const useJsonPatchStream = (
   const [error, setError] = useState(null);
   const eventSourceRef = useRef(null);
   const dataRef = useRef(undefined);
+  const retryTimerRef = useRef(null);
+  const retryAttemptsRef = useRef(0);
+  const [retryNonce, setRetryNonce] = useState(0);
+
+  function scheduleReconnect() {
+    if (retryTimerRef.current) return; // already scheduled
+    // Exponential backoff with cap: 1s, 2s, 4s, 8s (max), then stay at 8s
+    const attempt = retryAttemptsRef.current;
+    const delay = Math.min(8000, 1000 * Math.pow(2, attempt));
+    retryTimerRef.current = window.setTimeout(() => {
+      retryTimerRef.current = null;
+      setRetryNonce((n) => n + 1);
+    }, delay);
+  }

   useEffect(() => {
     if (!enabled || !endpoint) {
@@ -41,6 +55,11 @@ export const useJsonPatchStream = (
         eventSourceRef.current.close();
         eventSourceRef.current = null;
       }
+      if (retryTimerRef.current) {
+        window.clearTimeout(retryTimerRef.current);
+        retryTimerRef.current = null;
+      }
+      retryAttemptsRef.current = 0;
       setData(undefined);
       setIsConnected(false);
       setError(null);
@@ -67,6 +86,12 @@ export const useJsonPatchStream = (
     eventSource.onopen = () => {
       setError(null);
       setIsConnected(true);
+      // Reset backoff on successful connection
+      retryAttemptsRef.current = 0;
+      if (retryTimerRef.current) {
+        window.clearTimeout(retryTimerRef.current);
+        retryTimerRef.current = null;
+      }
     };

     eventSource.addEventListener('json_patch', (event) => {
@@ -96,12 +121,23 @@ export const useJsonPatchStream = (
       eventSource.close();
       eventSourceRef.current = null;
       setIsConnected(false);
+      // Treat finished as terminal and schedule reconnect; servers may rotate
+      retryAttemptsRef.current += 1;
+      scheduleReconnect();
     });

     eventSource.onerror = () => {
       setError('Connection failed');
+      // Close and schedule reconnect
+      try {
+        eventSource.close();
+      } catch {
+        /* empty */
+      }
       eventSourceRef.current = null;
       setIsConnected(false);
+      retryAttemptsRef.current += 1;
+      scheduleReconnect();
     };

     eventSourceRef.current = eventSource;
@@ -112,6 +148,10 @@ export const useJsonPatchStream = (
         eventSourceRef.current.close();
         eventSourceRef.current = null;
       }
+      if (retryTimerRef.current) {
+        window.clearTimeout(retryTimerRef.current);
+        retryTimerRef.current = null;
+      }
       dataRef.current = undefined;
       setData(undefined);
     };
@@ -121,6 +161,7 @@ export const useJsonPatchStream = (
     initialData,
     options.injectInitialEntry,
     options.deduplicatePatches,
+    retryNonce,
   ]);

   return { data, isConnected, error };
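Aside: the reconnect logic above backs off exponentially from 1s and caps at 8s. A hedged sketch of just that delay calculation, pulled out as a pure function; the function name is hypothetical, and the constants mirror scheduleReconnect:

// Delay before reconnect attempt N: 1s, 2s, 4s, then 8s for every later attempt.
function reconnectDelayMs(attempt: number): number {
  return Math.min(8000, 1000 * Math.pow(2, attempt));
}

// reconnectDelayMs(0) === 1000, reconnectDelayMs(3) === 8000, reconnectDelayMs(10) === 8000
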
diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts
index 1e1ec410..c95169ae 100644
--- a/frontend/src/lib/api.ts
+++ b/frontend/src/lib/api.ts
@@ -38,10 +38,16 @@ import {
   ImageResponse,
   RestoreAttemptRequest,
   RestoreAttemptResult,
+  FollowUpDraftResponse,
+  UpdateFollowUpDraftRequest,
 } from 'shared/types';

 // Re-export types for convenience
 export type { RepositoryInfo } from 'shared/types';
+export type {
+  FollowUpDraftResponse,
+  UpdateFollowUpDraftRequest,
+} from 'shared/types';

 export class ApiError extends Error {
   public status?: number;
@@ -358,6 +364,50 @@ export const attemptsApi = {
     return handleApiResponse(response);
   },

+  getFollowUpDraft: async (
+    attemptId: string
+  ): Promise<FollowUpDraftResponse> => {
+    const response = await makeRequest(
+      `/api/task-attempts/${attemptId}/follow-up-draft`
+    );
+    return handleApiResponse(response);
+  },
+
+  saveFollowUpDraft: async (
+    attemptId: string,
+    data: UpdateFollowUpDraftRequest
+  ): Promise<FollowUpDraftResponse> => {
+    const response = await makeRequest(
+      `/api/task-attempts/${attemptId}/follow-up-draft`,
+      {
+        // Server expects PUT for saving/updating the draft
+        method: 'PUT',
+        body: JSON.stringify(data),
+      }
+    );
+    return handleApiResponse(response);
+  },
+
+  setFollowUpQueue: async (
+    attemptId: string,
+    queued: boolean,
+    expectedQueued?: boolean,
+    expectedVersion?: number
+  ): Promise<FollowUpDraftResponse> => {
+    const response = await makeRequest(
+      `/api/task-attempts/${attemptId}/follow-up-draft/queue`,
+      {
+        method: 'POST',
+        body: JSON.stringify({
+          queued,
+          expected_queued: expectedQueued,
+          expected_version: expectedVersion,
+        }),
+      }
+    );
+    return handleApiResponse(response);
+  },
+
   deleteFile: async (
     attemptId: string,
     fileToDelete: string
diff --git a/frontend/src/lib/keyboard-shortcuts.ts b/frontend/src/lib/keyboard-shortcuts.ts
index ae37204d..a2ed772d 100644
--- a/frontend/src/lib/keyboard-shortcuts.ts
+++ b/frontend/src/lib/keyboard-shortcuts.ts
@@ -173,7 +173,7 @@ export function useKanbanKeyboardNavigation({
   focusedStatus: string | null;
   setFocusedStatus: (status: string | null) => void;
   groupedTasks: Record;
-  filteredTasks: any[];
+  filteredTasks: unknown[];
   allTaskStatuses: string[];
   onViewTaskDetails?: (task: any) => void;
   preserveIndexOnColumnSwitch?: boolean;
diff --git a/frontend/src/pages/settings/AgentSettings.tsx b/frontend/src/pages/settings/AgentSettings.tsx
index e0477bf2..a971889e 100644
--- a/frontend/src/pages/settings/AgentSettings.tsx
+++ b/frontend/src/pages/settings/AgentSettings.tsx
@@ -67,12 +67,12 @@ export function AgentSettings() {
   }, [serverProfilesContent, serverParsedProfiles, isDirty]);

   // Sync raw profiles with parsed profiles
-  const syncRawProfiles = (profiles: any) => {
+  const syncRawProfiles = (profiles: unknown) => {
     setLocalProfilesContent(JSON.stringify(profiles, null, 2));
   };

   // Mark profiles as dirty
-  const markDirty = (nextProfiles: any) => {
+  const markDirty = (nextProfiles: unknown) => {
     setLocalParsedProfiles(nextProfiles);
     syncRawProfiles(nextProfiles);
     setIsDirty(true);
@@ -216,7 +216,7 @@ export function AgentSettings() {
         // Show success
         setProfilesSuccess(true);
         setTimeout(() => setProfilesSuccess(false), 3000);
-      } catch (saveError: any) {
+      } catch (saveError: unknown) {
         console.error('Failed to save deletion to backend:', saveError);
       }
     } catch (error) {
@@ -256,7 +256,7 @@ export function AgentSettings() {
       if (useFormEditor && localParsedProfiles) {
         setLocalProfilesContent(contentToSave);
       }
-    } catch (err: any) {
+    } catch (err: unknown) {
       console.error('Failed to save profiles:', err);
     }
   };
@@ -264,7 +264,7 @@ export function AgentSettings() {
   const handleExecutorConfigChange = (
     executorType: string,
     configuration: string,
-    formData: any
+    formData: unknown
   ) => {
     if (!localParsedProfiles || !localParsedProfiles.executors) return;

@@ -285,7 +285,7 @@ export function AgentSettings() {
     markDirty(updatedProfiles);
   };

-  const handleExecutorConfigSave = async (formData: any) => {
+  const handleExecutorConfigSave = async (formData: unknown) => {
     if (!localParsedProfiles || !localParsedProfiles.executors) return;

     // Update the parsed profiles with the saved config
@@ -316,7 +316,7 @@ export function AgentSettings() {
       // Update the local content as well
       setLocalProfilesContent(contentToSave);
-    } catch (err: any) {
+    } catch (err: unknown) {
       console.error('Failed to save profiles:', err);
     }
   };
diff --git a/shared/types.ts b/shared/types.ts
index 5a4d1fdb..438a350e 100644
--- a/shared/types.ts
+++ b/shared/types.ts
@@ -76,6 +76,10 @@ export type GetMcpServerResponse = { mcp_config: McpConfig, config_path: string,
 export type CreateFollowUpAttempt = { prompt: string, variant: string | null, image_ids: Array<string> | null, };

+export type FollowUpDraftResponse = { task_attempt_id: string, prompt: string, queued: boolean, variant: string | null, image_ids: Array<string> | null, version: bigint, };
+
+export type UpdateFollowUpDraftRequest = { prompt: string | null, variant: string | null | null, image_ids: Array<string> | null, version: bigint | null, };
+
 export type CreateGitHubPrRequest = { title: string, body: string | null, base_branch: string | null, };

 export type ImageResponse = { id: string, file_path: string, original_name: string, mime_type: string | null, size_bytes: bigint, hash: string, created_at: string, updated_at: string, };

@@ -236,7 +240,9 @@ export type EventPatch = { op: string, path: string, value: EventPatchInner, };

 export type EventPatchInner = { db_op: string, record: RecordTypes, };

-export type RecordTypes = { "type": "TASK", "data": Task } | { "type": "TASK_ATTEMPT", "data": TaskAttempt } | { "type": "EXECUTION_PROCESS", "data": ExecutionProcess } | { "type": "DELETED_TASK", "data": { rowid: bigint, project_id: string | null, task_id: string | null, } } | { "type": "DELETED_TASK_ATTEMPT", "data": { rowid: bigint, task_id: string | null, } } | { "type": "DELETED_EXECUTION_PROCESS", "data": { rowid: bigint, task_attempt_id: string | null, } };
+export type RecordTypes = { "type": "TASK", "data": Task } | { "type": "TASK_ATTEMPT", "data": TaskAttempt } | { "type": "EXECUTION_PROCESS", "data": ExecutionProcess } | { "type": "FOLLOW_UP_DRAFT", "data": FollowUpDraft } | { "type": "DELETED_TASK", "data": { rowid: bigint, project_id: string | null, task_id: string | null, } } | { "type": "DELETED_TASK_ATTEMPT", "data": { rowid: bigint, task_id: string | null, } } | { "type": "DELETED_EXECUTION_PROCESS", "data": { rowid: bigint, task_attempt_id: string | null, } } | { "type": "DELETED_FOLLOW_UP_DRAFT", "data": { rowid: bigint, task_attempt_id: string | null, } };
+
+export type FollowUpDraft = { id: string, task_attempt_id: string, prompt: string, queued: boolean, sending: boolean, variant: string | null, image_ids: Array<string> | null, created_at: string, updated_at: string, version: bigint, };

 export type CommandExitStatus = { "type": "exit_code", code: number, } | { "type": "success", success: boolean, };
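
Aside: RecordTypes is a discriminated union on the "type" tag, so the new FOLLOW_UP_DRAFT variant narrows cleanly in a switch or a guard. A hedged sketch; the helper name is hypothetical, while the types are the ones added to shared/types.ts above:

import type { RecordTypes, FollowUpDraft } from 'shared/types';

// Returns the draft payload when the patched record is a follow-up draft,
// and null for every other record type.
function followUpDraftFrom(record: RecordTypes): FollowUpDraft | null {
  return record.type === 'FOLLOW_UP_DRAFT' ? record.data : null;
}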