Command execution
This commit is contained in:
@@ -6,17 +6,32 @@ use axum::{
|
||||
Json, Router,
|
||||
};
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
use std::env;
|
||||
use std::{collections::HashMap, env, sync::Arc};
|
||||
use tokio::sync::Mutex;
|
||||
use tower_http::cors::CorsLayer;
|
||||
use uuid::Uuid;
|
||||
|
||||
mod auth;
|
||||
mod models;
|
||||
mod routes;
|
||||
|
||||
use auth::{auth_middleware, hash_password};
|
||||
use models::ApiResponse;
|
||||
use models::{ApiResponse, user::User};
|
||||
use routes::{health, projects, tasks, users, filesystem};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RunningExecution {
|
||||
pub task_attempt_id: Uuid,
|
||||
pub child: tokio::process::Child,
|
||||
pub started_at: chrono::DateTime<chrono::Utc>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AppState {
|
||||
pub running_executions: Arc<Mutex<HashMap<Uuid, RunningExecution>>>,
|
||||
pub db_pool: sqlx::PgPool,
|
||||
}
|
||||
|
||||
async fn echo_handler(
|
||||
Json(payload): Json<serde_json::Value>,
|
||||
) -> ResponseJson<ApiResponse<serde_json::Value>> {
|
||||
@@ -55,6 +70,18 @@ async fn main() -> anyhow::Result<()> {
|
||||
tracing::warn!("Failed to create admin account: {}", e);
|
||||
}
|
||||
|
||||
// Create app state
|
||||
let app_state = AppState {
|
||||
running_executions: Arc::new(Mutex::new(HashMap::new())),
|
||||
db_pool: pool.clone(),
|
||||
};
|
||||
|
||||
// Start background task to check for init status and spawn processes
|
||||
let state_clone = app_state.clone();
|
||||
tokio::spawn(async move {
|
||||
execution_monitor(state_clone).await;
|
||||
});
|
||||
|
||||
// Public routes (no auth required)
|
||||
let public_routes = Router::new()
|
||||
.route("/", get(|| async { "Bloop API" }))
|
||||
@@ -75,6 +102,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
.merge(public_routes)
|
||||
.merge(protected_routes)
|
||||
.layer(Extension(pool))
|
||||
.layer(Extension(app_state))
|
||||
.layer(CorsLayer::permissive());
|
||||
|
||||
let listener = tokio::net::TcpListener::bind("0.0.0.0:3001").await?;
|
||||
@@ -87,54 +115,124 @@ async fn main() -> anyhow::Result<()> {
|
||||
}
|
||||
|
||||
async fn create_admin_account(pool: &sqlx::PgPool) -> anyhow::Result<()> {
|
||||
use chrono::Utc;
|
||||
use uuid::Uuid;
|
||||
|
||||
let admin_email = "admin@example.com";
|
||||
let admin_password = env::var("ADMIN_PASSWORD").unwrap_or_else(|_| "admin123".to_string());
|
||||
|
||||
// Check if admin already exists
|
||||
let existing_admin = sqlx::query!(
|
||||
"SELECT id, password_hash FROM users WHERE email = $1",
|
||||
admin_email
|
||||
)
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
|
||||
let password_hash = hash_password(&admin_password)?;
|
||||
|
||||
if let Some(admin) = existing_admin {
|
||||
// Update existing admin password
|
||||
let now = Utc::now();
|
||||
sqlx::query!(
|
||||
"UPDATE users SET password_hash = $2, is_admin = $3, updated_at = $4 WHERE id = $1",
|
||||
admin.id,
|
||||
password_hash,
|
||||
true,
|
||||
now
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
tracing::info!("Updated admin account");
|
||||
} else {
|
||||
// Create new admin account
|
||||
let id = Uuid::new_v4();
|
||||
let now = Utc::now();
|
||||
sqlx::query!(
|
||||
"INSERT INTO users (id, email, password_hash, is_admin, created_at, updated_at) VALUES ($1, $2, $3, $4, $5, $6)",
|
||||
id,
|
||||
admin_email,
|
||||
password_hash,
|
||||
true,
|
||||
now,
|
||||
now
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
tracing::info!("Created admin account: {}", admin_email);
|
||||
}
|
||||
User::create_or_update_admin(pool, admin_email, &password_hash).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn execution_monitor(app_state: AppState) {
|
||||
use models::{task_attempt_activity::{CreateTaskAttemptActivity, TaskAttemptActivity}, task_attempt::TaskAttemptStatus};
|
||||
use chrono::Utc;
|
||||
use tokio::process::Command;
|
||||
|
||||
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
|
||||
// Check for task attempts with latest activity status = Init
|
||||
let init_attempt_ids = match TaskAttemptActivity::find_attempts_with_latest_init_status(&app_state.db_pool).await {
|
||||
Ok(attempts) => attempts,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to query init attempts: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
for attempt_id in init_attempt_ids {
|
||||
|
||||
// Check if we already have a running execution for this attempt
|
||||
{
|
||||
let executions = app_state.running_executions.lock().await;
|
||||
if executions.values().any(|exec| exec.task_attempt_id == attempt_id) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// Spawn the process
|
||||
let child = match Command::new("echo")
|
||||
.arg("hello world")
|
||||
.spawn() {
|
||||
Ok(child) => child,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to spawn echo command: {}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
// Add to running executions
|
||||
let execution_id = Uuid::new_v4();
|
||||
{
|
||||
let mut executions = app_state.running_executions.lock().await;
|
||||
executions.insert(execution_id, RunningExecution {
|
||||
task_attempt_id: attempt_id,
|
||||
child,
|
||||
started_at: Utc::now(),
|
||||
});
|
||||
}
|
||||
|
||||
// Update task attempt activity to InProgress
|
||||
let activity_id = Uuid::new_v4();
|
||||
let create_activity = CreateTaskAttemptActivity {
|
||||
task_attempt_id: attempt_id,
|
||||
status: Some(TaskAttemptStatus::InProgress),
|
||||
note: Some("Started execution".to_string()),
|
||||
};
|
||||
|
||||
if let Err(e) = TaskAttemptActivity::create(
|
||||
&app_state.db_pool,
|
||||
&create_activity,
|
||||
activity_id,
|
||||
TaskAttemptStatus::InProgress,
|
||||
).await {
|
||||
tracing::error!("Failed to create in-progress activity: {}", e);
|
||||
}
|
||||
|
||||
tracing::info!("Started execution {} for task attempt {}", execution_id, attempt_id);
|
||||
}
|
||||
|
||||
// Check for completed processes
|
||||
let mut completed_executions = Vec::new();
|
||||
{
|
||||
let mut executions = app_state.running_executions.lock().await;
|
||||
for (execution_id, running_exec) in executions.iter_mut() {
|
||||
match running_exec.child.try_wait() {
|
||||
Ok(Some(status)) => {
|
||||
let success = status.success();
|
||||
let exit_code = status.code();
|
||||
completed_executions.push((*execution_id, success, exit_code));
|
||||
}
|
||||
Ok(None) => {
|
||||
// Still running
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Error checking process status: {}", e);
|
||||
completed_executions.push((*execution_id, false, None));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove completed executions from the map
|
||||
for (execution_id, _, _) in &completed_executions {
|
||||
executions.remove(execution_id);
|
||||
}
|
||||
}
|
||||
|
||||
// Log completed executions
|
||||
for (execution_id, success, exit_code) in completed_executions {
|
||||
let status_text = if success { "completed successfully" } else { "failed" };
|
||||
let exit_text = if let Some(code) = exit_code {
|
||||
format!(" with exit code {}", code)
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
|
||||
tracing::info!("Execution {} {}{}", execution_id, status_text, exit_text);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,4 +66,24 @@ impl TaskAttemptActivity {
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn find_attempts_with_latest_init_status(pool: &PgPool) -> Result<Vec<uuid::Uuid>, sqlx::Error> {
|
||||
let records = sqlx::query!(
|
||||
r#"SELECT DISTINCT ta.id
|
||||
FROM task_attempts ta
|
||||
INNER JOIN (
|
||||
SELECT task_attempt_id, MAX(created_at) as latest_created_at
|
||||
FROM task_attempt_activities
|
||||
GROUP BY task_attempt_id
|
||||
) latest_activity ON ta.id = latest_activity.task_attempt_id
|
||||
INNER JOIN task_attempt_activities taa ON ta.id = taa.task_attempt_id
|
||||
AND taa.created_at = latest_activity.latest_created_at
|
||||
WHERE taa.status = $1"#,
|
||||
TaskAttemptStatus::Init as TaskAttemptStatus
|
||||
)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
|
||||
Ok(records.into_iter().map(|r| r.id).collect())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,4 +134,48 @@ impl User {
|
||||
.await?;
|
||||
Ok(result.rows_affected())
|
||||
}
|
||||
|
||||
pub async fn create_or_update_admin(pool: &PgPool, email: &str, password_hash: &str) -> Result<(), sqlx::Error> {
|
||||
use chrono::Utc;
|
||||
|
||||
// Check if admin already exists
|
||||
let existing_admin = sqlx::query!(
|
||||
"SELECT id, password_hash FROM users WHERE email = $1",
|
||||
email
|
||||
)
|
||||
.fetch_optional(pool)
|
||||
.await?;
|
||||
|
||||
if let Some(admin) = existing_admin {
|
||||
// Update existing admin password
|
||||
let now = Utc::now();
|
||||
sqlx::query!(
|
||||
"UPDATE users SET password_hash = $2, is_admin = $3, updated_at = $4 WHERE id = $1",
|
||||
admin.id,
|
||||
password_hash,
|
||||
true,
|
||||
now
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
tracing::info!("Updated admin account");
|
||||
} else {
|
||||
// Create new admin account
|
||||
let id = Uuid::new_v4();
|
||||
sqlx::query!(
|
||||
"INSERT INTO users (id, email, password_hash, is_admin) VALUES ($1, $2, $3, $4)",
|
||||
id,
|
||||
email,
|
||||
password_hash,
|
||||
true
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
tracing::info!("Created admin account: {}", email);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user