Fetch initial diffs asynchronously in diff stream (#1376)
Move the blocking get_diffs call into a spawned task so the WebSocket stream is returned immediately. This prevents timeouts when fetching diffs for repositories with many changed files. Also remove a duplicate useEffect in DiffsPanel. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -148,51 +148,69 @@ pub async fn create(
|
|||||||
base_commit: Commit,
|
base_commit: Commit,
|
||||||
stats_only: bool,
|
stats_only: bool,
|
||||||
) -> Result<DiffStreamHandle, DiffStreamError> {
|
) -> Result<DiffStreamHandle, DiffStreamError> {
|
||||||
let initial_diffs_raw = git_service.get_diffs(
|
let (tx, rx) = mpsc::channel::<Result<LogMsg, io::Error>>(DIFF_STREAM_CHANNEL_CAPACITY);
|
||||||
DiffTarget::Worktree {
|
|
||||||
worktree_path: &worktree_path,
|
|
||||||
base_commit: &base_commit,
|
|
||||||
},
|
|
||||||
None,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
let cumulative = Arc::new(AtomicUsize::new(0));
|
let cumulative = Arc::new(AtomicUsize::new(0));
|
||||||
let full_sent = Arc::new(std::sync::RwLock::new(HashSet::<String>::new()));
|
let full_sent = Arc::new(std::sync::RwLock::new(HashSet::<String>::new()));
|
||||||
let mut initial_diffs = Vec::with_capacity(initial_diffs_raw.len());
|
|
||||||
for mut diff in initial_diffs_raw {
|
|
||||||
apply_stream_omit_policy(&mut diff, &cumulative, stats_only);
|
|
||||||
initial_diffs.push(diff);
|
|
||||||
}
|
|
||||||
|
|
||||||
{
|
// Spawn a task to fetch initial diffs and set up the file watcher.
|
||||||
let mut guard = full_sent.write().unwrap();
|
// This allows the stream to be returned immediately while diff fetching
|
||||||
for diff in &initial_diffs {
|
// happens in the background, preventing WebSocket timeouts for large diffs.
|
||||||
if !diff.content_omitted {
|
let tx_clone = tx.clone();
|
||||||
guard.insert(GitService::diff_path(diff));
|
let watcher_task = tokio::spawn(async move {
|
||||||
|
// Fetch initial diffs in a blocking task to avoid blocking the async runtime
|
||||||
|
let git_for_diff = git_service.clone();
|
||||||
|
let worktree_for_diff = worktree_path.clone();
|
||||||
|
let base_for_diff = base_commit.clone();
|
||||||
|
|
||||||
|
let initial_diffs_result = tokio::task::spawn_blocking(move || {
|
||||||
|
git_for_diff.get_diffs(
|
||||||
|
DiffTarget::Worktree {
|
||||||
|
worktree_path: &worktree_for_diff,
|
||||||
|
base_commit: &base_for_diff,
|
||||||
|
},
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let initial_diffs_raw = match initial_diffs_result {
|
||||||
|
Ok(Ok(diffs)) => diffs,
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
tracing::error!("Failed to get initial diffs: {e}");
|
||||||
|
send_error(&tx_clone, e.to_string()).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Err(join_err) => {
|
||||||
|
tracing::error!("Diff fetch task join error: {join_err}");
|
||||||
|
send_error(&tx_clone, format!("Diff fetch failed: {join_err}")).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut initial_diffs = Vec::with_capacity(initial_diffs_raw.len());
|
||||||
|
for mut diff in initial_diffs_raw {
|
||||||
|
apply_stream_omit_policy(&mut diff, &cumulative, stats_only);
|
||||||
|
initial_diffs.push(diff);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut guard = full_sent.write().unwrap();
|
||||||
|
for diff in &initial_diffs {
|
||||||
|
if !diff.content_omitted {
|
||||||
|
guard.insert(GitService::diff_path(diff));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
let (tx, rx) = mpsc::channel::<Result<LogMsg, io::Error>>(DIFF_STREAM_CHANNEL_CAPACITY);
|
if !send_initial_diffs(&tx_clone, initial_diffs).await {
|
||||||
if !send_initial_diffs(&tx, initial_diffs).await {
|
return;
|
||||||
return Ok(DiffStreamHandle::new(ReceiverStream::new(rx).boxed(), None));
|
}
|
||||||
}
|
|
||||||
|
|
||||||
let tx_clone = tx.clone();
|
// Set up filesystem watcher for live updates
|
||||||
let ctx = DiffWatcherContext {
|
let worktree_for_watcher = worktree_path.clone();
|
||||||
git_service,
|
|
||||||
worktree_path: worktree_path.clone(),
|
|
||||||
base_commit,
|
|
||||||
cumulative,
|
|
||||||
full_sent,
|
|
||||||
stats_only,
|
|
||||||
tx: tx_clone,
|
|
||||||
};
|
|
||||||
|
|
||||||
let watcher_task = tokio::spawn(async move {
|
|
||||||
let worktree_path_for_spawn = worktree_path;
|
|
||||||
let watcher_result = tokio::task::spawn_blocking(move || {
|
let watcher_result = tokio::task::spawn_blocking(move || {
|
||||||
filesystem_watcher::async_watcher(worktree_path_for_spawn)
|
filesystem_watcher::async_watcher(worktree_for_watcher)
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
@@ -200,13 +218,13 @@ pub async fn create(
|
|||||||
Ok(Ok(parts)) => parts,
|
Ok(Ok(parts)) => parts,
|
||||||
Ok(Err(e)) => {
|
Ok(Err(e)) => {
|
||||||
tracing::error!("Failed to set up filesystem watcher: {e}");
|
tracing::error!("Failed to set up filesystem watcher: {e}");
|
||||||
send_error(&ctx.tx, e.to_string()).await;
|
send_error(&tx_clone, e.to_string()).await;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
Err(join_err) => {
|
Err(join_err) => {
|
||||||
tracing::error!("Failed to spawn watcher setup: {join_err}");
|
tracing::error!("Failed to spawn watcher setup: {join_err}");
|
||||||
send_error(
|
send_error(
|
||||||
&ctx.tx,
|
&tx_clone,
|
||||||
format!("Failed to spawn watcher setup: {join_err}"),
|
format!("Failed to spawn watcher setup: {join_err}"),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
@@ -214,6 +232,16 @@ pub async fn create(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let ctx = DiffWatcherContext {
|
||||||
|
git_service,
|
||||||
|
worktree_path,
|
||||||
|
base_commit,
|
||||||
|
cumulative,
|
||||||
|
full_sent,
|
||||||
|
stats_only,
|
||||||
|
tx: tx_clone,
|
||||||
|
};
|
||||||
|
|
||||||
let _debouncer_guard = debouncer;
|
let _debouncer_guard = debouncer;
|
||||||
|
|
||||||
while let Some(result) = watcher_rx.next().await {
|
while let Some(result) = watcher_rx.next().await {
|
||||||
|
|||||||
@@ -39,10 +39,6 @@ export function DiffsPanel({ selectedAttempt, gitOps }: DiffsPanelProps) {
|
|||||||
setHasInitialized(false);
|
setHasInitialized(false);
|
||||||
}, [selectedAttempt?.id]);
|
}, [selectedAttempt?.id]);
|
||||||
|
|
||||||
useEffect(() => {
|
|
||||||
setLoading(true);
|
|
||||||
}, [selectedAttempt?.id]);
|
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
if (diffs.length > 0 && loading) {
|
if (diffs.length > 0 && loading) {
|
||||||
setLoading(false);
|
setLoading(false);
|
||||||
|
|||||||
Reference in New Issue
Block a user