Fix mem leak (#916)

* Fix mem leak

* Fix cross-platform, fix no error logging
This commit is contained in:
Alex Netsch
2025-10-03 09:46:44 +01:00
committed by GitHub
parent 0f835e9730
commit 8dc297c521
3 changed files with 84 additions and 34 deletions

View File

@@ -40,7 +40,8 @@ use executors::{
},
};
use futures::{FutureExt, StreamExt, TryStreamExt, stream::select};
use notify_debouncer_full::DebouncedEvent;
use notify::RecommendedWatcher;
use notify_debouncer_full::{DebouncedEvent, Debouncer, RecommendedCache};
use serde_json::json;
use services::services::{
analytics::AnalyticsContext,
@@ -63,6 +64,25 @@ use uuid::Uuid;
use crate::command;
/// Stream wrapper that owns the filesystem watcher
/// When this stream is dropped, the watcher is automatically cleaned up
struct DiffStreamWithWatcher {
stream: futures::stream::BoxStream<'static, Result<LogMsg, std::io::Error>>,
_watcher: Option<Debouncer<RecommendedWatcher, RecommendedCache>>,
}
impl futures::Stream for DiffStreamWithWatcher {
type Item = Result<LogMsg, std::io::Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
// Delegate to inner stream
std::pin::Pin::new(&mut self.stream).poll_next(cx)
}
}
#[derive(Clone)]
pub struct LocalContainerService {
db: DBService,
@@ -621,8 +641,7 @@ impl LocalContainerService {
project_repo_path: &Path,
merge_commit_id: &str,
stats_only: bool,
) -> Result<futures::stream::BoxStream<'static, Result<LogMsg, std::io::Error>>, ContainerError>
{
) -> Result<DiffStreamWithWatcher, ContainerError> {
let diffs = self.git().get_diffs(
DiffTarget::Commit {
repo_path: project_repo_path,
@@ -651,17 +670,20 @@ impl LocalContainerService {
}))
.boxed();
Ok(stream)
Ok(DiffStreamWithWatcher {
stream,
_watcher: None, // Merged diffs are static, no watcher needed
})
}
/// Create a live diff log stream for ongoing attempts for WebSocket
/// Returns a stream that owns the filesystem watcher - when dropped, watcher is cleaned up
async fn create_live_diff_stream(
&self,
worktree_path: &Path,
base_commit: &Commit,
stats_only: bool,
) -> Result<futures::stream::BoxStream<'static, Result<LogMsg, std::io::Error>>, ContainerError>
{
) -> Result<DiffStreamWithWatcher, ContainerError> {
// Get initial snapshot
let git_service = self.git().clone();
let initial_diffs = git_service.get_diffs(
@@ -704,22 +726,21 @@ impl LocalContainerService {
// Create live update stream
let worktree_path = worktree_path.to_path_buf();
let base_commit = base_commit.clone();
let worktree_path_for_spawn = worktree_path.clone();
let watcher_result = tokio::task::spawn_blocking(move || {
filesystem_watcher::async_watcher(worktree_path_for_spawn)
})
.await
.map_err(|e| io::Error::other(format!("Failed to spawn watcher setup: {e}")))?;
let (debouncer, mut rx, canonical_worktree_path) =
watcher_result.map_err(|e| io::Error::other(e.to_string()))?;
let live_stream = {
let git_service = git_service.clone();
let worktree_path_for_spawn = worktree_path.clone();
let cumulative = Arc::clone(&cumulative);
let full_sent = Arc::clone(&full_sent);
try_stream! {
let watcher_result = tokio::task::spawn_blocking(move || {
filesystem_watcher::async_watcher(worktree_path_for_spawn)
})
.await
.map_err(|e| io::Error::other(format!("Failed to spawn watcher setup: {e}")))?;
let (_debouncer, mut rx, canonical_worktree_path) = watcher_result
.map_err(|e| io::Error::other(e.to_string()))?;
while let Some(result) = rx.next().await {
match result {
Ok(events) => {
@@ -755,8 +776,12 @@ impl LocalContainerService {
}
}.boxed();
let combined_stream = initial_stream.chain(live_stream);
Ok(combined_stream.boxed())
let combined_stream = initial_stream.chain(live_stream).boxed();
Ok(DiffStreamWithWatcher {
stream: combined_stream,
_watcher: Some(debouncer),
})
}
/// Extract changed file paths from filesystem events
@@ -1120,7 +1145,9 @@ impl ContainerService for LocalContainerService {
&& self.is_container_clean(task_attempt).await?
&& !is_ahead
{
return self.create_merged_diff_stream(&project_repo_path, &commit, stats_only);
let wrapper =
self.create_merged_diff_stream(&project_repo_path, &commit, stats_only)?;
return Ok(Box::pin(wrapper));
}
let container_ref = self.ensure_container_exists(task_attempt).await?;
@@ -1131,8 +1158,10 @@ impl ContainerService for LocalContainerService {
&task_attempt.target_branch,
)?;
self.create_live_diff_stream(&worktree_path, &base_commit, stats_only)
.await
let wrapper = self
.create_live_diff_stream(&worktree_path, &base_commit, stats_only)
.await?;
Ok(Box::pin(wrapper))
}
async fn try_commit_changes(&self, ctx: &ExecutionContext) -> Result<bool, ContainerError> {

View File

@@ -502,25 +502,37 @@ async fn handle_task_attempt_diff_ws(
use futures_util::{SinkExt, StreamExt, TryStreamExt};
use utils::log_msg::LogMsg;
let mut stream = deployment
let stream = deployment
.container()
.stream_diff(&task_attempt, stats_only)
.await?
.map_ok(|msg: LogMsg| msg.to_ws_message_unchecked());
.await?;
let mut stream = stream.map_ok(|msg: LogMsg| msg.to_ws_message_unchecked());
let (mut sender, mut receiver) = socket.split();
tokio::spawn(async move { while let Some(Ok(_)) = receiver.next().await {} });
while let Some(item) = stream.next().await {
match item {
Ok(msg) => {
if sender.send(msg).await.is_err() {
break;
loop {
tokio::select! {
// Wait for next stream item
item = stream.next() => {
match item {
Some(Ok(msg)) => {
if sender.send(msg).await.is_err() {
break;
}
}
Some(Err(e)) => {
tracing::error!("stream error: {}", e);
break;
}
None => break,
}
}
Err(e) => {
tracing::error!("stream error: {}", e);
break;
// Detect client disconnection
msg = receiver.next() => {
if msg.is_none() {
break;
}
}
}
}

View File

@@ -154,7 +154,16 @@ export const useJsonPatchWsStream = <T>(
return () => {
if (wsRef.current) {
wsRef.current.close();
const ws = wsRef.current;
// Clear all event handlers first to prevent callbacks after cleanup
ws.onopen = null;
ws.onmessage = null;
ws.onerror = null;
ws.onclose = null;
// Close regardless of state
ws.close();
wsRef.current = null;
}
if (retryTimerRef.current) {