Update setup script activity etc...

This commit is contained in:
Louis Knight-Webb
2025-06-19 11:09:32 -04:00
parent 376be69b8e
commit 68a3fa2109
10 changed files with 321 additions and 190 deletions

View File

@@ -0,0 +1,20 @@
{
"db_name": "SQLite",
"query": "SELECT DISTINCT ta.id as \"id!: Uuid\"\n FROM task_attempts ta\n INNER JOIN (\n SELECT task_attempt_id, MAX(created_at) as latest_created_at\n FROM task_attempt_activities\n GROUP BY task_attempt_id\n ) latest_activity ON ta.id = latest_activity.task_attempt_id\n INNER JOIN task_attempt_activities taa ON ta.id = taa.task_attempt_id \n AND taa.created_at = latest_activity.latest_created_at\n WHERE taa.status IN ($1, $2, $3)",
"describe": {
"columns": [
{
"name": "id!: Uuid",
"ordinal": 0,
"type_info": "Blob"
}
],
"parameters": {
"Right": 3
},
"nullable": [
true
]
},
"hash": "52c58db6e8a3b690a8980e395733a6e44bc5b0836eab8801e4c43cb47560ca41"
}

View File

@@ -4,6 +4,7 @@ CREATE TABLE projects (
id BLOB PRIMARY KEY,
name TEXT NOT NULL,
git_repo_path TEXT NOT NULL DEFAULT '' UNIQUE,
setup_script TEXT DEFAULT '',
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
);
@@ -38,8 +39,7 @@ CREATE TABLE task_attempt_activities (
id BLOB PRIMARY KEY,
task_attempt_id BLOB NOT NULL,
status TEXT NOT NULL DEFAULT 'init'
CHECK (status IN ('init','inprogress','paused')),
note TEXT,
CHECK (status IN ('init','setuprunning','setupcomplete','setupfailed','executorrunning','executorcomplete','executorfailed','paused')), note TEXT,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (task_attempt_id) REFERENCES task_attempts(id) ON DELETE CASCADE
);

View File

@@ -1 +0,0 @@
ALTER TABLE projects ADD COLUMN setup_script TEXT;

View File

