Improve conversation logs rendering (#238)

* pull all logs in one request

* render only the last 100 entries from the conversation

* cleanup

* fix diffs jumping on update

* fix SSE to not loose new logs on reconnect

* fmt

* small refactoring

* remove obsolete /normalized-logs endpoint
This commit is contained in:
Anastasiia Solop
2025-07-17 17:12:51 +02:00
committed by GitHub
parent 74482375a9
commit ddc692fa77
14 changed files with 878 additions and 732 deletions

View File

@@ -32,7 +32,7 @@ import {
TaskRelatedTasksContext,
TaskSelectedAttemptContext,
} from './taskDetailsContext.ts';
import { AttemptData } from '@/lib/types.ts';
import type { AttemptData } from '@/lib/types.ts';
const TaskDetailsProvider: FC<{
task: TaskWithAttemptStatus;
@@ -81,6 +81,7 @@ const TaskDetailsProvider: FC<{
const [attemptData, setAttemptData] = useState<AttemptData>({
processes: [],
runningProcessDetails: {},
allLogs: [], // new field for all logs
});
const diffLoadingRef = useRef(false);
@@ -233,13 +234,12 @@ const TaskDetailsProvider: FC<{
if (!task) return;
try {
const processesResult = await attemptsApi.getExecutionProcesses(
projectId,
taskId,
attemptId
);
const [processesResult, allLogsResult] = await Promise.all([
attemptsApi.getExecutionProcesses(projectId, taskId, attemptId),
attemptsApi.getAllLogs(projectId, taskId, attemptId),
]);
if (processesResult !== undefined) {
if (processesResult !== undefined && allLogsResult !== undefined) {
const runningProcesses = processesResult.filter(
(process) => process.status === 'running'
);
@@ -277,6 +277,7 @@ const TaskDetailsProvider: FC<{
const newData = {
processes: processesResult,
runningProcessDetails,
allLogs: allLogsResult,
};
if (JSON.stringify(prev) === JSON.stringify(newData)) return prev;
return newData;

View File

@@ -58,7 +58,7 @@ export function DiffCard({
</div>
{isBackgroundRefreshing && (
<div className="flex items-center gap-1">
<Loader size={12} className="mr-1" message="Updating..." />
<Loader size={12} />
</div>
)}
</div>

View File

@@ -9,11 +9,16 @@ import {
} from 'react';
import { TaskAttemptDataContext } from '@/components/context/taskDetailsContext.ts';
import { Loader } from '@/components/ui/loader.tsx';
import { Button } from '@/components/ui/button';
import Prompt from './Prompt';
import ConversationEntry from './ConversationEntry';
import { ConversationEntryDisplayType } from '@/lib/types';
function Conversation() {
const { attemptData } = useContext(TaskAttemptDataContext);
const [shouldAutoScrollLogs, setShouldAutoScrollLogs] = useState(true);
const [conversationUpdateTrigger, setConversationUpdateTrigger] = useState(0);
const [visibleCount, setVisibleCount] = useState(100);
const scrollContainerRef = useRef<HTMLDivElement>(null);
@@ -27,7 +32,7 @@ function Conversation() {
scrollContainerRef.current.scrollTop =
scrollContainerRef.current.scrollHeight;
}
}, [attemptData.processes, conversationUpdateTrigger, shouldAutoScrollLogs]);
}, [attemptData.allLogs, conversationUpdateTrigger, shouldAutoScrollLogs]);
const handleLogsScroll = useCallback(() => {
if (scrollContainerRef.current) {
@@ -43,57 +48,128 @@ function Conversation() {
}
}, [shouldAutoScrollLogs]);
const mainCodingAgentProcess = useMemo(() => {
let mainCAProcess = Object.values(attemptData.runningProcessDetails).find(
(process) =>
process.process_type === 'codingagent' && process.command === 'executor'
);
// Find main and follow-up processes from allLogs
const mainCodingAgentLog = useMemo(
() =>
attemptData.allLogs.find(
(log) =>
log.process_type.toLowerCase() === 'codingagent' &&
log.command === 'executor'
),
[attemptData.allLogs]
);
const followUpLogs = useMemo(
() =>
attemptData.allLogs.filter(
(log) =>
log.process_type.toLowerCase() === 'codingagent' &&
log.command === 'followup_executor'
),
[attemptData.allLogs]
);
if (!mainCAProcess) {
const mainCodingAgentSummary = attemptData.processes.find(
(process) =>
process.process_type === 'codingagent' &&
process.command === 'executor'
);
// Combine all logs in order (main first, then follow-ups)
const allProcessLogs = useMemo(
() =>
[mainCodingAgentLog, ...followUpLogs].filter(Boolean) as Array<
NonNullable<typeof mainCodingAgentLog>
>,
[mainCodingAgentLog, followUpLogs]
);
if (mainCodingAgentSummary) {
mainCAProcess = Object.values(attemptData.runningProcessDetails).find(
(process) => process.id === mainCodingAgentSummary.id
);
if (!mainCAProcess) {
mainCAProcess = {
...mainCodingAgentSummary,
stdout: null,
stderr: null,
} as any;
}
}
}
return mainCAProcess;
}, [attemptData.processes, attemptData.runningProcessDetails]);
const followUpProcesses = useMemo(() => {
return attemptData.processes
.filter(
(process) =>
process.process_type === 'codingagent' &&
process.command === 'followup_executor'
)
.map((summary) => {
const detailedProcess = Object.values(
attemptData.runningProcessDetails
).find((process) => process.id === summary.id);
return (
detailedProcess ||
({
...summary,
stdout: null,
stderr: null,
} as any)
);
// Flatten all entries, keeping process info for each entry
const allEntries = useMemo(() => {
const entries: Array<ConversationEntryDisplayType> = [];
allProcessLogs.forEach((log, processIndex) => {
if (!log) return;
if (log.status === 'running') return; // Skip static entries for running processes
const processId = String(log.id); // Ensure string
const processPrompt = log.normalized_conversation.prompt || undefined; // Ensure undefined, not null
const entriesArr = log.normalized_conversation.entries || [];
entriesArr.forEach((entry, entryIndex) => {
entries.push({
entry,
processId,
processPrompt,
processStatus: log.status,
processIsRunning: false, // Only completed processes here
process: log,
isFirstInProcess: entryIndex === 0,
processIndex,
entryIndex,
});
});
}, [attemptData.processes, attemptData.runningProcessDetails]);
});
// Sort by timestamp (entries without timestamp go last)
entries.sort((a, b) => {
if (a.entry.timestamp && b.entry.timestamp) {
return a.entry.timestamp.localeCompare(b.entry.timestamp);
}
if (a.entry.timestamp) return -1;
if (b.entry.timestamp) return 1;
return 0;
});
return entries;
}, [allProcessLogs]);
// Identify running processes (main + follow-ups)
const runningProcessLogs = useMemo(
() => allProcessLogs.filter((log) => log.status === 'running'),
[allProcessLogs]
);
// Paginate: show only the last visibleCount entries
const visibleEntries = useMemo(
() => allEntries.slice(-visibleCount),
[allEntries, visibleCount]
);
const renderedVisibleEntries = useMemo(
() =>
visibleEntries.map((entry, index) => (
<ConversationEntry
key={entry.entry.timestamp || index}
idx={index}
item={entry}
handleConversationUpdate={handleConversationUpdate}
visibleEntriesLength={visibleEntries.length}
runningProcessDetails={attemptData.runningProcessDetails}
/>
)),
[
visibleEntries,
handleConversationUpdate,
attemptData.runningProcessDetails,
]
);
const renderedRunningProcessLogs = useMemo(() => {
return runningProcessLogs.map((log, i) => {
const runningProcess = attemptData.runningProcessDetails[String(log.id)];
if (!runningProcess) return null;
// Show prompt only if this is the first entry in the process (i.e., no completed entries for this process)
const showPrompt =
log.normalized_conversation.prompt &&
!allEntries.some((e) => e.processId === String(log.id));
return (
<div key={String(log.id)} className={i > 0 ? 'mt-8' : ''}>
{showPrompt && (
<Prompt prompt={log.normalized_conversation.prompt || ''} />
)}
<NormalizedConversationViewer
executionProcess={runningProcess}
onConversationUpdate={handleConversationUpdate}
diffDeletable
/>
</div>
);
});
}, [
runningProcessLogs,
attemptData.runningProcessDetails,
handleConversationUpdate,
allEntries,
]);
return (
<div
@@ -101,29 +177,26 @@ function Conversation() {
onScroll={handleLogsScroll}
className="h-full overflow-y-auto"
>
{mainCodingAgentProcess || followUpProcesses.length > 0 ? (
<div className="space-y-8">
{mainCodingAgentProcess && (
<div className="space-y-6">
<NormalizedConversationViewer
executionProcess={mainCodingAgentProcess}
onConversationUpdate={handleConversationUpdate}
diffDeletable
/>
</div>
)}
{followUpProcesses.map((followUpProcess) => (
<div key={followUpProcess.id}>
<div className="border-t border-border mb-8"></div>
<NormalizedConversationViewer
executionProcess={followUpProcess}
onConversationUpdate={handleConversationUpdate}
diffDeletable
/>
</div>
))}
{visibleCount < allEntries.length && (
<div className="flex justify-center mb-4">
<Button
variant="outline"
className="w-full"
onClick={() =>
setVisibleCount((c) => Math.min(c + 100, allEntries.length))
}
>
Load previous logs
</Button>
</div>
) : (
)}
{visibleEntries.length > 0 && (
<div className="space-y-2">{renderedVisibleEntries}</div>
)}
{/* Render live viewers for running processes (after paginated list) */}
{renderedRunningProcessLogs}
{/* If nothing to show at all, show loader */}
{visibleEntries.length === 0 && runningProcessLogs.length === 0 && (
<Loader
message={
<>

View File

@@ -0,0 +1,56 @@
import { ConversationEntryDisplayType } from '@/lib/types';
import DisplayConversationEntry from '../DisplayConversationEntry';
import { NormalizedConversationViewer } from './NormalizedConversationViewer';
import Prompt from './Prompt';
import { Loader } from '@/components/ui/loader.tsx';
import { ExecutionProcess } from 'shared/types';
type Props = {
item: ConversationEntryDisplayType;
idx: number;
handleConversationUpdate: () => void;
visibleEntriesLength: number;
runningProcessDetails: Record<string, ExecutionProcess>;
};
const ConversationEntry = ({
item,
idx,
handleConversationUpdate,
visibleEntriesLength,
runningProcessDetails,
}: Props) => {
const showPrompt = item.isFirstInProcess && item.processPrompt;
// For running processes, render the live viewer below the static entries
if (item.processIsRunning && idx === visibleEntriesLength - 1) {
// Only render the live viewer for the last entry of a running process
const runningProcess = runningProcessDetails[item.processId];
if (runningProcess) {
return (
<div key={item.entry.timestamp || idx}>
{showPrompt && <Prompt prompt={item.processPrompt || ''} />}
<NormalizedConversationViewer
executionProcess={runningProcess}
onConversationUpdate={handleConversationUpdate}
diffDeletable
/>
</div>
);
}
// Fallback: show loading if not found
return <Loader message="Loading live logs..." size={24} className="py-4" />;
} else {
return (
<div key={item.entry.timestamp || idx}>
{showPrompt && <Prompt prompt={item.processPrompt || ''} />}
<DisplayConversationEntry
entry={item.entry}
index={idx}
diffDeletable
/>
</div>
);
}
};
export default ConversationEntry;

View File

@@ -1,25 +1,9 @@
import {
useCallback,
useContext,
useEffect,
useState,
useMemo,
useRef,
} from 'react';
import { Hammer } from 'lucide-react';
import { Loader } from '@/components/ui/loader.tsx';
import { executionProcessesApi } from '@/lib/api.ts';
import MarkdownRenderer from '@/components/ui/markdown-renderer.tsx';
import { applyPatch } from 'fast-json-patch';
import { fetchEventSource } from '@microsoft/fetch-event-source';
import type {
ExecutionProcess,
NormalizedConversation,
NormalizedEntry,
WorktreeDiff,
} from 'shared/types.ts';
import { TaskDetailsContext } from '@/components/context/taskDetailsContext.ts';
import type { ExecutionProcess, WorktreeDiff } from 'shared/types.ts';
import DisplayConversationEntry from '@/components/tasks/TaskDetails/DisplayConversationEntry.tsx';
import useNormalizedConversation from '@/hooks/useNormalizedConversation';
interface NormalizedConversationViewerProps {
executionProcess: ExecutionProcess;
@@ -34,401 +18,11 @@ export function NormalizedConversationViewer({
diffDeletable,
onConversationUpdate,
}: NormalizedConversationViewerProps) {
const { projectId } = useContext(TaskDetailsContext);
// Development-only logging helper
const debugLog = useCallback((message: string, ...args: any[]) => {
if (import.meta.env.DEV) {
console.log(message, ...args);
}
}, []);
const [conversation, setConversation] =
useState<NormalizedConversation | null>(null);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
// Track fetched processes to prevent redundant database calls
const fetchedProcesses = useRef(new Set<string>());
// SSE Connection Manager - production-ready with reconnection and resilience
const sseManagerRef = useRef<{
abortController: AbortController | null;
isActive: boolean;
highestBatchId: number;
reconnectAttempts: number;
reconnectTimeout: number | null;
processId: string;
processStatus: string;
patchFailureCount: number;
}>({
abortController: null,
isActive: false,
highestBatchId: 0,
reconnectAttempts: 0,
reconnectTimeout: null,
processId: executionProcess.id,
processStatus: executionProcess.status,
patchFailureCount: 0,
});
// SSE Connection Manager with Production-Ready Resilience using fetch-event-source
const createSSEConnection = useCallback(
(processId: string, projectId: string): AbortController => {
const manager = sseManagerRef.current;
// Build URL with resume cursor if we have processed batches
const baseUrl = `/api/projects/${projectId}/execution-processes/${processId}/normalized-logs/stream`;
const url =
manager.highestBatchId > 0
? `${baseUrl}?since_batch_id=${manager.highestBatchId}`
: baseUrl;
debugLog(
`🚀 SSE: Creating connection for process ${processId} (cursor: ${manager.highestBatchId})`
);
const abortController = new AbortController();
fetchEventSource(url, {
signal: abortController.signal,
onopen: async (response) => {
if (response.ok) {
debugLog(`✅ SSE: Connected to ${processId}`);
manager.isActive = true;
manager.reconnectAttempts = 0; // Reset on successful connection
manager.patchFailureCount = 0; // Reset patch failure count
if (manager.reconnectTimeout) {
clearTimeout(manager.reconnectTimeout);
manager.reconnectTimeout = null;
}
} else {
throw new Error(`SSE connection failed: ${response.status}`);
}
},
onmessage: (event) => {
if (event.event === 'patch') {
try {
const batchData = JSON.parse(event.data);
const { batch_id, patches } = batchData;
// Skip duplicates - use manager's batch tracking
if (batch_id && batch_id <= manager.highestBatchId) {
debugLog(
`⏭️ SSE: Skipping duplicate batch_id=${batch_id} (current=${manager.highestBatchId})`
);
return;
}
// Update cursor BEFORE processing
if (batch_id) {
manager.highestBatchId = batch_id;
debugLog(`📍 SSE: Processing batch_id=${batch_id}`);
}
setConversation((prev) => {
// Create empty conversation if none exists
const baseConversation = prev || {
entries: [],
session_id: null,
executor_type: 'unknown',
prompt: null,
summary: null,
};
try {
const updated = applyPatch(
JSON.parse(JSON.stringify(baseConversation)),
patches
).newDocument as NormalizedConversation;
updated.entries = updated.entries.filter(Boolean);
debugLog(
`🔧 SSE: Applied batch_id=${batch_id}, entries: ${updated.entries.length}`
);
// Reset patch failure count on successful application
manager.patchFailureCount = 0;
// Clear loading state on first successful patch
if (!prev) {
setLoading(false);
setError(null);
}
if (onConversationUpdate) {
setTimeout(onConversationUpdate, 0);
}
return updated;
} catch (patchError) {
console.warn('❌ SSE: Patch failed:', patchError);
// Reset cursor on failure for potential retry
if (batch_id && batch_id > 0) {
manager.highestBatchId = batch_id - 1;
}
// Track patch failures for monitoring
manager.patchFailureCount++;
debugLog(
`⚠️ SSE: Patch failure #${manager.patchFailureCount} for batch_id=${batch_id}`
);
return prev || baseConversation;
}
});
} catch (e) {
console.warn('❌ SSE: Parse failed:', e);
}
}
},
onerror: (err) => {
console.warn(`🔌 SSE: Connection error for ${processId}:`, err);
manager.isActive = false;
// Only attempt reconnection if process is still running
if (manager.processStatus === 'running') {
scheduleReconnect(processId, projectId);
}
},
onclose: () => {
debugLog(`🔌 SSE: Connection closed for ${processId}`);
manager.isActive = false;
},
}).catch((error) => {
if (error.name !== 'AbortError') {
console.warn(`❌ SSE: Fetch error for ${processId}:`, error);
manager.isActive = false;
// Only attempt reconnection if process is still running
if (manager.processStatus === 'running') {
scheduleReconnect(processId, projectId);
}
}
});
return abortController;
},
[onConversationUpdate, debugLog]
);
const scheduleReconnect = useCallback(
(processId: string, projectId: string) => {
const manager = sseManagerRef.current;
// Clear any existing reconnection timeout
if (manager.reconnectTimeout) {
clearTimeout(manager.reconnectTimeout);
}
// Exponential backoff: 1s, 2s, 4s, 8s, max 30s
const delay = Math.min(
1000 * Math.pow(2, manager.reconnectAttempts),
30000
);
manager.reconnectAttempts++;
debugLog(
`🔄 SSE: Scheduling reconnect attempt ${manager.reconnectAttempts} in ${delay}ms`
);
manager.reconnectTimeout = window.setTimeout(() => {
if (manager.processStatus === 'running') {
debugLog(`🔄 SSE: Attempting reconnect for ${processId}`);
establishSSEConnection(processId, projectId);
}
}, delay);
},
[debugLog]
);
const establishSSEConnection = useCallback(
(processId: string, projectId: string) => {
const manager = sseManagerRef.current;
// Close existing connection if any
if (manager.abortController) {
manager.abortController.abort();
manager.abortController = null;
manager.isActive = false;
}
const abortController = createSSEConnection(processId, projectId);
manager.abortController = abortController;
return abortController;
},
[createSSEConnection]
);
// Helper functions for SSE manager
const setProcessId = (id: string) => {
sseManagerRef.current.processId = id;
};
const setProcessStatus = (status: string) => {
sseManagerRef.current.processStatus = status;
};
// Consolidated cleanup function to avoid duplication
const cleanupSSEConnection = useCallback(() => {
const manager = sseManagerRef.current;
if (manager.abortController) {
manager.abortController.abort();
manager.abortController = null;
manager.isActive = false;
}
if (manager.reconnectTimeout) {
clearTimeout(manager.reconnectTimeout);
manager.reconnectTimeout = null;
}
}, []);
const fetchNormalizedLogsOnce = useCallback(
async (processId: string) => {
// Only fetch if not already fetched for this process
if (fetchedProcesses.current.has(processId)) {
debugLog(`📋 DB: Already fetched ${processId}, skipping`);
return;
}
try {
setLoading(true);
setError(null);
debugLog(`📋 DB: Fetching logs for ${processId}`);
const result = await executionProcessesApi.getNormalizedLogs(
projectId,
processId
);
// Mark as fetched
fetchedProcesses.current.add(processId);
setConversation((prev) => {
// Only update if content actually changed - use lightweight comparison
if (
!prev ||
prev.entries.length !== result.entries.length ||
prev.prompt !== result.prompt
) {
// Notify parent component of conversation update
if (onConversationUpdate) {
// Use setTimeout to ensure state update happens first
setTimeout(onConversationUpdate, 0);
}
return result;
}
return prev;
});
} catch (err) {
// Remove from fetched set on error to allow retry
fetchedProcesses.current.delete(processId);
setError(
`Error fetching logs: ${err instanceof Error ? err.message : 'Unknown error'}`
);
} finally {
setLoading(false);
}
},
[projectId, onConversationUpdate, debugLog]
);
// Process-based data fetching - fetch once from appropriate source
useEffect(() => {
const processId = executionProcess.id;
const processStatus = executionProcess.status;
debugLog(`🎯 Data: Process ${processId} is ${processStatus}`);
// Reset conversation state when switching processes
const manager = sseManagerRef.current;
if (manager.processId !== processId) {
setConversation(null);
setLoading(true);
setError(null);
// Clear fetch tracking for old processes (keep memory bounded)
if (fetchedProcesses.current.size > 10) {
fetchedProcesses.current.clear();
}
}
if (processStatus === 'running') {
// Running processes: SSE will handle data (including initial state)
debugLog(`🚀 Data: Using SSE for running process ${processId}`);
// SSE connection will be established by the SSE management effect
} else {
// Completed processes: Single database fetch
debugLog(`📋 Data: Using database for completed process ${processId}`);
fetchNormalizedLogsOnce(processId);
}
}, [
executionProcess.id,
executionProcess.status,
fetchNormalizedLogsOnce,
debugLog,
]);
// SSE connection management for running processes only
useEffect(() => {
const processId = executionProcess.id;
const processStatus = executionProcess.status;
const manager = sseManagerRef.current;
// Update manager state
setProcessId(processId);
setProcessStatus(processStatus);
// Only establish SSE for running processes
if (processStatus !== 'running') {
debugLog(
`🚫 SSE: Process ${processStatus}, cleaning up any existing connection`
);
cleanupSSEConnection();
return;
}
// Check if connection already exists for same process ID
if (manager.abortController && manager.processId === processId) {
debugLog(`⚠️ SSE: Connection already exists for ${processId}, reusing`);
return;
}
// Process changed - close existing and reset state
if (manager.abortController && manager.processId !== processId) {
debugLog(`🔄 SSE: Switching from ${manager.processId} to ${processId}`);
cleanupSSEConnection();
manager.highestBatchId = 0; // Reset cursor for new process
manager.reconnectAttempts = 0;
manager.patchFailureCount = 0; // Reset failure count for new process
}
// Update manager state
manager.processId = processId;
manager.processStatus = processStatus;
// Establish new connection
establishSSEConnection(processId, projectId);
return () => {
debugLog(`🔌 SSE: Cleanup connection for ${processId}`);
// Close connection if it belongs to this effect
if (manager.abortController && manager.processId === processId) {
cleanupSSEConnection();
}
};
}, [executionProcess.id, executionProcess.status]);
// Memoize display entries to avoid unnecessary re-renders
const displayEntries = useMemo(() => {
if (!conversation?.entries) return [];
// Filter out any null entries that may have been created by duplicate patch application
return conversation.entries.filter((entry): entry is NormalizedEntry =>
Boolean(entry && (entry as NormalizedEntry).entry_type)
);
}, [conversation?.entries]);
const { loading, error, conversation, displayEntries } =
useNormalizedConversation({
executionProcess,
onConversationUpdate,
});
if (loading) {
return (

View File

@@ -0,0 +1,22 @@
import MarkdownRenderer from '@/components/ui/markdown-renderer';
import { Hammer } from 'lucide-react';
const Prompt = ({ prompt }: { prompt: string }) => {
return (
<div className="flex items-start gap-3">
<div className="flex-shrink-0 mt-1">
<Hammer className="h-4 w-4 text-blue-600" />
</div>
<div className="flex-1 min-w-0">
<div className="text-sm whitespace-pre-wrap text-foreground">
<MarkdownRenderer
content={prompt}
className="whitespace-pre-wrap break-words"
/>
</div>
</div>
</div>
);
};
export default Prompt;

View File

@@ -173,6 +173,7 @@ function TaskDetailsToolbar() {
setAttemptData({
processes: [],
runningProcessDetails: {},
allLogs: [],
});
}
} catch (error) {

View File

@@ -12,9 +12,11 @@ export const Loader: React.FC<LoaderProps> = ({
size = 32,
className = '',
}) => (
<div className={`flex flex-col items-center justify-center ${className}`}>
<div
className={`flex flex-col items-center justify-center gap-2 ${className}`}
>
<Loader2
className="animate-spin text-muted-foreground mb-2"
className="animate-spin text-muted-foreground"
style={{ width: size, height: size }}
/>
{!!message && (

View File

@@ -0,0 +1,429 @@
import {
TaskAttemptDataContext,
TaskDetailsContext,
} from '@/components/context/taskDetailsContext';
import { fetchEventSource } from '@microsoft/fetch-event-source';
import { applyPatch } from 'fast-json-patch';
import {
useCallback,
useContext,
useEffect,
useMemo,
useRef,
useState,
} from 'react';
import {
ExecutionProcess,
NormalizedConversation,
NormalizedEntry,
} from 'shared/types';
const useNormalizedConversation = ({
executionProcess,
onConversationUpdate,
}: {
executionProcess?: ExecutionProcess;
onConversationUpdate?: () => void;
}) => {
const { projectId } = useContext(TaskDetailsContext);
const { attemptData } = useContext(TaskAttemptDataContext);
// Development-only logging helper
const debugLog = useCallback((message: string, ...args: any[]) => {
if (import.meta.env.DEV) {
console.log(message, ...args);
}
}, []);
const [conversation, setConversation] =
useState<NormalizedConversation | null>(null);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
// Track fetched processes to prevent redundant database calls
const fetchedProcesses = useRef(new Set<string>());
// SSE Connection Manager - production-ready with reconnection and resilience
const sseManagerRef = useRef<{
abortController: AbortController | null;
isActive: boolean;
highestBatchId: number;
reconnectAttempts: number;
reconnectTimeout: number | null;
processId: string;
processStatus: string;
patchFailureCount: number;
onopenCalled: boolean;
}>({
abortController: null,
isActive: false,
highestBatchId: 0,
reconnectAttempts: 0,
reconnectTimeout: null,
processId: executionProcess?.id || '',
processStatus: executionProcess?.status || '',
patchFailureCount: 0,
onopenCalled: false,
});
// SSE Connection Manager with Production-Ready Resilience using fetch-event-source
const createSSEConnection = useCallback(
(processId: string, projectId: string): AbortController => {
const manager = sseManagerRef.current;
// Build URL with resume cursor if we have processed batches
const baseUrl = `/api/projects/${projectId}/execution-processes/${processId}/normalized-logs/stream`;
const url =
manager.highestBatchId > 0
? `${baseUrl}?since_batch_id=${manager.highestBatchId}`
: baseUrl;
debugLog(
`🚀 SSE: Creating connection for process ${processId} (cursor: ${manager.highestBatchId})`
);
const abortController = new AbortController();
fetchEventSource(url, {
signal: abortController.signal,
onopen: async (response) => {
const manager = sseManagerRef.current;
if (manager.onopenCalled) {
// This is a "phantom" reconnect, so abort and re-create
debugLog(
'⚠️ SSE: onopen called again for same connection, forcing reconnect'
);
abortController.abort();
manager.abortController = null;
manager.isActive = false;
manager.onopenCalled = false;
// Re-establish with latest cursor
scheduleReconnect(processId, projectId);
return;
}
manager.onopenCalled = true;
if (response.ok) {
debugLog(`✅ SSE: Connected to ${processId}`);
manager.isActive = true;
manager.reconnectAttempts = 0; // Reset on successful connection
manager.patchFailureCount = 0; // Reset patch failure count
if (manager.reconnectTimeout) {
clearTimeout(manager.reconnectTimeout);
manager.reconnectTimeout = null;
}
} else {
throw new Error(`SSE connection failed: ${response.status}`);
}
},
onmessage: (event) => {
if (event.event === 'patch') {
try {
const batchData = JSON.parse(event.data);
const { batch_id, patches } = batchData;
// Skip duplicates - use manager's batch tracking
if (batch_id && batch_id <= manager.highestBatchId) {
debugLog(
`⏭️ SSE: Skipping duplicate batch_id=${batch_id} (current=${manager.highestBatchId})`
);
return;
}
// Update cursor BEFORE processing
if (batch_id) {
manager.highestBatchId = batch_id;
debugLog(`📍 SSE: Processing batch_id=${batch_id}`);
}
setConversation((prev) => {
// Create empty conversation if none exists
const baseConversation = prev || {
entries: [],
session_id: null,
executor_type: 'unknown',
prompt: null,
summary: null,
};
try {
const updated = applyPatch(
JSON.parse(JSON.stringify(baseConversation)),
patches
).newDocument as NormalizedConversation;
updated.entries = updated.entries.filter(Boolean);
debugLog(
`🔧 SSE: Applied batch_id=${batch_id}, entries: ${updated.entries.length}`
);
// Reset patch failure count on successful application
manager.patchFailureCount = 0;
// Clear loading state on first successful patch
if (!prev) {
setLoading(false);
setError(null);
}
if (onConversationUpdate) {
setTimeout(onConversationUpdate, 0);
}
return updated;
} catch (patchError) {
console.warn('❌ SSE: Patch failed:', patchError);
// Reset cursor on failure for potential retry
if (batch_id && batch_id > 0) {
manager.highestBatchId = batch_id - 1;
}
// Track patch failures for monitoring
manager.patchFailureCount++;
debugLog(
`⚠️ SSE: Patch failure #${manager.patchFailureCount} for batch_id=${batch_id}`
);
return prev || baseConversation;
}
});
} catch (e) {
console.warn('❌ SSE: Parse failed:', e);
}
}
},
onerror: (err) => {
console.warn(`🔌 SSE: Connection error for ${processId}:`, err);
manager.isActive = false;
// Only attempt reconnection if process is still running
if (manager.processStatus === 'running') {
scheduleReconnect(processId, projectId);
}
},
onclose: () => {
debugLog(`🔌 SSE: Connection closed for ${processId}`);
manager.isActive = false;
},
}).catch((error) => {
if (error.name !== 'AbortError') {
console.warn(`❌ SSE: Fetch error for ${processId}:`, error);
manager.isActive = false;
// Only attempt reconnection if process is still running
if (manager.processStatus === 'running') {
scheduleReconnect(processId, projectId);
}
}
});
return abortController;
},
[onConversationUpdate, debugLog]
);
const scheduleReconnect = useCallback(
(processId: string, projectId: string) => {
const manager = sseManagerRef.current;
// Clear any existing reconnection timeout
if (manager.reconnectTimeout) {
clearTimeout(manager.reconnectTimeout);
}
// Exponential backoff: 1s, 2s, 4s, 8s, max 30s
const delay = Math.min(
1000 * Math.pow(2, manager.reconnectAttempts),
30000
);
manager.reconnectAttempts++;
debugLog(
`🔄 SSE: Scheduling reconnect attempt ${manager.reconnectAttempts} in ${delay}ms`
);
manager.reconnectTimeout = window.setTimeout(() => {
if (manager.processStatus === 'running') {
debugLog(`🔄 SSE: Attempting reconnect for ${processId}`);
establishSSEConnection(processId, projectId);
}
}, delay);
},
[debugLog]
);
const establishSSEConnection = useCallback(
(processId: string, projectId: string) => {
const manager = sseManagerRef.current;
// Close existing connection if any
if (manager.abortController) {
manager.abortController.abort();
manager.abortController = null;
manager.isActive = false;
}
const abortController = createSSEConnection(processId, projectId);
manager.abortController = abortController;
return abortController;
},
[createSSEConnection]
);
// Helper functions for SSE manager
const setProcessId = (id: string) => {
sseManagerRef.current.processId = id;
};
const setProcessStatus = (status: string) => {
sseManagerRef.current.processStatus = status;
};
// Consolidated cleanup function to avoid duplication
const cleanupSSEConnection = useCallback(() => {
const manager = sseManagerRef.current;
if (manager.abortController) {
manager.abortController.abort();
manager.abortController = null;
manager.isActive = false;
}
if (manager.reconnectTimeout) {
clearTimeout(manager.reconnectTimeout);
manager.reconnectTimeout = null;
}
manager.onopenCalled = false;
}, []);
// Process-based data fetching - fetch once from appropriate source
useEffect(() => {
if (!executionProcess?.id || !executionProcess?.status) {
return;
}
const processId = executionProcess.id;
const processStatus = executionProcess.status;
debugLog(`🎯 Data: Process ${processId} is ${processStatus}`);
// Reset conversation state when switching processes
const manager = sseManagerRef.current;
if (manager.processId !== processId) {
setConversation(null);
setLoading(true);
setError(null);
// Clear fetch tracking for old processes (keep memory bounded)
if (fetchedProcesses.current.size > 10) {
fetchedProcesses.current.clear();
}
}
if (processStatus === 'running') {
// Running processes: SSE will handle data (including initial state)
debugLog(`🚀 Data: Using SSE for running process ${processId}`);
// SSE connection will be established by the SSE management effect
} else {
// Completed processes: Single database fetch
debugLog(`📋 Data: Using database for completed process ${processId}`);
const logs = attemptData.allLogs.find(
(entry) => entry.id === executionProcess.id
)?.normalized_conversation;
if (logs) {
setConversation((prev) => {
// Only update if content actually changed - use lightweight comparison
if (
!prev ||
prev.entries.length !== logs.entries.length ||
prev.prompt !== logs.prompt
) {
// Notify parent component of conversation update
if (onConversationUpdate) {
// Use setTimeout to ensure state update happens first
setTimeout(onConversationUpdate, 0);
}
return logs;
}
return prev;
});
}
setLoading(false);
}
}, [
executionProcess?.id,
executionProcess?.status,
attemptData.allLogs,
debugLog,
onConversationUpdate,
]);
// SSE connection management for running processes only
useEffect(() => {
if (!executionProcess?.id || !executionProcess?.status) {
return;
}
const processId = executionProcess.id;
const processStatus = executionProcess.status;
const manager = sseManagerRef.current;
// Update manager state
setProcessId(processId);
setProcessStatus(processStatus);
// Only establish SSE for running processes
if (processStatus !== 'running') {
debugLog(
`🚫 SSE: Process ${processStatus}, cleaning up any existing connection`
);
cleanupSSEConnection();
return;
}
// Check if connection already exists for same process ID
if (manager.abortController && manager.processId === processId) {
debugLog(`⚠️ SSE: Connection already exists for ${processId}, reusing`);
return;
}
// Process changed - close existing and reset state
if (manager.abortController && manager.processId !== processId) {
debugLog(`🔄 SSE: Switching from ${manager.processId} to ${processId}`);
cleanupSSEConnection();
manager.highestBatchId = 0; // Reset cursor for new process
manager.reconnectAttempts = 0;
manager.patchFailureCount = 0; // Reset failure count for new process
}
// Update manager state
manager.processId = processId;
manager.processStatus = processStatus;
// Establish new connection
establishSSEConnection(processId, projectId);
return () => {
debugLog(`🔌 SSE: Cleanup connection for ${processId}`);
// Close connection if it belongs to this effect
if (manager.abortController && manager.processId === processId) {
cleanupSSEConnection();
}
};
}, [executionProcess?.id, executionProcess?.status]);
// Memoize display entries to avoid unnecessary re-renders
const displayEntries = useMemo(() => {
if (!conversation?.entries) return [];
// Filter out any null entries that may have been created by duplicate patch application
return conversation.entries.filter((entry): entry is NormalizedEntry =>
Boolean(entry && (entry as NormalizedEntry).entry_type)
);
}, [conversation?.entries]);
return {
displayEntries,
conversation,
loading,
error,
};
};
export default useNormalizedConversation;

View File

@@ -7,14 +7,14 @@ import {
CreateTask,
CreateTaskAndStart,
CreateTaskAttempt,
DeviceStartResponse,
CreateTaskTemplate,
DeviceStartResponse,
DirectoryEntry,
type EditorType,
ExecutionProcess,
ExecutionProcessSummary,
GitBranch,
NormalizedConversation,
ProcessLogsResponse,
Project,
ProjectWithBranch,
Task,
@@ -471,6 +471,17 @@ export const attemptsApi = {
const response = await makeRequest(`/api/attempts/${attemptId}/details`);
return handleApiResponse<TaskAttempt>(response);
},
getAllLogs: async (
projectId: string,
taskId: string,
attemptId: string
): Promise<ProcessLogsResponse[]> => {
const response = await makeRequest(
`/api/projects/${projectId}/tasks/${taskId}/attempts/${attemptId}/logs`
);
return handleApiResponse(response);
},
};
// Execution Process APIs
@@ -484,16 +495,6 @@ export const executionProcessesApi = {
);
return handleApiResponse<ExecutionProcess>(response);
},
getNormalizedLogs: async (
projectId: string,
processId: string
): Promise<NormalizedConversation> => {
const response = await makeRequest(
`/api/projects/${projectId}/execution-processes/${processId}/normalized-logs`
);
return handleApiResponse<NormalizedConversation>(response);
},
};
// File System APIs

View File

@@ -2,11 +2,13 @@ import {
DiffChunkType,
ExecutionProcess,
ExecutionProcessSummary,
ProcessLogsResponse,
} from 'shared/types.ts';
export type AttemptData = {
processes: ExecutionProcessSummary[];
runningProcessDetails: Record<string, ExecutionProcess>;
allLogs: ProcessLogsResponse[];
};
export interface ProcessedLine {
@@ -23,3 +25,15 @@ export interface ProcessedSection {
expandedAbove?: boolean;
expandedBelow?: boolean;
}
export interface ConversationEntryDisplayType {
entry: any;
processId: string;
processPrompt?: string;
processStatus: string;
processIsRunning: boolean;
process: any;
isFirstInProcess: boolean;
processIndex: number;
entryIndex: number;
}