diff --git a/backend/src/bin/generate_types.rs b/backend/src/bin/generate_types.rs index dd423a36..0b69fea8 100644 --- a/backend/src/bin/generate_types.rs +++ b/backend/src/bin/generate_types.rs @@ -111,6 +111,7 @@ fn generate_types_content() -> String { vibe_kanban::routes::filesystem::DirectoryEntry::decl(), vibe_kanban::routes::filesystem::DirectoryListResponse::decl(), vibe_kanban::routes::auth::DeviceStartResponse::decl(), + vibe_kanban::routes::task_attempts::ProcessLogsResponse::decl(), vibe_kanban::models::task_attempt::DiffChunkType::decl(), vibe_kanban::models::task_attempt::DiffChunk::decl(), vibe_kanban::models::task_attempt::FileDiff::decl(), diff --git a/backend/src/routes/task_attempts.rs b/backend/src/routes/task_attempts.rs index 0c95a91d..a4d58351 100644 --- a/backend/src/routes/task_attempts.rs +++ b/backend/src/routes/task_attempts.rs @@ -7,6 +7,7 @@ use axum::{ }; use serde::{Deserialize, Serialize}; use sqlx::SqlitePool; +use ts_rs::TS; use uuid::Uuid; use crate::{ @@ -16,8 +17,9 @@ use crate::{ }, models::{ config::Config, - execution_process::{ExecutionProcess, ExecutionProcessSummary, ExecutionProcessType}, - executor_session::ExecutorSession, + execution_process::{ + ExecutionProcess, ExecutionProcessStatus, ExecutionProcessSummary, ExecutionProcessType, + }, task::{Task, TaskStatus}, task_attempt::{ BranchStatus, CreateFollowUpAttempt, CreatePrParams, CreateTaskAttempt, TaskAttempt, @@ -46,6 +48,176 @@ pub struct FollowUpResponse { pub created_new_attempt: bool, } +#[derive(Debug, Serialize, TS)] +#[ts(export)] +pub struct ProcessLogsResponse { + pub id: Uuid, + pub process_type: ExecutionProcessType, + pub command: String, + pub executor_type: Option, + pub status: ExecutionProcessStatus, + pub normalized_conversation: NormalizedConversation, +} + +// Helper to normalize logs for a process (extracted from get_execution_process_normalized_logs) +async fn normalize_process_logs( + db_pool: &SqlitePool, + process: &ExecutionProcess, +) -> NormalizedConversation { + use crate::models::{ + execution_process::ExecutionProcessType, executor_session::ExecutorSession, + }; + let executor_session = ExecutorSession::find_by_execution_process_id(db_pool, process.id) + .await + .ok() + .flatten(); + + let has_stdout = process + .stdout + .as_ref() + .map(|s| !s.trim().is_empty()) + .unwrap_or(false); + let has_stderr = process + .stderr + .as_ref() + .map(|s| !s.trim().is_empty()) + .unwrap_or(false); + + if !has_stdout && !has_stderr { + return NormalizedConversation { + entries: vec![], + session_id: None, + executor_type: process + .executor_type + .clone() + .unwrap_or("unknown".to_string()), + prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()), + summary: executor_session.as_ref().and_then(|s| s.summary.clone()), + }; + } + + // Parse stdout as JSONL using executor normalization + let mut stdout_entries = Vec::new(); + if let Some(stdout) = &process.stdout { + if !stdout.trim().is_empty() { + let executor_type = process.executor_type.as_deref().unwrap_or("unknown"); + let executor_config = if process.process_type == ExecutionProcessType::SetupScript { + ExecutorConfig::SetupScript { + script: executor_session + .as_ref() + .and_then(|s| s.prompt.clone()) + .unwrap_or_else(|| "setup script".to_string()), + } + } else { + match executor_type.to_string().parse() { + Ok(config) => config, + Err(_) => { + return NormalizedConversation { + entries: vec![], + session_id: None, + executor_type: executor_type.to_string(), + prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()), + summary: executor_session.as_ref().and_then(|s| s.summary.clone()), + }; + } + } + }; + let executor = executor_config.create_executor(); + let working_dir_path = match std::fs::canonicalize(&process.working_directory) { + Ok(canonical_path) => canonical_path.to_string_lossy().to_string(), + Err(_) => process.working_directory.clone(), + }; + if let Ok(normalized) = executor.normalize_logs(stdout, &working_dir_path) { + stdout_entries = normalized.entries; + } + } + } + // Parse stderr chunks separated by boundary markers + let mut stderr_entries = Vec::new(); + if let Some(stderr) = &process.stderr { + let trimmed = stderr.trim(); + if !trimmed.is_empty() { + let chunks: Vec<&str> = trimmed.split("---STDERR_CHUNK_BOUNDARY---").collect(); + for chunk in chunks { + let chunk_trimmed = chunk.trim(); + if !chunk_trimmed.is_empty() { + let filtered_content = chunk_trimmed.replace("---STDERR_CHUNK_BOUNDARY---", ""); + if !filtered_content.trim().is_empty() { + stderr_entries.push(NormalizedEntry { + timestamp: Some(chrono::Utc::now().to_rfc3339()), + entry_type: NormalizedEntryType::ErrorMessage, + content: filtered_content.trim().to_string(), + metadata: None, + }); + } + } + } + } + } + let mut all_entries = Vec::new(); + all_entries.extend(stdout_entries); + all_entries.extend(stderr_entries); + all_entries.sort_by(|a, b| match (&a.timestamp, &b.timestamp) { + (Some(a_ts), Some(b_ts)) => a_ts.cmp(b_ts), + (Some(_), None) => std::cmp::Ordering::Less, + (None, Some(_)) => std::cmp::Ordering::Greater, + (None, None) => std::cmp::Ordering::Equal, + }); + let executor_type = if process.process_type == ExecutionProcessType::SetupScript { + "setup-script".to_string() + } else { + process + .executor_type + .clone() + .unwrap_or("unknown".to_string()) + }; + NormalizedConversation { + entries: all_entries, + session_id: None, + executor_type, + prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()), + summary: executor_session.as_ref().and_then(|s| s.summary.clone()), + } +} + +/// Get all normalized logs for all execution processes of a task attempt +pub async fn get_task_attempt_all_logs( + Path((project_id, task_id, attempt_id)): Path<(Uuid, Uuid, Uuid)>, + State(app_state): State, +) -> Result>>, StatusCode> { + // Validate attempt belongs to task and project + let _ctx = match TaskAttempt::load_context(&app_state.db_pool, attempt_id, task_id, project_id) + .await + { + Ok(ctx) => ctx, + Err(_) => return Err(StatusCode::NOT_FOUND), + }; + // Fetch all execution processes for this attempt + let processes = + match ExecutionProcess::find_by_task_attempt_id(&app_state.db_pool, attempt_id).await { + Ok(list) => list, + Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR), + }; + // For each process, normalize logs + let mut result = Vec::new(); + for process in processes { + let normalized_conversation = normalize_process_logs(&app_state.db_pool, &process).await; + result.push(ProcessLogsResponse { + id: process.id, + process_type: process.process_type.clone(), + command: process.command.clone(), + executor_type: process.executor_type.clone(), + status: process.status.clone(), + normalized_conversation, + }); + } + Ok(Json(ApiResponse { + success: true, + data: Some(result), + message: None, + })) +} + pub async fn get_task_attempts( Path((project_id, task_id)): Path<(Uuid, Uuid)>, State(app_state): State, @@ -988,228 +1160,6 @@ pub async fn get_task_attempt_execution_state( } } -pub async fn get_execution_process_normalized_logs( - Path((project_id, process_id)): Path<(Uuid, Uuid)>, - State(app_state): State, -) -> Result>, StatusCode> { - // Get the execution process and verify it belongs to the correct project - let process = match ExecutionProcess::find_by_id(&app_state.db_pool, process_id).await { - Ok(Some(process)) => process, - Ok(None) => return Err(StatusCode::NOT_FOUND), - Err(e) => { - tracing::error!("Failed to fetch execution process {}: {}", process_id, e); - return Err(StatusCode::INTERNAL_SERVER_ERROR); - } - }; - - // Verify the process belongs to a task attempt in the correct project - let attempt = match TaskAttempt::find_by_id(&app_state.db_pool, process.task_attempt_id).await { - Ok(Some(attempt)) => attempt, - Ok(None) => return Err(StatusCode::NOT_FOUND), - Err(e) => { - tracing::error!("Failed to fetch task attempt: {}", e); - return Err(StatusCode::INTERNAL_SERVER_ERROR); - } - }; - - let _task = match Task::find_by_id(&app_state.db_pool, attempt.task_id).await { - Ok(Some(task)) if task.project_id == project_id => task, - Ok(Some(_)) => return Err(StatusCode::NOT_FOUND), // Wrong project - Ok(None) => return Err(StatusCode::NOT_FOUND), - Err(e) => { - tracing::error!("Failed to fetch task: {}", e); - return Err(StatusCode::INTERNAL_SERVER_ERROR); - } - }; - - // Get executor session data for this execution process - let executor_session = - match ExecutorSession::find_by_execution_process_id(&app_state.db_pool, process_id).await { - Ok(session) => session, - Err(e) => { - tracing::error!( - "Failed to fetch executor session for process {}: {}", - process_id, - e - ); - None - } - }; - - // Check if logs are available - let has_stdout = - process.stdout.is_some() && !process.stdout.as_ref().unwrap().trim().is_empty(); - let has_stderr = - process.stderr.is_some() && !process.stderr.as_ref().unwrap().trim().is_empty(); - - // If no logs available, return empty conversation - if !has_stdout && !has_stderr { - let empty_conversation = NormalizedConversation { - entries: vec![], - session_id: None, - executor_type: process - .executor_type - .clone() - .unwrap_or("unknown".to_string()), - prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()), - summary: executor_session.as_ref().and_then(|s| s.summary.clone()), - }; - - return Ok(ResponseJson(ApiResponse { - success: true, - data: Some(empty_conversation), - message: None, - })); - } - - // Parse stdout as JSONL using executor normalization - let mut stdout_entries = Vec::new(); - if let Some(stdout) = &process.stdout { - if !stdout.trim().is_empty() { - // Determine executor type and create appropriate executor for normalization - let executor_type = process.executor_type.as_deref().unwrap_or("unknown"); - let executor_config = if process.process_type == ExecutionProcessType::SetupScript { - // For setup scripts, use the setup script executor - ExecutorConfig::SetupScript { - script: executor_session - .as_ref() - .and_then(|s| s.prompt.clone()) - .unwrap_or_else(|| "setup script".to_string()), - } - } else { - match executor_type.to_string().parse() { - Ok(config) => config, - Err(_) => { - tracing::warn!( - "Unsupported executor type: {}, cannot normalize logs properly", - executor_type - ); - return Ok(ResponseJson(ApiResponse { - success: false, - data: None, - message: Some(format!("Unsupported executor type: {}", executor_type)), - })); - } - } - }; - - let executor = executor_config.create_executor(); - - // Use the working directory path for normalization - // Try to canonicalize if the directory exists, otherwise use the stored path as-is - let working_dir_path = match std::fs::canonicalize(&process.working_directory) { - Ok(canonical_path) => { - tracing::debug!( - "Using canonical path for normalization: {}", - canonical_path.display() - ); - canonical_path.to_string_lossy().to_string() - } - Err(_) => { - tracing::debug!( - "Working directory {} no longer exists, using stored path for normalization", - process.working_directory - ); - process.working_directory.clone() - } - }; - - // Normalize stdout logs with error handling - match executor.normalize_logs(stdout, &working_dir_path) { - Ok(normalized) => { - stdout_entries = normalized.entries; - tracing::debug!( - "Successfully normalized {} stdout entries for process {}", - stdout_entries.len(), - process_id - ); - } - Err(e) => { - tracing::error!( - "Failed to normalize stdout for process {}: {}", - process_id, - e - ); - return Ok(ResponseJson(ApiResponse { - success: false, - data: None, - message: Some(format!("Failed to normalize logs: {}", e)), - })); - } - } - } - } - - // Parse stderr chunks separated by boundary markers - let mut stderr_entries = Vec::new(); - if let Some(stderr) = &process.stderr { - let trimmed = stderr.trim(); - if !trimmed.is_empty() { - // Split stderr by chunk boundaries and create separate error messages - let chunks: Vec<&str> = trimmed.split("---STDERR_CHUNK_BOUNDARY---").collect(); - - for chunk in chunks { - let chunk_trimmed = chunk.trim(); - if !chunk_trimmed.is_empty() { - // Filter out any remaining boundary markers from the chunk content - let filtered_content = chunk_trimmed.replace("---STDERR_CHUNK_BOUNDARY---", ""); - if !filtered_content.trim().is_empty() { - stderr_entries.push(NormalizedEntry { - timestamp: Some(chrono::Utc::now().to_rfc3339()), - entry_type: NormalizedEntryType::ErrorMessage, - content: filtered_content.trim().to_string(), - metadata: None, - }); - } - } - } - - tracing::debug!( - "Processed stderr content into {} error messages for process {}", - stderr_entries.len(), - process_id - ); - } - } - - // Merge stdout and stderr entries chronologically - let mut all_entries = Vec::new(); - all_entries.extend(stdout_entries); - all_entries.extend(stderr_entries); - - // Sort by timestamp (entries without timestamps go to the end) - all_entries.sort_by(|a, b| match (&a.timestamp, &b.timestamp) { - (Some(a_ts), Some(b_ts)) => a_ts.cmp(b_ts), - (Some(_), None) => std::cmp::Ordering::Less, - (None, Some(_)) => std::cmp::Ordering::Greater, - (None, None) => std::cmp::Ordering::Equal, - }); - - // Create final normalized conversation - let executor_type = if process.process_type == ExecutionProcessType::SetupScript { - "setup-script".to_string() - } else { - process - .executor_type - .clone() - .unwrap_or("unknown".to_string()) - }; - - let normalized_conversation = NormalizedConversation { - entries: all_entries, - session_id: None, - executor_type, - prompt: executor_session.as_ref().and_then(|s| s.prompt.clone()), - summary: executor_session.as_ref().and_then(|s| s.summary.clone()), - }; - - Ok(ResponseJson(ApiResponse { - success: true, - data: Some(normalized_conversation), - message: None, - })) -} - /// Find plan content with context by searching through multiple processes in the same attempt async fn find_plan_content_with_context( pool: &SqlitePool, @@ -1456,8 +1406,8 @@ pub fn task_attempts_router() -> Router { get(get_execution_process), ) .route( - "/projects/:project_id/execution-processes/:process_id/normalized-logs", - get(get_execution_process_normalized_logs), + "/projects/:project_id/tasks/:task_id/attempts/:attempt_id/logs", + get(get_task_attempt_all_logs), ) .route( "/projects/:project_id/tasks/:task_id/attempts/:attempt_id/follow-up", diff --git a/frontend/src/components/context/TaskDetailsContextProvider.tsx b/frontend/src/components/context/TaskDetailsContextProvider.tsx index 303f370a..c49befde 100644 --- a/frontend/src/components/context/TaskDetailsContextProvider.tsx +++ b/frontend/src/components/context/TaskDetailsContextProvider.tsx @@ -32,7 +32,7 @@ import { TaskRelatedTasksContext, TaskSelectedAttemptContext, } from './taskDetailsContext.ts'; -import { AttemptData } from '@/lib/types.ts'; +import type { AttemptData } from '@/lib/types.ts'; const TaskDetailsProvider: FC<{ task: TaskWithAttemptStatus; @@ -81,6 +81,7 @@ const TaskDetailsProvider: FC<{ const [attemptData, setAttemptData] = useState({ processes: [], runningProcessDetails: {}, + allLogs: [], // new field for all logs }); const diffLoadingRef = useRef(false); @@ -233,13 +234,12 @@ const TaskDetailsProvider: FC<{ if (!task) return; try { - const processesResult = await attemptsApi.getExecutionProcesses( - projectId, - taskId, - attemptId - ); + const [processesResult, allLogsResult] = await Promise.all([ + attemptsApi.getExecutionProcesses(projectId, taskId, attemptId), + attemptsApi.getAllLogs(projectId, taskId, attemptId), + ]); - if (processesResult !== undefined) { + if (processesResult !== undefined && allLogsResult !== undefined) { const runningProcesses = processesResult.filter( (process) => process.status === 'running' ); @@ -277,6 +277,7 @@ const TaskDetailsProvider: FC<{ const newData = { processes: processesResult, runningProcessDetails, + allLogs: allLogsResult, }; if (JSON.stringify(prev) === JSON.stringify(newData)) return prev; return newData; diff --git a/frontend/src/components/tasks/TaskDetails/DiffCard.tsx b/frontend/src/components/tasks/TaskDetails/DiffCard.tsx index 5c0ad268..77163d8d 100644 --- a/frontend/src/components/tasks/TaskDetails/DiffCard.tsx +++ b/frontend/src/components/tasks/TaskDetails/DiffCard.tsx @@ -58,7 +58,7 @@ export function DiffCard({ {isBackgroundRefreshing && (
- +
)} diff --git a/frontend/src/components/tasks/TaskDetails/LogsTab/Conversation.tsx b/frontend/src/components/tasks/TaskDetails/LogsTab/Conversation.tsx index 06802e04..7dd30e10 100644 --- a/frontend/src/components/tasks/TaskDetails/LogsTab/Conversation.tsx +++ b/frontend/src/components/tasks/TaskDetails/LogsTab/Conversation.tsx @@ -9,11 +9,16 @@ import { } from 'react'; import { TaskAttemptDataContext } from '@/components/context/taskDetailsContext.ts'; import { Loader } from '@/components/ui/loader.tsx'; +import { Button } from '@/components/ui/button'; +import Prompt from './Prompt'; +import ConversationEntry from './ConversationEntry'; +import { ConversationEntryDisplayType } from '@/lib/types'; function Conversation() { const { attemptData } = useContext(TaskAttemptDataContext); const [shouldAutoScrollLogs, setShouldAutoScrollLogs] = useState(true); const [conversationUpdateTrigger, setConversationUpdateTrigger] = useState(0); + const [visibleCount, setVisibleCount] = useState(100); const scrollContainerRef = useRef(null); @@ -27,7 +32,7 @@ function Conversation() { scrollContainerRef.current.scrollTop = scrollContainerRef.current.scrollHeight; } - }, [attemptData.processes, conversationUpdateTrigger, shouldAutoScrollLogs]); + }, [attemptData.allLogs, conversationUpdateTrigger, shouldAutoScrollLogs]); const handleLogsScroll = useCallback(() => { if (scrollContainerRef.current) { @@ -43,57 +48,128 @@ function Conversation() { } }, [shouldAutoScrollLogs]); - const mainCodingAgentProcess = useMemo(() => { - let mainCAProcess = Object.values(attemptData.runningProcessDetails).find( - (process) => - process.process_type === 'codingagent' && process.command === 'executor' - ); + // Find main and follow-up processes from allLogs + const mainCodingAgentLog = useMemo( + () => + attemptData.allLogs.find( + (log) => + log.process_type.toLowerCase() === 'codingagent' && + log.command === 'executor' + ), + [attemptData.allLogs] + ); + const followUpLogs = useMemo( + () => + attemptData.allLogs.filter( + (log) => + log.process_type.toLowerCase() === 'codingagent' && + log.command === 'followup_executor' + ), + [attemptData.allLogs] + ); - if (!mainCAProcess) { - const mainCodingAgentSummary = attemptData.processes.find( - (process) => - process.process_type === 'codingagent' && - process.command === 'executor' - ); + // Combine all logs in order (main first, then follow-ups) + const allProcessLogs = useMemo( + () => + [mainCodingAgentLog, ...followUpLogs].filter(Boolean) as Array< + NonNullable + >, + [mainCodingAgentLog, followUpLogs] + ); - if (mainCodingAgentSummary) { - mainCAProcess = Object.values(attemptData.runningProcessDetails).find( - (process) => process.id === mainCodingAgentSummary.id - ); - - if (!mainCAProcess) { - mainCAProcess = { - ...mainCodingAgentSummary, - stdout: null, - stderr: null, - } as any; - } - } - } - return mainCAProcess; - }, [attemptData.processes, attemptData.runningProcessDetails]); - - const followUpProcesses = useMemo(() => { - return attemptData.processes - .filter( - (process) => - process.process_type === 'codingagent' && - process.command === 'followup_executor' - ) - .map((summary) => { - const detailedProcess = Object.values( - attemptData.runningProcessDetails - ).find((process) => process.id === summary.id); - return ( - detailedProcess || - ({ - ...summary, - stdout: null, - stderr: null, - } as any) - ); + // Flatten all entries, keeping process info for each entry + const allEntries = useMemo(() => { + const entries: Array = []; + allProcessLogs.forEach((log, processIndex) => { + if (!log) return; + if (log.status === 'running') return; // Skip static entries for running processes + const processId = String(log.id); // Ensure string + const processPrompt = log.normalized_conversation.prompt || undefined; // Ensure undefined, not null + const entriesArr = log.normalized_conversation.entries || []; + entriesArr.forEach((entry, entryIndex) => { + entries.push({ + entry, + processId, + processPrompt, + processStatus: log.status, + processIsRunning: false, // Only completed processes here + process: log, + isFirstInProcess: entryIndex === 0, + processIndex, + entryIndex, + }); }); - }, [attemptData.processes, attemptData.runningProcessDetails]); + }); + // Sort by timestamp (entries without timestamp go last) + entries.sort((a, b) => { + if (a.entry.timestamp && b.entry.timestamp) { + return a.entry.timestamp.localeCompare(b.entry.timestamp); + } + if (a.entry.timestamp) return -1; + if (b.entry.timestamp) return 1; + return 0; + }); + return entries; + }, [allProcessLogs]); + + // Identify running processes (main + follow-ups) + const runningProcessLogs = useMemo( + () => allProcessLogs.filter((log) => log.status === 'running'), + [allProcessLogs] + ); + + // Paginate: show only the last visibleCount entries + const visibleEntries = useMemo( + () => allEntries.slice(-visibleCount), + [allEntries, visibleCount] + ); + + const renderedVisibleEntries = useMemo( + () => + visibleEntries.map((entry, index) => ( + + )), + [ + visibleEntries, + handleConversationUpdate, + attemptData.runningProcessDetails, + ] + ); + + const renderedRunningProcessLogs = useMemo(() => { + return runningProcessLogs.map((log, i) => { + const runningProcess = attemptData.runningProcessDetails[String(log.id)]; + if (!runningProcess) return null; + // Show prompt only if this is the first entry in the process (i.e., no completed entries for this process) + const showPrompt = + log.normalized_conversation.prompt && + !allEntries.some((e) => e.processId === String(log.id)); + return ( +
0 ? 'mt-8' : ''}> + {showPrompt && ( + + )} + +
+ ); + }); + }, [ + runningProcessLogs, + attemptData.runningProcessDetails, + handleConversationUpdate, + allEntries, + ]); return (
- {mainCodingAgentProcess || followUpProcesses.length > 0 ? ( -
- {mainCodingAgentProcess && ( -
- -
- )} - {followUpProcesses.map((followUpProcess) => ( -
-
- -
- ))} + {visibleCount < allEntries.length && ( +
+
- ) : ( + )} + {visibleEntries.length > 0 && ( +
{renderedVisibleEntries}
+ )} + {/* Render live viewers for running processes (after paginated list) */} + {renderedRunningProcessLogs} + {/* If nothing to show at all, show loader */} + {visibleEntries.length === 0 && runningProcessLogs.length === 0 && ( diff --git a/frontend/src/components/tasks/TaskDetails/LogsTab/ConversationEntry.tsx b/frontend/src/components/tasks/TaskDetails/LogsTab/ConversationEntry.tsx new file mode 100644 index 00000000..89e7746d --- /dev/null +++ b/frontend/src/components/tasks/TaskDetails/LogsTab/ConversationEntry.tsx @@ -0,0 +1,56 @@ +import { ConversationEntryDisplayType } from '@/lib/types'; +import DisplayConversationEntry from '../DisplayConversationEntry'; +import { NormalizedConversationViewer } from './NormalizedConversationViewer'; +import Prompt from './Prompt'; +import { Loader } from '@/components/ui/loader.tsx'; +import { ExecutionProcess } from 'shared/types'; + +type Props = { + item: ConversationEntryDisplayType; + idx: number; + handleConversationUpdate: () => void; + visibleEntriesLength: number; + runningProcessDetails: Record; +}; + +const ConversationEntry = ({ + item, + idx, + handleConversationUpdate, + visibleEntriesLength, + runningProcessDetails, +}: Props) => { + const showPrompt = item.isFirstInProcess && item.processPrompt; + // For running processes, render the live viewer below the static entries + if (item.processIsRunning && idx === visibleEntriesLength - 1) { + // Only render the live viewer for the last entry of a running process + const runningProcess = runningProcessDetails[item.processId]; + if (runningProcess) { + return ( +
+ {showPrompt && } + +
+ ); + } + // Fallback: show loading if not found + return ; + } else { + return ( +
+ {showPrompt && } + +
+ ); + } +}; + +export default ConversationEntry; diff --git a/frontend/src/components/tasks/TaskDetails/LogsTab/NormalizedConversationViewer.tsx b/frontend/src/components/tasks/TaskDetails/LogsTab/NormalizedConversationViewer.tsx index ba0b8100..d749829b 100644 --- a/frontend/src/components/tasks/TaskDetails/LogsTab/NormalizedConversationViewer.tsx +++ b/frontend/src/components/tasks/TaskDetails/LogsTab/NormalizedConversationViewer.tsx @@ -1,25 +1,9 @@ -import { - useCallback, - useContext, - useEffect, - useState, - useMemo, - useRef, -} from 'react'; import { Hammer } from 'lucide-react'; import { Loader } from '@/components/ui/loader.tsx'; -import { executionProcessesApi } from '@/lib/api.ts'; import MarkdownRenderer from '@/components/ui/markdown-renderer.tsx'; -import { applyPatch } from 'fast-json-patch'; -import { fetchEventSource } from '@microsoft/fetch-event-source'; -import type { - ExecutionProcess, - NormalizedConversation, - NormalizedEntry, - WorktreeDiff, -} from 'shared/types.ts'; -import { TaskDetailsContext } from '@/components/context/taskDetailsContext.ts'; +import type { ExecutionProcess, WorktreeDiff } from 'shared/types.ts'; import DisplayConversationEntry from '@/components/tasks/TaskDetails/DisplayConversationEntry.tsx'; +import useNormalizedConversation from '@/hooks/useNormalizedConversation'; interface NormalizedConversationViewerProps { executionProcess: ExecutionProcess; @@ -34,401 +18,11 @@ export function NormalizedConversationViewer({ diffDeletable, onConversationUpdate, }: NormalizedConversationViewerProps) { - const { projectId } = useContext(TaskDetailsContext); - - // Development-only logging helper - const debugLog = useCallback((message: string, ...args: any[]) => { - if (import.meta.env.DEV) { - console.log(message, ...args); - } - }, []); - - const [conversation, setConversation] = - useState(null); - const [loading, setLoading] = useState(true); - const [error, setError] = useState(null); - - // Track fetched processes to prevent redundant database calls - const fetchedProcesses = useRef(new Set()); - - // SSE Connection Manager - production-ready with reconnection and resilience - const sseManagerRef = useRef<{ - abortController: AbortController | null; - isActive: boolean; - highestBatchId: number; - reconnectAttempts: number; - reconnectTimeout: number | null; - processId: string; - processStatus: string; - patchFailureCount: number; - }>({ - abortController: null, - isActive: false, - highestBatchId: 0, - reconnectAttempts: 0, - reconnectTimeout: null, - processId: executionProcess.id, - processStatus: executionProcess.status, - patchFailureCount: 0, - }); - - // SSE Connection Manager with Production-Ready Resilience using fetch-event-source - const createSSEConnection = useCallback( - (processId: string, projectId: string): AbortController => { - const manager = sseManagerRef.current; - // Build URL with resume cursor if we have processed batches - const baseUrl = `/api/projects/${projectId}/execution-processes/${processId}/normalized-logs/stream`; - const url = - manager.highestBatchId > 0 - ? `${baseUrl}?since_batch_id=${manager.highestBatchId}` - : baseUrl; - debugLog( - `🚀 SSE: Creating connection for process ${processId} (cursor: ${manager.highestBatchId})` - ); - - const abortController = new AbortController(); - - fetchEventSource(url, { - signal: abortController.signal, - onopen: async (response) => { - if (response.ok) { - debugLog(`✅ SSE: Connected to ${processId}`); - manager.isActive = true; - manager.reconnectAttempts = 0; // Reset on successful connection - manager.patchFailureCount = 0; // Reset patch failure count - - if (manager.reconnectTimeout) { - clearTimeout(manager.reconnectTimeout); - manager.reconnectTimeout = null; - } - } else { - throw new Error(`SSE connection failed: ${response.status}`); - } - }, - onmessage: (event) => { - if (event.event === 'patch') { - try { - const batchData = JSON.parse(event.data); - const { batch_id, patches } = batchData; - - // Skip duplicates - use manager's batch tracking - if (batch_id && batch_id <= manager.highestBatchId) { - debugLog( - `⏭️ SSE: Skipping duplicate batch_id=${batch_id} (current=${manager.highestBatchId})` - ); - return; - } - - // Update cursor BEFORE processing - if (batch_id) { - manager.highestBatchId = batch_id; - debugLog(`📍 SSE: Processing batch_id=${batch_id}`); - } - - setConversation((prev) => { - // Create empty conversation if none exists - const baseConversation = prev || { - entries: [], - session_id: null, - executor_type: 'unknown', - prompt: null, - summary: null, - }; - - try { - const updated = applyPatch( - JSON.parse(JSON.stringify(baseConversation)), - patches - ).newDocument as NormalizedConversation; - - updated.entries = updated.entries.filter(Boolean); - - debugLog( - `🔧 SSE: Applied batch_id=${batch_id}, entries: ${updated.entries.length}` - ); - - // Reset patch failure count on successful application - manager.patchFailureCount = 0; - - // Clear loading state on first successful patch - if (!prev) { - setLoading(false); - setError(null); - } - - if (onConversationUpdate) { - setTimeout(onConversationUpdate, 0); - } - - return updated; - } catch (patchError) { - console.warn('❌ SSE: Patch failed:', patchError); - // Reset cursor on failure for potential retry - if (batch_id && batch_id > 0) { - manager.highestBatchId = batch_id - 1; - } - // Track patch failures for monitoring - manager.patchFailureCount++; - debugLog( - `⚠️ SSE: Patch failure #${manager.patchFailureCount} for batch_id=${batch_id}` - ); - return prev || baseConversation; - } - }); - } catch (e) { - console.warn('❌ SSE: Parse failed:', e); - } - } - }, - onerror: (err) => { - console.warn(`🔌 SSE: Connection error for ${processId}:`, err); - manager.isActive = false; - - // Only attempt reconnection if process is still running - if (manager.processStatus === 'running') { - scheduleReconnect(processId, projectId); - } - }, - onclose: () => { - debugLog(`🔌 SSE: Connection closed for ${processId}`); - manager.isActive = false; - }, - }).catch((error) => { - if (error.name !== 'AbortError') { - console.warn(`❌ SSE: Fetch error for ${processId}:`, error); - manager.isActive = false; - - // Only attempt reconnection if process is still running - if (manager.processStatus === 'running') { - scheduleReconnect(processId, projectId); - } - } - }); - - return abortController; - }, - [onConversationUpdate, debugLog] - ); - - const scheduleReconnect = useCallback( - (processId: string, projectId: string) => { - const manager = sseManagerRef.current; - - // Clear any existing reconnection timeout - if (manager.reconnectTimeout) { - clearTimeout(manager.reconnectTimeout); - } - - // Exponential backoff: 1s, 2s, 4s, 8s, max 30s - const delay = Math.min( - 1000 * Math.pow(2, manager.reconnectAttempts), - 30000 - ); - manager.reconnectAttempts++; - - debugLog( - `🔄 SSE: Scheduling reconnect attempt ${manager.reconnectAttempts} in ${delay}ms` - ); - - manager.reconnectTimeout = window.setTimeout(() => { - if (manager.processStatus === 'running') { - debugLog(`🔄 SSE: Attempting reconnect for ${processId}`); - establishSSEConnection(processId, projectId); - } - }, delay); - }, - [debugLog] - ); - - const establishSSEConnection = useCallback( - (processId: string, projectId: string) => { - const manager = sseManagerRef.current; - - // Close existing connection if any - if (manager.abortController) { - manager.abortController.abort(); - manager.abortController = null; - manager.isActive = false; - } - - const abortController = createSSEConnection(processId, projectId); - manager.abortController = abortController; - - return abortController; - }, - [createSSEConnection] - ); - - // Helper functions for SSE manager - const setProcessId = (id: string) => { - sseManagerRef.current.processId = id; - }; - const setProcessStatus = (status: string) => { - sseManagerRef.current.processStatus = status; - }; - - // Consolidated cleanup function to avoid duplication - const cleanupSSEConnection = useCallback(() => { - const manager = sseManagerRef.current; - - if (manager.abortController) { - manager.abortController.abort(); - manager.abortController = null; - manager.isActive = false; - } - - if (manager.reconnectTimeout) { - clearTimeout(manager.reconnectTimeout); - manager.reconnectTimeout = null; - } - }, []); - - const fetchNormalizedLogsOnce = useCallback( - async (processId: string) => { - // Only fetch if not already fetched for this process - if (fetchedProcesses.current.has(processId)) { - debugLog(`📋 DB: Already fetched ${processId}, skipping`); - return; - } - - try { - setLoading(true); - setError(null); - debugLog(`📋 DB: Fetching logs for ${processId}`); - - const result = await executionProcessesApi.getNormalizedLogs( - projectId, - processId - ); - - // Mark as fetched - fetchedProcesses.current.add(processId); - - setConversation((prev) => { - // Only update if content actually changed - use lightweight comparison - if ( - !prev || - prev.entries.length !== result.entries.length || - prev.prompt !== result.prompt - ) { - // Notify parent component of conversation update - if (onConversationUpdate) { - // Use setTimeout to ensure state update happens first - setTimeout(onConversationUpdate, 0); - } - return result; - } - return prev; - }); - } catch (err) { - // Remove from fetched set on error to allow retry - fetchedProcesses.current.delete(processId); - setError( - `Error fetching logs: ${err instanceof Error ? err.message : 'Unknown error'}` - ); - } finally { - setLoading(false); - } - }, - [projectId, onConversationUpdate, debugLog] - ); - - // Process-based data fetching - fetch once from appropriate source - useEffect(() => { - const processId = executionProcess.id; - const processStatus = executionProcess.status; - - debugLog(`🎯 Data: Process ${processId} is ${processStatus}`); - - // Reset conversation state when switching processes - const manager = sseManagerRef.current; - if (manager.processId !== processId) { - setConversation(null); - setLoading(true); - setError(null); - - // Clear fetch tracking for old processes (keep memory bounded) - if (fetchedProcesses.current.size > 10) { - fetchedProcesses.current.clear(); - } - } - - if (processStatus === 'running') { - // Running processes: SSE will handle data (including initial state) - debugLog(`🚀 Data: Using SSE for running process ${processId}`); - // SSE connection will be established by the SSE management effect - } else { - // Completed processes: Single database fetch - debugLog(`📋 Data: Using database for completed process ${processId}`); - fetchNormalizedLogsOnce(processId); - } - }, [ - executionProcess.id, - executionProcess.status, - fetchNormalizedLogsOnce, - debugLog, - ]); - - // SSE connection management for running processes only - useEffect(() => { - const processId = executionProcess.id; - const processStatus = executionProcess.status; - const manager = sseManagerRef.current; - - // Update manager state - setProcessId(processId); - setProcessStatus(processStatus); - - // Only establish SSE for running processes - if (processStatus !== 'running') { - debugLog( - `🚫 SSE: Process ${processStatus}, cleaning up any existing connection` - ); - cleanupSSEConnection(); - return; - } - - // Check if connection already exists for same process ID - if (manager.abortController && manager.processId === processId) { - debugLog(`⚠️ SSE: Connection already exists for ${processId}, reusing`); - return; - } - - // Process changed - close existing and reset state - if (manager.abortController && manager.processId !== processId) { - debugLog(`🔄 SSE: Switching from ${manager.processId} to ${processId}`); - cleanupSSEConnection(); - manager.highestBatchId = 0; // Reset cursor for new process - manager.reconnectAttempts = 0; - manager.patchFailureCount = 0; // Reset failure count for new process - } - - // Update manager state - manager.processId = processId; - manager.processStatus = processStatus; - - // Establish new connection - establishSSEConnection(processId, projectId); - - return () => { - debugLog(`🔌 SSE: Cleanup connection for ${processId}`); - - // Close connection if it belongs to this effect - if (manager.abortController && manager.processId === processId) { - cleanupSSEConnection(); - } - }; - }, [executionProcess.id, executionProcess.status]); - - // Memoize display entries to avoid unnecessary re-renders - const displayEntries = useMemo(() => { - if (!conversation?.entries) return []; - - // Filter out any null entries that may have been created by duplicate patch application - return conversation.entries.filter((entry): entry is NormalizedEntry => - Boolean(entry && (entry as NormalizedEntry).entry_type) - ); - }, [conversation?.entries]); + const { loading, error, conversation, displayEntries } = + useNormalizedConversation({ + executionProcess, + onConversationUpdate, + }); if (loading) { return ( diff --git a/frontend/src/components/tasks/TaskDetails/LogsTab/Prompt.tsx b/frontend/src/components/tasks/TaskDetails/LogsTab/Prompt.tsx new file mode 100644 index 00000000..af988912 --- /dev/null +++ b/frontend/src/components/tasks/TaskDetails/LogsTab/Prompt.tsx @@ -0,0 +1,22 @@ +import MarkdownRenderer from '@/components/ui/markdown-renderer'; +import { Hammer } from 'lucide-react'; + +const Prompt = ({ prompt }: { prompt: string }) => { + return ( +
+
+ +
+
+
+ +
+
+
+ ); +}; + +export default Prompt; diff --git a/frontend/src/components/tasks/TaskDetailsToolbar.tsx b/frontend/src/components/tasks/TaskDetailsToolbar.tsx index 11274bfe..2e825b52 100644 --- a/frontend/src/components/tasks/TaskDetailsToolbar.tsx +++ b/frontend/src/components/tasks/TaskDetailsToolbar.tsx @@ -173,6 +173,7 @@ function TaskDetailsToolbar() { setAttemptData({ processes: [], runningProcessDetails: {}, + allLogs: [], }); } } catch (error) { diff --git a/frontend/src/components/ui/loader.tsx b/frontend/src/components/ui/loader.tsx index 49107325..f31ff3d2 100644 --- a/frontend/src/components/ui/loader.tsx +++ b/frontend/src/components/ui/loader.tsx @@ -12,9 +12,11 @@ export const Loader: React.FC = ({ size = 32, className = '', }) => ( -
+
{!!message && ( diff --git a/frontend/src/hooks/useNormalizedConversation.ts b/frontend/src/hooks/useNormalizedConversation.ts new file mode 100644 index 00000000..4cd1a5af --- /dev/null +++ b/frontend/src/hooks/useNormalizedConversation.ts @@ -0,0 +1,429 @@ +import { + TaskAttemptDataContext, + TaskDetailsContext, +} from '@/components/context/taskDetailsContext'; +import { fetchEventSource } from '@microsoft/fetch-event-source'; +import { applyPatch } from 'fast-json-patch'; +import { + useCallback, + useContext, + useEffect, + useMemo, + useRef, + useState, +} from 'react'; +import { + ExecutionProcess, + NormalizedConversation, + NormalizedEntry, +} from 'shared/types'; + +const useNormalizedConversation = ({ + executionProcess, + onConversationUpdate, +}: { + executionProcess?: ExecutionProcess; + onConversationUpdate?: () => void; +}) => { + const { projectId } = useContext(TaskDetailsContext); + const { attemptData } = useContext(TaskAttemptDataContext); + + // Development-only logging helper + const debugLog = useCallback((message: string, ...args: any[]) => { + if (import.meta.env.DEV) { + console.log(message, ...args); + } + }, []); + + const [conversation, setConversation] = + useState(null); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + + // Track fetched processes to prevent redundant database calls + const fetchedProcesses = useRef(new Set()); + + // SSE Connection Manager - production-ready with reconnection and resilience + const sseManagerRef = useRef<{ + abortController: AbortController | null; + isActive: boolean; + highestBatchId: number; + reconnectAttempts: number; + reconnectTimeout: number | null; + processId: string; + processStatus: string; + patchFailureCount: number; + onopenCalled: boolean; + }>({ + abortController: null, + isActive: false, + highestBatchId: 0, + reconnectAttempts: 0, + reconnectTimeout: null, + processId: executionProcess?.id || '', + processStatus: executionProcess?.status || '', + patchFailureCount: 0, + onopenCalled: false, + }); + + // SSE Connection Manager with Production-Ready Resilience using fetch-event-source + const createSSEConnection = useCallback( + (processId: string, projectId: string): AbortController => { + const manager = sseManagerRef.current; + // Build URL with resume cursor if we have processed batches + const baseUrl = `/api/projects/${projectId}/execution-processes/${processId}/normalized-logs/stream`; + const url = + manager.highestBatchId > 0 + ? `${baseUrl}?since_batch_id=${manager.highestBatchId}` + : baseUrl; + debugLog( + `🚀 SSE: Creating connection for process ${processId} (cursor: ${manager.highestBatchId})` + ); + + const abortController = new AbortController(); + + fetchEventSource(url, { + signal: abortController.signal, + onopen: async (response) => { + const manager = sseManagerRef.current; + if (manager.onopenCalled) { + // This is a "phantom" reconnect, so abort and re-create + debugLog( + '⚠️ SSE: onopen called again for same connection, forcing reconnect' + ); + abortController.abort(); + manager.abortController = null; + manager.isActive = false; + manager.onopenCalled = false; + // Re-establish with latest cursor + scheduleReconnect(processId, projectId); + return; + } + manager.onopenCalled = true; + if (response.ok) { + debugLog(`✅ SSE: Connected to ${processId}`); + manager.isActive = true; + manager.reconnectAttempts = 0; // Reset on successful connection + manager.patchFailureCount = 0; // Reset patch failure count + + if (manager.reconnectTimeout) { + clearTimeout(manager.reconnectTimeout); + manager.reconnectTimeout = null; + } + } else { + throw new Error(`SSE connection failed: ${response.status}`); + } + }, + onmessage: (event) => { + if (event.event === 'patch') { + try { + const batchData = JSON.parse(event.data); + const { batch_id, patches } = batchData; + + // Skip duplicates - use manager's batch tracking + if (batch_id && batch_id <= manager.highestBatchId) { + debugLog( + `⏭️ SSE: Skipping duplicate batch_id=${batch_id} (current=${manager.highestBatchId})` + ); + return; + } + + // Update cursor BEFORE processing + if (batch_id) { + manager.highestBatchId = batch_id; + debugLog(`📍 SSE: Processing batch_id=${batch_id}`); + } + + setConversation((prev) => { + // Create empty conversation if none exists + const baseConversation = prev || { + entries: [], + session_id: null, + executor_type: 'unknown', + prompt: null, + summary: null, + }; + + try { + const updated = applyPatch( + JSON.parse(JSON.stringify(baseConversation)), + patches + ).newDocument as NormalizedConversation; + + updated.entries = updated.entries.filter(Boolean); + + debugLog( + `🔧 SSE: Applied batch_id=${batch_id}, entries: ${updated.entries.length}` + ); + + // Reset patch failure count on successful application + manager.patchFailureCount = 0; + + // Clear loading state on first successful patch + if (!prev) { + setLoading(false); + setError(null); + } + + if (onConversationUpdate) { + setTimeout(onConversationUpdate, 0); + } + + return updated; + } catch (patchError) { + console.warn('❌ SSE: Patch failed:', patchError); + // Reset cursor on failure for potential retry + if (batch_id && batch_id > 0) { + manager.highestBatchId = batch_id - 1; + } + // Track patch failures for monitoring + manager.patchFailureCount++; + debugLog( + `⚠️ SSE: Patch failure #${manager.patchFailureCount} for batch_id=${batch_id}` + ); + return prev || baseConversation; + } + }); + } catch (e) { + console.warn('❌ SSE: Parse failed:', e); + } + } + }, + onerror: (err) => { + console.warn(`🔌 SSE: Connection error for ${processId}:`, err); + manager.isActive = false; + + // Only attempt reconnection if process is still running + if (manager.processStatus === 'running') { + scheduleReconnect(processId, projectId); + } + }, + onclose: () => { + debugLog(`🔌 SSE: Connection closed for ${processId}`); + manager.isActive = false; + }, + }).catch((error) => { + if (error.name !== 'AbortError') { + console.warn(`❌ SSE: Fetch error for ${processId}:`, error); + manager.isActive = false; + + // Only attempt reconnection if process is still running + if (manager.processStatus === 'running') { + scheduleReconnect(processId, projectId); + } + } + }); + + return abortController; + }, + [onConversationUpdate, debugLog] + ); + + const scheduleReconnect = useCallback( + (processId: string, projectId: string) => { + const manager = sseManagerRef.current; + + // Clear any existing reconnection timeout + if (manager.reconnectTimeout) { + clearTimeout(manager.reconnectTimeout); + } + + // Exponential backoff: 1s, 2s, 4s, 8s, max 30s + const delay = Math.min( + 1000 * Math.pow(2, manager.reconnectAttempts), + 30000 + ); + manager.reconnectAttempts++; + + debugLog( + `🔄 SSE: Scheduling reconnect attempt ${manager.reconnectAttempts} in ${delay}ms` + ); + + manager.reconnectTimeout = window.setTimeout(() => { + if (manager.processStatus === 'running') { + debugLog(`🔄 SSE: Attempting reconnect for ${processId}`); + establishSSEConnection(processId, projectId); + } + }, delay); + }, + [debugLog] + ); + + const establishSSEConnection = useCallback( + (processId: string, projectId: string) => { + const manager = sseManagerRef.current; + + // Close existing connection if any + if (manager.abortController) { + manager.abortController.abort(); + manager.abortController = null; + manager.isActive = false; + } + + const abortController = createSSEConnection(processId, projectId); + manager.abortController = abortController; + + return abortController; + }, + [createSSEConnection] + ); + + // Helper functions for SSE manager + const setProcessId = (id: string) => { + sseManagerRef.current.processId = id; + }; + const setProcessStatus = (status: string) => { + sseManagerRef.current.processStatus = status; + }; + + // Consolidated cleanup function to avoid duplication + const cleanupSSEConnection = useCallback(() => { + const manager = sseManagerRef.current; + + if (manager.abortController) { + manager.abortController.abort(); + manager.abortController = null; + manager.isActive = false; + } + + if (manager.reconnectTimeout) { + clearTimeout(manager.reconnectTimeout); + manager.reconnectTimeout = null; + } + manager.onopenCalled = false; + }, []); + + // Process-based data fetching - fetch once from appropriate source + useEffect(() => { + if (!executionProcess?.id || !executionProcess?.status) { + return; + } + const processId = executionProcess.id; + const processStatus = executionProcess.status; + + debugLog(`🎯 Data: Process ${processId} is ${processStatus}`); + + // Reset conversation state when switching processes + const manager = sseManagerRef.current; + if (manager.processId !== processId) { + setConversation(null); + setLoading(true); + setError(null); + + // Clear fetch tracking for old processes (keep memory bounded) + if (fetchedProcesses.current.size > 10) { + fetchedProcesses.current.clear(); + } + } + + if (processStatus === 'running') { + // Running processes: SSE will handle data (including initial state) + debugLog(`🚀 Data: Using SSE for running process ${processId}`); + // SSE connection will be established by the SSE management effect + } else { + // Completed processes: Single database fetch + debugLog(`📋 Data: Using database for completed process ${processId}`); + const logs = attemptData.allLogs.find( + (entry) => entry.id === executionProcess.id + )?.normalized_conversation; + if (logs) { + setConversation((prev) => { + // Only update if content actually changed - use lightweight comparison + if ( + !prev || + prev.entries.length !== logs.entries.length || + prev.prompt !== logs.prompt + ) { + // Notify parent component of conversation update + if (onConversationUpdate) { + // Use setTimeout to ensure state update happens first + setTimeout(onConversationUpdate, 0); + } + return logs; + } + return prev; + }); + } + setLoading(false); + } + }, [ + executionProcess?.id, + executionProcess?.status, + attemptData.allLogs, + debugLog, + onConversationUpdate, + ]); + + // SSE connection management for running processes only + useEffect(() => { + if (!executionProcess?.id || !executionProcess?.status) { + return; + } + const processId = executionProcess.id; + const processStatus = executionProcess.status; + const manager = sseManagerRef.current; + + // Update manager state + setProcessId(processId); + setProcessStatus(processStatus); + + // Only establish SSE for running processes + if (processStatus !== 'running') { + debugLog( + `🚫 SSE: Process ${processStatus}, cleaning up any existing connection` + ); + cleanupSSEConnection(); + return; + } + + // Check if connection already exists for same process ID + if (manager.abortController && manager.processId === processId) { + debugLog(`⚠️ SSE: Connection already exists for ${processId}, reusing`); + return; + } + + // Process changed - close existing and reset state + if (manager.abortController && manager.processId !== processId) { + debugLog(`🔄 SSE: Switching from ${manager.processId} to ${processId}`); + cleanupSSEConnection(); + manager.highestBatchId = 0; // Reset cursor for new process + manager.reconnectAttempts = 0; + manager.patchFailureCount = 0; // Reset failure count for new process + } + + // Update manager state + manager.processId = processId; + manager.processStatus = processStatus; + + // Establish new connection + establishSSEConnection(processId, projectId); + + return () => { + debugLog(`🔌 SSE: Cleanup connection for ${processId}`); + + // Close connection if it belongs to this effect + if (manager.abortController && manager.processId === processId) { + cleanupSSEConnection(); + } + }; + }, [executionProcess?.id, executionProcess?.status]); + + // Memoize display entries to avoid unnecessary re-renders + const displayEntries = useMemo(() => { + if (!conversation?.entries) return []; + + // Filter out any null entries that may have been created by duplicate patch application + return conversation.entries.filter((entry): entry is NormalizedEntry => + Boolean(entry && (entry as NormalizedEntry).entry_type) + ); + }, [conversation?.entries]); + + return { + displayEntries, + conversation, + loading, + error, + }; +}; + +export default useNormalizedConversation; diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index 165b389e..0d3dead9 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -7,14 +7,14 @@ import { CreateTask, CreateTaskAndStart, CreateTaskAttempt, - DeviceStartResponse, CreateTaskTemplate, + DeviceStartResponse, DirectoryEntry, type EditorType, ExecutionProcess, ExecutionProcessSummary, GitBranch, - NormalizedConversation, + ProcessLogsResponse, Project, ProjectWithBranch, Task, @@ -471,6 +471,17 @@ export const attemptsApi = { const response = await makeRequest(`/api/attempts/${attemptId}/details`); return handleApiResponse(response); }, + + getAllLogs: async ( + projectId: string, + taskId: string, + attemptId: string + ): Promise => { + const response = await makeRequest( + `/api/projects/${projectId}/tasks/${taskId}/attempts/${attemptId}/logs` + ); + return handleApiResponse(response); + }, }; // Execution Process APIs @@ -484,16 +495,6 @@ export const executionProcessesApi = { ); return handleApiResponse(response); }, - - getNormalizedLogs: async ( - projectId: string, - processId: string - ): Promise => { - const response = await makeRequest( - `/api/projects/${projectId}/execution-processes/${processId}/normalized-logs` - ); - return handleApiResponse(response); - }, }; // File System APIs diff --git a/frontend/src/lib/types.ts b/frontend/src/lib/types.ts index e0a4d22a..860a2241 100644 --- a/frontend/src/lib/types.ts +++ b/frontend/src/lib/types.ts @@ -2,11 +2,13 @@ import { DiffChunkType, ExecutionProcess, ExecutionProcessSummary, + ProcessLogsResponse, } from 'shared/types.ts'; export type AttemptData = { processes: ExecutionProcessSummary[]; runningProcessDetails: Record; + allLogs: ProcessLogsResponse[]; }; export interface ProcessedLine { @@ -23,3 +25,15 @@ export interface ProcessedSection { expandedAbove?: boolean; expandedBelow?: boolean; } + +export interface ConversationEntryDisplayType { + entry: any; + processId: string; + processPrompt?: string; + processStatus: string; + processIsRunning: boolean; + process: any; + isFirstInProcess: boolean; + processIndex: number; + entryIndex: number; +} diff --git a/shared/types.ts b/shared/types.ts index c4d5dbc7..29467352 100644 --- a/shared/types.ts +++ b/shared/types.ts @@ -76,6 +76,8 @@ export type DirectoryListResponse = { entries: Array, current_pa export type DeviceStartResponse = { device_code: string, user_code: string, verification_uri: string, expires_in: number, interval: number, }; +export type ProcessLogsResponse = { id: string, process_type: ExecutionProcessType, command: string, executor_type: string | null, status: ExecutionProcessStatus, normalized_conversation: NormalizedConversation, }; + export type DiffChunkType = "Equal" | "Insert" | "Delete"; export type DiffChunk = { chunk_type: DiffChunkType, content: string, };