diff --git a/crates/local-deployment/src/container.rs b/crates/local-deployment/src/container.rs index f33f3821..7c5bf039 100644 --- a/crates/local-deployment/src/container.rs +++ b/crates/local-deployment/src/container.rs @@ -40,7 +40,8 @@ use executors::{ }, }; use futures::{FutureExt, StreamExt, TryStreamExt, stream::select}; -use notify_debouncer_full::DebouncedEvent; +use notify::RecommendedWatcher; +use notify_debouncer_full::{DebouncedEvent, Debouncer, RecommendedCache}; use serde_json::json; use services::services::{ analytics::AnalyticsContext, @@ -63,6 +64,25 @@ use uuid::Uuid; use crate::command; +/// Stream wrapper that owns the filesystem watcher +/// When this stream is dropped, the watcher is automatically cleaned up +struct DiffStreamWithWatcher { + stream: futures::stream::BoxStream<'static, Result>, + _watcher: Option>, +} + +impl futures::Stream for DiffStreamWithWatcher { + type Item = Result; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + // Delegate to inner stream + std::pin::Pin::new(&mut self.stream).poll_next(cx) + } +} + #[derive(Clone)] pub struct LocalContainerService { db: DBService, @@ -621,8 +641,7 @@ impl LocalContainerService { project_repo_path: &Path, merge_commit_id: &str, stats_only: bool, - ) -> Result>, ContainerError> - { + ) -> Result { let diffs = self.git().get_diffs( DiffTarget::Commit { repo_path: project_repo_path, @@ -651,17 +670,20 @@ impl LocalContainerService { })) .boxed(); - Ok(stream) + Ok(DiffStreamWithWatcher { + stream, + _watcher: None, // Merged diffs are static, no watcher needed + }) } /// Create a live diff log stream for ongoing attempts for WebSocket + /// Returns a stream that owns the filesystem watcher - when dropped, watcher is cleaned up async fn create_live_diff_stream( &self, worktree_path: &Path, base_commit: &Commit, stats_only: bool, - ) -> Result>, ContainerError> - { + ) -> Result { // Get initial snapshot let git_service = self.git().clone(); let initial_diffs = git_service.get_diffs( @@ -704,22 +726,21 @@ impl LocalContainerService { // Create live update stream let worktree_path = worktree_path.to_path_buf(); let base_commit = base_commit.clone(); + let worktree_path_for_spawn = worktree_path.clone(); + let watcher_result = tokio::task::spawn_blocking(move || { + filesystem_watcher::async_watcher(worktree_path_for_spawn) + }) + .await + .map_err(|e| io::Error::other(format!("Failed to spawn watcher setup: {e}")))?; + let (debouncer, mut rx, canonical_worktree_path) = + watcher_result.map_err(|e| io::Error::other(e.to_string()))?; let live_stream = { let git_service = git_service.clone(); - let worktree_path_for_spawn = worktree_path.clone(); let cumulative = Arc::clone(&cumulative); let full_sent = Arc::clone(&full_sent); + try_stream! { - let watcher_result = tokio::task::spawn_blocking(move || { - filesystem_watcher::async_watcher(worktree_path_for_spawn) - }) - .await - .map_err(|e| io::Error::other(format!("Failed to spawn watcher setup: {e}")))?; - - let (_debouncer, mut rx, canonical_worktree_path) = watcher_result - .map_err(|e| io::Error::other(e.to_string()))?; - while let Some(result) = rx.next().await { match result { Ok(events) => { @@ -755,8 +776,12 @@ impl LocalContainerService { } }.boxed(); - let combined_stream = initial_stream.chain(live_stream); - Ok(combined_stream.boxed()) + let combined_stream = initial_stream.chain(live_stream).boxed(); + + Ok(DiffStreamWithWatcher { + stream: combined_stream, + _watcher: Some(debouncer), + }) } /// Extract changed file paths from filesystem events @@ -1120,7 +1145,9 @@ impl ContainerService for LocalContainerService { && self.is_container_clean(task_attempt).await? && !is_ahead { - return self.create_merged_diff_stream(&project_repo_path, &commit, stats_only); + let wrapper = + self.create_merged_diff_stream(&project_repo_path, &commit, stats_only)?; + return Ok(Box::pin(wrapper)); } let container_ref = self.ensure_container_exists(task_attempt).await?; @@ -1131,8 +1158,10 @@ impl ContainerService for LocalContainerService { &task_attempt.target_branch, )?; - self.create_live_diff_stream(&worktree_path, &base_commit, stats_only) - .await + let wrapper = self + .create_live_diff_stream(&worktree_path, &base_commit, stats_only) + .await?; + Ok(Box::pin(wrapper)) } async fn try_commit_changes(&self, ctx: &ExecutionContext) -> Result { diff --git a/crates/server/src/routes/task_attempts.rs b/crates/server/src/routes/task_attempts.rs index 61939f54..f5f7a30b 100644 --- a/crates/server/src/routes/task_attempts.rs +++ b/crates/server/src/routes/task_attempts.rs @@ -502,25 +502,37 @@ async fn handle_task_attempt_diff_ws( use futures_util::{SinkExt, StreamExt, TryStreamExt}; use utils::log_msg::LogMsg; - let mut stream = deployment + let stream = deployment .container() .stream_diff(&task_attempt, stats_only) - .await? - .map_ok(|msg: LogMsg| msg.to_ws_message_unchecked()); + .await?; + + let mut stream = stream.map_ok(|msg: LogMsg| msg.to_ws_message_unchecked()); let (mut sender, mut receiver) = socket.split(); - tokio::spawn(async move { while let Some(Ok(_)) = receiver.next().await {} }); - while let Some(item) = stream.next().await { - match item { - Ok(msg) => { - if sender.send(msg).await.is_err() { - break; + loop { + tokio::select! { + // Wait for next stream item + item = stream.next() => { + match item { + Some(Ok(msg)) => { + if sender.send(msg).await.is_err() { + break; + } + } + Some(Err(e)) => { + tracing::error!("stream error: {}", e); + break; + } + None => break, } } - Err(e) => { - tracing::error!("stream error: {}", e); - break; + // Detect client disconnection + msg = receiver.next() => { + if msg.is_none() { + break; + } } } } diff --git a/frontend/src/hooks/useJsonPatchWsStream.ts b/frontend/src/hooks/useJsonPatchWsStream.ts index 5f3588fa..dbc28082 100644 --- a/frontend/src/hooks/useJsonPatchWsStream.ts +++ b/frontend/src/hooks/useJsonPatchWsStream.ts @@ -154,7 +154,16 @@ export const useJsonPatchWsStream = ( return () => { if (wsRef.current) { - wsRef.current.close(); + const ws = wsRef.current; + + // Clear all event handlers first to prevent callbacks after cleanup + ws.onopen = null; + ws.onmessage = null; + ws.onerror = null; + ws.onclose = null; + + // Close regardless of state + ws.close(); wsRef.current = null; } if (retryTimerRef.current) {