Further execution process feedback and stability tweaks (#741)
* execution processes normalized logs error properly * update raw logs error handling * key the virtualized list
This commit is contained in:
committed by
GitHub
parent
5399bc4b5a
commit
47338fd6b1
@@ -3,11 +3,26 @@ use executors::actions::ExecutorAction;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use sqlx::{FromRow, SqlitePool, Type};
|
||||
use thiserror::Error;
|
||||
use ts_rs::TS;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{task::Task, task_attempt::TaskAttempt};
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum ExecutionProcessError {
|
||||
#[error(transparent)]
|
||||
Database(#[from] sqlx::Error),
|
||||
#[error("Execution process not found")]
|
||||
ExecutionProcessNotFound,
|
||||
#[error("Failed to create execution process: {0}")]
|
||||
CreateFailed(String),
|
||||
#[error("Failed to update execution process: {0}")]
|
||||
UpdateFailed(String),
|
||||
#[error("Invalid executor action format")]
|
||||
InvalidExecutorAction,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Type, Serialize, Deserialize, PartialEq, TS)]
|
||||
#[sqlx(type_name = "execution_process_status", rename_all = "lowercase")]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
|
||||
@@ -4,7 +4,9 @@ use axum::{
|
||||
http::StatusCode,
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
use db::models::{project::ProjectError, task_attempt::TaskAttemptError};
|
||||
use db::models::{
|
||||
execution_process::ExecutionProcessError, project::ProjectError, task_attempt::TaskAttemptError,
|
||||
};
|
||||
use deployment::DeploymentError;
|
||||
use executors::executors::ExecutorError;
|
||||
use git2::Error as Git2Error;
|
||||
@@ -23,6 +25,8 @@ pub enum ApiError {
|
||||
#[error(transparent)]
|
||||
TaskAttempt(#[from] TaskAttemptError),
|
||||
#[error(transparent)]
|
||||
ExecutionProcess(#[from] ExecutionProcessError),
|
||||
#[error(transparent)]
|
||||
GitService(#[from] GitServiceError),
|
||||
#[error(transparent)]
|
||||
GitHubService(#[from] GitHubServiceError),
|
||||
@@ -61,6 +65,12 @@ impl IntoResponse for ApiError {
|
||||
let (status_code, error_type) = match &self {
|
||||
ApiError::Project(_) => (StatusCode::INTERNAL_SERVER_ERROR, "ProjectError"),
|
||||
ApiError::TaskAttempt(_) => (StatusCode::INTERNAL_SERVER_ERROR, "TaskAttemptError"),
|
||||
ApiError::ExecutionProcess(err) => match err {
|
||||
ExecutionProcessError::ExecutionProcessNotFound => {
|
||||
(StatusCode::NOT_FOUND, "ExecutionProcessError")
|
||||
}
|
||||
_ => (StatusCode::INTERNAL_SERVER_ERROR, "ExecutionProcessError"),
|
||||
},
|
||||
// Promote certain GitService errors to conflict status with concise messages
|
||||
ApiError::GitService(git_err) => match git_err {
|
||||
services::services::git::GitServiceError::MergeConflicts(_) => {
|
||||
|
||||
@@ -9,12 +9,12 @@ use axum::{
|
||||
response::{IntoResponse, Json as ResponseJson},
|
||||
routing::{get, post},
|
||||
};
|
||||
use db::models::execution_process::ExecutionProcess;
|
||||
use db::models::execution_process::{ExecutionProcess, ExecutionProcessError};
|
||||
use deployment::Deployment;
|
||||
use futures_util::{SinkExt, StreamExt, TryStreamExt};
|
||||
use serde::Deserialize;
|
||||
use services::services::container::ContainerService;
|
||||
use utils::response::ApiResponse;
|
||||
use utils::{log_msg::LogMsg, response::ApiResponse};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{DeploymentImpl, error::ApiError, middleware::load_execution_process_middleware};
|
||||
@@ -46,12 +46,21 @@ pub async fn stream_raw_logs_ws(
|
||||
ws: WebSocketUpgrade,
|
||||
State(deployment): State<DeploymentImpl>,
|
||||
Path(exec_id): Path<Uuid>,
|
||||
) -> impl IntoResponse {
|
||||
ws.on_upgrade(move |socket| async move {
|
||||
) -> Result<impl IntoResponse, ApiError> {
|
||||
// Check if the stream exists before upgrading the WebSocket
|
||||
let _stream = deployment
|
||||
.container()
|
||||
.stream_raw_logs(&exec_id)
|
||||
.await
|
||||
.ok_or_else(|| {
|
||||
ApiError::ExecutionProcess(ExecutionProcessError::ExecutionProcessNotFound)
|
||||
})?;
|
||||
|
||||
Ok(ws.on_upgrade(move |socket| async move {
|
||||
if let Err(e) = handle_raw_logs_ws(socket, deployment, exec_id).await {
|
||||
tracing::warn!("raw logs WS closed: {}", e);
|
||||
}
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
async fn handle_raw_logs_ws(
|
||||
@@ -120,39 +129,37 @@ pub async fn stream_normalized_logs_ws(
|
||||
ws: WebSocketUpgrade,
|
||||
State(deployment): State<DeploymentImpl>,
|
||||
Path(exec_id): Path<Uuid>,
|
||||
) -> impl IntoResponse {
|
||||
ws.on_upgrade(move |socket| async move {
|
||||
if let Err(e) = handle_normalized_logs_ws(socket, deployment, exec_id).await {
|
||||
) -> Result<impl IntoResponse, ApiError> {
|
||||
let stream = deployment
|
||||
.container()
|
||||
.stream_normalized_logs(&exec_id)
|
||||
.await
|
||||
.ok_or_else(|| {
|
||||
ApiError::ExecutionProcess(ExecutionProcessError::ExecutionProcessNotFound)
|
||||
})?;
|
||||
|
||||
// Convert the error type to anyhow::Error and turn TryStream -> Stream<Result<_, _>>
|
||||
let stream = stream.err_into::<anyhow::Error>().into_stream();
|
||||
|
||||
Ok(ws.on_upgrade(move |socket| async move {
|
||||
if let Err(e) = handle_normalized_logs_ws(socket, stream).await {
|
||||
tracing::warn!("normalized logs WS closed: {}", e);
|
||||
}
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
async fn handle_normalized_logs_ws(
|
||||
socket: WebSocket,
|
||||
deployment: DeploymentImpl,
|
||||
exec_id: Uuid,
|
||||
stream: impl futures_util::Stream<Item = anyhow::Result<LogMsg>> + Unpin + Send + 'static,
|
||||
) -> anyhow::Result<()> {
|
||||
// Get the raw stream and convert LogMsg to WebSocket messages
|
||||
let mut stream = deployment
|
||||
.container()
|
||||
.stream_normalized_logs(&exec_id)
|
||||
.await
|
||||
.ok_or_else(|| anyhow::anyhow!("Execution process not found"))?
|
||||
.map_ok(|msg| msg.to_ws_message_unchecked());
|
||||
|
||||
// Split socket into sender and receiver
|
||||
let mut stream = stream.map_ok(|msg| msg.to_ws_message_unchecked());
|
||||
let (mut sender, mut receiver) = socket.split();
|
||||
|
||||
// Drain (and ignore) any client->server messages so pings/pongs work
|
||||
tokio::spawn(async move { while let Some(Ok(_)) = receiver.next().await {} });
|
||||
|
||||
// Forward server messages
|
||||
while let Some(item) = stream.next().await {
|
||||
match item {
|
||||
Ok(msg) => {
|
||||
if sender.send(msg).await.is_err() {
|
||||
break; // client disconnected
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -6,7 +6,7 @@ type Props = {
|
||||
};
|
||||
|
||||
function LogsTab({ selectedAttempt }: Props) {
|
||||
return <VirtualizedList attempt={selectedAttempt} />;
|
||||
return <VirtualizedList key={selectedAttempt.id} attempt={selectedAttempt} />;
|
||||
}
|
||||
|
||||
export default LogsTab;
|
||||
|
||||
Reference in New Issue
Block a user