Add concept of executors

This commit is contained in:
Louis Knight-Webb
2025-06-16 18:37:19 -04:00
parent 0ce09944e5
commit 55b6a20c03
12 changed files with 189 additions and 14 deletions

View File

@@ -27,6 +27,7 @@ jsonwebtoken = "9.2"
ts-rs = { version = "9.0", features = ["uuid-impl", "chrono-impl"] }
dirs = "5.0"
git2 = "0.18"
async-trait = "0.1"
[build-dependencies]
ts-rs = { version = "9.0", features = ["uuid-impl", "chrono-impl"] }

View File

@@ -0,0 +1,2 @@
-- Add executor_config column to task_attempts table
ALTER TABLE task_attempts ADD COLUMN executor_config JSONB;

View File

@@ -69,8 +69,11 @@ export {}
export {}
export {}
export {}"#,
bloop_backend::models::ApiResponse::<()>::decl(),
bloop_backend::executor::ExecutorConfig::decl(),
bloop_backend::models::project::CreateProject::decl(),
bloop_backend::models::project::Project::decl(),
bloop_backend::models::project::UpdateProject::decl(),

View File

@@ -2,12 +2,12 @@ use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::process::Command;
use uuid::Uuid;
use crate::models::{
task_attempt_activity::{CreateTaskAttemptActivity, TaskAttemptActivity},
task_attempt::TaskAttemptStatus
task_attempt::{TaskAttempt, TaskAttemptStatus}
};
#[derive(Debug)]
@@ -86,13 +86,25 @@ pub async fn execution_monitor(app_state: AppState) {
}
}
// Spawn the process
let child = match Command::new("echo")
.arg("hello world")
.spawn() {
// Get the task attempt to access the executor
let task_attempt = match TaskAttempt::find_by_id(&app_state.db_pool, attempt_id).await {
Ok(Some(attempt)) => attempt,
Ok(None) => {
tracing::error!("Task attempt {} not found", attempt_id);
continue;
}
Err(e) => {
tracing::error!("Failed to fetch task attempt {}: {}", attempt_id, e);
continue;
}
};
// Get the executor and spawn the process
let executor = task_attempt.get_executor();
let child = match executor.spawn(&app_state.db_pool, task_attempt.task_id, &task_attempt.worktree_path).await {
Ok(child) => child,
Err(e) => {
tracing::error!("Failed to spawn echo command: {}", e);
tracing::error!("Failed to spawn command for task attempt {}: {}", attempt_id, e);
continue;
}
};

72
backend/src/executor.rs Normal file
View File

@@ -0,0 +1,72 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::process::Child;
use ts_rs::TS;
use uuid::Uuid;
use crate::executors::EchoExecutor;
#[derive(Debug)]
pub enum ExecutorError {
SpawnFailed(std::io::Error),
TaskNotFound,
DatabaseError(sqlx::Error),
}
impl std::fmt::Display for ExecutorError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ExecutorError::SpawnFailed(e) => write!(f, "Failed to spawn process: {}", e),
ExecutorError::TaskNotFound => write!(f, "Task not found"),
ExecutorError::DatabaseError(e) => write!(f, "Database error: {}", e),
}
}
}
impl std::error::Error for ExecutorError {}
impl From<sqlx::Error> for ExecutorError {
fn from(err: sqlx::Error) -> Self {
ExecutorError::DatabaseError(err)
}
}
/// Trait for defining CLI commands that can be executed for task attempts
#[async_trait]
pub trait Executor: Send + Sync {
/// Get the unique identifier for this executor type
fn executor_type(&self) -> &'static str;
/// Spawn the command for a given task attempt
async fn spawn(&self, pool: &sqlx::PgPool, task_id: Uuid, worktree_path: &str) -> Result<Child, ExecutorError>;
/// Get a human-readable description of what this executor does
fn description(&self) -> &'static str;
}
/// Configuration for different executor types
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[serde(tag = "type", rename_all = "lowercase")]
#[ts(export)]
pub enum ExecutorConfig {
Echo,
// Future executors can be added here
// Shell { command: String },
// Docker { image: String, command: String },
}
impl ExecutorConfig {
pub fn create_executor(&self) -> Box<dyn Executor> {
match self {
ExecutorConfig::Echo => Box::new(EchoExecutor),
}
}
pub fn executor_type(&self) -> &'static str {
match self {
ExecutorConfig::Echo => "echo",
}
}
}

View File

@@ -0,0 +1,40 @@
use async_trait::async_trait;
use tokio::process::{Child, Command};
use uuid::Uuid;
use crate::executor::{Executor, ExecutorError};
use crate::models::task::Task;
/// A dummy executor that echoes the task title and description
pub struct EchoExecutor;
#[async_trait]
impl Executor for EchoExecutor {
fn executor_type(&self) -> &'static str {
"echo"
}
async fn spawn(&self, pool: &sqlx::PgPool, task_id: Uuid, _worktree_path: &str) -> Result<Child, ExecutorError> {
// Get the task to fetch its description
let task = Task::find_by_id(pool, task_id)
.await?
.ok_or(ExecutorError::TaskNotFound)?;
let message = format!(
"Executing task: {} - {}",
task.title,
task.description.as_deref().unwrap_or("No description")
);
let child = Command::new("echo")
.arg(&message)
.spawn()
.map_err(ExecutorError::SpawnFailed)?;
Ok(child)
}
fn description(&self) -> &'static str {
"Echoes the task title and description"
}
}

View File

@@ -0,0 +1,3 @@
pub mod echo;
pub use echo::EchoExecutor;

View File

