From 15dddacfe2906c87ee9685baf5e3733ea6894cc5 Mon Sep 17 00:00:00 2001 From: Louis Knight-Webb Date: Fri, 12 Sep 2025 18:09:14 +0100 Subject: [PATCH] Improve performance of conversation (#692) * Stream endpoint for execution processes (vibe-kanban c5144da6) I want an endpoint that's similar to task stream in crates/server/src/routes/tasks.rs but contains execution processes. The structure of the document should be: ```json { "execution_processes": { [EXECUTION_PROCESS_ID]: { ... execution process fields } } } ``` The endpoint should be at `/api/execution_processes/stream?task_attempt_id=...` crates/server/src/routes/execution_processes.rs * add virtualizedlist component * WIP remove execution processes * rebase syntax fix * tmp fix lint * lint * VirtuosoMessageList * cache * event based hook * historic * handle failed historic * running processes * user message * loading * cleanup * render user message * style * fmt * better indication for setup/cleanup scripts * fix ref issue * virtuoso license * fmt * update loader * loading * fmt * loading improvements * copy as markdown styles * spacing improvement * flush all historic at once * padding fix * markdown copy sticky * make user message editable * edit message * reset * cleanup * hook order * remove dead code --- .github/workflows/pre-release.yml | 2 + crates/executors/src/logs/mod.rs | 1 + .../server/src/routes/execution_processes.rs | 17 +- crates/services/src/services/events.rs | 180 ++++++- frontend/package.json | 6 +- .../DisplayConversationEntry.tsx | 92 +++- .../NormalizedConversation/ToolDetails.tsx | 2 +- .../NormalizedConversation/UserMessage.tsx | 70 +++ .../dialogs/global/OnboardingDialog.tsx | 1 - frontend/src/components/logs/ProcessGroup.tsx | 45 -- .../src/components/logs/ProcessStartCard.tsx | 252 --------- .../src/components/logs/VirtualizedList.tsx | 127 +++++ .../TaskDetails/ConversationExecutionLogs.tsx | 26 + .../components/tasks/TaskDetails/LogsTab.tsx | 492 +----------------- .../src/components/tasks/TaskDetailsPanel.tsx | 50 +- .../src/components/ui/markdown-renderer.tsx | 6 +- frontend/src/hooks/useConversationHistory.ts | 474 +++++++++++++++++ frontend/src/hooks/useExecutionProcesses.ts | 54 ++ frontend/src/hooks/useNormalizedLogs.tsx | 58 +++ frontend/src/hooks/useProcessRetry.ts | 229 ++++++++ frontend/src/hooks/useProjectTasks.ts | 8 +- frontend/src/pages/project-tasks.tsx | 2 +- .../src/utils/streamSseJsonPatchEntries.ts | 126 +++++ pnpm-lock.yaml | 44 +- shared/types.ts | 4 +- 25 files changed, 1492 insertions(+), 876 deletions(-) create mode 100644 frontend/src/components/NormalizedConversation/UserMessage.tsx delete mode 100644 frontend/src/components/logs/ProcessGroup.tsx delete mode 100644 frontend/src/components/logs/ProcessStartCard.tsx create mode 100644 frontend/src/components/logs/VirtualizedList.tsx create mode 100644 frontend/src/components/tasks/TaskDetails/ConversationExecutionLogs.tsx create mode 100644 frontend/src/hooks/useConversationHistory.ts create mode 100644 frontend/src/hooks/useExecutionProcesses.ts create mode 100644 frontend/src/hooks/useNormalizedLogs.tsx create mode 100644 frontend/src/hooks/useProcessRetry.ts create mode 100644 frontend/src/utils/streamSseJsonPatchEntries.ts diff --git a/.github/workflows/pre-release.yml b/.github/workflows/pre-release.yml index c4edf243..2af7c5f1 100644 --- a/.github/workflows/pre-release.yml +++ b/.github/workflows/pre-release.yml @@ -115,6 +115,8 @@ jobs: build-frontend: needs: bump-version runs-on: ubuntu-22.04 + env: + VITE_PUBLIC_REACT_VIRTUOSO_LICENSE_KEY: ${{ secrets.PUBLIC_REACT_VIRTUOSO_LICENSE_KEY }} steps: - uses: actions/checkout@v4 with: diff --git a/crates/executors/src/logs/mod.rs b/crates/executors/src/logs/mod.rs index c4e9ce81..e9bc7eb0 100644 --- a/crates/executors/src/logs/mod.rs +++ b/crates/executors/src/logs/mod.rs @@ -57,6 +57,7 @@ pub enum NormalizedEntryType { SystemMessage, ErrorMessage, Thinking, + Loading, } #[derive(Debug, Clone, Serialize, Deserialize, TS)] diff --git a/crates/server/src/routes/execution_processes.rs b/crates/server/src/routes/execution_processes.rs index 81da38b4..07c76543 100644 --- a/crates/server/src/routes/execution_processes.rs +++ b/crates/server/src/routes/execution_processes.rs @@ -1,6 +1,7 @@ use axum::{ BoxError, Extension, Router, extract::{Path, Query, State}, + http::StatusCode, middleware::from_fn_with_state, response::{ Json as ResponseJson, Sse, @@ -10,7 +11,7 @@ use axum::{ }; use db::models::execution_process::ExecutionProcess; use deployment::Deployment; -use futures_util::TryStreamExt; +use futures_util::{Stream, TryStreamExt}; use serde::Deserialize; use services::services::container::ContainerService; use utils::response::ApiResponse; @@ -83,6 +84,19 @@ pub async fn stop_execution_process( Ok(ResponseJson(ApiResponse::success(()))) } +pub async fn stream_execution_processes( + State(deployment): State, + Query(query): Query, +) -> Result>>, StatusCode> { + let stream = deployment + .events() + .stream_execution_processes_for_attempt(query.task_attempt_id) + .await + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + + Ok(Sse::new(stream.map_err(|e| -> BoxError { e.into() })).keep_alive(KeepAlive::default())) +} + pub fn router(deployment: &DeploymentImpl) -> Router { let task_attempt_id_router = Router::new() .route("/", get(get_execution_process_by_id)) @@ -96,6 +110,7 @@ pub fn router(deployment: &DeploymentImpl) -> Router { let task_attempts_router = Router::new() .route("/", get(get_execution_processes)) + .route("/stream", get(stream_execution_processes)) .nest("/{id}", task_attempt_id_router); Router::new().nest("/execution-processes", task_attempts_router) diff --git a/crates/services/src/services/events.rs b/crates/services/src/services/events.rs index 069d3378..748d4cbf 100644 --- a/crates/services/src/services/events.rs +++ b/crates/services/src/services/events.rs @@ -77,6 +77,57 @@ pub mod task_patch { } } +/// Helper functions for creating execution process-specific patches +pub mod execution_process_patch { + use db::models::execution_process::ExecutionProcess; + + use super::*; + + /// Escape JSON Pointer special characters + fn escape_pointer_segment(s: &str) -> String { + s.replace('~', "~0").replace('/', "~1") + } + + /// Create path for execution process operation + fn execution_process_path(process_id: Uuid) -> String { + format!( + "/execution_processes/{}", + escape_pointer_segment(&process_id.to_string()) + ) + } + + /// Create patch for adding a new execution process + pub fn add(process: &ExecutionProcess) -> Patch { + Patch(vec![PatchOperation::Add(AddOperation { + path: execution_process_path(process.id) + .try_into() + .expect("Execution process path should be valid"), + value: serde_json::to_value(process) + .expect("Execution process serialization should not fail"), + })]) + } + + /// Create patch for updating an existing execution process + pub fn replace(process: &ExecutionProcess) -> Patch { + Patch(vec![PatchOperation::Replace(ReplaceOperation { + path: execution_process_path(process.id) + .try_into() + .expect("Execution process path should be valid"), + value: serde_json::to_value(process) + .expect("Execution process serialization should not fail"), + })]) + } + + /// Create patch for removing an execution process + pub fn remove(process_id: Uuid) -> Patch { + Patch(vec![PatchOperation::Remove(RemoveOperation { + path: execution_process_path(process_id) + .try_into() + .expect("Execution process path should be valid"), + })]) + } +} + #[derive(Clone)] pub struct EventService { msg_store: Arc, @@ -116,6 +167,7 @@ pub enum RecordTypes { DeletedExecutionProcess { rowid: i64, task_attempt_id: Option, + process_id: Option, }, DeletedFollowUpDraft { rowid: i64, @@ -196,16 +248,21 @@ impl EventService { RecordTypes::DeletedTaskAttempt { rowid, task_id } } (HookTables::ExecutionProcesses, SqliteOperation::Delete) => { - // Try to get execution_process before deletion to capture task_attempt_id - let task_attempt_id = - ExecutionProcess::find_by_rowid(&db.pool, rowid) - .await - .ok() - .flatten() - .map(|process| process.task_attempt_id); - RecordTypes::DeletedExecutionProcess { - rowid, - task_attempt_id, + // Try to get execution_process before deletion to capture full process data + if let Ok(Some(process)) = + ExecutionProcess::find_by_rowid(&db.pool, rowid).await + { + RecordTypes::DeletedExecutionProcess { + rowid, + task_attempt_id: Some(process.task_attempt_id), + process_id: Some(process.id), + } + } else { + RecordTypes::DeletedExecutionProcess { + rowid, + task_attempt_id: None, + process_id: None, + } } } (HookTables::Tasks, _) => { @@ -244,6 +301,7 @@ impl EventService { Ok(None) => RecordTypes::DeletedExecutionProcess { rowid, task_attempt_id: None, + process_id: None, }, Err(e) => { tracing::error!( @@ -371,6 +429,27 @@ impl EventService { return; } } + RecordTypes::ExecutionProcess(process) => { + let patch = match hook.operation { + SqliteOperation::Insert => { + execution_process_patch::add(process) + } + SqliteOperation::Update => { + execution_process_patch::replace(process) + } + _ => execution_process_patch::replace(process), // fallback + }; + msg_store_for_hook.push_patch(patch); + return; + } + RecordTypes::DeletedExecutionProcess { + process_id: Some(process_id), + .. + } => { + let patch = execution_process_patch::remove(*process_id); + msg_store_for_hook.push_patch(patch); + return; + } _ => {} } @@ -549,10 +628,22 @@ impl EventService { // Get initial snapshot of execution processes let processes = ExecutionProcess::find_by_task_attempt_id(&self.db.pool, task_attempt_id).await?; + + // Convert processes array to object keyed by process ID + let processes_map: serde_json::Map = processes + .into_iter() + .map(|process| { + ( + process.id.to_string(), + serde_json::to_value(process).unwrap(), + ) + }) + .collect(); + let initial_patch = json!([{ "op": "replace", - "path": "/", - "value": { "execution_processes": processes } + "path": "/execution_processes", + "value": processes_map }]); let initial_msg = LogMsg::JsonPatch(serde_json::from_value(initial_patch).unwrap()); @@ -562,26 +653,61 @@ impl EventService { match msg_result { Ok(LogMsg::JsonPatch(patch)) => { // Filter events based on task_attempt_id - if let Some(event_patch_op) = patch.0.first() - && let Ok(event_patch_value) = serde_json::to_value(event_patch_op) - && let Ok(event_patch) = - serde_json::from_value::(event_patch_value) - { - match &event_patch.value.record { - RecordTypes::ExecutionProcess(process) => { - if process.task_attempt_id == task_attempt_id { + if let Some(patch_op) = patch.0.first() { + // Check if this is a modern execution process patch + if patch_op.path().starts_with("/execution_processes/") { + match patch_op { + json_patch::PatchOperation::Add(op) => { + // Parse execution process data directly from value + if let Ok(process) = + serde_json::from_value::( + op.value.clone(), + ) + && process.task_attempt_id == task_attempt_id + { + return Some(Ok(LogMsg::JsonPatch(patch))); + } + } + json_patch::PatchOperation::Replace(op) => { + // Parse execution process data directly from value + if let Ok(process) = + serde_json::from_value::( + op.value.clone(), + ) + && process.task_attempt_id == task_attempt_id + { + return Some(Ok(LogMsg::JsonPatch(patch))); + } + } + json_patch::PatchOperation::Remove(_) => { + // For remove operations, we can't verify task_attempt_id + // so we allow all removals and let the client handle filtering return Some(Ok(LogMsg::JsonPatch(patch))); } + _ => {} } - RecordTypes::DeletedExecutionProcess { - task_attempt_id: Some(deleted_attempt_id), - .. - } => { - if *deleted_attempt_id == task_attempt_id { - return Some(Ok(LogMsg::JsonPatch(patch))); + } + // Fallback to legacy EventPatch format for backward compatibility + else if let Ok(event_patch_value) = serde_json::to_value(patch_op) + && let Ok(event_patch) = + serde_json::from_value::(event_patch_value) + { + match &event_patch.value.record { + RecordTypes::ExecutionProcess(process) => { + if process.task_attempt_id == task_attempt_id { + return Some(Ok(LogMsg::JsonPatch(patch))); + } } + RecordTypes::DeletedExecutionProcess { + task_attempt_id: Some(deleted_attempt_id), + .. + } => { + if *deleted_attempt_id == task_attempt_id { + return Some(Ok(LogMsg::JsonPatch(patch))); + } + } + _ => {} } - _ => {} } } None diff --git a/frontend/package.json b/frontend/package.json index cb0b68de..6e68e7c9 100644 --- a/frontend/package.json +++ b/frontend/package.json @@ -39,12 +39,14 @@ "@tanstack/react-query-devtools": "^5.85.5", "@types/react-window": "^1.8.8", "@uiw/react-codemirror": "^4.25.1", + "@virtuoso.dev/message-list": "^1.13.3", "class-variance-authority": "^0.7.0", "click-to-react-component": "^1.1.2", "clsx": "^2.0.0", "diff": "^8.0.2", "embla-carousel-react": "^8.6.0", "fancy-ansi": "^0.1.3", + "idb": "^8.0.3", "lucide-react": "^0.539.0", "react": "^18.2.0", "react-diff-viewer-continued": "^3.4.0", @@ -52,7 +54,7 @@ "react-markdown": "^10.1.0", "react-router-dom": "^6.8.1", "react-use-measure": "^2.1.7", - "react-virtuoso": "^4.13.0", + "react-virtuoso": "^4.14.0", "react-window": "^1.8.11", "rfc6902": "^5.1.2", "tailwind-merge": "^2.2.0", @@ -82,4 +84,4 @@ "typescript": "^5.9.2", "vite": "^5.0.8" } -} +} \ No newline at end of file diff --git a/frontend/src/components/NormalizedConversation/DisplayConversationEntry.tsx b/frontend/src/components/NormalizedConversation/DisplayConversationEntry.tsx index a7e3e86d..59c10112 100644 --- a/frontend/src/components/NormalizedConversation/DisplayConversationEntry.tsx +++ b/frontend/src/components/NormalizedConversation/DisplayConversationEntry.tsx @@ -2,6 +2,7 @@ import MarkdownRenderer from '@/components/ui/markdown-renderer.tsx'; import { ActionType, NormalizedEntry, + TaskAttempt, type NormalizedEntryType, } from 'shared/types.ts'; import type { ProcessStartPayload } from '@/types/logs'; @@ -25,11 +26,14 @@ import { User, } from 'lucide-react'; import RawLogText from '../common/RawLogText'; +import UserMessage from './UserMessage'; type Props = { entry: NormalizedEntry | ProcessStartPayload; expansionKey: string; diffDeletable?: boolean; + executionProcessId?: string; + taskAttempt?: TaskAttempt; }; type FileEditAction = Extract; @@ -89,36 +93,51 @@ const getEntryIcon = (entryType: NormalizedEntryType) => { return ; }; +type ExitStatusVisualisation = 'success' | 'error' | 'pending'; + const getStatusIndicator = (entryType: NormalizedEntryType) => { - const result = + let status_visualisation: ExitStatusVisualisation | null = null; + if ( entryType.type === 'tool_use' && entryType.action_type.action === 'command_run' - ? entryType.action_type.result?.exit_status - : null; + ) { + status_visualisation = 'pending'; + if (entryType.action_type.result?.exit_status?.type === 'success') { + if (entryType.action_type.result?.exit_status?.success) { + status_visualisation = 'success'; + } else { + status_visualisation = 'error'; + } + } else if ( + entryType.action_type.result?.exit_status?.type === 'exit_code' + ) { + if (entryType.action_type.result?.exit_status?.code === 0) { + status_visualisation = 'success'; + } else { + status_visualisation = 'error'; + } + } + } - const status = - result?.type === 'success' - ? result.success - ? 'success' - : 'error' - : result?.type === 'exit_code' - ? result.code === 0 - ? 'success' - : 'error' - : 'unknown'; - - if (status === 'unknown') return null; - - const colorMap: Record = { + // If pending, should be a pulsing primary-foreground + const colorMap: Record = { success: 'bg-green-300', error: 'bg-red-300', + pending: 'bg-primary-foreground/50', }; + if (!status_visualisation) return null; + return (
+ {status_visualisation === 'pending' && ( +
+ )}
); }; @@ -463,11 +482,27 @@ const ToolCallCard: React.FC<{ ); }; +const LoadingCard = () => { + return ( +
+
+
+
+
+
+ ); +}; + /******************* * Main component * *******************/ -function DisplayConversationEntry({ entry, expansionKey }: Props) { +function DisplayConversationEntry({ + entry, + expansionKey, + executionProcessId, + taskAttempt, +}: Props) { const isNormalizedEntry = ( entry: NormalizedEntry | ProcessStartPayload ): entry is NormalizedEntry => 'entry_type' in entry; @@ -492,10 +527,23 @@ function DisplayConversationEntry({ entry, expansionKey }: Props) { const isSystem = entryType.type === 'system_message'; const isError = entryType.type === 'error_message'; const isToolUse = entryType.type === 'tool_use'; + const isUserMessage = entryType.type === 'user_message'; + const isLoading = entryType.type === 'loading'; const isFileEdit = (a: ActionType): a is FileEditAction => a.action === 'file_edit'; + + if (isUserMessage) { + return ( + + ); + } + return ( - <> +
{isSystem || isError ? ( + ) : isLoading ? ( + ) : (
{shouldRenderMarkdown(entryType) ? ( @@ -543,7 +593,7 @@ function DisplayConversationEntry({ entry, expansionKey }: Props) { )}
)} - +
); } diff --git a/frontend/src/components/NormalizedConversation/ToolDetails.tsx b/frontend/src/components/NormalizedConversation/ToolDetails.tsx index 149d50a9..a598bfb0 100644 --- a/frontend/src/components/NormalizedConversation/ToolDetails.tsx +++ b/frontend/src/components/NormalizedConversation/ToolDetails.tsx @@ -20,7 +20,7 @@ type Props = { }; export const renderJson = (v: JsonValue) => ( -
{JSON.stringify(v, null, 2)}
+
{JSON.stringify(v, null, 2)}
); export default function ToolDetails({ diff --git a/frontend/src/components/NormalizedConversation/UserMessage.tsx b/frontend/src/components/NormalizedConversation/UserMessage.tsx new file mode 100644 index 00000000..1dc1b085 --- /dev/null +++ b/frontend/src/components/NormalizedConversation/UserMessage.tsx @@ -0,0 +1,70 @@ +import MarkdownRenderer from '@/components/ui/markdown-renderer'; +import { Button } from '@/components/ui/button'; +import { Pencil, Send, X } from 'lucide-react'; +import { useState } from 'react'; +import { Textarea } from '@/components/ui/textarea'; +import { useProcessRetry } from '@/hooks/useProcessRetry'; +import { TaskAttempt } from 'shared/types'; + +const UserMessage = ({ + content, + executionProcessId, + taskAttempt, +}: { + content: string; + executionProcessId?: string; + taskAttempt?: TaskAttempt; +}) => { + const [isEditing, setIsEditing] = useState(false); + const [editContent, setEditContent] = useState(content); + const retryHook = useProcessRetry(taskAttempt); + + const handleEdit = () => { + if (!executionProcessId) return; + retryHook?.retryProcess(executionProcessId, editContent).then(() => { + setIsEditing(false); + }); + }; + + return ( +
+
+
+ {isEditing ? ( +