Workspaces FE (#1733)

This commit is contained in:
Louis Knight-Webb
2026-01-08 22:14:38 +00:00
committed by GitHub
parent fe2215ba85
commit 527febdc52
291 changed files with 23770 additions and 880 deletions

View File

@@ -1,6 +1,10 @@
pub mod executor_approvals;
use std::{collections::HashMap, sync::Arc, time::Duration as StdDuration};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration as StdDuration,
};
use dashmap::DashMap;
use db::models::{
@@ -256,6 +260,26 @@ impl Approvals {
let map = self.msg_stores.read().await;
map.get(execution_process_id).cloned()
}
/// Check which execution processes have pending approvals.
/// Returns a set of execution_process_ids that have at least one pending approval.
pub fn get_pending_execution_process_ids(
&self,
execution_process_ids: &[Uuid],
) -> HashSet<Uuid> {
let id_set: HashSet<_> = execution_process_ids.iter().collect();
self.pending
.iter()
.filter_map(|entry| {
let ep_id = entry.value().execution_process_id;
if id_set.contains(&ep_id) {
Some(ep_id)
} else {
None
}
})
.collect()
}
}
pub(crate) async fn ensure_task_in_review(pool: &SqlitePool, execution_process_id: Uuid) {

View File

@@ -873,7 +873,7 @@ pub trait ContainerService {
LogMsg::Finished => {
break;
}
LogMsg::JsonPatch(_) => continue,
LogMsg::JsonPatch(_) | LogMsg::Ready => continue,
}
}
}
@@ -1050,6 +1050,8 @@ pub trait ContainerService {
)
.await?;
Workspace::set_archived(&self.db().pool, workspace.id, false).await?;
if let Some(prompt) = match executor_action.typ() {
ExecutorActionType::CodingAgentInitialRequest(coding_agent_request) => {
Some(coding_agent_request.prompt.clone())

View File

@@ -155,6 +155,9 @@ impl DiffStreamManager {
async fn run(&mut self) -> Result<(), DiffStreamError> {
self.reset_stream().await?;
// Send Ready message to indicate initial data has been sent
let _ready_error = self.tx.send(Ok(LogMsg::Ready)).await;
let (fs_debouncer, mut fs_rx, canonical_worktree) =
filesystem_watcher::async_watcher(self.args.worktree_path.clone())
.map_err(|e| io::Error::other(e.to_string()))?;

View File

@@ -77,6 +77,21 @@ impl EventService {
Ok(())
}
async fn push_workspace_update_for_session(
pool: &SqlitePool,
msg_store: Arc<MsgStore>,
session_id: Uuid,
) -> Result<(), SqlxError> {
use db::models::session::Session;
if let Some(session) = Session::find_by_id(pool, session_id).await?
&& let Some(workspace_with_status) =
Workspace::find_by_id_with_status(pool, session.workspace_id).await?
{
msg_store.push_patch(workspace_patch::replace(&workspace_with_status));
}
Ok(())
}
/// Creates the hook function that should be used with DBService::new_with_after_connect
pub fn create_hook(
msg_store: Arc<MsgStore>,
@@ -317,7 +332,21 @@ impl EventService {
return;
}
RecordTypes::Workspace(workspace) => {
// Workspaces should update the parent task with fresh data
// Emit workspace patch with status
if let Ok(Some(workspace_with_status)) =
Workspace::find_by_id_with_status(&db.pool, workspace.id)
.await
{
let patch = match hook.operation {
SqliteOperation::Insert => {
workspace_patch::add(&workspace_with_status)
}
_ => workspace_patch::replace(&workspace_with_status),
};
msg_store_for_hook.push_patch(patch);
}
// Also update parent task
if let Ok(Some(task)) =
Task::find_by_id(&db.pool, workspace.task_id).await
&& let Ok(task_list) =
@@ -331,14 +360,14 @@ impl EventService {
{
let patch = task_patch::replace(&task_with_status);
msg_store_for_hook.push_patch(patch);
return;
}
return;
}
RecordTypes::DeletedWorkspace {
task_id: Some(task_id),
..
} => {
// Workspace deletion should update the parent task with fresh data
// Update parent task
if let Ok(Some(task)) =
Task::find_by_id(&db.pool, *task_id).await
&& let Ok(task_list) =
@@ -352,8 +381,8 @@ impl EventService {
{
let patch = task_patch::replace(&task_with_status);
msg_store_for_hook.push_patch(patch);
return;
}
return;
}
RecordTypes::ExecutionProcess(process) => {
let patch = match hook.operation {
@@ -380,6 +409,19 @@ impl EventService {
);
}
if let Err(err) = EventService::push_workspace_update_for_session(
&db.pool,
msg_store_for_hook.clone(),
process.session_id,
)
.await
{
tracing::error!(
"Failed to push workspace update after execution process change: {:?}",
err
);
}
return;
}
RecordTypes::DeletedExecutionProcess {
@@ -390,8 +432,8 @@ impl EventService {
let patch = execution_process_patch::remove(*process_id);
msg_store_for_hook.push_patch(patch);
if let Some(session_id) = session_id
&& let Err(err) =
if let Some(session_id) = session_id {
if let Err(err) =
EventService::push_task_update_for_session(
&db.pool,
msg_store_for_hook.clone(),
@@ -405,6 +447,21 @@ impl EventService {
);
}
if let Err(err) =
EventService::push_workspace_update_for_session(
&db.pool,
msg_store_for_hook.clone(),
*session_id,
)
.await
{
tracing::error!(
"Failed to push workspace update after execution process removal: {:?}",
err
);
}
}
return;
}
_ => {}

View File

@@ -1,6 +1,6 @@
use db::models::{
execution_process::ExecutionProcess, project::Project, scratch::Scratch,
task::TaskWithAttemptStatus, workspace::Workspace,
task::TaskWithAttemptStatus, workspace::WorkspaceWithStatus,
};
use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation};
use uuid::Uuid;
@@ -143,8 +143,7 @@ pub mod workspace_patch {
)
}
/// Create patch for adding a new workspace
pub fn add(workspace: &Workspace) -> Patch {
pub fn add(workspace: &WorkspaceWithStatus) -> Patch {
Patch(vec![PatchOperation::Add(AddOperation {
path: workspace_path(workspace.id)
.try_into()
@@ -154,8 +153,7 @@ pub mod workspace_patch {
})])
}
/// Create patch for updating an existing workspace
pub fn replace(workspace: &Workspace) -> Patch {
pub fn replace(workspace: &WorkspaceWithStatus) -> Patch {
Patch(vec![PatchOperation::Replace(ReplaceOperation {
path: workspace_path(workspace.id)
.try_into()
@@ -165,7 +163,6 @@ pub mod workspace_patch {
})])
}
/// Create patch for removing a workspace
pub fn remove(workspace_id: Uuid) -> Patch {
Patch(vec![PatchOperation::Remove(RemoveOperation {
path: workspace_path(workspace_id)

View File

@@ -2,8 +2,8 @@ use db::models::{
execution_process::ExecutionProcess,
project::Project,
scratch::Scratch,
session::Session,
task::{Task, TaskWithAttemptStatus},
workspace::Workspace,
};
use futures::StreamExt;
use serde_json::json;
@@ -140,8 +140,8 @@ impl EventService {
}
});
// Start with initial snapshot, then live updates
let initial_stream = futures::stream::once(async move { Ok(initial_msg) });
// Start with initial snapshot, Ready signal, then live updates
let initial_stream = futures::stream::iter(vec![Ok(initial_msg), Ok(LogMsg::Ready)]);
let combined_stream = initial_stream.chain(filtered_stream).boxed();
Ok(combined_stream)
@@ -219,35 +219,24 @@ impl EventService {
}
});
// Start with initial snapshot, then live updates
let initial_stream = futures::stream::once(async move { Ok(initial_msg) });
// Start with initial snapshot, Ready signal, then live updates
let initial_stream = futures::stream::iter(vec![Ok(initial_msg), Ok(LogMsg::Ready)]);
let combined_stream = initial_stream.chain(filtered_stream).boxed();
Ok(combined_stream)
}
/// Stream execution processes for a specific workspace with initial snapshot (raw LogMsg format for WebSocket)
pub async fn stream_execution_processes_for_workspace_raw(
/// Stream execution processes for a specific session with initial snapshot (raw LogMsg format for WebSocket)
pub async fn stream_execution_processes_for_session_raw(
&self,
workspace_id: Uuid,
session_id: Uuid,
show_soft_deleted: bool,
) -> Result<futures::stream::BoxStream<'static, Result<LogMsg, std::io::Error>>, EventError>
{
// Get all sessions for this workspace
let sessions = Session::find_by_workspace_id(&self.db.pool, workspace_id).await?;
// Collect all execution processes across all sessions
let mut all_processes = Vec::new();
for session in &sessions {
let processes =
ExecutionProcess::find_by_session_id(&self.db.pool, session.id, show_soft_deleted)
.await?;
all_processes.extend(processes);
}
let processes = all_processes;
// Collect session IDs for filtering
let session_ids: Vec<Uuid> = sessions.iter().map(|s| s.id).collect();
// Get execution processes for this session
let processes =
ExecutionProcess::find_by_session_id(&self.db.pool, session_id, show_soft_deleted)
.await?;
// Convert processes array to object keyed by process ID
let processes_map: serde_json::Map<String, serde_json::Value> = processes
@@ -270,11 +259,10 @@ impl EventService {
// Get filtered event stream
let filtered_stream =
BroadcastStream::new(self.msg_store.get_receiver()).filter_map(move |msg_result| {
let session_ids = session_ids.clone();
async move {
match msg_result {
Ok(LogMsg::JsonPatch(patch)) => {
// Filter events based on session_id (must belong to one of the workspace's sessions)
// Filter events based on session_id
if let Some(patch_op) = patch.0.first() {
// Check if this is a modern execution process patch
if patch_op.path().starts_with("/execution_processes/") {
@@ -285,7 +273,7 @@ impl EventService {
serde_json::from_value::<ExecutionProcess>(
op.value.clone(),
)
&& session_ids.contains(&process.session_id)
&& process.session_id == session_id
{
if !show_soft_deleted && process.dropped {
let remove_patch =
@@ -303,7 +291,7 @@ impl EventService {
serde_json::from_value::<ExecutionProcess>(
op.value.clone(),
)
&& session_ids.contains(&process.session_id)
&& process.session_id == session_id
{
if !show_soft_deleted && process.dropped {
let remove_patch =
@@ -330,7 +318,7 @@ impl EventService {
{
match &event_patch.value.record {
RecordTypes::ExecutionProcess(process) => {
if session_ids.contains(&process.session_id) {
if process.session_id == session_id {
if !show_soft_deleted && process.dropped {
let remove_patch =
execution_process_patch::remove(process.id);
@@ -345,7 +333,7 @@ impl EventService {
session_id: Some(deleted_session_id),
..
} => {
if session_ids.contains(deleted_session_id) {
if *deleted_session_id == session_id {
return Some(Ok(LogMsg::JsonPatch(patch)));
}
}
@@ -361,8 +349,8 @@ impl EventService {
}
});
// Start with initial snapshot, then live updates
let initial_stream = futures::stream::once(async move { Ok(initial_msg) });
// Start with initial snapshot, Ready signal, then live updates
let initial_stream = futures::stream::iter(vec![Ok(initial_msg), Ok(LogMsg::Ready)]);
let combined_stream = initial_stream.chain(filtered_stream).boxed();
Ok(combined_stream)
@@ -441,8 +429,97 @@ impl EventService {
}
});
let initial_stream = futures::stream::once(async move { Ok(initial_msg) });
let initial_stream = futures::stream::iter(vec![Ok(initial_msg), Ok(LogMsg::Ready)]);
let combined_stream = initial_stream.chain(filtered_stream).boxed();
Ok(combined_stream)
}
pub async fn stream_workspaces_raw(
&self,
archived: Option<bool>,
limit: Option<i64>,
) -> Result<futures::stream::BoxStream<'static, Result<LogMsg, std::io::Error>>, EventError>
{
let workspaces = Workspace::find_all_with_status(&self.db.pool, archived, limit).await?;
let workspaces_map: serde_json::Map<String, serde_json::Value> = workspaces
.into_iter()
.map(|ws| (ws.id.to_string(), serde_json::to_value(ws).unwrap()))
.collect();
let initial_patch = json!([{
"op": "replace",
"path": "/workspaces",
"value": workspaces_map
}]);
let initial_msg = LogMsg::JsonPatch(serde_json::from_value(initial_patch).unwrap());
let filtered_stream = BroadcastStream::new(self.msg_store.get_receiver()).filter_map(
move |msg_result| async move {
match msg_result {
Ok(LogMsg::JsonPatch(patch)) => {
if let Some(op) = patch.0.first()
&& op.path().starts_with("/workspaces")
{
// If archived filter is set, handle state transitions
if let Some(archived_filter) = archived {
// Extract workspace data from Add/Replace operations
let value = match op {
json_patch::PatchOperation::Add(a) => Some(&a.value),
json_patch::PatchOperation::Replace(r) => Some(&r.value),
json_patch::PatchOperation::Remove(_) => {
// Allow remove operations through - client will handle
return Some(Ok(LogMsg::JsonPatch(patch)));
}
_ => None,
};
if let Some(v) = value
&& let Some(ws_archived) =
v.get("archived").and_then(|a| a.as_bool())
{
if ws_archived == archived_filter {
// Workspace matches this filter
// Convert Replace to Add since workspace may be new to this filtered stream
if let json_patch::PatchOperation::Replace(r) = op {
let add_patch = json_patch::Patch(vec![
json_patch::PatchOperation::Add(
json_patch::AddOperation {
path: r.path.clone(),
value: r.value.clone(),
},
),
]);
return Some(Ok(LogMsg::JsonPatch(add_patch)));
}
return Some(Ok(LogMsg::JsonPatch(patch)));
} else {
// Workspace no longer matches this filter - send remove
let remove_patch = json_patch::Patch(vec![
json_patch::PatchOperation::Remove(
json_patch::RemoveOperation {
path: op
.path()
.to_string()
.try_into()
.expect("Workspace path should be valid"),
},
),
]);
return Some(Ok(LogMsg::JsonPatch(remove_patch)));
}
}
}
return Some(Ok(LogMsg::JsonPatch(patch)));
}
None
}
Ok(other) => Some(Ok(other)),
Err(_) => None,
}
},
);
let initial_stream = futures::stream::iter(vec![Ok(initial_msg), Ok(LogMsg::Ready)]);
Ok(initial_stream.chain(filtered_stream).boxed())
}
}

View File

@@ -92,7 +92,7 @@ impl FileRanker {
}
/// Calculate relevance score for a search result
fn calculate_score(&self, result: &SearchResult, stats: &FileStats) -> i64 {
pub fn calculate_score(&self, result: &SearchResult, stats: &FileStats) -> i64 {
let base_score = match result.match_type {
SearchMatchType::FileName => BASE_MATCH_SCORE_FILENAME,
SearchMatchType::DirectoryName => BASE_MATCH_SCORE_DIRNAME,

View File

@@ -258,6 +258,7 @@ impl FileSearchCache {
path: indexed_file.path.clone(),
is_file: indexed_file.is_file,
match_type: indexed_file.match_type.clone(),
score: 0,
});
}
}
@@ -265,6 +266,11 @@ impl FileSearchCache {
// Apply git history-based ranking
self.file_ranker.rerank(&mut results, &cached.stats);
// Populate scores for sorted results
for result in &mut results {
result.score = self.file_ranker.calculate_score(result, &cached.stats);
}
// Limit to top 10 results
results.truncate(10);
results

View File

@@ -180,6 +180,7 @@ impl FilesystemService {
None => return Ok(vec![]),
};
let skip_dirs = Self::get_directories_to_skip();
let vibe_kanban_temp_dir = utils::path::get_vibe_kanban_temp_dir();
let mut walker_builder = WalkBuilder::new(base_dir);
walker_builder
.follow_links(false)
@@ -200,6 +201,14 @@ impl FilesystemService {
return false;
}
// Skip vibe-kanban temp directory and all subdirectories
// Normalize to handle macOS /private/var vs /var aliasing
if utils::path::normalize_macos_private_alias(path)
.starts_with(&vibe_kanban_temp_dir)
{
return false;
}
// Skip common non-git folders
if let Some(name) = path.file_name().and_then(|n| n.to_str())
&& skip_dirs.contains(name)

View File

@@ -330,6 +330,7 @@ impl ProjectService {
path: format!("{}/{}", repo_name, r.path),
is_file: r.is_file,
match_type: r.match_type.clone(),
score: r.score,
})
})
.collect();
@@ -342,7 +343,7 @@ impl ProjectService {
};
priority(&a.match_type)
.cmp(&priority(&b.match_type))
.then_with(|| a.path.cmp(&b.path))
.then_with(|| b.score.cmp(&a.score)) // Higher scores first
});
all_results.truncate(10);
@@ -438,6 +439,7 @@ impl ProjectService {
path: relative_path.to_string_lossy().to_string(),
is_file: path.is_file(),
match_type: SearchMatchType::FileName,
score: 0,
});
} else if relative_path_str.contains(&query_lower) {
let match_type = if path
@@ -456,6 +458,7 @@ impl ProjectService {
path: relative_path.to_string_lossy().to_string(),
is_file: path.is_file(),
match_type,
score: 0,
});
}
}
@@ -465,6 +468,10 @@ impl ProjectService {
match file_ranker.get_stats(repo_path).await {
Ok(stats) => {
file_ranker.rerank(&mut results, &stats);
// Populate scores for sorted results
for result in &mut results {
result.score = file_ranker.calculate_score(result, &stats);
}
}
Err(_) => {
// Fallback to basic priority sorting