Reduce diff induced UI craches (#794)

This commit is contained in:
Solomon
2025-09-20 12:26:08 +01:00
committed by GitHub
parent 18c88324fc
commit 6e4e6f92ce
6 changed files with 336 additions and 65 deletions

View File

@@ -2,7 +2,10 @@ use std::{
collections::{HashMap, HashSet},
io,
path::{Path, PathBuf},
sync::Arc,
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};
@@ -50,6 +53,7 @@ use services::services::{
use tokio::{sync::RwLock, task::JoinHandle};
use tokio_util::io::ReaderStream;
use utils::{
diff::create_unified_diff_hunk,
log_msg::LogMsg,
msg_store::MsgStore,
text::{git_branch_id, short_uuid},
@@ -70,6 +74,60 @@ pub struct LocalContainerService {
}
impl LocalContainerService {
// Max cumulative content bytes allowed per diff stream
const MAX_CUMULATIVE_DIFF_BYTES: usize = 50 * 1024; // 50KB
// Apply stream-level omit policy based on cumulative bytes.
// If adding this diff's contents exceeds the cap, strip contents and set stats.
fn apply_stream_omit_policy(
&self,
diff: &mut utils::diff::Diff,
sent_bytes: &Arc<AtomicUsize>,
) {
// Compute size of current diff payload
let mut size = 0usize;
if let Some(ref s) = diff.old_content {
size += s.len();
}
if let Some(ref s) = diff.new_content {
size += s.len();
}
if size == 0 {
return; // nothing to account
}
let current = sent_bytes.load(Ordering::Relaxed);
if current.saturating_add(size) > Self::MAX_CUMULATIVE_DIFF_BYTES {
// We will omit content for this diff. If we still have both sides loaded
// (i.e., not already omitted by file-size guards), compute stats for UI.
if diff.additions.is_none() && diff.deletions.is_none() {
let old = diff.old_content.as_deref().unwrap_or("");
let new = diff.new_content.as_deref().unwrap_or("");
let hunk = create_unified_diff_hunk(old, new);
let mut add = 0usize;
let mut del = 0usize;
for line in hunk.lines() {
if let Some(first) = line.chars().next() {
if first == '+' {
add += 1;
} else if first == '-' {
del += 1;
}
}
}
diff.additions = Some(add);
diff.deletions = Some(del);
}
diff.old_content = None;
diff.new_content = None;
diff.content_omitted = true;
} else {
// safe to include; account for it
let _ = sent_bytes.fetch_add(size, Ordering::Relaxed);
}
}
pub fn new(
db: DBService,
msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
@@ -555,6 +613,15 @@ impl LocalContainerService {
None,
)?;
let cum = Arc::new(AtomicUsize::new(0));
let diffs: Vec<_> = diffs
.into_iter()
.map(|mut d| {
self.apply_stream_omit_policy(&mut d, &cum);
d
})
.collect();
let stream = futures::stream::iter(diffs.into_iter().map(|diff| {
let entry_index = GitService::diff_path(&diff);
let patch =
@@ -589,6 +656,29 @@ impl LocalContainerService {
None,
)?;
// cumulative counter for entire stream
let cumulative = Arc::new(AtomicUsize::new(0));
// track which file paths have been emitted with full content already
let full_sent = Arc::new(std::sync::RwLock::new(HashSet::<String>::new()));
let initial_diffs: Vec<_> = initial_diffs
.into_iter()
.map(|mut d| {
self.apply_stream_omit_policy(&mut d, &cumulative);
d
})
.collect();
// Record which paths were sent with full content
{
let mut guard = full_sent.write().unwrap();
for d in &initial_diffs {
if !d.content_omitted {
let p = GitService::diff_path(d);
guard.insert(p);
}
}
}
let initial_stream = futures::stream::iter(initial_diffs.into_iter().map(|diff| {
let entry_index = GitService::diff_path(&diff);
let patch =
@@ -606,6 +696,8 @@ impl LocalContainerService {
let live_stream = {
let git_service = git_service.clone();
let worktree_path_for_spawn = worktree_path.clone();
let cumulative = Arc::clone(&cumulative);
let full_sent = Arc::clone(&full_sent);
try_stream! {
// Move the expensive watcher setup to blocking thread to avoid blocking the async runtime
let watcher_result = tokio::task::spawn_blocking(move || {
@@ -629,6 +721,8 @@ impl LocalContainerService {
&task_branch,
&base_branch,
&changed_paths,
&cumulative,
&full_sent,
).map_err(|e| {
tracing::error!("Error processing file changes: {}", e);
io::Error::other(e.to_string())
@@ -650,7 +744,9 @@ impl LocalContainerService {
}
}.boxed();
let combined_stream = select(initial_stream, live_stream);
// Ensure all initial diffs are emitted before live updates, to avoid
// earlier files being abbreviated due to interleaving ordering.
let combined_stream = initial_stream.chain(live_stream);
Ok(combined_stream.boxed())
}
@@ -680,6 +776,8 @@ impl LocalContainerService {
task_branch: &str,
base_branch: &str,
changed_paths: &[String],
cumulative_bytes: &Arc<AtomicUsize>,
full_sent_paths: &Arc<std::sync::RwLock<HashSet<String>>>,
) -> Result<Vec<Event>, ContainerError> {
let path_filter: Vec<&str> = changed_paths.iter().map(|s| s.as_str()).collect();
@@ -696,9 +794,66 @@ impl LocalContainerService {
let mut files_with_diffs = HashSet::new();
// Add/update files that have diffs
for diff in current_diffs {
for mut diff in current_diffs {
let file_path = GitService::diff_path(&diff);
files_with_diffs.insert(file_path.clone());
// Apply stream-level omit policy (affects contents and stats)
// Note: we can't call self methods from static fn; implement inline
{
// Compute size
let mut size = 0usize;
if let Some(ref s) = diff.old_content {
size += s.len();
}
if let Some(ref s) = diff.new_content {
size += s.len();
}
if size > 0 {
let current = cumulative_bytes.load(Ordering::Relaxed);
if current.saturating_add(size)
> LocalContainerService::MAX_CUMULATIVE_DIFF_BYTES
{
if diff.additions.is_none() && diff.deletions.is_none() {
let old = diff.old_content.as_deref().unwrap_or("");
let new = diff.new_content.as_deref().unwrap_or("");
let hunk = create_unified_diff_hunk(old, new);
let mut add = 0usize;
let mut del = 0usize;
for line in hunk.lines() {
if let Some(first) = line.chars().next() {
if first == '+' {
add += 1;
} else if first == '-' {
del += 1;
}
}
}
diff.additions = Some(add);
diff.deletions = Some(del);
}
diff.old_content = None;
diff.new_content = None;
diff.content_omitted = true;
} else {
let _ = cumulative_bytes.fetch_add(size, Ordering::Relaxed);
}
}
}
// If this diff would be omitted and we already sent a full-content
// version of this path earlier in the stream, skip sending a
// degrading replacement.
if diff.content_omitted {
if full_sent_paths.read().unwrap().contains(&file_path) {
continue;
}
} else {
// Track that we have sent a full-content version
{
let mut guard = full_sent_paths.write().unwrap();
guard.insert(file_path.clone());
}
}
let patch = ConversationPatch::add_diff(escape_json_pointer_segment(&file_path), diff);
let event = LogMsg::JsonPatch(patch).to_sse_event();
@@ -1108,7 +1263,7 @@ impl ContainerService for LocalContainerService {
&& !parent.exists()
{
std::fs::create_dir_all(parent).map_err(|e| {
ContainerError::Other(anyhow!("Failed to create directory {:?}: {}", parent, e))
ContainerError::Other(anyhow!("Failed to create directory {parent:?}: {e}"))
})?;
}
@@ -1116,17 +1271,13 @@ impl ContainerService for LocalContainerService {
if source_file.exists() {
std::fs::copy(&source_file, &target_file).map_err(|e| {
ContainerError::Other(anyhow!(
"Failed to copy file {:?} to {:?}: {}",
source_file,
target_file,
e
"Failed to copy file {source_file:?} to {target_file:?}: {e}"
))
})?;
tracing::info!("Copied file {:?} to worktree", file_path);
} else {
return Err(ContainerError::Other(anyhow!(
"File {:?} does not exist in the project directory",
source_file
"File {source_file:?} does not exist in the project directory"
)));
}
}