Streaming support for conversation history with SSE (#167)

* Streaming support with SSE

The main focus was on Gemini-CLI token streaming, which uses the standard JSON Patch (RFC 6902) format to stream real-time updates to the frontend via SSE.
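
For illustration, here is a trimmed sketch of how the frontend consumes that stream, based on the `NormalizedConversationViewer` changes in this diff. The event shape (`{ batch_id, patches }`), the `patch` event name, and the `since_batch_id` resume cursor come from the diff; the `Conversation` interface is simplified and the helper name `streamConversation` is made up for the example.

```typescript
import { fetchEventSource } from '@microsoft/fetch-event-source';
import { applyPatch, type Operation } from 'fast-json-patch';

// Simplified conversation shape; the real type lives in the shared types module.
interface Conversation {
  entries: unknown[];
  session_id: string | null;
  executor_type: string;
  prompt: string | null;
  summary: string | null;
}

export async function streamConversation(
  projectId: string,
  processId: string,
  onUpdate: (conversation: Conversation) => void,
  signal?: AbortSignal
): Promise<void> {
  let conversation: Conversation = {
    entries: [],
    session_id: null,
    executor_type: 'unknown',
    prompt: null,
    summary: null,
  };
  // Resume cursor: sent back as ?since_batch_id so a reconnect replays only newer batches.
  let highestBatchId = 0;

  const base = `/api/projects/${projectId}/execution-processes/${processId}/normalized-logs/stream`;

  await fetchEventSource(`${base}?since_batch_id=${highestBatchId}`, {
    signal,
    onmessage(event) {
      if (event.event !== 'patch') return;
      const { batch_id, patches } = JSON.parse(event.data) as {
        batch_id: number;
        patches: Operation[];
      };
      if (batch_id <= highestBatchId) return; // drop duplicates replayed after a reconnect
      highestBatchId = batch_id;
      // Apply the JSON Patch batch to a copy of the current conversation.
      conversation = applyPatch(
        JSON.parse(JSON.stringify(conversation)),
        patches
      ).newDocument as Conversation;
      onUpdate(conversation);
    },
  });
}
```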

There is also a default database-backed SSE implementation that covers the remaining executors, such as Claude-code.
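
A hedged sketch of the per-process decision the viewer now makes, reusing the `Conversation` type and `streamConversation` helper from the previous sketch: running processes subscribe to the live stream, while finished ones fall back to a single database-backed fetch of the normalized logs. `executionProcessesApi.getNormalizedLogs` appears in the diff; the `loadConversation` wrapper is illustrative only.

```typescript
import { executionProcessesApi } from '@/lib/api';

// Illustrative helper: pick the data source based on process status.
async function loadConversation(
  projectId: string,
  process: { id: string; status: string },
  onUpdate: (conversation: Conversation) => void,
  signal?: AbortSignal
): Promise<void> {
  if (process.status === 'running') {
    // Live output: JSON Patch batches over SSE (see streamConversation above).
    await streamConversation(projectId, process.id, onUpdate, signal);
  } else {
    // Historical output: one database-backed fetch of the already-normalized logs.
    const conversation = await executionProcessesApi.getNormalizedLogs(
      projectId,
      process.id
    );
    onUpdate(conversation);
  }
}
```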

* Minor refactorings
Solomon
2025-07-16 13:31:49 +01:00
committed by GitHub
parent f6b5aae531
commit 6a51020fd9
14 changed files with 1463 additions and 605 deletions


@@ -10,6 +10,7 @@
"dependencies": {
"@dnd-kit/core": "^6.3.1",
"@dnd-kit/modifiers": "^9.0.0",
"@microsoft/fetch-event-source": "^2.0.1",
"@radix-ui/react-dropdown-menu": "^2.1.15",
"@radix-ui/react-label": "^2.1.7",
"@radix-ui/react-portal": "^1.1.9",
@@ -23,6 +24,7 @@
"class-variance-authority": "^0.7.0",
"click-to-react-component": "^1.1.2",
"clsx": "^2.0.0",
"fast-json-patch": "^3.1.1",
"lucide-react": "^0.303.0",
"react": "^18.2.0",
"react-dom": "^18.2.0",
@@ -1131,6 +1133,12 @@
"@jridgewell/sourcemap-codec": "^1.4.14"
}
},
"node_modules/@microsoft/fetch-event-source": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz",
"integrity": "sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA==",
"license": "MIT"
},
"node_modules/@nodelib/fs.scandir": {
"version": "2.1.5",
"resolved": "https://registry.npmjs.org/@nodelib/fs.scandir/-/fs.scandir-2.1.5.tgz",
@@ -3961,6 +3969,12 @@
"node": ">= 6"
}
},
"node_modules/fast-json-patch": {
"version": "3.1.1",
"resolved": "https://registry.npmjs.org/fast-json-patch/-/fast-json-patch-3.1.1.tgz",
"integrity": "sha512-vf6IHUX2SBcA+5/+4883dsIjpBTqmfBjmYiWK1savxQmFk4JfBMLa7ynTYOs1Rolp/T1betJxHiGD3g1Mn8lUQ==",
"license": "MIT"
},
"node_modules/fast-json-stable-stringify": {
"version": "2.1.0",
"resolved": "https://registry.npmjs.org/fast-json-stable-stringify/-/fast-json-stable-stringify-2.1.0.tgz",


@@ -15,6 +15,7 @@
"dependencies": {
"@dnd-kit/core": "^6.3.1",
"@dnd-kit/modifiers": "^9.0.0",
"@microsoft/fetch-event-source": "^2.0.1",
"@radix-ui/react-dropdown-menu": "^2.1.15",
"@radix-ui/react-label": "^2.1.7",
"@radix-ui/react-portal": "^1.1.9",
@@ -28,6 +29,7 @@
"class-variance-authority": "^0.7.0",
"click-to-react-component": "^1.1.2",
"clsx": "^2.0.0",
"fast-json-patch": "^3.1.1",
"lucide-react": "^0.303.0",
"react": "^18.2.0",
"react-dom": "^18.2.0",


@@ -1,8 +1,17 @@
import { useCallback, useContext, useEffect, useMemo, useState } from 'react';
import { Bot, Hammer, ToggleLeft, ToggleRight } from 'lucide-react';
import {
useCallback,
useContext,
useEffect,
useState,
useMemo,
useRef,
} from 'react';
import { Hammer } from 'lucide-react';
import { Loader } from '@/components/ui/loader.tsx';
import { executionProcessesApi } from '@/lib/api.ts';
import MarkdownRenderer from '@/components/ui/markdown-renderer.tsx';
import { applyPatch } from 'fast-json-patch';
import { fetchEventSource } from '@microsoft/fetch-event-source';
import type {
ExecutionProcess,
NormalizedConversation,
@@ -20,125 +29,288 @@ interface NormalizedConversationViewerProps {
diffDeletable?: boolean;
}
// Configuration for Gemini message clustering
const GEMINI_CLUSTERING_CONFIG = {
enabled: true,
maxClusterSize: 5000, // Maximum characters per cluster
maxClusterCount: 50, // Maximum number of messages to cluster together
minClusterSize: 2, // Minimum number of messages to consider clustering
};
/**
* Utility function to cluster adjacent assistant messages for Gemini executor.
*
* This function merges consecutive assistant messages into larger chunks to improve
* readability while preserving the progressive nature of Gemini's output.
*
* Clustering rules:
* - Only assistant messages are clustered together
* - Non-assistant messages (errors, tool use, etc.) break clustering
* - Clusters are limited by size (characters) and count (number of messages)
* - Requires minimum of 2 messages to form a cluster
* - Original content and formatting is preserved
*
* @param entries - Original conversation entries
* @param enabled - Whether clustering is enabled
* @returns - Processed entries with clustering applied
*/
const clusterGeminiMessages = (
entries: NormalizedEntry[],
enabled: boolean
): NormalizedEntry[] => {
if (!enabled) {
return entries;
}
const clustered: NormalizedEntry[] = [];
let currentCluster: NormalizedEntry[] = [];
const flushCluster = () => {
if (currentCluster.length === 0) return;
if (currentCluster.length < GEMINI_CLUSTERING_CONFIG.minClusterSize) {
// Not enough messages to cluster, add them individually
clustered.push(...currentCluster);
} else {
// Merge multiple messages into one
// Join with newlines to preserve message boundaries and readability
const mergedContent = currentCluster
.map((entry) => entry.content)
.join('\n');
const mergedEntry: NormalizedEntry = {
timestamp: currentCluster[0].timestamp, // Use timestamp of first message
entry_type: currentCluster[0].entry_type,
content: mergedContent,
};
clustered.push(mergedEntry);
}
currentCluster = [];
};
for (const entry of entries) {
const isAssistantMessage = entry.entry_type.type === 'assistant_message';
if (isAssistantMessage) {
// Check if we can add to current cluster
const wouldExceedSize =
currentCluster.length > 0 &&
currentCluster.map((e) => e.content).join('').length +
entry.content.length >
GEMINI_CLUSTERING_CONFIG.maxClusterSize;
const wouldExceedCount =
currentCluster.length >= GEMINI_CLUSTERING_CONFIG.maxClusterCount;
if (wouldExceedSize || wouldExceedCount) {
// Flush current cluster and start new one
flushCluster();
}
currentCluster.push(entry);
} else {
// Non-assistant message, flush current cluster and add this message separately
flushCluster();
clustered.push(entry);
}
}
// Flush any remaining cluster
flushCluster();
return clustered;
};
export function NormalizedConversationViewer({
executionProcess,
diffDeletable,
onConversationUpdate,
}: NormalizedConversationViewerProps) {
const { projectId } = useContext(TaskDetailsContext);
// Development-only logging helper
const debugLog = useCallback((message: string, ...args: any[]) => {
if (import.meta.env.DEV) {
console.log(message, ...args);
}
}, []);
const [conversation, setConversation] =
useState<NormalizedConversation | null>(null);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const [clusteringEnabled, setClusteringEnabled] = useState(
GEMINI_CLUSTERING_CONFIG.enabled
// Track fetched processes to prevent redundant database calls
const fetchedProcesses = useRef(new Set<string>());
// SSE Connection Manager - production-ready with reconnection and resilience
const sseManagerRef = useRef<{
abortController: AbortController | null;
isActive: boolean;
highestBatchId: number;
reconnectAttempts: number;
reconnectTimeout: number | null;
processId: string;
processStatus: string;
patchFailureCount: number;
}>({
abortController: null,
isActive: false,
highestBatchId: 0,
reconnectAttempts: 0,
reconnectTimeout: null,
processId: executionProcess.id,
processStatus: executionProcess.status,
patchFailureCount: 0,
});
// SSE Connection Manager with Production-Ready Resilience using fetch-event-source
const createSSEConnection = useCallback(
(processId: string, projectId: string): AbortController => {
const manager = sseManagerRef.current;
// Build URL with resume cursor if we have processed batches
const baseUrl = `/api/projects/${projectId}/execution-processes/${processId}/normalized-logs/stream`;
const url =
manager.highestBatchId > 0
? `${baseUrl}?since_batch_id=${manager.highestBatchId}`
: baseUrl;
debugLog(
`🚀 SSE: Creating connection for process ${processId} (cursor: ${manager.highestBatchId})`
);
const abortController = new AbortController();
fetchEventSource(url, {
signal: abortController.signal,
onopen: async (response) => {
if (response.ok) {
debugLog(`✅ SSE: Connected to ${processId}`);
manager.isActive = true;
manager.reconnectAttempts = 0; // Reset on successful connection
manager.patchFailureCount = 0; // Reset patch failure count
if (manager.reconnectTimeout) {
clearTimeout(manager.reconnectTimeout);
manager.reconnectTimeout = null;
}
} else {
throw new Error(`SSE connection failed: ${response.status}`);
}
},
onmessage: (event) => {
if (event.event === 'patch') {
try {
const batchData = JSON.parse(event.data);
const { batch_id, patches } = batchData;
// Skip duplicates - use manager's batch tracking
if (batch_id && batch_id <= manager.highestBatchId) {
debugLog(
`⏭️ SSE: Skipping duplicate batch_id=${batch_id} (current=${manager.highestBatchId})`
);
return;
}
// Update cursor BEFORE processing
if (batch_id) {
manager.highestBatchId = batch_id;
debugLog(`📍 SSE: Processing batch_id=${batch_id}`);
}
setConversation((prev) => {
// Create empty conversation if none exists
const baseConversation = prev || {
entries: [],
session_id: null,
executor_type: 'unknown',
prompt: null,
summary: null,
};
try {
const updated = applyPatch(
JSON.parse(JSON.stringify(baseConversation)),
patches
).newDocument as NormalizedConversation;
updated.entries = updated.entries.filter(Boolean);
debugLog(
`🔧 SSE: Applied batch_id=${batch_id}, entries: ${updated.entries.length}`
);
// Reset patch failure count on successful application
manager.patchFailureCount = 0;
// Clear loading state on first successful patch
if (!prev) {
setLoading(false);
setError(null);
}
if (onConversationUpdate) {
setTimeout(onConversationUpdate, 0);
}
return updated;
} catch (patchError) {
console.warn('❌ SSE: Patch failed:', patchError);
// Reset cursor on failure for potential retry
if (batch_id && batch_id > 0) {
manager.highestBatchId = batch_id - 1;
}
// Track patch failures for monitoring
manager.patchFailureCount++;
debugLog(
`⚠️ SSE: Patch failure #${manager.patchFailureCount} for batch_id=${batch_id}`
);
return prev || baseConversation;
}
});
} catch (e) {
console.warn('❌ SSE: Parse failed:', e);
}
}
},
onerror: (err) => {
console.warn(`🔌 SSE: Connection error for ${processId}:`, err);
manager.isActive = false;
// Only attempt reconnection if process is still running
if (manager.processStatus === 'running') {
scheduleReconnect(processId, projectId);
}
},
onclose: () => {
debugLog(`🔌 SSE: Connection closed for ${processId}`);
manager.isActive = false;
},
}).catch((error) => {
if (error.name !== 'AbortError') {
console.warn(`❌ SSE: Fetch error for ${processId}:`, error);
manager.isActive = false;
// Only attempt reconnection if process is still running
if (manager.processStatus === 'running') {
scheduleReconnect(processId, projectId);
}
}
});
return abortController;
},
[onConversationUpdate, debugLog]
);
const fetchNormalizedLogs = useCallback(
async (isPolling = false) => {
try {
if (!isPolling) {
setLoading(true);
setError(null);
const scheduleReconnect = useCallback(
(processId: string, projectId: string) => {
const manager = sseManagerRef.current;
// Clear any existing reconnection timeout
if (manager.reconnectTimeout) {
clearTimeout(manager.reconnectTimeout);
}
// Exponential backoff: 1s, 2s, 4s, 8s, max 30s
const delay = Math.min(
1000 * Math.pow(2, manager.reconnectAttempts),
30000
);
manager.reconnectAttempts++;
debugLog(
`🔄 SSE: Scheduling reconnect attempt ${manager.reconnectAttempts} in ${delay}ms`
);
manager.reconnectTimeout = window.setTimeout(() => {
if (manager.processStatus === 'running') {
debugLog(`🔄 SSE: Attempting reconnect for ${processId}`);
establishSSEConnection(processId, projectId);
}
}, delay);
},
[debugLog]
);
const establishSSEConnection = useCallback(
(processId: string, projectId: string) => {
const manager = sseManagerRef.current;
// Close existing connection if any
if (manager.abortController) {
manager.abortController.abort();
manager.abortController = null;
manager.isActive = false;
}
const abortController = createSSEConnection(processId, projectId);
manager.abortController = abortController;
return abortController;
},
[createSSEConnection]
);
// Helper functions for SSE manager
const setProcessId = (id: string) => {
sseManagerRef.current.processId = id;
};
const setProcessStatus = (status: string) => {
sseManagerRef.current.processStatus = status;
};
// Consolidated cleanup function to avoid duplication
const cleanupSSEConnection = useCallback(() => {
const manager = sseManagerRef.current;
if (manager.abortController) {
manager.abortController.abort();
manager.abortController = null;
manager.isActive = false;
}
if (manager.reconnectTimeout) {
clearTimeout(manager.reconnectTimeout);
manager.reconnectTimeout = null;
}
}, []);
const fetchNormalizedLogsOnce = useCallback(
async (processId: string) => {
// Only fetch if not already fetched for this process
if (fetchedProcesses.current.has(processId)) {
debugLog(`📋 DB: Already fetched ${processId}, skipping`);
return;
}
try {
setLoading(true);
setError(null);
debugLog(`📋 DB: Fetching logs for ${processId}`);
const result = await executionProcessesApi.getNormalizedLogs(
projectId,
executionProcess.id
processId
);
// Mark as fetched
fetchedProcesses.current.add(processId);
setConversation((prev) => {
// Only update if content actually changed
if (!prev || JSON.stringify(prev) !== JSON.stringify(result)) {
// Only update if content actually changed - use lightweight comparison
if (
!prev ||
prev.entries.length !== result.entries.length ||
prev.prompt !== result.prompt
) {
// Notify parent component of conversation update
if (onConversationUpdate) {
// Use setTimeout to ensure state update happens first
@@ -149,55 +321,114 @@ export function NormalizedConversationViewer({
return prev;
});
} catch (err) {
if (!isPolling) {
setError(
`Error fetching logs: ${err instanceof Error ? err.message : 'Unknown error'}`
);
}
// Remove from fetched set on error to allow retry
fetchedProcesses.current.delete(processId);
setError(
`Error fetching logs: ${err instanceof Error ? err.message : 'Unknown error'}`
);
} finally {
if (!isPolling) {
setLoading(false);
}
setLoading(false);
}
},
[executionProcess.id, projectId, onConversationUpdate]
[projectId, onConversationUpdate, debugLog]
);
// Initial fetch
// Process-based data fetching - fetch once from appropriate source
useEffect(() => {
fetchNormalizedLogs();
}, [fetchNormalizedLogs]);
const processId = executionProcess.id;
const processStatus = executionProcess.status;
// Auto-refresh every 2 seconds when process is running
useEffect(() => {
if (executionProcess.status === 'running') {
const interval = setInterval(() => {
fetchNormalizedLogs(true);
}, 2000);
debugLog(`🎯 Data: Process ${processId} is ${processStatus}`);
return () => clearInterval(interval);
// Reset conversation state when switching processes
const manager = sseManagerRef.current;
if (manager.processId !== processId) {
setConversation(null);
setLoading(true);
setError(null);
// Clear fetch tracking for old processes (keep memory bounded)
if (fetchedProcesses.current.size > 10) {
fetchedProcesses.current.clear();
}
}
}, [executionProcess.status, fetchNormalizedLogs]);
// Apply clustering for Gemini executor conversations
const isGeminiExecutor = useMemo(
() => conversation?.executor_type === 'gemini',
[conversation?.executor_type]
);
const hasAssistantMessages = useMemo(
() =>
conversation?.entries.some(
(entry) => entry.entry_type.type === 'assistant_message'
),
[conversation?.entries]
);
const displayEntries = useMemo(
() =>
isGeminiExecutor && conversation?.entries
? clusterGeminiMessages(conversation.entries, clusteringEnabled)
: conversation?.entries || [],
[isGeminiExecutor, conversation?.entries, clusteringEnabled]
);
if (processStatus === 'running') {
// Running processes: SSE will handle data (including initial state)
debugLog(`🚀 Data: Using SSE for running process ${processId}`);
// SSE connection will be established by the SSE management effect
} else {
// Completed processes: Single database fetch
debugLog(`📋 Data: Using database for completed process ${processId}`);
fetchNormalizedLogsOnce(processId);
}
}, [
executionProcess.id,
executionProcess.status,
fetchNormalizedLogsOnce,
debugLog,
]);
// SSE connection management for running processes only
useEffect(() => {
const processId = executionProcess.id;
const processStatus = executionProcess.status;
const manager = sseManagerRef.current;
// Update manager state
setProcessId(processId);
setProcessStatus(processStatus);
// Only establish SSE for running processes
if (processStatus !== 'running') {
debugLog(
`🚫 SSE: Process ${processStatus}, cleaning up any existing connection`
);
cleanupSSEConnection();
return;
}
// Check if connection already exists for same process ID
if (manager.abortController && manager.processId === processId) {
debugLog(`⚠️ SSE: Connection already exists for ${processId}, reusing`);
return;
}
// Process changed - close existing and reset state
if (manager.abortController && manager.processId !== processId) {
debugLog(`🔄 SSE: Switching from ${manager.processId} to ${processId}`);
cleanupSSEConnection();
manager.highestBatchId = 0; // Reset cursor for new process
manager.reconnectAttempts = 0;
manager.patchFailureCount = 0; // Reset failure count for new process
}
// Update manager state
manager.processId = processId;
manager.processStatus = processStatus;
// Establish new connection
establishSSEConnection(processId, projectId);
return () => {
debugLog(`🔌 SSE: Cleanup connection for ${processId}`);
// Close connection if it belongs to this effect
if (manager.abortController && manager.processId === processId) {
cleanupSSEConnection();
}
};
}, [executionProcess.id, executionProcess.status]);
// Memoize display entries to avoid unnecessary re-renders
const displayEntries = useMemo(() => {
if (!conversation?.entries) return [];
// Filter out any null entries that may have been created by duplicate patch application
return conversation.entries.filter((entry): entry is NormalizedEntry =>
Boolean(entry && (entry as NormalizedEntry).entry_type)
);
}, [conversation?.entries]);
if (loading) {
return (
@@ -228,39 +459,6 @@ export function NormalizedConversationViewer({
return (
<div>
{/* Display clustering controls for Gemini */}
{isGeminiExecutor && hasAssistantMessages && (
<div className="mb-4 p-2 bg-blue-50 dark:bg-blue-950/20 border border-blue-200 dark:border-blue-800 rounded-md">
<div className="flex items-center justify-between">
<div className="flex items-center gap-2 text-xs text-blue-700 dark:text-blue-300">
<Bot className="h-3 w-3" />
<span>
{clusteringEnabled &&
displayEntries.length !== conversation.entries.length
? `Messages clustered for better readability (${conversation.entries.length} → ${displayEntries.length} messages)`
: 'Gemini message clustering'}
</span>
</div>
<button
onClick={() => setClusteringEnabled(!clusteringEnabled)}
className="flex items-center gap-1 text-xs text-blue-700 dark:text-blue-300 hover:text-blue-800 dark:hover:text-blue-200 transition-colors"
title={
clusteringEnabled
? 'Disable message clustering'
: 'Enable message clustering'
}
>
{clusteringEnabled ? (
<ToggleRight className="h-4 w-4" />
) : (
<ToggleLeft className="h-4 w-4" />
)}
<span>{clusteringEnabled ? 'ON' : 'OFF'}</span>
</button>
</div>
</div>
)}
{/* Display prompt if available */}
{conversation.prompt && (
<div className="flex items-start gap-3">