Make execution_process_logs append-only to avoid SQLite lockups (#1276)

Solves `(code: 5) database is locked` errors by replacing the single-row log upsert with plain appends; a minimal sketch of the before/after write path follows the commit metadata below.
Solomon authored 2025-11-17 17:45:15 +00:00 (committed by GitHub)
parent 8579029a2e
commit 6067dc693b
7 changed files with 83 additions and 107 deletions
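
For context, here is a minimal sketch of the write path before and after this change. It is illustrative only: it assumes a sqlx `SqlitePool` with the `uuid` feature enabled, uses plain `sqlx::query` instead of the repo's compile-time-checked macros, simplifies error handling, and the function names are not the repo's.

use sqlx::SqlitePool;
use uuid::Uuid;

// Before: one row per execution, appended to in place via ON CONFLICT.
// Every call rewrites an ever-growing `logs` value while holding the
// write lock, which is what surfaced `(code: 5) database is locked`.
async fn append_chunk_upsert(pool: &SqlitePool, execution_id: Uuid, chunk: &str) -> sqlx::Result<()> {
    let byte_size = chunk.len() as i64;
    sqlx::query(
        "INSERT INTO execution_process_logs (execution_id, logs, byte_size, inserted_at)
         VALUES ($1, $2, $3, datetime('now', 'subsec'))
         ON CONFLICT (execution_id) DO UPDATE
         SET logs = logs || $2,
             byte_size = byte_size + $3,
             inserted_at = datetime('now', 'subsec')",
    )
    .bind(execution_id)
    .bind(chunk)
    .bind(byte_size)
    .execute(pool)
    .await?;
    Ok(())
}

// After: append-only. Each chunk becomes its own small row, so writes are
// short INSERTs and readers reassemble the stream with ORDER BY inserted_at.
async fn append_chunk(pool: &SqlitePool, execution_id: Uuid, chunk: &str) -> sqlx::Result<()> {
    let byte_size = chunk.len() as i64;
    sqlx::query(
        "INSERT INTO execution_process_logs (execution_id, logs, byte_size, inserted_at)
         VALUES ($1, $2, $3, datetime('now', 'subsec'))",
    )
    .bind(execution_id)
    .bind(chunk)
    .bind(byte_size)
    .execute(pool)
    .await?;
    Ok(())
}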

View File

@@ -1,12 +0,0 @@
{
  "db_name": "SQLite",
  "query": "INSERT INTO execution_process_logs (execution_id, logs, byte_size, inserted_at)\n VALUES ($1, $2, $3, datetime('now', 'subsec'))\n ON CONFLICT (execution_id) DO UPDATE\n SET logs = logs || $2,\n byte_size = byte_size + $3,\n inserted_at = datetime('now', 'subsec')",
  "describe": {
    "columns": [],
    "parameters": {
      "Right": 3
    },
    "nullable": []
  },
  "hash": "56238751ac9cab8bd97ad787143d91f54c47089c8e732ef80c3d1e85dfba1430"
}

View File

@@ -0,0 +1,12 @@
{
  "db_name": "SQLite",
  "query": "INSERT INTO execution_process_logs (execution_id, logs, byte_size, inserted_at)\n VALUES ($1, $2, $3, datetime('now', 'subsec'))",
  "describe": {
    "columns": [],
    "parameters": {
      "Right": 3
    },
    "nullable": []
  },
  "hash": "9747ebaebd562d65f0c333b0f5efc74fa63ab9fcb35a43f75f57da3fcb9a2588"
}

View File

@@ -1,38 +0,0 @@
{
  "db_name": "SQLite",
  "query": "INSERT INTO execution_process_logs (execution_id, logs, byte_size, inserted_at)\n VALUES ($1, $2, $3, $4)\n ON CONFLICT (execution_id) DO UPDATE\n SET logs = EXCLUDED.logs, \n byte_size = EXCLUDED.byte_size,\n inserted_at = EXCLUDED.inserted_at\n RETURNING \n execution_id as \"execution_id!: Uuid\",\n logs,\n byte_size,\n inserted_at as \"inserted_at!: DateTime<Utc>\"",
  "describe": {
    "columns": [
      {
        "name": "execution_id!: Uuid",
        "ordinal": 0,
        "type_info": "Blob"
      },
      {
        "name": "logs",
        "ordinal": 1,
        "type_info": "Text"
      },
      {
        "name": "byte_size",
        "ordinal": 2,
        "type_info": "Integer"
      },
      {
        "name": "inserted_at!: DateTime<Utc>",
        "ordinal": 3,
        "type_info": "Text"
      }
    ],
    "parameters": {
      "Right": 4
    },
    "nullable": [
      true,
      false,
      false,
      false
    ]
  },
  "hash": "97e6a03adc1c14e9ecabe7885598dcc0ea273dffea920838fc4dcc837293ba6b"
}

View File

@@ -1,6 +1,6 @@
 {
   "db_name": "SQLite",
-  "query": "SELECT \n execution_id as \"execution_id!: Uuid\",\n logs,\n byte_size,\n inserted_at as \"inserted_at!: DateTime<Utc>\"\n FROM execution_process_logs \n WHERE execution_id = $1",
+  "query": "SELECT \n execution_id as \"execution_id!: Uuid\",\n logs,\n byte_size,\n inserted_at as \"inserted_at!: DateTime<Utc>\"\n FROM execution_process_logs \n WHERE execution_id = $1\n ORDER BY inserted_at ASC",
   "describe": {
     "columns": [
       {
@@ -28,11 +28,11 @@
       "Right": 1
     },
     "nullable": [
-      true,
+      false,
       false,
       false,
       false
     ]
   },
-  "hash": "2ec7648202fc6f496b97d9486cf9fd3c59fdba73c168628784f0a09488b80528"
+  "hash": "a1574f21db387b0e4a2c3f5723de6df4ee42d98145d16e9d135345dd60128429"
 }

View File

@@ -0,0 +1,53 @@
-- Migration steps following the official SQLite "12-step generalized ALTER TABLE" procedure:
-- https://www.sqlite.org/lang_altertable.html#otheralter
--
PRAGMA foreign_keys = OFF;
-- This is a sqlx workaround to enable BEGIN TRANSACTION in this migration, until `-- no-transaction` lands in sqlx-sqlite.
-- https://github.com/launchbadge/sqlx/issues/2085#issuecomment-1499859906
COMMIT TRANSACTION;
BEGIN TRANSACTION;
-- Create replacement table without the PRIMARY KEY constraint on execution_id.
CREATE TABLE execution_process_logs_new (
execution_id BLOB NOT NULL,
logs TEXT NOT NULL, -- JSONL format (one LogMsg per line)
byte_size INTEGER NOT NULL,
inserted_at TEXT NOT NULL DEFAULT (datetime('now', 'subsec')),
FOREIGN KEY (execution_id) REFERENCES execution_processes(id) ON DELETE CASCADE
);
-- Copy existing data into the replacement table.
INSERT INTO execution_process_logs_new (
execution_id,
logs,
byte_size,
inserted_at
)
SELECT
execution_id,
logs,
byte_size,
inserted_at
FROM execution_process_logs;
-- Drop the original table.
DROP TABLE execution_process_logs;
-- Rename the new table into place.
ALTER TABLE execution_process_logs_new RENAME TO execution_process_logs;
-- Rebuild indexes to preserve performance characteristics.
CREATE INDEX IF NOT EXISTS idx_execution_process_logs_execution_id_inserted_at
ON execution_process_logs (execution_id, inserted_at);
-- Verify foreign key constraints before committing the transaction.
PRAGMA foreign_key_check;
COMMIT;
PRAGMA foreign_keys = ON;
-- sqlx workaround due to lack of `-- no-transaction` in sqlx-sqlite.
BEGIN TRANSACTION;
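
Aside on the COMMIT/BEGIN bracketing above: sqlx currently wraps each SQLite migration in its own transaction, and SQLite ignores `PRAGMA foreign_keys` while a transaction is open, so the migration closes sqlx's implicit transaction, performs the table rebuild, and reopens one for sqlx to commit. For completeness, a typical startup snippet that would pick up such a migration with sqlx's embedded migrator; this is a hypothetical sketch (pool URL, file paths, and the tokio runtime are assumptions, not this repo's code):

use sqlx::sqlite::SqlitePoolOptions;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // `mode=rwc` creates the database file if it does not exist yet.
    let pool = SqlitePoolOptions::new()
        .connect("sqlite://data.db?mode=rwc")
        .await?;

    // Embeds the ./migrations directory at compile time and applies any
    // pending migrations, including the table rebuild above, in order.
    sqlx::migrate!("./migrations").run(&pool).await?;

    Ok(())
}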

View File

@@ -13,19 +13,12 @@ pub struct ExecutionProcessLogs {
     pub inserted_at: DateTime<Utc>,
 }
 
-#[derive(Debug, Deserialize, TS)]
-pub struct CreateExecutionProcessLogs {
-    pub execution_id: Uuid,
-    pub logs: String,
-    pub byte_size: i64,
-}
-
 impl ExecutionProcessLogs {
     /// Find logs by execution process ID
     pub async fn find_by_execution_id(
         pool: &SqlitePool,
         execution_id: Uuid,
-    ) -> Result<Option<Self>, sqlx::Error> {
+    ) -> Result<Vec<Self>, sqlx::Error> {
         sqlx::query_as!(
             ExecutionProcessLogs,
             r#"SELECT
@@ -34,46 +27,18 @@ impl ExecutionProcessLogs {
                 byte_size,
                 inserted_at as "inserted_at!: DateTime<Utc>"
                FROM execution_process_logs
-               WHERE execution_id = $1"#,
+               WHERE execution_id = $1
+               ORDER BY inserted_at ASC"#,
             execution_id
         )
-        .fetch_optional(pool)
-        .await
-    }
-
-    /// Create or update execution process logs
-    pub async fn upsert(
-        pool: &SqlitePool,
-        data: &CreateExecutionProcessLogs,
-    ) -> Result<Self, sqlx::Error> {
-        let now = Utc::now();
-
-        sqlx::query_as!(
-            ExecutionProcessLogs,
-            r#"INSERT INTO execution_process_logs (execution_id, logs, byte_size, inserted_at)
-               VALUES ($1, $2, $3, $4)
-               ON CONFLICT (execution_id) DO UPDATE
-               SET logs = EXCLUDED.logs,
-                   byte_size = EXCLUDED.byte_size,
-                   inserted_at = EXCLUDED.inserted_at
-               RETURNING
-                   execution_id as "execution_id!: Uuid",
-                   logs,
-                   byte_size,
-                   inserted_at as "inserted_at!: DateTime<Utc>""#,
-            data.execution_id,
-            data.logs,
-            data.byte_size,
-            now
-        )
-        .fetch_one(pool)
+        .fetch_all(pool)
         .await
     }
 
     /// Parse JSONL logs back into Vec<LogMsg>
-    pub fn parse_logs(&self) -> Result<Vec<LogMsg>, serde_json::Error> {
+    pub fn parse_logs(records: &[Self]) -> Result<Vec<LogMsg>, serde_json::Error> {
         let mut messages = Vec::new();
-        for line in self.logs.lines() {
+        for line in records.iter().flat_map(|record| record.logs.lines()) {
             if !line.trim().is_empty() {
                 let msg: LogMsg = serde_json::from_str(line)?;
                 messages.push(msg);
@@ -102,11 +67,7 @@ impl ExecutionProcessLogs {
         let byte_size = jsonl_line.len() as i64;
         sqlx::query!(
             r#"INSERT INTO execution_process_logs (execution_id, logs, byte_size, inserted_at)
-               VALUES ($1, $2, $3, datetime('now', 'subsec'))
-               ON CONFLICT (execution_id) DO UPDATE
-               SET logs = logs || $2,
-                   byte_size = byte_size + $3,
-                   inserted_at = datetime('now', 'subsec')"#,
+               VALUES ($1, $2, $3, datetime('now', 'subsec'))"#,
             execution_id,
             jsonl_line,
             byte_size
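
Downstream, the reshaped model API is consumed as in the container-service hunks below. A minimal standalone sketch (the function name and error type are illustrative assumptions, not repo code): `find_by_execution_id` now returns every row for the execution in insertion order, and `parse_logs` is an associated function that flattens the JSONL lines across those rows.

async fn load_messages(
    pool: &sqlx::SqlitePool,
    execution_id: uuid::Uuid,
) -> Result<Vec<LogMsg>, Box<dyn std::error::Error>> {
    // One row per appended chunk, returned in inserted_at order.
    let records = ExecutionProcessLogs::find_by_execution_id(pool, execution_id).await?;
    if records.is_empty() {
        return Ok(Vec::new()); // No logs recorded for this execution.
    }
    // Flatten the JSONL lines across all rows and deserialize each LogMsg.
    Ok(ExecutionProcessLogs::parse_logs(&records)?)
}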

View File

@@ -265,17 +265,17 @@ pub trait ContainerService {
                 );
             } else {
                 // Fallback: load from DB and create direct stream
-                let logs_record =
+                let log_records =
                     match ExecutionProcessLogs::find_by_execution_id(&self.db().pool, *id).await {
-                        Ok(Some(record)) => record,
-                        Ok(None) => return None, // No logs exist
+                        Ok(records) if !records.is_empty() => records,
+                        Ok(_) => return None, // No logs exist
                         Err(e) => {
                             tracing::error!("Failed to fetch logs for execution {}: {}", id, e);
                             return None;
                         }
                     };
 
-                let messages = match logs_record.parse_logs() {
+                let messages = match ExecutionProcessLogs::parse_logs(&log_records) {
                     Ok(msgs) => msgs,
                     Err(e) => {
                         tracing::error!("Failed to parse logs for execution {}: {}", id, e);
@@ -314,17 +314,17 @@
                 )
             } else {
                 // Fallback: load from DB and normalize
-                let logs_record =
+                let log_records =
                     match ExecutionProcessLogs::find_by_execution_id(&self.db().pool, *id).await {
-                        Ok(Some(record)) => record,
-                        Ok(None) => return None, // No logs exist
+                        Ok(records) if !records.is_empty() => records,
+                        Ok(_) => return None, // No logs exist
                         Err(e) => {
                             tracing::error!("Failed to fetch logs for execution {}: {}", id, e);
                             return None;
                         }
                     };
 
-                let raw_messages = match logs_record.parse_logs() {
+                let raw_messages = match ExecutionProcessLogs::parse_logs(&log_records) {
                     Ok(msgs) => msgs,
                     Err(e) => {
                         tracing::error!("Failed to parse logs for execution {}: {}", id, e);