Implement streaming for project tasks (#608)
* Stream tasks and execution processes (vibe-kanban cd4106c5) Building on the unused /events endpoint, can we please add a /stream variant to the following endpoints: /tasks?project_id=... /execution_processes?task_attempt_id=... The endpoint should return an initial document containing all the entities given the filter, and then subsequent patches to keep the document up to date. Refactor the codebase however you see fit to give us the most maintainable code going forwards. crates/server/src/routes/tasks.rs crates/server/src/routes/execution_processes.rs crates/server/src/routes/events.rs * Issues with streaming tasks (vibe-kanban e1779942) crates/services/src/services/events.rs crates/server/src/routes/tasks.rs We should modify the stream of tasks (filtered by project) to be an object where each task is a key. This will make it much easier to produce stream diffs * Issues with streaming tasks (vibe-kanban e1779942) crates/services/src/services/events.rs crates/server/src/routes/tasks.rs We should modify the stream of tasks (filtered by project) to be an object where each task is a key. This will make it much easier to produce stream diffs * Refactor project tasks (vibe-kanban 20b19eb8) Project tasks needs to be refactored: - Doesn't follow new pattern of separating network logic into hooks - Has legacy fixed time poll for refetching tasks, but there is now a tasks/stream endpoint * revert changes to execution processes
This commit is contained in:
committed by
GitHub
parent
5ca32b50de
commit
af63563e17
@@ -1,9 +1,9 @@
|
||||
use axum::{
|
||||
extract::{Query, State},
|
||||
middleware::from_fn_with_state,
|
||||
response::Json as ResponseJson,
|
||||
response::{sse::KeepAlive, Json as ResponseJson, Sse},
|
||||
routing::{get, post},
|
||||
Extension, Json, Router,
|
||||
BoxError, Extension, Json, Router,
|
||||
};
|
||||
use db::models::{
|
||||
image::TaskImage,
|
||||
@@ -12,8 +12,9 @@ use db::models::{
|
||||
task_attempt::{CreateTaskAttempt, TaskAttempt},
|
||||
};
|
||||
use deployment::Deployment;
|
||||
use futures_util::TryStreamExt;
|
||||
use serde::Deserialize;
|
||||
use services::services::container::ContainerService;
|
||||
use services::services::{container::ContainerService, events::task_patch};
|
||||
use sqlx::Error as SqlxError;
|
||||
use utils::response::ApiResponse;
|
||||
use uuid::Uuid;
|
||||
@@ -36,6 +37,22 @@ pub async fn get_tasks(
|
||||
Ok(ResponseJson(ApiResponse::success(tasks)))
|
||||
}
|
||||
|
||||
pub async fn stream_tasks(
|
||||
State(deployment): State<DeploymentImpl>,
|
||||
Query(query): Query<TaskQuery>,
|
||||
) -> Result<
|
||||
Sse<impl futures_util::Stream<Item = Result<axum::response::sse::Event, BoxError>>>,
|
||||
axum::http::StatusCode,
|
||||
> {
|
||||
let stream = deployment
|
||||
.events()
|
||||
.stream_tasks_for_project(query.project_id)
|
||||
.await
|
||||
.map_err(|_| axum::http::StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
Ok(Sse::new(stream.map_err(|e| -> BoxError { e.into() })).keep_alive(KeepAlive::default()))
|
||||
}
|
||||
|
||||
pub async fn get_task(
|
||||
Extension(task): Extension<Task>,
|
||||
State(_deployment): State<DeploymentImpl>,
|
||||
@@ -213,6 +230,9 @@ pub async fn delete_task(
|
||||
if rows_affected == 0 {
|
||||
Err(ApiError::Database(SqlxError::RowNotFound))
|
||||
} else {
|
||||
// Emit remove patch so SSE task streams update immediately
|
||||
let patch = task_patch::remove(task.id);
|
||||
deployment.events().msg_store().push_patch(patch);
|
||||
Ok(ResponseJson(ApiResponse::success(())))
|
||||
}
|
||||
}
|
||||
@@ -224,6 +244,7 @@ pub fn router(deployment: &DeploymentImpl) -> Router<DeploymentImpl> {
|
||||
|
||||
let inner = Router::new()
|
||||
.route("/", get(get_tasks).post(create_task))
|
||||
.route("/stream", get(stream_tasks))
|
||||
.route("/create-and-start", post(create_task_and_start))
|
||||
.nest("/{task_id}", task_id_router);
|
||||
|
||||
|
||||
@@ -1,18 +1,27 @@
|
||||
use std::{str::FromStr, sync::Arc};
|
||||
|
||||
use anyhow::Error as AnyhowError;
|
||||
use axum::response::sse::Event;
|
||||
use db::{
|
||||
DBService,
|
||||
models::{execution_process::ExecutionProcess, task::Task, task_attempt::TaskAttempt},
|
||||
models::{
|
||||
execution_process::ExecutionProcess,
|
||||
task::{Task, TaskWithAttemptStatus},
|
||||
task_attempt::TaskAttempt,
|
||||
},
|
||||
};
|
||||
use serde::Serialize;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use json_patch::{AddOperation, Patch, PatchOperation, RemoveOperation, ReplaceOperation};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use sqlx::{Error as SqlxError, sqlite::SqliteOperation};
|
||||
use strum_macros::{Display, EnumString};
|
||||
use thiserror::Error;
|
||||
use tokio::sync::RwLock;
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
use ts_rs::TS;
|
||||
use utils::msg_store::MsgStore;
|
||||
use utils::{log_msg::LogMsg, msg_store::MsgStore};
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum EventError {
|
||||
@@ -24,6 +33,50 @@ pub enum EventError {
|
||||
Other(#[from] AnyhowError), // Catches any unclassified errors
|
||||
}
|
||||
|
||||
/// Helper functions for creating task-specific patches
|
||||
pub mod task_patch {
|
||||
use super::*;
|
||||
|
||||
/// Escape JSON Pointer special characters
|
||||
fn escape_pointer_segment(s: &str) -> String {
|
||||
s.replace('~', "~0").replace('/', "~1")
|
||||
}
|
||||
|
||||
/// Create path for task operation
|
||||
fn task_path(task_id: Uuid) -> String {
|
||||
format!("/tasks/{}", escape_pointer_segment(&task_id.to_string()))
|
||||
}
|
||||
|
||||
/// Create patch for adding a new task
|
||||
pub fn add(task: &TaskWithAttemptStatus) -> Patch {
|
||||
Patch(vec![PatchOperation::Add(AddOperation {
|
||||
path: task_path(task.id)
|
||||
.try_into()
|
||||
.expect("Task path should be valid"),
|
||||
value: serde_json::to_value(task).expect("Task serialization should not fail"),
|
||||
})])
|
||||
}
|
||||
|
||||
/// Create patch for updating an existing task
|
||||
pub fn replace(task: &TaskWithAttemptStatus) -> Patch {
|
||||
Patch(vec![PatchOperation::Replace(ReplaceOperation {
|
||||
path: task_path(task.id)
|
||||
.try_into()
|
||||
.expect("Task path should be valid"),
|
||||
value: serde_json::to_value(task).expect("Task serialization should not fail"),
|
||||
})])
|
||||
}
|
||||
|
||||
/// Create patch for removing a task
|
||||
pub fn remove(task_id: Uuid) -> Patch {
|
||||
Patch(vec![PatchOperation::Remove(RemoveOperation {
|
||||
path: task_path(task_id)
|
||||
.try_into()
|
||||
.expect("Task path should be valid"),
|
||||
})])
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct EventService {
|
||||
msg_store: Arc<MsgStore>,
|
||||
@@ -41,24 +94,34 @@ enum HookTables {
|
||||
ExecutionProcesses,
|
||||
}
|
||||
|
||||
#[derive(Serialize, TS)]
|
||||
#[derive(Serialize, Deserialize, TS)]
|
||||
#[serde(tag = "type", content = "data", rename_all = "SCREAMING_SNAKE_CASE")]
|
||||
pub enum RecordTypes {
|
||||
Task(Task),
|
||||
TaskAttempt(TaskAttempt),
|
||||
ExecutionProcess(ExecutionProcess),
|
||||
DeletedTask { rowid: i64 },
|
||||
DeletedTaskAttempt { rowid: i64 },
|
||||
DeletedExecutionProcess { rowid: i64 },
|
||||
DeletedTask {
|
||||
rowid: i64,
|
||||
project_id: Option<Uuid>,
|
||||
task_id: Option<Uuid>,
|
||||
},
|
||||
DeletedTaskAttempt {
|
||||
rowid: i64,
|
||||
task_id: Option<Uuid>,
|
||||
},
|
||||
DeletedExecutionProcess {
|
||||
rowid: i64,
|
||||
task_attempt_id: Option<Uuid>,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Serialize, TS)]
|
||||
#[derive(Serialize, Deserialize, TS)]
|
||||
pub struct EventPatchInner {
|
||||
db_op: String,
|
||||
record: RecordTypes,
|
||||
}
|
||||
|
||||
#[derive(Serialize, TS)]
|
||||
#[derive(Serialize, Deserialize, TS)]
|
||||
pub struct EventPatch {
|
||||
op: String,
|
||||
path: String,
|
||||
@@ -106,18 +169,45 @@ impl EventService {
|
||||
runtime_handle.spawn(async move {
|
||||
let record_type: RecordTypes = match (table, hook.operation.clone()) {
|
||||
(HookTables::Tasks, SqliteOperation::Delete) => {
|
||||
RecordTypes::DeletedTask { rowid }
|
||||
// Try to get task before deletion to capture project_id and task_id
|
||||
let task_info =
|
||||
Task::find_by_rowid(&db.pool, rowid).await.ok().flatten();
|
||||
RecordTypes::DeletedTask {
|
||||
rowid,
|
||||
project_id: task_info.as_ref().map(|t| t.project_id),
|
||||
task_id: task_info.as_ref().map(|t| t.id),
|
||||
}
|
||||
}
|
||||
(HookTables::TaskAttempts, SqliteOperation::Delete) => {
|
||||
RecordTypes::DeletedTaskAttempt { rowid }
|
||||
// Try to get task_attempt before deletion to capture task_id
|
||||
let task_id = TaskAttempt::find_by_rowid(&db.pool, rowid)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|attempt| attempt.task_id);
|
||||
RecordTypes::DeletedTaskAttempt { rowid, task_id }
|
||||
}
|
||||
(HookTables::ExecutionProcesses, SqliteOperation::Delete) => {
|
||||
RecordTypes::DeletedExecutionProcess { rowid }
|
||||
// Try to get execution_process before deletion to capture task_attempt_id
|
||||
let task_attempt_id =
|
||||
ExecutionProcess::find_by_rowid(&db.pool, rowid)
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
.map(|process| process.task_attempt_id);
|
||||
RecordTypes::DeletedExecutionProcess {
|
||||
rowid,
|
||||
task_attempt_id,
|
||||
}
|
||||
}
|
||||
(HookTables::Tasks, _) => {
|
||||
match Task::find_by_rowid(&db.pool, rowid).await {
|
||||
Ok(Some(task)) => RecordTypes::Task(task),
|
||||
Ok(None) => RecordTypes::DeletedTask { rowid },
|
||||
Ok(None) => RecordTypes::DeletedTask {
|
||||
rowid,
|
||||
project_id: None,
|
||||
task_id: None,
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to fetch task: {:?}", e);
|
||||
return;
|
||||
@@ -127,7 +217,10 @@ impl EventService {
|
||||
(HookTables::TaskAttempts, _) => {
|
||||
match TaskAttempt::find_by_rowid(&db.pool, rowid).await {
|
||||
Ok(Some(attempt)) => RecordTypes::TaskAttempt(attempt),
|
||||
Ok(None) => RecordTypes::DeletedTaskAttempt { rowid },
|
||||
Ok(None) => RecordTypes::DeletedTaskAttempt {
|
||||
rowid,
|
||||
task_id: None,
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"Failed to fetch task_attempt: {:?}",
|
||||
@@ -140,7 +233,10 @@ impl EventService {
|
||||
(HookTables::ExecutionProcesses, _) => {
|
||||
match ExecutionProcess::find_by_rowid(&db.pool, rowid).await {
|
||||
Ok(Some(process)) => RecordTypes::ExecutionProcess(process),
|
||||
Ok(None) => RecordTypes::DeletedExecutionProcess { rowid },
|
||||
Ok(None) => RecordTypes::DeletedExecutionProcess {
|
||||
rowid,
|
||||
task_attempt_id: None,
|
||||
},
|
||||
Err(e) => {
|
||||
tracing::error!(
|
||||
"Failed to fetch execution_process: {:?}",
|
||||
@@ -152,12 +248,6 @@ impl EventService {
|
||||
}
|
||||
};
|
||||
|
||||
let next_entry_count = {
|
||||
let mut entry_count = entry_count_for_hook.write().await;
|
||||
*entry_count += 1;
|
||||
*entry_count
|
||||
};
|
||||
|
||||
let db_op: &str = match hook.operation {
|
||||
SqliteOperation::Insert => "insert",
|
||||
SqliteOperation::Delete => "delete",
|
||||
@@ -165,6 +255,89 @@ impl EventService {
|
||||
SqliteOperation::Unknown(_) => "unknown",
|
||||
};
|
||||
|
||||
// Handle task-related operations with direct patches
|
||||
match &record_type {
|
||||
RecordTypes::Task(task) => {
|
||||
// Convert Task to TaskWithAttemptStatus
|
||||
if let Ok(task_list) =
|
||||
Task::find_by_project_id_with_attempt_status(
|
||||
&db.pool,
|
||||
task.project_id,
|
||||
)
|
||||
.await
|
||||
&& let Some(task_with_status) =
|
||||
task_list.into_iter().find(|t| t.id == task.id)
|
||||
{
|
||||
let patch = match hook.operation {
|
||||
SqliteOperation::Insert => {
|
||||
task_patch::add(&task_with_status)
|
||||
}
|
||||
SqliteOperation::Update => {
|
||||
task_patch::replace(&task_with_status)
|
||||
}
|
||||
_ => task_patch::replace(&task_with_status), // fallback
|
||||
};
|
||||
msg_store_for_hook.push_patch(patch);
|
||||
return;
|
||||
}
|
||||
}
|
||||
RecordTypes::DeletedTask {
|
||||
task_id: Some(task_id),
|
||||
..
|
||||
} => {
|
||||
let patch = task_patch::remove(*task_id);
|
||||
msg_store_for_hook.push_patch(patch);
|
||||
return;
|
||||
}
|
||||
RecordTypes::TaskAttempt(attempt) => {
|
||||
// Task attempts should update the parent task with fresh data
|
||||
if let Ok(Some(task)) =
|
||||
Task::find_by_id(&db.pool, attempt.task_id).await
|
||||
&& let Ok(task_list) =
|
||||
Task::find_by_project_id_with_attempt_status(
|
||||
&db.pool,
|
||||
task.project_id,
|
||||
)
|
||||
.await
|
||||
&& let Some(task_with_status) =
|
||||
task_list.into_iter().find(|t| t.id == attempt.task_id)
|
||||
{
|
||||
let patch = task_patch::replace(&task_with_status);
|
||||
msg_store_for_hook.push_patch(patch);
|
||||
return;
|
||||
}
|
||||
}
|
||||
RecordTypes::DeletedTaskAttempt {
|
||||
task_id: Some(task_id),
|
||||
..
|
||||
} => {
|
||||
// Task attempt deletion should update the parent task with fresh data
|
||||
if let Ok(Some(task)) =
|
||||
Task::find_by_id(&db.pool, *task_id).await
|
||||
&& let Ok(task_list) =
|
||||
Task::find_by_project_id_with_attempt_status(
|
||||
&db.pool,
|
||||
task.project_id,
|
||||
)
|
||||
.await
|
||||
&& let Some(task_with_status) =
|
||||
task_list.into_iter().find(|t| t.id == *task_id)
|
||||
{
|
||||
let patch = task_patch::replace(&task_with_status);
|
||||
msg_store_for_hook.push_patch(patch);
|
||||
return;
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Fallback: use the old entries format for other record types
|
||||
let next_entry_count = {
|
||||
let mut entry_count = entry_count_for_hook.write().await;
|
||||
*entry_count += 1;
|
||||
*entry_count
|
||||
};
|
||||
|
||||
let event_patch: EventPatch = EventPatch {
|
||||
op: "add".to_string(),
|
||||
path: format!("/entries/{next_entry_count}"),
|
||||
@@ -193,4 +366,196 @@ impl EventService {
|
||||
pub fn msg_store(&self) -> &Arc<MsgStore> {
|
||||
&self.msg_store
|
||||
}
|
||||
|
||||
/// Stream tasks for a specific project with initial snapshot
|
||||
pub async fn stream_tasks_for_project(
|
||||
&self,
|
||||
project_id: Uuid,
|
||||
) -> Result<futures::stream::BoxStream<'static, Result<Event, std::io::Error>>, EventError>
|
||||
{
|
||||
// Get initial snapshot of tasks
|
||||
let tasks = Task::find_by_project_id_with_attempt_status(&self.db.pool, project_id).await?;
|
||||
|
||||
// Convert task array to object keyed by task ID
|
||||
let tasks_map: serde_json::Map<String, serde_json::Value> = tasks
|
||||
.into_iter()
|
||||
.map(|task| (task.id.to_string(), serde_json::to_value(task).unwrap()))
|
||||
.collect();
|
||||
|
||||
let initial_patch = json!([{
|
||||
"op": "replace",
|
||||
"path": "/tasks",
|
||||
"value": tasks_map
|
||||
}]);
|
||||
let initial_msg = LogMsg::JsonPatch(serde_json::from_value(initial_patch).unwrap());
|
||||
|
||||
// Clone necessary data for the async filter
|
||||
let db_pool = self.db.pool.clone();
|
||||
|
||||
// Get filtered event stream
|
||||
let filtered_stream =
|
||||
BroadcastStream::new(self.msg_store.get_receiver()).filter_map(move |msg_result| {
|
||||
let db_pool = db_pool.clone();
|
||||
async move {
|
||||
match msg_result {
|
||||
Ok(LogMsg::JsonPatch(patch)) => {
|
||||
// Filter events based on project_id
|
||||
if let Some(patch_op) = patch.0.first() {
|
||||
// Check if this is a direct task patch (new format)
|
||||
if patch_op.path().starts_with("/tasks/") {
|
||||
match patch_op {
|
||||
json_patch::PatchOperation::Add(op) => {
|
||||
// Parse task data directly from value
|
||||
if let Ok(task) =
|
||||
serde_json::from_value::<TaskWithAttemptStatus>(
|
||||
op.value.clone(),
|
||||
)
|
||||
&& task.project_id == project_id
|
||||
{
|
||||
return Some(Ok(LogMsg::JsonPatch(patch)));
|
||||
}
|
||||
}
|
||||
json_patch::PatchOperation::Replace(op) => {
|
||||
// Parse task data directly from value
|
||||
if let Ok(task) =
|
||||
serde_json::from_value::<TaskWithAttemptStatus>(
|
||||
op.value.clone(),
|
||||
)
|
||||
&& task.project_id == project_id
|
||||
{
|
||||
return Some(Ok(LogMsg::JsonPatch(patch)));
|
||||
}
|
||||
}
|
||||
json_patch::PatchOperation::Remove(_) => {
|
||||
// For remove operations, we need to check project membership differently
|
||||
// We could cache this information or let it pass through for now
|
||||
// Since we don't have the task data, we'll allow all removals
|
||||
// and let the client handle filtering
|
||||
return Some(Ok(LogMsg::JsonPatch(patch)));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
} else if let Ok(event_patch_value) = serde_json::to_value(patch_op)
|
||||
&& let Ok(event_patch) =
|
||||
serde_json::from_value::<EventPatch>(event_patch_value)
|
||||
{
|
||||
// Handle old EventPatch format for non-task records
|
||||
match &event_patch.value.record {
|
||||
RecordTypes::Task(task) => {
|
||||
if task.project_id == project_id {
|
||||
return Some(Ok(LogMsg::JsonPatch(patch)));
|
||||
}
|
||||
}
|
||||
RecordTypes::DeletedTask {
|
||||
project_id: Some(deleted_project_id),
|
||||
..
|
||||
} => {
|
||||
if *deleted_project_id == project_id {
|
||||
return Some(Ok(LogMsg::JsonPatch(patch)));
|
||||
}
|
||||
}
|
||||
RecordTypes::TaskAttempt(attempt) => {
|
||||
// Check if this task_attempt belongs to a task in our project
|
||||
if let Ok(Some(task)) =
|
||||
Task::find_by_id(&db_pool, attempt.task_id).await
|
||||
&& task.project_id == project_id
|
||||
{
|
||||
return Some(Ok(LogMsg::JsonPatch(patch)));
|
||||
}
|
||||
}
|
||||
RecordTypes::DeletedTaskAttempt {
|
||||
task_id: Some(deleted_task_id),
|
||||
..
|
||||
} => {
|
||||
// Check if deleted attempt belonged to a task in our project
|
||||
if let Ok(Some(task)) =
|
||||
Task::find_by_id(&db_pool, *deleted_task_id).await
|
||||
&& task.project_id == project_id
|
||||
{
|
||||
return Some(Ok(LogMsg::JsonPatch(patch)));
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
Ok(other) => Some(Ok(other)), // Pass through non-patch messages
|
||||
Err(_) => None, // Filter out broadcast errors
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Start with initial snapshot, then live updates
|
||||
let initial_stream = futures::stream::once(async move { Ok(initial_msg) });
|
||||
let combined_stream = initial_stream
|
||||
.chain(filtered_stream)
|
||||
.map_ok(|msg| msg.to_sse_event())
|
||||
.boxed();
|
||||
|
||||
Ok(combined_stream)
|
||||
}
|
||||
|
||||
/// Stream execution processes for a specific task attempt with initial snapshot
|
||||
pub async fn stream_execution_processes_for_attempt(
|
||||
&self,
|
||||
task_attempt_id: Uuid,
|
||||
) -> Result<futures::stream::BoxStream<'static, Result<Event, std::io::Error>>, EventError>
|
||||
{
|
||||
// Get initial snapshot of execution processes
|
||||
let processes =
|
||||
ExecutionProcess::find_by_task_attempt_id(&self.db.pool, task_attempt_id).await?;
|
||||
let initial_patch = json!([{
|
||||
"op": "replace",
|
||||
"path": "/",
|
||||
"value": { "execution_processes": processes }
|
||||
}]);
|
||||
let initial_msg = LogMsg::JsonPatch(serde_json::from_value(initial_patch).unwrap());
|
||||
|
||||
// Get filtered event stream
|
||||
let filtered_stream = BroadcastStream::new(self.msg_store.get_receiver()).filter_map(
|
||||
move |msg_result| async move {
|
||||
match msg_result {
|
||||
Ok(LogMsg::JsonPatch(patch)) => {
|
||||
// Filter events based on task_attempt_id
|
||||
if let Some(event_patch_op) = patch.0.first()
|
||||
&& let Ok(event_patch_value) = serde_json::to_value(event_patch_op)
|
||||
&& let Ok(event_patch) =
|
||||
serde_json::from_value::<EventPatch>(event_patch_value)
|
||||
{
|
||||
match &event_patch.value.record {
|
||||
RecordTypes::ExecutionProcess(process) => {
|
||||
if process.task_attempt_id == task_attempt_id {
|
||||
return Some(Ok(LogMsg::JsonPatch(patch)));
|
||||
}
|
||||
}
|
||||
RecordTypes::DeletedExecutionProcess {
|
||||
task_attempt_id: Some(deleted_attempt_id),
|
||||
..
|
||||
} => {
|
||||
if *deleted_attempt_id == task_attempt_id {
|
||||
return Some(Ok(LogMsg::JsonPatch(patch)));
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
Ok(other) => Some(Ok(other)), // Pass through non-patch messages
|
||||
Err(_) => None, // Filter out broadcast errors
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
// Start with initial snapshot, then live updates
|
||||
let initial_stream = futures::stream::once(async move { Ok(initial_msg) });
|
||||
let combined_stream = initial_stream
|
||||
.chain(filtered_stream)
|
||||
.map_ok(|msg| msg.to_sse_event())
|
||||
.boxed();
|
||||
|
||||
Ok(combined_stream)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -511,8 +511,7 @@ fn rebase_refuses_to_abort_existing_rebase() {
|
||||
"old-base",
|
||||
None,
|
||||
)
|
||||
.err()
|
||||
.expect("first rebase should error and leave in-progress state");
|
||||
.expect_err("first rebase should error and leave in-progress state");
|
||||
|
||||
// Our service should refuse to proceed and not abort the user's rebase
|
||||
let service = GitService::new();
|
||||
|
||||
Reference in New Issue
Block a user