Make diff stats much faster (#866)

This commit is contained in:
Solomon
2025-09-29 16:16:34 +01:00
committed by GitHub
parent 3a9e5533c9
commit 6f2d6d4e40
8 changed files with 125 additions and 147 deletions

View File

@@ -55,7 +55,6 @@ use services::services::{
use tokio::{sync::RwLock, task::JoinHandle};
use tokio_util::io::ReaderStream;
use utils::{
diff::create_unified_diff_hunk,
log_msg::LogMsg,
msg_store::MsgStore,
text::{git_branch_id, short_uuid},
@@ -82,10 +81,15 @@ impl LocalContainerService {
// Apply stream-level omit policy based on cumulative bytes.
// If adding this diff's contents exceeds the cap, strip contents and set stats.
fn apply_stream_omit_policy(
&self,
diff: &mut utils::diff::Diff,
sent_bytes: &Arc<AtomicUsize>,
stats_only: bool,
) {
if stats_only {
Self::omit_diff_contents(diff);
return;
}
// Compute size of current diff payload
let mut size = 0usize;
if let Some(ref s) = diff.old_content {
@@ -101,35 +105,29 @@ impl LocalContainerService {
let current = sent_bytes.load(Ordering::Relaxed);
if current.saturating_add(size) > Self::MAX_CUMULATIVE_DIFF_BYTES {
// We will omit content for this diff. If we still have both sides loaded
// (i.e., not already omitted by file-size guards), compute stats for UI.
if diff.additions.is_none() && diff.deletions.is_none() {
let old = diff.old_content.as_deref().unwrap_or("");
let new = diff.new_content.as_deref().unwrap_or("");
let hunk = create_unified_diff_hunk(old, new);
let mut add = 0usize;
let mut del = 0usize;
for line in hunk.lines() {
if let Some(first) = line.chars().next() {
if first == '+' {
add += 1;
} else if first == '-' {
del += 1;
}
}
}
diff.additions = Some(add);
diff.deletions = Some(del);
}
diff.old_content = None;
diff.new_content = None;
diff.content_omitted = true;
Self::omit_diff_contents(diff);
} else {
// safe to include; account for it
let _ = sent_bytes.fetch_add(size, Ordering::Relaxed);
}
}
fn omit_diff_contents(diff: &mut utils::diff::Diff) {
if diff.additions.is_none()
&& diff.deletions.is_none()
&& (diff.old_content.is_some() || diff.new_content.is_some())
{
let old = diff.old_content.as_deref().unwrap_or("");
let new = diff.new_content.as_deref().unwrap_or("");
let (add, del) = utils::diff::compute_line_change_counts(old, new);
diff.additions = Some(add);
diff.deletions = Some(del);
}
diff.old_content = None;
diff.new_content = None;
diff.content_omitted = true;
}
pub fn new(
db: DBService,
msg_stores: Arc<RwLock<HashMap<Uuid, Arc<MsgStore>>>>,
@@ -627,6 +625,7 @@ impl LocalContainerService {
&self,
project_repo_path: &Path,
merge_commit_id: &str,
stats_only: bool,
) -> Result<futures::stream::BoxStream<'static, Result<LogMsg, std::io::Error>>, ContainerError>
{
let diffs = self.git().get_diffs(
@@ -641,7 +640,7 @@ impl LocalContainerService {
let diffs: Vec<_> = diffs
.into_iter()
.map(|mut d| {
self.apply_stream_omit_policy(&mut d, &cum);
Self::apply_stream_omit_policy(&mut d, &cum, stats_only);
d
})
.collect();
@@ -665,6 +664,7 @@ impl LocalContainerService {
&self,
worktree_path: &Path,
base_commit: &Commit,
stats_only: bool,
) -> Result<futures::stream::BoxStream<'static, Result<LogMsg, std::io::Error>>, ContainerError>
{
// Get initial snapshot
@@ -682,7 +682,7 @@ impl LocalContainerService {
let initial_diffs: Vec<_> = initial_diffs
.into_iter()
.map(|mut d| {
self.apply_stream_omit_policy(&mut d, &cumulative);
Self::apply_stream_omit_policy(&mut d, &cumulative, stats_only);
d
})
.collect();
@@ -738,6 +738,7 @@ impl LocalContainerService {
&changed_paths,
&cumulative,
&full_sent,
stats_only,
).map_err(|e| {
tracing::error!("Error processing file changes: {}", e);
io::Error::other(e.to_string())
@@ -790,6 +791,7 @@ impl LocalContainerService {
changed_paths: &[String],
cumulative_bytes: &Arc<AtomicUsize>,
full_sent_paths: &Arc<std::sync::RwLock<HashSet<String>>>,
stats_only: bool,
) -> Result<Vec<LogMsg>, ContainerError> {
let path_filter: Vec<&str> = changed_paths.iter().map(|s| s.as_str()).collect();
@@ -809,47 +811,7 @@ impl LocalContainerService {
let file_path = GitService::diff_path(&diff);
files_with_diffs.insert(file_path.clone());
// Apply stream-level omit policy (affects contents and stats)
// Note: we can't call self methods from static fn; implement inline
{
// Compute size
let mut size = 0usize;
if let Some(ref s) = diff.old_content {
size += s.len();
}
if let Some(ref s) = diff.new_content {
size += s.len();
}
if size > 0 {
let current = cumulative_bytes.load(Ordering::Relaxed);
if current.saturating_add(size)
> LocalContainerService::MAX_CUMULATIVE_DIFF_BYTES
{
if diff.additions.is_none() && diff.deletions.is_none() {
let old = diff.old_content.as_deref().unwrap_or("");
let new = diff.new_content.as_deref().unwrap_or("");
let hunk = create_unified_diff_hunk(old, new);
let mut add = 0usize;
let mut del = 0usize;
for line in hunk.lines() {
if let Some(first) = line.chars().next() {
if first == '+' {
add += 1;
} else if first == '-' {
del += 1;
}
}
}
diff.additions = Some(add);
diff.deletions = Some(del);
}
diff.old_content = None;
diff.new_content = None;
diff.content_omitted = true;
} else {
let _ = cumulative_bytes.fetch_add(size, Ordering::Relaxed);
}
}
}
Self::apply_stream_omit_policy(&mut diff, cumulative_bytes, stats_only);
if diff.content_omitted {
if full_sent_paths.read().unwrap().contains(&file_path) {
@@ -1149,6 +1111,7 @@ impl ContainerService for LocalContainerService {
async fn stream_diff(
&self,
task_attempt: &TaskAttempt,
stats_only: bool,
) -> Result<futures::stream::BoxStream<'static, Result<LogMsg, std::io::Error>>, ContainerError>
{
let project_repo_path = self.get_project_repo_path(task_attempt).await?;
@@ -1177,7 +1140,7 @@ impl ContainerService for LocalContainerService {
&& self.is_container_clean(task_attempt).await?
&& !is_ahead
{
return self.create_merged_diff_stream(&project_repo_path, &commit);
return self.create_merged_diff_stream(&project_repo_path, &commit, stats_only);
}
let container_ref = self.ensure_container_exists(task_attempt).await?;
@@ -1188,7 +1151,7 @@ impl ContainerService for LocalContainerService {
&task_attempt.base_branch,
)?;
self.create_live_diff_stream(&worktree_path, &base_commit)
self.create_live_diff_stream(&worktree_path, &base_commit, stats_only)
.await
}

View File

@@ -99,6 +99,12 @@ pub struct TaskAttemptQuery {
pub task_id: Option<Uuid>,
}
#[derive(Debug, Deserialize)]
pub struct DiffStreamQuery {
#[serde(default)]
pub stats_only: bool,
}
pub async fn get_task_attempts(
State(deployment): State<DeploymentImpl>,
Query(query): Query<TaskAttemptQuery>,
@@ -961,11 +967,15 @@ pub async fn replace_process(
#[axum::debug_handler]
pub async fn stream_task_attempt_diff_ws(
ws: WebSocketUpgrade,
Query(params): Query<DiffStreamQuery>,
Extension(task_attempt): Extension<TaskAttempt>,
State(deployment): State<DeploymentImpl>,
) -> impl IntoResponse {
let stats_only = params.stats_only;
ws.on_upgrade(move |socket| async move {
if let Err(e) = handle_task_attempt_diff_ws(socket, deployment, task_attempt).await {
if let Err(e) =
handle_task_attempt_diff_ws(socket, deployment, task_attempt, stats_only).await
{
tracing::warn!("diff WS closed: {}", e);
}
})
@@ -975,13 +985,14 @@ async fn handle_task_attempt_diff_ws(
socket: WebSocket,
deployment: DeploymentImpl,
task_attempt: TaskAttempt,
stats_only: bool,
) -> anyhow::Result<()> {
use futures_util::{SinkExt, StreamExt, TryStreamExt};
use utils::log_msg::LogMsg;
let mut stream = deployment
.container()
.stream_diff(&task_attempt)
.stream_diff(&task_attempt, stats_only)
.await?
.map_ok(|msg: LogMsg| msg.to_ws_message_unchecked());

View File

@@ -197,6 +197,7 @@ pub trait ContainerService {
async fn stream_diff(
&self,
task_attempt: &TaskAttempt,
stats_only: bool,
) -> Result<futures::stream::BoxStream<'static, Result<LogMsg, std::io::Error>>, ContainerError>;
/// Fetch the MsgStore for a given execution ID, panicking if missing.

View File

@@ -1,3 +1,5 @@
use std::borrow::Cow;
use serde::{Deserialize, Serialize};
use similar::{ChangeTag, TextDiff};
use ts_rs::TS;
@@ -89,6 +91,37 @@ pub fn create_unified_diff(file_path: &str, old: &str, new: &str) -> String {
out
}
/// Compute addition/deletion counts between two text snapshots.
pub fn compute_line_change_counts(old: &str, new: &str) -> (usize, usize) {
let old = ensure_newline(old);
let new = ensure_newline(new);
let diff = TextDiff::from_lines(&old, &new);
let mut additions = 0usize;
let mut deletions = 0usize;
for change in diff.iter_all_changes() {
match change.tag() {
ChangeTag::Insert => additions += 1,
ChangeTag::Delete => deletions += 1,
ChangeTag::Equal => {}
}
}
(additions, deletions)
}
// ensure a line ends with a newline character
fn ensure_newline(line: &str) -> Cow<'_, str> {
if line.ends_with('\n') {
Cow::Borrowed(line)
} else {
let mut owned = line.to_owned();
owned.push('\n');
Cow::Owned(owned)
}
}
/// Extracts unified diff hunks from a string containing a full unified diff.
/// Tolerates non-diff lines and missing `@@`` hunk headers.
pub fn extract_unified_diff_hunks(unified_diff: &str) -> Vec<String> {