Stop execution
This commit is contained in:
@@ -34,13 +34,12 @@ impl Executor for EchoExecutor {
|
||||
// For demonstration of streaming, we can use a shell command that outputs multiple lines
|
||||
let script = format!(
|
||||
r#"echo "Starting task: {}"
|
||||
for i in {{1..5}}; do
|
||||
for i in {{1..50}}; do
|
||||
echo "Progress line $i"
|
||||
sleep 1
|
||||
done
|
||||
echo "Task completed: {}""#,
|
||||
task.title,
|
||||
task.title
|
||||
task.title, task.title
|
||||
);
|
||||
|
||||
let child = Command::new("sh")
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
pub mod auth;
|
||||
pub mod execution_monitor;
|
||||
pub mod executor;
|
||||
pub mod executors;
|
||||
pub mod models;
|
||||
|
||||
@@ -281,6 +281,86 @@ pub async fn create_task_attempt_activity(
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stop_task_attempt(
|
||||
_auth: AuthUser,
|
||||
Path((project_id, task_id, attempt_id)): Path<(Uuid, Uuid, Uuid)>,
|
||||
Extension(pool): Extension<PgPool>,
|
||||
Extension(app_state): Extension<crate::execution_monitor::AppState>
|
||||
) -> Result<ResponseJson<ApiResponse<()>>, StatusCode> {
|
||||
// Verify task attempt exists and belongs to the correct task
|
||||
match TaskAttempt::exists_for_task(&pool, attempt_id, task_id, project_id).await {
|
||||
Ok(false) => return Err(StatusCode::NOT_FOUND),
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to check task attempt existence: {}", e);
|
||||
return Err(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
Ok(true) => {}
|
||||
}
|
||||
|
||||
// Find and stop the running execution
|
||||
let mut stopped = false;
|
||||
{
|
||||
let mut executions = app_state.running_executions.lock().await;
|
||||
let mut execution_id_to_remove = None;
|
||||
|
||||
// Find the execution for this attempt
|
||||
for (exec_id, execution) in executions.iter_mut() {
|
||||
if execution.task_attempt_id == attempt_id {
|
||||
// Kill the process
|
||||
match execution.child.kill().await {
|
||||
Ok(_) => {
|
||||
stopped = true;
|
||||
execution_id_to_remove = Some(*exec_id);
|
||||
tracing::info!("Stopped execution for task attempt {}", attempt_id);
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to kill process for attempt {}: {}", attempt_id, e);
|
||||
return Err(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the stopped execution from the map
|
||||
if let Some(exec_id) = execution_id_to_remove {
|
||||
executions.remove(&exec_id);
|
||||
}
|
||||
}
|
||||
|
||||
if !stopped {
|
||||
return Ok(ResponseJson(ApiResponse {
|
||||
success: true,
|
||||
data: None,
|
||||
message: Some("No running execution found for this attempt".to_string()),
|
||||
}));
|
||||
}
|
||||
|
||||
// Create a new activity record to mark as stopped
|
||||
let activity_id = Uuid::new_v4();
|
||||
let create_activity = CreateTaskAttemptActivity {
|
||||
task_attempt_id: attempt_id,
|
||||
status: Some(TaskAttemptStatus::Paused),
|
||||
note: Some("Execution stopped by user".to_string()),
|
||||
};
|
||||
|
||||
if let Err(e) = TaskAttemptActivity::create(
|
||||
&pool,
|
||||
&create_activity,
|
||||
activity_id,
|
||||
TaskAttemptStatus::Paused,
|
||||
).await {
|
||||
tracing::error!("Failed to create stopped activity: {}", e);
|
||||
return Err(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
|
||||
Ok(ResponseJson(ApiResponse {
|
||||
success: true,
|
||||
data: None,
|
||||
message: Some("Task attempt stopped successfully".to_string()),
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn tasks_router() -> Router {
|
||||
use axum::routing::{post, put, delete};
|
||||
|
||||
@@ -289,6 +369,7 @@ pub fn tasks_router() -> Router {
|
||||
.route("/projects/:project_id/tasks/:task_id", get(get_task).put(update_task).delete(delete_task))
|
||||
.route("/projects/:project_id/tasks/:task_id/attempts", get(get_task_attempts).post(create_task_attempt))
|
||||
.route("/projects/:project_id/tasks/:task_id/attempts/:attempt_id/activities", get(get_task_attempt_activities).post(create_task_attempt_activity))
|
||||
.route("/projects/:project_id/tasks/:task_id/attempts/:attempt_id/stop", post(stop_task_attempt))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
Reference in New Issue
Block a user