@@ -30,21 +30,21 @@ pub async fn execution_monitor(app_state: AppState) {
loop {
interval.tick().await;
// Check for orphaned task attempts with latest activity status = InProgress but no running execution
let inprogress_attempt_ids =
match TaskAttemptActivity::find_attempts_with_latest_inprogress_status(
// Check for orphaned task attempts with latest activity status = ExecutorRunning but no running execution
let executor_running_attempt_ids =
match TaskAttemptActivity::find_attempts_with_latest_executor_running_status(
&app_state.db_pool,
)
.await
{
Ok(attempts) => attempts,
Err(e) => {
tracing::error!("Failed to query inprogress attempts: {}", e);
tracing::error!("Failed to query executor running attempts: {}", e);
continue;
}
};
for attempt_id in inprogress_attempt_ids {
for attempt_id in executor_running_attempt_ids {
// Check if this attempt has a running execution
let has_running_execution = {
let executions = app_state.running_executions.lock().await;
@@ -58,7 +58,7 @@ pub async fn execution_monitor(app_state: AppState) {
let activity_id = Uuid::new_v4();
let create_activity = CreateTaskAttemptActivity {
task_attempt_id: attempt_id,
status: Some(TaskAttemptStatus::Paused),
status: Some(TaskAttemptStatus::ExecutorFailed),
note: Some("Execution lost (server restart or crash)".to_string()),
};
@@ -66,7 +66,7 @@ pub async fn execution_monitor(app_state: AppState) {
&app_state.db_pool,
&create_activity,
activity_id,
TaskAttemptStatus::Paused,
TaskAttemptStatus::ExecutorFailed,
)
.await
{
@@ -101,119 +101,7 @@ pub async fn execution_monitor(app_state: AppState) {
}
}
// Check for task attempts with latest activity status = Init
let init_attempt_ids =
match TaskAttemptActivity::find_attempts_with_latest_init_status(&app_state.db_pool)
.await
{
Ok(attempts) => attempts,
Err(e) => {
tracing::error!("Failed to query init attempts: {}", e);
continue;
}
};
for attempt_id in init_attempt_ids {
// Check if we already have a running execution for this attempt
{
let executions = app_state.running_executions.lock().await;
if executions
.values()
.any(|exec| exec.task_attempt_id == attempt_id)
{
continue;
}
}
// Get the task attempt to access the executor
let task_attempt = match TaskAttempt::find_by_id(&app_state.db_pool, attempt_id).await {
Ok(Some(attempt)) => attempt,
Ok(None) => {
tracing::error!("Task attempt {} not found", attempt_id);
continue;
}
Err(e) => {
tracing::error!("Failed to fetch task attempt {}: {}", attempt_id, e);
continue;
}
};
// Get the executor and start streaming execution
let executor = task_attempt.get_executor();
let child = match executor
.execute_streaming(
&app_state.db_pool,
task_attempt.task_id,
attempt_id,
&task_attempt.worktree_path,
)
.await
{
Ok(child) => child,
Err(e) => {
tracing::error!(
"Failed to start streaming execution for task attempt {}: {}",
attempt_id,
e
);
continue;
}
};
// Add to running executions
let execution_id = Uuid::new_v4();
{
let mut executions = app_state.running_executions.lock().await;
executions.insert(
execution_id,
RunningExecution {
task_attempt_id: attempt_id,
child,
started_at: Utc::now(),
},
);
}
// Update task attempt activity to InProgress
let activity_id = Uuid::new_v4();
let create_activity = CreateTaskAttemptActivity {
task_attempt_id: attempt_id,
status: Some(TaskAttemptStatus::InProgress),
note: Some("Started execution".to_string()),
};
if let Err(e) = TaskAttemptActivity::create(
&app_state.db_pool,
&create_activity,
activity_id,
TaskAttemptStatus::InProgress,
)
.await
{
tracing::error!("Failed to create in-progress activity: {}", e);
}
// Update task status to InProgress - get task to access project_id
if let Ok(Some(task)) = Task::find_by_id(&app_state.db_pool, task_attempt.task_id).await
{
if let Err(e) = Task::update_status(
&app_state.db_pool,
task.id,
task.project_id,
TaskStatus::InProgress,
)
.await
{
tracing::error!("Failed to update task status to InProgress: {}", e);
}
}
tracing::info!(
"Started execution {} for task attempt {}",
execution_id,
attempt_id
);
}
// Note: Execution starting logic moved to create_task_attempt endpoint
// Check for completed processes
let mut completed_executions = Vec::new();
@@ -267,11 +155,16 @@ pub async fn execution_monitor(app_state: AppState) {
tracing::info!("Execution {} {}{}", execution_id, status_text, exit_text);
// Create task attempt activity with Paused status
// Create task attempt activity with appropriate completion status
let activity_id = Uuid::new_v4();
let status = if success {
TaskAttemptStatus::ExecutorComplete
} else {
TaskAttemptStatus::ExecutorFailed
};
let create_activity = CreateTaskAttemptActivity {
task_attempt_id,
status: Some(TaskAttemptStatus::Paused),
status: Some(status.clone()),
note: Some(format!("Execution completed{}", exit_text)),
};
@@ -279,7 +172,7 @@ pub async fn execution_monitor(app_state: AppState) {
&app_state.db_pool,
&create_activity,
activity_id,
TaskAttemptStatus::Paused,
status,
)
.await
{

View File

@@ -49,7 +49,12 @@ impl From<GitError> for TaskAttemptError {
#[ts(export)]
pub enum TaskAttemptStatus {
Init,
InProgress,
SetupRunning,
SetupComplete,
SetupFailed,
ExecutorRunning,
ExecutorComplete,
ExecutorFailed,
Paused,
}
@@ -181,41 +186,6 @@ impl TaskAttempt {
let branch_name = format!("attempt-{}", attempt_id);
repo.worktree(&branch_name, worktree_path, None)?;
// Run setup script if it exists
if let Some(setup_script) = &project.setup_script {
if !setup_script.trim().is_empty() {
tracing::info!("Running setup script for task attempt {}", attempt_id);
let output = std::process::Command::new("bash")
.arg("-c")
.arg(setup_script)
.current_dir(worktree_path)
.output()
.map_err(|e| {
TaskAttemptError::Git(git2::Error::from_str(&format!(
"Failed to execute setup script: {}",
e
)))
})?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
tracing::error!("Setup script failed for attempt {}: {}", attempt_id, stderr);
return Err(TaskAttemptError::Git(git2::Error::from_str(&format!(
"Setup script failed: {}",
stderr
))));
}
let stdout = String::from_utf8_lossy(&output.stdout);
tracing::info!(
"Setup script completed for attempt {}: {}",
attempt_id,
stdout
);
}
}
// Insert the record into the database
Ok(sqlx::query_as!(
TaskAttempt,
@@ -507,6 +477,171 @@ impl TaskAttempt {
Ok(merge_commit_id)
}
/// Start the execution flow for a task attempt (setup script + executor)
pub async fn start_execution(
pool: &SqlitePool,
app_state: &crate::execution_monitor::AppState,
attempt_id: Uuid,
task_id: Uuid,
project_id: Uuid,
) -> Result<(), TaskAttemptError> {
use crate::models::project::Project;
use crate::models::task::{Task, TaskStatus};
use crate::models::task_attempt_activity::{
CreateTaskAttemptActivity, TaskAttemptActivity,
};
// Get the task attempt, task, and project
let task_attempt = TaskAttempt::find_by_id(pool, attempt_id)
.await?
.ok_or(TaskAttemptError::TaskNotFound)?;
let _task = Task::find_by_id(pool, task_id)
.await?
.ok_or(TaskAttemptError::TaskNotFound)?;
let project = Project::find_by_id(pool, project_id)
.await?
.ok_or(TaskAttemptError::ProjectNotFound)?;
// Step 1: Run setup script if it exists
if let Some(setup_script) = &project.setup_script {
if !setup_script.trim().is_empty() {
// Create activity for setup script start
let activity_id = Uuid::new_v4();
let create_activity = CreateTaskAttemptActivity {
task_attempt_id: attempt_id,
status: Some(TaskAttemptStatus::SetupRunning),
note: Some("Starting setup script".to_string()),
};
TaskAttemptActivity::create(
pool,
&create_activity,
activity_id,
TaskAttemptStatus::SetupRunning,
)
.await?;
tracing::info!("Running setup script for task attempt {}", attempt_id);
let output = tokio::process::Command::new("bash")
.arg("-c")
.arg(setup_script)
.current_dir(&task_attempt.worktree_path)
.output()
.await
.map_err(|e| {
TaskAttemptError::Git(git2::Error::from_str(&format!(
"Failed to execute setup script: {}",
e
)))
})?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
tracing::error!("Setup script failed for attempt {}: {}", attempt_id, stderr);
// Create activity for setup script failure
let activity_id = Uuid::new_v4();
let create_activity = CreateTaskAttemptActivity {
task_attempt_id: attempt_id,
status: Some(TaskAttemptStatus::SetupFailed),
note: Some(format!("Setup script failed: {}", stderr)),
};
TaskAttemptActivity::create(
pool,
&create_activity,
activity_id,
TaskAttemptStatus::SetupFailed,
)
.await?;
// Update task status to InReview
Task::update_status(pool, task_id, project_id, TaskStatus::InReview).await?;
return Err(TaskAttemptError::Git(git2::Error::from_str(&format!(
"Setup script failed: {}",
stderr
))));
}
let stdout = String::from_utf8_lossy(&output.stdout);
tracing::info!(
"Setup script completed for attempt {}: {}",
attempt_id,
stdout
);
// Create activity for setup script completion
let activity_id = Uuid::new_v4();
let create_activity = CreateTaskAttemptActivity {
task_attempt_id: attempt_id,
status: Some(TaskAttemptStatus::SetupComplete),
note: Some("Setup script completed successfully".to_string()),
};
TaskAttemptActivity::create(
pool,
&create_activity,
activity_id,
TaskAttemptStatus::SetupComplete,
)
.await?;
}
}
// Step 2: Start the executor
let executor = task_attempt.get_executor();
// Create activity for executor start
let activity_id = Uuid::new_v4();
let create_activity = CreateTaskAttemptActivity {
task_attempt_id: attempt_id,
status: Some(TaskAttemptStatus::ExecutorRunning),
note: Some("Starting executor".to_string()),
};
TaskAttemptActivity::create(
pool,
&create_activity,
activity_id,
TaskAttemptStatus::ExecutorRunning,
)
.await?;
let child = executor
.execute_streaming(pool, task_id, attempt_id, &task_attempt.worktree_path)
.await
.map_err(|e| TaskAttemptError::Git(git2::Error::from_str(&e.to_string())))?;
// Add to running executions
let execution_id = Uuid::new_v4();
{
let mut executions = app_state.running_executions.lock().await;
executions.insert(
execution_id,
crate::execution_monitor::RunningExecution {
task_attempt_id: attempt_id,
child,
started_at: chrono::Utc::now(),
},
);
}
// Update task status to InProgress
Task::update_status(pool, task_id, project_id, TaskStatus::InProgress).await?;
tracing::info!(
"Started execution {} for task attempt {}",
execution_id,
attempt_id
);
Ok(())
}
/// Get the git diff between the base commit and the current worktree state
pub async fn get_diff(
pool: &SqlitePool,

View File

@@ -104,6 +104,30 @@ impl TaskAttemptActivity {
pub async fn find_attempts_with_latest_inprogress_status(
pool: &SqlitePool,
) -> Result<Vec<uuid::Uuid>, sqlx::Error> {
let records = sqlx::query!(
r#"SELECT DISTINCT ta.id as "id!: Uuid"
FROM task_attempts ta
INNER JOIN (
SELECT task_attempt_id, MAX(created_at) as latest_created_at
FROM task_attempt_activities
GROUP BY task_attempt_id
) latest_activity ON ta.id = latest_activity.task_attempt_id
INNER JOIN task_attempt_activities taa ON ta.id = taa.task_attempt_id
AND taa.created_at = latest_activity.latest_created_at
WHERE taa.status IN ($1, $2, $3)"#,
TaskAttemptStatus::SetupRunning as TaskAttemptStatus,
TaskAttemptStatus::ExecutorRunning as TaskAttemptStatus,
TaskAttemptStatus::Paused as TaskAttemptStatus
)
.fetch_all(pool)
.await?;
Ok(records.into_iter().map(|r| r.id).collect())
}
pub async fn find_attempts_with_latest_executor_running_status(
pool: &SqlitePool,
) -> Result<Vec<uuid::Uuid>, sqlx::Error> {
let records = sqlx::query!(
r#"SELECT DISTINCT ta.id as "id!: Uuid"
@@ -116,7 +140,7 @@ impl TaskAttemptActivity {
INNER JOIN task_attempt_activities taa ON ta.id = taa.task_attempt_id
AND taa.created_at = latest_activity.latest_created_at
WHERE taa.status = $1"#,
TaskAttemptStatus::InProgress as TaskAttemptStatus
TaskAttemptStatus::ExecutorRunning as TaskAttemptStatus
)
.fetch_all(pool)
.await?;

View File

@@ -2,7 +2,7 @@ use axum::{
extract::{Extension, Path},
http::StatusCode,
response::Json as ResponseJson,
routing::{get, post},
routing::get,
Json, Router,
};
use sqlx::SqlitePool;
@@ -213,6 +213,7 @@ pub async fn get_task_attempt_activities(
pub async fn create_task_attempt(
Path((project_id, task_id)): Path<(Uuid, Uuid)>,
Extension(pool): Extension<SqlitePool>,
Extension(app_state): Extension<crate::execution_monitor::AppState>,
Json(mut payload): Json<CreateTaskAttempt>,
) -> Result<ResponseJson<ApiResponse<TaskAttempt>>, StatusCode> {
// Verify task exists in project first
@@ -236,6 +237,28 @@ pub async fn create_task_attempt(
let activity_id = Uuid::new_v4();
let _ = TaskAttemptActivity::create_initial(&pool, attempt.id, activity_id).await;
// Start execution asynchronously (don't block the response)
let pool_clone = pool.clone();
let app_state_clone = app_state.clone();
let attempt_id = attempt.id;
tokio::spawn(async move {
if let Err(e) = TaskAttempt::start_execution(
&pool_clone,
&app_state_clone,
attempt_id,
task_id,
project_id,
)
.await
{
tracing::error!(
"Failed to start execution for task attempt {}: {}",
attempt_id,
e
);
}
});
Ok(ResponseJson(ApiResponse {
success: true,
data: Some(attempt),

View File

@@ -23,6 +23,7 @@ import type {
TaskStatus,
TaskAttempt,
TaskAttemptActivity,
TaskAttemptStatus,
} from "shared/types";
interface Task {
@@ -57,6 +58,29 @@ const statusLabels: Record<TaskStatus, string> = {
cancelled: "Cancelled",
};
const getAttemptStatusDisplay = (status: TaskAttemptStatus): { label: string; className: string } => {
switch (status) {
case "init":
return { label: "Init", className: "bg-gray-100 text-gray-800" };
case "setuprunning":
return { label: "Setup Running", className: "bg-blue-100 text-blue-800" };
case "setupcomplete":
return { label: "Setup Complete", className: "bg-green-100 text-green-800" };
case "setupfailed":
return { label: "Setup Failed", className: "bg-red-100 text-red-800" };
case "executorrunning":
return { label: "Executor Running", className: "bg-blue-100 text-blue-800" };
case "executorcomplete":
return { label: "Executor Complete", className: "bg-green-100 text-green-800" };
case "executorfailed":
return { label: "Executor Failed", className: "bg-red-100 text-red-800" };
case "paused":
return { label: "Paused", className: "bg-yellow-100 text-yellow-800" };
default:
return { label: "Unknown", className: "bg-gray-100 text-gray-800" };
}
};
export function TaskDetailsDialog({
isOpen,
onOpenChange,
@@ -84,11 +108,14 @@ export function TaskDetailsDialog({
const [editedStatus, setEditedStatus] = useState<TaskStatus>("todo");
const [savingTask, setSavingTask] = useState(false);
// Check if the selected attempt is currently running (latest activity is "inprogress")
// Check if the selected attempt is active (not in a final state)
const isAttemptRunning =
selectedAttempt &&
attemptActivities.length > 0 &&
attemptActivities[0].status === "inprogress";
(attemptActivities[0].status === "init" ||
attemptActivities[0].status === "setuprunning" ||
attemptActivities[0].status === "setupcomplete" ||
attemptActivities[0].status === "executorrunning");
useEffect(() => {
if (isOpen && task) {
@@ -629,18 +656,10 @@ export function TaskDetailsDialog({
<div className="flex items-center justify-between">
<span
className={`px-2 py-1 rounded-full text-xs font-medium ${
activity.status === "init"
? "bg-gray-100 text-gray-800"
: activity.status === "inprogress"
? "bg-blue-100 text-blue-800"
: "bg-yellow-100 text-yellow-800"
getAttemptStatusDisplay(activity.status).className
}`}
>
{activity.status === "init"
? "Init"
: activity.status === "inprogress"
? "In Progress"
: "Paused"}
{getAttemptStatusDisplay(activity.status).label}
</span>
<p className="text-xs text-muted-foreground">
{new Date(activity.created_at).toLocaleString()}

View File

@@ -18,6 +18,7 @@ import type {
TaskStatus,
TaskAttempt,
TaskAttemptActivity,
TaskAttemptStatus,
} from "shared/types";
interface Task {
@@ -44,6 +45,29 @@ const statusLabels: Record<TaskStatus, string> = {
cancelled: "Cancelled",
};
const getAttemptStatusDisplay = (status: TaskAttemptStatus): { label: string; className: string } => {
switch (status) {
case "init":
return { label: "Init", className: "bg-gray-100 text-gray-800" };
case "setuprunning":
return { label: "Setup Running", className: "bg-blue-100 text-blue-800" };
case "setupcomplete":
return { label: "Setup Complete", className: "bg-green-100 text-green-800" };
case "setupfailed":
return { label: "Setup Failed", className: "bg-red-100 text-red-800" };
case "executorrunning":
return { label: "Executor Running", className: "bg-blue-100 text-blue-800" };
case "executorcomplete":
return { label: "Executor Complete", className: "bg-green-100 text-green-800" };
case "executorfailed":
return { label: "Executor Failed", className: "bg-red-100 text-red-800" };
case "paused":
return { label: "Paused", className: "bg-yellow-100 text-yellow-800" };
default:
return { label: "Unknown", className: "bg-gray-100 text-gray-800" };
}
};
export function TaskDetailsPage() {
const { projectId, taskId } = useParams<{
projectId: string;
@@ -70,12 +94,14 @@ export function TaskDetailsPage() {
const [isTaskDialogOpen, setIsTaskDialogOpen] = useState(false);
// Check if the selected attempt is currently running (latest activity is "inprogress" or "init")
// Check if the selected attempt is active (not in a final state)
const isAttemptRunning =
selectedAttempt &&
attemptActivities.length > 0 &&
(attemptActivities[0].status === "inprogress" ||
attemptActivities[0].status === "init");
(attemptActivities[0].status === "init" ||
attemptActivities[0].status === "setuprunning" ||
attemptActivities[0].status === "setupcomplete" ||
attemptActivities[0].status === "executorrunning");
// Polling for updates when attempt is running
useEffect(() => {
@@ -677,18 +703,10 @@ export function TaskDetailsPage() {
<div className="flex items-center justify-between">
<span
className={`px-2 py-1 rounded-full text-xs font-medium ${
activity.status === "init"
? "bg-gray-100 text-gray-800"
: activity.status === "inprogress"
? "bg-blue-100 text-blue-800"
: "bg-yellow-100 text-yellow-800"
getAttemptStatusDisplay(activity.status).className
}`}
>
{activity.status === "init"
? "Init"
: activity.status === "inprogress"
? "In Progress"
: "Paused"}
{getAttemptStatusDisplay(activity.status).label}
</span>
<p className="text-xs text-muted-foreground">
{new Date(activity.created_at).toLocaleString()}

View File

@@ -25,7 +25,7 @@ export type TaskWithAttemptStatus = { id: string, project_id: string, title: str
export type UpdateTask = { title: string | null, description: string | null, status: TaskStatus | null, };
export type TaskAttemptStatus = "init" | "inprogress" | "paused";
export type TaskAttemptStatus = "init" | "setuprunning" | "setupcomplete" | "setupfailed" | "executorrunning" | "executorcomplete" | "executorfailed" | "paused";
export type TaskAttempt = { id: string, task_id: string, worktree_path: string, base_commit: string | null, merge_commit: string | null, executor: string | null, stdout: string | null, stderr: string | null, created_at: string, updated_at: string, };