* feat: implement CommandRunner and integrate with executors refactor: replace command_group::AsyncGroupChild with command_runner::CommandProcess in executor and process_service Migrate traits and claude to command_runner Migrate gemini to command_runner Migrate sst_opencode Migrate ccr Migrate amp Migrate charm opencode Migrate cleanup_script Migrate executor (vibe-kanban 28b4ede6) I've added an abstract command runner to enable local and remote execution later. I already migrated the amp executor; please go ahead and replace process handling with the new command runner @backend/src/command_runner.rs . If there are any missing functions ask me about them. Migrate backend/src/executors/echo.rs to be compatible. Migrate executor (vibe-kanban 9dc48bc8) I've added an abstract command runner to enable local and remote execution later. I already migrated the amp executor; please go ahead and replace process handling with the new command runner @backend/src/command_runner.rs . If there are any missing functions ask me about them. Migrate @backend/src/executors/dev_server.rs to be compatible. Migrate executor (vibe-kanban d3ac2aa5) I've added an abstract command runner to enable local and remote execution later. I already migrated the amp executor; please go ahead and replace process handling with the new command runner @backend/src/command_runner.rs . If there are any missing functions ask me about them. Migrate backend/src/executors/setup_script.rs to be compatible. Fmt + lint * Refactor CommandRunner initialization to use new() method for improved environment handling * Add basic cloud runner and test scripts Enhance cloud runner and command runner for true streaming support - Refactor process management in cloud runner to use ProcessEntry struct for better handling of stdout and stderr streams. - Implement true chunk-based streaming for command output via HTTP in command runner. 
- Update test_remote to verify streaming functionality with real-time output capture. Clippy and fmt Refactor CommandStream and CommandProcess to remove dead code and improve stream handling Refactor cloud runner and command runner to improve API response handling and streamline process status management Change stream setup to be async * Revert "Change stream setup to be async" This reverts commit 79b5cde12aefafe9e669b93167036c8c6adf9145. Revert "Refactor cloud runner and command runner to improve API response handling and streamline process status management" This reverts commit 3cc03ff82424bd715a6f20f3124bd7bf80bc2d72. Revert "Refactor CommandStream and CommandProcess to remove dead code and improve stream handling" This reverts commit dcab0fcd9622416b7881af4add513b371894e408. * refactor: remove unused imports and update command execution to use CommandProcess * refactor: clean up CommandRunner and CommandProcess by removing dead code and updating initialization logic * Fix imports * refactor: split command executors into local and remote * refactor: update stream methods to be asynchronous across command execution components * refactor: update command runner references; remove remote test binary; remove debug script * Remove unused stdout alias * Clippy * refactor: consolidate CommandExitStatus implementations for local and remote processes * refactor: replace CreateCommandRequest with CommandRunnerArgs in command execution * refactor: optimize stream creation by using concurrent HTTP requests
318 lines
12 KiB
Rust
318 lines
12 KiB
Rust
use std::{str::FromStr, sync::Arc};
|
|
|
|
use axum::{
|
|
body::Body,
|
|
http::{header, HeaderValue, StatusCode},
|
|
middleware::from_fn_with_state,
|
|
response::{IntoResponse, Json as ResponseJson, Response},
|
|
routing::{get, post},
|
|
Json, Router,
|
|
};
|
|
use sentry_tower::NewSentryLayer;
|
|
use sqlx::{sqlite::SqliteConnectOptions, SqlitePool};
|
|
use strip_ansi_escapes::strip;
|
|
use tokio::sync::RwLock;
|
|
use tower_http::cors::CorsLayer;
|
|
use tracing_subscriber::{filter::LevelFilter, prelude::*};
|
|
use vibe_kanban::{sentry_layer, Assets, ScriptAssets, SoundAssets};
|
|
|
|
mod app_state;
|
|
mod command_runner;
|
|
mod execution_monitor;
|
|
mod executor;
|
|
mod executors;
|
|
mod mcp;
|
|
mod middleware;
|
|
mod models;
|
|
mod routes;
|
|
mod services;
|
|
mod utils;
|
|
|
|
use app_state::AppState;
|
|
use execution_monitor::execution_monitor;
|
|
use middleware::{
|
|
load_execution_process_simple_middleware, load_project_middleware,
|
|
load_task_attempt_middleware, load_task_middleware, load_task_template_middleware,
|
|
};
|
|
use models::{ApiResponse, Config, Environment};
|
|
use routes::{
|
|
auth, config, filesystem, github, health, projects, stream, task_attempts, task_templates,
|
|
tasks,
|
|
};
|
|
use services::PrMonitorService;
|
|
|
|
async fn echo_handler(
|
|
Json(payload): Json<serde_json::Value>,
|
|
) -> ResponseJson<ApiResponse<serde_json::Value>> {
|
|
ResponseJson(ApiResponse::success(payload))
|
|
}
|
|
|
|
async fn static_handler(uri: axum::extract::Path<String>) -> impl IntoResponse {
|
|
let path = uri.trim_start_matches('/');
|
|
serve_file(path).await
|
|
}
|
|
|
|
async fn index_handler() -> impl IntoResponse {
|
|
serve_file("index.html").await
|
|
}
|
|
|
|
async fn serve_file(path: &str) -> impl IntoResponse {
|
|
let file = Assets::get(path);
|
|
|
|
match file {
|
|
Some(content) => {
|
|
let mime = mime_guess::from_path(path).first_or_octet_stream();
|
|
|
|
Response::builder()
|
|
.status(StatusCode::OK)
|
|
.header(
|
|
header::CONTENT_TYPE,
|
|
HeaderValue::from_str(mime.as_ref()).unwrap(),
|
|
)
|
|
.body(Body::from(content.data.into_owned()))
|
|
.unwrap()
|
|
}
|
|
None => {
|
|
// For SPA routing, serve index.html for unknown routes
|
|
if let Some(index) = Assets::get("index.html") {
|
|
Response::builder()
|
|
.status(StatusCode::OK)
|
|
.header(header::CONTENT_TYPE, HeaderValue::from_static("text/html"))
|
|
.body(Body::from(index.data.into_owned()))
|
|
.unwrap()
|
|
} else {
|
|
Response::builder()
|
|
.status(StatusCode::NOT_FOUND)
|
|
.body(Body::from("404 Not Found"))
|
|
.unwrap()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn serve_sound_file(
|
|
axum::extract::Path(filename): axum::extract::Path<String>,
|
|
) -> impl IntoResponse {
|
|
// Validate filename contains only expected sound files
|
|
let valid_sounds = [
|
|
"abstract-sound1.wav",
|
|
"abstract-sound2.wav",
|
|
"abstract-sound3.wav",
|
|
"abstract-sound4.wav",
|
|
"cow-mooing.wav",
|
|
"phone-vibration.wav",
|
|
"rooster.wav",
|
|
];
|
|
|
|
if !valid_sounds.contains(&filename.as_str()) {
|
|
return Response::builder()
|
|
.status(StatusCode::NOT_FOUND)
|
|
.body(Body::from("Sound file not found"))
|
|
.unwrap();
|
|
}
|
|
|
|
match SoundAssets::get(&filename) {
|
|
Some(content) => Response::builder()
|
|
.status(StatusCode::OK)
|
|
.header(header::CONTENT_TYPE, HeaderValue::from_static("audio/wav"))
|
|
.body(Body::from(content.data.into_owned()))
|
|
.unwrap(),
|
|
None => Response::builder()
|
|
.status(StatusCode::NOT_FOUND)
|
|
.body(Body::from("Sound file not found"))
|
|
.unwrap(),
|
|
}
|
|
}
|
|
|
|
/// Process entry point: initializes Sentry, builds a multi-threaded Tokio
/// runtime, then runs the full HTTP server lifecycle inside `block_on`.
///
/// `main` is deliberately synchronous so the Sentry `_guard` is created
/// before the runtime starts and stays alive for the whole process,
/// flushing events on exit.
fn main() -> anyhow::Result<()> {
    // Tag Sentry events with the build profile.
    let environment = if cfg!(debug_assertions) {
        "dev"
    } else {
        "production"
    };
    // NOTE(review): DSN is hard-coded rather than configurable — presumably
    // intentional for a distributed binary; confirm.
    let _guard = sentry::init(("https://1065a1d276a581316999a07d5dffee26@o4509603705192449.ingest.de.sentry.io/4509605576441937", sentry::ClientOptions {
        release: sentry::release_name!(),
        environment: Some(environment.into()),
        attach_stacktrace: true,
        ..Default::default()
    }));
    sentry::configure_scope(|scope| {
        scope.set_tag("source", "server");
    });
    tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap()
        .block_on(async {
            // Log at INFO to stdout and forward tracing events to Sentry.
            tracing_subscriber::registry()
                .with(tracing_subscriber::fmt::layer().with_filter(LevelFilter::INFO))
                .with(sentry_layer())
                .init();

            // Create asset directory if it doesn't exist
            if !utils::asset_dir().exists() {
                std::fs::create_dir_all(utils::asset_dir())?;
            }

            // Database connection
            let database_url = format!(
                "sqlite://{}",
                utils::asset_dir().join("db.sqlite").to_string_lossy()
            );

            // Create the SQLite file on first run, then apply pending
            // migrations before anything touches the pool.
            let options = SqliteConnectOptions::from_str(&database_url)?.create_if_missing(true);
            let pool = SqlitePool::connect_with(options).await?;
            sqlx::migrate!("./migrations").run(&pool).await?;

            // Load configuration
            let config_path = utils::config_path();
            let config = Config::load(&config_path)?;
            // Shared, mutable at runtime — handed to app state and monitors.
            let config_arc = Arc::new(RwLock::new(config));

            // Deployment mode: defaults to Local when ENVIRONMENT is unset
            // or unparsable.
            let env = std::env::var("ENVIRONMENT")
                .unwrap_or_else(|_| "local".to_string());
            let mode = env.parse().unwrap_or(Environment::Local);
            tracing::info!("Running in {mode} mode" );

            // Create app state
            let app_state = AppState::new(pool.clone(), config_arc.clone(), mode).await;

            app_state.update_sentry_scope().await;

            // Track session start event
            app_state.track_analytics_event("session_start", None).await;
            // Start background task to check for init status and spawn processes
            let state_clone = app_state.clone();
            tokio::spawn(async move {
                execution_monitor(state_clone).await;
            });

            // Start PR monitoring service
            let pr_monitor = PrMonitorService::new(pool.clone());
            let config_for_monitor = config_arc.clone();

            tokio::spawn(async move {
                pr_monitor.start_with_config(config_for_monitor).await;
            });

            // Public routes (no auth required)
            let public_routes = Router::new()
                .route("/api/health", get(health::health_check))
                .route("/api/echo", post(echo_handler));

            // Create routers with different middleware layers
            let base_routes = Router::new()
                .merge(stream::stream_router())
                .merge(filesystem::filesystem_router())
                .merge(config::config_router())
                .merge(auth::auth_router())
                .route("/sounds/:filename", get(serve_sound_file))
                .merge(
                    // route_layer (not layer) so the loader middleware only
                    // fires when this route actually matches.
                    Router::new()
                        .route("/execution-processes/:process_id", get(task_attempts::get_execution_process))
                        .route_layer(from_fn_with_state(app_state.clone(), load_execution_process_simple_middleware))
                );

            // Template routes with task template middleware applied selectively
            let template_routes = Router::new()
                .route("/templates", get(task_templates::list_templates).post(task_templates::create_template))
                .route("/templates/global", get(task_templates::list_global_templates))
                .route(
                    "/projects/:project_id/templates",
                    get(task_templates::list_project_templates),
                )
                .merge(
                    // Only the per-template routes need the template loaded.
                    Router::new()
                        .route(
                            "/templates/:template_id",
                            get(task_templates::get_template)
                                .put(task_templates::update_template)
                                .delete(task_templates::delete_template),
                        )
                        .route_layer(from_fn_with_state(app_state.clone(), load_task_template_middleware))
                );

            // Project routes with project middleware
            let project_routes = Router::new()
                .merge(projects::projects_base_router())
                .merge(projects::projects_with_id_router()
                    .layer(from_fn_with_state(app_state.clone(), load_project_middleware)));

            // Task routes with appropriate middleware
            let task_routes = Router::new()
                .merge(tasks::tasks_project_router()
                    .layer(from_fn_with_state(app_state.clone(), load_project_middleware)))
                .merge(tasks::tasks_with_id_router()
                    .layer(from_fn_with_state(app_state.clone(), load_task_middleware)));

            // Task attempt routes with appropriate middleware
            let task_attempt_routes = Router::new()
                .merge(task_attempts::task_attempts_list_router(app_state.clone())
                    .layer(from_fn_with_state(app_state.clone(), load_task_middleware)))
                .merge(task_attempts::task_attempts_with_id_router(app_state.clone())
                    .layer(from_fn_with_state(app_state.clone(), load_task_attempt_middleware)));

            // Conditionally add GitHub routes for cloud mode
            let mut api_routes = Router::new()
                .merge(base_routes)
                .merge(template_routes)
                .merge(project_routes)
                .merge(task_routes)
                .merge(task_attempt_routes);

            if mode.is_cloud() {
                api_routes = api_routes.merge(github::github_router());
                tracing::info!("GitHub repository routes enabled (cloud mode)");
            }

            // All routes (no auth required)
            let app_routes = Router::new()
                .nest(
                    "/api",
                    // Attach the Sentry user context to every /api request.
                    api_routes
                        .layer(from_fn_with_state(app_state.clone(), auth::sentry_user_context_middleware)),
                );

            // Final app: API routes first, then SPA static-file fallbacks.
            // CORS and Sentry layers wrap everything.
            let app = Router::new()
                .merge(public_routes)
                .merge(app_routes)
                // Static file serving routes
                .route("/", get(index_handler))
                .route("/*path", get(static_handler))
                .with_state(app_state)
                .layer(CorsLayer::permissive())
                .layer(NewSentryLayer::new_from_top());

            // BACKEND_PORT wins over PORT; either may arrive wrapped in ANSI
            // escape codes (e.g. when injected by a colorized launcher).
            let port = std::env::var("BACKEND_PORT")
                .or_else(|_| std::env::var("PORT"))
                .ok()
                .and_then(|s| {
                    // remove any ANSI codes, then turn into String
                    let cleaned = String::from_utf8(strip(s.as_bytes()))
                        .expect("UTF-8 after stripping ANSI");
                    cleaned.trim().parse::<u16>().ok()
                })
                .unwrap_or_else(|| {
                    tracing::info!("No PORT environment variable set, using port 0 for auto-assignment");
                    0
                }); // Use 0 to find free port if no specific port provided

            let host = std::env::var("HOST").unwrap_or_else(|_| "127.0.0.1".to_string());
            let listener = tokio::net::TcpListener::bind(format!("{host}:{port}")).await?;
            // With port 0 the OS picks a free port; read back what we got.
            let actual_port = listener.local_addr()?.port();

            tracing::info!("Server running on http://{host}:{actual_port}");

            // Release builds auto-open the browser; dev runs are assumed to
            // be driven from a separate frontend dev server.
            if !cfg!(debug_assertions) {
                tracing::info!("Opening browser...");
                if let Err(e) = utils::open_browser(&format!("http://127.0.0.1:{actual_port}")).await {
                    tracing::warn!("Failed to open browser automatically: {}. Please open http://127.0.0.1:{} manually.", e, actual_port);
                }
            }

            axum::serve(listener, app).await?;

            Ok(())
        })
}
|