// useConversationHistory.ts import { CommandExitStatus, ExecutionProcess, ExecutionProcessStatus, ExecutorAction, NormalizedEntry, PatchType, TaskAttempt, ToolStatus, } from 'shared/types'; import { useExecutionProcessesContext } from '@/contexts/ExecutionProcessesContext'; import { useCallback, useEffect, useMemo, useRef } from 'react'; import { streamJsonPatchEntries } from '@/utils/streamJsonPatchEntries'; export type PatchTypeWithKey = PatchType & { patchKey: string; executionProcessId: string; }; export type AddEntryType = 'initial' | 'running' | 'historic'; export type OnEntriesUpdated = ( newEntries: PatchTypeWithKey[], addType: AddEntryType, loading: boolean ) => void; type ExecutionProcessStaticInfo = { id: string; created_at: string; updated_at: string; executor_action: ExecutorAction; }; type ExecutionProcessState = { executionProcess: ExecutionProcessStaticInfo; entries: PatchTypeWithKey[]; }; type ExecutionProcessStateStore = Record; interface UseConversationHistoryParams { attempt: TaskAttempt; onEntriesUpdated: OnEntriesUpdated; } interface UseConversationHistoryResult {} const MIN_INITIAL_ENTRIES = 10; const REMAINING_BATCH_SIZE = 50; const loadingPatch: PatchTypeWithKey = { type: 'NORMALIZED_ENTRY', content: { entry_type: { type: 'loading', }, content: '', timestamp: null, }, patchKey: 'loading', executionProcessId: '', }; const nextActionPatch: ( failed: boolean, execution_processes: number, needs_setup: boolean, setup_help_text?: string ) => PatchTypeWithKey = ( failed, execution_processes, needs_setup, setup_help_text ) => ({ type: 'NORMALIZED_ENTRY', content: { entry_type: { type: 'next_action', failed: failed, execution_processes: execution_processes, needs_setup: needs_setup, setup_help_text: setup_help_text ?? null, }, content: '', timestamp: null, }, patchKey: 'next_action', executionProcessId: '', }); export const useConversationHistory = ({ attempt, onEntriesUpdated, }: UseConversationHistoryParams): UseConversationHistoryResult => { const { executionProcessesVisible: executionProcessesRaw } = useExecutionProcessesContext(); const executionProcesses = useRef(executionProcessesRaw); const displayedExecutionProcesses = useRef({}); const loadedInitialEntries = useRef(false); const lastActiveProcessId = useRef(null); const onEntriesUpdatedRef = useRef(null); const mergeIntoDisplayed = ( mutator: (state: ExecutionProcessStateStore) => void ) => { const state = displayedExecutionProcesses.current; mutator(state); }; useEffect(() => { onEntriesUpdatedRef.current = onEntriesUpdated; }, [onEntriesUpdated]); // Keep executionProcesses up to date useEffect(() => { executionProcesses.current = executionProcessesRaw.filter( (ep) => ep.run_reason === 'setupscript' || ep.run_reason === 'cleanupscript' || ep.run_reason === 'codingagent' ); }, [executionProcessesRaw]); const loadEntriesForHistoricExecutionProcess = ( executionProcess: ExecutionProcess ) => { let url = ''; if (executionProcess.executor_action.typ.type === 'ScriptRequest') { url = `/api/execution-processes/${executionProcess.id}/raw-logs/ws`; } else { url = `/api/execution-processes/${executionProcess.id}/normalized-logs/ws`; } return new Promise((resolve) => { const controller = streamJsonPatchEntries(url, { onFinished: (allEntries) => { controller.close(); resolve(allEntries); }, onError: (err) => { console.warn!( `Error loading entries for historic execution process ${executionProcess.id}`, err ); controller.close(); resolve([]); }, }); }); }; const getLiveExecutionProcess = ( executionProcessId: string ): ExecutionProcess | undefined => { return executionProcesses?.current.find( (executionProcess) => executionProcess.id === executionProcessId ); }; const patchWithKey = ( patch: PatchType, executionProcessId: string, index: number | 'user' ) => { return { ...patch, patchKey: `${executionProcessId}:${index}`, executionProcessId, }; }; const flattenEntries = ( executionProcessState: ExecutionProcessStateStore ): PatchTypeWithKey[] => { return Object.values(executionProcessState) .filter( (p) => p.executionProcess.executor_action.typ.type === 'CodingAgentFollowUpRequest' || p.executionProcess.executor_action.typ.type === 'CodingAgentInitialRequest' ) .sort( (a, b) => new Date( a.executionProcess.created_at as unknown as string ).getTime() - new Date(b.executionProcess.created_at as unknown as string).getTime() ) .flatMap((p) => p.entries); }; const getActiveAgentProcess = (): ExecutionProcess | null => { const activeProcesses = executionProcesses?.current.filter( (p) => p.status === ExecutionProcessStatus.running && p.run_reason !== 'devserver' ); if (activeProcesses.length > 1) { console.error('More than one active execution process found'); } return activeProcesses[0] || null; }; const flattenEntriesForEmit = useCallback( (executionProcessState: ExecutionProcessStateStore): PatchTypeWithKey[] => { // Flags to control Next Action bar emit let hasPendingApproval = false; let hasRunningProcess = false; let lastProcessFailedOrKilled = false; let needsSetup = false; let setupHelpText: string | undefined; // Create user messages + tool calls for setup/cleanup scripts const allEntries = Object.values(executionProcessState) .sort( (a, b) => new Date( a.executionProcess.created_at as unknown as string ).getTime() - new Date( b.executionProcess.created_at as unknown as string ).getTime() ) .flatMap((p, index) => { const entries: PatchTypeWithKey[] = []; if ( p.executionProcess.executor_action.typ.type === 'CodingAgentInitialRequest' || p.executionProcess.executor_action.typ.type === 'CodingAgentFollowUpRequest' ) { // New user message const userNormalizedEntry: NormalizedEntry = { entry_type: { type: 'user_message', }, content: p.executionProcess.executor_action.typ.prompt, timestamp: null, }; const userPatch: PatchType = { type: 'NORMALIZED_ENTRY', content: userNormalizedEntry, }; const userPatchTypeWithKey = patchWithKey( userPatch, p.executionProcess.id, 'user' ); entries.push(userPatchTypeWithKey); // Remove all coding agent added user messages, replace with our custom one const entriesExcludingUser = p.entries.filter( (e) => e.type !== 'NORMALIZED_ENTRY' || e.content.entry_type.type !== 'user_message' ); const hasPendingApprovalEntry = entriesExcludingUser.some( (entry) => { if (entry.type !== 'NORMALIZED_ENTRY') return false; const entryType = entry.content.entry_type; return ( entryType.type === 'tool_use' && entryType.status.status === 'pending_approval' ); } ); if (hasPendingApprovalEntry) { hasPendingApproval = true; } entries.push(...entriesExcludingUser); const liveProcessStatus = getLiveExecutionProcess( p.executionProcess.id )?.status; const isProcessRunning = liveProcessStatus === ExecutionProcessStatus.running; const processFailedOrKilled = liveProcessStatus === ExecutionProcessStatus.failed || liveProcessStatus === ExecutionProcessStatus.killed; if (isProcessRunning) { hasRunningProcess = true; } if ( processFailedOrKilled && index === Object.keys(executionProcessState).length - 1 ) { lastProcessFailedOrKilled = true; // Check if this failed process has a SetupRequired entry const hasSetupRequired = entriesExcludingUser.some((entry) => { if (entry.type !== 'NORMALIZED_ENTRY') return false; if ( entry.content.entry_type.type === 'error_message' && entry.content.entry_type.error_type.type === 'setup_required' ) { setupHelpText = entry.content.content; return true; } return false; }); if (hasSetupRequired) { needsSetup = true; } } if (isProcessRunning && !hasPendingApprovalEntry) { entries.push(loadingPatch); } } else if ( p.executionProcess.executor_action.typ.type === 'ScriptRequest' ) { // Add setup and cleanup script as a tool call let toolName = ''; switch (p.executionProcess.executor_action.typ.context) { case 'SetupScript': toolName = 'Setup Script'; break; case 'CleanupScript': toolName = 'Cleanup Script'; break; case 'ToolInstallScript': toolName = 'Tool Install Script'; break; default: return []; } const executionProcess = getLiveExecutionProcess( p.executionProcess.id ); if (executionProcess?.status === ExecutionProcessStatus.running) { hasRunningProcess = true; } if ( (executionProcess?.status === ExecutionProcessStatus.failed || executionProcess?.status === ExecutionProcessStatus.killed) && index === Object.keys(executionProcessState).length - 1 ) { lastProcessFailedOrKilled = true; } const exitCode = Number(executionProcess?.exit_code) || 0; const exit_status: CommandExitStatus | null = executionProcess?.status === 'running' ? null : { type: 'exit_code', code: exitCode, }; const toolStatus: ToolStatus = executionProcess?.status === ExecutionProcessStatus.running ? { status: 'created' } : exitCode === 0 ? { status: 'success' } : { status: 'failed' }; const output = p.entries.map((line) => line.content).join('\n'); const toolNormalizedEntry: NormalizedEntry = { entry_type: { type: 'tool_use', tool_name: toolName, action_type: { action: 'command_run', command: p.executionProcess.executor_action.typ.script, result: { output, exit_status, }, }, status: toolStatus, }, content: toolName, timestamp: null, }; const toolPatch: PatchType = { type: 'NORMALIZED_ENTRY', content: toolNormalizedEntry, }; const toolPatchWithKey: PatchTypeWithKey = patchWithKey( toolPatch, p.executionProcess.id, 0 ); entries.push(toolPatchWithKey); } return entries; }); // Emit the next action bar if no process running if (!hasRunningProcess && !hasPendingApproval) { allEntries.push( nextActionPatch( lastProcessFailedOrKilled, Object.keys(executionProcessState).length, needsSetup, setupHelpText ) ); } return allEntries; }, [] ); const emitEntries = useCallback( ( executionProcessState: ExecutionProcessStateStore, addEntryType: AddEntryType, loading: boolean ) => { const entries = flattenEntriesForEmit(executionProcessState); onEntriesUpdatedRef.current?.(entries, addEntryType, loading); }, [flattenEntriesForEmit] ); // This emits its own events as they are streamed const loadRunningAndEmit = useCallback( (executionProcess: ExecutionProcess): Promise => { return new Promise((resolve, reject) => { let url = ''; if (executionProcess.executor_action.typ.type === 'ScriptRequest') { url = `/api/execution-processes/${executionProcess.id}/raw-logs/ws`; } else { url = `/api/execution-processes/${executionProcess.id}/normalized-logs/ws`; } const controller = streamJsonPatchEntries(url, { onEntries(entries) { const patchesWithKey = entries.map((entry, index) => patchWithKey(entry, executionProcess.id, index) ); mergeIntoDisplayed((state) => { state[executionProcess.id] = { executionProcess, entries: patchesWithKey, }; }); emitEntries(displayedExecutionProcesses.current, 'running', false); }, onFinished: () => { emitEntries(displayedExecutionProcesses.current, 'running', false); controller.close(); resolve(); }, onError: () => { controller.close(); reject(); }, }); }); }, [emitEntries] ); // Sometimes it can take a few seconds for the stream to start, wrap the loadRunningAndEmit method const loadRunningAndEmitWithBackoff = useCallback( async (executionProcess: ExecutionProcess) => { for (let i = 0; i < 20; i++) { try { await loadRunningAndEmit(executionProcess); break; } catch (_) { await new Promise((resolve) => setTimeout(resolve, 500)); } } }, [loadRunningAndEmit] ); const loadInitialEntries = useCallback(async (): Promise => { const localDisplayedExecutionProcesses: ExecutionProcessStateStore = {}; if (!executionProcesses?.current) return localDisplayedExecutionProcesses; for (const executionProcess of [ ...executionProcesses.current, ].reverse()) { if (executionProcess.status === ExecutionProcessStatus.running) continue; const entries = await loadEntriesForHistoricExecutionProcess(executionProcess); const entriesWithKey = entries.map((e, idx) => patchWithKey(e, executionProcess.id, idx) ); localDisplayedExecutionProcesses[executionProcess.id] = { executionProcess, entries: entriesWithKey, }; if ( flattenEntries(localDisplayedExecutionProcesses).length > MIN_INITIAL_ENTRIES ) { break; } } return localDisplayedExecutionProcesses; }, [executionProcesses]); const loadRemainingEntriesInBatches = useCallback( async (batchSize: number): Promise => { if (!executionProcesses?.current) return false; let anyUpdated = false; for (const executionProcess of [ ...executionProcesses.current, ].reverse()) { const current = displayedExecutionProcesses.current; if ( current[executionProcess.id] || executionProcess.status === ExecutionProcessStatus.running ) continue; const entries = await loadEntriesForHistoricExecutionProcess(executionProcess); const entriesWithKey = entries.map((e, idx) => patchWithKey(e, executionProcess.id, idx) ); mergeIntoDisplayed((state) => { state[executionProcess.id] = { executionProcess, entries: entriesWithKey, }; }); if ( flattenEntries(displayedExecutionProcesses.current).length > batchSize ) { anyUpdated = true; break; } anyUpdated = true; } return anyUpdated; }, [executionProcesses] ); const ensureProcessVisible = useCallback((p: ExecutionProcess) => { mergeIntoDisplayed((state) => { if (!state[p.id]) { state[p.id] = { executionProcess: { id: p.id, created_at: p.created_at, updated_at: p.updated_at, executor_action: p.executor_action, }, entries: [], }; } }); }, []); const idListKey = useMemo( () => executionProcessesRaw?.map((p) => p.id).join(','), [executionProcessesRaw] ); const idStatusKey = useMemo( () => executionProcessesRaw?.map((p) => `${p.id}:${p.status}`).join(','), [executionProcessesRaw] ); // Initial load when attempt changes useEffect(() => { let cancelled = false; (async () => { // Waiting for execution processes to load if ( executionProcesses?.current.length === 0 || loadedInitialEntries.current ) return; // Initial entries const allInitialEntries = await loadInitialEntries(); if (cancelled) return; mergeIntoDisplayed((state) => { Object.assign(state, allInitialEntries); }); emitEntries(displayedExecutionProcesses.current, 'initial', false); loadedInitialEntries.current = true; // Then load the remaining in batches while ( !cancelled && (await loadRemainingEntriesInBatches(REMAINING_BATCH_SIZE)) ) { if (cancelled) return; } await new Promise((resolve) => setTimeout(resolve, 100)); emitEntries(displayedExecutionProcesses.current, 'historic', false); })(); return () => { cancelled = true; }; }, [ attempt.id, idListKey, loadInitialEntries, loadRemainingEntriesInBatches, emitEntries, ]); // include idListKey so new processes trigger reload useEffect(() => { const activeProcess = getActiveAgentProcess(); if (!activeProcess) return; if (!displayedExecutionProcesses.current[activeProcess.id]) { const runningOrInitial = Object.keys(displayedExecutionProcesses.current).length > 1 ? 'running' : 'initial'; ensureProcessVisible(activeProcess); emitEntries(displayedExecutionProcesses.current, runningOrInitial, false); } if ( activeProcess.status === ExecutionProcessStatus.running && lastActiveProcessId.current !== activeProcess.id ) { lastActiveProcessId.current = activeProcess.id; loadRunningAndEmitWithBackoff(activeProcess); } }, [ attempt.id, idStatusKey, emitEntries, ensureProcessVisible, loadRunningAndEmitWithBackoff, ]); // If an execution process is removed, remove it from the state useEffect(() => { if (!executionProcessesRaw) return; const removedProcessIds = Object.keys( displayedExecutionProcesses.current ).filter((id) => !executionProcessesRaw.some((p) => p.id === id)); if (removedProcessIds.length > 0) { mergeIntoDisplayed((state) => { removedProcessIds.forEach((id) => { delete state[id]; }); }); } }, [attempt.id, idListKey, executionProcessesRaw]); // Reset state when attempt changes useEffect(() => { displayedExecutionProcesses.current = {}; loadedInitialEntries.current = false; lastActiveProcessId.current = null; emitEntries(displayedExecutionProcesses.current, 'initial', true); }, [attempt.id, emitEntries]); return {}; };