diff --git a/crates/db/.sqlx/query-56238751ac9cab8bd97ad787143d91f54c47089c8e732ef80c3d1e85dfba1430.json b/crates/db/.sqlx/query-56238751ac9cab8bd97ad787143d91f54c47089c8e732ef80c3d1e85dfba1430.json
deleted file mode 100644
index 0c235f5c..00000000
--- a/crates/db/.sqlx/query-56238751ac9cab8bd97ad787143d91f54c47089c8e732ef80c3d1e85dfba1430.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
-  "db_name": "SQLite",
-  "query": "INSERT INTO execution_process_logs (execution_id, logs, byte_size, inserted_at)\n VALUES ($1, $2, $3, datetime('now', 'subsec'))\n ON CONFLICT (execution_id) DO UPDATE\n SET logs = logs || $2,\n byte_size = byte_size + $3,\n inserted_at = datetime('now', 'subsec')",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Right": 3
-    },
-    "nullable": []
-  },
-  "hash": "56238751ac9cab8bd97ad787143d91f54c47089c8e732ef80c3d1e85dfba1430"
-}
diff --git a/crates/db/.sqlx/query-9747ebaebd562d65f0c333b0f5efc74fa63ab9fcb35a43f75f57da3fcb9a2588.json b/crates/db/.sqlx/query-9747ebaebd562d65f0c333b0f5efc74fa63ab9fcb35a43f75f57da3fcb9a2588.json
new file mode 100644
index 00000000..716e7be1
--- /dev/null
+++ b/crates/db/.sqlx/query-9747ebaebd562d65f0c333b0f5efc74fa63ab9fcb35a43f75f57da3fcb9a2588.json
@@ -0,0 +1,12 @@
+{
+  "db_name": "SQLite",
+  "query": "INSERT INTO execution_process_logs (execution_id, logs, byte_size, inserted_at)\n VALUES ($1, $2, $3, datetime('now', 'subsec'))",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Right": 3
+    },
+    "nullable": []
+  },
+  "hash": "9747ebaebd562d65f0c333b0f5efc74fa63ab9fcb35a43f75f57da3fcb9a2588"
+}
diff --git a/crates/db/.sqlx/query-97e6a03adc1c14e9ecabe7885598dcc0ea273dffea920838fc4dcc837293ba6b.json b/crates/db/.sqlx/query-97e6a03adc1c14e9ecabe7885598dcc0ea273dffea920838fc4dcc837293ba6b.json
deleted file mode 100644
index 0875fb2d..00000000
--- a/crates/db/.sqlx/query-97e6a03adc1c14e9ecabe7885598dcc0ea273dffea920838fc4dcc837293ba6b.json
+++ /dev/null
@@ -1,38 +0,0 @@
-{
-  "db_name": "SQLite",
-  "query": "INSERT INTO execution_process_logs (execution_id, logs, byte_size, inserted_at)\n VALUES ($1, $2, $3, $4)\n ON CONFLICT (execution_id) DO UPDATE\n SET logs = EXCLUDED.logs, \n byte_size = EXCLUDED.byte_size,\n inserted_at = EXCLUDED.inserted_at\n RETURNING \n execution_id as \"execution_id!: Uuid\",\n logs,\n byte_size,\n inserted_at as \"inserted_at!: DateTime<Utc>\"",
-  "describe": {
-    "columns": [
-      {
-        "name": "execution_id!: Uuid",
-        "ordinal": 0,
-        "type_info": "Blob"
-      },
-      {
-        "name": "logs",
-        "ordinal": 1,
-        "type_info": "Text"
-      },
-      {
-        "name": "byte_size",
-        "ordinal": 2,
-        "type_info": "Integer"
-      },
-      {
-        "name": "inserted_at!: DateTime<Utc>",
-        "ordinal": 3,
-        "type_info": "Text"
-      }
-    ],
-    "parameters": {
-      "Right": 4
-    },
-    "nullable": [
-      true,
-      false,
-      false,
-      false
-    ]
-  },
-  "hash": "97e6a03adc1c14e9ecabe7885598dcc0ea273dffea920838fc4dcc837293ba6b"
-}
diff --git a/crates/db/.sqlx/query-2ec7648202fc6f496b97d9486cf9fd3c59fdba73c168628784f0a09488b80528.json b/crates/db/.sqlx/query-a1574f21db387b0e4a2c3f5723de6df4ee42d98145d16e9d135345dd60128429.json
similarity index 84%
rename from crates/db/.sqlx/query-2ec7648202fc6f496b97d9486cf9fd3c59fdba73c168628784f0a09488b80528.json
rename to crates/db/.sqlx/query-a1574f21db387b0e4a2c3f5723de6df4ee42d98145d16e9d135345dd60128429.json
index 4f038c2e..e7b9901f 100644
--- a/crates/db/.sqlx/query-2ec7648202fc6f496b97d9486cf9fd3c59fdba73c168628784f0a09488b80528.json
+++ b/crates/db/.sqlx/query-a1574f21db387b0e4a2c3f5723de6df4ee42d98145d16e9d135345dd60128429.json
@@ -1,6 +1,6 @@
 {
"SQLite", - "query": "SELECT \n execution_id as \"execution_id!: Uuid\",\n logs,\n byte_size,\n inserted_at as \"inserted_at!: DateTime\"\n FROM execution_process_logs \n WHERE execution_id = $1", + "query": "SELECT \n execution_id as \"execution_id!: Uuid\",\n logs,\n byte_size,\n inserted_at as \"inserted_at!: DateTime\"\n FROM execution_process_logs \n WHERE execution_id = $1\n ORDER BY inserted_at ASC", "describe": { "columns": [ { @@ -28,11 +28,11 @@ "Right": 1 }, "nullable": [ - true, + false, false, false, false ] }, - "hash": "2ec7648202fc6f496b97d9486cf9fd3c59fdba73c168628784f0a09488b80528" + "hash": "a1574f21db387b0e4a2c3f5723de6df4ee42d98145d16e9d135345dd60128429" } diff --git a/crates/db/migrations/20251101090000_drop_execution_process_logs_pk.sql b/crates/db/migrations/20251101090000_drop_execution_process_logs_pk.sql new file mode 100644 index 00000000..497b39eb --- /dev/null +++ b/crates/db/migrations/20251101090000_drop_execution_process_logs_pk.sql @@ -0,0 +1,53 @@ +-- Migration steps following the official SQLite "12-step generalized ALTER TABLE" procedure: +-- https://www.sqlite.org/lang_altertable.html#otheralter +-- +PRAGMA foreign_keys = OFF; + +-- This is a sqlx workaround to enable BEGIN TRANSACTION in this migration, until `-- no-transaction` lands in sqlx-sqlite. +-- https://github.com/launchbadge/sqlx/issues/2085#issuecomment-1499859906 +COMMIT TRANSACTION; + +BEGIN TRANSACTION; + +-- Create replacement table without the PRIMARY KEY constraint on execution_id. +CREATE TABLE execution_process_logs_new ( + execution_id BLOB NOT NULL, + logs TEXT NOT NULL, -- JSONL format (one LogMsg per line) + byte_size INTEGER NOT NULL, + inserted_at TEXT NOT NULL DEFAULT (datetime('now', 'subsec')), + FOREIGN KEY (execution_id) REFERENCES execution_processes(id) ON DELETE CASCADE +); + +-- Copy existing data into the replacement table. +INSERT INTO execution_process_logs_new ( + execution_id, + logs, + byte_size, + inserted_at +) +SELECT + execution_id, + logs, + byte_size, + inserted_at +FROM execution_process_logs; + +-- Drop the original table. +DROP TABLE execution_process_logs; + +-- Rename the new table into place. +ALTER TABLE execution_process_logs_new RENAME TO execution_process_logs; + +-- Rebuild indexes to preserve performance characteristics. +CREATE INDEX IF NOT EXISTS idx_execution_process_logs_execution_id_inserted_at + ON execution_process_logs (execution_id, inserted_at); + +-- Verify foreign key constraints before committing the transaction. +PRAGMA foreign_key_check; + +COMMIT; + +PRAGMA foreign_keys = ON; + +-- sqlx workaround due to lack of `-- no-transaction` in sqlx-sqlite. 
+BEGIN TRANSACTION;
diff --git a/crates/db/src/models/execution_process_logs.rs b/crates/db/src/models/execution_process_logs.rs
index b30c7bb6..62206bc8 100644
--- a/crates/db/src/models/execution_process_logs.rs
+++ b/crates/db/src/models/execution_process_logs.rs
@@ -13,19 +13,12 @@ pub struct ExecutionProcessLogs {
     pub inserted_at: DateTime<Utc>,
 }
 
-#[derive(Debug, Deserialize, TS)]
-pub struct CreateExecutionProcessLogs {
-    pub execution_id: Uuid,
-    pub logs: String,
-    pub byte_size: i64,
-}
-
 impl ExecutionProcessLogs {
     /// Find logs by execution process ID
     pub async fn find_by_execution_id(
         pool: &SqlitePool,
         execution_id: Uuid,
-    ) -> Result<Option<Self>, sqlx::Error> {
+    ) -> Result<Vec<Self>, sqlx::Error> {
         sqlx::query_as!(
             ExecutionProcessLogs,
             r#"SELECT
@@ -34,46 +27,18 @@ impl ExecutionProcessLogs {
                 byte_size,
                 inserted_at as "inserted_at!: DateTime<Utc>"
             FROM execution_process_logs
-            WHERE execution_id = $1"#,
+            WHERE execution_id = $1
+            ORDER BY inserted_at ASC"#,
             execution_id
         )
-        .fetch_optional(pool)
-        .await
-    }
-
-    /// Create or update execution process logs
-    pub async fn upsert(
-        pool: &SqlitePool,
-        data: &CreateExecutionProcessLogs,
-    ) -> Result<Self, sqlx::Error> {
-        let now = Utc::now();
-
-        sqlx::query_as!(
-            ExecutionProcessLogs,
-            r#"INSERT INTO execution_process_logs (execution_id, logs, byte_size, inserted_at)
-               VALUES ($1, $2, $3, $4)
-               ON CONFLICT (execution_id) DO UPDATE
-               SET logs = EXCLUDED.logs,
-                   byte_size = EXCLUDED.byte_size,
-                   inserted_at = EXCLUDED.inserted_at
-               RETURNING
-                   execution_id as "execution_id!: Uuid",
-                   logs,
-                   byte_size,
-                   inserted_at as "inserted_at!: DateTime<Utc>""#,
-            data.execution_id,
-            data.logs,
-            data.byte_size,
-            now
-        )
-        .fetch_one(pool)
+        .fetch_all(pool)
         .await
     }
 
     /// Parse JSONL logs back into Vec<LogMsg>
-    pub fn parse_logs(&self) -> Result<Vec<LogMsg>, serde_json::Error> {
+    pub fn parse_logs(records: &[Self]) -> Result<Vec<LogMsg>, serde_json::Error> {
         let mut messages = Vec::new();
-        for line in self.logs.lines() {
+        for line in records.iter().flat_map(|record| record.logs.lines()) {
             if !line.trim().is_empty() {
                 let msg: LogMsg = serde_json::from_str(line)?;
                 messages.push(msg);
             }
@@ -102,11 +67,7 @@
         let byte_size = jsonl_line.len() as i64;
         sqlx::query!(
             r#"INSERT INTO execution_process_logs (execution_id, logs, byte_size, inserted_at)
-               VALUES ($1, $2, $3, datetime('now', 'subsec'))
-               ON CONFLICT (execution_id) DO UPDATE
-               SET logs = logs || $2,
-                   byte_size = byte_size + $3,
-                   inserted_at = datetime('now', 'subsec')"#,
+               VALUES ($1, $2, $3, datetime('now', 'subsec'))"#,
             execution_id,
             jsonl_line,
             byte_size
diff --git a/crates/services/src/services/container.rs b/crates/services/src/services/container.rs
index 8393d978..2dcd9196 100644
--- a/crates/services/src/services/container.rs
+++ b/crates/services/src/services/container.rs
@@ -265,17 +265,17 @@
                 );
             } else {
                 // Fallback: load from DB and create direct stream
-                let logs_record =
+                let log_records =
                     match ExecutionProcessLogs::find_by_execution_id(&self.db().pool, *id).await {
-                        Ok(Some(record)) => record,
-                        Ok(None) => return None, // No logs exist
+                        Ok(records) if !records.is_empty() => records,
+                        Ok(_) => return None, // No logs exist
                         Err(e) => {
                             tracing::error!("Failed to fetch logs for execution {}: {}", id, e);
                             return None;
                         }
                     };
 
-                let messages = match logs_record.parse_logs() {
+                let messages = match ExecutionProcessLogs::parse_logs(&log_records) {
                     Ok(msgs) => msgs,
                     Err(e) => {
                         tracing::error!("Failed to parse logs for execution {}: {}", id, e);
@@ -314,17 +314,17 @@
                 )
             } else {
                 // Fallback: load from DB and normalize
-                let logs_record =
+                let log_records =
                     match ExecutionProcessLogs::find_by_execution_id(&self.db().pool, *id).await {
-                        Ok(Some(record)) => record,
-                        Ok(None) => return None, // No logs exist
+                        Ok(records) if !records.is_empty() => records,
+                        Ok(_) => return None, // No logs exist
                         Err(e) => {
                             tracing::error!("Failed to fetch logs for execution {}: {}", id, e);
                             return None;
                         }
                     };
 
-                let raw_messages = match logs_record.parse_logs() {
+                let raw_messages = match ExecutionProcessLogs::parse_logs(&log_records) {
                     Ok(msgs) => msgs,
                     Err(e) => {
                         tracing::error!("Failed to parse logs for execution {}: {}", id, e);
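
For reference, a minimal sketch of the read path after this change, assuming an already-initialized `SqlitePool`. The function name, `anyhow` error handling, and the `db::models` import path are illustrative assumptions, not taken from the diff:

```rust
// Sketch only: import path and error type below are assumptions.
use db::models::execution_process_logs::ExecutionProcessLogs; // hypothetical path
use sqlx::SqlitePool;
use uuid::Uuid;

async fn dump_execution_logs(pool: &SqlitePool, execution_id: Uuid) -> anyhow::Result<()> {
    // After the migration, each append is a separate row; rows come back
    // oldest-first thanks to the new `ORDER BY inserted_at ASC`.
    let records = ExecutionProcessLogs::find_by_execution_id(pool, execution_id).await?;
    if records.is_empty() {
        println!("no logs recorded for execution {execution_id}");
        return Ok(());
    }

    // `parse_logs` is now an associated function: it flattens the JSONL
    // `logs` column across all rows into a single Vec<LogMsg>.
    let messages = ExecutionProcessLogs::parse_logs(&records)?;
    println!(
        "{} messages reassembled from {} appended rows",
        messages.len(),
        records.len()
    );
    Ok(())
}
```

Because the migration drops the primary key on `execution_id`, each log append now inserts a fresh row instead of rewriting one ever-growing row via `ON CONFLICT`, and readers reassemble the stream with the ordered query above.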