diff --git a/crates/server/src/routes/task_attempts.rs b/crates/server/src/routes/task_attempts.rs
index 48d7d61d..a1969370 100644
--- a/crates/server/src/routes/task_attempts.rs
+++ b/crates/server/src/routes/task_attempts.rs
@@ -55,169 +55,6 @@ pub struct FollowUpResponse {
     pub actual_attempt_id: Uuid,
     pub created_new_attempt: bool,
 }
-// #[derive(Debug, Serialize, TS)]
-// // pub struct ProcessLogsResponse {
-// pub id: Uuid,
-// pub process_type: ExecutionProcessType,
-// pub command: String,
-// pub executor_type: Option<String>,
-// pub status: ExecutionProcessStatus,
-// pub normalized_conversation: NormalizedConversation,
-// }
-
-// // Helper to normalize logs for a process (extracted from get_execution_process_normalized_logs)
-// async fn normalize_process_logs(
-// db_pool: &SqlitePool,
-// process: &ExecutionProcess,
-// ) -> NormalizedConversation {
-// use crate::models::{
-// execution_process::ExecutionProcessType, executor_session::ExecutorSession,
-// };
-// let executor_session = ExecutorSession::find_by_execution_process_id(db_pool, process.id)
-// .await
-// .ok()
-// .flatten();
-
-// let has_stdout = process
-// .stdout
-// .as_ref()
-// .map(|s| !s.trim().is_empty())
-// .unwrap_or(false);
-// let has_stderr = process
-// .stderr
-// .as_ref()
-// .map(|s| !s.trim().is_empty())
-// .unwrap_or(false);
-
-// if !has_stdout && !has_stderr {
-// return NormalizedConversation {
-// entries: vec![],
-// session_id: None,
-// executor_type: process
-// .executor_type
-// .clone()
-// .unwrap_or("unknown".to_string()),
-// prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()),
-// summary: executor_session.as_ref().and_then(|s| s.summary.clone()),
-// };
-// }
-
-// // Parse stdout as JSONL using executor normalization
-// let mut stdout_entries = Vec::new();
-// if let Some(stdout) = &process.stdout {
-// if !stdout.trim().is_empty() {
-// let executor_type = process.executor_type.as_deref().unwrap_or("unknown");
-// let executor_config = if process.process_type == ExecutionProcessType::SetupScript {
-// ExecutorConfig::SetupScript {
-// script: executor_session
-// .as_ref()
-// .and_then(|s| s.prompt.clone())
-// .unwrap_or_else(|| "setup script".to_string()),
-// }
-// } else {
-// match executor_type.to_string().parse() {
-// Ok(config) => config,
-// Err(_) => {
-// return NormalizedConversation {
-// entries: vec![],
-// session_id: None,
-// executor_type: executor_type.to_string(),
-// prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()),
-// summary: executor_session.as_ref().and_then(|s| s.summary.clone()),
-// };
-// }
-// }
-// };
-// let executor = executor_config.create_executor();
-// let working_dir_path = match std::fs::canonicalize(&process.working_directory) {
-// Ok(canonical_path) => canonical_path.to_string_lossy().to_string(),
-// Err(_) => process.working_directory.clone(),
-// };
-// if let Ok(normalized) = executor.normalize_logs(stdout, &working_dir_path) {
-// stdout_entries = normalized.entries;
-// }
-// }
-// }
-// // Parse stderr chunks separated by boundary markers
-// let mut stderr_entries = Vec::new();
-// if let Some(stderr) = &process.stderr {
-// let trimmed = stderr.trim();
-// if !trimmed.is_empty() {
-// let chunks: Vec<&str> = trimmed.split("---STDERR_CHUNK_BOUNDARY---").collect();
-// for chunk in chunks {
-// let chunk_trimmed = chunk.trim();
-// if !chunk_trimmed.is_empty() {
-// let filtered_content = chunk_trimmed.replace("---STDERR_CHUNK_BOUNDARY---", "");
-// if !filtered_content.trim().is_empty() {
-// stderr_entries.push(NormalizedEntry {
-// timestamp: Some(chrono::Utc::now().to_rfc3339()),
-// entry_type: NormalizedEntryType::ErrorMessage,
-// content: filtered_content.trim().to_string(),
-// metadata: None,
-// });
-// }
-// }
-// }
-// }
-// }
-// let mut all_entries = Vec::new();
-// all_entries.extend(stdout_entries);
-// all_entries.extend(stderr_entries);
-// all_entries.sort_by(|a, b| match (&a.timestamp, &b.timestamp) {
-// (Some(a_ts), Some(b_ts)) => a_ts.cmp(b_ts),
-// (Some(_), None) => std::cmp::Ordering::Less,
-// (None, Some(_)) => std::cmp::Ordering::Greater,
-// (None, None) => std::cmp::Ordering::Equal,
-// });
-// let executor_type = if process.process_type == ExecutionProcessType::SetupScript {
-// "setup-script".to_string()
-// } else {
-// process
-// .executor_type
-// .clone()
-// .unwrap_or("unknown".to_string())
-// };
-// NormalizedConversation {
-// entries: all_entries,
-// session_id: None,
-// executor_type,
-// prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()),
-// summary: executor_session.as_ref().and_then(|s| s.summary.clone()),
-// }
-// }
-
-// /// Get all normalized logs for all execution processes of a task attempt
-// pub async fn get_task_attempt_all_logs(
-// Extension(_project): Extension<Project>,
-// Extension(_task): Extension<Task>,
-// Extension(task_attempt): Extension<TaskAttempt>,
-// State(app_state): State<AppState>,
-// ) -> Result<Json<ApiResponse<Vec<ProcessLogsResponse>>>, StatusCode> {
-// // Fetch all execution processes for this attempt
-// let processes = match ExecutionProcess::find_by_task_attempt_id(
-// &app_state.db_pool,
-// task_attempt.id,
-// )
-// .await
-// {
-// Ok(list) => list,
-// Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR),
-// };
-// // For each process, normalize logs
-// let mut result = Vec::new();
-// for process in processes {
-// let normalized_conversation = normalize_process_logs(&app_state.db_pool, &process).await;
-// result.push(ProcessLogsResponse {
-// id: process.id,
-// process_type: process.process_type.clone(),
-// command: process.command.clone(),
-// executor_type: process.executor_type.clone(),
-// status: process.status.clone(),
-// normalized_conversation,
-// });
-// }
-// Ok(Json(ApiResponse::success(result)))
-// }
 
 #[derive(Debug, Deserialize)]
 pub struct TaskAttemptQuery {
@@ -849,130 +686,6 @@ pub async fn start_dev_server(
     Ok(ResponseJson(ApiResponse::success(())))
 }
 
-// /// Find plan content with context by searching through multiple processes in the same attempt
-// async fn find_plan_content_with_context(
-// pool: &SqlitePool,
-// attempt_id: Uuid,
-// ) -> Result<String, StatusCode> {
-// // Get all execution processes for this attempt
-// let execution_processes =
-// match ExecutionProcess::find_by_task_attempt_id(pool, attempt_id).await {
-// Ok(processes) => processes,
-// Err(e) => {
-// tracing::error!(
-// "Failed to fetch execution processes for attempt {}: {}",
-// attempt_id,
-// e
-// );
-// return Err(StatusCode::INTERNAL_SERVER_ERROR);
-// }
-// };
-
-// // Look for claudeplan processes (most recent first)
-// for claudeplan_process in execution_processes
-// .iter()
-// .rev()
-// .filter(|p| p.executor_type.as_deref() == Some("claude-plan"))
-// {
-// if let Some(stdout) = &claudeplan_process.stdout {
-// if !stdout.trim().is_empty() {
-// // Create executor and normalize logs
-// let executor_config = ExecutorConfig::ClaudePlan;
-// let executor = executor_config.create_executor();
-
-// // Use working directory for normalization
-// let working_dir_path =
-// match std::fs::canonicalize(&claudeplan_process.working_directory) {
-// Ok(canonical_path) => canonical_path.to_string_lossy().to_string(),
-// Err(_) => claudeplan_process.working_directory.clone(),
-// };
-
-// // Normalize logs and extract plan content
-// match executor.normalize_logs(stdout, &working_dir_path) {
-// Ok(normalized_conversation) => {
-// // Search for plan content in the normalized conversation
-// if let Some(plan_content) = normalized_conversation
-// .entries
-// .iter()
-// .rev()
-// .find_map(|entry| {
-// if let NormalizedEntryType::ToolUse {
-// action_type: ActionType::PlanPresentation { plan },
-// ..
-// } = &entry.entry_type
-// {
-// Some(plan.clone())
-// } else {
-// None
-// }
-// })
-// {
-// return Ok(plan_content);
-// }
-// }
-// Err(_) => {
-// continue;
-// }
-// }
-// }
-// }
-// }
-
-// tracing::error!(
-// "No claudeplan content found in any process in attempt {}",
-// attempt_id
-// );
-// Err(StatusCode::NOT_FOUND)
-// }
-
-// pub async fn approve_plan(
-// Extension(project): Extension<Project>,
-// Extension(task): Extension<Task>,
-// Extension(task_attempt): Extension<TaskAttempt>,
-// State(app_state): State<AppState>,
-// ) -> Result<ResponseJson<ApiResponse<FollowUpResponse>>, StatusCode> {
-// let current_task = &task;
-
-// // Find plan content with context across the task hierarchy
-// let plan_content = find_plan_content_with_context(&app_state.db_pool, task_attempt.id).await?;
-
-// use crate::models::task::CreateTask;
-// let new_task_id = Uuid::new_v4();
-// let create_task_data = CreateTask {
-// project_id: project.id,
-// title: format!("Execute Plan: {}", current_task.title),
-// description: Some(plan_content),
-// parent_task_attempt: Some(task_attempt.id),
-// };
-
-// let new_task = match Task::create(&app_state.db_pool, &create_task_data, new_task_id).await {
-// Ok(task) => task,
-// Err(e) => {
-// tracing::error!("Failed to create new task: {}", e);
-// return Err(StatusCode::INTERNAL_SERVER_ERROR);
-// }
-// };
-
-// // Mark original task as completed since it now has children
-// if let Err(e) =
-// Task::update_status(&app_state.db_pool, task.id, project.id, TaskStatus::Done).await
-// {
-// tracing::error!("Failed to update original task status to Done: {}", e);
-// return Err(StatusCode::INTERNAL_SERVER_ERROR);
-// } else {
-// tracing::info!(
-// "Original task {} marked as Done after plan approval (has children)",
-// task.id
-// );
-// }
-
-// Ok(ResponseJson(ApiResponse::success(FollowUpResponse {
-// message: format!("Plan approved and new task created: {}", new_task.title),
-// actual_attempt_id: new_task_id, // Return the new task ID
-// created_new_attempt: true,
-// })))
-// }
-
 pub async fn get_task_attempt_children(
     Extension(task_attempt): Extension<TaskAttempt>,
     State(deployment): State<DeploymentImpl>,
@@ -990,20 +703,6 @@ pub async fn get_task_attempt_children(
     }
 }
 
-// pub fn task_attempts_with_id_router(_state: AppState) -> Router {
-// use axum::routing::post;
-
-// Router::new()
-// .route(
-// "/projects/:project_id/tasks/:task_id/attempts/:attempt_id/approve-plan",
-// post(approve_plan),
-// )
-// .merge(
-// Router::new()
-// .route_layer(from_fn_with_state(_state.clone(), load_task_attempt_middleware))
-// )
-// }
-
 pub async fn stop_task_attempt_execution(
     Extension(task_attempt): Extension<TaskAttempt>,
     State(deployment): State<DeploymentImpl>,
diff --git a/crates/services/src/services/events.rs b/crates/services/src/services/events.rs
index 2ca7a9a5..9d714b73 100644
--- a/crates/services/src/services/events.rs
+++ b/crates/services/src/services/events.rs
@@ -150,7 +150,6 @@ impl EventService {
                     }
                 }
             }
-            _ => unreachable!(),
         };
 
         let next_entry_count = {