Fix WebSocket connection for process logs viewer (#734)

* fix: update useLogStream to use WebSocket instead of EventSource

The backend was migrated from SSE to WebSocket in a recent commit,
but the frontend hook was still trying to connect via EventSource.
This caused 'Connection failed' errors when viewing process logs.

Changes:
- Switch from EventSource to WebSocket connection
- Update endpoint to /api/execution-processes/{id}/raw-logs/ws
- Parse messages using LogMsg format (JsonPatch, Finished)
- Maintain all existing retry and error handling logic

* fix: address review feedback for WebSocket connection

- Fixed 'finished' message format: changed from {'Finished': ''} to {finished: true}
- Added isIntentionallyClosed flag to prevent reconnection loops
- Only retry connection on actual errors, not intentional closures
- Check WebSocket close code (1000 = normal closure) before retrying
This commit is contained in:
Alcibíades Cabral Díaz
2025-09-16 09:58:59 +01:00
committed by GitHub
parent d7c51ffdfa
commit 723617d3e3

View File

@@ -11,9 +11,10 @@ interface UseLogStreamResult {
export const useLogStream = (processId: string): UseLogStreamResult => { export const useLogStream = (processId: string): UseLogStreamResult => {
const [logs, setLogs] = useState<LogEntry[]>([]); const [logs, setLogs] = useState<LogEntry[]>([]);
const [error, setError] = useState<string | null>(null); const [error, setError] = useState<string | null>(null);
const eventSourceRef = useRef<EventSource | null>(null); const wsRef = useRef<WebSocket | null>(null);
const retryCountRef = useRef<number>(0); const retryCountRef = useRef<number>(0);
const retryTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null); const retryTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const isIntentionallyClosed = useRef<boolean>(false);
useEffect(() => { useEffect(() => {
if (!processId) { if (!processId) {
@@ -25,12 +26,15 @@ export const useLogStream = (processId: string): UseLogStreamResult => {
setError(null); setError(null);
const open = () => { const open = () => {
const eventSource = new EventSource( const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
`/api/execution-processes/${processId}/raw-logs` const host = window.location.host;
const ws = new WebSocket(
`${protocol}//${host}/api/execution-processes/${processId}/raw-logs/ws`
); );
eventSourceRef.current = eventSource; wsRef.current = ws;
isIntentionallyClosed.current = false;
eventSource.onopen = () => { ws.onopen = () => {
setError(null); setError(null);
// Reset logs on new connection since server replays history // Reset logs on new connection since server replays history
setLogs([]); setLogs([]);
@@ -41,42 +45,50 @@ export const useLogStream = (processId: string): UseLogStreamResult => {
setLogs((prev) => [...prev, entry]); setLogs((prev) => [...prev, entry]);
}; };
// Handle json_patch events (new format from server) // Handle WebSocket messages
eventSource.addEventListener('json_patch', (event) => { ws.onmessage = (event) => {
try { try {
const patches = JSON.parse(event.data); const data = JSON.parse(event.data);
patches.forEach((patch: any) => {
const value = patch?.value;
if (!value || !value.type) return;
switch (value.type) { // Handle different message types based on LogMsg enum
case 'STDOUT': if ('JsonPatch' in data) {
case 'STDERR': const patches = data.JsonPatch;
addLogEntry({ type: value.type, content: value.content }); patches.forEach((patch: any) => {
break; const value = patch?.value;
// Ignore other patch types (NORMALIZED_ENTRY, DIFF, etc.) if (!value || !value.type) return;
default:
break; switch (value.type) {
} case 'STDOUT':
}); case 'STDERR':
addLogEntry({ type: value.type, content: value.content });
break;
// Ignore other patch types (NORMALIZED_ENTRY, DIFF, etc.)
default:
break;
}
});
} else if (data.finished === true) {
isIntentionallyClosed.current = true;
ws.close();
}
} catch (e) { } catch (e) {
console.error('Failed to parse json_patch:', e); console.error('Failed to parse message:', e);
} }
}); };
eventSource.addEventListener('finished', () => { ws.onerror = () => {
eventSource.close();
});
eventSource.onerror = () => {
setError('Connection failed'); setError('Connection failed');
eventSource.close(); };
// Retry a few times with backoff in case of race before logs are ready
const next = retryCountRef.current + 1; ws.onclose = (event) => {
retryCountRef.current = next; // Only retry if the close was not intentional and not a normal closure
if (next <= 6) { if (!isIntentionallyClosed.current && event.code !== 1000) {
const delay = Math.min(1500, 250 * 2 ** (next - 1)); const next = retryCountRef.current + 1;
retryTimerRef.current = setTimeout(() => open(), delay); retryCountRef.current = next;
if (next <= 6) {
const delay = Math.min(1500, 250 * 2 ** (next - 1));
retryTimerRef.current = setTimeout(() => open(), delay);
}
} }
}; };
}; };
@@ -84,9 +96,10 @@ export const useLogStream = (processId: string): UseLogStreamResult => {
open(); open();
return () => { return () => {
if (eventSourceRef.current) { if (wsRef.current) {
eventSourceRef.current.close(); isIntentionallyClosed.current = true;
eventSourceRef.current = null; wsRef.current.close();
wsRef.current = null;
} }
if (retryTimerRef.current) { if (retryTimerRef.current) {
clearTimeout(retryTimerRef.current); clearTimeout(retryTimerRef.current);