Soft remove processes on retry instead of hard delete (#753)
This commit is contained in:
12
crates/db/.sqlx/query-0a6ebe48540eb694056f10b888beb95e933ca3dcfff9b1a96aa333f4743d697d.json
generated
Normal file
12
crates/db/.sqlx/query-0a6ebe48540eb694056f10b888beb95e933ca3dcfff9b1a96aa333f4743d697d.json
generated
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"db_name": "SQLite",
|
||||
"query": "UPDATE execution_processes\n SET dropped = 1\n WHERE task_attempt_id = $1\n AND created_at >= (SELECT created_at FROM execution_processes WHERE id = $2)\n AND dropped = 0",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Right": 2
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "0a6ebe48540eb694056f10b888beb95e933ca3dcfff9b1a96aa333f4743d697d"
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
{
|
||||
"db_name": "SQLite",
|
||||
"query": "DELETE FROM execution_processes\n WHERE task_attempt_id = $1\n AND created_at >= (SELECT created_at FROM execution_processes WHERE id = $2)",
|
||||
"describe": {
|
||||
"columns": [],
|
||||
"parameters": {
|
||||
"Right": 2
|
||||
},
|
||||
"nullable": []
|
||||
},
|
||||
"hash": "7bd5f383415cf853d89dd55ab9b5cf881cb62a2bc1cdb6bfaecd2567ece420ea"
|
||||
}
|
||||
@@ -446,16 +446,18 @@ impl ExecutionProcess {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete processes at and after the specified boundary (inclusive)
|
||||
pub async fn delete_at_and_after(
|
||||
/// Soft-drop processes at and after the specified boundary (inclusive)
|
||||
pub async fn drop_at_and_after(
|
||||
pool: &SqlitePool,
|
||||
task_attempt_id: Uuid,
|
||||
boundary_process_id: Uuid,
|
||||
) -> Result<i64, sqlx::Error> {
|
||||
let result = sqlx::query!(
|
||||
r#"DELETE FROM execution_processes
|
||||
WHERE task_attempt_id = $1
|
||||
AND created_at >= (SELECT created_at FROM execution_processes WHERE id = $2)"#,
|
||||
r#"UPDATE execution_processes
|
||||
SET dropped = 1
|
||||
WHERE task_attempt_id = $1
|
||||
AND created_at >= (SELECT created_at FROM execution_processes WHERE id = $2)
|
||||
AND dropped = 0"#,
|
||||
task_attempt_id,
|
||||
boundary_process_id
|
||||
)
|
||||
|
||||
@@ -854,9 +854,8 @@ pub async fn replace_process(
|
||||
// Stop any running processes for this attempt
|
||||
deployment.container().try_stop(&task_attempt).await;
|
||||
|
||||
// Delete the target process and all later processes
|
||||
let deleted_count =
|
||||
ExecutionProcess::delete_at_and_after(pool, task_attempt.id, proc_id).await?;
|
||||
// Soft-drop the target process and all later processes
|
||||
let deleted_count = ExecutionProcess::drop_at_and_after(pool, task_attempt.id, proc_id).await?;
|
||||
|
||||
// Build follow-up executor action using the original process profile
|
||||
let initial_executor_profile_id = match &process
|
||||
|
||||
Reference in New Issue
Block a user