Real-time sync for Projects (#1512)

* Real-time sync for Projects

* Do not create project in a transaction

Update hooks trigger before the transaction is commited, which causes insert events to be dismissed because the row is isn't found
This commit is contained in:
Solomon
2025-12-17 18:25:34 +00:00
committed by GitHub
parent 4b4fdb9a60
commit a282bbdae4
21 changed files with 402 additions and 133 deletions

View File

@@ -3,7 +3,7 @@ use std::{str::FromStr, sync::Arc};
use db::{
DBService,
models::{
execution_process::ExecutionProcess, scratch::Scratch, task::Task,
execution_process::ExecutionProcess, project::Project, scratch::Scratch, task::Task,
task_attempt::TaskAttempt,
},
};
@@ -20,7 +20,9 @@ mod streams;
#[path = "events/types.rs"]
pub mod types;
pub use patches::{execution_process_patch, scratch_patch, task_attempt_patch, task_patch};
pub use patches::{
execution_process_patch, project_patch, scratch_patch, task_attempt_patch, task_patch,
};
pub use types::{EventError, EventPatch, EventPatchInner, HookTables, RecordTypes};
#[derive(Clone)]
@@ -107,6 +109,14 @@ impl EventService {
msg_store_for_preupdate.push_patch(patch);
}
}
"projects" => {
if let Ok(value) = preupdate.get_old_column_value(0)
&& let Ok(project_id) = <Uuid as Decode<Sqlite>>::decode(value)
{
let patch = project_patch::remove(project_id);
msg_store_for_preupdate.push_patch(patch);
}
}
"task_attempts" => {
if let Ok(value) = preupdate.get_old_column_value(0)
&& let Ok(attempt_id) = <Uuid as Decode<Sqlite>>::decode(value)
@@ -151,6 +161,7 @@ impl EventService {
runtime_handle.spawn(async move {
let record_type: RecordTypes = match (table, hook.operation.clone()) {
(HookTables::Tasks, SqliteOperation::Delete)
| (HookTables::Projects, SqliteOperation::Delete)
| (HookTables::TaskAttempts, SqliteOperation::Delete)
| (HookTables::ExecutionProcesses, SqliteOperation::Delete)
| (HookTables::Scratch, SqliteOperation::Delete) => {
@@ -171,6 +182,19 @@ impl EventService {
}
}
}
(HookTables::Projects, _) => {
match Project::find_by_rowid(&db.pool, rowid).await {
Ok(Some(project)) => RecordTypes::Project(project),
Ok(None) => RecordTypes::DeletedProject {
rowid,
project_id: None,
},
Err(e) => {
tracing::error!("Failed to fetch project: {:?}", e);
return;
}
}
}
(HookTables::TaskAttempts, _) => {
match TaskAttempt::find_by_rowid(&db.pool, rowid).await {
Ok(Some(attempt)) => RecordTypes::TaskAttempt(attempt),
@@ -261,6 +285,15 @@ impl EventService {
msg_store_for_hook.push_patch(patch);
return;
}
RecordTypes::Project(project) => {
let patch = match hook.operation {
SqliteOperation::Insert => project_patch::add(project),
SqliteOperation::Update => project_patch::replace(project),
_ => project_patch::replace(project),
};
msg_store_for_hook.push_patch(patch);
return;
}
RecordTypes::Scratch(scratch) => {
let patch = match hook.operation {
SqliteOperation::Insert => scratch_patch::add(scratch),

View File

@@ -1,6 +1,6 @@
use db::models::{
execution_process::ExecutionProcess, scratch::Scratch, task::TaskWithAttemptStatus,
task_attempt::TaskAttempt,
execution_process::ExecutionProcess, project::Project, scratch::Scratch,
task::TaskWithAttemptStatus, task_attempt::TaskAttempt,
};
use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation};
use uuid::Uuid;
@@ -48,6 +48,47 @@ pub mod task_patch {
}
}
/// Helper functions for creating project-specific patches
pub mod project_patch {
use super::*;
fn project_path(project_id: Uuid) -> String {
format!(
"/projects/{}",
escape_pointer_segment(&project_id.to_string())
)
}
/// Create patch for adding a new project
pub fn add(project: &Project) -> Patch {
Patch(vec![PatchOperation::Add(AddOperation {
path: project_path(project.id)
.try_into()
.expect("Project path should be valid"),
value: serde_json::to_value(project).expect("Project serialization should not fail"),
})])
}
/// Create patch for updating an existing project
pub fn replace(project: &Project) -> Patch {
Patch(vec![PatchOperation::Replace(ReplaceOperation {
path: project_path(project.id)
.try_into()
.expect("Project path should be valid"),
value: serde_json::to_value(project).expect("Project serialization should not fail"),
})])
}
/// Create patch for removing a project
pub fn remove(project_id: Uuid) -> Patch {
Patch(vec![PatchOperation::Remove(RemoveOperation {
path: project_path(project_id)
.try_into()
.expect("Project path should be valid"),
})])
}
}
/// Helper functions for creating execution process-specific patches
pub mod execution_process_patch {
use super::*;

View File

@@ -1,11 +1,12 @@
use db::models::{
execution_process::ExecutionProcess,
project::Project,
scratch::Scratch,
task::{Task, TaskWithAttemptStatus},
};
use futures::StreamExt;
use serde_json::json;
use tokio_stream::wrappers::BroadcastStream;
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
use utils::log_msg::LogMsg;
use uuid::Uuid;
@@ -145,6 +146,85 @@ impl EventService {
Ok(combined_stream)
}
/// Stream raw project messages with initial snapshot
pub async fn stream_projects_raw(
&self,
) -> Result<futures::stream::BoxStream<'static, Result<LogMsg, std::io::Error>>, EventError>
{
fn build_projects_snapshot(projects: Vec<Project>) -> LogMsg {
// Convert projects array to object keyed by project ID
let projects_map: serde_json::Map<String, serde_json::Value> = projects
.into_iter()
.map(|project| {
(
project.id.to_string(),
serde_json::to_value(project).unwrap(),
)
})
.collect();
let patch = json!([
{
"op": "replace",
"path": "/projects",
"value": projects_map
}
]);
LogMsg::JsonPatch(serde_json::from_value(patch).unwrap())
}
// Get initial snapshot of projects
let projects = Project::find_all(&self.db.pool).await?;
let initial_msg = build_projects_snapshot(projects);
let db_pool = self.db.pool.clone();
// Get filtered event stream (projects only)
let filtered_stream =
BroadcastStream::new(self.msg_store.get_receiver()).filter_map(move |msg_result| {
let db_pool = db_pool.clone();
async move {
match msg_result {
Ok(LogMsg::JsonPatch(patch)) => {
if let Some(patch_op) = patch.0.first()
&& patch_op.path().starts_with("/projects")
{
return Some(Ok(LogMsg::JsonPatch(patch)));
}
None
}
Ok(other) => Some(Ok(other)), // Pass through non-patch messages
Err(BroadcastStreamRecvError::Lagged(skipped)) => {
tracing::warn!(
skipped = skipped,
"projects stream lagged; resyncing snapshot"
);
match Project::find_all(&db_pool).await {
Ok(projects) => Some(Ok(build_projects_snapshot(projects))),
Err(err) => {
tracing::error!(
error = %err,
"failed to resync projects after lag"
);
Some(Err(std::io::Error::other(format!(
"failed to resync projects after lag: {err}"
))))
}
}
}
}
}
});
// Start with initial snapshot, then live updates
let initial_stream = futures::stream::once(async move { Ok(initial_msg) });
let combined_stream = initial_stream.chain(filtered_stream).boxed();
Ok(combined_stream)
}
/// Stream execution processes for a specific task attempt with initial snapshot (raw LogMsg format for WebSocket)
pub async fn stream_execution_processes_for_attempt_raw(
&self,

View File

@@ -1,6 +1,7 @@
use anyhow::Error as AnyhowError;
use db::models::{
execution_process::ExecutionProcess, scratch::Scratch, task::Task, task_attempt::TaskAttempt,
execution_process::ExecutionProcess, project::Project, scratch::Scratch, task::Task,
task_attempt::TaskAttempt,
};
use serde::{Deserialize, Serialize};
use sqlx::Error as SqlxError;
@@ -29,6 +30,8 @@ pub enum HookTables {
ExecutionProcesses,
#[strum(to_string = "scratch")]
Scratch,
#[strum(to_string = "projects")]
Projects,
}
#[derive(Serialize, Deserialize, TS)]
@@ -38,6 +41,7 @@ pub enum RecordTypes {
TaskAttempt(TaskAttempt),
ExecutionProcess(ExecutionProcess),
Scratch(Scratch),
Project(Project),
DeletedTask {
rowid: i64,
project_id: Option<Uuid>,
@@ -57,6 +61,10 @@ pub enum RecordTypes {
scratch_id: Option<Uuid>,
scratch_type: Option<String>,
},
DeletedProject {
rowid: i64,
project_id: Option<Uuid>,
},
}
#[derive(Serialize, Deserialize, TS)]

View File

@@ -106,22 +106,17 @@ impl ProjectService {
let id = Uuid::new_v4();
// Start transaction
let mut tx = pool.begin().await?;
let project = Project::create(&mut *tx, &payload, id)
let project = Project::create(pool, &payload, id)
.await
.map_err(|e| ProjectServiceError::Project(ProjectError::CreateFailed(e.to_string())))?;
for repo in normalized_repos {
let repo_entity =
Repo::find_or_create(&mut *tx, Path::new(&repo.git_repo_path), &repo.display_name)
Repo::find_or_create(pool, Path::new(&repo.git_repo_path), &repo.display_name)
.await?;
ProjectRepo::create(&mut *tx, project.id, repo_entity.id).await?;
ProjectRepo::create(pool, project.id, repo_entity.id).await?;
}
tx.commit().await?;
Ok(project)
}