From 723617d3e3c19c8b0957ef6924600d77131bf74b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alcib=C3=ADades=20Cabral=20D=C3=ADaz?= <62911544+alcibiadesc@users.noreply.github.com> Date: Tue, 16 Sep 2025 09:58:59 +0100 Subject: [PATCH] Fix WebSocket connection for process logs viewer (#734) * fix: update useLogStream to use WebSocket instead of EventSource The backend was migrated from SSE to WebSocket in a recent commit, but the frontend hook was still trying to connect via EventSource. This caused 'Connection failed' errors when viewing process logs. Changes: - Switch from EventSource to WebSocket connection - Update endpoint to /api/execution-processes/{id}/raw-logs/ws - Parse messages using LogMsg format (JsonPatch, Finished) - Maintain all existing retry and error handling logic * fix: address review feedback for WebSocket connection - Fixed 'finished' message format: changed from {'Finished': ''} to {finished: true} - Added isIntentionallyClosed flag to prevent reconnection loops - Only retry connection on actual errors, not intentional closures - Check WebSocket close code (1000 = normal closure) before retrying --- frontend/src/hooks/useLogStream.ts | 89 +++++++++++++++++------------- 1 file changed, 51 insertions(+), 38 deletions(-) diff --git a/frontend/src/hooks/useLogStream.ts b/frontend/src/hooks/useLogStream.ts index 0a649619..fad0c798 100644 --- a/frontend/src/hooks/useLogStream.ts +++ b/frontend/src/hooks/useLogStream.ts @@ -11,9 +11,10 @@ interface UseLogStreamResult { export const useLogStream = (processId: string): UseLogStreamResult => { const [logs, setLogs] = useState([]); const [error, setError] = useState(null); - const eventSourceRef = useRef(null); + const wsRef = useRef(null); const retryCountRef = useRef(0); const retryTimerRef = useRef | null>(null); + const isIntentionallyClosed = useRef(false); useEffect(() => { if (!processId) { @@ -25,12 +26,15 @@ export const useLogStream = (processId: string): UseLogStreamResult => { setError(null); const open = () => { - const eventSource = new EventSource( - `/api/execution-processes/${processId}/raw-logs` + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const host = window.location.host; + const ws = new WebSocket( + `${protocol}//${host}/api/execution-processes/${processId}/raw-logs/ws` ); - eventSourceRef.current = eventSource; + wsRef.current = ws; + isIntentionallyClosed.current = false; - eventSource.onopen = () => { + ws.onopen = () => { setError(null); // Reset logs on new connection since server replays history setLogs([]); @@ -41,42 +45,50 @@ export const useLogStream = (processId: string): UseLogStreamResult => { setLogs((prev) => [...prev, entry]); }; - // Handle json_patch events (new format from server) - eventSource.addEventListener('json_patch', (event) => { + // Handle WebSocket messages + ws.onmessage = (event) => { try { - const patches = JSON.parse(event.data); - patches.forEach((patch: any) => { - const value = patch?.value; - if (!value || !value.type) return; + const data = JSON.parse(event.data); - switch (value.type) { - case 'STDOUT': - case 'STDERR': - addLogEntry({ type: value.type, content: value.content }); - break; - // Ignore other patch types (NORMALIZED_ENTRY, DIFF, etc.) - default: - break; - } - }); + // Handle different message types based on LogMsg enum + if ('JsonPatch' in data) { + const patches = data.JsonPatch; + patches.forEach((patch: any) => { + const value = patch?.value; + if (!value || !value.type) return; + + switch (value.type) { + case 'STDOUT': + case 'STDERR': + addLogEntry({ type: value.type, content: value.content }); + break; + // Ignore other patch types (NORMALIZED_ENTRY, DIFF, etc.) + default: + break; + } + }); + } else if (data.finished === true) { + isIntentionallyClosed.current = true; + ws.close(); + } } catch (e) { - console.error('Failed to parse json_patch:', e); + console.error('Failed to parse message:', e); } - }); + }; - eventSource.addEventListener('finished', () => { - eventSource.close(); - }); - - eventSource.onerror = () => { + ws.onerror = () => { setError('Connection failed'); - eventSource.close(); - // Retry a few times with backoff in case of race before logs are ready - const next = retryCountRef.current + 1; - retryCountRef.current = next; - if (next <= 6) { - const delay = Math.min(1500, 250 * 2 ** (next - 1)); - retryTimerRef.current = setTimeout(() => open(), delay); + }; + + ws.onclose = (event) => { + // Only retry if the close was not intentional and not a normal closure + if (!isIntentionallyClosed.current && event.code !== 1000) { + const next = retryCountRef.current + 1; + retryCountRef.current = next; + if (next <= 6) { + const delay = Math.min(1500, 250 * 2 ** (next - 1)); + retryTimerRef.current = setTimeout(() => open(), delay); + } } }; }; @@ -84,9 +96,10 @@ export const useLogStream = (processId: string): UseLogStreamResult => { open(); return () => { - if (eventSourceRef.current) { - eventSourceRef.current.close(); - eventSourceRef.current = null; + if (wsRef.current) { + isIntentionallyClosed.current = true; + wsRef.current.close(); + wsRef.current = null; } if (retryTimerRef.current) { clearTimeout(retryTimerRef.current);