diff --git a/crates/local-deployment/src/container.rs b/crates/local-deployment/src/container.rs index 42fd611b..abe626eb 100644 --- a/crates/local-deployment/src/container.rs +++ b/crates/local-deployment/src/container.rs @@ -55,7 +55,6 @@ use services::services::{ use tokio::{sync::RwLock, task::JoinHandle}; use tokio_util::io::ReaderStream; use utils::{ - diff::create_unified_diff_hunk, log_msg::LogMsg, msg_store::MsgStore, text::{git_branch_id, short_uuid}, @@ -82,10 +81,15 @@ impl LocalContainerService { // Apply stream-level omit policy based on cumulative bytes. // If adding this diff's contents exceeds the cap, strip contents and set stats. fn apply_stream_omit_policy( - &self, diff: &mut utils::diff::Diff, sent_bytes: &Arc, + stats_only: bool, ) { + if stats_only { + Self::omit_diff_contents(diff); + return; + } + // Compute size of current diff payload let mut size = 0usize; if let Some(ref s) = diff.old_content { @@ -101,35 +105,29 @@ impl LocalContainerService { let current = sent_bytes.load(Ordering::Relaxed); if current.saturating_add(size) > Self::MAX_CUMULATIVE_DIFF_BYTES { - // We will omit content for this diff. If we still have both sides loaded - // (i.e., not already omitted by file-size guards), compute stats for UI. 
- if diff.additions.is_none() && diff.deletions.is_none() { - let old = diff.old_content.as_deref().unwrap_or(""); - let new = diff.new_content.as_deref().unwrap_or(""); - let hunk = create_unified_diff_hunk(old, new); - let mut add = 0usize; - let mut del = 0usize; - for line in hunk.lines() { - if let Some(first) = line.chars().next() { - if first == '+' { - add += 1; - } else if first == '-' { - del += 1; - } - } - } - diff.additions = Some(add); - diff.deletions = Some(del); - } - - diff.old_content = None; - diff.new_content = None; - diff.content_omitted = true; + Self::omit_diff_contents(diff); } else { // safe to include; account for it let _ = sent_bytes.fetch_add(size, Ordering::Relaxed); } } + + fn omit_diff_contents(diff: &mut utils::diff::Diff) { + if diff.additions.is_none() + && diff.deletions.is_none() + && (diff.old_content.is_some() || diff.new_content.is_some()) + { + let old = diff.old_content.as_deref().unwrap_or(""); + let new = diff.new_content.as_deref().unwrap_or(""); + let (add, del) = utils::diff::compute_line_change_counts(old, new); + diff.additions = Some(add); + diff.deletions = Some(del); + } + + diff.old_content = None; + diff.new_content = None; + diff.content_omitted = true; + } pub fn new( db: DBService, msg_stores: Arc>>>, @@ -627,6 +625,7 @@ impl LocalContainerService { &self, project_repo_path: &Path, merge_commit_id: &str, + stats_only: bool, ) -> Result>, ContainerError> { let diffs = self.git().get_diffs( @@ -641,7 +640,7 @@ impl LocalContainerService { let diffs: Vec<_> = diffs .into_iter() .map(|mut d| { - self.apply_stream_omit_policy(&mut d, &cum); + Self::apply_stream_omit_policy(&mut d, &cum, stats_only); d }) .collect(); @@ -665,6 +664,7 @@ impl LocalContainerService { &self, worktree_path: &Path, base_commit: &Commit, + stats_only: bool, ) -> Result>, ContainerError> { // Get initial snapshot @@ -682,7 +682,7 @@ impl LocalContainerService { let initial_diffs: Vec<_> = initial_diffs .into_iter() .map(|mut d| { - 
self.apply_stream_omit_policy(&mut d, &cumulative); + Self::apply_stream_omit_policy(&mut d, &cumulative, stats_only); d }) .collect(); @@ -738,6 +738,7 @@ impl LocalContainerService { &changed_paths, &cumulative, &full_sent, + stats_only, ).map_err(|e| { tracing::error!("Error processing file changes: {}", e); io::Error::other(e.to_string()) @@ -790,6 +791,7 @@ impl LocalContainerService { changed_paths: &[String], cumulative_bytes: &Arc, full_sent_paths: &Arc>>, + stats_only: bool, ) -> Result, ContainerError> { let path_filter: Vec<&str> = changed_paths.iter().map(|s| s.as_str()).collect(); @@ -809,47 +811,7 @@ impl LocalContainerService { let file_path = GitService::diff_path(&diff); files_with_diffs.insert(file_path.clone()); // Apply stream-level omit policy (affects contents and stats) - // Note: we can't call self methods from static fn; implement inline - { - // Compute size - let mut size = 0usize; - if let Some(ref s) = diff.old_content { - size += s.len(); - } - if let Some(ref s) = diff.new_content { - size += s.len(); - } - if size > 0 { - let current = cumulative_bytes.load(Ordering::Relaxed); - if current.saturating_add(size) - > LocalContainerService::MAX_CUMULATIVE_DIFF_BYTES - { - if diff.additions.is_none() && diff.deletions.is_none() { - let old = diff.old_content.as_deref().unwrap_or(""); - let new = diff.new_content.as_deref().unwrap_or(""); - let hunk = create_unified_diff_hunk(old, new); - let mut add = 0usize; - let mut del = 0usize; - for line in hunk.lines() { - if let Some(first) = line.chars().next() { - if first == '+' { - add += 1; - } else if first == '-' { - del += 1; - } - } - } - diff.additions = Some(add); - diff.deletions = Some(del); - } - diff.old_content = None; - diff.new_content = None; - diff.content_omitted = true; - } else { - let _ = cumulative_bytes.fetch_add(size, Ordering::Relaxed); - } - } - } + Self::apply_stream_omit_policy(&mut diff, cumulative_bytes, stats_only); if diff.content_omitted { if 
full_sent_paths.read().unwrap().contains(&file_path) { @@ -1149,6 +1111,7 @@ impl ContainerService for LocalContainerService { async fn stream_diff( &self, task_attempt: &TaskAttempt, + stats_only: bool, ) -> Result>, ContainerError> { let project_repo_path = self.get_project_repo_path(task_attempt).await?; @@ -1177,7 +1140,7 @@ impl ContainerService for LocalContainerService { && self.is_container_clean(task_attempt).await? && !is_ahead { - return self.create_merged_diff_stream(&project_repo_path, &commit); + return self.create_merged_diff_stream(&project_repo_path, &commit, stats_only); } let container_ref = self.ensure_container_exists(task_attempt).await?; @@ -1188,7 +1151,7 @@ impl ContainerService for LocalContainerService { &task_attempt.base_branch, )?; - self.create_live_diff_stream(&worktree_path, &base_commit) + self.create_live_diff_stream(&worktree_path, &base_commit, stats_only) .await } diff --git a/crates/server/src/routes/task_attempts.rs b/crates/server/src/routes/task_attempts.rs index a501e24c..f179bad7 100644 --- a/crates/server/src/routes/task_attempts.rs +++ b/crates/server/src/routes/task_attempts.rs @@ -99,6 +99,12 @@ pub struct TaskAttemptQuery { pub task_id: Option, } +#[derive(Debug, Deserialize)] +pub struct DiffStreamQuery { + #[serde(default)] + pub stats_only: bool, +} + pub async fn get_task_attempts( State(deployment): State, Query(query): Query, @@ -961,11 +967,15 @@ pub async fn replace_process( #[axum::debug_handler] pub async fn stream_task_attempt_diff_ws( ws: WebSocketUpgrade, + Query(params): Query, Extension(task_attempt): Extension, State(deployment): State, ) -> impl IntoResponse { + let stats_only = params.stats_only; ws.on_upgrade(move |socket| async move { - if let Err(e) = handle_task_attempt_diff_ws(socket, deployment, task_attempt).await { + if let Err(e) = + handle_task_attempt_diff_ws(socket, deployment, task_attempt, stats_only).await + { tracing::warn!("diff WS closed: {}", e); } }) @@ -975,13 +985,14 @@ async 
fn handle_task_attempt_diff_ws( socket: WebSocket, deployment: DeploymentImpl, task_attempt: TaskAttempt, + stats_only: bool, ) -> anyhow::Result<()> { use futures_util::{SinkExt, StreamExt, TryStreamExt}; use utils::log_msg::LogMsg; let mut stream = deployment .container() - .stream_diff(&task_attempt) + .stream_diff(&task_attempt, stats_only) .await? .map_ok(|msg: LogMsg| msg.to_ws_message_unchecked()); diff --git a/crates/services/src/services/container.rs b/crates/services/src/services/container.rs index 3aac4126..e7ac906e 100644 --- a/crates/services/src/services/container.rs +++ b/crates/services/src/services/container.rs @@ -197,6 +197,7 @@ pub trait ContainerService { async fn stream_diff( &self, task_attempt: &TaskAttempt, + stats_only: bool, ) -> Result>, ContainerError>; /// Fetch the MsgStore for a given execution ID, panicking if missing. diff --git a/crates/utils/src/diff.rs b/crates/utils/src/diff.rs index 90ef0ffd..a585cd42 100644 --- a/crates/utils/src/diff.rs +++ b/crates/utils/src/diff.rs @@ -1,3 +1,5 @@ +use std::borrow::Cow; + use serde::{Deserialize, Serialize}; use similar::{ChangeTag, TextDiff}; use ts_rs::TS; @@ -89,6 +91,37 @@ pub fn create_unified_diff(file_path: &str, old: &str, new: &str) -> String { out } +/// Compute addition/deletion counts between two text snapshots. 
+pub fn compute_line_change_counts(old: &str, new: &str) -> (usize, usize) { + let old = ensure_newline(old); + let new = ensure_newline(new); + + let diff = TextDiff::from_lines(&old, &new); + + let mut additions = 0usize; + let mut deletions = 0usize; + for change in diff.iter_all_changes() { + match change.tag() { + ChangeTag::Insert => additions += 1, + ChangeTag::Delete => deletions += 1, + ChangeTag::Equal => {} + } + } + + (additions, deletions) +} + +// Ensure non-empty text ends with a newline; empty text stays empty so an absent side contributes zero lines. +fn ensure_newline(line: &str) -> Cow<'_, str> { + if line.is_empty() || line.ends_with('\n') { + Cow::Borrowed(line) + } else { + let mut owned = line.to_owned(); + owned.push('\n'); + Cow::Owned(owned) + } +} + /// Extracts unified diff hunks from a string containing a full unified diff. /// Tolerates non-diff lines and missing `@@`` hunk headers. pub fn extract_unified_diff_hunks(unified_diff: &str) -> Vec { diff --git a/frontend/src/components/tasks/TaskDetails/DiffTab.tsx b/frontend/src/components/tasks/TaskDetails/DiffTab.tsx index 659b2953..a7a1269a 100644 --- a/frontend/src/components/tasks/TaskDetails/DiffTab.tsx +++ b/frontend/src/components/tasks/TaskDetails/DiffTab.tsx @@ -1,4 +1,4 @@ -import { useDiffEntries } from '@/hooks/useDiffEntries'; +import { useDiffStream } from '@/hooks/useDiffStream'; import { useMemo, useCallback, useState, useEffect } from 'react'; import { Loader } from '@/components/ui/loader'; import { Button } from '@/components/ui/button'; @@ -15,7 +15,7 @@ function DiffTab({ selectedAttempt }: DiffTabProps) { const [loading, setLoading] = useState(true); const [collapsedIds, setCollapsedIds] = useState>(new Set()); const [hasInitialized, setHasInitialized] = useState(false); - const { diffs, error } = useDiffEntries(selectedAttempt?.id ?? null, true); + const { diffs, error } = useDiffStream(selectedAttempt?.id ?? null, true); const { fileCount, added, deleted } = useDiffSummary( selectedAttempt?.id ?? 
null ); diff --git a/frontend/src/hooks/useDiffEntries.ts b/frontend/src/hooks/useDiffEntries.ts deleted file mode 100644 index 42db1def..00000000 --- a/frontend/src/hooks/useDiffEntries.ts +++ /dev/null @@ -1,27 +0,0 @@ -import { useMemo } from 'react'; -import { useDiffStream } from './useDiffStream'; -import type { Diff, PatchType } from 'shared/types'; - -interface UseDiffEntriesResult { - diffs: Diff[]; - isConnected: boolean; - error: string | null; -} - -export const useDiffEntries = ( - attemptId: string | null, - enabled: boolean -): UseDiffEntriesResult => { - const { data, isConnected, error } = useDiffStream(attemptId, enabled); - - const diffs = useMemo(() => { - if (!data) return []; - return Object.values(data.entries) - .filter( - (e): e is Extract => e?.type === 'DIFF' - ) - .map((e) => e.content); - }, [data]); - - return { diffs, isConnected, error }; -}; diff --git a/frontend/src/hooks/useDiffStream.ts b/frontend/src/hooks/useDiffStream.ts index e39fa1be..bc238725 100644 --- a/frontend/src/hooks/useDiffStream.ts +++ b/frontend/src/hooks/useDiffStream.ts @@ -1,38 +1,60 @@ -import { useCallback } from 'react'; -import type { PatchType } from 'shared/types'; +import { useCallback, useMemo } from 'react'; +import type { Diff, PatchType } from 'shared/types'; import { useJsonPatchWsStream } from './useJsonPatchWsStream'; -interface DiffState { - entries: Record; +interface DiffEntries { + [filePath: string]: PatchType; +} + +type DiffStreamEvent = { + entries: DiffEntries; +}; + +export interface UseDiffStreamOptions { + statsOnly?: boolean; } interface UseDiffStreamResult { - data: DiffState | undefined; - isConnected: boolean; + diffs: Diff[]; error: string | null; } export const useDiffStream = ( attemptId: string | null, - enabled: boolean + enabled: boolean, + options?: UseDiffStreamOptions ): UseDiffStreamResult => { - const endpoint = attemptId - ? 
`/api/task-attempts/${attemptId}/diff/ws` - : undefined; + const endpoint = (() => { + if (!attemptId) return undefined; + const query = `/api/task-attempts/${attemptId}/diff/ws`; + if (typeof options?.statsOnly === 'boolean') { + const params = new URLSearchParams(); + params.set('stats_only', String(options.statsOnly)); + return `${query}?${params.toString()}`; + } else { + return query; + } + })(); const initialData = useCallback( - (): DiffState => ({ + (): DiffStreamEvent => ({ entries: {}, }), [] ); - const { data, isConnected, error } = useJsonPatchWsStream( + const { data, error } = useJsonPatchWsStream( endpoint, enabled && !!attemptId, initialData // No need for injectInitialEntry or deduplicatePatches for diffs ); - return { data, isConnected, error }; + const diffs = useMemo(() => { + return Object.values(data?.entries ?? {}) + .filter((entry) => entry?.type === 'DIFF') + .map((entry) => entry.content); + }, [data?.entries]); + + return { diffs, error }; }; diff --git a/frontend/src/hooks/useDiffSummary.ts b/frontend/src/hooks/useDiffSummary.ts index 41f90429..992e5002 100644 --- a/frontend/src/hooks/useDiffSummary.ts +++ b/frontend/src/hooks/useDiffSummary.ts @@ -1,10 +1,10 @@ -import { useDiffEntries } from '@/hooks/useDiffEntries'; -import { getHighLightLanguageFromPath } from '@/utils/extToLanguage'; -import { generateDiffFile } from '@git-diff-view/file'; +import { useDiffStream } from '@/hooks/useDiffStream'; import { useMemo } from 'react'; export function useDiffSummary(attemptId: string | null) { - const { diffs, error, isConnected } = useDiffEntries(attemptId, true); + const { diffs, error } = useDiffStream(attemptId, true, { + statsOnly: true, + }); const { fileCount, added, deleted } = useMemo(() => { if (!attemptId || diffs.length === 0) { @@ -13,38 +13,13 @@ export function useDiffSummary(attemptId: string | null) { return diffs.reduce( (acc, d) => { - try { - if (d.contentOmitted) { - acc.added += d.additions ?? 
0; - acc.deleted += d.deletions ?? 0; - return acc; - } - const oldName = d.oldPath || d.newPath || 'old'; - const newName = d.newPath || d.oldPath || 'new'; - const oldContent = d.oldContent || ''; - const newContent = d.newContent || ''; - const oldLang = getHighLightLanguageFromPath(oldName) || 'plaintext'; - const newLang = getHighLightLanguageFromPath(newName) || 'plaintext'; - - const file = generateDiffFile( - oldName, - oldContent, - newName, - newContent, - oldLang, - newLang - ); - file.initRaw(); - acc.added += file.additionLength ?? 0; - acc.deleted += file.deletionLength ?? 0; - } catch (e) { - console.error('Failed to compute totals for diff', e); - } + acc.added += d.additions ?? 0; + acc.deleted += d.deletions ?? 0; return acc; }, { fileCount: diffs.length, added: 0, deleted: 0 } ); }, [attemptId, diffs]); - return { fileCount, added, deleted, isConnected, error }; + return { fileCount, added, deleted, error }; }