From 47338fd6b1268ed2d38c78f4c2758dbd57177848 Mon Sep 17 00:00:00 2001 From: Louis Knight-Webb Date: Tue, 16 Sep 2025 12:27:42 +0100 Subject: [PATCH] Further execution process feedback and stability tweaks (#741) * execution processes normalized logs error properly * update raw logs error handling * key the virtualized list --- crates/db/src/models/execution_process.rs | 15 +++++ crates/server/src/error.rs | 12 +++- .../server/src/routes/execution_processes.rs | 57 +++++++++++-------- .../components/tasks/TaskDetails/LogsTab.tsx | 2 +- 4 files changed, 59 insertions(+), 27 deletions(-) diff --git a/crates/db/src/models/execution_process.rs b/crates/db/src/models/execution_process.rs index 9b46893a..617117ba 100644 --- a/crates/db/src/models/execution_process.rs +++ b/crates/db/src/models/execution_process.rs @@ -3,11 +3,26 @@ use executors::actions::ExecutorAction; use serde::{Deserialize, Serialize}; use serde_json::Value; use sqlx::{FromRow, SqlitePool, Type}; +use thiserror::Error; use ts_rs::TS; use uuid::Uuid; use super::{task::Task, task_attempt::TaskAttempt}; +#[derive(Debug, Error)] +pub enum ExecutionProcessError { + #[error(transparent)] + Database(#[from] sqlx::Error), + #[error("Execution process not found")] + ExecutionProcessNotFound, + #[error("Failed to create execution process: {0}")] + CreateFailed(String), + #[error("Failed to update execution process: {0}")] + UpdateFailed(String), + #[error("Invalid executor action format")] + InvalidExecutorAction, +} + #[derive(Debug, Clone, Type, Serialize, Deserialize, PartialEq, TS)] #[sqlx(type_name = "execution_process_status", rename_all = "lowercase")] #[serde(rename_all = "lowercase")] diff --git a/crates/server/src/error.rs b/crates/server/src/error.rs index de6c1157..53dc9e6d 100644 --- a/crates/server/src/error.rs +++ b/crates/server/src/error.rs @@ -4,7 +4,9 @@ use axum::{ http::StatusCode, response::{IntoResponse, Response}, }; -use db::models::{project::ProjectError, task_attempt::TaskAttemptError}; +use db::models::{ + execution_process::ExecutionProcessError, project::ProjectError, task_attempt::TaskAttemptError, +}; use deployment::DeploymentError; use executors::executors::ExecutorError; use git2::Error as Git2Error; @@ -23,6 +25,8 @@ pub enum ApiError { #[error(transparent)] TaskAttempt(#[from] TaskAttemptError), #[error(transparent)] + ExecutionProcess(#[from] ExecutionProcessError), + #[error(transparent)] GitService(#[from] GitServiceError), #[error(transparent)] GitHubService(#[from] GitHubServiceError), @@ -61,6 +65,12 @@ impl IntoResponse for ApiError { let (status_code, error_type) = match &self { ApiError::Project(_) => (StatusCode::INTERNAL_SERVER_ERROR, "ProjectError"), ApiError::TaskAttempt(_) => (StatusCode::INTERNAL_SERVER_ERROR, "TaskAttemptError"), + ApiError::ExecutionProcess(err) => match err { + ExecutionProcessError::ExecutionProcessNotFound => { + (StatusCode::NOT_FOUND, "ExecutionProcessError") + } + _ => (StatusCode::INTERNAL_SERVER_ERROR, "ExecutionProcessError"), + }, // Promote certain GitService errors to conflict status with concise messages ApiError::GitService(git_err) => match git_err { services::services::git::GitServiceError::MergeConflicts(_) => { diff --git a/crates/server/src/routes/execution_processes.rs b/crates/server/src/routes/execution_processes.rs index 899bdf51..300bd2a3 100644 --- a/crates/server/src/routes/execution_processes.rs +++ b/crates/server/src/routes/execution_processes.rs @@ -9,12 +9,12 @@ use axum::{ response::{IntoResponse, Json as ResponseJson}, routing::{get, post}, }; -use db::models::execution_process::ExecutionProcess; +use db::models::execution_process::{ExecutionProcess, ExecutionProcessError}; use deployment::Deployment; use futures_util::{SinkExt, StreamExt, TryStreamExt}; use serde::Deserialize; use services::services::container::ContainerService; -use utils::response::ApiResponse; +use utils::{log_msg::LogMsg, response::ApiResponse}; use uuid::Uuid; use crate::{DeploymentImpl, error::ApiError, middleware::load_execution_process_middleware}; @@ -46,12 +46,21 @@ pub async fn stream_raw_logs_ws( ws: WebSocketUpgrade, State(deployment): State, Path(exec_id): Path, -) -> impl IntoResponse { - ws.on_upgrade(move |socket| async move { +) -> Result { + // Check if the stream exists before upgrading the WebSocket + let _stream = deployment + .container() + .stream_raw_logs(&exec_id) + .await + .ok_or_else(|| { + ApiError::ExecutionProcess(ExecutionProcessError::ExecutionProcessNotFound) + })?; + + Ok(ws.on_upgrade(move |socket| async move { if let Err(e) = handle_raw_logs_ws(socket, deployment, exec_id).await { tracing::warn!("raw logs WS closed: {}", e); } - }) + })) } async fn handle_raw_logs_ws( @@ -120,39 +129,37 @@ pub async fn stream_normalized_logs_ws( ws: WebSocketUpgrade, State(deployment): State, Path(exec_id): Path, -) -> impl IntoResponse { - ws.on_upgrade(move |socket| async move { - if let Err(e) = handle_normalized_logs_ws(socket, deployment, exec_id).await { +) -> Result { + let stream = deployment + .container() + .stream_normalized_logs(&exec_id) + .await + .ok_or_else(|| { + ApiError::ExecutionProcess(ExecutionProcessError::ExecutionProcessNotFound) + })?; + + // Convert the error type to anyhow::Error and turn TryStream -> Stream> + let stream = stream.err_into::().into_stream(); + + Ok(ws.on_upgrade(move |socket| async move { + if let Err(e) = handle_normalized_logs_ws(socket, stream).await { tracing::warn!("normalized logs WS closed: {}", e); } - }) + })) } async fn handle_normalized_logs_ws( socket: WebSocket, - deployment: DeploymentImpl, - exec_id: Uuid, + stream: impl futures_util::Stream> + Unpin + Send + 'static, ) -> anyhow::Result<()> { - // Get the raw stream and convert LogMsg to WebSocket messages - let mut stream = deployment - .container() - .stream_normalized_logs(&exec_id) - .await - .ok_or_else(|| anyhow::anyhow!("Execution process not found"))? - .map_ok(|msg| msg.to_ws_message_unchecked()); - - // Split socket into sender and receiver + let mut stream = stream.map_ok(|msg| msg.to_ws_message_unchecked()); let (mut sender, mut receiver) = socket.split(); - - // Drain (and ignore) any client->server messages so pings/pongs work tokio::spawn(async move { while let Some(Ok(_)) = receiver.next().await {} }); - - // Forward server messages while let Some(item) = stream.next().await { match item { Ok(msg) => { if sender.send(msg).await.is_err() { - break; // client disconnected + break; } } Err(e) => { diff --git a/frontend/src/components/tasks/TaskDetails/LogsTab.tsx b/frontend/src/components/tasks/TaskDetails/LogsTab.tsx index 57484b38..72a0c169 100644 --- a/frontend/src/components/tasks/TaskDetails/LogsTab.tsx +++ b/frontend/src/components/tasks/TaskDetails/LogsTab.tsx @@ -6,7 +6,7 @@ type Props = { }; function LogsTab({ selectedAttempt }: Props) { - return ; + return ; } export default LogsTab;