Fix stop logic

This commit is contained in:
Louis Knight-Webb
2025-06-21 23:40:13 +01:00
parent 030c1966c3
commit 4a47c50868
8 changed files with 250 additions and 40 deletions

View File

@@ -28,6 +28,7 @@ dirs = "5.0"
git2 = "0.18"
async-trait = "0.1"
dissimilar = "1.0"
libc = "0.2"
rust-embed = "8.2"
mime_guess = "2.0"
directories = "6.0.0"

View File

@@ -96,10 +96,36 @@ impl AppState {
let mut executions = self.running_executions.lock().await;
if let Some(mut execution) = executions.remove(&execution_id) {
// Kill the process
// Kill the process group to ensure all child processes are terminated
let process_id = execution.child.id();
#[cfg(unix)]
{
if let Some(pid) = process_id {
// Kill the entire process group
unsafe {
let result = libc::killpg(pid as i32, libc::SIGTERM);
if result != 0 {
tracing::warn!(
"Failed to send SIGTERM to process group {}, trying SIGKILL",
pid
);
let kill_result = libc::killpg(pid as i32, libc::SIGKILL);
if kill_result != 0 {
tracing::error!("Failed to send SIGKILL to process group {}", pid);
}
}
}
}
}
// Also kill the direct child process as fallback
match execution.child.kill().await {
Ok(_) => {
tracing::info!("Stopped execution process {}", execution_id);
tracing::info!(
"Stopped execution process {} and its process group",
execution_id
);
Ok(true)
}
Err(e) => {

View File

@@ -218,41 +218,37 @@ pub async fn execution_monitor(app_state: AppState) {
};
for process_id in executor_running_process_ids {
// Double-check that this process is not currently running and hasn't just completed
if !app_state.has_running_execution(process_id).await {
// Get the execution process to find the task attempt ID
let task_attempt_id =
match ExecutionProcess::find_by_id(&app_state.db_pool, process_id).await {
Ok(Some(process)) => {
// Additional check: if the process was recently updated, skip it
// This prevents race conditions with recent completions
let now = chrono::Utc::now();
let time_since_update = now - process.updated_at;
if time_since_update.num_seconds() < 10 {
// Process was updated within last 10 seconds, likely just completed
tracing::debug!(
"Skipping recently updated process {} (updated {} seconds ago)",
process_id,
time_since_update.num_seconds()
);
continue;
}
process.task_attempt_id
}
Ok(None) => {
tracing::error!("Execution process {} not found", process_id);
continue;
}
Err(e) => {
tracing::error!(
"Failed to fetch execution process {}: {}",
// Get the execution process to find the task attempt ID
let task_attempt_id =
match ExecutionProcess::find_by_id(&app_state.db_pool, process_id).await {
Ok(Some(process)) => {
// Additional check: if the process was recently updated, skip it
// This prevents race conditions with recent completions
let now = chrono::Utc::now();
let time_since_update = now - process.updated_at;
if time_since_update.num_seconds() < 10 {
// Process was updated within last 10 seconds, likely just completed
tracing::debug!(
"Skipping recently updated process {} (updated {} seconds ago)",
process_id,
e
time_since_update.num_seconds()
);
continue;
}
};
process.task_attempt_id
}
Ok(None) => {
tracing::error!("Execution process {} not found", process_id);
continue;
}
Err(e) => {
tracing::error!("Failed to fetch execution process {}: {}", process_id, e);
continue;
}
};
// Double-check that this task attempt is not currently running and hasn't just completed
if !app_state.has_running_execution(task_attempt_id).await {
// This is truly an orphaned task attempt - mark it as failed
let activity_id = Uuid::new_v4();
let create_activity = CreateTaskAttemptActivity {

View File

@@ -40,6 +40,7 @@ impl Executor for AmpExecutor {
.current_dir(worktree_path)
.arg("@sourcegraph/amp")
.arg("--format=jsonl")
.process_group(0) // Create new process group so we can kill entire tree
.spawn()
.map_err(ExecutorError::SpawnFailed)?;

View File

@@ -42,6 +42,7 @@ impl Executor for ClaudeExecutor {
.arg("--dangerously-skip-permissions")
.arg("--verbose")
.arg("--output-format=stream-json")
.process_group(0) // Create new process group so we can kill entire tree
.spawn()
.map_err(ExecutorError::SpawnFailed)?;

View File

@@ -44,6 +44,7 @@ echo "Task completed: {}""#,
.stderr(std::process::Stdio::piped())
.arg("-c")
.arg(&script)
.process_group(0) // Create new process group so we can kill entire tree
.spawn()
.map_err(ExecutorError::SpawnFailed)?;

View File

@@ -471,6 +471,123 @@ pub async fn get_execution_process(
}
}
#[axum::debug_handler]
pub async fn stop_all_execution_processes(
Path((project_id, task_id, attempt_id)): Path<(Uuid, Uuid, Uuid)>,
Extension(pool): Extension<SqlitePool>,
Extension(app_state): Extension<crate::app_state::AppState>,
) -> Result<ResponseJson<ApiResponse<()>>, StatusCode> {
// Verify task attempt exists and belongs to the correct task
match TaskAttempt::exists_for_task(&pool, attempt_id, task_id, project_id).await {
Ok(false) => return Err(StatusCode::NOT_FOUND),
Err(e) => {
tracing::error!("Failed to check task attempt existence: {}", e);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
Ok(true) => {}
}
// Get all execution processes for the task attempt
let processes = match ExecutionProcess::find_by_task_attempt_id(&pool, attempt_id).await {
Ok(processes) => processes,
Err(e) => {
tracing::error!(
"Failed to fetch execution processes for attempt {}: {}",
attempt_id,
e
);
return Err(StatusCode::INTERNAL_SERVER_ERROR);
}
};
let mut stopped_count = 0;
let mut errors = Vec::new();
// Stop all running processes
for process in processes {
match app_state.stop_running_execution_by_id(process.id).await {
Ok(true) => {
stopped_count += 1;
// Update the execution process status in the database
if let Err(e) = ExecutionProcess::update_completion(
&pool,
process.id,
crate::models::execution_process::ExecutionProcessStatus::Killed,
None,
)
.await
{
tracing::error!("Failed to update execution process status: {}", e);
errors.push(format!("Failed to update process {} status", process.id));
} else {
// Create a new activity record to mark as stopped
let activity_id = Uuid::new_v4();
let create_activity = CreateTaskAttemptActivity {
execution_process_id: process.id,
status: Some(TaskAttemptStatus::ExecutorFailed),
note: Some(format!(
"Execution process {:?} ({}) stopped by user",
process.process_type, process.id
)),
};
if let Err(e) = TaskAttemptActivity::create(
&pool,
&create_activity,
activity_id,
TaskAttemptStatus::ExecutorFailed,
)
.await
{
tracing::error!("Failed to create stopped activity: {}", e);
errors.push(format!(
"Failed to create activity for process {}",
process.id
));
}
}
}
Ok(false) => {
// Process was not running, which is fine
}
Err(e) => {
tracing::error!("Failed to stop execution process {}: {}", process.id, e);
errors.push(format!("Failed to stop process {}: {}", process.id, e));
}
}
}
if !errors.is_empty() {
return Ok(ResponseJson(ApiResponse {
success: false,
data: None,
message: Some(format!(
"Stopped {} processes, but encountered errors: {}",
stopped_count,
errors.join(", ")
)),
}));
}
if stopped_count == 0 {
return Ok(ResponseJson(ApiResponse {
success: true,
data: None,
message: Some("No running processes found to stop".to_string()),
}));
}
Ok(ResponseJson(ApiResponse {
success: true,
data: None,
message: Some(format!(
"Successfully stopped {} execution processes",
stopped_count
)),
}))
}
#[axum::debug_handler]
pub async fn stop_execution_process(
Path((project_id, task_id, attempt_id, process_id)): Path<(Uuid, Uuid, Uuid, Uuid)>,
@@ -645,6 +762,10 @@ pub fn task_attempts_router() -> Router {
"/projects/:project_id/tasks/:task_id/attempts/:attempt_id/execution-processes",
get(get_task_attempt_execution_processes),
)
.route(
"/projects/:project_id/tasks/:task_id/attempts/:attempt_id/stop",
post(stop_all_execution_processes),
)
.route(
"/projects/:project_id/tasks/:task_id/attempts/:attempt_id/execution-processes/:process_id/stop",
post(stop_execution_process),

View File

@@ -1,4 +1,4 @@
import { useState, useEffect } from "react";
import { useState, useEffect, useMemo } from "react";
import {
X,
History,
@@ -10,6 +10,7 @@ import {
Settings2,
Edit,
Trash2,
StopCircle,
} from "lucide-react";
import { Button } from "@/components/ui/button";
@@ -137,6 +138,7 @@ export function TaskDetailsPanel({
const [loading, setLoading] = useState(false);
const [isDescriptionExpanded, setIsDescriptionExpanded] = useState(false);
const [selectedExecutor, setSelectedExecutor] = useState<string>("claude");
const [isStopping, setIsStopping] = useState(false);
const { config } = useConfig();
// Available executors
@@ -146,13 +148,30 @@ export function TaskDetailsPanel({
{ id: "amp", name: "Amp" },
];
// Check if the selected attempt is active (not in a final state)
const isAttemptRunning =
selectedAttempt &&
attemptActivities.length > 0 &&
(attemptActivities[0].status === "setuprunning" ||
attemptActivities[0].status === "setupcomplete" ||
attemptActivities[0].status === "executorrunning");
// Check if any execution process is currently running
// We need to check the latest activity for each execution process
const isAttemptRunning = useMemo(() => {
if (!selectedAttempt || attemptActivities.length === 0 || isStopping) {
return false;
}
// Group activities by execution_process_id and get the latest one for each
const latestActivitiesByProcess = new Map<string, TaskAttemptActivity>();
attemptActivities.forEach((activity) => {
const existing = latestActivitiesByProcess.get(activity.execution_process_id);
if (!existing || new Date(activity.created_at) > new Date(existing.created_at)) {
latestActivitiesByProcess.set(activity.execution_process_id, activity);
}
});
// Check if any execution process has a running status as its latest activity
return Array.from(latestActivitiesByProcess.values()).some(
(activity) =>
activity.status === "setuprunning" ||
activity.status === "executorrunning"
);
}, [selectedAttempt, attemptActivities, isStopping]);
// Polling for updates when attempt is running
useEffect(() => {
@@ -326,6 +345,38 @@ export function TaskDetailsPanel({
}
};
const stopAllExecutions = async () => {
if (!task || !selectedAttempt) return;
try {
setIsStopping(true);
const response = await makeRequest(
`/api/projects/${projectId}/tasks/${task.id}/attempts/${selectedAttempt.id}/stop`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
},
}
);
if (response.ok) {
// Clear cached execution processes since they should be stopped
setExecutionProcesses({});
// Refresh activities to show updated status
await fetchAttemptActivities(selectedAttempt.id);
// Wait a bit for the backend to finish updating
setTimeout(() => {
fetchAttemptActivities(selectedAttempt.id);
}, 1000);
}
} catch (err) {
console.error("Failed to stop executions:", err);
} finally {
setIsStopping(false);
}
};
if (!task) return null;
return (
@@ -524,6 +575,18 @@ export function TaskDetailsPanel({
{selectedAttempt && (
<div className="flex gap-1">
{(isAttemptRunning || isStopping) && (
<Button
variant="outline"
size="sm"
onClick={stopAllExecutions}
disabled={isStopping}
className="text-red-600 hover:text-red-700 hover:bg-red-50 disabled:opacity-50"
>
<StopCircle className="h-4 w-4 mr-1" />
{isStopping ? "Stopping..." : "Stop"}
</Button>
)}
<Button
variant="outline"
size="sm"