@@ -1,3 +1,5 @@
pub mod auth;
pub mod executor;
pub mod executors;
pub mod models;
pub mod routes;

View File

@@ -12,6 +12,8 @@ use tower_http::cors::CorsLayer;
mod auth;
mod execution_monitor;
mod executor;
mod executors;
mod models;
mod routes;

View File

@@ -8,6 +8,7 @@ use std::path::Path;
use super::task::Task;
use super::project::Project;
use crate::executor::ExecutorConfig;
#[derive(Debug)]
pub enum TaskAttemptError {
@@ -60,6 +61,9 @@ pub struct TaskAttempt {
pub worktree_path: String,
pub base_commit: Option<String>,
pub merge_commit: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[ts(skip)]
pub executor_config: Option<serde_json::Value>, // JSON field for ExecutorConfig
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
}
@@ -71,6 +75,7 @@ pub struct CreateTaskAttempt {
pub worktree_path: String,
pub base_commit: Option<String>,
pub merge_commit: Option<String>,
pub executor_config: Option<ExecutorConfig>,
}
#[derive(Debug, Deserialize, TS)]
@@ -82,10 +87,22 @@ pub struct UpdateTaskAttempt {
}
impl TaskAttempt {
pub async fn find_by_id(pool: &PgPool, id: Uuid) -> Result<Option<Self>, sqlx::Error> {
sqlx::query_as!(
TaskAttempt,
r#"SELECT id, task_id, worktree_path, base_commit, merge_commit, executor_config, created_at, updated_at
FROM task_attempts
WHERE id = $1"#,
id
)
.fetch_optional(pool)
.await
}
pub async fn find_by_task_id(pool: &PgPool, task_id: Uuid) -> Result<Vec<Self>, sqlx::Error> {
sqlx::query_as!(
TaskAttempt,
r#"SELECT id, task_id, worktree_path, base_commit, merge_commit, created_at, updated_at
r#"SELECT id, task_id, worktree_path, base_commit, merge_commit, executor_config, created_at, updated_at
FROM task_attempts
WHERE task_id = $1
ORDER BY created_at DESC"#,
@@ -119,17 +136,24 @@ impl TaskAttempt {
let branch_name = format!("attempt-{}", attempt_id);
repo.worktree(&branch_name, worktree_path, None)?;
// Serialize executor config to JSON
let executor_config_json = data.executor_config.as_ref()
.map(|config| serde_json::to_value(config))
.transpose()
.map_err(|e| TaskAttemptError::Database(sqlx::Error::decode(e)))?;
// Insert the record into the database
let task_attempt = sqlx::query_as!(
TaskAttempt,
r#"INSERT INTO task_attempts (id, task_id, worktree_path, base_commit, merge_commit)
VALUES ($1, $2, $3, $4, $5)
RETURNING id, task_id, worktree_path, base_commit, merge_commit, created_at, updated_at"#,
r#"INSERT INTO task_attempts (id, task_id, worktree_path, base_commit, merge_commit, executor_config)
VALUES ($1, $2, $3, $4, $5, $6)
RETURNING id, task_id, worktree_path, base_commit, merge_commit, executor_config, created_at, updated_at"#,
attempt_id,
data.task_id,
data.worktree_path,
data.base_commit,
data.merge_commit
data.merge_commit,
executor_config_json
)
.fetch_one(pool)
.await?;
@@ -150,4 +174,15 @@ impl TaskAttempt {
.await?;
Ok(result.is_some())
}
/// Get the executor for this task attempt, defaulting to Echo if none is specified
pub fn get_executor(&self) -> Box<dyn crate::executor::Executor> {
if let Some(config_json) = &self.executor_config {
if let Ok(config) = serde_json::from_value::<ExecutorConfig>(config_json.clone()) {
return config.create_executor();
}
}
// Default to echo executor
ExecutorConfig::Echo.create_executor()
}
}

View File

@@ -9,7 +9,7 @@ import {
import { Label } from '@/components/ui/label'
import { Button } from '@/components/ui/button'
import { makeAuthenticatedRequest } from '@/lib/auth'
import type { TaskStatus, TaskAttempt, TaskAttemptActivity } from 'shared/types'
import type { TaskStatus, TaskAttempt, TaskAttemptActivity, ExecutorConfig } from 'shared/types'
interface Task {
id: string
@@ -123,6 +123,7 @@ export function TaskDetailsDialog({ isOpen, onOpenChange, task, projectId, onErr
worktree_path: worktreePath,
base_commit: null,
merge_commit: null,
executor_config: { type: "echo" } as ExecutorConfig,
}),
}
)

View File

@@ -3,6 +3,8 @@
export type ApiResponse<T> = { success: boolean, data: T | null, message: string | null, };
export type ExecutorConfig = { "type": "echo" };
export type CreateProject = { name: string, git_repo_path: string, use_existing_repo: boolean, };
export type Project = { id: string, name: string, git_repo_path: string, owner_id: string, created_at: Date, updated_at: Date, };
@@ -21,7 +23,7 @@ export type TaskAttemptStatus = "init" | "inprogress" | "paused";
export type TaskAttempt = { id: string, task_id: string, worktree_path: string, base_commit: string | null, merge_commit: string | null, created_at: string, updated_at: string, };
export type CreateTaskAttempt = { task_id: string, worktree_path: string, base_commit: string | null, merge_commit: string | null, };
export type CreateTaskAttempt = { task_id: string, worktree_path: string, base_commit: string | null, merge_commit: string | null, executor_config: ExecutorConfig | null, };
export type UpdateTaskAttempt = { worktree_path: string | null, base_commit: string | null, merge_commit: string | null, };